Files
notesnook/packages/core/api/sync/index.js

373 lines
10 KiB
JavaScript
Raw Normal View History

2020-04-09 16:36:57 +05:00
/**
 * GENERAL PROCESS:
 * Make a GET request to the server with the current lastSynced.
 * Parse the response. The response should contain everything that the user has on the server.
 * Decrypt the response.
 * Merge everything into the database and look for conflicts.
 * Send the conflicts (if any) to the end-user for resolution.
 * Once the conflicts have been resolved, send the updated data back to the server.
 */
/**
 * MERGING:
 * Locally, get everything that was edited/added after the lastSynced.
 * Run a forEach loop on the server response.
 * Add items that do not exist in the local collections.
 * Remove items (without asking) that need to be removed.
 * Update items that were edited before the lastSynced.
 * Try to merge items that were edited after the lastSynced.
 * Items in which the content has changed are sent for conflict resolution.
 * Otherwise, keep the most recently updated copy.
 */
/**
* CONFLICTS:
* Syncing should pause until all the conflicts have been resolved
* And then it should continue.
*/
2021-10-26 23:06:52 +05:00
import {
checkIsUserPremium,
CHECK_IDS,
EV,
EVENTS,
sendAttachmentsProgressEvent,
2022-02-08 15:09:39 +05:00
sendSyncProgressEvent,
2021-10-26 23:06:52 +05:00
} from "../../common";
import Constants from "../../utils/constants";
2020-12-16 12:06:25 +05:00
import http from "../../utils/http";
import TokenManager from "../token-manager";
import Collector from "./collector";
2020-04-09 16:36:57 +05:00
import Merger from "./merger";
import { areAllEmpty } from "./utils";
2021-10-26 23:06:52 +05:00
import { Mutex, withTimeout } from "async-mutex";
2022-02-08 13:16:41 +05:00
import * as signalr from "@microsoft/signalr";
import RealtimeMerger from "./realtimeMerger";
2022-02-10 16:10:54 +05:00
import set from "../../utils/set";
/**
 * Maps the plural collection key used as the prefix of a sync-queue id
 * (`<collection>:<itemId>`) to the singular item type name that is passed
 * to `Sync#sendItemToServer`.
 */
const ITEM_TYPE_MAP = {
  attachments: "attachment",
  content: "content",
  notes: "note",
  notebooks: "notebook",
  settings: "settings",
};
2021-10-26 23:06:52 +05:00
2020-04-09 16:36:57 +05:00
/**
 * Drives synchronisation between the local database and the sync hub.
 *
 * Items are pushed to / received from the server over a SignalR hub
 * connection. Only one sync runs at a time (guarded by `syncMutex`), and an
 * "auto sync" is scheduled whenever the database reports an update.
 */
export default class Sync {
  /**
   * Builds the collaborators and the hub connection, and registers the hub
   * and app event handlers. The connection is not started here; it is
   * started lazily by `_realTimeSync`.
   *
   * @param {import("../index").default} db
   */
  constructor(db) {
    this._db = db;
    this._collector = new Collector(this._db);
    this._merger = new Merger(this._db);
    this._realtimeMerger = new RealtimeMerger(this._db);
    this.syncMutex = new Mutex();
    this._tokenManager = new TokenManager(this._db.storage);
    // Handle of the pending auto-sync timer (see _scheduleSync).
    this._autoSyncTimeout = 0;
    // Delay, in milliseconds, between a database update and the sync request.
    this._autoSyncInterval = 100;
    this._queue = new SyncQueue(this._db.storage);

    this._connection = new signalr.HubConnectionBuilder()
      .withUrl(`${Constants.API_HOST}/hubs/sync`, {
        accessTokenFactory: () => this._db.user.tokenManager.getAccessToken(),
      })
      .build();

    EV.subscribe(EVENTS.userLoggedOut, async () => {
      await this._connection.stop();
    });

    // Ids of the items pushed by the server since the last
    // "RemoteSyncCompleted"; keyed by the bare item id.
    let ignoredIds = {};

    this._connection.on("SyncItem", async (type, itemJSON, current, total) => {
      const item = JSON.parse(itemJSON);
      ignoredIds[item.id] = true;

      this.stopAutoSync();
      try {
        await this._realtimeMerger.mergeItem(type, item);
        EV.publish(EVENTS.appRefreshRequested);
        sendSyncProgressEvent("download", total, current);
      } finally {
        // Always resume auto sync, even if merging this item failed.
        await this.startAutoSync();
      }
    });

    this._connection.on("RemoteSyncCompleted", async () => {
      await this._realTimeSync(false, false, ignoredIds);
      ignoredIds = {};
    });
  }

  /**
   * Runs a sync unless one is already in progress.
   *
   * @param {boolean} full forwarded to `_realTimeSync`
   * @param {boolean} force forwarded to `_realTimeSync`
   * @returns {Promise<boolean>} `false` if the sync mutex is already locked,
   * otherwise the result of `_realTimeSync`.
   */
  async start(full, force) {
    if (this.syncMutex.isLocked()) return false;

    return this.syncMutex
      .runExclusive(() => {
        this.stopAutoSync();
        return this._realTimeSync(full, force);
      })
      .finally(() => this._afterSync());
  }

  /**
   * Subscribes to database updates so that each update schedules a sync.
   * Does nothing when the premium check for database sync fails.
   */
  async startAutoSync() {
    if (!(await checkIsUserPremium(CHECK_IDS.databaseSync))) return;

    // Release any previous subscription first; otherwise overlapping
    // start calls would leave an orphaned listener that can no longer be
    // unsubscribed by stopAutoSync().
    if (this.databaseUpdatedEvent) this.databaseUpdatedEvent.unsubscribe();

    this.databaseUpdatedEvent = EV.subscribe(
      EVENTS.databaseUpdated,
      this._scheduleSync.bind(this)
    );
  }

  /**
   * Cancels the pending auto-sync timer and stops listening for database
   * updates.
   */
  stopAutoSync() {
    clearTimeout(this._autoSyncTimeout);
    if (this.databaseUpdatedEvent) this.databaseUpdatedEvent.unsubscribe();
  }

  /**
   * Runs `callback` while holding the sync mutex, with auto sync paused.
   * Auto sync is resumed afterwards whether or not `callback` succeeds.
   *
   * @param {Function} callback work to run exclusively
   */
  async acquireLock(callback) {
    this.stopAutoSync();
    try {
      await this.syncMutex.runExclusive(callback);
    } finally {
      await this.startAutoSync();
    }
  }

  /**
   * Performs one sync round over the hub connection.
   *
   * @param {boolean} full when true and there is no unfinished upload queue,
   * items are first requested from the server.
   * @param {boolean} force when true, `lastSynced` is treated as 0 for the
   * fetch and for collecting local changes.
   * @param {Object<string, boolean>} [ignoredIds] ids to skip while uploading
   * @returns {Promise<boolean>} always `true` on completion
   */
  async _realTimeSync(full, force, ignoredIds) {
    console.log("STARTING REALTIME SYNC");

    if (this._connection.state !== signalr.HubConnectionState.Connected)
      await this._connection.start();

    let { lastSynced } = await this._performChecks();
    const oldLastSynced = lastSynced;
    if (force) lastSynced = 0;

    // A truthy syncedAt means a previous upload queue was not fully drained.
    const { syncedAt } = await this._queue.get();

    let serverResponse;
    if (full && !syncedAt) {
      serverResponse = await this._requestRemoteItems(lastSynced);
      await this._db.conflicts.check();
    }

    await this._uploadAttachments();

    const data = await this._collector.collect(lastSynced);
    if (syncedAt) {
      const newData = this._collector.filter(
        data,
        (item) => item.dateModified > syncedAt
      );
      lastSynced = Date.now();
      await this._queue.merge(newData, lastSynced);
    } else {
      lastSynced = Date.now();
      await this._queue.new(data, lastSynced);
    }

    if (!areAllEmpty(data)) {
      const queue = await this._queue.get();
      // Iterate over a copy: dequeue() splices the stored list, and the
      // queue reads back as `{}` (no itemIds) when it holds no ids.
      const itemIds = [...(queue.itemIds || [])];
      const total = itemIds.length;

      for (let i = 0; i < total; ++i) {
        const id = itemIds[i];
        // NOTE(review): `id` has the form `<collection>:<itemId>` while the
        // map built by the "SyncItem" handler is keyed by the bare item id,
        // so this check never matches as written. Making it match would
        // also require dequeuing the skipped id — confirm intended behavior.
        if (ignoredIds && ignoredIds[id]) continue;

        const [arrayKey, itemId] = id.split(":");
        const array = data[arrayKey] || [];
        const item = array.find((item) => item.id === itemId);
        const type = ITEM_TYPE_MAP[arrayKey];
        if (!item) continue;

        if (await this.sendItemToServer(type, item, lastSynced)) {
          await this._queue.dequeue(id);
          sendSyncProgressEvent("upload", total, i + 1);
        }
      }

      if (data.vaultKey)
        await this.sendItemToServer("vaultKey", data.vaultKey, lastSynced);

      // Keep the previous timestamp if the server did not accept this round.
      if (!(await this._connection.invoke("SyncCompleted", lastSynced)))
        lastSynced = oldLastSynced;
    } else if (serverResponse) {
      lastSynced = serverResponse.lastSynced;
    } else {
      lastSynced = oldLastSynced;
    }

    await this._db.storage.write("lastSynced", lastSynced);
    console.log("REALTIME SYNC DONE!");
    return true;
  }

  /**
   * Sends "FetchItems" to the hub and resolves with the payload of the
   * "SyncCompleted" message that follows.
   *
   * The listener is registered before the request is sent (so a fast reply
   * cannot be missed) and removed once used (so listeners do not pile up
   * across syncs).
   *
   * @param {number} lastSynced
   * @returns {Promise<{ synced: any, lastSynced: any }>}
   */
  _requestRemoteItems(lastSynced) {
    return new Promise((resolve, reject) => {
      const onCompleted = (synced, remoteLastSynced) => {
        this._connection.off("SyncCompleted", onCompleted);
        resolve({ synced, lastSynced: remoteLastSynced });
      };
      this._connection.on("SyncCompleted", onCompleted);

      this._connection.send("FetchItems", lastSynced).catch((e) => {
        this._connection.off("SyncCompleted", onCompleted);
        reject(e);
      });
    });
  }

  /**
   * Runs after every `start()` round (success or failure) to resume auto
   * sync.
   */
  async _afterSync() {
    // The former `hasNewChanges` branch called `remoteSync()`, which no
    // longer exists on this class, so it could only throw; it was dropped.
    await this.startAutoSync();
  }

  /**
   * Debounces sync requests: (re)starts a timer that publishes
   * `databaseSyncRequested` after `_autoSyncInterval` milliseconds.
   */
  _scheduleSync() {
    this.stopAutoSync();
    this._autoSyncTimeout = setTimeout(() => {
      EV.publish(EVENTS.databaseSyncRequested, false, false);
    }, this._autoSyncInterval);
  }

  /**
   * POSTs `data` to the `/sync` endpoint.
   *
   * @param {any} data
   * @returns {Promise<any>} the `lastSynced` field of the response
   */
  async _send(data) {
    const token = await this._tokenManager.getAccessToken();
    const response = await http.post.json(
      `${Constants.API_HOST}/sync`,
      data,
      token
    );
    return response.lastSynced;
  }

  /**
   * Fetches attachments changed since `lastSynced` and merges them locally.
   *
   * @param {number} lastSynced
   */
  async _mergeAttachments(lastSynced) {
    // _fetchAttachments obtains its own access token.
    const serverResponse = await this._fetchAttachments(lastSynced);
    await this._merger.merge(serverResponse, lastSynced);
  }

  /**
   * Uploads every pending attachment, one at a time, marking each one as
   * uploaded or failed. A failure does not stop the remaining uploads.
   */
  async _uploadAttachments() {
    const attachments = this._db.attachments.pending;
    for (let i = 0; i < attachments.length; ++i) {
      const attachment = attachments[i];
      const { hash } = attachment.metadata;
      sendAttachmentsProgressEvent("upload", hash, attachments.length, i);

      try {
        const isUploaded = await this._db.fs.uploadFile(hash, hash);
        if (!isUploaded) throw new Error("Failed to upload file.");

        await this._db.attachments.markAsUploaded(attachment.id);
      } catch (e) {
        console.error(e, attachment);
        const error = e.message;
        await this._db.attachments.markAsFailed(attachment.id, error);
      }
    }
    sendAttachmentsProgressEvent("upload", null, attachments.length);
  }

  /**
   * Reads the stored `lastSynced` (0 when absent) and re-validates the
   * conflict state before a sync.
   *
   * @returns {Promise<{ lastSynced: number }>}
   */
  async _performChecks() {
    const lastSynced = (await this._db.lastSynced()) || 0;

    // update the conflicts status and if find any, throw
    await this._db.conflicts.recalculate();
    await this._db.conflicts.check();

    return { lastSynced };
  }

  /**
   * GETs `/sync` for everything changed since `lastSynced`.
   *
   * @param {number} lastSynced
   */
  async _fetch(lastSynced) {
    const token = await this._tokenManager.getAccessToken();
    return await http.get(
      `${Constants.API_HOST}/sync?lst=${lastSynced}`,
      token
    );
  }

  /**
   * GETs `/sync/attachments` for attachments changed since `lastSynced`.
   *
   * @param {number} lastSynced
   */
  async _fetchAttachments(lastSynced) {
    const token = await this._tokenManager.getAccessToken();
    return await http.get(
      `${Constants.API_HOST}/sync/attachments?lst=${lastSynced}`,
      token
    );
  }

  /**
   * Invokes the hub's "SyncItem" method with a JSON-serialised item.
   *
   * @param {string} type item type name
   * @param {object} item item to send; nothing is sent when falsy
   * @param {number} dateSynced timestamp of the current sync round
   * @returns {Promise<boolean>} `true` only if the hub answered with `1`
   */
  async sendItemToServer(type, item, dateSynced) {
    if (!item) return false;

    const result = await this._connection.invoke(
      "SyncItem",
      type,
      JSON.stringify(item),
      dateSynced
    );
    return result === 1;
  }
}
/**
 * Flattens collected sync data into a list of queue ids of the form
 * `<collection>:<itemId>`.
 *
 * Only the `attachments`, `content`, `notes`, `notebooks` and `settings`
 * keys are considered, in that order; keys whose value is not an array are
 * skipped.
 *
 * @param {object} data collected data, keyed by collection name
 * @returns {string[]} queue ids
 */
function mapToIds(data) {
  const ids = [];
  if (!data) return ids;

  const keys = ["attachments", "content", "notes", "notebooks", "settings"];
  for (const key of keys) {
    const array = data[key];
    if (!Array.isArray(array)) continue;

    for (const item of array) {
      ids.push(`${key}:${item.id}`);
    }
  }
  return ids;
}
/**
 * Persists the list of item ids that still have to be uploaded, together
 * with the timestamp of the sync round that queued them, under the
 * "syncQueue" storage key.
 */
class SyncQueue {
  /**
   *
   * @param {import("../../database/storage").default} storage
   */
  constructor(storage) {
    this.storage = storage;
  }

  /**
   * Replaces the stored queue with the ids derived from `data`.
   *
   * @param {object} data collected data to queue
   * @param {number} syncedAt timestamp of the current sync round
   * @returns {Promise<{ itemIds: string[]; syncedAt: number; }>}
   */
  async new(data, syncedAt) {
    const itemIds = mapToIds(data);
    const syncData = { itemIds, syncedAt };
    await this.save(syncData);
    return syncData;
  }

  /**
   * Adds the ids derived from `data` to the stored queue and updates its
   * timestamp. Does nothing (and resolves to `undefined`) when there is no
   * non-empty stored queue to merge into.
   *
   * @param {object} data collected data to queue
   * @param {number} syncedAt timestamp of the current sync round
   * @returns {Promise<{ itemIds: string[]; syncedAt: number; } | undefined>}
   */
  async merge(data, syncedAt) {
    const syncQueue = await this.get();
    if (!syncQueue.itemIds) return;

    const itemIds = set.union(syncQueue.itemIds, mapToIds(data));
    const syncData = { itemIds, syncedAt };
    await this.save(syncData);
    return syncData;
  }

  /**
   * Removes `id` from the stored queue, if present.
   *
   * @param {string} id queue id (`<collection>:<itemId>`)
   */
  async dequeue(id) {
    const syncQueue = await this.get();
    if (!syncQueue.itemIds) return;

    const index = syncQueue.itemIds.indexOf(id);
    if (index === -1) return;

    syncQueue.itemIds.splice(index, 1);
    await this.save(syncQueue);
  }

  /**
   * Reads the stored queue. Resolves to an empty object when nothing is
   * stored, or when the stored queue has no (or an empty) `itemIds` list.
   *
   * @returns {Promise<{ itemIds: string[]; syncedAt: number; }>}
   */
  async get() {
    const syncQueue = await this.storage.read("syncQueue");
    // A stored record without itemIds is treated like an empty queue
    // instead of throwing on `.length`.
    if (!syncQueue || !syncQueue.itemIds || syncQueue.itemIds.length <= 0)
      return {};
    return syncQueue;
  }

  /**
   * Writes `syncQueue` to storage as-is.
   *
   * @param {{ itemIds: string[]; syncedAt: number; }} syncQueue
   */
  async save(syncQueue) {
    await this.storage.write("syncQueue", syncQueue);
  }
}