Files
notesnook/packages/core/api/sync/index.js

353 lines
8.8 KiB
JavaScript
Raw Normal View History

2021-10-26 23:06:52 +05:00
import {
EV,
EVENTS,
sendAttachmentsProgressEvent,
2022-02-08 15:09:39 +05:00
sendSyncProgressEvent,
2021-10-26 23:06:52 +05:00
} from "../../common";
import Constants from "../../utils/constants";
2020-12-16 12:06:25 +05:00
import TokenManager from "../token-manager";
import Collector from "./collector";
import { areAllEmpty } from "./utils";
2022-03-30 15:52:48 +05:00
import { Mutex } from "async-mutex";
2022-02-08 13:16:41 +05:00
import * as signalr from "@microsoft/signalr";
2022-03-30 15:52:48 +05:00
import Merger from "./merger";
import Conflicts from "./conflicts";
import { SyncQueue } from "./syncqueue";
import { AutoSync } from "./auto-sync";
import { toChunks } from "../../utils/array";
2022-03-31 16:18:34 +05:00
import { MessagePackHubProtocol } from "@microsoft/signalr-protocol-msgpack";
2022-02-10 16:10:54 +05:00
/**
 * Maps a collection key (the part before ":" in a queued item id, which is
 * also the key under which the collected data stores that collection) to the
 * item type name that is sent alongside each item in an upload batch.
 *
 * Frozen because it is a shared lookup table that is only ever read.
 */
const ITEM_TYPE_MAP = Object.freeze({
  attachments: "attachment",
  content: "content",
  notes: "note",
  notebooks: "notebook",
  settings: "settings",
});
2021-10-26 23:06:52 +05:00
2022-03-30 15:52:48 +05:00
/**
* @typedef {{
* item: string,
* itemType: string,
* lastSynced: number,
* current: number,
* total: number,
* synced?: boolean
* }} SyncTransferItem
*/
/**
* @typedef {{
* items: string[],
* types: string[],
* lastSynced: number,
* current: number,
* total: number,
* }} BatchedSyncTransferItem
*/
/**
 * Public entry point for syncing. Wraps a `Sync` instance and serializes
 * access to it through a mutex so only one sync (or one lock holder) runs at
 * a time.
 */
export default class SyncManager {
  /**
   * @param {import("../index").default} db
   */
  constructor(db) {
    this.sync = new Sync(db);
    this.syncMutex = new Mutex();
  }

  /**
   * Starts auto sync and runs a sync while holding the mutex.
   *
   * @param {boolean} full forwarded to `Sync.start`
   * @param {boolean} force forwarded to `Sync.start`
   * @returns {Promise<false | void>} `false` when the mutex is already held
   * (nothing is started in that case); otherwise settles once the sync has
   * finished, rejecting if it failed.
   */
  async start(full, force) {
    // Do not queue up behind a running sync; report "not started" instead.
    if (this.syncMutex.isLocked()) return false;

    return this.syncMutex.runExclusive(async () => {
      await this.sync.autoSync.start();
      await this.sync.start(full, force);
    });
  }

  /**
   * Runs `callback` while holding the sync mutex, with auto sync stopped for
   * the duration.
   *
   * @param {() => any} callback work to perform while syncing is locked out
   * @returns {Promise<void>} rejects with whatever `callback` rejects with
   */
  async acquireLock(callback) {
    this.sync.autoSync.stop();
    try {
      await this.syncMutex.runExclusive(callback);
    } finally {
      // Always resume auto sync, even when the callback throws; otherwise a
      // single failing callback would leave auto sync stopped.
      await this.sync.autoSync.start();
    }
  }

  /**
   * Cancels syncing by delegating to `Sync.cancel`.
   *
   * @returns {Promise<void>}
   */
  async stop() {
    await this.sync.cancel();
  }
}
/**
 * Performs a sync over the `/hubs/sync` hub connection: collects local
 * changes into a queue, optionally fetches and merges remote items, uploads
 * queued items in batches and records the resulting `lastSynced` timestamp.
 */
class Sync {
  /**
   * @param {import("../index").default} db
   */
  constructor(db) {
    this.db = db;
    this.conflicts = new Conflicts(db);
    this.collector = new Collector(db);
    this.queue = new SyncQueue(db.storage);
    this.merger = new Merger(db);
    this.autoSync = new AutoSync(db, 1000);

    const tokenManager = new TokenManager(db.storage);
    this.connection = new signalr.HubConnectionBuilder()
      .withUrl(`${Constants.API_HOST}/hubs/sync`, {
        accessTokenFactory: () => tokenManager.getAccessToken(),
      })
      .withHubProtocol(new MessagePackHubProtocol({ ignoreUndefined: true }))
      .build();

    // Registered exactly once. Registering inside start() added one more
    // handler on every sync.
    // NOTE(review): throwing from a close callback cannot reject a pending
    // start(); the throw is kept only to preserve the existing behavior.
    this.connection.onclose(() => {
      throw new Error("Connection closed.");
    });

    EV.subscribe(EVENTS.userLoggedOut, async () => {
      await this.connection.stop();
      this.autoSync.stop();
    });

    // Items pushed by the server: merge, then report download progress.
    this.connection.on("SyncItem", async (syncStatus) => {
      await this.onSyncItem(syncStatus);
      sendSyncProgressEvent(
        this.db.eventManager,
        "download",
        syncStatus.total,
        syncStatus.current
      );
    });

    this.connection.on("RemoteSyncCompleted", (lastSynced) => {
      // Nothing awaits this handler, so a failed sync must be caught here or
      // it becomes an unhandled rejection.
      this.onRemoteSyncCompleted(lastSynced).catch((e) => console.error(e));
    });
  }

  /**
   * Runs one sync: init, collect local changes, optionally fetch remote
   * items, send queued items, then store the new `lastSynced`.
   *
   * @param {boolean} full when truthy, remote items are fetched and merged
   * @param {boolean} force when truthy, syncing starts from `lastSynced = 0`
   * @param {number} [serverLastSynced] timestamp to store when nothing was
   * sent and nothing was fetched; falls back to the stored `lastSynced`
   */
  async start(full, force, serverLastSynced) {
    const { lastSynced, oldLastSynced } = await this.init(force);

    const { newLastSynced, data } = await this.collect(lastSynced, force);

    const serverResponse = full ? await this.fetch(lastSynced) : null;

    if (await this.send(data, newLastSynced)) {
      await this.stop(newLastSynced);
    } else if (serverResponse) {
      await this.stop(serverResponse.lastSynced);
    } else {
      await this.stop(serverLastSynced || oldLastSynced);
    }
  }

  /**
   * Starts the hub connection if it is not connected, recalculates conflicts
   * and reads the stored `lastSynced` (replaced by 0 for a force sync).
   *
   * @param {boolean} isForceSync
   * @returns {Promise<{lastSynced: number, oldLastSynced: number}>} both
   * fields currently carry the same value
   */
  async init(isForceSync) {
    if (this.connection.state !== signalr.HubConnectionState.Connected)
      await this.connection.start();

    await this.conflicts.recalculate();

    let lastSynced = await this.db.lastSynced();
    if (isForceSync) lastSynced = 0;

    const oldLastSynced = lastSynced;
    return { lastSynced, oldLastSynced };
  }

  /**
   * Streams items from the server ("FetchItems") and merges each one.
   *
   * The returned promise settles only after the stream has completed AND
   * every merge started from it has finished. If the stream delivers nothing
   * the result is `null`.
   *
   * @param {number} lastSynced
   * @returns {Promise<{synced?: boolean, lastSynced: number} | null>} derived
   * from the last message received on the stream
   * @throws {Error} when the stream or a merge fails, or when merge conflicts
   * exist after fetching
   */
  async fetch(lastSynced) {
    const serverResponse = await new Promise((resolve, reject) => {
      let received = 0; // items handed to the merger so far
      let pending = 0; // merges still in flight
      let isStreamComplete = false;
      let response = null;

      const resolveIfDone = () => {
        if (isStreamComplete && pending <= 0) resolve(response);
      };

      this.connection.stream("FetchItems", lastSynced).subscribe({
        next: (/** @type {SyncTransferItem} */ syncStatus) => {
          const { total, item, synced, lastSynced: remoteLastSynced } =
            syncStatus;

          // Remember the most recent server state; keep the previous
          // timestamp when this message does not carry one.
          response = {
            synced,
            lastSynced:
              remoteLastSynced == null && response
                ? response.lastSynced
                : remoteLastSynced,
          };

          if (synced || !item) return;

          const progress = ++received;
          ++pending;
          this.onSyncItem(syncStatus)
            .then(() => {
              sendSyncProgressEvent(
                this.db.eventManager,
                `download`,
                total,
                progress
              );
            })
            .catch(reject)
            .finally(() => {
              --pending;
              resolveIfDone();
            });
        },
        complete: () => {
          isStreamComplete = true;
          resolveIfDone();
        },
        error: reject,
      });
    });

    if (await this.conflicts.check()) {
      throw new Error(
        "Merge conflicts detected. Please resolve all conflicts to continue syncing."
      );
    }

    return serverResponse;
  }

  /**
   * Collects local data and records it in the sync queue. When the queue
   * already has a `syncedAt`, only items modified after it are merged into
   * the queue; otherwise a new queue is created from the collected data.
   *
   * @param {number} lastSynced
   * @param {boolean} force
   * @returns {Promise<{newLastSynced: number, data: object}>} `newLastSynced`
   * is the time at which collection started; `data` is the unfiltered
   * collector output
   */
  async collect(lastSynced, force) {
    const newLastSynced = Date.now();

    const data = await this.collector.collect(lastSynced, force);

    const { syncedAt } = await this.queue.get();
    if (syncedAt) {
      const newData = this.collector.filter(
        data,
        (item) => item.dateModified > syncedAt
      );
      await this.queue.merge(newData, newLastSynced);
    } else {
      await this.queue.new(data, newLastSynced);
    }

    return { newLastSynced, data };
  }

  /**
   * Uploads pending attachments, then sends every queued item found in
   * `data` to the server in batches of 30 and signals "SyncCompleted".
   *
   * @param {object} data
   * @param {number} lastSynced
   * @returns {Promise<boolean>} `false` when there is nothing to send,
   * otherwise the result of the "SyncCompleted" invocation
   */
  async send(data, lastSynced) {
    await this.uploadAttachments();

    if (areAllEmpty(data)) return false;

    const { itemIds } = await this.queue.get();
    if (!itemIds) return false;

    const arrays = itemIds.reduce(
      (arrays, id) => {
        // Queued ids have the form "<collection key>:<item id>".
        const [arrayKey, itemId] = id.split(":");
        const array = data[arrayKey] || [];

        const item = array.find((item) => item.id === itemId);
        if (!item) return arrays;

        const type = ITEM_TYPE_MAP[arrayKey];
        arrays.types.push(type);
        arrays.items.push(JSON.stringify(item));
        arrays.ids.push(id);

        return arrays;
      },
      { items: [], types: [], ids: [] }
    );

    if (data.vaultKey) {
      arrays.ids.push("vaultKey");
      arrays.types.push("vaultKey");
      arrays.items.push(JSON.stringify(data.vaultKey));
    }

    const total = arrays.ids.length;

    const idChunks = toChunks(arrays.ids, 30);
    const typeChunks = toChunks(arrays.types, 30);
    const itemChunks = toChunks(arrays.items, 30);

    // Running count of items handed to the server so far. Multiplying the
    // batch number by the current batch size is wrong for a short last batch.
    let processed = 0;
    for (let i = 0; i < idChunks.length; ++i) {
      const ids = idChunks[i];
      const items = itemChunks[i];
      const types = typeChunks[i];

      const result = await this.sendBatchToServer({
        lastSynced,
        current: i + 1,
        total,
        items,
        types,
      });
      processed += ids.length;

      if (result) {
        await this.queue.dequeue(...ids);
        sendSyncProgressEvent(
          this.db.eventManager,
          "upload",
          total,
          processed
        );
      }
    }

    return await this.connection.invoke("SyncCompleted", lastSynced);
  }

  /**
   * Stores `lastSynced` if it is newer than the stored value and publishes
   * the sync-completed event.
   *
   * @param {number} lastSynced
   */
  async stop(lastSynced) {
    const storedLastSynced = await this.db.lastSynced();
    if (lastSynced > storedLastSynced)
      await this.db.storage.write("lastSynced", lastSynced);
    this.db.eventManager.publish(EVENTS.syncCompleted);
  }

  /**
   * Stops the hub connection.
   */
  async cancel() {
    await this.connection.stop();
  }

  /**
   * Uploads every pending attachment one by one, marking each as uploaded or
   * failed. A failed upload is logged and does not abort the remaining ones.
   *
   * @private
   */
  async uploadAttachments() {
    const attachments = this.db.attachments.pending;
    for (let i = 0; i < attachments.length; ++i) {
      const attachment = attachments[i];
      const { hash } = attachment.metadata;
      sendAttachmentsProgressEvent("upload", hash, attachments.length, i);

      try {
        const isUploaded = await this.db.fs.uploadFile(hash, hash);
        if (!isUploaded) throw new Error("Failed to upload file.");

        await this.db.attachments.markAsUploaded(attachment.id);
      } catch (e) {
        console.error(e, attachment);
        const error = e.message;
        await this.db.attachments.markAsFailed(attachment.id, error);
      }
    }
    sendAttachmentsProgressEvent("upload", null, attachments.length);
  }

  /**
   * Runs a non-full, non-forced sync in response to the server's
   * "RemoteSyncCompleted" message.
   *
   * @param {number} lastSynced timestamp received with the message
   * @private
   */
  async onRemoteSyncCompleted(lastSynced) {
    await this.start(false, false, lastSynced);
  }

  /**
   * Parses the JSON payload of a transferred item and hands it to the merger.
   * Declared `async` so that a malformed payload rejects the returned promise
   * instead of throwing synchronously into the caller.
   *
   * @param {SyncTransferItem} syncStatus
   * @private
   */
  async onSyncItem(syncStatus) {
    const { item: itemJSON, itemType } = syncStatus;
    const item = JSON.parse(itemJSON);

    return this.merger.mergeItem(itemType, item);
  }

  /**
   * Sends one batch of items to the server.
   *
   * @param {BatchedSyncTransferItem} batch
   * @returns {Promise<boolean>} `true` only when the server answers with `1`
   * @private
   */
  async sendBatchToServer(batch) {
    if (!batch) return false;
    const result = await this.connection.invoke("SyncItem", batch);
    return result === 1;
  }
}