sync: prepare -> collector, other optimizations

This commit is contained in:
thecodrr
2020-04-16 03:04:44 +05:00
parent 1828728f77
commit 8ad2bc5457
9 changed files with 119 additions and 110 deletions

View File

@@ -1,16 +0,0 @@
class Conflicts {
/**
*
* @param {import('./index').default} db
*/
constructor(db) {
this._db = db;
}
async recalculate() {
if (this._db.notes.conflicted.length <= 0) {
await this._db.context.write("hasConflicts", false);
}
}
}
export default Conflicts;

View File

@@ -8,7 +8,7 @@ import Sync from "./sync";
import Vault from "./vault"; import Vault from "./vault";
import Lookup from "./lookup"; import Lookup from "./lookup";
import Content from "../collections/content"; import Content from "../collections/content";
import Conflicts from "./conflicts"; import Conflicts from "./sync/conflicts";
class Database { class Database {
constructor(context) { constructor(context) {

View File

@@ -1,5 +1,5 @@
import StorageInterface from "../../../__mocks__/storage.mock"; import StorageInterface from "../../../__mocks__/storage.mock";
import Prepare from "../prepare"; import Collector from "../collector";
import { enableFetchMocks, disableFetchMocks } from "jest-fetch-mock"; import { enableFetchMocks, disableFetchMocks } from "jest-fetch-mock";
import { import {
noteTest, noteTest,
@@ -73,8 +73,8 @@ describe.each(tests)("%s preparation", (collection, add, addExtra) => {
.fill(0) .fill(0)
.map(() => add(db, collection)) .map(() => add(db, collection))
); );
const prepare = new Prepare(db); const collector = new Collector(db);
const data = await prepare.get(0); const data = await collector.collect(0);
expect(data[collection].length).toBe(MAX_ITEMS); expect(data[collection].length).toBe(MAX_ITEMS);
expect( expect(
data[collection].every((item) => !!item.iv && !!item.cipher) data[collection].every((item) => !!item.iv && !!item.cipher)
@@ -91,8 +91,8 @@ describe.each(tests)("%s preparation", (collection, add, addExtra) => {
.map(() => add(db, collection)) .map(() => add(db, collection))
); );
await addExtra(db, collection); await addExtra(db, collection);
const prepare = new Prepare(db); const collector = new Collector(db);
const data = await prepare.get(10); const data = await collector.collect(10);
expect(data[collection].length).toBe(MAX_ITEMS); expect(data[collection].length).toBe(MAX_ITEMS);
expect( expect(
data[collection].every((item) => !!item.iv && !!item.cipher) data[collection].every((item) => !!item.iv && !!item.cipher)

View File

@@ -56,13 +56,16 @@ describe.each(tests)(
test(`merge ${collection} into empty database`, () => test(`merge ${collection} into empty database`, () =>
databaseTest().then(async (db) => { databaseTest().then(async (db) => {
await login(db); await login(db);
const merger = new Merger(db, 0); const merger = new Merger(db);
const result = await merger.merge({ const result = await merger.merge(
{
[collection]: [ [collection]: [
{ id: testItem.id, ...(await getEncrypted(testItem)) }, { id: testItem.id, ...(await getEncrypted(testItem)) },
], ],
synced: false, synced: false,
}); },
0
);
expect(result).toBe(true); expect(result).toBe(true);
expect(db[collection].all[0].id).toStrictEqual(testItem.id); expect(db[collection].all[0].id).toStrictEqual(testItem.id);
expect(db[collection].all[0].dateEdited).toStrictEqual( expect(db[collection].all[0].dateEdited).toStrictEqual(
@@ -73,13 +76,16 @@ describe.each(tests)(
test(`merge local and remote ${collection}`, () => test(`merge local and remote ${collection}`, () =>
databaseTest().then(async (db) => { databaseTest().then(async (db) => {
await login(db); await login(db);
const merger = new Merger(db, 0); const merger = new Merger(db);
const item = await add(db); const item = await add(db);
item.title = "Google"; item.title = "Google";
const result = await merger.merge({ const result = await merger.merge(
{
[collection]: [{ id: item.id, ...(await getEncrypted(item)) }], [collection]: [{ id: item.id, ...(await getEncrypted(item)) }],
synced: false, synced: false,
}); },
0
);
expect(result).toBe(true); expect(result).toBe(true);
expect(db[collection].all.length).toBe(1); expect(db[collection].all.length).toBe(1);
expect(db[collection].all[0]).toStrictEqual(item); expect(db[collection].all[0]).toStrictEqual(item);
@@ -88,14 +94,17 @@ describe.each(tests)(
test(`local ${collection} are more updated than remote ones`, () => test(`local ${collection} are more updated than remote ones`, () =>
databaseTest().then(async (db) => { databaseTest().then(async (db) => {
await login(db); await login(db);
const merger = new Merger(db, 0); const merger = new Merger(db);
const item = await add(db); const item = await add(db);
await edit(db, item); await edit(db, item);
item.title = "Google"; item.title = "Google";
const result = await merger.merge({ const result = await merger.merge(
{
[collection]: [{ id: item.id, ...(await getEncrypted(item)) }], [collection]: [{ id: item.id, ...(await getEncrypted(item)) }],
synced: false, synced: false,
}); },
0
);
expect(result).toBe(true); expect(result).toBe(true);
expect(db[collection].all.length).toBe(1); expect(db[collection].all.length).toBe(1);
expect(db[collection].all[0]).toStrictEqual(get(db, item)); expect(db[collection].all[0]).toStrictEqual(get(db, item));
@@ -111,8 +120,9 @@ test("local delta updated after lastSyncedTimestamp should cause merge conflict"
delta: { ops: [{ insert: "my name is abdullah" }] }, delta: { ops: [{ insert: "my name is abdullah" }] },
}; };
const deltaId = db.notes.note(id).data.content.delta; const deltaId = db.notes.note(id).data.content.delta;
const merger = new Merger(db, 200); const merger = new Merger(db);
const result = await merger.merge({ const result = await merger.merge(
{
delta: [ delta: [
{ {
id: deltaId, id: deltaId,
@@ -126,7 +136,9 @@ test("local delta updated after lastSyncedTimestamp should cause merge conflict"
})), })),
}, },
], ],
}); },
200
);
const localDelta = await db.delta.raw(deltaId); const localDelta = await db.delta.raw(deltaId);
expect(localDelta.conflicted.id).toBe(deltaId); expect(localDelta.conflicted.id).toBe(deltaId);
expect(localDelta.conflicted.noteId).toBe(id); expect(localDelta.conflicted.noteId).toBe(id);

View File

@@ -2,7 +2,7 @@
import { enableFetchMocks, disableFetchMocks } from "jest-fetch-mock"; import { enableFetchMocks, disableFetchMocks } from "jest-fetch-mock";
import StorageInterface from "../../../__mocks__/storage.mock"; import StorageInterface from "../../../__mocks__/storage.mock";
//import Sync from "../sync"; //import Sync from "../sync";
//import Prepare from "../prepare"; //import Collector from "../prepare";
import { databaseTest, TEST_NOTE } from "../../../__tests__/utils"; import { databaseTest, TEST_NOTE } from "../../../__tests__/utils";
import { login, getEncrypted } from "./utils"; import { login, getEncrypted } from "./utils";

View File

@@ -4,7 +4,7 @@ if (!tfun) {
tfun = global.tfun; tfun = global.tfun;
} }
class Prepare { class Collector {
/** /**
* *
* @param {Database} db * @param {Database} db
@@ -13,17 +13,17 @@ class Prepare {
this._db = db; this._db = db;
} }
async get(lastSyncedTimestamp) { async collect(lastSyncedTimestamp) {
this._lastSyncedTimestamp = lastSyncedTimestamp; this._lastSyncedTimestamp = lastSyncedTimestamp;
this.key = await this._db.user.key(); this.key = await this._db.user.key();
return { return {
notes: await this._prepareForServer(this._db.notes.raw), notes: await this._collect(this._db.notes.raw),
notebooks: await this._prepareForServer(this._db.notebooks.raw), notebooks: await this._collect(this._db.notebooks.raw),
delta: await this._prepareForServer(await this._db.delta.all()), delta: await this._collect(await this._db.delta.all()),
text: await this._prepareForServer(await this._db.text.all()), text: await this._collect(await this._db.text.all()),
tags: await this._prepareForServer(this._db.tags.raw), tags: await this._collect(this._db.tags.raw),
colors: await this._prepareForServer(this._db.colors.raw), colors: await this._collect(this._db.colors.raw),
trash: await this._prepareForServer(this._db.trash.raw), trash: await this._collect(this._db.trash.raw),
vaultKey: await this._serialize(await this._db.vault._getKey()), vaultKey: await this._serialize(await this._db.vault._getKey()),
}; };
} }
@@ -33,7 +33,7 @@ class Prepare {
return this._db.context.encrypt(this.key, JSON.stringify(item)); return this._db.context.encrypt(this.key, JSON.stringify(item));
} }
_prepareForServer(array) { _collect(array) {
return Promise.all( return Promise.all(
tfun tfun
.filter((item) => item.dateEdited > this._lastSyncedTimestamp) .filter((item) => item.dateEdited > this._lastSyncedTimestamp)
@@ -49,4 +49,4 @@ class Prepare {
); );
} }
} }
export default Prepare; export default Collector;

View File

@@ -0,0 +1,27 @@
class Conflicts {
/**
*
* @param {import('../index').default} db
*/
constructor(db) {
this._db = db;
}
async recalculate() {
if (this._db.notes.conflicted.length <= 0) {
await this._db.context.write("hasConflicts", false);
}
}
async check() {
let hasConflicts = await this._db.context.read("hasConflicts");
if (hasConflicts) {
const mergeConflictError = new Error(
"Merge conflicts detected. Please resolve all conflicts to continue syncing."
);
mergeConflictError.code = "MERGE_CONFLICT";
throw mergeConflictError;
}
}
}
export default Conflicts;

View File

@@ -1,6 +1,6 @@
/** /**
* GENERAL PROCESS: * GENERAL PROCESS:
* make a get request to server with current lastSyncedTimestamp * make a get request to server with current lastSynced
* parse the response. the response should contain everything that user has on the server * parse the response. the response should contain everything that user has on the server
* decrypt the response * decrypt the response
* merge everything into the database and look for conflicts * merge everything into the database and look for conflicts
@@ -10,12 +10,12 @@
/** /**
* MERGING: * MERGING:
* Locally, get everything that was editted/added after the lastSyncedTimestamp * Locally, get everything that was editted/added after the lastSynced
* Run forEach loop on the server response. * Run forEach loop on the server response.
* Add items that do not exist in the local collections * Add items that do not exist in the local collections
* Remove items (without asking) that need to be removed * Remove items (without asking) that need to be removed
* Update items that were editted before the lastSyncedTimestamp * Update items that were editted before the lastSynced
* Try to merge items that were edited after the lastSyncedTimestamp * Try to merge items that were edited after the lastSynced
* Items in which the content has changed, send them for conflict resolution * Items in which the content has changed, send them for conflict resolution
* Otherwise, keep the most recently updated copy. * Otherwise, keep the most recently updated copy.
*/ */
@@ -25,9 +25,8 @@
* Syncing should pause until all the conflicts have been resolved * Syncing should pause until all the conflicts have been resolved
* And then it should continue. * And then it should continue.
*/ */
import Database from "../index";
import { HOST, HEADERS } from "../../utils/constants"; import { HOST, HEADERS } from "../../utils/constants";
import Prepare from "./prepare"; import Collector from "./collector";
import Merger from "./merger"; import Merger from "./merger";
import { areAllEmpty } from "./utils"; import { areAllEmpty } from "./utils";
var tfun = require("transfun/transfun.js").tfun; var tfun = require("transfun/transfun.js").tfun;
@@ -38,63 +37,50 @@ if (!tfun) {
export default class Sync { export default class Sync {
/** /**
* *
* @param {Database} db * @param {import("../index").default} db
*/ */
constructor(db) { constructor(db) {
this.db = db; this._db = db;
this._collector = new Collector(this._db);
this._merger = new Merger(this._db);
} }
async _fetch(lastSyncedTimestamp) { async _fetch(lastSynced, token) {
let token = await this.db.user.token(); let response = await fetch(`${HOST}sync?lst=${lastSynced}`, {
if (!token) throw new Error("You are not logged in");
let response = await fetch(`${HOST}sync?lst=${lastSyncedTimestamp}`, {
headers: { ...HEADERS, Authorization: `Bearer ${token}` }, headers: { ...HEADERS, Authorization: `Bearer ${token}` },
}); });
//TODO decrypt the response.
return await response.json(); return await response.json();
} }
async throwOnConflicts() {
let hasConflicts = await this.db.context.read("hasConflicts");
if (hasConflicts) {
const mergeConflictError = new Error(
"Merge conflicts detected. Please resolve all conflicts to continue syncing."
);
mergeConflictError.code = "MERGE_CONFLICT";
throw mergeConflictError;
}
}
async start() { async start() {
let user = await this.db.user.get(); let user = await this._db.user.get();
if (!user) throw new Error("You need to login to sync."); let token = await this._db.user.token();
if (!user || !token) throw new Error("You need to login to sync.");
await this.db.conflicts.recalculate(); // update the conflicts status and if find any, throw
await this.throwOnConflicts(); await this._db.conflicts.recalculate();
await this._db.conflicts.check();
let lastSyncedTimestamp = user.lastSynced || 0; let lastSynced = user.lastSynced || 0;
let serverResponse = await this._fetch(lastSyncedTimestamp); let serverResponse = await this._fetch(lastSynced, token);
// we prepare local data before merging so we always have correct data // we prepare local data before merging so we always have correct data
const prepare = new Prepare(this.db, user); const data = await this._collector.collect(lastSynced);
const data = await prepare.get(lastSyncedTimestamp);
// merge the server response // merge the server response
const merger = new Merger(this.db, lastSyncedTimestamp); await this._merger.merge(serverResponse, lastSynced);
await merger.merge(serverResponse);
await this.throwOnConflicts(); // check for conflicts and throw
await this._db.conflicts.check();
// send the data back to server // send the data back to server
const lastSynced = await this._send(data); lastSynced = await this._send(data, token);
// update our lastSynced time // update our lastSynced time
if (lastSynced) await this.db.user.set({ lastSynced }); if (lastSynced) await this._db.user.set({ lastSynced });
} }
async _send(data) { async _send(data, token) {
//TODO encrypt the payload
let token = await this.db.user.token();
if (!token) return;
let response = await fetch(`${HOST}sync`, { let response = await fetch(`${HOST}sync`, {
method: "POST", method: "POST",
headers: { ...HEADERS, Authorization: `Bearer ${token}` }, headers: { ...HEADERS, Authorization: `Bearer ${token}` },

View File

@@ -6,9 +6,8 @@ class Merger {
* *
* @param {Database} db * @param {Database} db
*/ */
constructor(db, lastSynced) { constructor(db) {
this._db = db; this._db = db;
this._lastSynced = lastSynced;
} }
async _deserialize(item) { async _deserialize(item) {
@@ -59,8 +58,9 @@ class Merger {
); );
} }
async merge(serverResponse) { async merge(serverResponse, lastSynced) {
if (!serverResponse) return false; if (!serverResponse) return false;
this._lastSynced = lastSynced;
const { const {
notes, notes,
synced, synced,