mirror of
https://github.com/streetwriters/notesnook.git
synced 2026-02-24 04:00:59 +01:00
core: improve sync reliability
This commit is contained in:
@@ -17,15 +17,40 @@ You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
import { EV, EVENTS } from "../src/common.ts";
|
||||
import { test, expect, vitest } from "vitest";
|
||||
import { EVENTS } from "../src/common.ts";
|
||||
import { test, expect, vitest, vi, beforeAll, afterAll } from "vitest";
|
||||
import { login } from "./utils.js";
|
||||
import { databaseTest } from "../__tests__/utils/index.ts";
|
||||
import { databaseTest, delay } from "../__tests__/utils/index.ts";
|
||||
import http from "../src/utils/http.ts";
|
||||
import Constants from "../src/utils/constants.ts";
|
||||
import { writeFileSync } from "node:fs";
|
||||
|
||||
const TEST_TIMEOUT = 60 * 1000;
|
||||
const testOptions = { concurrent: true, timeout: TEST_TIMEOUT };
|
||||
|
||||
beforeAll(async () => {
|
||||
const device = await databaseTest("memory");
|
||||
|
||||
await login(device);
|
||||
|
||||
await device.user.resetUser(false);
|
||||
|
||||
await device.user.logout();
|
||||
}, TEST_TIMEOUT);
|
||||
|
||||
afterAll(async () => {
|
||||
const device = await databaseTest("memory");
|
||||
|
||||
await login(device);
|
||||
|
||||
await device.user.resetUser(false);
|
||||
|
||||
await device.user.logout();
|
||||
}, TEST_TIMEOUT);
|
||||
|
||||
test(
|
||||
"case 1: device A & B should only download the changes from device C (no uploading)",
|
||||
testOptions,
|
||||
async (t) => {
|
||||
const types = [];
|
||||
function onSyncProgress({ type }) {
|
||||
@@ -53,12 +78,12 @@ test(
|
||||
await deviceB.sync({ type: "full" });
|
||||
|
||||
expect(types.every((t) => t === "download")).toBe(true);
|
||||
},
|
||||
TEST_TIMEOUT
|
||||
}
|
||||
);
|
||||
|
||||
test(
|
||||
"case 3: Device A & B have unsynced changes but server has nothing",
|
||||
testOptions,
|
||||
async (t) => {
|
||||
const [deviceA, deviceB] = await Promise.all([
|
||||
initializeDevice("deviceA"),
|
||||
@@ -85,12 +110,12 @@ test(
|
||||
expect(await deviceB.notes.note(note1Id)).toBeTruthy();
|
||||
expect(await deviceA.notes.note(note1Id)).toBeTruthy();
|
||||
expect(await deviceB.notes.note(note2Id)).toBeTruthy();
|
||||
},
|
||||
TEST_TIMEOUT
|
||||
}
|
||||
);
|
||||
|
||||
test(
|
||||
"edge case 1: items updated while push is running",
|
||||
testOptions,
|
||||
async (t) => {
|
||||
const [deviceA, deviceB] = await Promise.all([
|
||||
initializeDevice("deviceA"),
|
||||
@@ -105,24 +130,32 @@ test(
|
||||
const id = await deviceA.notes.add({ title: "hello" });
|
||||
for (let i = 0; i < 5; ++i) {
|
||||
if (i > 0) await deviceA.notes.add({ id, title: `edit ${i - 1}` });
|
||||
await Promise.all([
|
||||
deviceA.sync({ type: "send" }),
|
||||
new Promise((resolve) => setTimeout(resolve, 40)).then(() =>
|
||||
deviceA.notes.add({ id, title: `edit ${i}` })
|
||||
)
|
||||
]);
|
||||
|
||||
var spy = vi.spyOn(deviceA.syncer.sync, "start");
|
||||
const defaultEncryptMulti = deviceA
|
||||
.storage()
|
||||
.encryptMulti.bind(deviceA.storage());
|
||||
var encryptMulti = vi.spyOn(deviceA.storage(), "encryptMulti");
|
||||
encryptMulti.mockImplementationOnce(async (...args) => {
|
||||
const result = defaultEncryptMulti(...args);
|
||||
// simulate scenario where a note gets updated while sync is collecting
|
||||
// items
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
await deviceA.notes.add({ id, title: `edit ${i}` });
|
||||
return result;
|
||||
});
|
||||
await deviceA.sync({ type: "send" });
|
||||
expect(spy).toHaveBeenCalledTimes(2);
|
||||
expect((await deviceA.notes.note(id))?.title).toBe(`edit ${i}`);
|
||||
expect((await deviceA.notes.note(id))?.synced).toBe(true);
|
||||
await deviceB.sync({ type: "fetch" });
|
||||
expect((await deviceB.notes.note(id))?.title).toBe(`edit ${i}`);
|
||||
}
|
||||
},
|
||||
TEST_TIMEOUT * 10
|
||||
}
|
||||
);
|
||||
|
||||
test(
|
||||
"edge case 2: new items added while push is running",
|
||||
testOptions,
|
||||
async (t) => {
|
||||
const [deviceA, deviceB] = await Promise.all([
|
||||
initializeDevice("deviceA"),
|
||||
@@ -137,21 +170,33 @@ test(
|
||||
const id = await deviceA.notes.add({ title: "hello" });
|
||||
for (let i = 0; i < 5; ++i) {
|
||||
if (i > 0) await deviceA.notes.add({ id, title: `edit ${i - 1}` });
|
||||
await Promise.all([
|
||||
deviceA.sync({ type: "send" }),
|
||||
new Promise((resolve) => setTimeout(resolve, 40)).then(() =>
|
||||
deviceA.notes.add({ title: `note ${i}` })
|
||||
)
|
||||
]);
|
||||
var spy = vi.spyOn(deviceA.syncer.sync, "start");
|
||||
const defaultEncryptMulti = deviceA
|
||||
.storage()
|
||||
.encryptMulti.bind(deviceA.storage());
|
||||
var encryptMulti = vi.spyOn(deviceA.storage(), "encryptMulti");
|
||||
let newNoteId;
|
||||
encryptMulti.mockImplementationOnce(async (key, items) => {
|
||||
const result = defaultEncryptMulti(key, items);
|
||||
// simulate scenario where a note gets added while sync is collecting
|
||||
// items
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
newNoteId = await deviceA.notes.add({ title: `note ${i}` });
|
||||
return result;
|
||||
});
|
||||
await deviceA.sync({ type: "send" });
|
||||
expect(await deviceA.notes.note(newNoteId)).toBeDefined();
|
||||
expect((await deviceA.notes.note(newNoteId)).synced).toBe(true);
|
||||
expect(spy).toHaveBeenCalledTimes(1);
|
||||
await deviceB.sync({ type: "fetch" });
|
||||
expect(await deviceB.notes.all.count()).toBe(i + 2);
|
||||
expect(await deviceB.notes.note(newNoteId)).toBeDefined();
|
||||
}
|
||||
},
|
||||
TEST_TIMEOUT * 10
|
||||
}
|
||||
);
|
||||
|
||||
test(
|
||||
"issue: syncing should not affect the items' dateModified",
|
||||
testOptions,
|
||||
async (t) => {
|
||||
const [deviceA] = await Promise.all([initializeDevice("deviceA")]);
|
||||
|
||||
@@ -173,12 +218,12 @@ test(
|
||||
.dateModified;
|
||||
expect(noteDateBefore).toBe(noteDateAfter);
|
||||
expect(contentDateBefore).toBe(contentDateAfter);
|
||||
},
|
||||
TEST_TIMEOUT
|
||||
}
|
||||
);
|
||||
|
||||
test(
|
||||
"case 4: local content changed after remote content should create a conflict",
|
||||
testOptions,
|
||||
async (t) => {
|
||||
const [deviceA, deviceB] = await Promise.all([
|
||||
initializeDevice("deviceA"),
|
||||
@@ -190,7 +235,11 @@ test(
|
||||
await cleanup(deviceA, deviceB);
|
||||
});
|
||||
|
||||
const noteId = await deviceA.notes.add({
|
||||
t.onTestFailed(() => {
|
||||
writeFileSync("debug_deviceA_note.json", content);
|
||||
});
|
||||
|
||||
var noteId = await deviceA.notes.add({
|
||||
title: "Test note from device A",
|
||||
content: { data: "<p>Hello</p>", type: "tiptap" }
|
||||
});
|
||||
@@ -203,21 +252,21 @@ test(
|
||||
});
|
||||
await deviceB.sync({ type: "full" });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 10000));
|
||||
await delay(2500);
|
||||
|
||||
await deviceA.notes.add({
|
||||
id: noteId,
|
||||
content: { data: "<p>Hello (I am from device A)</p>", type: "tiptap" }
|
||||
});
|
||||
await deviceA.sync({ type: "full" });
|
||||
|
||||
var content = JSON.stringify(await deviceA.content.findByNoteId(noteId));
|
||||
expect(await deviceA.notes.conflicted.count()).toBeGreaterThan(0);
|
||||
},
|
||||
TEST_TIMEOUT * 10
|
||||
}
|
||||
);
|
||||
|
||||
test(
|
||||
"case 5: remote content changed after local content should create a conflict",
|
||||
testOptions,
|
||||
async (t) => {
|
||||
const [deviceA, deviceB] = await Promise.all([
|
||||
initializeDevice("deviceA"),
|
||||
@@ -228,31 +277,46 @@ test(
|
||||
console.log(`${t.task.name} log out`);
|
||||
await cleanup(deviceA, deviceB);
|
||||
});
|
||||
// t.onTestFailed(() => {
|
||||
// writeFileSync("debug_deviceB_note.json", content);
|
||||
// });
|
||||
|
||||
const noteId = await deviceA.notes.add({
|
||||
deviceB.syncer.sync.merger.forceLogging = true;
|
||||
deviceB.syncer.sync.collector.forceLogging = true;
|
||||
var noteId = await deviceA.notes.add({
|
||||
title: "Test note from device A",
|
||||
content: { data: "<p>Hello</p>", type: "tiptap" }
|
||||
content: { data: "<p>Hello unique note</p>", type: "tiptap" }
|
||||
});
|
||||
console.log("Device A is syncing (A)");
|
||||
await deviceA.sync({ type: "full" });
|
||||
console.log("Device B is syncing (A)");
|
||||
await deviceB.sync({ type: "full" });
|
||||
|
||||
await deviceA.notes.add({
|
||||
id: noteId,
|
||||
content: { data: "<p>Hello (I am from device B)</p>", type: "tiptap" }
|
||||
content: {
|
||||
data: "<p>Hello (unique note edit) device A</p>",
|
||||
type: "tiptap"
|
||||
}
|
||||
});
|
||||
console.log("Device A is syncing (B)");
|
||||
await deviceA.sync({ type: "full" });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 10000));
|
||||
await delay(2500);
|
||||
|
||||
await deviceB.notes.add({
|
||||
id: noteId,
|
||||
content: { data: "<p>Hello (I am from device A)</p>", type: "tiptap" }
|
||||
content: {
|
||||
data: "<p>Hello unique note edit (device B)</p>",
|
||||
type: "tiptap"
|
||||
}
|
||||
});
|
||||
expect((await deviceB.content.findByNoteId(noteId)).synced).toBe(false);
|
||||
console.log("Syncing device B");
|
||||
await deviceB.sync({ type: "full" });
|
||||
|
||||
// // var content = JSON.stringify(await deviceB.content.findByNoteId(noteId));
|
||||
expect(await deviceB.notes.conflicted.count()).toBeGreaterThan(0);
|
||||
},
|
||||
TEST_TIMEOUT * 10
|
||||
}
|
||||
);
|
||||
|
||||
// test(
|
||||
@@ -382,6 +446,7 @@ test(
|
||||
|
||||
test(
|
||||
"issue: running force sync from device A makes device B always download everything",
|
||||
testOptions,
|
||||
async (t) => {
|
||||
const [deviceA, deviceB] = await Promise.all([
|
||||
initializeDevice("deviceA"),
|
||||
@@ -402,20 +467,20 @@ test(
|
||||
});
|
||||
}
|
||||
|
||||
await syncAndWait(deviceA, deviceB, true);
|
||||
|
||||
const handler = vitest.fn();
|
||||
deviceB.eventManager.subscribe(EVENTS.syncProgress, handler);
|
||||
|
||||
await deviceB.sync({ type: "full" });
|
||||
await syncAndWait(deviceA, deviceB, true);
|
||||
|
||||
expect(handler).not.toHaveBeenCalled();
|
||||
},
|
||||
TEST_TIMEOUT
|
||||
expect(handler.mock.calls.every(([data]) => data.type === "download")).toBe(
|
||||
true
|
||||
);
|
||||
}
|
||||
);
|
||||
|
||||
test(
|
||||
"issue: colors are not properly created if multiple notes are synced together",
|
||||
testOptions,
|
||||
async (t) => {
|
||||
const [deviceA, deviceB] = await Promise.all([
|
||||
initializeDevice("deviceA"),
|
||||
@@ -445,7 +510,12 @@ test(
|
||||
colorCode: "#ffff22"
|
||||
});
|
||||
for (let noteId of noteIds) {
|
||||
expect(await deviceB.notes.note(noteId)).toBeTruthy();
|
||||
// console.log(
|
||||
// "Adding color to note",
|
||||
// noteId,
|
||||
// await deviceB.syncer.devices.get()
|
||||
// );
|
||||
expect(await deviceB.notes.note(noteId)).toBeDefined();
|
||||
expect(
|
||||
await deviceB.relations
|
||||
.from({ id: colorId, type: "color" }, "note")
|
||||
@@ -467,36 +537,274 @@ test(
|
||||
expect(
|
||||
noteIds.every((id) => purpleNotes.findIndex((p) => p.id === id) > -1)
|
||||
).toBe(true);
|
||||
},
|
||||
TEST_TIMEOUT
|
||||
}
|
||||
);
|
||||
|
||||
test(
|
||||
"monograph: published note appears on other device after sync",
|
||||
testOptions,
|
||||
async (t) => {
|
||||
const [deviceA, deviceB] = await Promise.all([
|
||||
initializeDevice("deviceA"),
|
||||
initializeDevice("deviceB")
|
||||
]);
|
||||
|
||||
t.onTestFinished(async () => {
|
||||
console.log(`${t.task.name} log out`);
|
||||
await cleanup(deviceA, deviceB);
|
||||
});
|
||||
|
||||
const noteId = await deviceA.notes.add({
|
||||
title: "Monograph note",
|
||||
content: { data: "<p>monograph content</p>", type: "tiptap" }
|
||||
});
|
||||
|
||||
// publish on device A
|
||||
const monographId = await deviceA.monographs.publish(noteId);
|
||||
expect(monographId).toBeTruthy();
|
||||
|
||||
// deviceB fetches the monograph
|
||||
await deviceB.sync({ type: "fetch" });
|
||||
|
||||
expect(deviceB.monographs.isPublished(noteId)).toBeTruthy();
|
||||
|
||||
const mono = await deviceB.monographs.get(noteId);
|
||||
expect(mono).toBeTruthy();
|
||||
expect(mono.title).toBe("Monograph note");
|
||||
}
|
||||
);
|
||||
|
||||
test(
|
||||
"monograph: self-destruct triggers unpublish across devices",
|
||||
testOptions,
|
||||
async (t) => {
|
||||
const [deviceA, deviceB, deviceC] = await Promise.all([
|
||||
initializeDevice("deviceA"),
|
||||
initializeDevice("deviceB"),
|
||||
initializeDevice("deviceC")
|
||||
]);
|
||||
|
||||
t.onTestFinished(async () => {
|
||||
// console.log(`${t.task.name} log out`);
|
||||
await cleanup(deviceA, deviceB, deviceC);
|
||||
});
|
||||
|
||||
const noteId = await deviceA.notes.add({
|
||||
title: "Self destruct monograph",
|
||||
content: { data: "<p>transient</p>", type: "tiptap" }
|
||||
});
|
||||
|
||||
// publish with selfDestruct
|
||||
const monographId = await deviceA.monographs.publish(noteId, {
|
||||
selfDestruct: true
|
||||
});
|
||||
expect(monographId).toBeTruthy();
|
||||
|
||||
// ensure all devices know it's published
|
||||
await deviceA.sync({ type: "fetch" });
|
||||
await deviceB.sync({ type: "fetch" });
|
||||
await deviceC.sync({ type: "fetch" });
|
||||
|
||||
expect(deviceB.monographs.isPublished(noteId)).toBeTruthy();
|
||||
expect(deviceC.monographs.isPublished(noteId)).toBeTruthy();
|
||||
|
||||
// trigger a view on the public endpoint which should cause self-destruct
|
||||
await http
|
||||
.get(`${Constants.API_HOST}/monographs/${noteId}/view`)
|
||||
.catch(() => {});
|
||||
|
||||
// give the server a short moment to process
|
||||
await delay(500);
|
||||
|
||||
// sync all devices to pick up the unpublish
|
||||
await deviceA.sync({ type: "fetch" });
|
||||
await deviceB.sync({ type: "fetch" });
|
||||
await deviceC.sync({ type: "fetch" });
|
||||
|
||||
expect(deviceA.monographs.isPublished(noteId)).toBeFalsy();
|
||||
expect(deviceB.monographs.isPublished(noteId)).toBeFalsy();
|
||||
expect(deviceC.monographs.isPublished(noteId)).toBeFalsy();
|
||||
}
|
||||
);
|
||||
|
||||
test(
|
||||
"monograph: properties (password and selfDestruct) sync across devices",
|
||||
testOptions,
|
||||
async (t) => {
|
||||
const [deviceA, deviceB] = await Promise.all([
|
||||
initializeDevice("deviceA"),
|
||||
initializeDevice("deviceB")
|
||||
]);
|
||||
|
||||
t.onTestFinished(async () => {
|
||||
console.log(`${t.task.name} log out`);
|
||||
await cleanup(deviceA, deviceB);
|
||||
});
|
||||
|
||||
const password = "s3cr3t";
|
||||
|
||||
const noteId = await deviceA.notes.add({
|
||||
title: "Protected monograph",
|
||||
content: { data: "<p>secret content</p>", type: "tiptap" }
|
||||
});
|
||||
|
||||
// publish with password and selfDestruct on device A
|
||||
const monographId = await deviceA.monographs.publish(noteId, {
|
||||
password,
|
||||
selfDestruct: true
|
||||
});
|
||||
expect(monographId).toBeTruthy();
|
||||
|
||||
// deviceB fetches the monograph
|
||||
await deviceB.sync({ type: "fetch" });
|
||||
|
||||
expect(deviceB.monographs.isPublished(noteId)).toBeTruthy();
|
||||
|
||||
const mono = await deviceB.monographs.get(noteId);
|
||||
expect(mono).toBeTruthy();
|
||||
expect(mono.selfDestruct).toBe(true);
|
||||
expect(mono.password).toBeTruthy();
|
||||
|
||||
const decrypted = await deviceB.monographs.decryptPassword(mono.password);
|
||||
expect(decrypted).toBe(password);
|
||||
}
|
||||
);
|
||||
|
||||
test(
|
||||
"edge case: deletion on one device propagates to others",
|
||||
testOptions,
|
||||
async (t) => {
|
||||
const [deviceA, deviceB] = await Promise.all([
|
||||
initializeDevice("deviceA"),
|
||||
initializeDevice("deviceB")
|
||||
]);
|
||||
|
||||
t.onTestFinished(async () => {
|
||||
console.log(`${t.task.name} log out`);
|
||||
await cleanup(deviceA, deviceB);
|
||||
});
|
||||
|
||||
const id = await deviceA.notes.add({ title: "note to delete" });
|
||||
|
||||
// ensure both devices have latest state where note exists
|
||||
await deviceA.sync({ type: "full" });
|
||||
await deviceB.sync({ type: "full" });
|
||||
expect(await deviceB.notes.note(id)).toBeTruthy();
|
||||
|
||||
// delete on deviceA
|
||||
await deviceA.notes.remove(id);
|
||||
await deviceA.sync({ type: "full" });
|
||||
|
||||
// deviceB should observe deletion after syncing
|
||||
await deviceB.sync({ type: "full" });
|
||||
expect(await deviceB.notes.note(id)).toBeFalsy();
|
||||
}
|
||||
);
|
||||
|
||||
test(
|
||||
"stress: sync 5000 notes from device A to device B",
|
||||
testOptions,
|
||||
async (t) => {
|
||||
const [deviceA, deviceB] = await Promise.all([
|
||||
initializeDevice("deviceA"),
|
||||
initializeDevice("deviceB")
|
||||
]);
|
||||
|
||||
t.onTestFinished(async () => {
|
||||
console.log(`${t.task.name} log out`);
|
||||
await cleanup(deviceA, deviceB);
|
||||
});
|
||||
|
||||
for (let i = 0; i < 5000; ++i) {
|
||||
await deviceA.notes.add({
|
||||
title: `note ${i}`,
|
||||
content: {
|
||||
type: "tiptap",
|
||||
data: `<p>deviceA=true</p>`
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
await deviceA.sync({ type: "full" });
|
||||
await deviceB.sync({ type: "full" });
|
||||
|
||||
const countA = await deviceA.notes.all.count();
|
||||
const countB = await deviceB.notes.all.count();
|
||||
|
||||
expect(countA).toBeGreaterThanOrEqual(5000);
|
||||
expect(countB).toBeGreaterThanOrEqual(5000);
|
||||
}
|
||||
);
|
||||
|
||||
test("stress: super concurrent sync", testOptions, async (t) => {
|
||||
console.time("adding devices");
|
||||
const devices = await Promise.all(
|
||||
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
|
||||
.split("")
|
||||
.map((letter) => initializeDevice(`device${letter}`))
|
||||
);
|
||||
console.timeEnd("adding devices");
|
||||
|
||||
t.onTestFinished(async () => {
|
||||
console.log(`${t.task.name} log out`);
|
||||
await cleanup(...devices);
|
||||
});
|
||||
|
||||
await Promise.all(
|
||||
devices.map(async (device, index) => {
|
||||
for (let i = 0; i < 100; ++i) {
|
||||
await device.notes.add({
|
||||
content: {
|
||||
type: "tiptap",
|
||||
data: `<p>device${i}${index}=true</p>`
|
||||
}
|
||||
});
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
await Promise.all(
|
||||
devices.map(async (device) => {
|
||||
await device.sync({ type: "send" });
|
||||
})
|
||||
);
|
||||
|
||||
await Promise.all(
|
||||
devices.map(async (device) => {
|
||||
await device.sync({ type: "fetch" });
|
||||
})
|
||||
);
|
||||
|
||||
for (const device of devices) {
|
||||
// await device.sync({ type: "full" });
|
||||
|
||||
expect(await device.notes.all.count()).toBeGreaterThanOrEqual(
|
||||
devices.length * 100
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {string} id
|
||||
* @returns {Promise<Database>}
|
||||
* @returns {Promise<import("../src/api/index.ts").default>}
|
||||
*/
|
||||
async function initializeDevice(id) {
|
||||
// initialize(new NodeStorageInterface(), false);
|
||||
|
||||
console.time(`Init ${id}`);
|
||||
|
||||
EV.subscribe(EVENTS.syncCheckStatus, async (type) => {
|
||||
return {
|
||||
type,
|
||||
result: true
|
||||
};
|
||||
});
|
||||
// console.time(`Init ${id}`);
|
||||
|
||||
const device = await databaseTest("memory");
|
||||
|
||||
console.time("login" + id);
|
||||
await login(device);
|
||||
console.timeEnd("login" + id);
|
||||
|
||||
await device.user.resetUser(false);
|
||||
// await device.user.resetUser(false);
|
||||
|
||||
await device.sync({ type: "full" });
|
||||
|
||||
console.timeEnd(`Init ${id}`);
|
||||
// console.timeEnd(`Init ${id}`);
|
||||
return device;
|
||||
}
|
||||
|
||||
@@ -511,22 +819,33 @@ async function cleanup(...devices) {
|
||||
await device.user.logout();
|
||||
device.eventManager.unsubscribeAll();
|
||||
}
|
||||
EV.unsubscribeAll();
|
||||
// EV.unsubscribeAll();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {Database} deviceA
|
||||
* @param {Database} deviceB
|
||||
* @param {import("../src/api/index.ts").default} deviceA
|
||||
* @param {import("../src/api/index.ts").default} deviceB
|
||||
* @returns
|
||||
*/
|
||||
function syncAndWait(deviceA, deviceB, force = false) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const ref2 = deviceB.eventManager.subscribe(
|
||||
EVENTS.databaseSyncRequested,
|
||||
(full, force) => {
|
||||
async (full, force, deviceId) => {
|
||||
if (!full) return;
|
||||
console.log("sync requested by device A", full, force);
|
||||
if (deviceId !== (await deviceA.syncer.devices.get())) {
|
||||
console.warn(
|
||||
"Concurrency error. Expected:",
|
||||
await deviceA.syncer.devices.get(),
|
||||
"Got:",
|
||||
deviceId
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log("sync requested by device A:", deviceId, full, force);
|
||||
|
||||
ref2.unsubscribe();
|
||||
deviceB
|
||||
.sync({
|
||||
@@ -538,13 +857,13 @@ function syncAndWait(deviceA, deviceB, force = false) {
|
||||
}
|
||||
);
|
||||
|
||||
console.log(
|
||||
"waiting for sync...",
|
||||
"Device A:",
|
||||
deviceA.syncer.sync.syncing,
|
||||
"Device B:",
|
||||
deviceB.syncer.sync.syncing
|
||||
);
|
||||
// console.log(
|
||||
// "waiting for sync...",
|
||||
// "Device A:",
|
||||
// deviceA.syncer.sync.syncing,
|
||||
// "Device B:",
|
||||
// deviceB.syncer.sync.syncing
|
||||
// );
|
||||
|
||||
deviceA.sync({ type: "full", force }).catch(reject);
|
||||
});
|
||||
|
||||
@@ -288,15 +288,16 @@ class Database {
|
||||
"options not specified. Did you forget to call db.setup()?"
|
||||
);
|
||||
|
||||
EV.subscribeMulti(
|
||||
[EVENTS.userLoggedIn, EVENTS.userFetched, EVENTS.tokenRefreshed],
|
||||
this.eventManager.subscribeMulti(
|
||||
[EVENTS.userLoggedIn, EVENTS.userFetched],
|
||||
this.connectSSE,
|
||||
this
|
||||
);
|
||||
EV.subscribe(EVENTS.tokenRefreshed, () => this.connectSSE());
|
||||
EV.subscribe(EVENTS.attachmentDeleted, async (attachment: Attachment) => {
|
||||
await this.fs().cancel(attachment.hash);
|
||||
});
|
||||
EV.subscribe(EVENTS.userLoggedOut, async () => {
|
||||
this.eventManager.subscribe(EVENTS.userLoggedOut, async () => {
|
||||
await this.monographs.clear();
|
||||
await this.fs().clear();
|
||||
this.disconnectSSE();
|
||||
@@ -388,11 +389,11 @@ class Database {
|
||||
});
|
||||
|
||||
this.eventSource.onopen = async () => {
|
||||
console.log("SSE: opened channel successfully!");
|
||||
logger.log("SSE: opened channel successfully!");
|
||||
};
|
||||
|
||||
this.eventSource.onerror = function (error) {
|
||||
console.log("SSE: error:", error);
|
||||
logger.error(error, "SSE: error");
|
||||
};
|
||||
|
||||
this.eventSource.onmessage = async (event) => {
|
||||
@@ -405,7 +406,7 @@ class Database {
|
||||
if (!user) break;
|
||||
user.subscription = data;
|
||||
await this.user.setUser(user);
|
||||
EV.publish(EVENTS.userSubscriptionUpdated, data);
|
||||
this.eventManager.publish(EVENTS.userSubscriptionUpdated, data);
|
||||
await this.tokenManager._refreshToken(true);
|
||||
break;
|
||||
}
|
||||
@@ -416,15 +417,12 @@ class Database {
|
||||
case "emailConfirmed": {
|
||||
await this.tokenManager._refreshToken(true);
|
||||
await this.user.fetchUser();
|
||||
EV.publish(EVENTS.userEmailConfirmed);
|
||||
this.eventManager.publish(EVENTS.userEmailConfirmed);
|
||||
break;
|
||||
}
|
||||
case "triggerSync": {
|
||||
await this.sync({ type: "fetch" });
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
console.log("SSE: Unsupported message. Message = ", event.data);
|
||||
logger.error("SSE: Unsupported message. Message = ", event.data);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -62,9 +62,9 @@ class Collector {
|
||||
const ciphers = await this.db
|
||||
.storage()
|
||||
.encryptMulti(key, syncableItems);
|
||||
const items = toPushItem(ids, ciphers);
|
||||
if (!items) continue;
|
||||
yield { items, type: itemType };
|
||||
const items = toSyncItem(ids, ciphers);
|
||||
if (!items.length) continue;
|
||||
yield { items, type: itemType, count: items.length };
|
||||
|
||||
await this.db
|
||||
.sql()
|
||||
@@ -88,15 +88,17 @@ class Collector {
|
||||
}
|
||||
export default Collector;
|
||||
|
||||
function toPushItem(ids: string[], ciphers: Cipher<"base64">[]) {
|
||||
function toSyncItem(ids: string[], ciphers: Cipher<"base64">[]) {
|
||||
if (ids.length !== ciphers.length)
|
||||
throw new Error("ids.length must be equal to ciphers.length");
|
||||
|
||||
const items: SyncItem[] = [];
|
||||
for (let i = 0; i < ids.length; ++i) {
|
||||
const id = ids[i];
|
||||
const cipher = ciphers[i];
|
||||
items.push({ ...cipher, v: CURRENT_DATABASE_VERSION, id });
|
||||
const cipher = ciphers[i] as SyncItem;
|
||||
cipher.v = CURRENT_DATABASE_VERSION;
|
||||
cipher.id = id;
|
||||
items.push(cipher);
|
||||
}
|
||||
return items;
|
||||
}
|
||||
|
||||
@@ -166,7 +166,7 @@ class Sync {
|
||||
this.autoSync = new AutoSync(db, 1000);
|
||||
this.devices = new SyncDevices(db.kv, db.tokenManager);
|
||||
|
||||
EV.subscribe(EVENTS.userLoggedOut, async () => {
|
||||
db.eventManager.subscribe(EVENTS.userLoggedOut, async () => {
|
||||
await this.connection?.stop();
|
||||
this.autoSync.stop();
|
||||
});
|
||||
@@ -285,7 +285,7 @@ class Sync {
|
||||
);
|
||||
}
|
||||
}
|
||||
if (done > 0) await this.connection?.send("PushCompleted");
|
||||
if (done > 0) await this.connection?.send("PushCompletedV2", deviceId);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -332,8 +332,13 @@ class Sync {
|
||||
/**
|
||||
* @private
|
||||
*/
|
||||
async onPushCompleted() {
|
||||
this.db.eventManager.publish(EVENTS.databaseSyncRequested, true, false);
|
||||
async onPushCompleted(deviceId: string) {
|
||||
this.db.eventManager.publish(
|
||||
EVENTS.databaseSyncRequested,
|
||||
true,
|
||||
false,
|
||||
deviceId
|
||||
);
|
||||
}
|
||||
|
||||
async processChunk(
|
||||
@@ -442,7 +447,9 @@ class Sync {
|
||||
.withHubProtocol(new JsonHubProtocol())
|
||||
.build();
|
||||
this.connection.serverTimeoutInMilliseconds = 60 * 1000 * 5;
|
||||
this.connection.on("PushCompleted", () => this.onPushCompleted());
|
||||
this.connection.on("PushCompletedV2", (deviceId: string) =>
|
||||
this.onPushCompleted(deviceId)
|
||||
);
|
||||
this.connection.on("SendVaultKey", async (vaultKey) => {
|
||||
if (this.connection?.state !== HubConnectionState.Connected) return false;
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ import {
|
||||
} from "../../types.js";
|
||||
import { ParsedInboxItem, SyncInboxItem } from "./types.js";
|
||||
|
||||
const THRESHOLD = process.env.NODE_ENV === "test" ? 6 * 1000 : 60 * 1000;
|
||||
const THRESHOLD = process.env.NODE_ENV === "test" ? 2 * 1000 : 60 * 1000;
|
||||
class Merger {
|
||||
logger = logger.scope("Merger");
|
||||
constructor(private readonly db: Database) {}
|
||||
|
||||
@@ -47,6 +47,7 @@ export const SYNC_ITEM_TYPES = Object.keys(
|
||||
export type SyncTransferItem = {
|
||||
items: SyncItem[];
|
||||
type: SyncableItemType;
|
||||
count: number;
|
||||
};
|
||||
|
||||
export type SyncInboxItem = Omit<SyncItem, "format"> & {
|
||||
|
||||
@@ -47,14 +47,14 @@ const ENDPOINTS = {
|
||||
temporaryToken: "/account/token",
|
||||
logout: "/account/logout"
|
||||
};
|
||||
const REFRESH_TOKEN_MUTEX = withTimeout(
|
||||
new Mutex(),
|
||||
10 * 1000,
|
||||
new Error("Timed out while refreshing access token.")
|
||||
);
|
||||
|
||||
class TokenManager {
|
||||
logger = logger.scope("TokenManager");
|
||||
private REFRESH_TOKEN_MUTEX = withTimeout(
|
||||
new Mutex(),
|
||||
10 * 1000,
|
||||
new Error("Timed out while refreshing access token.")
|
||||
);
|
||||
|
||||
constructor(private readonly storage: KVStorageAccessor) {}
|
||||
|
||||
async getToken(renew = true, forceRenew = false): Promise<Token | undefined> {
|
||||
@@ -101,7 +101,7 @@ class TokenManager {
|
||||
}
|
||||
|
||||
async _refreshToken(forceRenew = false) {
|
||||
await REFRESH_TOKEN_MUTEX.runExclusive(async () => {
|
||||
await this.REFRESH_TOKEN_MUTEX.runExclusive(async () => {
|
||||
this.logger.info("Refreshing access token");
|
||||
|
||||
const token = await this.getToken(false, false);
|
||||
@@ -145,7 +145,7 @@ class TokenManager {
|
||||
}
|
||||
|
||||
saveToken(tokenResponse: Omit<Token, "t">) {
|
||||
this.logger.info("Saving new token", tokenResponse);
|
||||
this.logger.info("Saving new token");
|
||||
if (!tokenResponse || !tokenResponse.access_token) return;
|
||||
const token: Token = { ...tokenResponse, t: Date.now() };
|
||||
return this.storage().write("token", token);
|
||||
|
||||
@@ -192,7 +192,7 @@ class UserManager {
|
||||
salt: user.salt
|
||||
});
|
||||
}
|
||||
EV.publish(EVENTS.userLoggedIn, user);
|
||||
this.db.eventManager.publish(EVENTS.userLoggedIn, user);
|
||||
} catch (e) {
|
||||
await this.tokenManager.saveToken(token);
|
||||
throw e;
|
||||
@@ -240,7 +240,7 @@ class UserManager {
|
||||
await this.db.setLastSynced(0);
|
||||
await this.db.syncer.devices.register();
|
||||
|
||||
EV.publish(EVENTS.userLoggedIn, user);
|
||||
this.db.eventManager.publish(EVENTS.userLoggedIn, user);
|
||||
}
|
||||
|
||||
async getSessions() {
|
||||
@@ -281,8 +281,8 @@ class UserManager {
|
||||
this.cachedAttachmentKey = undefined;
|
||||
this.cachedInboxKeys = undefined;
|
||||
await this.db.reset();
|
||||
EV.publish(EVENTS.userLoggedOut, reason);
|
||||
EV.publish(EVENTS.appRefreshRequested);
|
||||
this.db.eventManager.publish(EVENTS.userLoggedOut, reason);
|
||||
this.db.eventManager.publish(EVENTS.appRefreshRequested);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -345,6 +345,7 @@ class UserManager {
|
||||
}
|
||||
|
||||
async fetchUser(): Promise<User | undefined> {
|
||||
const oldUser = await this.getUser();
|
||||
try {
|
||||
const token = await this.tokenManager.getAccessToken();
|
||||
if (!token) return;
|
||||
@@ -353,7 +354,6 @@ class UserManager {
|
||||
token
|
||||
);
|
||||
if (user) {
|
||||
const oldUser = await this.getUser();
|
||||
await this.setUser(user);
|
||||
if (
|
||||
oldUser &&
|
||||
@@ -362,18 +362,21 @@ class UserManager {
|
||||
oldUser.subscription.provider !== user.subscription.provider)
|
||||
) {
|
||||
await this.tokenManager._refreshToken(true);
|
||||
EV.publish(EVENTS.userSubscriptionUpdated, user.subscription);
|
||||
this.db.eventManager.publish(
|
||||
EVENTS.userSubscriptionUpdated,
|
||||
user.subscription
|
||||
);
|
||||
}
|
||||
if (oldUser && !oldUser.isEmailConfirmed && user.isEmailConfirmed)
|
||||
EV.publish(EVENTS.userEmailConfirmed);
|
||||
EV.publish(EVENTS.userFetched, user);
|
||||
this.db.eventManager.publish(EVENTS.userEmailConfirmed);
|
||||
this.db.eventManager.publish(EVENTS.userFetched, user);
|
||||
return user;
|
||||
} else {
|
||||
return await this.getUser();
|
||||
return oldUser;
|
||||
}
|
||||
} catch (e) {
|
||||
logger.error(e, "Error fetching user");
|
||||
return await this.getUser();
|
||||
return oldUser;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -559,7 +562,7 @@ class UserManager {
|
||||
async hasInboxKeys() {
|
||||
if (this.cachedInboxKeys) return true;
|
||||
|
||||
let user = await this.getUser();
|
||||
const user = await this.getUser();
|
||||
if (!user) return false;
|
||||
|
||||
return !!user.inboxKeys;
|
||||
|
||||
@@ -57,7 +57,7 @@ export default class Vault {
|
||||
|
||||
constructor(private readonly db: Database) {
|
||||
this.password = undefined;
|
||||
EV.subscribe(EVENTS.userLoggedOut, () => {
|
||||
db.eventManager.subscribe(EVENTS.userLoggedOut, () => {
|
||||
this.password = undefined;
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user