import {
  EV,
  EVENTS,
  sendAttachmentsProgressEvent,
  sendSyncProgressEvent,
} from "../../common";
import Constants from "../../utils/constants";
import TokenManager from "../token-manager";
import Collector from "./collector";
import { areAllEmpty } from "./utils";
import { Mutex } from "async-mutex";
import * as signalr from "@microsoft/signalr";
import Merger from "./merger";
import Conflicts from "./conflicts";
import { SyncQueue } from "./syncqueue";
import { AutoSync } from "./auto-sync";
import { toChunks } from "../../utils/array";
import id from "../../utils/id";

/**
 * Maps the collection key used in collected data (and as the prefix of queued
 * item ids, `"<key>:<itemId>"`) to the item type string sent to the server.
 */
const ITEM_TYPE_MAP = {
  attachments: "attachment",
  content: "content",
  notes: "note",
  notebooks: "notebook",
  settings: "settings",
};

/** Maximum number of items sent to the server in a single "SyncItem" call. */
const BATCH_SIZE = 30;

/**
 * @typedef {{
 *  syncId: string,
 *  item: string,
 *  itemType: string,
 *  lastSynced: number,
 *  current: number,
 *  total: number,
 *  synced?: boolean
 * }} SyncTransferItem
 */

/**
 * @typedef {{
 *  items: string[],
 *  types: string[],
 *  lastSynced: number,
 *  current: number,
 *  total: number,
 *  syncId: string
 * }} BatchedSyncTransferItem
 */

/**
 * Public entry point for syncing. Wraps a `Sync` instance and guards it with a
 * mutex so that syncs started through this class never overlap.
 */
export default class SyncManager {
  /**
   *
   * @param {import("../index").default} db
   */
  constructor(db) {
    this.sync = new Sync(db);
    this.syncMutex = new Mutex();
  }

  /**
   * Runs a sync unless one is already holding the mutex. Auto sync is stopped
   * for the duration of the run and started again once the run settles.
   *
   * @param {boolean} full passed through to `Sync.start`
   * @param {boolean} force passed through to `Sync.start`
   * @returns {false | Promise} `false` if the mutex is already locked,
   * otherwise the promise of the exclusive run.
   */
  start(full, force) {
    if (this.syncMutex.isLocked()) return false;

    return this.syncMutex
      .runExclusive(() => {
        this.sync.autoSync.stop();
        return this.sync.start(full, force);
      })
      .finally(() => this.sync.autoSync.start());
  }

  /**
   * Runs `callback` while holding the sync mutex, with auto sync stopped.
   *
   * @param {() => any} callback
   * @returns {Promise<void>}
   */
  async acquireLock(callback) {
    this.sync.autoSync.stop();
    try {
      await this.syncMutex.runExclusive(callback);
    } finally {
      // Restart auto sync even when the callback throws; otherwise a single
      // failing callback would leave auto sync stopped.
      await this.sync.autoSync.start();
    }
  }

  /**
   * Cancels syncing by stopping the underlying connection.
   *
   * @returns {Promise<void>}
   */
  async stop() {
    await this.sync.cancel();
  }
}

/**
 * Implements the sync steps on top of a SignalR hub connection: fetching
 * remote items, collecting and queueing local items, and sending them in
 * batches.
 */
class Sync {
  /**
   *
   * @param {import("../index").default} db
   */
  constructor(db) {
    this.db = db;
    // Per remote sync id: a map of the item ids received via `onSyncItem`.
    this.runningSyncs = {};
    this.conflicts = new Conflicts(db);
    this.collector = new Collector(db);
    this.queue = new SyncQueue(db.storage);
    this.merger = new Merger(db);
    this.autoSync = new AutoSync(db, 1000);
    this.syncId = null;

    const tokenManager = new TokenManager(db.storage);
    this.connection = new signalr.HubConnectionBuilder()
      .withUrl(`${Constants.API_HOST}/hubs/sync`, {
        accessTokenFactory: () => tokenManager.getAccessToken(),
      })
      .build();

    // Registered once here rather than on every `start()` call, so handlers
    // do not accumulate on the long-lived connection.
    // NOTE(review): an error thrown inside this callback cannot reject an
    // in-flight `start()`; confirm how SignalR treats throwing close callbacks.
    this.connection.onclose(() => {
      throw new Error("Connection closed.");
    });

    EV.subscribe(EVENTS.userLoggedOut, async () => {
      await this.connection.stop();
      this.autoSync.stop();
    });

    this.connection.on("SyncItem", async (syncStatus) => {
      this.autoSync.stop();
      try {
        await this.onSyncItem(syncStatus);
      } finally {
        // Always resume auto sync, even if merging the item failed.
        await this.autoSync.start();
      }
    });

    this.connection.on("RemoteSyncCompleted", (syncId) =>
      this.onRemoteSyncCompleted(syncId)
    );
  }

  /**
   * Performs one sync run: init, optional fetch, collect, send, and finally
   * persisting the resulting `lastSynced` timestamp.
   *
   * @param {boolean} full when truthy, remote items are fetched first
   * @param {boolean} force when truthy, syncing starts from timestamp 0
   * @param {Object} ignoredIds map of item ids to leave out of the upload
   */
  async start(full, force, ignoredIds) {
    this.syncId = this.getSyncId();

    const { lastSynced, oldLastSynced } = await this.init(force);

    const serverResponse = full ? await this.fetch(lastSynced) : null;

    const { newLastSynced, data } = await this.collect(lastSynced, ignoredIds);

    if (await this.send(data, newLastSynced)) {
      await this.stop(newLastSynced);
    } else if (serverResponse) {
      await this.stop(serverResponse.lastSynced);
    } else {
      await this.stop(oldLastSynced);
    }
  }

  /**
   * Makes sure the connection is started, recalculates conflicts and reads
   * the timestamp to sync from.
   *
   * @param {boolean} isForceSync when truthy the returned timestamps are 0
   * @returns {Promise<{lastSynced: number, oldLastSynced: number}>}
   */
  async init(isForceSync) {
    if (this.connection.state !== signalr.HubConnectionState.Connected)
      await this.connection.start();

    await this.conflicts.recalculate();

    let lastSynced = (await this.db.lastSynced()) || 0;
    if (isForceSync) lastSynced = 0;

    // NOTE(review): this is captured after the force override, so it is 0 on
    // forced syncs as well - confirm that is intended.
    const oldLastSynced = lastSynced;
    return { lastSynced, oldLastSynced };
  }

  /**
   * Streams "FetchItems" from the server and merges every received item.
   * Items are merged one after another in arrival order, and the stream is
   * only considered finished once all of them have been merged.
   *
   * @param {number} lastSynced timestamp passed to the "FetchItems" stream
   * @returns {Promise<{synced: boolean, lastSynced: number}>} the `synced`
   * flag and `lastSynced` value of the last stream message
   * @throws {Error} if the stream errors, merging an item fails, or conflicts
   * are detected after merging
   */
  async fetch(lastSynced) {
    const serverResponse = await new Promise((resolve, reject) => {
      let serverLastSynced = 0;
      let isSynced = false;
      // Stream callbacks are not awaited by the caller, so merges are chained
      // here; `complete` waits on the chain before resolving.
      let pendingMerges = Promise.resolve();

      this.connection.stream("FetchItems", lastSynced).subscribe({
        next: (/** @type {SyncTransferItem} */ syncStatus) => {
          const { item, synced, lastSynced: remoteLastSynced } = syncStatus;
          serverLastSynced = remoteLastSynced;
          isSynced = synced;
          if (synced || !item) return;

          pendingMerges = pendingMerges.then(() => this.onSyncItem(syncStatus));
          // Surface merge failures immediately instead of leaving the
          // rejection unhandled.
          pendingMerges.catch(reject);
        },
        complete: () => {
          pendingMerges.then(
            () => resolve({ synced: isSynced, lastSynced: serverLastSynced }),
            reject
          );
        },
        error: reject,
      });
    });

    if (await this.conflicts.check()) {
      throw new Error(
        "Merge conflicts detected. Please resolve all conflicts to continue syncing."
      );
    }

    return serverResponse;
  }

  /**
   * Collects local data since `lastSynced` and records it in the sync queue.
   * If the queue already has a `syncedAt` value, only items modified after it
   * are merged into the queue; otherwise a new queue is created.
   *
   * @param {number} lastSynced
   * @param {Object} [ignoredIds] map of item ids to filter out of the data
   * @returns {Promise<{newLastSynced: number, data: object}>}
   */
  async collect(lastSynced, ignoredIds) {
    const newLastSynced = Date.now();

    let data = await this.collector.collect(lastSynced);
    if (ignoredIds)
      data = this.collector.filter(data, (item) => !ignoredIds[item.id]);

    const { syncedAt } = await this.queue.get();
    if (syncedAt) {
      const newData = this.collector.filter(
        data,
        (item) => item.dateModified > syncedAt
      );
      await this.queue.merge(newData, newLastSynced);
    } else {
      await this.queue.new(data, newLastSynced);
    }

    return { newLastSynced, data };
  }

  /**
   * Uploads pending attachments, then sends the queued items found in `data`
   * to the server in batches and announces completion.
   *
   * @param {object} data
   * @param {number} lastSynced
   * @returns {Promise} `false` when `data` is empty, otherwise the result of
   * the "SyncCompleted" invocation
   */
  async send(data, lastSynced) {
    await this.uploadAttachments();

    if (areAllEmpty(data)) return false;

    const { itemIds } = await this.queue.get();
    const total = itemIds.length;

    // Per-collection id -> item lookup, built on first use, so resolving the
    // queued ids is linear instead of one array scan per id.
    const lookups = new Map();
    const findItem = (arrayKey, itemId) => {
      let lookup = lookups.get(arrayKey);
      if (!lookup) {
        lookup = new Map();
        for (const item of data[arrayKey] || []) {
          // Keep the first occurrence, matching `Array.prototype.find`.
          if (!lookup.has(item.id)) lookup.set(item.id, item);
        }
        lookups.set(arrayKey, lookup);
      }
      return lookup.get(itemId);
    };

    const arrays = { items: [], types: [], ids: [] };
    for (const queuedId of itemIds) {
      const [arrayKey, itemId] = queuedId.split(":");
      const item = findItem(arrayKey, itemId);
      // Queued ids without a matching item in `data` are skipped.
      if (!item) continue;

      arrays.types.push(ITEM_TYPE_MAP[arrayKey]);
      arrays.items.push(JSON.stringify(item));
      arrays.ids.push(queuedId);
    }

    if (data.vaultKey) {
      arrays.ids.push("vaultKey");
      arrays.types.push("vaultKey");
      arrays.items.push(JSON.stringify(data.vaultKey));
    }

    const idChunks = toChunks(arrays.ids, BATCH_SIZE);
    const typeChunks = toChunks(arrays.types, BATCH_SIZE);
    const itemChunks = toChunks(arrays.items, BATCH_SIZE);

    let batchNumber = 0;
    let uploadedCount = 0;
    for (let i = 0; i < idChunks.length; ++i) {
      const ids = idChunks[i];
      const items = itemChunks[i];
      const types = typeChunks[i];

      const result = await this.sendBatchToServer({
        lastSynced,
        current: ++batchNumber,
        total,
        items,
        types,
        syncId: this.syncId,
      });

      // NOTE(review): a rejected batch stays in the queue, but the loop goes
      // on and "SyncCompleted" is still sent - confirm this is intended.
      if (result) {
        await this.queue.dequeue(...ids);
        // Report the cumulative number of items acknowledged so far.
        uploadedCount += ids.length;
        sendSyncProgressEvent(
          this.db.eventManager,
          "upload",
          total,
          uploadedCount,
          items
        );
      }
    }

    return await this.connection.invoke(
      "SyncCompleted",
      lastSynced,
      this.syncId
    );
  }

  /**
   * Persists `lastSynced` to storage and publishes `EVENTS.syncCompleted`.
   *
   * @param {number} lastSynced
   */
  async stop(lastSynced) {
    await this.db.storage.write("lastSynced", lastSynced);
    this.db.eventManager.publish(EVENTS.syncCompleted);
  }

  /**
   * Stops the hub connection.
   */
  async cancel() {
    await this.connection.stop();
  }

  /**
   * Uploads every pending attachment one by one, marking each as uploaded or
   * failed. A failing attachment is logged and does not abort the others.
   *
   * @private
   */
  async uploadAttachments() {
    const attachments = this.db.attachments.pending;
    for (let i = 0; i < attachments.length; ++i) {
      const attachment = attachments[i];
      const { hash } = attachment.metadata;
      sendAttachmentsProgressEvent("upload", hash, attachments.length, i);

      try {
        const isUploaded = await this.db.fs.uploadFile(hash, hash);
        if (!isUploaded) throw new Error("Failed to upload file.");

        await this.db.attachments.markAsUploaded(attachment.id);
      } catch (e) {
        console.error(e, attachment);
        const error = e.message;
        await this.db.attachments.markAsFailed(attachment.id, error);
      }
    }
    sendAttachmentsProgressEvent("upload", null, attachments.length);
  }

  /**
   * Handles "RemoteSyncCompleted": runs a non-full, non-forced sync that
   * leaves out the item ids recorded for `syncId`, then forgets them.
   *
   * @param {string} syncId
   * @private
   */
  async onRemoteSyncCompleted(syncId) {
    const ignoredIds = this.runningSyncs[syncId];
    await this.start(false, false, ignoredIds);
    // Remove the entry rather than resetting it to `{}` so finished remote
    // syncs do not leave a key behind forever.
    delete this.runningSyncs[syncId];
  }

  /**
   * Parses and merges a single received item, records its id under the
   * originating sync id (when present) and emits a download progress event.
   *
   * @param {SyncTransferItem} syncStatus
   * @private
   */
  async onSyncItem(syncStatus) {
    // NOTE(review): the sync id is read from `id`, while the SyncTransferItem
    // typedef declares `syncId` - confirm which field the server sends.
    const { current, id: syncId, item: itemJSON, itemType, total } = syncStatus;
    const item = JSON.parse(itemJSON);

    if (syncId) {
      this.runningSyncs[syncId] = {
        ...this.runningSyncs[syncId],
        [item.id]: true,
      };
    }

    await this.merger.mergeItem(itemType, item);
    sendSyncProgressEvent(this.db.eventManager, "download", total, current);
  }

  /**
   * Sends one batch through the "SyncItem" hub method.
   *
   * @param {BatchedSyncTransferItem} batch
   * @returns {Promise<boolean>} `true` only when the server answers with `1`
   * @private
   */
  async sendBatchToServer(batch) {
    if (!batch) return false;

    const result = await this.connection.invoke("SyncItem", batch);
    return result === 1;
  }

  /**
   * @returns a fresh id (from the imported `id` helper) for a sync run
   */
  getSyncId() {
    return id();
  }
}