web: add support for multiple browser tabs & windows

This commit is contained in:
Abdullah Atta
2024-03-28 09:50:32 +05:00
parent ec00779062
commit 751af3c2d1
6 changed files with 493 additions and 24 deletions

View File

@@ -69,7 +69,8 @@ async function initializeDatabase(persistence: DatabasePersistence) {
synchronous: "normal",
pageSize: 8192,
cacheSize: -32000,
password: Buffer.from(databaseKey).toString("hex")
password: Buffer.from(databaseKey).toString("hex"),
skipInitialization: !IS_DESKTOP_APP
},
storage: storage,
eventsource: EventSource,

View File

@@ -32,12 +32,16 @@ declare module "kysely" {
}
}
export const createDialect = (name: string): Dialect => {
export const createDialect = (
name: string,
init?: () => Promise<void>
): Dialect => {
return {
createDriver: () =>
new WaSqliteWorkerDriver({
async: !isFeatureSupported("opfs"),
dbName: name
dbName: name,
init
}),
createAdapter: () => new SqliteAdapter(),
createIntrospector: (db) => new SqliteIntrospector(db),

View File

@@ -0,0 +1,376 @@
/*
This file is part of the Notesnook project (https://notesnook.com/)
Copyright (C) 2023 Streetwriters (Private) Limited
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import SharedWorker from "./shared-service.worker.ts?sharedworker";
const PROVIDER_REQUEST_TIMEOUT = 1000;
const sharedWorker = globalThis.SharedWorker ? new SharedWorker() : null;
export class SharedService<T extends object> extends EventTarget {
#clientId: Promise<string>;
// This BroadcastChannel is used for client messaging. The provider
// must have a separate BroadcastChannel in case the instance is
// both client and provider.
#clientChannel = new BroadcastChannel("SharedService");
#onDeactivate?: AbortController | null = null;
#onClose: AbortController = new AbortController();
// This is client state to track the provider. The provider state is
// mostly managed within activate().
#providerPort: Promise<MessagePort | null>;
providerCallbacks: Map<
string,
{ resolve: (result: unknown) => void; reject: (reason: unknown) => void }
> = new Map();
#providerCounter = 0;
#providerChangeCleanup: (() => void)[] = [];
proxy: T;
constructor(private readonly serviceName: string) {
super();
this.#clientId = this.#getClientId();
// Connect to the current provider and future providers.
this.#providerPort = this.#providerChange();
this.#clientChannel.addEventListener(
"message",
async ({ data }) => {
if (
data?.type === "provider" &&
data?.sharedService === this.serviceName
) {
// A context (possibly this one) announced itself as the new provider.
// Discard any old provider and connect to the new one.
this.#closeProviderPort(this.#providerPort);
this.#providerPort = this.#providerChange();
}
},
{ signal: this.#onClose.signal }
);
this.proxy = this.#createProxy();
}
activate(portProviderFunc: () => MessagePort | Promise<MessagePort>) {
if (this.#onDeactivate) return;
// When acquire a lock on the service name then we become the service
// provider. Only one instance at a time will get the lock; the rest
// will wait their turn.
this.#onDeactivate = new AbortController();
const LOCK_NAME = `SharedService-${this.serviceName}`;
navigator.locks.request(
LOCK_NAME,
{ signal: this.#onDeactivate.signal },
async () => {
// Get the port to request client ports.
const port = await portProviderFunc();
port.start();
// Listen for client requests. A separate BroadcastChannel
// instance is necessary because we may be serving our own
// request.
const providerId = await this.#clientId;
const broadcastChannel = new BroadcastChannel("SharedService");
broadcastChannel.addEventListener(
"message",
async ({ data }) => {
if (
data?.type === "request" &&
data?.sharedService === this.serviceName
) {
// Get a port to send to the client.
const requestedPort = await new Promise<MessagePort>(
(resolve) => {
port.addEventListener(
"message",
(event) => resolve(event.ports[0]),
{ once: true }
);
port.postMessage(data.clientId);
}
);
this.#sendPortToClient(data, requestedPort);
}
},
{ signal: this.#onDeactivate?.signal }
);
// Tell everyone that we are the new provider.
broadcastChannel.postMessage({
type: "provider",
sharedService: this.serviceName,
providerId
});
// Release the lock only on user abort or context destruction.
return new Promise((_, reject) => {
this.#onDeactivate?.signal.addEventListener("abort", () => {
broadcastChannel.close();
reject(this.#onDeactivate?.signal.reason);
});
});
}
);
}
deactivate() {
this.#onDeactivate?.abort();
this.#onDeactivate = null;
}
close() {
this.deactivate();
this.#onClose.abort();
for (const { reject } of this.providerCallbacks.values()) {
reject(new Error("SharedService closed"));
}
}
async #sendPortToClient(message: any, port: MessagePort) {
sharedWorker?.port.postMessage(message, [port]);
}
async #getClientId() {
// Use a Web Lock to determine our clientId.
const nonce = Math.random().toString();
const clientId = await navigator.locks.request(nonce, async () => {
const { held } = await navigator.locks.query();
return held?.find((lock) => lock.name === nonce)?.clientId;
});
// Acquire a Web Lock named after the clientId. This lets other contexts
// track this context's lifetime.
// TODO: It would be better to lock on the clientId+serviceName (passing
// that lock name in the service request). That would allow independent
// instance lifetime tracking.
await SharedService.#acquireContextLock(clientId);
// Configure message forwarding via the SharedWorker. This must be
// done after acquiring the clientId lock to avoid a race condition
// in the SharedWorker.
sharedWorker?.port.addEventListener("message", (event) => {
event.data.ports = event.ports;
this.dispatchEvent(new MessageEvent("message", { data: event.data }));
});
sharedWorker?.port.start();
sharedWorker?.port.postMessage({ clientId });
return clientId;
}
async #providerChange() {
// Multiple calls to this function could be in flight at once. If that
// happens, we only care about the most recent call, i.e. the one
// assigned to this.#providerPort. This counter lets us determine
// whether this call is still the most recent.
const providerCounter = ++this.#providerCounter;
// Obtain a MessagePort from the provider. The request can fail during
// a provider transition, so retry until successful.
let providerPort: MessagePort | null = null;
const clientId = await this.#clientId;
while (!providerPort && providerCounter === this.#providerCounter) {
// Broadcast a request for the port.
const nonce = randomString();
this.#clientChannel.postMessage({
type: "request",
nonce,
sharedService: this.serviceName,
clientId
});
// Wait for the provider to respond (via the service worker) or
// timeout. A timeout can occur if there is no provider to receive
// the broadcast or if the provider is too busy.
const providerPortReady = new Promise<MessagePort>((resolve) => {
const abortController = new AbortController();
this.addEventListener(
"message",
(event) => {
if (event instanceof MessageEvent && event.data?.nonce === nonce) {
resolve(event.data.ports[0]);
abortController.abort();
}
},
{ signal: abortController.signal }
);
this.#providerChangeCleanup.push(() => abortController.abort());
});
let timeout = 0;
providerPort = await Promise.race([
providerPortReady,
new Promise<null>(
(resolve) =>
(timeout = setTimeout(() => {
console.error("Provider request timed out", nonce);
resolve(null);
}, PROVIDER_REQUEST_TIMEOUT) as unknown as number)
)
]);
clearTimeout(timeout);
if (!providerPort) {
// The provider request timed out. If it does eventually arrive
// just close it.
providerPortReady.then((port) => {
console.warn("port arrived but timed out. Closing", port);
port?.close();
});
}
}
if (providerPort && providerCounter === this.#providerCounter) {
// Clean up all earlier attempts to get the provider port.
this.#providerChangeCleanup.forEach((f) => f());
this.#providerChangeCleanup = [];
// Configure the port.
providerPort.addEventListener("message", ({ data }) => {
const callbacks = this.providerCallbacks.get(data.nonce);
if (!callbacks) return;
if (!data.error) {
callbacks.resolve(data.result);
} else {
callbacks.reject(Object.assign(new Error(), data.error));
}
});
providerPort.addEventListener("messageerror", console.error);
providerPort.start();
return providerPort;
} else {
// Either there is no port because this request timed out, or there
// is a port but it is already obsolete because a new provider has
// announced itself.
providerPort?.close();
return null;
}
}
#closeProviderPort(providerPort: Promise<MessagePort | null>) {
providerPort.then((port) => port?.close());
for (const { reject } of this.providerCallbacks.values()) {
reject(new Error("SharedService provider change"));
}
}
#createProxy() {
return new Proxy<T>({} as T, {
get: (_, method) => {
return async (...args: any[]) => {
// Use a nonce to match up requests and responses. This allows
// the responses to be out of order.
const nonce = randomString();
const providerPort = await this.getProviderPort();
return new Promise((resolve, reject) => {
this.providerCallbacks.set(nonce, { resolve, reject });
providerPort.postMessage({ nonce, method, args });
}).finally(() => {
this.providerCallbacks.delete(nonce);
});
};
}
});
}
static #acquireContextLock = (function () {
let p: Promise<void> | undefined = undefined;
return function (clientId: string) {
return p
? p
: (p = new Promise<void>((resolve) => {
navigator.locks.request(
clientId,
() =>
new Promise((_) => {
resolve();
})
);
}));
};
})();
async getProviderPort() {
let tries = 0;
let providerPort = await this.#providerPort;
while (!providerPort) {
if (++tries > 10)
throw new Error("Could not find a provider port to communicate with.");
providerPort = await this.#providerPort;
console.warn("Provider port not found. Retrying in 500ms...");
await new Promise((resolve) => setTimeout(resolve, 500));
}
return providerPort;
}
}
/**
* Wrap a target with MessagePort for proxying.
*/
export function createSharedServicePort(target: any) {
const { port1: providerPort1, port2: providerPort2 } = new MessageChannel();
providerPort1.addEventListener("message", ({ data: clientId }) => {
const { port1, port2 } = new MessageChannel();
// The port requester holds a lock while using the channel. When the
// lock is released by the requester, clean up the port on this side.
navigator.locks.request(clientId, () => {
port1.close();
});
port1.addEventListener("message", async ({ data }) => {
try {
port1.postMessage({
nonce: data.nonce,
result: await target[data.method](...data.args)
});
} catch (e) {
// Error is not structured cloneable so copy into POJO.
const error =
e instanceof Error
? Object.fromEntries(
Object.getOwnPropertyNames(e).map((k) => [k, (e as any)[k]])
)
: e;
port1.postMessage({
nonce: data.nonce,
error
});
}
});
port1.start();
providerPort1.postMessage(null, [port2]);
});
providerPort1.start();
return providerPort2;
}
function randomString() {
return Math.random().toString(36).replace("0.", "");
}

View File

@@ -0,0 +1,52 @@
/*
This file is part of the Notesnook project (https://notesnook.com/)
Copyright (C) 2023 Streetwriters (Private) Limited
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/* eslint-disable no-var */
/// <reference lib="webworker" />
export default null;
declare var self: SharedWorkerGlobalScope & typeof globalThis;
const mapClientIdToPort: Map<string, MessagePort> = new Map();
self.addEventListener("connect", (event) => {
// The first message from a client associates the clientId with the port.
const workerPort = event.ports[0];
workerPort.addEventListener(
"message",
(event) => {
mapClientIdToPort.set(event.data.clientId, workerPort);
// Remove the entry when the client goes away, which we detect when
// the lock on its name becomes available.
navigator.locks.request(event.data.clientId, { mode: "shared" }, () => {
mapClientIdToPort.get(event.data.clientId)?.close();
mapClientIdToPort.delete(event.data.clientId);
});
// Subsequent messages will be forwarded.
workerPort.addEventListener("message", (event) => {
const port = mapClientIdToPort.get(event.data.clientId);
port?.postMessage(event.data, [...event.ports]);
});
},
{ once: true }
);
workerPort.start();
});

View File

@@ -23,10 +23,11 @@ import SQLiteAsyncESMFactory from "./wa-sqlite-async";
import SQLiteSyncESMFactory from "./wa-sqlite";
import { IDBBatchAtomicVFS } from "./IDBBatchAtomicVFS";
import { AccessHandlePoolVFS } from "./AccessHandlePoolVFS";
import { expose, transfer } from "comlink";
import { transfer } from "comlink";
import type { RunMode } from "./type";
import { QueryResult } from "kysely";
import { DatabaseSource } from "./sqlite-export";
import { createSharedServicePort } from "./shared-service";
type PreparedStatement = {
stmt: number;
@@ -39,6 +40,8 @@ let vfs: IDBBatchAtomicVFS | AccessHandlePoolVFS | null = null;
const preparedStatements: Map<string, PreparedStatement> = new Map();
async function init(dbName: string, async: boolean, url?: string) {
if (db) return;
const option = url ? { locateFile: () => url } : {};
const SQLiteAsyncModule = async
? await SQLiteAsyncESMFactory(option)
@@ -162,4 +165,9 @@ const worker = {
};
export type SQLiteWorker = typeof worker;
expose(worker);
addEventListener("message", async (event) => {
await worker.init(event.data.dbName, event.data.async, event.data.uri);
const providerPort = createSharedServicePort(worker);
postMessage(null, [providerPort]);
});

View File

@@ -23,32 +23,63 @@ import Worker from "./sqlite.worker.ts?worker";
import type { SQLiteWorker } from "./sqlite.worker";
import SQLiteSyncURI from "./wa-sqlite.wasm?url";
import SQLiteAsyncURI from "./wa-sqlite-async.wasm?url";
import { wrap } from "comlink";
import { Mutex } from "async-mutex";
import { SharedService } from "./shared-service";
type Config = { dbName: string; async: boolean };
type Config = { dbName: string; async: boolean; init?: () => Promise<void> };
const SHARED_SERVICE_NAME = "notesnook-sqlite";
export class WaSqliteWorkerDriver implements Driver {
private connection?: DatabaseConnection;
private connectionMutex = new ConnectionMutex();
private worker: SQLiteWorker;
constructor(private readonly config: Config) {
this.worker = wrap<SQLiteWorker>(new Worker()) as SQLiteWorker;
}
private worker?: SQLiteWorker;
constructor(private readonly config: Config) {}
async init(): Promise<void> {
await this.worker.init(
this.config.dbName,
this.config.async,
this.config.async ? SQLiteAsyncURI : SQLiteSyncURI
const sharedService = new SharedService<SQLiteWorker>(SHARED_SERVICE_NAME);
sharedService.activate(
() =>
new Promise<MessagePort>((resolve) => {
this.needsInitialization = true;
const baseWorker = new Worker();
baseWorker.addEventListener(
"message",
(event) => resolve(event.ports[0]),
{ once: true }
);
baseWorker.postMessage({
dbName: this.config.dbName,
async: this.config.async,
uri: this.config.async ? SQLiteAsyncURI : SQLiteSyncURI
});
})
);
this.connection = new WaSqliteWorkerConnection(this.worker);
console.log("waiting to initialize");
// we have to wait until a provider becomes available, otherwise
// a race condition is created where the client starts executing
// queries before it is initialized.
await sharedService.getProviderPort();
// await this.config.onCreateConnection?.(this.connection);
this.worker = sharedService.proxy;
this.connection = new WaSqliteWorkerConnection(this.worker);
}
private needsInitialization = false;
async #initialize() {
if (this.needsInitialization) {
this.needsInitialization = false;
try {
await this.config.init?.();
} catch (e) {
this.needsInitialization = true;
throw e;
}
}
}
async acquireConnection(): Promise<DatabaseConnection> {
await this.#initialize();
// SQLite only has one single connection. We use a mutex here to wait
// until the single connection has been released.
await this.connectionMutex.lock();
@@ -72,18 +103,15 @@ export class WaSqliteWorkerDriver implements Driver {
}
async destroy(): Promise<void> {
if (!this.worker) {
return;
}
return await this.worker.close();
return await this.worker?.close();
}
async delete() {
return this.worker.delete();
return this.worker?.delete();
}
async export() {
return this.worker.export(this.config.dbName, this.config.async);
return this.worker?.export(this.config.dbName, this.config.async);
}
}