From 896420064f1c0e8ef57ad153d7434b353b6c0a9d Mon Sep 17 00:00:00 2001 From: Abdullah Atta Date: Tue, 23 Dec 2025 11:03:36 +0500 Subject: [PATCH 1/4] core: improve sync reliability --- packages/core/__e2e__/sync.test.js | 463 ++++++++++++++++++++---- packages/core/src/api/index.ts | 20 +- packages/core/src/api/sync/collector.ts | 14 +- packages/core/src/api/sync/index.ts | 17 +- packages/core/src/api/sync/merger.ts | 2 +- packages/core/src/api/sync/types.ts | 1 + packages/core/src/api/token-manager.ts | 16 +- packages/core/src/api/user-manager.ts | 25 +- packages/core/src/api/vault.ts | 2 +- 9 files changed, 445 insertions(+), 115 deletions(-) diff --git a/packages/core/__e2e__/sync.test.js b/packages/core/__e2e__/sync.test.js index 47be56779..bce8f0d3e 100644 --- a/packages/core/__e2e__/sync.test.js +++ b/packages/core/__e2e__/sync.test.js @@ -17,15 +17,40 @@ You should have received a copy of the GNU General Public License along with this program. If not, see . */ -import { EV, EVENTS } from "../src/common.ts"; -import { test, expect, vitest } from "vitest"; +import { EVENTS } from "../src/common.ts"; +import { test, expect, vitest, vi, beforeAll, afterAll } from "vitest"; import { login } from "./utils.js"; -import { databaseTest } from "../__tests__/utils/index.ts"; +import { databaseTest, delay } from "../__tests__/utils/index.ts"; +import http from "../src/utils/http.ts"; +import Constants from "../src/utils/constants.ts"; +import { writeFileSync } from "node:fs"; const TEST_TIMEOUT = 60 * 1000; +const testOptions = { concurrent: true, timeout: TEST_TIMEOUT }; + +beforeAll(async () => { + const device = await databaseTest("memory"); + + await login(device); + + await device.user.resetUser(false); + + await device.user.logout(); +}, TEST_TIMEOUT); + +afterAll(async () => { + const device = await databaseTest("memory"); + + await login(device); + + await device.user.resetUser(false); + + await device.user.logout(); +}, TEST_TIMEOUT); test( "case 1: device A & B should only download the changes from device C (no uploading)", + testOptions, async (t) => { const types = []; function onSyncProgress({ type }) { @@ -53,12 +78,12 @@ test( await deviceB.sync({ type: "full" }); expect(types.every((t) => t === "download")).toBe(true); - }, - TEST_TIMEOUT + } ); test( "case 3: Device A & B have unsynced changes but server has nothing", + testOptions, async (t) => { const [deviceA, deviceB] = await Promise.all([ initializeDevice("deviceA"), @@ -85,12 +110,12 @@ test( expect(await deviceB.notes.note(note1Id)).toBeTruthy(); expect(await deviceA.notes.note(note1Id)).toBeTruthy(); expect(await deviceB.notes.note(note2Id)).toBeTruthy(); - }, - TEST_TIMEOUT + } ); test( "edge case 1: items updated while push is running", + testOptions, async (t) => { const [deviceA, deviceB] = await Promise.all([ initializeDevice("deviceA"), @@ -105,24 +130,32 @@ test( const id = await deviceA.notes.add({ title: "hello" }); for (let i = 0; i < 5; ++i) { if (i > 0) await deviceA.notes.add({ id, title: `edit ${i - 1}` }); - await Promise.all([ - deviceA.sync({ type: "send" }), - new Promise((resolve) => setTimeout(resolve, 40)).then(() => - deviceA.notes.add({ id, title: `edit ${i}` }) - ) - ]); - + var spy = vi.spyOn(deviceA.syncer.sync, "start"); + const defaultEncryptMulti = deviceA + .storage() + .encryptMulti.bind(deviceA.storage()); + var encryptMulti = vi.spyOn(deviceA.storage(), "encryptMulti"); + encryptMulti.mockImplementationOnce(async (...args) => { + const result = defaultEncryptMulti(...args); + // simulate scenario where a note gets updated while sync is collecting + // items + await new Promise((resolve) => setTimeout(resolve, 100)); + await deviceA.notes.add({ id, title: `edit ${i}` }); + return result; + }); + await deviceA.sync({ type: "send" }); + expect(spy).toHaveBeenCalledTimes(2); expect((await deviceA.notes.note(id))?.title).toBe(`edit ${i}`); expect((await deviceA.notes.note(id))?.synced).toBe(true); await deviceB.sync({ type: "fetch" }); expect((await deviceB.notes.note(id))?.title).toBe(`edit ${i}`); } - }, - TEST_TIMEOUT * 10 + } ); test( "edge case 2: new items added while push is running", + testOptions, async (t) => { const [deviceA, deviceB] = await Promise.all([ initializeDevice("deviceA"), @@ -137,21 +170,33 @@ test( const id = await deviceA.notes.add({ title: "hello" }); for (let i = 0; i < 5; ++i) { if (i > 0) await deviceA.notes.add({ id, title: `edit ${i - 1}` }); - await Promise.all([ - deviceA.sync({ type: "send" }), - new Promise((resolve) => setTimeout(resolve, 40)).then(() => - deviceA.notes.add({ title: `note ${i}` }) - ) - ]); + var spy = vi.spyOn(deviceA.syncer.sync, "start"); + const defaultEncryptMulti = deviceA + .storage() + .encryptMulti.bind(deviceA.storage()); + var encryptMulti = vi.spyOn(deviceA.storage(), "encryptMulti"); + let newNoteId; + encryptMulti.mockImplementationOnce(async (key, items) => { + const result = defaultEncryptMulti(key, items); + // simulate scenario where a note gets added while sync is collecting + // items + await new Promise((resolve) => setTimeout(resolve, 100)); + newNoteId = await deviceA.notes.add({ title: `note ${i}` }); + return result; + }); + await deviceA.sync({ type: "send" }); + expect(await deviceA.notes.note(newNoteId)).toBeDefined(); + expect((await deviceA.notes.note(newNoteId)).synced).toBe(true); + expect(spy).toHaveBeenCalledTimes(1); await deviceB.sync({ type: "fetch" }); - expect(await deviceB.notes.all.count()).toBe(i + 2); + expect(await deviceB.notes.note(newNoteId)).toBeDefined(); } - }, - TEST_TIMEOUT * 10 + } ); test( "issue: syncing should not affect the items' dateModified", + testOptions, async (t) => { const [deviceA] = await Promise.all([initializeDevice("deviceA")]); @@ -173,12 +218,12 @@ test( .dateModified; expect(noteDateBefore).toBe(noteDateAfter); expect(contentDateBefore).toBe(contentDateAfter); - }, - TEST_TIMEOUT + } ); test( "case 4: local content changed after remote content should create a conflict", + testOptions, async (t) => { const [deviceA, deviceB] = await Promise.all([ initializeDevice("deviceA"), @@ -190,7 +235,11 @@ test( await cleanup(deviceA, deviceB); }); - const noteId = await deviceA.notes.add({ + t.onTestFailed(() => { + writeFileSync("debug_deviceA_note.json", content); + }); + + var noteId = await deviceA.notes.add({ title: "Test note from device A", content: { data: "

Hello

", type: "tiptap" } }); @@ -203,21 +252,21 @@ test( }); await deviceB.sync({ type: "full" }); - await new Promise((resolve) => setTimeout(resolve, 10000)); + await delay(2500); await deviceA.notes.add({ id: noteId, content: { data: "

Hello (I am from device A)

", type: "tiptap" } }); await deviceA.sync({ type: "full" }); - + var content = JSON.stringify(await deviceA.content.findByNoteId(noteId)); expect(await deviceA.notes.conflicted.count()).toBeGreaterThan(0); - }, - TEST_TIMEOUT * 10 + } ); test( "case 5: remote content changed after local content should create a conflict", + testOptions, async (t) => { const [deviceA, deviceB] = await Promise.all([ initializeDevice("deviceA"), @@ -228,31 +277,46 @@ test( console.log(`${t.task.name} log out`); await cleanup(deviceA, deviceB); }); + // t.onTestFailed(() => { + // writeFileSync("debug_deviceB_note.json", content); + // }); - const noteId = await deviceA.notes.add({ + deviceB.syncer.sync.merger.forceLogging = true; + deviceB.syncer.sync.collector.forceLogging = true; + var noteId = await deviceA.notes.add({ title: "Test note from device A", - content: { data: "

Hello

", type: "tiptap" } + content: { data: "

Hello unique note

", type: "tiptap" } }); + console.log("Device A is syncing (A)"); await deviceA.sync({ type: "full" }); + console.log("Device B is syncing (A)"); await deviceB.sync({ type: "full" }); await deviceA.notes.add({ id: noteId, - content: { data: "

Hello (I am from device B)

", type: "tiptap" } + content: { + data: "

Hello (unique note edit) device A

", + type: "tiptap" + } }); + console.log("Device A is syncing (B)"); await deviceA.sync({ type: "full" }); - await new Promise((resolve) => setTimeout(resolve, 10000)); + await delay(2500); await deviceB.notes.add({ id: noteId, - content: { data: "

Hello (I am from device A)

", type: "tiptap" } + content: { + data: "

Hello unique note edit (device B)

", + type: "tiptap" + } }); + expect((await deviceB.content.findByNoteId(noteId)).synced).toBe(false); + console.log("Syncing device B"); await deviceB.sync({ type: "full" }); - + // // var content = JSON.stringify(await deviceB.content.findByNoteId(noteId)); expect(await deviceB.notes.conflicted.count()).toBeGreaterThan(0); - }, - TEST_TIMEOUT * 10 + } ); // test( @@ -382,6 +446,7 @@ test( test( "issue: running force sync from device A makes device B always download everything", + testOptions, async (t) => { const [deviceA, deviceB] = await Promise.all([ initializeDevice("deviceA"), @@ -402,20 +467,20 @@ test( }); } - await syncAndWait(deviceA, deviceB, true); - const handler = vitest.fn(); deviceB.eventManager.subscribe(EVENTS.syncProgress, handler); - await deviceB.sync({ type: "full" }); + await syncAndWait(deviceA, deviceB, true); - expect(handler).not.toHaveBeenCalled(); - }, - TEST_TIMEOUT + expect(handler.mock.calls.every(([data]) => data.type === "download")).toBe( + true + ); + } ); test( "issue: colors are not properly created if multiple notes are synced together", + testOptions, async (t) => { const [deviceA, deviceB] = await Promise.all([ initializeDevice("deviceA"), @@ -445,7 +510,12 @@ test( colorCode: "#ffff22" }); for (let noteId of noteIds) { - expect(await deviceB.notes.note(noteId)).toBeTruthy(); + // console.log( + // "Adding color to note", + // noteId, + // await deviceB.syncer.devices.get() + // ); + expect(await deviceB.notes.note(noteId)).toBeDefined(); expect( await deviceB.relations .from({ id: colorId, type: "color" }, "note") @@ -467,36 +537,274 @@ test( expect( noteIds.every((id) => purpleNotes.findIndex((p) => p.id === id) > -1) ).toBe(true); - }, - TEST_TIMEOUT + } ); +test( + "monograph: published note appears on other device after sync", + testOptions, + async (t) => { + const [deviceA, deviceB] = await Promise.all([ + initializeDevice("deviceA"), + initializeDevice("deviceB") + ]); + + t.onTestFinished(async () => { + console.log(`${t.task.name} log out`); + await cleanup(deviceA, deviceB); + }); + + const noteId = await deviceA.notes.add({ + title: "Monograph note", + content: { data: "

monograph content

", type: "tiptap" } + }); + + // publish on device A + const monographId = await deviceA.monographs.publish(noteId); + expect(monographId).toBeTruthy(); + + // deviceB fetches the monograph + await deviceB.sync({ type: "fetch" }); + + expect(deviceB.monographs.isPublished(noteId)).toBeTruthy(); + + const mono = await deviceB.monographs.get(noteId); + expect(mono).toBeTruthy(); + expect(mono.title).toBe("Monograph note"); + } +); + +test( + "monograph: self-destruct triggers unpublish across devices", + testOptions, + async (t) => { + const [deviceA, deviceB, deviceC] = await Promise.all([ + initializeDevice("deviceA"), + initializeDevice("deviceB"), + initializeDevice("deviceC") + ]); + + t.onTestFinished(async () => { + // console.log(`${t.task.name} log out`); + await cleanup(deviceA, deviceB, deviceC); + }); + + const noteId = await deviceA.notes.add({ + title: "Self destruct monograph", + content: { data: "

transient

", type: "tiptap" } + }); + + // publish with selfDestruct + const monographId = await deviceA.monographs.publish(noteId, { + selfDestruct: true + }); + expect(monographId).toBeTruthy(); + + // ensure all devices know it's published + await deviceA.sync({ type: "fetch" }); + await deviceB.sync({ type: "fetch" }); + await deviceC.sync({ type: "fetch" }); + + expect(deviceB.monographs.isPublished(noteId)).toBeTruthy(); + expect(deviceC.monographs.isPublished(noteId)).toBeTruthy(); + + // trigger a view on the public endpoint which should cause self-destruct + await http + .get(`${Constants.API_HOST}/monographs/${noteId}/view`) + .catch(() => {}); + + // give the server a short moment to process + await delay(500); + + // sync all devices to pick up the unpublish + await deviceA.sync({ type: "fetch" }); + await deviceB.sync({ type: "fetch" }); + await deviceC.sync({ type: "fetch" }); + + expect(deviceA.monographs.isPublished(noteId)).toBeFalsy(); + expect(deviceB.monographs.isPublished(noteId)).toBeFalsy(); + expect(deviceC.monographs.isPublished(noteId)).toBeFalsy(); + } +); + +test( + "monograph: properties (password and selfDestruct) sync across devices", + testOptions, + async (t) => { + const [deviceA, deviceB] = await Promise.all([ + initializeDevice("deviceA"), + initializeDevice("deviceB") + ]); + + t.onTestFinished(async () => { + console.log(`${t.task.name} log out`); + await cleanup(deviceA, deviceB); + }); + + const password = "s3cr3t"; + + const noteId = await deviceA.notes.add({ + title: "Protected monograph", + content: { data: "

secret content

", type: "tiptap" } + }); + + // publish with password and selfDestruct on device A + const monographId = await deviceA.monographs.publish(noteId, { + password, + selfDestruct: true + }); + expect(monographId).toBeTruthy(); + + // deviceB fetches the monograph + await deviceB.sync({ type: "fetch" }); + + expect(deviceB.monographs.isPublished(noteId)).toBeTruthy(); + + const mono = await deviceB.monographs.get(noteId); + expect(mono).toBeTruthy(); + expect(mono.selfDestruct).toBe(true); + expect(mono.password).toBeTruthy(); + + const decrypted = await deviceB.monographs.decryptPassword(mono.password); + expect(decrypted).toBe(password); + } +); + +test( + "edge case: deletion on one device propagates to others", + testOptions, + async (t) => { + const [deviceA, deviceB] = await Promise.all([ + initializeDevice("deviceA"), + initializeDevice("deviceB") + ]); + + t.onTestFinished(async () => { + console.log(`${t.task.name} log out`); + await cleanup(deviceA, deviceB); + }); + + const id = await deviceA.notes.add({ title: "note to delete" }); + + // ensure both devices have latest state where note exists + await deviceA.sync({ type: "full" }); + await deviceB.sync({ type: "full" }); + expect(await deviceB.notes.note(id)).toBeTruthy(); + + // delete on deviceA + await deviceA.notes.remove(id); + await deviceA.sync({ type: "full" }); + + // deviceB should observe deletion after syncing + await deviceB.sync({ type: "full" }); + expect(await deviceB.notes.note(id)).toBeFalsy(); + } +); + +test( + "stress: sync 5000 notes from device A to device B", + testOptions, + async (t) => { + const [deviceA, deviceB] = await Promise.all([ + initializeDevice("deviceA"), + initializeDevice("deviceB") + ]); + + t.onTestFinished(async () => { + console.log(`${t.task.name} log out`); + await cleanup(deviceA, deviceB); + }); + + for (let i = 0; i < 5000; ++i) { + await deviceA.notes.add({ + title: `note ${i}`, + content: { + type: "tiptap", + data: `

deviceA=true

` + } + }); + } + + await deviceA.sync({ type: "full" }); + await deviceB.sync({ type: "full" }); + + const countA = await deviceA.notes.all.count(); + const countB = await deviceB.notes.all.count(); + + expect(countA).toBeGreaterThanOrEqual(5000); + expect(countB).toBeGreaterThanOrEqual(5000); + } +); + +test("stress: super concurrent sync", testOptions, async (t) => { + console.time("adding devices"); + const devices = await Promise.all( + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + .split("") + .map((letter) => initializeDevice(`device${letter}`)) + ); + console.timeEnd("adding devices"); + + t.onTestFinished(async () => { + console.log(`${t.task.name} log out`); + await cleanup(...devices); + }); + + await Promise.all( + devices.map(async (device, index) => { + for (let i = 0; i < 100; ++i) { + await device.notes.add({ + content: { + type: "tiptap", + data: `

device${i}${index}=true

` + } + }); + } + }) + ); + + await Promise.all( + devices.map(async (device) => { + await device.sync({ type: "send" }); + }) + ); + + await Promise.all( + devices.map(async (device) => { + await device.sync({ type: "fetch" }); + }) + ); + + for (const device of devices) { + // await device.sync({ type: "full" }); + + expect(await device.notes.all.count()).toBeGreaterThanOrEqual( + devices.length * 100 + ); + } +}); + /** * * @param {string} id - * @returns {Promise} + * @returns {Promise} */ async function initializeDevice(id) { // initialize(new NodeStorageInterface(), false); - console.time(`Init ${id}`); - - EV.subscribe(EVENTS.syncCheckStatus, async (type) => { - return { - type, - result: true - }; - }); + // console.time(`Init ${id}`); const device = await databaseTest("memory"); + console.time("login" + id); await login(device); + console.timeEnd("login" + id); - await device.user.resetUser(false); + // await device.user.resetUser(false); await device.sync({ type: "full" }); - console.timeEnd(`Init ${id}`); + // console.timeEnd(`Init ${id}`); return device; } @@ -511,22 +819,33 @@ async function cleanup(...devices) { await device.user.logout(); device.eventManager.unsubscribeAll(); } - EV.unsubscribeAll(); + // EV.unsubscribeAll(); } /** * - * @param {Database} deviceA - * @param {Database} deviceB + * @param {import("../src/api/index.ts").default} deviceA + * @param {import("../src/api/index.ts").default} deviceB * @returns */ function syncAndWait(deviceA, deviceB, force = false) { return new Promise((resolve, reject) => { const ref2 = deviceB.eventManager.subscribe( EVENTS.databaseSyncRequested, - (full, force) => { + async (full, force, deviceId) => { if (!full) return; - console.log("sync requested by device A", full, force); + if (deviceId !== (await deviceA.syncer.devices.get())) { + console.warn( + "Concurrency error. Expected:", + await deviceA.syncer.devices.get(), + "Got:", + deviceId + ); + return; + } + + console.log("sync requested by device A:", deviceId, full, force); + ref2.unsubscribe(); deviceB .sync({ @@ -538,13 +857,13 @@ function syncAndWait(deviceA, deviceB, force = false) { } ); - console.log( - "waiting for sync...", - "Device A:", - deviceA.syncer.sync.syncing, - "Device B:", - deviceB.syncer.sync.syncing - ); + // console.log( + // "waiting for sync...", + // "Device A:", + // deviceA.syncer.sync.syncing, + // "Device B:", + // deviceB.syncer.sync.syncing + // ); deviceA.sync({ type: "full", force }).catch(reject); }); diff --git a/packages/core/src/api/index.ts b/packages/core/src/api/index.ts index 57f0f0ad5..176d41d2b 100644 --- a/packages/core/src/api/index.ts +++ b/packages/core/src/api/index.ts @@ -288,15 +288,16 @@ class Database { "options not specified. Did you forget to call db.setup()?" ); - EV.subscribeMulti( - [EVENTS.userLoggedIn, EVENTS.userFetched, EVENTS.tokenRefreshed], + this.eventManager.subscribeMulti( + [EVENTS.userLoggedIn, EVENTS.userFetched], this.connectSSE, this ); + EV.subscribe(EVENTS.tokenRefreshed, () => this.connectSSE()); EV.subscribe(EVENTS.attachmentDeleted, async (attachment: Attachment) => { await this.fs().cancel(attachment.hash); }); - EV.subscribe(EVENTS.userLoggedOut, async () => { + this.eventManager.subscribe(EVENTS.userLoggedOut, async () => { await this.monographs.clear(); await this.fs().clear(); this.disconnectSSE(); @@ -388,11 +389,11 @@ class Database { }); this.eventSource.onopen = async () => { - console.log("SSE: opened channel successfully!"); + logger.log("SSE: opened channel successfully!"); }; this.eventSource.onerror = function (error) { - console.log("SSE: error:", error); + logger.error(error, "SSE: error"); }; this.eventSource.onmessage = async (event) => { @@ -405,7 +406,7 @@ class Database { if (!user) break; user.subscription = data; await this.user.setUser(user); - EV.publish(EVENTS.userSubscriptionUpdated, data); + this.eventManager.publish(EVENTS.userSubscriptionUpdated, data); await this.tokenManager._refreshToken(true); break; } @@ -416,15 +417,12 @@ class Database { case "emailConfirmed": { await this.tokenManager._refreshToken(true); await this.user.fetchUser(); - EV.publish(EVENTS.userEmailConfirmed); + this.eventManager.publish(EVENTS.userEmailConfirmed); break; } - case "triggerSync": { - await this.sync({ type: "fetch" }); - } } } catch (e) { - console.log("SSE: Unsupported message. Message = ", event.data); + logger.error("SSE: Unsupported message. Message = ", event.data); return; } }; diff --git a/packages/core/src/api/sync/collector.ts b/packages/core/src/api/sync/collector.ts index 273f0a322..2fcfc9dd4 100644 --- a/packages/core/src/api/sync/collector.ts +++ b/packages/core/src/api/sync/collector.ts @@ -62,9 +62,9 @@ class Collector { const ciphers = await this.db .storage() .encryptMulti(key, syncableItems); - const items = toPushItem(ids, ciphers); - if (!items) continue; - yield { items, type: itemType }; + const items = toSyncItem(ids, ciphers); + if (!items.length) continue; + yield { items, type: itemType, count: items.length }; await this.db .sql() @@ -88,15 +88,17 @@ class Collector { } export default Collector; -function toPushItem(ids: string[], ciphers: Cipher<"base64">[]) { +function toSyncItem(ids: string[], ciphers: Cipher<"base64">[]) { if (ids.length !== ciphers.length) throw new Error("ids.length must be equal to ciphers.length"); const items: SyncItem[] = []; for (let i = 0; i < ids.length; ++i) { const id = ids[i]; - const cipher = ciphers[i]; - items.push({ ...cipher, v: CURRENT_DATABASE_VERSION, id }); + const cipher = ciphers[i] as SyncItem; + cipher.v = CURRENT_DATABASE_VERSION; + cipher.id = id; + items.push(cipher); } return items; } diff --git a/packages/core/src/api/sync/index.ts b/packages/core/src/api/sync/index.ts index 33c146806..53e595cb9 100644 --- a/packages/core/src/api/sync/index.ts +++ b/packages/core/src/api/sync/index.ts @@ -166,7 +166,7 @@ class Sync { this.autoSync = new AutoSync(db, 1000); this.devices = new SyncDevices(db.kv, db.tokenManager); - EV.subscribe(EVENTS.userLoggedOut, async () => { + db.eventManager.subscribe(EVENTS.userLoggedOut, async () => { await this.connection?.stop(); this.autoSync.stop(); }); @@ -285,7 +285,7 @@ class Sync { ); } } - if (done > 0) await this.connection?.send("PushCompleted"); + if (done > 0) await this.connection?.send("PushCompletedV2", deviceId); return true; } @@ -332,8 +332,13 @@ class Sync { /** * @private */ - async onPushCompleted() { - this.db.eventManager.publish(EVENTS.databaseSyncRequested, true, false); + async onPushCompleted(deviceId: string) { + this.db.eventManager.publish( + EVENTS.databaseSyncRequested, + true, + false, + deviceId + ); } async processChunk( @@ -442,7 +447,9 @@ class Sync { .withHubProtocol(new JsonHubProtocol()) .build(); this.connection.serverTimeoutInMilliseconds = 60 * 1000 * 5; - this.connection.on("PushCompleted", () => this.onPushCompleted()); + this.connection.on("PushCompletedV2", (deviceId: string) => + this.onPushCompleted(deviceId) + ); this.connection.on("SendVaultKey", async (vaultKey) => { if (this.connection?.state !== HubConnectionState.Connected) return false; diff --git a/packages/core/src/api/sync/merger.ts b/packages/core/src/api/sync/merger.ts index 3bf905f8a..aefbe4f79 100644 --- a/packages/core/src/api/sync/merger.ts +++ b/packages/core/src/api/sync/merger.ts @@ -29,7 +29,7 @@ import { } from "../../types.js"; import { ParsedInboxItem, SyncInboxItem } from "./types.js"; -const THRESHOLD = process.env.NODE_ENV === "test" ? 6 * 1000 : 60 * 1000; +const THRESHOLD = process.env.NODE_ENV === "test" ? 2 * 1000 : 60 * 1000; class Merger { logger = logger.scope("Merger"); constructor(private readonly db: Database) {} diff --git a/packages/core/src/api/sync/types.ts b/packages/core/src/api/sync/types.ts index b2f518c6b..994fc8451 100644 --- a/packages/core/src/api/sync/types.ts +++ b/packages/core/src/api/sync/types.ts @@ -47,6 +47,7 @@ export const SYNC_ITEM_TYPES = Object.keys( export type SyncTransferItem = { items: SyncItem[]; type: SyncableItemType; + count: number; }; export type SyncInboxItem = Omit & { diff --git a/packages/core/src/api/token-manager.ts b/packages/core/src/api/token-manager.ts index f3c7deeb4..b0a29ff8f 100644 --- a/packages/core/src/api/token-manager.ts +++ b/packages/core/src/api/token-manager.ts @@ -47,14 +47,14 @@ const ENDPOINTS = { temporaryToken: "/account/token", logout: "/account/logout" }; -const REFRESH_TOKEN_MUTEX = withTimeout( - new Mutex(), - 10 * 1000, - new Error("Timed out while refreshing access token.") -); - class TokenManager { logger = logger.scope("TokenManager"); + private REFRESH_TOKEN_MUTEX = withTimeout( + new Mutex(), + 10 * 1000, + new Error("Timed out while refreshing access token.") + ); + constructor(private readonly storage: KVStorageAccessor) {} async getToken(renew = true, forceRenew = false): Promise { @@ -101,7 +101,7 @@ class TokenManager { } async _refreshToken(forceRenew = false) { - await REFRESH_TOKEN_MUTEX.runExclusive(async () => { + await this.REFRESH_TOKEN_MUTEX.runExclusive(async () => { this.logger.info("Refreshing access token"); const token = await this.getToken(false, false); @@ -145,7 +145,7 @@ class TokenManager { } saveToken(tokenResponse: Omit) { - this.logger.info("Saving new token", tokenResponse); + this.logger.info("Saving new token"); if (!tokenResponse || !tokenResponse.access_token) return; const token: Token = { ...tokenResponse, t: Date.now() }; return this.storage().write("token", token); diff --git a/packages/core/src/api/user-manager.ts b/packages/core/src/api/user-manager.ts index bd6875833..53040af2a 100644 --- a/packages/core/src/api/user-manager.ts +++ b/packages/core/src/api/user-manager.ts @@ -192,7 +192,7 @@ class UserManager { salt: user.salt }); } - EV.publish(EVENTS.userLoggedIn, user); + this.db.eventManager.publish(EVENTS.userLoggedIn, user); } catch (e) { await this.tokenManager.saveToken(token); throw e; @@ -240,7 +240,7 @@ class UserManager { await this.db.setLastSynced(0); await this.db.syncer.devices.register(); - EV.publish(EVENTS.userLoggedIn, user); + this.db.eventManager.publish(EVENTS.userLoggedIn, user); } async getSessions() { @@ -281,8 +281,8 @@ class UserManager { this.cachedAttachmentKey = undefined; this.cachedInboxKeys = undefined; await this.db.reset(); - EV.publish(EVENTS.userLoggedOut, reason); - EV.publish(EVENTS.appRefreshRequested); + this.db.eventManager.publish(EVENTS.userLoggedOut, reason); + this.db.eventManager.publish(EVENTS.appRefreshRequested); } } @@ -345,6 +345,7 @@ class UserManager { } async fetchUser(): Promise { + const oldUser = await this.getUser(); try { const token = await this.tokenManager.getAccessToken(); if (!token) return; @@ -353,7 +354,6 @@ class UserManager { token ); if (user) { - const oldUser = await this.getUser(); await this.setUser(user); if ( oldUser && @@ -362,18 +362,21 @@ class UserManager { oldUser.subscription.provider !== user.subscription.provider) ) { await this.tokenManager._refreshToken(true); - EV.publish(EVENTS.userSubscriptionUpdated, user.subscription); + this.db.eventManager.publish( + EVENTS.userSubscriptionUpdated, + user.subscription + ); } if (oldUser && !oldUser.isEmailConfirmed && user.isEmailConfirmed) - EV.publish(EVENTS.userEmailConfirmed); - EV.publish(EVENTS.userFetched, user); + this.db.eventManager.publish(EVENTS.userEmailConfirmed); + this.db.eventManager.publish(EVENTS.userFetched, user); return user; } else { - return await this.getUser(); + return oldUser; } } catch (e) { logger.error(e, "Error fetching user"); - return await this.getUser(); + return oldUser; } } @@ -559,7 +562,7 @@ class UserManager { async hasInboxKeys() { if (this.cachedInboxKeys) return true; - let user = await this.getUser(); + const user = await this.getUser(); if (!user) return false; return !!user.inboxKeys; diff --git a/packages/core/src/api/vault.ts b/packages/core/src/api/vault.ts index e58e23d3a..da700e794 100644 --- a/packages/core/src/api/vault.ts +++ b/packages/core/src/api/vault.ts @@ -57,7 +57,7 @@ export default class Vault { constructor(private readonly db: Database) { this.password = undefined; - EV.subscribe(EVENTS.userLoggedOut, () => { + db.eventManager.subscribe(EVENTS.userLoggedOut, () => { this.password = undefined; }); } From 637a19ee217cf791da44b9592b152db36afe5303 Mon Sep 17 00:00:00 2001 From: Abdullah Atta Date: Tue, 23 Dec 2025 11:03:59 +0500 Subject: [PATCH 2/4] web: use db's event manager instead of global --- apps/web/src/stores/editor-store.ts | 2 +- apps/web/src/stores/user-store.ts | 27 +++++++++++++++------------ 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/apps/web/src/stores/editor-store.ts b/apps/web/src/stores/editor-store.ts index 92281027c..5173e0c01 100644 --- a/apps/web/src/stores/editor-store.ts +++ b/apps/web/src/stores/editor-store.ts @@ -249,7 +249,7 @@ class EditorStore extends BaseStore { } ); - EV.subscribe(EVENTS.userLoggedOut, () => { + db.eventManager.subscribe(EVENTS.userLoggedOut, () => { const { closeTabs, tabs } = this.get(); closeTabs(...tabs.map((s) => s.id)); }); diff --git a/apps/web/src/stores/user-store.ts b/apps/web/src/stores/user-store.ts index a3b99091c..9b85b8557 100644 --- a/apps/web/src/stores/user-store.ts +++ b/apps/web/src/stores/user-store.ts @@ -58,22 +58,25 @@ class UserStore extends BaseStore { if (Config.get("sessionExpired")) return; - EV.subscribe(EVENTS.userSubscriptionUpdated, (subscription) => { - const wasSubscribed = isUserSubscribed(); - this.refreshUser(); - this.set((state) => { - if (!state.user) return; - state.user.subscription = subscription; - }); - if (!wasSubscribed && isUserSubscribed()) OnboardingDialog.show({}); - resetFeatures(); - }); + db.eventManager.subscribe( + EVENTS.userSubscriptionUpdated, + (subscription) => { + const wasSubscribed = isUserSubscribed(); + this.refreshUser(); + this.set((state) => { + if (!state.user) return; + state.user.subscription = subscription; + }); + if (!wasSubscribed && isUserSubscribed()) OnboardingDialog.show({}); + resetFeatures(); + } + ); - EV.subscribe(EVENTS.userEmailConfirmed, () => { + db.eventManager.subscribe(EVENTS.userEmailConfirmed, () => { hashNavigate("/confirmed"); }); - EV.subscribe(EVENTS.userLoggedOut, async (reason) => { + db.eventManager.subscribe(EVENTS.userLoggedOut, async (reason) => { this.set((state) => { state.user = undefined; state.isLoggedIn = false; From 9c1a02dbd5233ba676d2121c7eab3179d98a4102 Mon Sep 17 00:00:00 2001 From: Abdullah Atta Date: Tue, 23 Dec 2025 11:04:06 +0500 Subject: [PATCH 3/4] mobile: use db's event manager instead of global --- apps/mobile/app/hooks/use-app-events.tsx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/mobile/app/hooks/use-app-events.tsx b/apps/mobile/app/hooks/use-app-events.tsx index 83f20095b..95ab33f62 100644 --- a/apps/mobile/app/hooks/use-app-events.tsx +++ b/apps/mobile/app/hooks/use-app-events.tsx @@ -637,10 +637,10 @@ export const useAppEvents = () => { EV.subscribe(EVENTS.syncCheckStatus, onCheckSyncStatus), EV.subscribe(EVENTS.syncAborted, onSyncAborted), EV.subscribe(EVENTS.appRefreshRequested, onSyncComplete), - EV.subscribe(EVENTS.userLoggedOut, onLogout), - EV.subscribe(EVENTS.userEmailConfirmed, onUserEmailVerified), + db.eventManager.subscribe(EVENTS.userLoggedOut, onLogout), + db.eventManager.subscribe(EVENTS.userEmailConfirmed, onUserEmailVerified), EV.subscribe(EVENTS.userSessionExpired, onUserSessionExpired), - EV.subscribe( + db.eventManager.subscribe( EVENTS.userSubscriptionUpdated, onUserSubscriptionStatusChanged ), From 583a68cabec36061c2e63fb1dd672a80d37567b6 Mon Sep 17 00:00:00 2001 From: Abdullah Atta Date: Tue, 23 Dec 2025 11:53:43 +0500 Subject: [PATCH 4/4] core: fix merger tests --- packages/core/src/api/sync/__tests__/merger.test.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/core/src/api/sync/__tests__/merger.test.js b/packages/core/src/api/sync/__tests__/merger.test.js index 4578fa839..11c5714f8 100644 --- a/packages/core/src/api/sync/__tests__/merger.test.js +++ b/packages/core/src/api/sync/__tests__/merger.test.js @@ -209,7 +209,7 @@ describe.concurrent("merge content", (test) => { type: "tiptap", data: "Remote", noteId, - dateEdited: Date.now() - 3000, + dateEdited: Date.now() - 300, dateModified: Date.now() }, { @@ -217,7 +217,7 @@ describe.concurrent("merge content", (test) => { data: "Local", noteId, dateEdited: Date.now(), - dateModified: Date.now() - 6000 + dateModified: Date.now() - 600 } ); @@ -236,8 +236,8 @@ describe.concurrent("merge content", (test) => { type: "tiptap", data: "Remote", noteId, - dateEdited: Date.now() - 3000, - dateModified: Date.now() - 6000 + dateEdited: Date.now() - 300, + dateModified: Date.now() - 600 }, { type: "tiptap",