diff --git a/packages/core/src/api/sync/__tests__/collector.test.js b/packages/core/src/api/sync/__tests__/collector.test.js index 13441fd91..598752e9c 100644 --- a/packages/core/src/api/sync/__tests__/collector.test.js +++ b/packages/core/src/api/sync/__tests__/collector.test.js @@ -20,7 +20,6 @@ along with this program. If not, see . import { databaseTest, TEST_NOTE, - delay, loginFakeUser } from "../../../../__tests__/utils"; import Collector from "../collector"; @@ -31,14 +30,9 @@ test("newly created note should get included in collector", () => await loginFakeUser(db); const collector = new Collector(db); - const lastSyncedTime = Date.now() - 10000; - const noteId = await db.notes.add(TEST_NOTE); - const items = []; - for await (const item of collector.collect(100, lastSyncedTime, false)) { - items.push(item); - } + const items = await iteratorToArray(collector.collect(100, false)); expect(items).toHaveLength(2); expect(items[0].type).toBe("content"); @@ -47,62 +41,59 @@ test("newly created note should get included in collector", () => expect(items[1].type).toBe("note"); })); -test("edited note after last synced time should get included in collector", () => +test("synced property should be true after getting collected by the collector", () => + databaseTest().then(async (db) => { + await loginFakeUser(db); + const collector = new Collector(db); + + const noteId = await db.notes.add(TEST_NOTE); + + const items = await iteratorToArray(collector.collect(100, false)); + const items2 = await iteratorToArray(collector.collect(100, false)); + + expect(items2).toHaveLength(0); + expect(items).toHaveLength(2); + expect(items[0].type).toBe("content"); + expect(items[0].items[0].id).toBe((await db.notes.note(noteId)).contentId); + expect(items[1].items[0].id).toBe(noteId); + expect(items[1].type).toBe("note"); + })); + +test("edited note should get included in collector", () => databaseTest().then(async (db) => { await loginFakeUser(db); const collector = new Collector(db); const noteId = await db.notes.add(TEST_NOTE); - const lastSyncedTime = Date.now(); - - await delay(1000); + await iteratorToArray(collector.collect(100, false)); await db.notes.add({ id: noteId, pinned: true }); - const items = []; - for await (const item of collector.collect(100, lastSyncedTime, false)) { - items.push(item); - } + const items = await iteratorToArray(collector.collect(100, false)); expect(items).toHaveLength(1); expect(items[0].items[0].id).toBe(noteId); })); -test("note edited before last synced time should not get included in collector", () => - databaseTest().then(async (db) => { - await loginFakeUser(db); - const collector = new Collector(db); - const noteId = await db.notes.add(TEST_NOTE); - - await db.notes.add({ id: noteId, pinned: true }); - - await delay(500); - - const lastSyncedTime = Date.now(); - - const items = []; - for await (const item of collector.collect(100, lastSyncedTime, false)) { - items.push(item); - } - - expect(items).toHaveLength(0); - })); - test("localOnly note should get included as a deleted item in collector", () => databaseTest().then(async (db) => { await loginFakeUser(db); const collector = new Collector(db); await db.notes.add({ ...TEST_NOTE, localOnly: true }); - const items = []; - for await (const item of collector.collect(100, 0, false)) { - items.push(item); - } + const items = await iteratorToArray(collector.collect(100, false)); expect(items).toHaveLength(2); - - expect(items[0].items[0].length).toBe(104); - expect(items[1].items[0].length).toBe(104); + expect(items[0].items[0].length).toBe(77); + expect(items[1].items[0].length).toBe(77); expect(items[0].type).toBe("content"); expect(items[1].type).toBe("note"); })); + +async function iteratorToArray(iterator) { + let items = []; + for await (const item of iterator) { + items.push(item); + } + return items; +} diff --git a/packages/core/src/api/sync/collector.ts b/packages/core/src/api/sync/collector.ts index 7ffa6921f..343e0b624 100644 --- a/packages/core/src/api/sync/collector.ts +++ b/packages/core/src/api/sync/collector.ts @@ -35,7 +35,6 @@ class Collector { async *collect( chunkSize: number, - lastSyncedTimestamp: number, isForceSync = false ): AsyncGenerator { const key = await this.db.user.getEncryptionKey(); @@ -47,13 +46,15 @@ class Collector { for (const itemType of SYNC_ITEM_TYPES) { const collectionKey = SYNC_COLLECTIONS_MAP[itemType]; const collection = this.db[collectionKey].collection; - for await (const chunk of collection.unsynced( - isForceSync ? 0 : lastSyncedTimestamp, - chunkSize - )) { + for await (const chunk of collection.unsynced(chunkSize, isForceSync)) { const items = await this.prepareChunk(chunk, key); if (!items) continue; yield { items, type: itemType }; + + await collection.update( + chunk.map((i) => i.id), + { synced: true } + ); } } } @@ -89,13 +90,20 @@ function filterSyncableItems(items: MaybeDeletedItem[]): { const ids = []; const syncableItems = []; for (const item of items) { - // const isSyncable = !item.synced || isForceSync; - - // synced is a local only property. we don't want to sync it. delete item.synced; ids.push(item.id); - syncableItems.push(JSON.stringify(item)); + syncableItems.push( + JSON.stringify( + "localOnly" in item && item.localOnly + ? { + id: item.id, + deleted: true, + dateModified: item.dateModified + } + : item + ) + ); } return { items: syncableItems, ids }; } diff --git a/packages/core/src/api/sync/devices.ts b/packages/core/src/api/sync/devices.ts new file mode 100644 index 000000000..449340815 --- /dev/null +++ b/packages/core/src/api/sync/devices.ts @@ -0,0 +1,54 @@ +/* +This file is part of the Notesnook project (https://notesnook.com/) + +Copyright (C) 2023 Streetwriters (Private) Limited + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +*/ + +import { StorageAccessor } from "../../interfaces"; +import hosts from "../../utils/constants"; +import http from "../../utils/http"; +import { getId } from "../../utils/id"; +import TokenManager from "../token-manager"; + +export class SyncDevices { + constructor( + private readonly storage: StorageAccessor, + private readonly tokenManager: TokenManager + ) {} + + async register() { + const deviceId = getId(); + const url = `${hosts.API_HOST}/devices?deviceId=${deviceId}`; + const token = await this.tokenManager.getAccessToken(); + return http + .post(url, null, token) + .then(() => this.storage().write("deviceId", deviceId)); + } + + async unregister() { + const deviceId = await this.storage().read("deviceId"); + if (!deviceId) return; + const url = `${hosts.API_HOST}/devices?deviceId=${deviceId}`; + const token = await this.tokenManager.getAccessToken(); + return http + .delete(url, token) + .then(() => this.storage().remove("deviceId")); + } + + get() { + return this.storage().read("deviceId"); + } +} diff --git a/packages/core/src/api/sync/index.ts b/packages/core/src/api/sync/index.ts index 96cbc8686..04cc72273 100644 --- a/packages/core/src/api/sync/index.ts +++ b/packages/core/src/api/sync/index.ts @@ -37,18 +37,20 @@ import { Mutex } from "async-mutex"; import Database from ".."; import { migrateItem } from "../../migrations"; import { SerializedKey } from "@notesnook/crypto"; -import { Item, MaybeDeletedItem } from "../../types"; +import { Item, MaybeDeletedItem, Note, Notebook } from "../../types"; import { SYNC_COLLECTIONS_MAP, SyncTransferItem } from "./types"; import { DownloadableFile } from "../../database/fs"; +import { SyncDevices } from "./devices"; +import { COLORS } from "../../database/backup"; export type SyncOptions = { type: "full" | "fetch" | "send"; force?: boolean; - serverLastSynced?: number; }; export default class SyncManager { sync = new Sync(this.db); + devices = this.sync.devices; constructor(private readonly db: Database) {} async start(options: SyncOptions) { @@ -106,13 +108,12 @@ class Sync { logger = logger.scope("Sync"); syncConnectionMutex = new Mutex(); connection: signalr.HubConnection; + devices = new SyncDevices(this.db.storage, this.db.tokenManager); constructor(private readonly db: Database) { - let remoteSyncTimeout = 0; - const tokenManager = new TokenManager(db.storage); this.connection = new signalr.HubConnectionBuilder() - .withUrl(`${Constants.API_HOST}/hubs/sync`, { + .withUrl(`${Constants.API_HOST}/hubs/sync/v2`, { accessTokenFactory: async () => { const token = await tokenManager.getAccessToken(); if (!token) throw new Error("Failed to get access token."); @@ -144,29 +145,7 @@ class Sync { this.autoSync.stop(); }); - this.connection.on("PushItems", async (chunk) => { - if (this.connection.state !== signalr.HubConnectionState.Connected) - return; - - clearTimeout(remoteSyncTimeout); - remoteSyncTimeout = setTimeout(() => { - this.db.eventManager.publish(EVENTS.syncAborted); - }, 15000) as unknown as number; - - const key = await this.db.user.getEncryptionKey(); - if (!key || !key.key || !key.salt) { - EV.publish(EVENTS.userSessionExpired); - throw new Error("User encryption key not generated. Please relogin."); - } - - const dbLastSynced = await this.db.lastSynced(); - await this.processChunk(chunk, key, dbLastSynced, true); - }); - - this.connection.on("PushCompleted", (lastSynced) => { - clearTimeout(remoteSyncTimeout); - this.onPushCompleted(lastSynced); - }); + this.connection.on("PushCompleted", () => this.onPushCompleted()); } async start(options: SyncOptions) { @@ -185,30 +164,21 @@ class Sync { throw new Error("Connection closed."); }); - const { lastSynced, oldLastSynced } = await this.init(options.force); - this.logger.info("Initialized sync", { lastSynced, oldLastSynced }); + const { deviceId } = await this.init(options.force); + this.logger.info("Initialized sync", { deviceId }); - const newLastSynced = Date.now(); - - const serverResponse = - options.type === "fetch" || options.type === "full" - ? await this.fetch(lastSynced) - : null; - this.logger.info("Data fetched", serverResponse || {}); + if (options.type === "fetch" || options.type === "full") { + await this.fetch(deviceId); + this.logger.info("Data fetched"); + } if ( (options.type === "send" || options.type === "full") && - (await this.send(lastSynced, newLastSynced, options.force)) - ) { + (await this.send(deviceId, options.force)) + ) this.logger.info("New data sent"); - await this.stop(newLastSynced); - } else if (serverResponse) { - this.logger.info("No new data to send."); - await this.stop(serverResponse.lastSynced); - } else { - this.logger.info("Nothing to do."); - await this.stop(options.serverLastSynced || oldLastSynced); - } + + await this.stop(); if (!(await checkSyncStatus(SYNC_CHECK_IDS.autoSync))) { await this.connection.stop(); @@ -222,14 +192,23 @@ class Sync { this.conflicts.throw(); } - let lastSynced = await this.db.lastSynced(); - if (isForceSync) lastSynced = 0; + if (isForceSync) { + await this.devices.unregister(); + await this.devices.register(); + } - const oldLastSynced = lastSynced; - return { lastSynced, oldLastSynced }; + let deviceId = await this.devices.get(); + if (!deviceId) { + await this.devices.register(); + deviceId = await this.devices.get(); + } + + if (!deviceId) throw new Error("Sync device not registered."); + + return { deviceId }; } - async fetch(lastSynced: number) { + async fetch(deviceId: string) { await this.checkConnection(); const key = await this.db.user.getEncryptionKey(); @@ -241,14 +220,13 @@ class Sync { return; } - const dbLastSynced = await this.db.lastSynced(); let count = 0; this.connection.off("SendItems"); this.connection.on("SendItems", async (chunk) => { if (this.connection.state !== signalr.HubConnectionState.Connected) return; - await this.processChunk(chunk, key, dbLastSynced); + await this.processChunk(chunk, key); count += chunk.items.length; sendSyncProgressEvent(this.db.eventManager, `download`, count); @@ -257,7 +235,7 @@ class Sync { }); const serverResponse = await this.connection.invoke( "RequestFetch", - lastSynced + deviceId ); if ( @@ -275,56 +253,46 @@ class Sync { if (await this.conflicts.check()) { this.conflicts.throw(); } - - return { lastSynced: serverResponse.lastSynced }; } - async send(oldLastSynced, isForceSync, newLastSynced) { - return false; + async send(deviceId: string, isForceSync?: boolean) { + await this.uploadAttachments(); - // await this.uploadAttachments(); + let isSyncInitialized = false; + let done = 0; + for await (const item of this.collector.collect(100, isForceSync)) { + if (!isSyncInitialized) { + const vaultKey = await this.db.vault.getKey(); + await this.connection.send("InitializePush", { + vaultKey, + synced: false + }); + isSyncInitialized = true; + } - // let isSyncInitialized = false; - // let done = 0; - // for await (const item of this.collector.collect( - // 100, - // oldLastSynced, - // isForceSync - // )) { - // if (!isSyncInitialized) { - // const vaultKey = await this.db.vault._getKey(); - // newLastSynced = await this.connection.invoke("InitializePush", { - // vaultKey, - // lastSynced: newLastSynced - // }); - // isSyncInitialized = true; - // } + const result = await this.pushItem(deviceId, item); + if (result) { + done += item.items.length; + sendSyncProgressEvent(this.db.eventManager, "upload", done); - // const result = await this.pushItem(item, newLastSynced); - // if (result) { - // done += item.items.length; - // sendSyncProgressEvent(this.db.eventManager, "upload", done); - - // this.logger.info(`Batch sent (${done})`); - // } else { - // this.logger.error( - // new Error(`Failed to send batch. Server returned falsy response.`) - // ); - // } - // } - // if (!isSyncInitialized) return; - // await this.connection.invoke("SyncCompleted", newLastSynced); - // return true; + this.logger.info(`Batch sent (${done})`); + } else { + this.logger.error( + new Error(`Failed to send batch. Server returned falsy response.`) + ); + } + } + if (!isSyncInitialized) return false; + await this.connection.send("PushCompleted"); + return true; } - async stop(lastSynced: number) { + async stop() { // refresh monographs on sync completed await this.db.monographs.refresh(); - this.logger.info("Stopping sync", { lastSynced }); - const storedLastSynced = await this.db.lastSynced(); - if (lastSynced > storedLastSynced) - await this.db.storage().write("lastSynced", lastSynced); + this.logger.info("Stopping sync"); + await this.db.storage().write("lastSynced", Date.now()); this.db.eventManager.publish(EVENTS.syncCompleted); } @@ -352,19 +320,13 @@ class Sync { /** * @private */ - async onPushCompleted(lastSynced: number) { - this.db.eventManager.publish( - EVENTS.databaseSyncRequested, - false, - false, - lastSynced - ); + async onPushCompleted() { + this.db.eventManager.publish(EVENTS.databaseSyncRequested, true, false); } async processChunk( chunk: SyncTransferItem, key: SerializedKey, - dbLastSynced: number, notify = false ) { const itemType = chunk.type; @@ -372,11 +334,13 @@ class Sync { const decrypted = await this.db.storage().decryptMulti(key, chunk.items); - const deserialized = await Promise.all( - decrypted.map((item, index) => - deserializeItem(item, chunk.items[index].v, this.db) + const deserialized = ( + await Promise.all( + decrypted.map((item, index) => + deserializeItem(item, chunk.items[index].v, this.db) + ) ) - ); + ).filter(Boolean); const collectionType = SYNC_COLLECTIONS_MAP[itemType]; const collection = this.db[collectionType].collection; @@ -385,7 +349,7 @@ class Sync { if (itemType === "content") { items = await Promise.all( deserialized.map((item) => - this.merger.mergeContent(item, localItems[item.id], dbLastSynced) + this.merger.mergeContent(item, localItems[item.id]) ) ); } else { @@ -414,11 +378,9 @@ class Sync { await collection.put(items as any); } - private async pushItem(item: SyncTransferItem, newLastSynced: number) { + private async pushItem(deviceId: string, item: SyncTransferItem) { await this.checkConnection(); - return ( - (await this.connection.invoke("PushItems", item, newLastSynced)) === 1 - ); + return (await this.connection.invoke("PushItems", deviceId, item)) === 1; } private async checkConnection() { @@ -463,19 +425,46 @@ async function deserializeItem( version: number, database: Database ) { - const deserialized = JSON.parse(decryptedItem); - deserialized.remote = true; - deserialized.synced = true; + const item = JSON.parse(decryptedItem); + item.remote = true; + item.synced = true; - if (!deserialized.alg && !deserialized.cipher) { - await migrateItem( - deserialized, + if (!item.cipher) { + let migrationResult = await migrateItem( + item, version, CURRENT_DATABASE_VERSION, - deserialized.type, + item.type, database, "sync" ); + if (migrationResult === "skip") return; + + // since items in trash can have their own set of migrations, + // we have to run the migration again to account for that. + if (item.type === "trash" && item.itemType) { + migrationResult = await migrateItem( + item as unknown as Note | Notebook, + version, + CURRENT_DATABASE_VERSION, + item.itemType, + database, + "backup" + ); + if (migrationResult === "skip") return; + } + + const itemType = + // colors are naively of type "tag" instead of "color" so we have to fix that. + item.type === "tag" && COLORS.includes(item.title.toLowerCase()) + ? "color" + : item.type === "trash" && "itemType" in item && item.itemType + ? item.itemType + : item.type; + + if (!itemType || itemType === "topic" || itemType === "settings") return; + + if (migrationResult) item.synced = false; } - return deserialized; + return item; } diff --git a/packages/core/src/api/sync/merger.ts b/packages/core/src/api/sync/merger.ts index 9fb5e7849..91b625d4d 100644 --- a/packages/core/src/api/sync/merger.ts +++ b/packages/core/src/api/sync/merger.ts @@ -33,7 +33,6 @@ class Merger { isConflicted( localItem: MaybeDeletedItem, remoteItem: MaybeDeletedItem, - lastSynced: number, conflictThreshold: number ) { const isResolved = @@ -46,7 +45,7 @@ class Merger { // will be ahead of last sync. In that case, we also have to check if the // synced flag is false (it is only false if a user makes edits on the // local device). - localItem.dateModified > lastSynced && !localItem.synced; + localItem.dateModified > remoteItem.dateModified && !localItem.synced; if (isModified && !isResolved) { // If time difference between local item's edits & remote item's edits // is less than threshold, we shouldn't trigger a merge conflict; instead @@ -100,15 +99,13 @@ class Merger { async mergeContent( remoteItem: MaybeDeletedItem, - localItem: MaybeDeletedItem | undefined, - lastSynced: number + localItem: MaybeDeletedItem | undefined ) { if (localItem && "localOnly" in localItem && localItem.localOnly) return; const THRESHOLD = process.env.NODE_ENV === "test" ? 6 * 1000 : 60 * 1000; const conflicted = - localItem && - this.isConflicted(localItem, remoteItem, lastSynced, THRESHOLD); + localItem && this.isConflicted(localItem, remoteItem, THRESHOLD); if (!localItem || conflicted === "merge") { return remoteItem; } else if (conflicted === "conflict") { diff --git a/packages/core/src/api/user-manager.ts b/packages/core/src/api/user-manager.ts index 1fc2c6881..a42709889 100644 --- a/packages/core/src/api/user-manager.ts +++ b/packages/core/src/api/user-manager.ts @@ -176,6 +176,7 @@ class UserManager { if (!sessionExpired) { await this.db.storage().write("lastSynced", 0); + await this.db.syncer.devices.register(); } await this.db.storage().deriveCryptoKey(`_uk_@${user.email}`, { @@ -228,6 +229,7 @@ class UserManager { salt: user.salt }); await this.db.storage().write("lastSynced", 0); + await this.db.syncer.devices.register(); EV.publish(EVENTS.userLoggedIn, user); } @@ -262,6 +264,7 @@ class UserManager { async logout(revoke = true, reason?: string) { try { + await this.db.syncer.devices.unregister(); if (revoke) await this.tokenManager.revokeToken(); } catch (e) { console.error(e); diff --git a/packages/core/src/database/backup.ts b/packages/core/src/database/backup.ts index 3ccf5c186..d22d1f5ba 100644 --- a/packages/core/src/database/backup.ts +++ b/packages/core/src/database/backup.ts @@ -87,7 +87,7 @@ function isLegacyBackupFile( } const MAX_CHUNK_SIZE = 10 * 1024 * 1024; -const COLORS = [ +export const COLORS = [ "red", "orange", "yellow", @@ -206,7 +206,9 @@ export default class Backup { collection: DatabaseCollection, state: BackupState ) { - for await (const item of collection.stream() as any) { + for await (const item of collection.stream( + this.db.options.batchSize + ) as any) { const data = JSON.stringify(item); state.buffer.push(data); state.bufferLength += data.length; diff --git a/packages/core/src/database/index.ts b/packages/core/src/database/index.ts index 4490b2a63..fa99d50e4 100644 --- a/packages/core/src/database/index.ts +++ b/packages/core/src/database/index.ts @@ -123,14 +123,14 @@ export interface DatabaseCollection { Record | undefined> >; unsynced( - after: number, - chunkSize: number + chunkSize: number, + forceSync?: boolean ): IsAsync extends true ? AsyncIterableIterator[]> : IterableIterator[]>; - stream(): IsAsync extends true - ? AsyncIterableIterator - : IterableIterator; + stream( + chunkSize: number + ): IsAsync extends true ? AsyncIterableIterator : IterableIterator; } export type DatabaseAccessor = () => diff --git a/packages/core/src/database/sql-cached-collection.ts b/packages/core/src/database/sql-cached-collection.ts index 5183e9951..c6e6d58a3 100644 --- a/packages/core/src/database/sql-cached-collection.ts +++ b/packages/core/src/database/sql-cached-collection.ts @@ -75,7 +75,8 @@ export class SQLCachedCollection< this.cache.set(id, { id, deleted: true, - dateModified: Date.now() + dateModified: Date.now(), + synced: false }) ); await this.collection.softDelete(ids); @@ -141,13 +142,10 @@ export class SQLCachedCollection< return items; } - *unsynced( - after: number, - chunkSize: number - ): IterableIterator[]> { + *unsynced(chunkSize: number): IterableIterator[]> { let chunk: MaybeDeletedItem[] = []; for (const [_key, value] of this.cache) { - if (value && value.dateModified && value.dateModified > after) { + if (value && !value.synced) { chunk.push(value); if (chunk.length === chunkSize) { yield chunk; diff --git a/packages/core/src/database/sql-collection.ts b/packages/core/src/database/sql-collection.ts index 5cbf09b3e..bf4264bad 100644 --- a/packages/core/src/database/sql-collection.ts +++ b/packages/core/src/database/sql-collection.ts @@ -88,7 +88,8 @@ export class SQLCollection< ids.map((id) => ({ id, deleted: true, - dateModified: Date.now() + dateModified: Date.now(), + synced: false })) ) .execute(); @@ -160,7 +161,8 @@ export class SQLCollection< .where("id", "in", ids) .set({ ...partial, - dateModified: Date.now() + dateModified: Date.now(), + synced: partial.synced || false }) .execute(); } @@ -194,47 +196,54 @@ export class SQLCollection< } async *unsynced( - after: number, - chunkSize: number + chunkSize: number, + forceSync?: boolean ): AsyncIterableIterator[]> { - let index = 0; + let lastRowId: string | null = null; while (true) { - const rows = await this.db() + const rows = (await this.db() .selectFrom(this.type) .selectAll() - .orderBy("dateModified", "asc") - .$if(after > 0, (eb) => - eb.where("dateModified", ">", after).where(isFalse("synced")) - ) + .$if(lastRowId != null, (qb) => qb.where("id", ">", lastRowId!)) + .$if(!forceSync, (eb) => eb.where(isFalse("synced"))) .$if(this.type === "attachments", (eb) => - eb.where("dateUploaded", ">", 0) + eb.where((eb) => + eb.or([eb("dateUploaded", ">", 0), eb("deleted", "==", true)]) + ) ) - .offset(index) + .orderBy("id") .limit(chunkSize) - .execute(); + .execute()) as MaybeDeletedItem[]; if (rows.length === 0) break; - index += chunkSize; - yield rows as MaybeDeletedItem[]; + yield rows; + + lastRowId = rows[rows.length - 1].id; } } - async *stream(): AsyncIterableIterator { - let index = 0; - const chunkSize = 50; + async *stream(chunkSize: number): AsyncIterableIterator { + let lastRow: T | null = null; while (true) { - const rows = await this.db() + const rows = (await this.db() .selectFrom(this.type) .where(isFalse("deleted")) - .orderBy("dateCreated desc") + .orderBy("dateCreated asc") + .orderBy("id asc") + .$if(lastRow !== null, (qb) => + qb.where( + (eb) => eb.refTuple("dateCreated", "id"), + ">", + (eb) => eb.tuple(lastRow!.dateCreated, lastRow!.id) + ) + ) .selectAll() - .offset(index) .limit(chunkSize) - .execute(); + .execute()) as T[]; if (rows.length === 0) break; - index += chunkSize; for (const row of rows) { - yield row as T; + yield row; } + lastRow = rows[rows.length - 1]; } }