Merge pull request #9081 from streetwriters/fix/improve-sync-reliability

Improve sync reliability
This commit is contained in:
Abdullah Atta
2026-01-14 11:48:34 +05:00
committed by GitHub
13 changed files with 468 additions and 135 deletions

View File

@@ -700,10 +700,10 @@ export const useAppEvents = () => {
EV.subscribe(EVENTS.syncCheckStatus, onCheckSyncStatus),
EV.subscribe(EVENTS.syncAborted, onSyncAborted),
EV.subscribe(EVENTS.appRefreshRequested, onSyncComplete),
EV.subscribe(EVENTS.userLoggedOut, onLogout),
EV.subscribe(EVENTS.userEmailConfirmed, onUserEmailVerified),
db.eventManager.subscribe(EVENTS.userLoggedOut, onLogout),
db.eventManager.subscribe(EVENTS.userEmailConfirmed, onUserEmailVerified),
EV.subscribe(EVENTS.userSessionExpired, onUserSessionExpired),
EV.subscribe(
db.eventManager.subscribe(
EVENTS.userSubscriptionUpdated,
onUserSubscriptionStatusChanged
),

View File

@@ -249,7 +249,7 @@ class EditorStore extends BaseStore<EditorStore> {
}
);
EV.subscribe(EVENTS.userLoggedOut, () => {
db.eventManager.subscribe(EVENTS.userLoggedOut, () => {
const { closeTabs, tabs } = this.get();
closeTabs(...tabs.map((s) => s.id));
});

View File

@@ -58,22 +58,25 @@ class UserStore extends BaseStore<UserStore> {
if (Config.get("sessionExpired")) return;
EV.subscribe(EVENTS.userSubscriptionUpdated, (subscription) => {
const wasSubscribed = isUserSubscribed();
this.refreshUser();
this.set((state) => {
if (!state.user) return;
state.user.subscription = subscription;
});
if (!wasSubscribed && isUserSubscribed()) OnboardingDialog.show({});
resetFeatures();
});
db.eventManager.subscribe(
EVENTS.userSubscriptionUpdated,
(subscription) => {
const wasSubscribed = isUserSubscribed();
this.refreshUser();
this.set((state) => {
if (!state.user) return;
state.user.subscription = subscription;
});
if (!wasSubscribed && isUserSubscribed()) OnboardingDialog.show({});
resetFeatures();
}
);
EV.subscribe(EVENTS.userEmailConfirmed, () => {
db.eventManager.subscribe(EVENTS.userEmailConfirmed, () => {
hashNavigate("/confirmed");
});
EV.subscribe(EVENTS.userLoggedOut, async (reason) => {
db.eventManager.subscribe(EVENTS.userLoggedOut, async (reason) => {
this.set((state) => {
state.user = undefined;
state.isLoggedIn = false;

View File

@@ -17,15 +17,40 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { EV, EVENTS } from "../src/common.ts";
import { test, expect, vitest } from "vitest";
import { EVENTS } from "../src/common.ts";
import { test, expect, vitest, vi, beforeAll, afterAll } from "vitest";
import { login } from "./utils.js";
import { databaseTest } from "../__tests__/utils/index.ts";
import { databaseTest, delay } from "../__tests__/utils/index.ts";
import http from "../src/utils/http.ts";
import Constants from "../src/utils/constants.ts";
import { writeFileSync } from "node:fs";
const TEST_TIMEOUT = 60 * 1000;
const testOptions = { concurrent: true, timeout: TEST_TIMEOUT };
beforeAll(async () => {
const device = await databaseTest("memory");
await login(device);
await device.user.resetUser(false);
await device.user.logout();
}, TEST_TIMEOUT);
afterAll(async () => {
const device = await databaseTest("memory");
await login(device);
await device.user.resetUser(false);
await device.user.logout();
}, TEST_TIMEOUT);
test(
"case 1: device A & B should only download the changes from device C (no uploading)",
testOptions,
async (t) => {
const types = [];
function onSyncProgress({ type }) {
@@ -53,12 +78,12 @@ test(
await deviceB.sync({ type: "full" });
expect(types.every((t) => t === "download")).toBe(true);
},
TEST_TIMEOUT
}
);
test(
"case 3: Device A & B have unsynced changes but server has nothing",
testOptions,
async (t) => {
const [deviceA, deviceB] = await Promise.all([
initializeDevice("deviceA"),
@@ -85,12 +110,12 @@ test(
expect(await deviceB.notes.note(note1Id)).toBeTruthy();
expect(await deviceA.notes.note(note1Id)).toBeTruthy();
expect(await deviceB.notes.note(note2Id)).toBeTruthy();
},
TEST_TIMEOUT
}
);
test(
"edge case 1: items updated while push is running",
testOptions,
async (t) => {
const [deviceA, deviceB] = await Promise.all([
initializeDevice("deviceA"),
@@ -105,24 +130,32 @@ test(
const id = await deviceA.notes.add({ title: "hello" });
for (let i = 0; i < 5; ++i) {
if (i > 0) await deviceA.notes.add({ id, title: `edit ${i - 1}` });
await Promise.all([
deviceA.sync({ type: "send" }),
new Promise((resolve) => setTimeout(resolve, 40)).then(() =>
deviceA.notes.add({ id, title: `edit ${i}` })
)
]);
var spy = vi.spyOn(deviceA.syncer.sync, "start");
const defaultEncryptMulti = deviceA
.storage()
.encryptMulti.bind(deviceA.storage());
var encryptMulti = vi.spyOn(deviceA.storage(), "encryptMulti");
encryptMulti.mockImplementationOnce(async (...args) => {
const result = defaultEncryptMulti(...args);
// simulate scenario where a note gets updated while sync is collecting
// items
await new Promise((resolve) => setTimeout(resolve, 100));
await deviceA.notes.add({ id, title: `edit ${i}` });
return result;
});
await deviceA.sync({ type: "send" });
expect(spy).toHaveBeenCalledTimes(2);
expect((await deviceA.notes.note(id))?.title).toBe(`edit ${i}`);
expect((await deviceA.notes.note(id))?.synced).toBe(true);
await deviceB.sync({ type: "fetch" });
expect((await deviceB.notes.note(id))?.title).toBe(`edit ${i}`);
}
},
TEST_TIMEOUT * 10
}
);
test(
"edge case 2: new items added while push is running",
testOptions,
async (t) => {
const [deviceA, deviceB] = await Promise.all([
initializeDevice("deviceA"),
@@ -137,21 +170,33 @@ test(
const id = await deviceA.notes.add({ title: "hello" });
for (let i = 0; i < 5; ++i) {
if (i > 0) await deviceA.notes.add({ id, title: `edit ${i - 1}` });
await Promise.all([
deviceA.sync({ type: "send" }),
new Promise((resolve) => setTimeout(resolve, 40)).then(() =>
deviceA.notes.add({ title: `note ${i}` })
)
]);
var spy = vi.spyOn(deviceA.syncer.sync, "start");
const defaultEncryptMulti = deviceA
.storage()
.encryptMulti.bind(deviceA.storage());
var encryptMulti = vi.spyOn(deviceA.storage(), "encryptMulti");
let newNoteId;
encryptMulti.mockImplementationOnce(async (key, items) => {
const result = defaultEncryptMulti(key, items);
// simulate scenario where a note gets added while sync is collecting
// items
await new Promise((resolve) => setTimeout(resolve, 100));
newNoteId = await deviceA.notes.add({ title: `note ${i}` });
return result;
});
await deviceA.sync({ type: "send" });
expect(await deviceA.notes.note(newNoteId)).toBeDefined();
expect((await deviceA.notes.note(newNoteId)).synced).toBe(true);
expect(spy).toHaveBeenCalledTimes(1);
await deviceB.sync({ type: "fetch" });
expect(await deviceB.notes.all.count()).toBe(i + 2);
expect(await deviceB.notes.note(newNoteId)).toBeDefined();
}
},
TEST_TIMEOUT * 10
}
);
test(
"issue: syncing should not affect the items' dateModified",
testOptions,
async (t) => {
const [deviceA] = await Promise.all([initializeDevice("deviceA")]);
@@ -173,12 +218,12 @@ test(
.dateModified;
expect(noteDateBefore).toBe(noteDateAfter);
expect(contentDateBefore).toBe(contentDateAfter);
},
TEST_TIMEOUT
}
);
test(
"case 4: local content changed after remote content should create a conflict",
testOptions,
async (t) => {
const [deviceA, deviceB] = await Promise.all([
initializeDevice("deviceA"),
@@ -190,7 +235,11 @@ test(
await cleanup(deviceA, deviceB);
});
const noteId = await deviceA.notes.add({
t.onTestFailed(() => {
writeFileSync("debug_deviceA_note.json", content);
});
var noteId = await deviceA.notes.add({
title: "Test note from device A",
content: { data: "<p>Hello</p>", type: "tiptap" }
});
@@ -203,21 +252,21 @@ test(
});
await deviceB.sync({ type: "full" });
await new Promise((resolve) => setTimeout(resolve, 10000));
await delay(2500);
await deviceA.notes.add({
id: noteId,
content: { data: "<p>Hello (I am from device A)</p>", type: "tiptap" }
});
await deviceA.sync({ type: "full" });
var content = JSON.stringify(await deviceA.content.findByNoteId(noteId));
expect(await deviceA.notes.conflicted.count()).toBeGreaterThan(0);
},
TEST_TIMEOUT * 10
}
);
test(
"case 5: remote content changed after local content should create a conflict",
testOptions,
async (t) => {
const [deviceA, deviceB] = await Promise.all([
initializeDevice("deviceA"),
@@ -228,31 +277,46 @@ test(
console.log(`${t.task.name} log out`);
await cleanup(deviceA, deviceB);
});
// t.onTestFailed(() => {
// writeFileSync("debug_deviceB_note.json", content);
// });
const noteId = await deviceA.notes.add({
deviceB.syncer.sync.merger.forceLogging = true;
deviceB.syncer.sync.collector.forceLogging = true;
var noteId = await deviceA.notes.add({
title: "Test note from device A",
content: { data: "<p>Hello</p>", type: "tiptap" }
content: { data: "<p>Hello unique note</p>", type: "tiptap" }
});
console.log("Device A is syncing (A)");
await deviceA.sync({ type: "full" });
console.log("Device B is syncing (A)");
await deviceB.sync({ type: "full" });
await deviceA.notes.add({
id: noteId,
content: { data: "<p>Hello (I am from device B)</p>", type: "tiptap" }
content: {
data: "<p>Hello (unique note edit) device A</p>",
type: "tiptap"
}
});
console.log("Device A is syncing (B)");
await deviceA.sync({ type: "full" });
await new Promise((resolve) => setTimeout(resolve, 10000));
await delay(2500);
await deviceB.notes.add({
id: noteId,
content: { data: "<p>Hello (I am from device A)</p>", type: "tiptap" }
content: {
data: "<p>Hello unique note edit (device B)</p>",
type: "tiptap"
}
});
expect((await deviceB.content.findByNoteId(noteId)).synced).toBe(false);
console.log("Syncing device B");
await deviceB.sync({ type: "full" });
// // var content = JSON.stringify(await deviceB.content.findByNoteId(noteId));
expect(await deviceB.notes.conflicted.count()).toBeGreaterThan(0);
},
TEST_TIMEOUT * 10
}
);
// test(
@@ -382,6 +446,7 @@ test(
test(
"issue: running force sync from device A makes device B always download everything",
testOptions,
async (t) => {
const [deviceA, deviceB] = await Promise.all([
initializeDevice("deviceA"),
@@ -402,20 +467,20 @@ test(
});
}
await syncAndWait(deviceA, deviceB, true);
const handler = vitest.fn();
deviceB.eventManager.subscribe(EVENTS.syncProgress, handler);
await deviceB.sync({ type: "full" });
await syncAndWait(deviceA, deviceB, true);
expect(handler).not.toHaveBeenCalled();
},
TEST_TIMEOUT
expect(handler.mock.calls.every(([data]) => data.type === "download")).toBe(
true
);
}
);
test(
"issue: colors are not properly created if multiple notes are synced together",
testOptions,
async (t) => {
const [deviceA, deviceB] = await Promise.all([
initializeDevice("deviceA"),
@@ -445,7 +510,12 @@ test(
colorCode: "#ffff22"
});
for (let noteId of noteIds) {
expect(await deviceB.notes.note(noteId)).toBeTruthy();
// console.log(
// "Adding color to note",
// noteId,
// await deviceB.syncer.devices.get()
// );
expect(await deviceB.notes.note(noteId)).toBeDefined();
expect(
await deviceB.relations
.from({ id: colorId, type: "color" }, "note")
@@ -467,36 +537,274 @@ test(
expect(
noteIds.every((id) => purpleNotes.findIndex((p) => p.id === id) > -1)
).toBe(true);
},
TEST_TIMEOUT
}
);
test(
"monograph: published note appears on other device after sync",
testOptions,
async (t) => {
const [deviceA, deviceB] = await Promise.all([
initializeDevice("deviceA"),
initializeDevice("deviceB")
]);
t.onTestFinished(async () => {
console.log(`${t.task.name} log out`);
await cleanup(deviceA, deviceB);
});
const noteId = await deviceA.notes.add({
title: "Monograph note",
content: { data: "<p>monograph content</p>", type: "tiptap" }
});
// publish on device A
const monographId = await deviceA.monographs.publish(noteId);
expect(monographId).toBeTruthy();
// deviceB fetches the monograph
await deviceB.sync({ type: "fetch" });
expect(deviceB.monographs.isPublished(noteId)).toBeTruthy();
const mono = await deviceB.monographs.get(noteId);
expect(mono).toBeTruthy();
expect(mono.title).toBe("Monograph note");
}
);
test(
"monograph: self-destruct triggers unpublish across devices",
testOptions,
async (t) => {
const [deviceA, deviceB, deviceC] = await Promise.all([
initializeDevice("deviceA"),
initializeDevice("deviceB"),
initializeDevice("deviceC")
]);
t.onTestFinished(async () => {
// console.log(`${t.task.name} log out`);
await cleanup(deviceA, deviceB, deviceC);
});
const noteId = await deviceA.notes.add({
title: "Self destruct monograph",
content: { data: "<p>transient</p>", type: "tiptap" }
});
// publish with selfDestruct
const monographId = await deviceA.monographs.publish(noteId, {
selfDestruct: true
});
expect(monographId).toBeTruthy();
// ensure all devices know it's published
await deviceA.sync({ type: "fetch" });
await deviceB.sync({ type: "fetch" });
await deviceC.sync({ type: "fetch" });
expect(deviceB.monographs.isPublished(noteId)).toBeTruthy();
expect(deviceC.monographs.isPublished(noteId)).toBeTruthy();
// trigger a view on the public endpoint which should cause self-destruct
await http
.get(`${Constants.API_HOST}/monographs/${noteId}/view`)
.catch(() => {});
// give the server a short moment to process
await delay(500);
// sync all devices to pick up the unpublish
await deviceA.sync({ type: "fetch" });
await deviceB.sync({ type: "fetch" });
await deviceC.sync({ type: "fetch" });
expect(deviceA.monographs.isPublished(noteId)).toBeFalsy();
expect(deviceB.monographs.isPublished(noteId)).toBeFalsy();
expect(deviceC.monographs.isPublished(noteId)).toBeFalsy();
}
);
test(
"monograph: properties (password and selfDestruct) sync across devices",
testOptions,
async (t) => {
const [deviceA, deviceB] = await Promise.all([
initializeDevice("deviceA"),
initializeDevice("deviceB")
]);
t.onTestFinished(async () => {
console.log(`${t.task.name} log out`);
await cleanup(deviceA, deviceB);
});
const password = "s3cr3t";
const noteId = await deviceA.notes.add({
title: "Protected monograph",
content: { data: "<p>secret content</p>", type: "tiptap" }
});
// publish with password and selfDestruct on device A
const monographId = await deviceA.monographs.publish(noteId, {
password,
selfDestruct: true
});
expect(monographId).toBeTruthy();
// deviceB fetches the monograph
await deviceB.sync({ type: "fetch" });
expect(deviceB.monographs.isPublished(noteId)).toBeTruthy();
const mono = await deviceB.monographs.get(noteId);
expect(mono).toBeTruthy();
expect(mono.selfDestruct).toBe(true);
expect(mono.password).toBeTruthy();
const decrypted = await deviceB.monographs.decryptPassword(mono.password);
expect(decrypted).toBe(password);
}
);
test(
"edge case: deletion on one device propagates to others",
testOptions,
async (t) => {
const [deviceA, deviceB] = await Promise.all([
initializeDevice("deviceA"),
initializeDevice("deviceB")
]);
t.onTestFinished(async () => {
console.log(`${t.task.name} log out`);
await cleanup(deviceA, deviceB);
});
const id = await deviceA.notes.add({ title: "note to delete" });
// ensure both devices have latest state where note exists
await deviceA.sync({ type: "full" });
await deviceB.sync({ type: "full" });
expect(await deviceB.notes.note(id)).toBeTruthy();
// delete on deviceA
await deviceA.notes.remove(id);
await deviceA.sync({ type: "full" });
// deviceB should observe deletion after syncing
await deviceB.sync({ type: "full" });
expect(await deviceB.notes.note(id)).toBeFalsy();
}
);
test(
"stress: sync 5000 notes from device A to device B",
testOptions,
async (t) => {
const [deviceA, deviceB] = await Promise.all([
initializeDevice("deviceA"),
initializeDevice("deviceB")
]);
t.onTestFinished(async () => {
console.log(`${t.task.name} log out`);
await cleanup(deviceA, deviceB);
});
for (let i = 0; i < 5000; ++i) {
await deviceA.notes.add({
title: `note ${i}`,
content: {
type: "tiptap",
data: `<p>deviceA=true</p>`
}
});
}
await deviceA.sync({ type: "full" });
await deviceB.sync({ type: "full" });
const countA = await deviceA.notes.all.count();
const countB = await deviceB.notes.all.count();
expect(countA).toBeGreaterThanOrEqual(5000);
expect(countB).toBeGreaterThanOrEqual(5000);
}
);
test("stress: super concurrent sync", testOptions, async (t) => {
console.time("adding devices");
const devices = await Promise.all(
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
.split("")
.map((letter) => initializeDevice(`device${letter}`))
);
console.timeEnd("adding devices");
t.onTestFinished(async () => {
console.log(`${t.task.name} log out`);
await cleanup(...devices);
});
await Promise.all(
devices.map(async (device, index) => {
for (let i = 0; i < 100; ++i) {
await device.notes.add({
content: {
type: "tiptap",
data: `<p>device${i}${index}=true</p>`
}
});
}
})
);
await Promise.all(
devices.map(async (device) => {
await device.sync({ type: "send" });
})
);
await Promise.all(
devices.map(async (device) => {
await device.sync({ type: "fetch" });
})
);
for (const device of devices) {
// await device.sync({ type: "full" });
expect(await device.notes.all.count()).toBeGreaterThanOrEqual(
devices.length * 100
);
}
});
/**
*
* @param {string} id
* @returns {Promise<Database>}
* @returns {Promise<import("../src/api/index.ts").default>}
*/
async function initializeDevice(id) {
// initialize(new NodeStorageInterface(), false);
console.time(`Init ${id}`);
EV.subscribe(EVENTS.syncCheckStatus, async (type) => {
return {
type,
result: true
};
});
// console.time(`Init ${id}`);
const device = await databaseTest("memory");
console.time("login" + id);
await login(device);
console.timeEnd("login" + id);
await device.user.resetUser(false);
// await device.user.resetUser(false);
await device.sync({ type: "full" });
console.timeEnd(`Init ${id}`);
// console.timeEnd(`Init ${id}`);
return device;
}
@@ -511,22 +819,33 @@ async function cleanup(...devices) {
await device.user.logout();
device.eventManager.unsubscribeAll();
}
EV.unsubscribeAll();
// EV.unsubscribeAll();
}
/**
*
* @param {Database} deviceA
* @param {Database} deviceB
* @param {import("../src/api/index.ts").default} deviceA
* @param {import("../src/api/index.ts").default} deviceB
* @returns
*/
function syncAndWait(deviceA, deviceB, force = false) {
return new Promise((resolve, reject) => {
const ref2 = deviceB.eventManager.subscribe(
EVENTS.databaseSyncRequested,
(full, force) => {
async (full, force, deviceId) => {
if (!full) return;
console.log("sync requested by device A", full, force);
if (deviceId !== (await deviceA.syncer.devices.get())) {
console.warn(
"Concurrency error. Expected:",
await deviceA.syncer.devices.get(),
"Got:",
deviceId
);
return;
}
console.log("sync requested by device A:", deviceId, full, force);
ref2.unsubscribe();
deviceB
.sync({
@@ -538,13 +857,13 @@ function syncAndWait(deviceA, deviceB, force = false) {
}
);
console.log(
"waiting for sync...",
"Device A:",
deviceA.syncer.sync.syncing,
"Device B:",
deviceB.syncer.sync.syncing
);
// console.log(
// "waiting for sync...",
// "Device A:",
// deviceA.syncer.sync.syncing,
// "Device B:",
// deviceB.syncer.sync.syncing
// );
deviceA.sync({ type: "full", force }).catch(reject);
});

View File

@@ -288,15 +288,16 @@ class Database {
"options not specified. Did you forget to call db.setup()?"
);
EV.subscribeMulti(
[EVENTS.userLoggedIn, EVENTS.userFetched, EVENTS.tokenRefreshed],
this.eventManager.subscribeMulti(
[EVENTS.userLoggedIn, EVENTS.userFetched],
this.connectSSE,
this
);
EV.subscribe(EVENTS.tokenRefreshed, () => this.connectSSE());
EV.subscribe(EVENTS.attachmentDeleted, async (attachment: Attachment) => {
await this.fs().cancel(attachment.hash);
});
EV.subscribe(EVENTS.userLoggedOut, async () => {
this.eventManager.subscribe(EVENTS.userLoggedOut, async () => {
await this.monographs.clear();
await this.fs().clear();
this.disconnectSSE();
@@ -388,11 +389,11 @@ class Database {
});
this.eventSource.onopen = async () => {
console.log("SSE: opened channel successfully!");
logger.log("SSE: opened channel successfully!");
};
this.eventSource.onerror = function (error) {
console.log("SSE: error:", error);
logger.error(error, "SSE: error");
};
this.eventSource.onmessage = async (event) => {
@@ -405,7 +406,7 @@ class Database {
if (!user) break;
user.subscription = data;
await this.user.setUser(user);
EV.publish(EVENTS.userSubscriptionUpdated, data);
this.eventManager.publish(EVENTS.userSubscriptionUpdated, data);
await this.tokenManager._refreshToken(true);
break;
}
@@ -416,15 +417,12 @@ class Database {
case "emailConfirmed": {
await this.tokenManager._refreshToken(true);
await this.user.fetchUser();
EV.publish(EVENTS.userEmailConfirmed);
this.eventManager.publish(EVENTS.userEmailConfirmed);
break;
}
case "triggerSync": {
await this.sync({ type: "fetch" });
}
}
} catch (e) {
console.log("SSE: Unsupported message. Message = ", event.data);
logger.error("SSE: Unsupported message. Message = ", event.data);
return;
}
};

View File

@@ -209,7 +209,7 @@ describe.concurrent("merge content", (test) => {
type: "tiptap",
data: "Remote",
noteId,
dateEdited: Date.now() - 3000,
dateEdited: Date.now() - 300,
dateModified: Date.now()
},
{
@@ -217,7 +217,7 @@ describe.concurrent("merge content", (test) => {
data: "Local",
noteId,
dateEdited: Date.now(),
dateModified: Date.now() - 6000
dateModified: Date.now() - 600
}
);
@@ -236,8 +236,8 @@ describe.concurrent("merge content", (test) => {
type: "tiptap",
data: "Remote",
noteId,
dateEdited: Date.now() - 3000,
dateModified: Date.now() - 6000
dateEdited: Date.now() - 300,
dateModified: Date.now() - 600
},
{
type: "tiptap",

View File

@@ -62,9 +62,9 @@ class Collector {
const ciphers = await this.db
.storage()
.encryptMulti(key, syncableItems);
const items = toPushItem(ids, ciphers);
if (!items) continue;
yield { items, type: itemType };
const items = toSyncItem(ids, ciphers);
if (!items.length) continue;
yield { items, type: itemType, count: items.length };
await this.db
.sql()
@@ -88,15 +88,17 @@ class Collector {
}
export default Collector;
function toPushItem(ids: string[], ciphers: Cipher<"base64">[]) {
function toSyncItem(ids: string[], ciphers: Cipher<"base64">[]) {
if (ids.length !== ciphers.length)
throw new Error("ids.length must be equal to ciphers.length");
const items: SyncItem[] = [];
for (let i = 0; i < ids.length; ++i) {
const id = ids[i];
const cipher = ciphers[i];
items.push({ ...cipher, v: CURRENT_DATABASE_VERSION, id });
const cipher = ciphers[i] as SyncItem;
cipher.v = CURRENT_DATABASE_VERSION;
cipher.id = id;
items.push(cipher);
}
return items;
}

View File

@@ -166,7 +166,7 @@ class Sync {
this.autoSync = new AutoSync(db, 1000);
this.devices = new SyncDevices(db.kv, db.tokenManager);
EV.subscribe(EVENTS.userLoggedOut, async () => {
db.eventManager.subscribe(EVENTS.userLoggedOut, async () => {
await this.connection?.stop();
this.autoSync.stop();
});
@@ -285,7 +285,7 @@ class Sync {
);
}
}
if (done > 0) await this.connection?.send("PushCompleted");
if (done > 0) await this.connection?.send("PushCompletedV2", deviceId);
return true;
}
@@ -332,8 +332,13 @@ class Sync {
/**
* @private
*/
async onPushCompleted() {
this.db.eventManager.publish(EVENTS.databaseSyncRequested, true, false);
async onPushCompleted(deviceId: string) {
this.db.eventManager.publish(
EVENTS.databaseSyncRequested,
true,
false,
deviceId
);
}
async processChunk(
@@ -442,7 +447,9 @@ class Sync {
.withHubProtocol(new JsonHubProtocol())
.build();
this.connection.serverTimeoutInMilliseconds = 60 * 1000 * 5;
this.connection.on("PushCompleted", () => this.onPushCompleted());
this.connection.on("PushCompletedV2", (deviceId: string) =>
this.onPushCompleted(deviceId)
);
this.connection.on("SendVaultKey", async (vaultKey) => {
if (this.connection?.state !== HubConnectionState.Connected) return false;

View File

@@ -29,7 +29,7 @@ import {
} from "../../types.js";
import { ParsedInboxItem, SyncInboxItem } from "./types.js";
const THRESHOLD = process.env.NODE_ENV === "test" ? 6 * 1000 : 60 * 1000;
const THRESHOLD = process.env.NODE_ENV === "test" ? 2 * 1000 : 60 * 1000;
class Merger {
logger = logger.scope("Merger");
constructor(private readonly db: Database) {}

View File

@@ -47,6 +47,7 @@ export const SYNC_ITEM_TYPES = Object.keys(
export type SyncTransferItem = {
items: SyncItem[];
type: SyncableItemType;
count: number;
};
export type SyncInboxItem = Omit<SyncItem, "format"> & {

View File

@@ -47,14 +47,14 @@ const ENDPOINTS = {
temporaryToken: "/account/token",
logout: "/account/logout"
};
const REFRESH_TOKEN_MUTEX = withTimeout(
new Mutex(),
10 * 1000,
new Error("Timed out while refreshing access token.")
);
class TokenManager {
logger = logger.scope("TokenManager");
private REFRESH_TOKEN_MUTEX = withTimeout(
new Mutex(),
10 * 1000,
new Error("Timed out while refreshing access token.")
);
constructor(private readonly storage: KVStorageAccessor) {}
async getToken(renew = true, forceRenew = false): Promise<Token | undefined> {
@@ -101,7 +101,7 @@ class TokenManager {
}
async _refreshToken(forceRenew = false) {
await REFRESH_TOKEN_MUTEX.runExclusive(async () => {
await this.REFRESH_TOKEN_MUTEX.runExclusive(async () => {
this.logger.info("Refreshing access token");
const token = await this.getToken(false, false);
@@ -145,7 +145,7 @@ class TokenManager {
}
saveToken(tokenResponse: Omit<Token, "t">) {
this.logger.info("Saving new token", tokenResponse);
this.logger.info("Saving new token");
if (!tokenResponse || !tokenResponse.access_token) return;
const token: Token = { ...tokenResponse, t: Date.now() };
return this.storage().write("token", token);

View File

@@ -192,7 +192,7 @@ class UserManager {
salt: user.salt
});
}
EV.publish(EVENTS.userLoggedIn, user);
this.db.eventManager.publish(EVENTS.userLoggedIn, user);
} catch (e) {
await this.tokenManager.saveToken(token);
throw e;
@@ -240,7 +240,7 @@ class UserManager {
await this.db.setLastSynced(0);
await this.db.syncer.devices.register();
EV.publish(EVENTS.userLoggedIn, user);
this.db.eventManager.publish(EVENTS.userLoggedIn, user);
}
async getSessions() {
@@ -281,8 +281,8 @@ class UserManager {
this.cachedAttachmentKey = undefined;
this.cachedInboxKeys = undefined;
await this.db.reset();
EV.publish(EVENTS.userLoggedOut, reason);
EV.publish(EVENTS.appRefreshRequested);
this.db.eventManager.publish(EVENTS.userLoggedOut, reason);
this.db.eventManager.publish(EVENTS.appRefreshRequested);
}
}
@@ -345,6 +345,7 @@ class UserManager {
}
async fetchUser(): Promise<User | undefined> {
const oldUser = await this.getUser();
try {
const token = await this.tokenManager.getAccessToken();
if (!token) return;
@@ -353,7 +354,6 @@ class UserManager {
token
);
if (user) {
const oldUser = await this.getUser();
await this.setUser(user);
if (
oldUser &&
@@ -362,18 +362,21 @@ class UserManager {
oldUser.subscription.provider !== user.subscription.provider)
) {
await this.tokenManager._refreshToken(true);
EV.publish(EVENTS.userSubscriptionUpdated, user.subscription);
this.db.eventManager.publish(
EVENTS.userSubscriptionUpdated,
user.subscription
);
}
if (oldUser && !oldUser.isEmailConfirmed && user.isEmailConfirmed)
EV.publish(EVENTS.userEmailConfirmed);
EV.publish(EVENTS.userFetched, user);
this.db.eventManager.publish(EVENTS.userEmailConfirmed);
this.db.eventManager.publish(EVENTS.userFetched, user);
return user;
} else {
return await this.getUser();
return oldUser;
}
} catch (e) {
logger.error(e, "Error fetching user");
return await this.getUser();
return oldUser;
}
}
@@ -559,7 +562,7 @@ class UserManager {
async hasInboxKeys() {
if (this.cachedInboxKeys) return true;
let user = await this.getUser();
const user = await this.getUser();
if (!user) return false;
return !!user.inboxKeys;

View File

@@ -57,7 +57,7 @@ export default class Vault {
constructor(private readonly db: Database) {
this.password = undefined;
EV.subscribe(EVENTS.userLoggedOut, () => {
db.eventManager.subscribe(EVENTS.userLoggedOut, () => {
this.password = undefined;
});
}