diff --git a/apps/web/src/common/db.ts b/apps/web/src/common/db.ts index d47566161..c66a676fe 100644 --- a/apps/web/src/common/db.ts +++ b/apps/web/src/common/db.ts @@ -71,7 +71,7 @@ async function initializeDatabase(persistence: DatabasePersistence) { pageSize: 8192, cacheSize: -32000, password: Buffer.from(databaseKey).toString("hex"), - skipInitialization: !IS_DESKTOP_APP + skipInitialization: !IS_DESKTOP_APP && !!globalThis.SharedWorker }, storage: storage, eventsource: EventSource, diff --git a/apps/web/src/common/sqlite/index.ts b/apps/web/src/common/sqlite/index.ts index d61a31b7a..56dc905dd 100644 --- a/apps/web/src/common/sqlite/index.ts +++ b/apps/web/src/common/sqlite/index.ts @@ -23,7 +23,10 @@ import { SqliteIntrospector, Dialect } from "kysely"; -import { WaSqliteWorkerDriver } from "./wa-sqlite-kysely-driver"; +import { + WaSqliteWorkerMultipleTabDriver, + WaSqliteWorkerSingleTabDriver +} from "./wa-sqlite-kysely-driver"; import { isFeatureSupported } from "../../utils/feature-check"; declare module "kysely" { @@ -39,12 +42,18 @@ export const createDialect = ( ): Dialect => { return { createDriver: () => - new WaSqliteWorkerDriver({ - async: !isFeatureSupported("opfs"), - dbName: name, - encrypted, - init - }), + globalThis.SharedWorker + ? new WaSqliteWorkerMultipleTabDriver({ + async: !isFeatureSupported("opfs"), + dbName: name, + encrypted, + init + }) + : new WaSqliteWorkerSingleTabDriver({ + async: !isFeatureSupported("opfs"), + dbName: name, + encrypted + }), createAdapter: () => new SqliteAdapter(), createIntrospector: (db) => new SqliteIntrospector(db), createQueryCompiler: () => new SqliteQueryCompiler() diff --git a/apps/web/src/common/sqlite/shared-service.ts b/apps/web/src/common/sqlite/shared-service.ts index b2c610d0b..8bd883546 100644 --- a/apps/web/src/common/sqlite/shared-service.ts +++ b/apps/web/src/common/sqlite/shared-service.ts @@ -182,7 +182,9 @@ export class SharedService extends EventTarget { } #sendPortToClient(message: any, port: MessagePort) { - sharedWorker?.port.postMessage(message, [port]); + if (!sharedWorker) + throw new Error("Shared worker is not supported in this environment."); + sharedWorker.port.postMessage(message, [port]); } async #getClientId() { diff --git a/apps/web/src/common/sqlite/sqlite.worker.ts b/apps/web/src/common/sqlite/sqlite.worker.ts index 20b08028a..c987aded2 100644 --- a/apps/web/src/common/sqlite/sqlite.worker.ts +++ b/apps/web/src/common/sqlite/sqlite.worker.ts @@ -19,7 +19,7 @@ along with this program. If not, see . import type { SQLiteAPI, SQLiteCompatibleType } from "./sqlite-types"; import { Factory, SQLITE_ROW, SQLiteError } from "./sqlite-api"; -import { transfer } from "comlink"; +import { expose, transfer } from "comlink"; import type { RunMode } from "./type"; import { QueryResult } from "kysely"; import { DatabaseSource } from "./sqlite-export"; @@ -32,6 +32,12 @@ type PreparedStatement = { columns: string[]; }; +type SQLiteOptions = { + async: boolean; + url?: string; + encrypted: boolean; +}; + class _SQLiteWorker { sqlite!: SQLiteAPI; db: number | undefined = undefined; @@ -39,21 +45,22 @@ class _SQLiteWorker { initialized = false; preparedStatements: Map = new Map(); retryCounter: Record = {}; - constructor( - private readonly dbName: string, - private readonly encrypted: boolean - ) { - console.log("new sqlite worker", dbName, encrypted); - } + encrypted = false; + name = ""; + async = false; - async open(async: boolean, url?: string) { + async open(name: string, options: SQLiteOptions) { if (this.db) { console.error("Database is already initialized", this.db); return; } - const option = url ? { locateFile: () => url } : {}; - const sqliteModule = async + this.encrypted = options.encrypted; + this.name = name; + this.async = options.async; + + const option = options.url ? { locateFile: () => options.url } : {}; + const sqliteModule = options.async ? await import("./wa-sqlite-async").then( ({ default: SQLiteAsyncESMFactory }) => SQLiteAsyncESMFactory(option) ) @@ -61,11 +68,11 @@ class _SQLiteWorker { SQLiteSyncESMFactory(option) ); this.sqlite = Factory(sqliteModule); - this.vfs = await this.getVFS(this.dbName, async); + this.vfs = await this.getVFS(name, options.async); this.sqlite.vfs_register(this.vfs, false); this.db = await this.sqlite.open_v2( - this.dbName, + name, undefined, `multipleciphers-${this.vfs.name}` ); @@ -163,7 +170,7 @@ class _SQLiteWorker { if (this.encrypted && !sql.startsWith("PRAGMA key")) { await this.waitForDatabase(); } - if (!this.db) throw new Error("No database is not opened."); + if (!this.db) throw new Error("Database is not opened."); const rows = (await this.exec(sql, mode, parameters)) as R[]; if (mode === "query") return { rows }; @@ -194,16 +201,16 @@ class _SQLiteWorker { this.initialized = false; } - async export(dbName: string, async: boolean) { - const vfs = await this.getVFS(dbName, async); - const stream = new ReadableStream(new DatabaseSource(vfs, dbName)); + async export() { + const vfs = await this.getVFS(this.name, this.async); + const stream = new ReadableStream(new DatabaseSource(vfs, this.name)); return transfer(stream, [stream]); } - async delete(dbName: string, async: boolean) { + async delete() { await this.close(); if (this.vfs) await this.vfs.delete(); - else await (await this.getVFS(dbName, async)).delete(); + else await (await this.getVFS(this.name, this.async)).delete(); } async getVFS(dbName: string, async: boolean) { @@ -222,7 +229,7 @@ class _SQLiteWorker { async initialize() { self.dispatchEvent( new MessageEvent("message", { - data: { type: "databaseInitialized", dbName: this.dbName } + data: { type: "databaseInitialized", dbName: this.name } }) ); console.log("Database initialized", this.db); @@ -237,7 +244,7 @@ class _SQLiteWorker { self.addEventListener("message", (ev) => { if ( ev.data.type === "databaseInitialized" && - ev.data.dbName === this.dbName + ev.data.dbName === this.name ) resolve(true); }) @@ -251,11 +258,17 @@ export type SQLiteWorker = typeof _SQLiteWorker.prototype; addEventListener("message", async (event) => { if (!event.data.type) { - const worker = new _SQLiteWorker(event.data.dbName, event.data.encrypted); - await worker.open(event.data.async, event.data.uri); + const worker = new _SQLiteWorker(); + await worker.open(event.data.dbName, { + async: event.data.async, + encrypted: event.data.encrypted, + url: event.data.uri + }); const providerPort = createSharedServicePort(worker); postMessage(null, [providerPort]); self.addEventListener("beforeunload", () => worker.close()); } }); +const worker = new _SQLiteWorker(); +expose(worker); diff --git a/apps/web/src/common/sqlite/wa-sqlite-kysely-driver.ts b/apps/web/src/common/sqlite/wa-sqlite-kysely-driver.ts index b49f0ca0d..3453b7e09 100644 --- a/apps/web/src/common/sqlite/wa-sqlite-kysely-driver.ts +++ b/apps/web/src/common/sqlite/wa-sqlite-kysely-driver.ts @@ -25,6 +25,7 @@ import SQLiteSyncURI from "./wa-sqlite.wasm?url"; import SQLiteAsyncURI from "./wa-sqlite-async.wasm?url"; import { Mutex } from "async-mutex"; import { SharedService } from "./shared-service"; +import { Remote, wrap } from "comlink"; type Config = { dbName: string; @@ -38,13 +39,14 @@ const servicePool = new Map< { service: SharedService; activated: boolean; closed: boolean } >(); -export class WaSqliteWorkerDriver implements Driver { +export class WaSqliteWorkerMultipleTabDriver implements Driver { private connection?: DatabaseConnection; private connectionMutex = new Mutex(); private initializationMutex = new Mutex(); private readonly serviceName; constructor(private readonly config: Config) { + console.log("multi tab driver", config.dbName); this.serviceName = `${config.dbName}-service`; } @@ -59,10 +61,11 @@ export class WaSqliteWorkerDriver implements Driver { if (activated) { if (closed) { console.log("Already activated. Reinitializing..."); - await service.proxy.open( - this.config.async, - this.config.async ? SQLiteAsyncURI : SQLiteSyncURI - ); + await service.proxy.open(this.config.dbName, { + async: this.config.async, + encrypted: this.config.encrypted, + url: this.config.async ? SQLiteAsyncURI : SQLiteSyncURI + }); this.needsInitialization = true; servicePool.set(this.serviceName, { service, @@ -193,19 +196,76 @@ export class WaSqliteWorkerDriver implements Driver { async delete() { const service = servicePool.get(this.serviceName); if (!service || !service.service) return; - await service.service?.proxy?.delete(this.config.dbName, this.config.async); + await service.service?.proxy?.delete(); service.closed = true; } async export() { - return servicePool - .get(this.serviceName) - ?.service?.proxy?.export(this.config.dbName, this.config.async); + return servicePool.get(this.serviceName)?.service?.proxy?.export(); + } +} + +export class WaSqliteWorkerSingleTabDriver implements Driver { + private connection?: DatabaseConnection; + private connectionMutex = new Mutex(); + private readonly worker = wrap( + new Worker({ name: this.config.dbName }) + ); + + constructor(private readonly config: Config) { + console.log("single tab driver", config.dbName); + } + + async init(): Promise { + await this.worker.open(this.config.dbName, { + async: this.config.async, + encrypted: this.config.encrypted, + url: this.config.async ? SQLiteAsyncURI : SQLiteSyncURI + }); + this.connection = new WaSqliteWorkerConnection(this.worker); + } + + async acquireConnection(): Promise { + if (!this.connection) throw new Error("Driver not initialized."); + + // SQLite only has one single connection. We use a mutex here to wait + // until the single connection has been released. + await this.connectionMutex.waitForUnlock(); + await this.connectionMutex.acquire(); + return this.connection; + } + + async beginTransaction(connection: DatabaseConnection): Promise { + await connection.executeQuery(CompiledQuery.raw("begin")); + } + + async commitTransaction(connection: DatabaseConnection): Promise { + await connection.executeQuery(CompiledQuery.raw("commit")); + } + + async rollbackTransaction(connection: DatabaseConnection): Promise { + await connection.executeQuery(CompiledQuery.raw("rollback")); + } + + async releaseConnection(): Promise { + this.connectionMutex.release(); + } + + async destroy(): Promise { + await this.worker.close(); + } + + async delete() { + await this.worker.delete(); + } + + async export() { + return await this.worker.export(); } } class WaSqliteWorkerConnection implements DatabaseConnection { - constructor(private readonly worker: SQLiteWorker) {} + constructor(private readonly worker: SQLiteWorker | Remote) {} streamQuery(): AsyncIterableIterator> { throw new Error("wasqlite driver doesn't support streaming"); @@ -221,6 +281,8 @@ class WaSqliteWorkerConnection implements DatabaseConnection { : query.kind === "RawNode" ? "raw" : "exec"; - return this.worker.run(mode, sql, parameters as any); + return this.worker.run(mode, sql, parameters as any) as Promise< + QueryResult + >; } } diff --git a/apps/web/src/utils/logger.ts b/apps/web/src/utils/logger.ts index eb9265092..fb93d9738 100644 --- a/apps/web/src/utils/logger.ts +++ b/apps/web/src/utils/logger.ts @@ -44,7 +44,7 @@ async function initializeLogger() { synchronous: "normal", pageSize: 8192, cacheSize: -32000, - skipInitialization: !IS_DESKTOP_APP + skipInitialization: !IS_DESKTOP_APP && !!globalThis.SharedWorker }, false );