From ac96354b4c1175518328e8fd0da29e71427be752 Mon Sep 17 00:00:00 2001 From: Abdullah Atta Date: Mon, 6 May 2024 22:04:04 +0500 Subject: [PATCH] core: handle case when updating item while a push is ongoing --- packages/core/__e2e__/sync.test.js | 60 ++++++++++++++++++++ packages/core/__mocks__/fs.mock.ts | 2 +- packages/core/src/api/sync/collector.ts | 15 ++++- packages/core/src/database/sql-collection.ts | 10 +++- 4 files changed, 84 insertions(+), 3 deletions(-) diff --git a/packages/core/__e2e__/sync.test.js b/packages/core/__e2e__/sync.test.js index 56c3f2593..c5421135c 100644 --- a/packages/core/__e2e__/sync.test.js +++ b/packages/core/__e2e__/sync.test.js @@ -95,6 +95,66 @@ test( TEST_TIMEOUT ); +test( + "edge case 1: items updated while push is running", + async (t) => { + const [deviceA, deviceB] = await Promise.all([ + initializeDevice("deviceA"), + initializeDevice("deviceB") + ]); + + t.onTestFinished(async () => { + console.log(`${t.task.name} log out`); + await cleanup(deviceA, deviceB); + }); + + const id = await deviceA.notes.add({ title: "hello" }); + for (let i = 0; i < 10; ++i) { + await Promise.all([ + deviceA.sync({ type: "send" }), + new Promise((resolve) => setTimeout(resolve), 100).then(() => + deviceA.notes.add({ id, title: `edit ${i}` }) + ) + ]); + + expect((await deviceA.notes.note(id))?.synced).toBe(false); + await deviceA.sync({ type: "send" }); + await deviceB.sync({ type: "fetch" }); + expect((await deviceB.notes.note(id))?.title).toBe(`edit ${i}`); + } + }, + TEST_TIMEOUT * 10 +); + +test( + "edge case 2: new items added while push is running", + async (t) => { + const [deviceA, deviceB] = await Promise.all([ + initializeDevice("deviceA"), + initializeDevice("deviceB") + ]); + + t.onTestFinished(async () => { + console.log(`${t.task.name} log out`); + await cleanup(deviceA, deviceB); + }); + + for (let i = 0; i < 10; ++i) { + await Promise.all([ + deviceA.sync({ type: "send" }), + new Promise((resolve) => setTimeout(resolve), 100).then(() => + deviceA.notes.add({ title: `note ${i}` }) + ) + ]); + expect(await deviceB.notes.all.count()).toBe(i); + await deviceA.sync({ type: "send" }); + await deviceB.sync({ type: "fetch" }); + expect(await deviceB.notes.all.count()).toBe(i + 1); + } + }, + TEST_TIMEOUT * 10 +); + // test( // "case 4: Device A's sync is interrupted halfway and Device B makes some changes afterwards and syncs.", // async () => { diff --git a/packages/core/__mocks__/fs.mock.ts b/packages/core/__mocks__/fs.mock.ts index 84d9e10ec..aa4f62dbe 100644 --- a/packages/core/__mocks__/fs.mock.ts +++ b/packages/core/__mocks__/fs.mock.ts @@ -51,7 +51,7 @@ async function writeEncryptedBase64( hashType, iv: "some iv", salt: key.salt!, - length: data.length + size: data.length }; } diff --git a/packages/core/src/api/sync/collector.ts b/packages/core/src/api/sync/collector.ts index 28af685a5..e64e0d5aa 100644 --- a/packages/core/src/api/sync/collector.ts +++ b/packages/core/src/api/sync/collector.ts @@ -46,6 +46,7 @@ class Collector { for (const itemType of SYNC_ITEM_TYPES) { const collectionKey = SYNC_COLLECTIONS_MAP[itemType]; const collection = this.db[collectionKey].collection; + let pushTimestamp = Date.now(); for await (const chunk of collection.unsynced(chunkSize, isForceSync)) { const items = await this.prepareChunk(chunk, key); if (!items) continue; @@ -54,8 +55,20 @@ class Collector { await collection.update( chunk.map((i) => i.id), { synced: true }, - { sendEvent: false } + { + sendEvent: false, + // EDGE CASE: + // Sometimes an item can get updated while it's being pushed. + // The result is that its `synced` property becomes true even + // though it's modification wasn't yet synced. + // In order to prevent that, we only set the `synced` property + // to true for items that haven't been modified since we last ran + // the push. Everything else will be collected again in the next + // push. + condition: (eb) => eb("dateModified", "<=", pushTimestamp) + } ); + pushTimestamp = Date.now(); } } } diff --git a/packages/core/src/database/sql-collection.ts b/packages/core/src/database/sql-collection.ts index 9b6108f1a..03649b570 100644 --- a/packages/core/src/database/sql-collection.ts +++ b/packages/core/src/database/sql-collection.ts @@ -226,7 +226,14 @@ export class SQLCollection< async update( ids: string[], partial: Partial>, - options: { sendEvent: boolean } = { sendEvent: true } + options: { + sendEvent: boolean; + condition?: ExpressionOrFactory< + DatabaseSchema, + keyof DatabaseSchema, + SqlBool + >; + } = { sendEvent: true } ) { if (!this.sanitizer.sanitize(this.type, partial)) return; @@ -237,6 +244,7 @@ export class SQLCollection< await tx .updateTable(this.type) .where("id", "in", chunk) + .$if(!!options.condition, (eb) => eb.where(options.condition!)) .set({ ...partial, dateModified: Date.now(),