core: implement sync v2

This commit is contained in:
Abdullah Atta
2023-12-08 11:51:51 +05:00
parent f81814a426
commit 94150a92aa
10 changed files with 265 additions and 214 deletions

View File

@@ -20,7 +20,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
import {
databaseTest,
TEST_NOTE,
delay,
loginFakeUser
} from "../../../../__tests__/utils";
import Collector from "../collector";
@@ -31,14 +30,9 @@ test("newly created note should get included in collector", () =>
await loginFakeUser(db);
const collector = new Collector(db);
const lastSyncedTime = Date.now() - 10000;
const noteId = await db.notes.add(TEST_NOTE);
const items = [];
for await (const item of collector.collect(100, lastSyncedTime, false)) {
items.push(item);
}
const items = await iteratorToArray(collector.collect(100, false));
expect(items).toHaveLength(2);
expect(items[0].type).toBe("content");
@@ -47,62 +41,59 @@ test("newly created note should get included in collector", () =>
expect(items[1].type).toBe("note");
}));
test("edited note after last synced time should get included in collector", () =>
test("synced property should be true after getting collected by the collector", () =>
databaseTest().then(async (db) => {
await loginFakeUser(db);
const collector = new Collector(db);
const noteId = await db.notes.add(TEST_NOTE);
const items = await iteratorToArray(collector.collect(100, false));
const items2 = await iteratorToArray(collector.collect(100, false));
expect(items2).toHaveLength(0);
expect(items).toHaveLength(2);
expect(items[0].type).toBe("content");
expect(items[0].items[0].id).toBe((await db.notes.note(noteId)).contentId);
expect(items[1].items[0].id).toBe(noteId);
expect(items[1].type).toBe("note");
}));
test("edited note should get included in collector", () =>
databaseTest().then(async (db) => {
await loginFakeUser(db);
const collector = new Collector(db);
const noteId = await db.notes.add(TEST_NOTE);
const lastSyncedTime = Date.now();
await delay(1000);
await iteratorToArray(collector.collect(100, false));
await db.notes.add({ id: noteId, pinned: true });
const items = [];
for await (const item of collector.collect(100, lastSyncedTime, false)) {
items.push(item);
}
const items = await iteratorToArray(collector.collect(100, false));
expect(items).toHaveLength(1);
expect(items[0].items[0].id).toBe(noteId);
}));
test("note edited before last synced time should not get included in collector", () =>
databaseTest().then(async (db) => {
await loginFakeUser(db);
const collector = new Collector(db);
const noteId = await db.notes.add(TEST_NOTE);
await db.notes.add({ id: noteId, pinned: true });
await delay(500);
const lastSyncedTime = Date.now();
const items = [];
for await (const item of collector.collect(100, lastSyncedTime, false)) {
items.push(item);
}
expect(items).toHaveLength(0);
}));
test("localOnly note should get included as a deleted item in collector", () =>
databaseTest().then(async (db) => {
await loginFakeUser(db);
const collector = new Collector(db);
await db.notes.add({ ...TEST_NOTE, localOnly: true });
const items = [];
for await (const item of collector.collect(100, 0, false)) {
items.push(item);
}
const items = await iteratorToArray(collector.collect(100, false));
expect(items).toHaveLength(2);
expect(items[0].items[0].length).toBe(104);
expect(items[1].items[0].length).toBe(104);
expect(items[0].items[0].length).toBe(77);
expect(items[1].items[0].length).toBe(77);
expect(items[0].type).toBe("content");
expect(items[1].type).toBe("note");
}));
/**
 * Drains an async (or sync) iterable into an array.
 *
 * @param iterator - Any iterable/async-iterable to consume. The iterator is
 * fully exhausted; do not reuse it afterwards.
 * @returns A promise resolving to all yielded items, in yield order.
 */
async function iteratorToArray<T>(
  iterator: AsyncIterable<T> | Iterable<T>
): Promise<T[]> {
  // `const` — the binding is never reassigned, only mutated via push.
  const items: T[] = [];
  for await (const item of iterator) {
    items.push(item);
  }
  return items;
}

View File

@@ -35,7 +35,6 @@ class Collector {
async *collect(
chunkSize: number,
lastSyncedTimestamp: number,
isForceSync = false
): AsyncGenerator<SyncTransferItem, void, unknown> {
const key = await this.db.user.getEncryptionKey();
@@ -47,13 +46,15 @@ class Collector {
for (const itemType of SYNC_ITEM_TYPES) {
const collectionKey = SYNC_COLLECTIONS_MAP[itemType];
const collection = this.db[collectionKey].collection;
for await (const chunk of collection.unsynced(
isForceSync ? 0 : lastSyncedTimestamp,
chunkSize
)) {
for await (const chunk of collection.unsynced(chunkSize, isForceSync)) {
const items = await this.prepareChunk(chunk, key);
if (!items) continue;
yield { items, type: itemType };
await collection.update(
chunk.map((i) => i.id),
{ synced: true }
);
}
}
}
@@ -89,13 +90,20 @@ function filterSyncableItems(items: MaybeDeletedItem<Item>[]): {
const ids = [];
const syncableItems = [];
for (const item of items) {
// const isSyncable = !item.synced || isForceSync;
// synced is a local only property. we don't want to sync it.
delete item.synced;
ids.push(item.id);
syncableItems.push(JSON.stringify(item));
syncableItems.push(
JSON.stringify(
"localOnly" in item && item.localOnly
? {
id: item.id,
deleted: true,
dateModified: item.dateModified
}
: item
)
);
}
return { items: syncableItems, ids };
}

View File

@@ -0,0 +1,54 @@
/*
This file is part of the Notesnook project (https://notesnook.com/)
Copyright (C) 2023 Streetwriters (Private) Limited
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { StorageAccessor } from "../../interfaces";
import hosts from "../../utils/constants";
import http from "../../utils/http";
import { getId } from "../../utils/id";
import TokenManager from "../token-manager";
/**
 * Manages this client's sync device registration with the server.
 *
 * The registered device id is persisted locally under the "deviceId"
 * storage key and is sent to the `/devices` API endpoint on
 * registration/unregistration.
 */
export class SyncDevices {
  constructor(
    private readonly storage: StorageAccessor,
    private readonly tokenManager: TokenManager
  ) {}

  /**
   * Registers a freshly generated device id with the server, then
   * persists it locally so subsequent syncs reuse it.
   */
  async register() {
    const deviceId = getId();
    const token = await this.tokenManager.getAccessToken();
    const url = `${hosts.API_HOST}/devices?deviceId=${deviceId}`;
    await http.post(url, null, token);
    return this.storage().write("deviceId", deviceId);
  }

  /**
   * Unregisters the locally stored device id from the server and clears
   * it from storage. No-op when no device id has been registered.
   */
  async unregister() {
    const deviceId = await this.storage().read("deviceId");
    if (!deviceId) return;

    const token = await this.tokenManager.getAccessToken();
    const url = `${hosts.API_HOST}/devices?deviceId=${deviceId}`;
    await http.delete(url, token);
    return this.storage().remove("deviceId");
  }

  /** Reads the locally persisted device id, if any. */
  get() {
    return this.storage().read<string>("deviceId");
  }
}

View File

@@ -37,18 +37,20 @@ import { Mutex } from "async-mutex";
import Database from "..";
import { migrateItem } from "../../migrations";
import { SerializedKey } from "@notesnook/crypto";
import { Item, MaybeDeletedItem } from "../../types";
import { Item, MaybeDeletedItem, Note, Notebook } from "../../types";
import { SYNC_COLLECTIONS_MAP, SyncTransferItem } from "./types";
import { DownloadableFile } from "../../database/fs";
import { SyncDevices } from "./devices";
import { COLORS } from "../../database/backup";
export type SyncOptions = {
type: "full" | "fetch" | "send";
force?: boolean;
serverLastSynced?: number;
};
export default class SyncManager {
sync = new Sync(this.db);
devices = this.sync.devices;
constructor(private readonly db: Database) {}
async start(options: SyncOptions) {
@@ -106,13 +108,12 @@ class Sync {
logger = logger.scope("Sync");
syncConnectionMutex = new Mutex();
connection: signalr.HubConnection;
devices = new SyncDevices(this.db.storage, this.db.tokenManager);
constructor(private readonly db: Database) {
let remoteSyncTimeout = 0;
const tokenManager = new TokenManager(db.storage);
this.connection = new signalr.HubConnectionBuilder()
.withUrl(`${Constants.API_HOST}/hubs/sync`, {
.withUrl(`${Constants.API_HOST}/hubs/sync/v2`, {
accessTokenFactory: async () => {
const token = await tokenManager.getAccessToken();
if (!token) throw new Error("Failed to get access token.");
@@ -144,29 +145,7 @@ class Sync {
this.autoSync.stop();
});
this.connection.on("PushItems", async (chunk) => {
if (this.connection.state !== signalr.HubConnectionState.Connected)
return;
clearTimeout(remoteSyncTimeout);
remoteSyncTimeout = setTimeout(() => {
this.db.eventManager.publish(EVENTS.syncAborted);
}, 15000) as unknown as number;
const key = await this.db.user.getEncryptionKey();
if (!key || !key.key || !key.salt) {
EV.publish(EVENTS.userSessionExpired);
throw new Error("User encryption key not generated. Please relogin.");
}
const dbLastSynced = await this.db.lastSynced();
await this.processChunk(chunk, key, dbLastSynced, true);
});
this.connection.on("PushCompleted", (lastSynced) => {
clearTimeout(remoteSyncTimeout);
this.onPushCompleted(lastSynced);
});
this.connection.on("PushCompleted", () => this.onPushCompleted());
}
async start(options: SyncOptions) {
@@ -185,30 +164,21 @@ class Sync {
throw new Error("Connection closed.");
});
const { lastSynced, oldLastSynced } = await this.init(options.force);
this.logger.info("Initialized sync", { lastSynced, oldLastSynced });
const { deviceId } = await this.init(options.force);
this.logger.info("Initialized sync", { deviceId });
const newLastSynced = Date.now();
const serverResponse =
options.type === "fetch" || options.type === "full"
? await this.fetch(lastSynced)
: null;
this.logger.info("Data fetched", serverResponse || {});
if (options.type === "fetch" || options.type === "full") {
await this.fetch(deviceId);
this.logger.info("Data fetched");
}
if (
(options.type === "send" || options.type === "full") &&
(await this.send(lastSynced, newLastSynced, options.force))
) {
(await this.send(deviceId, options.force))
)
this.logger.info("New data sent");
await this.stop(newLastSynced);
} else if (serverResponse) {
this.logger.info("No new data to send.");
await this.stop(serverResponse.lastSynced);
} else {
this.logger.info("Nothing to do.");
await this.stop(options.serverLastSynced || oldLastSynced);
}
await this.stop();
if (!(await checkSyncStatus(SYNC_CHECK_IDS.autoSync))) {
await this.connection.stop();
@@ -222,14 +192,23 @@ class Sync {
this.conflicts.throw();
}
let lastSynced = await this.db.lastSynced();
if (isForceSync) lastSynced = 0;
if (isForceSync) {
await this.devices.unregister();
await this.devices.register();
}
const oldLastSynced = lastSynced;
return { lastSynced, oldLastSynced };
let deviceId = await this.devices.get();
if (!deviceId) {
await this.devices.register();
deviceId = await this.devices.get();
}
if (!deviceId) throw new Error("Sync device not registered.");
return { deviceId };
}
async fetch(lastSynced: number) {
async fetch(deviceId: string) {
await this.checkConnection();
const key = await this.db.user.getEncryptionKey();
@@ -241,14 +220,13 @@ class Sync {
return;
}
const dbLastSynced = await this.db.lastSynced();
let count = 0;
this.connection.off("SendItems");
this.connection.on("SendItems", async (chunk) => {
if (this.connection.state !== signalr.HubConnectionState.Connected)
return;
await this.processChunk(chunk, key, dbLastSynced);
await this.processChunk(chunk, key);
count += chunk.items.length;
sendSyncProgressEvent(this.db.eventManager, `download`, count);
@@ -257,7 +235,7 @@ class Sync {
});
const serverResponse = await this.connection.invoke(
"RequestFetch",
lastSynced
deviceId
);
if (
@@ -275,56 +253,46 @@ class Sync {
if (await this.conflicts.check()) {
this.conflicts.throw();
}
return { lastSynced: serverResponse.lastSynced };
}
async send(oldLastSynced, isForceSync, newLastSynced) {
return false;
async send(deviceId: string, isForceSync?: boolean) {
await this.uploadAttachments();
// await this.uploadAttachments();
let isSyncInitialized = false;
let done = 0;
for await (const item of this.collector.collect(100, isForceSync)) {
if (!isSyncInitialized) {
const vaultKey = await this.db.vault.getKey();
await this.connection.send("InitializePush", {
vaultKey,
synced: false
});
isSyncInitialized = true;
}
// let isSyncInitialized = false;
// let done = 0;
// for await (const item of this.collector.collect(
// 100,
// oldLastSynced,
// isForceSync
// )) {
// if (!isSyncInitialized) {
// const vaultKey = await this.db.vault._getKey();
// newLastSynced = await this.connection.invoke("InitializePush", {
// vaultKey,
// lastSynced: newLastSynced
// });
// isSyncInitialized = true;
// }
const result = await this.pushItem(deviceId, item);
if (result) {
done += item.items.length;
sendSyncProgressEvent(this.db.eventManager, "upload", done);
// const result = await this.pushItem(item, newLastSynced);
// if (result) {
// done += item.items.length;
// sendSyncProgressEvent(this.db.eventManager, "upload", done);
// this.logger.info(`Batch sent (${done})`);
// } else {
// this.logger.error(
// new Error(`Failed to send batch. Server returned falsy response.`)
// );
// }
// }
// if (!isSyncInitialized) return;
// await this.connection.invoke("SyncCompleted", newLastSynced);
// return true;
this.logger.info(`Batch sent (${done})`);
} else {
this.logger.error(
new Error(`Failed to send batch. Server returned falsy response.`)
);
}
}
if (!isSyncInitialized) return false;
await this.connection.send("PushCompleted");
return true;
}
async stop(lastSynced: number) {
async stop() {
// refresh monographs on sync completed
await this.db.monographs.refresh();
this.logger.info("Stopping sync", { lastSynced });
const storedLastSynced = await this.db.lastSynced();
if (lastSynced > storedLastSynced)
await this.db.storage().write("lastSynced", lastSynced);
this.logger.info("Stopping sync");
await this.db.storage().write("lastSynced", Date.now());
this.db.eventManager.publish(EVENTS.syncCompleted);
}
@@ -352,19 +320,13 @@ class Sync {
/**
* @private
*/
async onPushCompleted(lastSynced: number) {
this.db.eventManager.publish(
EVENTS.databaseSyncRequested,
false,
false,
lastSynced
);
async onPushCompleted() {
this.db.eventManager.publish(EVENTS.databaseSyncRequested, true, false);
}
async processChunk(
chunk: SyncTransferItem,
key: SerializedKey,
dbLastSynced: number,
notify = false
) {
const itemType = chunk.type;
@@ -372,11 +334,13 @@ class Sync {
const decrypted = await this.db.storage().decryptMulti(key, chunk.items);
const deserialized = await Promise.all(
decrypted.map((item, index) =>
deserializeItem(item, chunk.items[index].v, this.db)
const deserialized = (
await Promise.all(
decrypted.map((item, index) =>
deserializeItem(item, chunk.items[index].v, this.db)
)
)
);
).filter(Boolean);
const collectionType = SYNC_COLLECTIONS_MAP[itemType];
const collection = this.db[collectionType].collection;
@@ -385,7 +349,7 @@ class Sync {
if (itemType === "content") {
items = await Promise.all(
deserialized.map((item) =>
this.merger.mergeContent(item, localItems[item.id], dbLastSynced)
this.merger.mergeContent(item, localItems[item.id])
)
);
} else {
@@ -414,11 +378,9 @@ class Sync {
await collection.put(items as any);
}
private async pushItem(item: SyncTransferItem, newLastSynced: number) {
private async pushItem(deviceId: string, item: SyncTransferItem) {
await this.checkConnection();
return (
(await this.connection.invoke("PushItems", item, newLastSynced)) === 1
);
return (await this.connection.invoke("PushItems", deviceId, item)) === 1;
}
private async checkConnection() {
@@ -463,19 +425,46 @@ async function deserializeItem(
version: number,
database: Database
) {
const deserialized = JSON.parse(decryptedItem);
deserialized.remote = true;
deserialized.synced = true;
const item = JSON.parse(decryptedItem);
item.remote = true;
item.synced = true;
if (!deserialized.alg && !deserialized.cipher) {
await migrateItem(
deserialized,
if (!item.cipher) {
let migrationResult = await migrateItem(
item,
version,
CURRENT_DATABASE_VERSION,
deserialized.type,
item.type,
database,
"sync"
);
if (migrationResult === "skip") return;
// since items in trash can have their own set of migrations,
// we have to run the migration again to account for that.
if (item.type === "trash" && item.itemType) {
migrationResult = await migrateItem(
item as unknown as Note | Notebook,
version,
CURRENT_DATABASE_VERSION,
item.itemType,
database,
"backup"
);
if (migrationResult === "skip") return;
}
const itemType =
// colors are naively of type "tag" instead of "color" so we have to fix that.
item.type === "tag" && COLORS.includes(item.title.toLowerCase())
? "color"
: item.type === "trash" && "itemType" in item && item.itemType
? item.itemType
: item.type;
if (!itemType || itemType === "topic" || itemType === "settings") return;
if (migrationResult) item.synced = false;
}
return deserialized;
return item;
}

View File

@@ -33,7 +33,6 @@ class Merger {
isConflicted(
localItem: MaybeDeletedItem<Item>,
remoteItem: MaybeDeletedItem<Item>,
lastSynced: number,
conflictThreshold: number
) {
const isResolved =
@@ -46,7 +45,7 @@ class Merger {
// will be ahead of last sync. In that case, we also have to check if the
// synced flag is false (it is only false if a user makes edits on the
// local device).
localItem.dateModified > lastSynced && !localItem.synced;
localItem.dateModified > remoteItem.dateModified && !localItem.synced;
if (isModified && !isResolved) {
// If time difference between local item's edits & remote item's edits
// is less than threshold, we shouldn't trigger a merge conflict; instead
@@ -100,15 +99,13 @@ class Merger {
async mergeContent(
remoteItem: MaybeDeletedItem<Item>,
localItem: MaybeDeletedItem<Item> | undefined,
lastSynced: number
localItem: MaybeDeletedItem<Item> | undefined
) {
if (localItem && "localOnly" in localItem && localItem.localOnly) return;
const THRESHOLD = process.env.NODE_ENV === "test" ? 6 * 1000 : 60 * 1000;
const conflicted =
localItem &&
this.isConflicted(localItem, remoteItem, lastSynced, THRESHOLD);
localItem && this.isConflicted(localItem, remoteItem, THRESHOLD);
if (!localItem || conflicted === "merge") {
return remoteItem;
} else if (conflicted === "conflict") {

View File

@@ -176,6 +176,7 @@ class UserManager {
if (!sessionExpired) {
await this.db.storage().write("lastSynced", 0);
await this.db.syncer.devices.register();
}
await this.db.storage().deriveCryptoKey(`_uk_@${user.email}`, {
@@ -228,6 +229,7 @@ class UserManager {
salt: user.salt
});
await this.db.storage().write("lastSynced", 0);
await this.db.syncer.devices.register();
EV.publish(EVENTS.userLoggedIn, user);
}
@@ -262,6 +264,7 @@ class UserManager {
async logout(revoke = true, reason?: string) {
try {
await this.db.syncer.devices.unregister();
if (revoke) await this.tokenManager.revokeToken();
} catch (e) {
console.error(e);

View File

@@ -87,7 +87,7 @@ function isLegacyBackupFile(
}
const MAX_CHUNK_SIZE = 10 * 1024 * 1024;
const COLORS = [
export const COLORS = [
"red",
"orange",
"yellow",
@@ -206,7 +206,9 @@ export default class Backup {
collection: DatabaseCollection<T, B>,
state: BackupState
) {
for await (const item of collection.stream() as any) {
for await (const item of collection.stream(
this.db.options.batchSize
) as any) {
const data = JSON.stringify(item);
state.buffer.push(data);
state.bufferLength += data.length;

View File

@@ -123,14 +123,14 @@ export interface DatabaseCollection<T, IsAsync extends boolean> {
Record<string, MaybeDeletedItem<T> | undefined>
>;
unsynced(
after: number,
chunkSize: number
chunkSize: number,
forceSync?: boolean
): IsAsync extends true
? AsyncIterableIterator<MaybeDeletedItem<T>[]>
: IterableIterator<MaybeDeletedItem<T>[]>;
stream(): IsAsync extends true
? AsyncIterableIterator<T>
: IterableIterator<T>;
stream(
chunkSize: number
): IsAsync extends true ? AsyncIterableIterator<T> : IterableIterator<T>;
}
export type DatabaseAccessor = () =>

View File

@@ -75,7 +75,8 @@ export class SQLCachedCollection<
this.cache.set(id, {
id,
deleted: true,
dateModified: Date.now()
dateModified: Date.now(),
synced: false
})
);
await this.collection.softDelete(ids);
@@ -141,13 +142,10 @@ export class SQLCachedCollection<
return items;
}
*unsynced(
after: number,
chunkSize: number
): IterableIterator<MaybeDeletedItem<T>[]> {
*unsynced(chunkSize: number): IterableIterator<MaybeDeletedItem<T>[]> {
let chunk: MaybeDeletedItem<T>[] = [];
for (const [_key, value] of this.cache) {
if (value && value.dateModified && value.dateModified > after) {
if (value && !value.synced) {
chunk.push(value);
if (chunk.length === chunkSize) {
yield chunk;

View File

@@ -88,7 +88,8 @@ export class SQLCollection<
ids.map((id) => ({
id,
deleted: true,
dateModified: Date.now()
dateModified: Date.now(),
synced: false
}))
)
.execute();
@@ -160,7 +161,8 @@ export class SQLCollection<
.where("id", "in", ids)
.set({
...partial,
dateModified: Date.now()
dateModified: Date.now(),
synced: partial.synced || false
})
.execute();
}
@@ -194,47 +196,54 @@ export class SQLCollection<
}
async *unsynced(
after: number,
chunkSize: number
chunkSize: number,
forceSync?: boolean
): AsyncIterableIterator<MaybeDeletedItem<T>[]> {
let index = 0;
let lastRowId: string | null = null;
while (true) {
const rows = await this.db()
const rows = (await this.db()
.selectFrom<keyof DatabaseSchema>(this.type)
.selectAll()
.orderBy("dateModified", "asc")
.$if(after > 0, (eb) =>
eb.where("dateModified", ">", after).where(isFalse("synced"))
)
.$if(lastRowId != null, (qb) => qb.where("id", ">", lastRowId!))
.$if(!forceSync, (eb) => eb.where(isFalse("synced")))
.$if(this.type === "attachments", (eb) =>
eb.where("dateUploaded", ">", 0)
eb.where((eb) =>
eb.or([eb("dateUploaded", ">", 0), eb("deleted", "==", true)])
)
)
.offset(index)
.orderBy("id")
.limit(chunkSize)
.execute();
.execute()) as MaybeDeletedItem<T>[];
if (rows.length === 0) break;
index += chunkSize;
yield rows as MaybeDeletedItem<T>[];
yield rows;
lastRowId = rows[rows.length - 1].id;
}
}
async *stream(): AsyncIterableIterator<T> {
let index = 0;
const chunkSize = 50;
async *stream(chunkSize: number): AsyncIterableIterator<T> {
let lastRow: T | null = null;
while (true) {
const rows = await this.db()
const rows = (await this.db()
.selectFrom<keyof DatabaseSchema>(this.type)
.where(isFalse("deleted"))
.orderBy("dateCreated desc")
.orderBy("dateCreated asc")
.orderBy("id asc")
.$if(lastRow !== null, (qb) =>
qb.where(
(eb) => eb.refTuple("dateCreated", "id"),
">",
(eb) => eb.tuple(lastRow!.dateCreated, lastRow!.id)
)
)
.selectAll()
.offset(index)
.limit(chunkSize)
.execute();
.execute()) as T[];
if (rows.length === 0) break;
index += chunkSize;
for (const row of rows) {
yield row as T;
yield row;
}
lastRow = rows[rows.length - 1];
}
}