core: fix realtime sync

This commit is contained in:
Abdullah Atta
2023-08-31 13:22:49 +05:00
committed by Abdullah Atta
parent 339c5a357b
commit 81d3ec83d9
2 changed files with 56 additions and 33 deletions

View File

@@ -122,6 +122,15 @@ class Sync {
this.autoSync = new AutoSync(db, 1000); this.autoSync = new AutoSync(db, 1000);
this.logger = logger.scope("Sync"); this.logger = logger.scope("Sync");
this.syncConnectionMutex = new Mutex(); this.syncConnectionMutex = new Mutex();
this.itemTypeToCollection = {
note: "notes",
notebook: "notebooks",
content: "content",
attachment: "attachments",
relation: "relations",
reminder: "reminders",
shortcut: "shortcuts"
};
let remoteSyncTimeout = 0; let remoteSyncTimeout = 0;
const tokenManager = new TokenManager(db.storage); const tokenManager = new TokenManager(db.storage);
@@ -160,7 +169,25 @@ class Sync {
db.eventManager.publish(EVENTS.syncAborted); db.eventManager.publish(EVENTS.syncAborted);
}, 15000); }, 15000);
await this.onSyncItem(payload); const key = await this.db.user.getEncryptionKey();
const item = JSON.parse(payload.item);
const decryptedItem = await this.db.storage.decrypt(key, item);
const deserialized = await deserializeItem(
decryptedItem,
item.v,
this.db
);
const mergedItem = await this.onSyncItem(deserialized, payload.itemType);
const collectionType = this.itemTypeToCollection[payload.itemType];
if (collectionType && this.db[collectionType])
await this.db[collectionType]._collection.addItem(mergedItem);
if (payload.itemType === "content" || payload.itemType === "note") {
this.db.eventManager.publish(EVENTS.syncItemMerged, mergedItem);
}
sendSyncProgressEvent( sendSyncProgressEvent(
this.db.eventManager, this.db.eventManager,
"download", "download",
@@ -243,16 +270,6 @@ class Sync {
async fetch(lastSynced) { async fetch(lastSynced) {
await this.checkConnection(); await this.checkConnection();
const typeToCollection = {
note: this.db.notes,
notebook: this.db.notebooks,
content: this.db.content,
attachment: this.db.attachments,
relation: this.db.relations,
reminder: this.db.reminders,
shortcut: this.db.shortcuts
};
const key = await this.db.user.getEncryptionKey(); const key = await this.db.user.getEncryptionKey();
if (!key || !key.key || !key.salt) { if (!key || !key.key || !key.salt) {
EV.publish(EVENTS.userSessionExpired); EV.publish(EVENTS.userSessionExpired);
@@ -266,19 +283,9 @@ class Sync {
const decrypted = await this.db.storage.decryptMulti(key, chunk.items); const decrypted = await this.db.storage.decryptMulti(key, chunk.items);
const deserialized = await Promise.all( const deserialized = await Promise.all(
decrypted.map(async (item, index) => { decrypted.map((item, index) =>
const deserialized = JSON.parse(item); deserializeItem(item, chunk.items[index].v, this.db)
deserialized.remote = true; )
deserialized.synced = true;
// if (!migrate) return deserialized;
// it is a locked note, bail out.
if (deserialized.alg && deserialized.cipher) return deserialized;
const version = chunk.items[index].v;
await migrateItem(deserialized, version, deserialized.type, this._db);
return deserialized;
})
); );
let items = []; let items = [];
@@ -292,7 +299,7 @@ class Sync {
); );
items = await Promise.all( items = await Promise.all(
deserialized.map((item) => deserialized.map((item) =>
this.merger.mergeContent(item, localItems, dbLastSynced) this.merger.mergeContent(item, localItems[item.id], dbLastSynced)
) )
); );
} else { } else {
@@ -303,8 +310,9 @@ class Sync {
); );
} }
const collection = typeToCollection[chunk.type]; const collectionType = this.itemTypeToCollection[chunk.type];
if (collection) await collection._collection.setItems(items); if (collectionType && this.db[collectionType])
await this.db[collectionType]._collection.setItems(items);
count += chunk.items.length; count += chunk.items.length;
sendSyncProgressEvent( sendSyncProgressEvent(
@@ -444,10 +452,15 @@ class Sync {
/** /**
* @private * @private
*/ */
async onSyncItem(item, type) { async onSyncItem(item, type, lastSynced) {
const remoteItem = await this.merger.mergeItem(type, item); if (this.merger.isSyncCollection(type)) {
if (remoteItem) return this.merger.mergeItemSync(item, type, lastSynced);
this.db.eventManager.publish(EVENTS.syncItemMerged, remoteItem); } else if (type === "content") {
const localItem = await this.db.content.raw(item.id);
return await this.merger.mergeContent(item, localItem, lastSynced);
} else {
return await this.merger.mergeItem(item, type, lastSynced);
}
} }
/** /**
@@ -497,3 +510,14 @@ function promiseTimeout(ms, promise) {
// Returns a race between our timeout and the passed in promise // Returns a race between our timeout and the passed in promise
return Promise.race([promise, timeout]); return Promise.race([promise, timeout]);
} }
async function deserializeItem(decryptedItem, version, database) {
const deserialized = JSON.parse(decryptedItem);
deserialized.remote = true;
deserialized.synced = true;
if (!deserialized.alg && !deserialized.cipher) {
await migrateItem(deserialized, version, deserialized.type, database);
}
return deserialized;
}

View File

@@ -102,9 +102,8 @@ class Merger {
} }
} }
async mergeContent(remoteItem, localItems, lastSynced) { async mergeContent(remoteItem, localItem, lastSynced) {
const THRESHOLD = process.env.NODE_ENV === "test" ? 6 * 1000 : 60 * 1000; const THRESHOLD = process.env.NODE_ENV === "test" ? 6 * 1000 : 60 * 1000;
const localItem = localItems[remoteItem.id];
const conflicted = const conflicted =
localItem && localItem &&
this.isConflicted(localItem, remoteItem, lastSynced, THRESHOLD); this.isConflicted(localItem, remoteItem, lastSynced, THRESHOLD);