diff --git a/packages/core/__e2e__/sync.test.js b/packages/core/__e2e__/sync.test.js
index 47be56779..bce8f0d3e 100644
--- a/packages/core/__e2e__/sync.test.js
+++ b/packages/core/__e2e__/sync.test.js
@@ -17,15 +17,40 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see .
*/
-import { EV, EVENTS } from "../src/common.ts";
-import { test, expect, vitest } from "vitest";
+import { EVENTS } from "../src/common.ts";
+import { test, expect, vitest, vi, beforeAll, afterAll } from "vitest";
import { login } from "./utils.js";
-import { databaseTest } from "../__tests__/utils/index.ts";
+import { databaseTest, delay } from "../__tests__/utils/index.ts";
+import http from "../src/utils/http.ts";
+import Constants from "../src/utils/constants.ts";
+import { writeFileSync } from "node:fs";
const TEST_TIMEOUT = 60 * 1000;
+const testOptions = { concurrent: true, timeout: TEST_TIMEOUT };
+
+beforeAll(async () => {
+ const device = await databaseTest("memory");
+
+ await login(device);
+
+ await device.user.resetUser(false);
+
+ await device.user.logout();
+}, TEST_TIMEOUT);
+
+afterAll(async () => {
+ const device = await databaseTest("memory");
+
+ await login(device);
+
+ await device.user.resetUser(false);
+
+ await device.user.logout();
+}, TEST_TIMEOUT);
test(
"case 1: device A & B should only download the changes from device C (no uploading)",
+ testOptions,
async (t) => {
const types = [];
function onSyncProgress({ type }) {
@@ -53,12 +78,12 @@ test(
await deviceB.sync({ type: "full" });
expect(types.every((t) => t === "download")).toBe(true);
- },
- TEST_TIMEOUT
+ }
);
test(
"case 3: Device A & B have unsynced changes but server has nothing",
+ testOptions,
async (t) => {
const [deviceA, deviceB] = await Promise.all([
initializeDevice("deviceA"),
@@ -85,12 +110,12 @@ test(
expect(await deviceB.notes.note(note1Id)).toBeTruthy();
expect(await deviceA.notes.note(note1Id)).toBeTruthy();
expect(await deviceB.notes.note(note2Id)).toBeTruthy();
- },
- TEST_TIMEOUT
+ }
);
test(
"edge case 1: items updated while push is running",
+ testOptions,
async (t) => {
const [deviceA, deviceB] = await Promise.all([
initializeDevice("deviceA"),
@@ -105,24 +130,32 @@ test(
const id = await deviceA.notes.add({ title: "hello" });
for (let i = 0; i < 5; ++i) {
if (i > 0) await deviceA.notes.add({ id, title: `edit ${i - 1}` });
- await Promise.all([
- deviceA.sync({ type: "send" }),
- new Promise((resolve) => setTimeout(resolve, 40)).then(() =>
- deviceA.notes.add({ id, title: `edit ${i}` })
- )
- ]);
-
+ var spy = vi.spyOn(deviceA.syncer.sync, "start");
+ const defaultEncryptMulti = deviceA
+ .storage()
+ .encryptMulti.bind(deviceA.storage());
+ var encryptMulti = vi.spyOn(deviceA.storage(), "encryptMulti");
+ encryptMulti.mockImplementationOnce(async (...args) => {
+ const result = defaultEncryptMulti(...args);
+ // simulate scenario where a note gets updated while sync is collecting
+ // items
+ await new Promise((resolve) => setTimeout(resolve, 100));
+ await deviceA.notes.add({ id, title: `edit ${i}` });
+ return result;
+ });
+ await deviceA.sync({ type: "send" });
+ expect(spy).toHaveBeenCalledTimes(2);
expect((await deviceA.notes.note(id))?.title).toBe(`edit ${i}`);
expect((await deviceA.notes.note(id))?.synced).toBe(true);
await deviceB.sync({ type: "fetch" });
expect((await deviceB.notes.note(id))?.title).toBe(`edit ${i}`);
}
- },
- TEST_TIMEOUT * 10
+ }
);
test(
"edge case 2: new items added while push is running",
+ testOptions,
async (t) => {
const [deviceA, deviceB] = await Promise.all([
initializeDevice("deviceA"),
@@ -137,21 +170,33 @@ test(
const id = await deviceA.notes.add({ title: "hello" });
for (let i = 0; i < 5; ++i) {
if (i > 0) await deviceA.notes.add({ id, title: `edit ${i - 1}` });
- await Promise.all([
- deviceA.sync({ type: "send" }),
- new Promise((resolve) => setTimeout(resolve, 40)).then(() =>
- deviceA.notes.add({ title: `note ${i}` })
- )
- ]);
+ var spy = vi.spyOn(deviceA.syncer.sync, "start");
+ const defaultEncryptMulti = deviceA
+ .storage()
+ .encryptMulti.bind(deviceA.storage());
+ var encryptMulti = vi.spyOn(deviceA.storage(), "encryptMulti");
+ let newNoteId;
+ encryptMulti.mockImplementationOnce(async (key, items) => {
+ const result = defaultEncryptMulti(key, items);
+ // simulate scenario where a note gets added while sync is collecting
+ // items
+ await new Promise((resolve) => setTimeout(resolve, 100));
+ newNoteId = await deviceA.notes.add({ title: `note ${i}` });
+ return result;
+ });
+ await deviceA.sync({ type: "send" });
+ expect(await deviceA.notes.note(newNoteId)).toBeDefined();
+ expect((await deviceA.notes.note(newNoteId)).synced).toBe(true);
+ expect(spy).toHaveBeenCalledTimes(1);
await deviceB.sync({ type: "fetch" });
- expect(await deviceB.notes.all.count()).toBe(i + 2);
+ expect(await deviceB.notes.note(newNoteId)).toBeDefined();
}
- },
- TEST_TIMEOUT * 10
+ }
);
test(
"issue: syncing should not affect the items' dateModified",
+ testOptions,
async (t) => {
const [deviceA] = await Promise.all([initializeDevice("deviceA")]);
@@ -173,12 +218,12 @@ test(
.dateModified;
expect(noteDateBefore).toBe(noteDateAfter);
expect(contentDateBefore).toBe(contentDateAfter);
- },
- TEST_TIMEOUT
+ }
);
test(
"case 4: local content changed after remote content should create a conflict",
+ testOptions,
async (t) => {
const [deviceA, deviceB] = await Promise.all([
initializeDevice("deviceA"),
@@ -190,7 +235,11 @@ test(
await cleanup(deviceA, deviceB);
});
- const noteId = await deviceA.notes.add({
+ t.onTestFailed(() => {
+ writeFileSync("debug_deviceA_note.json", content);
+ });
+
+ var noteId = await deviceA.notes.add({
title: "Test note from device A",
content: { data: "
Hello
", type: "tiptap" }
});
@@ -203,21 +252,21 @@ test(
});
await deviceB.sync({ type: "full" });
- await new Promise((resolve) => setTimeout(resolve, 10000));
+ await delay(2500);
await deviceA.notes.add({
id: noteId,
content: { data: "Hello (I am from device A)
", type: "tiptap" }
});
await deviceA.sync({ type: "full" });
-
+ var content = JSON.stringify(await deviceA.content.findByNoteId(noteId));
expect(await deviceA.notes.conflicted.count()).toBeGreaterThan(0);
- },
- TEST_TIMEOUT * 10
+ }
);
test(
"case 5: remote content changed after local content should create a conflict",
+ testOptions,
async (t) => {
const [deviceA, deviceB] = await Promise.all([
initializeDevice("deviceA"),
@@ -228,31 +277,46 @@ test(
console.log(`${t.task.name} log out`);
await cleanup(deviceA, deviceB);
});
+ // t.onTestFailed(() => {
+ // writeFileSync("debug_deviceB_note.json", content);
+ // });
- const noteId = await deviceA.notes.add({
+ deviceB.syncer.sync.merger.forceLogging = true;
+ deviceB.syncer.sync.collector.forceLogging = true;
+ var noteId = await deviceA.notes.add({
title: "Test note from device A",
- content: { data: "Hello
", type: "tiptap" }
+ content: { data: "Hello unique note
", type: "tiptap" }
});
+ console.log("Device A is syncing (A)");
await deviceA.sync({ type: "full" });
+ console.log("Device B is syncing (A)");
await deviceB.sync({ type: "full" });
await deviceA.notes.add({
id: noteId,
- content: { data: "Hello (I am from device B)
", type: "tiptap" }
+ content: {
+ data: "Hello (unique note edit) device A
",
+ type: "tiptap"
+ }
});
+ console.log("Device A is syncing (B)");
await deviceA.sync({ type: "full" });
- await new Promise((resolve) => setTimeout(resolve, 10000));
+ await delay(2500);
await deviceB.notes.add({
id: noteId,
- content: { data: "Hello (I am from device A)
", type: "tiptap" }
+ content: {
+ data: "Hello unique note edit (device B)
",
+ type: "tiptap"
+ }
});
+ expect((await deviceB.content.findByNoteId(noteId)).synced).toBe(false);
+ console.log("Syncing device B");
await deviceB.sync({ type: "full" });
-
+ // // var content = JSON.stringify(await deviceB.content.findByNoteId(noteId));
expect(await deviceB.notes.conflicted.count()).toBeGreaterThan(0);
- },
- TEST_TIMEOUT * 10
+ }
);
// test(
@@ -382,6 +446,7 @@ test(
test(
"issue: running force sync from device A makes device B always download everything",
+ testOptions,
async (t) => {
const [deviceA, deviceB] = await Promise.all([
initializeDevice("deviceA"),
@@ -402,20 +467,20 @@ test(
});
}
- await syncAndWait(deviceA, deviceB, true);
-
const handler = vitest.fn();
deviceB.eventManager.subscribe(EVENTS.syncProgress, handler);
- await deviceB.sync({ type: "full" });
+ await syncAndWait(deviceA, deviceB, true);
- expect(handler).not.toHaveBeenCalled();
- },
- TEST_TIMEOUT
+ expect(handler.mock.calls.every(([data]) => data.type === "download")).toBe(
+ true
+ );
+ }
);
test(
"issue: colors are not properly created if multiple notes are synced together",
+ testOptions,
async (t) => {
const [deviceA, deviceB] = await Promise.all([
initializeDevice("deviceA"),
@@ -445,7 +510,12 @@ test(
colorCode: "#ffff22"
});
for (let noteId of noteIds) {
- expect(await deviceB.notes.note(noteId)).toBeTruthy();
+ // console.log(
+ // "Adding color to note",
+ // noteId,
+ // await deviceB.syncer.devices.get()
+ // );
+ expect(await deviceB.notes.note(noteId)).toBeDefined();
expect(
await deviceB.relations
.from({ id: colorId, type: "color" }, "note")
@@ -467,36 +537,274 @@ test(
expect(
noteIds.every((id) => purpleNotes.findIndex((p) => p.id === id) > -1)
).toBe(true);
- },
- TEST_TIMEOUT
+ }
);
+test(
+ "monograph: published note appears on other device after sync",
+ testOptions,
+ async (t) => {
+ const [deviceA, deviceB] = await Promise.all([
+ initializeDevice("deviceA"),
+ initializeDevice("deviceB")
+ ]);
+
+ t.onTestFinished(async () => {
+ console.log(`${t.task.name} log out`);
+ await cleanup(deviceA, deviceB);
+ });
+
+ const noteId = await deviceA.notes.add({
+ title: "Monograph note",
+ content: { data: "monograph content
", type: "tiptap" }
+ });
+
+ // publish on device A
+ const monographId = await deviceA.monographs.publish(noteId);
+ expect(monographId).toBeTruthy();
+
+ // deviceB fetches the monograph
+ await deviceB.sync({ type: "fetch" });
+
+ expect(deviceB.monographs.isPublished(noteId)).toBeTruthy();
+
+ const mono = await deviceB.monographs.get(noteId);
+ expect(mono).toBeTruthy();
+ expect(mono.title).toBe("Monograph note");
+ }
+);
+
+test(
+ "monograph: self-destruct triggers unpublish across devices",
+ testOptions,
+ async (t) => {
+ const [deviceA, deviceB, deviceC] = await Promise.all([
+ initializeDevice("deviceA"),
+ initializeDevice("deviceB"),
+ initializeDevice("deviceC")
+ ]);
+
+ t.onTestFinished(async () => {
+ // console.log(`${t.task.name} log out`);
+ await cleanup(deviceA, deviceB, deviceC);
+ });
+
+ const noteId = await deviceA.notes.add({
+ title: "Self destruct monograph",
+ content: { data: "transient
", type: "tiptap" }
+ });
+
+ // publish with selfDestruct
+ const monographId = await deviceA.monographs.publish(noteId, {
+ selfDestruct: true
+ });
+ expect(monographId).toBeTruthy();
+
+ // ensure all devices know it's published
+ await deviceA.sync({ type: "fetch" });
+ await deviceB.sync({ type: "fetch" });
+ await deviceC.sync({ type: "fetch" });
+
+ expect(deviceB.monographs.isPublished(noteId)).toBeTruthy();
+ expect(deviceC.monographs.isPublished(noteId)).toBeTruthy();
+
+ // trigger a view on the public endpoint which should cause self-destruct
+ await http
+ .get(`${Constants.API_HOST}/monographs/${noteId}/view`)
+ .catch(() => {});
+
+ // give the server a short moment to process
+ await delay(500);
+
+ // sync all devices to pick up the unpublish
+ await deviceA.sync({ type: "fetch" });
+ await deviceB.sync({ type: "fetch" });
+ await deviceC.sync({ type: "fetch" });
+
+ expect(deviceA.monographs.isPublished(noteId)).toBeFalsy();
+ expect(deviceB.monographs.isPublished(noteId)).toBeFalsy();
+ expect(deviceC.monographs.isPublished(noteId)).toBeFalsy();
+ }
+);
+
+test(
+ "monograph: properties (password and selfDestruct) sync across devices",
+ testOptions,
+ async (t) => {
+ const [deviceA, deviceB] = await Promise.all([
+ initializeDevice("deviceA"),
+ initializeDevice("deviceB")
+ ]);
+
+ t.onTestFinished(async () => {
+ console.log(`${t.task.name} log out`);
+ await cleanup(deviceA, deviceB);
+ });
+
+ const password = "s3cr3t";
+
+ const noteId = await deviceA.notes.add({
+ title: "Protected monograph",
+ content: { data: "secret content
", type: "tiptap" }
+ });
+
+ // publish with password and selfDestruct on device A
+ const monographId = await deviceA.monographs.publish(noteId, {
+ password,
+ selfDestruct: true
+ });
+ expect(monographId).toBeTruthy();
+
+ // deviceB fetches the monograph
+ await deviceB.sync({ type: "fetch" });
+
+ expect(deviceB.monographs.isPublished(noteId)).toBeTruthy();
+
+ const mono = await deviceB.monographs.get(noteId);
+ expect(mono).toBeTruthy();
+ expect(mono.selfDestruct).toBe(true);
+ expect(mono.password).toBeTruthy();
+
+ const decrypted = await deviceB.monographs.decryptPassword(mono.password);
+ expect(decrypted).toBe(password);
+ }
+);
+
+test(
+ "edge case: deletion on one device propagates to others",
+ testOptions,
+ async (t) => {
+ const [deviceA, deviceB] = await Promise.all([
+ initializeDevice("deviceA"),
+ initializeDevice("deviceB")
+ ]);
+
+ t.onTestFinished(async () => {
+ console.log(`${t.task.name} log out`);
+ await cleanup(deviceA, deviceB);
+ });
+
+ const id = await deviceA.notes.add({ title: "note to delete" });
+
+ // ensure both devices have latest state where note exists
+ await deviceA.sync({ type: "full" });
+ await deviceB.sync({ type: "full" });
+ expect(await deviceB.notes.note(id)).toBeTruthy();
+
+ // delete on deviceA
+ await deviceA.notes.remove(id);
+ await deviceA.sync({ type: "full" });
+
+ // deviceB should observe deletion after syncing
+ await deviceB.sync({ type: "full" });
+ expect(await deviceB.notes.note(id)).toBeFalsy();
+ }
+);
+
+test(
+ "stress: sync 5000 notes from device A to device B",
+ testOptions,
+ async (t) => {
+ const [deviceA, deviceB] = await Promise.all([
+ initializeDevice("deviceA"),
+ initializeDevice("deviceB")
+ ]);
+
+ t.onTestFinished(async () => {
+ console.log(`${t.task.name} log out`);
+ await cleanup(deviceA, deviceB);
+ });
+
+ for (let i = 0; i < 5000; ++i) {
+ await deviceA.notes.add({
+ title: `note ${i}`,
+ content: {
+ type: "tiptap",
+ data: `deviceA=true
`
+ }
+ });
+ }
+
+ await deviceA.sync({ type: "full" });
+ await deviceB.sync({ type: "full" });
+
+ const countA = await deviceA.notes.all.count();
+ const countB = await deviceB.notes.all.count();
+
+ expect(countA).toBeGreaterThanOrEqual(5000);
+ expect(countB).toBeGreaterThanOrEqual(5000);
+ }
+);
+
+test("stress: super concurrent sync", testOptions, async (t) => {
+ console.time("adding devices");
+ const devices = await Promise.all(
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+ .split("")
+ .map((letter) => initializeDevice(`device${letter}`))
+ );
+ console.timeEnd("adding devices");
+
+ t.onTestFinished(async () => {
+ console.log(`${t.task.name} log out`);
+ await cleanup(...devices);
+ });
+
+ await Promise.all(
+ devices.map(async (device, index) => {
+ for (let i = 0; i < 100; ++i) {
+ await device.notes.add({
+ content: {
+ type: "tiptap",
+ data: `device${i}${index}=true
`
+ }
+ });
+ }
+ })
+ );
+
+ await Promise.all(
+ devices.map(async (device) => {
+ await device.sync({ type: "send" });
+ })
+ );
+
+ await Promise.all(
+ devices.map(async (device) => {
+ await device.sync({ type: "fetch" });
+ })
+ );
+
+ for (const device of devices) {
+ // await device.sync({ type: "full" });
+
+ expect(await device.notes.all.count()).toBeGreaterThanOrEqual(
+ devices.length * 100
+ );
+ }
+});
+
/**
*
* @param {string} id
- * @returns {Promise}
+ * @returns {Promise}
*/
async function initializeDevice(id) {
// initialize(new NodeStorageInterface(), false);
- console.time(`Init ${id}`);
-
- EV.subscribe(EVENTS.syncCheckStatus, async (type) => {
- return {
- type,
- result: true
- };
- });
+ // console.time(`Init ${id}`);
const device = await databaseTest("memory");
+ console.time("login" + id);
await login(device);
+ console.timeEnd("login" + id);
- await device.user.resetUser(false);
+ // await device.user.resetUser(false);
await device.sync({ type: "full" });
- console.timeEnd(`Init ${id}`);
+ // console.timeEnd(`Init ${id}`);
return device;
}
@@ -511,22 +819,33 @@ async function cleanup(...devices) {
await device.user.logout();
device.eventManager.unsubscribeAll();
}
- EV.unsubscribeAll();
+ // EV.unsubscribeAll();
}
/**
*
- * @param {Database} deviceA
- * @param {Database} deviceB
+ * @param {import("../src/api/index.ts").default} deviceA
+ * @param {import("../src/api/index.ts").default} deviceB
* @returns
*/
function syncAndWait(deviceA, deviceB, force = false) {
return new Promise((resolve, reject) => {
const ref2 = deviceB.eventManager.subscribe(
EVENTS.databaseSyncRequested,
- (full, force) => {
+ async (full, force, deviceId) => {
if (!full) return;
- console.log("sync requested by device A", full, force);
+ if (deviceId !== (await deviceA.syncer.devices.get())) {
+ console.warn(
+ "Concurrency error. Expected:",
+ await deviceA.syncer.devices.get(),
+ "Got:",
+ deviceId
+ );
+ return;
+ }
+
+ console.log("sync requested by device A:", deviceId, full, force);
+
ref2.unsubscribe();
deviceB
.sync({
@@ -538,13 +857,13 @@ function syncAndWait(deviceA, deviceB, force = false) {
}
);
- console.log(
- "waiting for sync...",
- "Device A:",
- deviceA.syncer.sync.syncing,
- "Device B:",
- deviceB.syncer.sync.syncing
- );
+ // console.log(
+ // "waiting for sync...",
+ // "Device A:",
+ // deviceA.syncer.sync.syncing,
+ // "Device B:",
+ // deviceB.syncer.sync.syncing
+ // );
deviceA.sync({ type: "full", force }).catch(reject);
});
diff --git a/packages/core/src/api/index.ts b/packages/core/src/api/index.ts
index 57f0f0ad5..176d41d2b 100644
--- a/packages/core/src/api/index.ts
+++ b/packages/core/src/api/index.ts
@@ -288,15 +288,16 @@ class Database {
"options not specified. Did you forget to call db.setup()?"
);
- EV.subscribeMulti(
- [EVENTS.userLoggedIn, EVENTS.userFetched, EVENTS.tokenRefreshed],
+ this.eventManager.subscribeMulti(
+ [EVENTS.userLoggedIn, EVENTS.userFetched],
this.connectSSE,
this
);
+ EV.subscribe(EVENTS.tokenRefreshed, () => this.connectSSE());
EV.subscribe(EVENTS.attachmentDeleted, async (attachment: Attachment) => {
await this.fs().cancel(attachment.hash);
});
- EV.subscribe(EVENTS.userLoggedOut, async () => {
+ this.eventManager.subscribe(EVENTS.userLoggedOut, async () => {
await this.monographs.clear();
await this.fs().clear();
this.disconnectSSE();
@@ -388,11 +389,11 @@ class Database {
});
this.eventSource.onopen = async () => {
- console.log("SSE: opened channel successfully!");
+ logger.log("SSE: opened channel successfully!");
};
this.eventSource.onerror = function (error) {
- console.log("SSE: error:", error);
+ logger.error(error, "SSE: error");
};
this.eventSource.onmessage = async (event) => {
@@ -405,7 +406,7 @@ class Database {
if (!user) break;
user.subscription = data;
await this.user.setUser(user);
- EV.publish(EVENTS.userSubscriptionUpdated, data);
+ this.eventManager.publish(EVENTS.userSubscriptionUpdated, data);
await this.tokenManager._refreshToken(true);
break;
}
@@ -416,15 +417,12 @@ class Database {
case "emailConfirmed": {
await this.tokenManager._refreshToken(true);
await this.user.fetchUser();
- EV.publish(EVENTS.userEmailConfirmed);
+ this.eventManager.publish(EVENTS.userEmailConfirmed);
break;
}
- case "triggerSync": {
- await this.sync({ type: "fetch" });
- }
}
} catch (e) {
- console.log("SSE: Unsupported message. Message = ", event.data);
+ logger.error("SSE: Unsupported message. Message = ", event.data);
return;
}
};
diff --git a/packages/core/src/api/sync/collector.ts b/packages/core/src/api/sync/collector.ts
index 273f0a322..2fcfc9dd4 100644
--- a/packages/core/src/api/sync/collector.ts
+++ b/packages/core/src/api/sync/collector.ts
@@ -62,9 +62,9 @@ class Collector {
const ciphers = await this.db
.storage()
.encryptMulti(key, syncableItems);
- const items = toPushItem(ids, ciphers);
- if (!items) continue;
- yield { items, type: itemType };
+ const items = toSyncItem(ids, ciphers);
+ if (!items.length) continue;
+ yield { items, type: itemType, count: items.length };
await this.db
.sql()
@@ -88,15 +88,17 @@ class Collector {
}
export default Collector;
-function toPushItem(ids: string[], ciphers: Cipher<"base64">[]) {
+function toSyncItem(ids: string[], ciphers: Cipher<"base64">[]) {
if (ids.length !== ciphers.length)
throw new Error("ids.length must be equal to ciphers.length");
const items: SyncItem[] = [];
for (let i = 0; i < ids.length; ++i) {
const id = ids[i];
- const cipher = ciphers[i];
- items.push({ ...cipher, v: CURRENT_DATABASE_VERSION, id });
+ const cipher = ciphers[i] as SyncItem;
+ cipher.v = CURRENT_DATABASE_VERSION;
+ cipher.id = id;
+ items.push(cipher);
}
return items;
}
diff --git a/packages/core/src/api/sync/index.ts b/packages/core/src/api/sync/index.ts
index 33c146806..53e595cb9 100644
--- a/packages/core/src/api/sync/index.ts
+++ b/packages/core/src/api/sync/index.ts
@@ -166,7 +166,7 @@ class Sync {
this.autoSync = new AutoSync(db, 1000);
this.devices = new SyncDevices(db.kv, db.tokenManager);
- EV.subscribe(EVENTS.userLoggedOut, async () => {
+ db.eventManager.subscribe(EVENTS.userLoggedOut, async () => {
await this.connection?.stop();
this.autoSync.stop();
});
@@ -285,7 +285,7 @@ class Sync {
);
}
}
- if (done > 0) await this.connection?.send("PushCompleted");
+ if (done > 0) await this.connection?.send("PushCompletedV2", deviceId);
return true;
}
@@ -332,8 +332,13 @@ class Sync {
/**
* @private
*/
- async onPushCompleted() {
- this.db.eventManager.publish(EVENTS.databaseSyncRequested, true, false);
+ async onPushCompleted(deviceId: string) {
+ this.db.eventManager.publish(
+ EVENTS.databaseSyncRequested,
+ true,
+ false,
+ deviceId
+ );
}
async processChunk(
@@ -442,7 +447,9 @@ class Sync {
.withHubProtocol(new JsonHubProtocol())
.build();
this.connection.serverTimeoutInMilliseconds = 60 * 1000 * 5;
- this.connection.on("PushCompleted", () => this.onPushCompleted());
+ this.connection.on("PushCompletedV2", (deviceId: string) =>
+ this.onPushCompleted(deviceId)
+ );
this.connection.on("SendVaultKey", async (vaultKey) => {
if (this.connection?.state !== HubConnectionState.Connected) return false;
diff --git a/packages/core/src/api/sync/merger.ts b/packages/core/src/api/sync/merger.ts
index 3bf905f8a..aefbe4f79 100644
--- a/packages/core/src/api/sync/merger.ts
+++ b/packages/core/src/api/sync/merger.ts
@@ -29,7 +29,7 @@ import {
} from "../../types.js";
import { ParsedInboxItem, SyncInboxItem } from "./types.js";
-const THRESHOLD = process.env.NODE_ENV === "test" ? 6 * 1000 : 60 * 1000;
+const THRESHOLD = process.env.NODE_ENV === "test" ? 2 * 1000 : 60 * 1000;
class Merger {
logger = logger.scope("Merger");
constructor(private readonly db: Database) {}
diff --git a/packages/core/src/api/sync/types.ts b/packages/core/src/api/sync/types.ts
index b2f518c6b..994fc8451 100644
--- a/packages/core/src/api/sync/types.ts
+++ b/packages/core/src/api/sync/types.ts
@@ -47,6 +47,7 @@ export const SYNC_ITEM_TYPES = Object.keys(
export type SyncTransferItem = {
items: SyncItem[];
type: SyncableItemType;
+ count: number;
};
export type SyncInboxItem = Omit & {
diff --git a/packages/core/src/api/token-manager.ts b/packages/core/src/api/token-manager.ts
index f3c7deeb4..b0a29ff8f 100644
--- a/packages/core/src/api/token-manager.ts
+++ b/packages/core/src/api/token-manager.ts
@@ -47,14 +47,14 @@ const ENDPOINTS = {
temporaryToken: "/account/token",
logout: "/account/logout"
};
-const REFRESH_TOKEN_MUTEX = withTimeout(
- new Mutex(),
- 10 * 1000,
- new Error("Timed out while refreshing access token.")
-);
-
class TokenManager {
logger = logger.scope("TokenManager");
+ private REFRESH_TOKEN_MUTEX = withTimeout(
+ new Mutex(),
+ 10 * 1000,
+ new Error("Timed out while refreshing access token.")
+ );
+
constructor(private readonly storage: KVStorageAccessor) {}
async getToken(renew = true, forceRenew = false): Promise {
@@ -101,7 +101,7 @@ class TokenManager {
}
async _refreshToken(forceRenew = false) {
- await REFRESH_TOKEN_MUTEX.runExclusive(async () => {
+ await this.REFRESH_TOKEN_MUTEX.runExclusive(async () => {
this.logger.info("Refreshing access token");
const token = await this.getToken(false, false);
@@ -145,7 +145,7 @@ class TokenManager {
}
saveToken(tokenResponse: Omit) {
- this.logger.info("Saving new token", tokenResponse);
+ this.logger.info("Saving new token");
if (!tokenResponse || !tokenResponse.access_token) return;
const token: Token = { ...tokenResponse, t: Date.now() };
return this.storage().write("token", token);
diff --git a/packages/core/src/api/user-manager.ts b/packages/core/src/api/user-manager.ts
index bd6875833..53040af2a 100644
--- a/packages/core/src/api/user-manager.ts
+++ b/packages/core/src/api/user-manager.ts
@@ -192,7 +192,7 @@ class UserManager {
salt: user.salt
});
}
- EV.publish(EVENTS.userLoggedIn, user);
+ this.db.eventManager.publish(EVENTS.userLoggedIn, user);
} catch (e) {
await this.tokenManager.saveToken(token);
throw e;
@@ -240,7 +240,7 @@ class UserManager {
await this.db.setLastSynced(0);
await this.db.syncer.devices.register();
- EV.publish(EVENTS.userLoggedIn, user);
+ this.db.eventManager.publish(EVENTS.userLoggedIn, user);
}
async getSessions() {
@@ -281,8 +281,8 @@ class UserManager {
this.cachedAttachmentKey = undefined;
this.cachedInboxKeys = undefined;
await this.db.reset();
- EV.publish(EVENTS.userLoggedOut, reason);
- EV.publish(EVENTS.appRefreshRequested);
+ this.db.eventManager.publish(EVENTS.userLoggedOut, reason);
+ this.db.eventManager.publish(EVENTS.appRefreshRequested);
}
}
@@ -345,6 +345,7 @@ class UserManager {
}
async fetchUser(): Promise {
+ const oldUser = await this.getUser();
try {
const token = await this.tokenManager.getAccessToken();
if (!token) return;
@@ -353,7 +354,6 @@ class UserManager {
token
);
if (user) {
- const oldUser = await this.getUser();
await this.setUser(user);
if (
oldUser &&
@@ -362,18 +362,21 @@ class UserManager {
oldUser.subscription.provider !== user.subscription.provider)
) {
await this.tokenManager._refreshToken(true);
- EV.publish(EVENTS.userSubscriptionUpdated, user.subscription);
+ this.db.eventManager.publish(
+ EVENTS.userSubscriptionUpdated,
+ user.subscription
+ );
}
if (oldUser && !oldUser.isEmailConfirmed && user.isEmailConfirmed)
- EV.publish(EVENTS.userEmailConfirmed);
- EV.publish(EVENTS.userFetched, user);
+ this.db.eventManager.publish(EVENTS.userEmailConfirmed);
+ this.db.eventManager.publish(EVENTS.userFetched, user);
return user;
} else {
- return await this.getUser();
+ return oldUser;
}
} catch (e) {
logger.error(e, "Error fetching user");
- return await this.getUser();
+ return oldUser;
}
}
@@ -559,7 +562,7 @@ class UserManager {
async hasInboxKeys() {
if (this.cachedInboxKeys) return true;
- let user = await this.getUser();
+ const user = await this.getUser();
if (!user) return false;
return !!user.inboxKeys;
diff --git a/packages/core/src/api/vault.ts b/packages/core/src/api/vault.ts
index e58e23d3a..da700e794 100644
--- a/packages/core/src/api/vault.ts
+++ b/packages/core/src/api/vault.ts
@@ -57,7 +57,7 @@ export default class Vault {
constructor(private readonly db: Database) {
this.password = undefined;
- EV.subscribe(EVENTS.userLoggedOut, () => {
+ db.eventManager.subscribe(EVENTS.userLoggedOut, () => {
this.password = undefined;
});
}