core: migrate to sync v2 for data uploads

This commit is contained in:
Abdullah Atta
2023-09-02 11:29:00 +05:00
committed by Abdullah Atta
parent fac788d2a9
commit 2fa8fd14e7
9 changed files with 221 additions and 254 deletions

View File

@@ -20,6 +20,18 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
import { CURRENT_DATABASE_VERSION } from "../../common";
import { logger } from "../../logger";
const SYNC_COLLECTIONS_MAP = {
attachment: "attachments",
note: "notes",
notebook: "notebooks",
shortcut: "shortcuts",
reminder: "reminders",
relation: "relations"
};
const ASYNC_COLLECTIONS_MAP = {
content: "content"
};
class Collector {
/**
*
@@ -30,95 +42,106 @@ class Collector {
this.logger = logger.scope("SyncCollector");
}
async collect(lastSyncedTimestamp, isForceSync) {
await this._db.notes.init();
this._lastSyncedTimestamp = lastSyncedTimestamp;
this.key = await this._db.user.getEncryptionKey();
const vaultKey = await this._db.vault._getKey();
const collections = {
note: this._db.notes.raw,
shortcut: this._db.shortcuts.raw,
notebook: this._db.notebooks.raw,
content: await this._db.content.all(),
attachment: this._db.attachments.syncable,
reminder: this._db.reminders.raw,
relation: this._db.relations.raw,
settings: [this._db.settings.raw]
};
const result = { items: [], types: [] };
for (const type in collections) {
this._collect(type, collections[type], result, isForceSync);
}
if (vaultKey) {
result.items.push(vaultKey);
result.types.push("vaultKey");
}
return result;
}
_serialize(item) {
if (!item) return null;
return this._db.storage.encrypt(this.key, JSON.stringify(item));
}
encrypt(array) {
if (!array.length) return [];
return Promise.all(array.map(this._map, this));
}
_collect(itemType, items, result, isForceSync) {
if (!items || !items.length) return;
for (const item of items) {
if (!item) continue;
const isSyncable = !item.synced || isForceSync;
const isUnsynced =
item.dateModified > this._lastSyncedTimestamp || isForceSync;
if (item.localOnly) {
result.items.push({
id: item.id,
deleted: true,
dateModified: item.dateModified,
deleteReason: "localOnly"
});
result.types.push(itemType);
} else if (isUnsynced && isSyncable) {
result.items.push(item);
result.types.push(itemType);
async *collect(chunkSize, lastSyncedTimestamp, isForceSync) {
const key = await this._db.user.getEncryptionKey();
for (const itemType in SYNC_COLLECTIONS_MAP) {
const collectionKey = SYNC_COLLECTIONS_MAP[itemType];
const collection = this._db[collectionKey]._collection;
for (const chunk of collection.iterateSync(chunkSize)) {
const items = await this.prepareChunk(
chunk,
lastSyncedTimestamp,
isForceSync,
key,
itemType
);
if (!items) continue;
yield items;
}
}
for (const itemType in ASYNC_COLLECTIONS_MAP) {
const collectionKey = ASYNC_COLLECTIONS_MAP[itemType];
const collection = this._db[collectionKey]._collection;
for await (const chunk of collection.iterate(chunkSize)) {
const items = await this.prepareChunk(
chunk.map((item) => item[1]),
lastSyncedTimestamp,
isForceSync,
key,
itemType
);
if (!items) continue;
yield items;
}
}
const items = await this.prepareChunk(
[this._db.settings.raw],
lastSyncedTimestamp,
isForceSync,
key,
"settings"
);
if (!items) return;
yield items;
}
// _map(item) {
// return {
// id: item.id,
// v: CURRENT_DATABASE_VERSION,
// iv: item.iv,
// cipher: item.cipher,
// length: item.length,
// alg: item.alg,
// dateModified: item.dateModified,
// };
// }
async prepareChunk(chunk, lastSyncedTimestamp, isForceSync, key, itemType) {
const { ids, items } = filterSyncableItems(
chunk,
lastSyncedTimestamp,
isForceSync
);
if (!ids.length) return;
const ciphers = await this._db.storage.encryptMulti(key, items);
return toPushItem(itemType, ids, ciphers);
}
}
export default Collector;
function toPushItem(type, ids, ciphers) {
const items = ciphers.map((cipher, index) => {
cipher.v = CURRENT_DATABASE_VERSION;
cipher.id = ids[index];
return cipher;
});
return {
items,
type
};
}
function filterSyncableItems(items, lastSyncedTimestamp, isForceSync) {
if (!items || !items.length) return { items: [], ids: [] };
const ids = [];
const syncableItems = [];
for (const item of items) {
if (!item) continue;
const isSyncable = !item.synced || isForceSync;
const isUnsynced = item.dateModified > lastSyncedTimestamp || isForceSync;
async _map(item) {
// in case of resolved content
delete item.resolved;
// synced is a local only property
delete item.synced;
return {
id: item.id,
v: CURRENT_DATABASE_VERSION,
...(await this._serialize(item))
};
if (item.localOnly) {
ids.push(item.id);
syncableItems.push(
JSON.stringify({
id: item.id,
deleted: true,
dateModified: item.dateModified,
deleteReason: "localOnly"
})
);
} else if (isUnsynced && isSyncable) {
ids.push(item.id);
syncableItems.push(JSON.stringify(item));
}
}
return { items: syncableItems, ids };
}
export default Collector;

View File

@@ -32,7 +32,6 @@ import * as signalr from "@microsoft/signalr";
import Merger from "./merger";
import Conflicts from "./conflicts";
import { AutoSync } from "./auto-sync";
import { toChunks } from "../../utils/array";
import { MessagePackHubProtocol } from "@microsoft/signalr-protocol-msgpack";
import { logger } from "../../logger";
import { Mutex } from "async-mutex";
@@ -42,21 +41,9 @@ import { migrateItem } from "../../migrations";
* @typedef {{
* items: any[],
* type: string,
* lastSynced: number,
* total: number
* }} SyncTransferItem
*/
/**
* @typedef {{
* items: string[],
* types: string[],
* lastSynced: number,
* current: number,
* total: number,
* }} BatchedSyncTransferItem
*/
export default class SyncManager {
/**
*
@@ -163,42 +150,20 @@ class Sync {
this.autoSync.stop();
});
this.connection.on("SyncItem", async (payload) => {
clearTimeout(remoteSyncTimeout);
remoteSyncTimeout = setTimeout(() => {
db.eventManager.publish(EVENTS.syncAborted);
}, 15000);
let count = 0;
this.connection.on("PushItems", async (chunk) => {
const key = await this.db.user.getEncryptionKey();
const item = JSON.parse(payload.item);
const decryptedItem = await this.db.storage.decrypt(key, item);
const dbLastSynced = await this.db.lastSynced();
await this.processChunk(chunk, key, dbLastSynced, true);
const deserialized = await deserializeItem(
decryptedItem,
item.v,
this.db
);
const mergedItem = await this.onSyncItem(deserialized, payload.itemType);
const collectionType = this.itemTypeToCollection[payload.itemType];
if (collectionType && this.db[collectionType])
await this.db[collectionType]._collection.addItem(mergedItem);
if (payload.itemType === "content" || payload.itemType === "note") {
this.db.eventManager.publish(EVENTS.syncItemMerged, mergedItem);
}
sendSyncProgressEvent(
this.db.eventManager,
"download",
payload.total,
payload.current
);
count += chunk.items.length;
sendSyncProgressEvent(this.db.eventManager, "download", count);
});
this.connection.on("RemoteSyncCompleted", (lastSynced) => {
this.connection.on("PushCompleted", (lastSynced) => {
count = 0;
clearTimeout(remoteSyncTimeout);
this.onRemoteSyncCompleted(lastSynced);
this.onPushCompleted(lastSynced);
});
}
@@ -226,17 +191,12 @@ class Sync {
const { lastSynced, oldLastSynced } = await this.init(force);
this.logger.info("Initialized sync", { lastSynced, oldLastSynced });
const { newLastSynced, data } = await this.collect(lastSynced, force);
this.logger.info("Data collected for sync", {
newLastSynced,
length: data.items.length,
isEmpty: data.items.length <= 0
});
const newLastSynced = Date.now();
const serverResponse = full ? await this.fetch(lastSynced) : null;
this.logger.info("Data fetched", serverResponse);
if (await this.send(data, newLastSynced)) {
if (await this.send(lastSynced, force, newLastSynced)) {
this.logger.info("New data sent");
await this.stop(newLastSynced);
} else if (serverResponse) {
@@ -278,49 +238,12 @@ class Sync {
const dbLastSynced = await this.db.lastSynced();
let count = 0;
this.connection.off("SyncItems");
this.connection.on("SyncItems", async (chunk) => {
const decrypted = await this.db.storage.decryptMulti(key, chunk.items);
const deserialized = await Promise.all(
decrypted.map((item, index) =>
deserializeItem(item, chunk.items[index].v, this.db)
)
);
let items = [];
if (this.merger.isSyncCollection(chunk.type)) {
items = deserialized.map((item) =>
this.merger.mergeItemSync(item, chunk.type, dbLastSynced)
);
} else if (chunk.type === "content") {
const localItems = await this.db.content.multi(
chunk.items.map((i) => i.id)
);
items = await Promise.all(
deserialized.map((item) =>
this.merger.mergeContent(item, localItems[item.id], dbLastSynced)
)
);
} else {
items = await Promise.all(
deserialized.map((item) =>
this.merger.mergeItem(item, chunk.type, dbLastSynced)
)
);
}
const collectionType = this.itemTypeToCollection[chunk.type];
if (collectionType && this.db[collectionType])
await this.db[collectionType]._collection.setItems(items);
this.connection.off("SendItems");
this.connection.on("SendItems", async (chunk) => {
await this.processChunk(chunk, key, dbLastSynced);
count += chunk.items.length;
sendSyncProgressEvent(
this.db.eventManager,
`download`,
chunk.total,
count
);
sendSyncProgressEvent(this.db.eventManager, `download`, count);
return true;
});
const serverResponse = await this.connection.invoke(
@@ -336,7 +259,7 @@ class Sync {
);
}
this.connection.off("SyncItems");
this.connection.off("SendItems");
if (await this.conflicts.check()) {
this.conflicts.throw();
@@ -345,57 +268,40 @@ class Sync {
return { lastSynced: serverResponse.lastSynced };
}
async collect(lastSynced, force) {
const newLastSynced = Date.now();
const data = await this.collector.collect(lastSynced, force);
return { newLastSynced, data };
}
/**
*
* @param {{ items: any[]; vaultKey: any; types: string[]; }} data
* @param {number} lastSynced
* @returns {Promise<boolean>}
*/
async send(data, lastSynced) {
async send(oldLastSynced, isForceSync, newLastSynced) {
await this.uploadAttachments();
if (data.types.length === 1 && data.types[0] === "vaultKey") return false;
if (data.items.length <= 0) return false;
let total = data.items.length;
const types = toChunks(data.types, 30);
const items = toChunks(data.items, 30);
let isSyncInitialized = false;
let done = 0;
for (let i = 0; i < items.length; ++i) {
this.logger.info(`Sending batch ${done}/${total}`);
const encryptedItems = (await this.collector.encrypt(items[i])).map(
(item) => JSON.stringify(item)
);
const result = await this.sendBatchToServer({
lastSynced,
current: i,
total,
items: encryptedItems,
types: types[i]
});
for await (const item of this.collector.collect(
100,
oldLastSynced,
isForceSync
)) {
if (!isSyncInitialized) {
const vaultKey = await this.db.vault._getKey();
newLastSynced = await this.connection.invoke("InitializePush", {
vaultKey,
lastSynced: newLastSynced
});
isSyncInitialized = true;
}
const result = await this.pushItem(item, newLastSynced);
if (result) {
done += encryptedItems.length;
sendSyncProgressEvent(this.db.eventManager, "upload", total, done);
done += item.items.length;
sendSyncProgressEvent(this.db.eventManager, "upload", done);
this.logger.info(`Batch sent (${done}/${total})`);
this.logger.info(`Batch sent (${done})`);
} else {
this.logger.error(
new Error(`Failed to send batch. Server returned falsy response.`)
);
}
}
return await this.connection.invoke("SyncCompleted", lastSynced);
if (!isSyncInitialized) return;
await this.connection.invoke("SyncCompleted", newLastSynced);
return true;
}
async stop(lastSynced) {
@@ -440,7 +346,7 @@ class Sync {
/**
* @private
*/
async onRemoteSyncCompleted(lastSynced) {
async onPushCompleted(lastSynced) {
// refresh monographs on sync completed
await this.db.monographs.init();
// refresh topic references
@@ -449,32 +355,62 @@ class Sync {
await this.start(false, false, lastSynced);
}
/**
* @private
*/
async onSyncItem(item, type, lastSynced) {
if (this.merger.isSyncCollection(type)) {
return this.merger.mergeItemSync(item, type, lastSynced);
} else if (type === "content") {
const localItem = await this.db.content.raw(item.id);
return await this.merger.mergeContent(item, localItem, lastSynced);
async processChunk(chunk, key, dbLastSynced, notify = false) {
const decrypted = await this.db.storage.decryptMulti(key, chunk.items);
const deserialized = await Promise.all(
decrypted.map((item, index) =>
deserializeItem(item, chunk.items[index].v, this.db)
)
);
let items = [];
if (this.merger.isSyncCollection(chunk.type)) {
items = deserialized.map((item) =>
this.merger.mergeItemSync(item, chunk.type, dbLastSynced)
);
} else if (chunk.type === "content") {
const localItems = await this.db.content.multi(
chunk.items.map((i) => i.id)
);
items = await Promise.all(
deserialized.map((item) =>
this.merger.mergeContent(item, localItems[item.id], dbLastSynced)
)
);
} else {
return await this.merger.mergeItem(item, type, lastSynced);
items = await Promise.all(
deserialized.map((item) =>
this.merger.mergeItem(item, chunk.type, dbLastSynced)
)
);
}
if (
notify &&
(chunk.type === "content" || chunk.type === "note") &&
items.length > 0
) {
items.forEach((item) =>
this.db.eventManager.publish(EVENTS.syncItemMerged, item)
);
}
const collectionType = this.itemTypeToCollection[chunk.type];
if (collectionType && this.db[collectionType])
await this.db[collectionType]._collection.setItems(items);
}
/**
*
* @param {BatchedSyncTransferItem} batch
* @param {SyncTransferItem} item
* @returns {Promise<boolean>}
* @private
*/
async sendBatchToServer(batch) {
if (!batch) return false;
async pushItem(item, newLastSynced) {
await this.checkConnection();
const result = await this.connection.invoke("SyncItem", batch);
return result === 1;
await this.connection.send("PushItems", item, newLastSynced);
return true; // () === 1;
}
async checkConnection() {

View File

@@ -45,11 +45,10 @@ export function sendAttachmentsProgressEvent(type, groupId, total, current) {
});
}
export function sendSyncProgressEvent(EV, type, total, current) {
export function sendSyncProgressEvent(EV, type, current) {
EV.publish(EVENTS.syncProgress, {
type,
total,
current: current === undefined ? total : current
current
});
}

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
import IndexedCollection from "./indexed-collection";
import MapStub from "../utils/map";
import { toChunks } from "../utils/array";
export default class CachedCollection extends IndexedCollection {
constructor(context, type, eventManager) {
@@ -113,6 +114,13 @@ export default class CachedCollection extends IndexedCollection {
this.invalidateCache();
}
*iterateSync(chunkSize) {
const chunks = toChunks(Array.from(this.map.values()), chunkSize);
for (const chunk of chunks) {
yield chunk;
}
}
invalidateCache() {
this.items = undefined;
}

View File

@@ -18,6 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { EVENTS } from "../common";
import { toChunks } from "../utils/array";
import Indexer from "./indexer";
export default class IndexedCollection {
@@ -117,4 +118,11 @@ export default class IndexedCollection {
this.encryptionKey = await this.encryptionKeyFactory();
return this.encryptionKey;
}
async *iterate(chunkSize) {
const chunks = toChunks(this.indexer.indices, chunkSize);
for (const chunk of chunks) {
yield await this.indexer.readMulti(chunk);
}
}
}

View File

@@ -56,6 +56,10 @@ export default class Storage {
return this.storage.encrypt(password, data);
}
encryptMulti(password, data) {
return this.storage.encryptMulti(password, data);
}
decrypt(password, cipher) {
return this.storage.decrypt(password, cipher);
}