Save deleted files and message in separate tombstone tables

This commit is contained in:
Hakan Shehu
2024-12-30 00:29:59 +01:00
parent 26933fdbd9
commit 7939da3c29
19 changed files with 585 additions and 143 deletions

View File

@@ -5,6 +5,7 @@ import {
extractFileType,
SyncFileData,
SyncFileInteractionData,
SyncFileTombstoneData,
} from '@colanode/core';
import axios from 'axios';
import mime from 'mime-types';
@@ -442,46 +443,6 @@ class FileService {
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
if (file.deletedAt) {
const deletedFile = await workspaceDatabase
.deleteFrom('files')
.returningAll()
.where('id', '=', file.id)
.executeTakeFirst();
if (!deletedFile) {
return;
}
await workspaceDatabase
.deleteFrom('file_interactions')
.where('file_id', '=', file.id)
.execute();
await workspaceDatabase
.deleteFrom('file_states')
.where('file_id', '=', file.id)
.execute();
// if the file exists in the workspace, we need to delete it
const filePath = path.join(
getWorkspaceFilesDirectoryPath(userId),
`${file.id}${file.extension}`
);
if (fs.existsSync(filePath)) {
fs.rmSync(filePath, { force: true });
}
eventBus.publish({
type: 'file_deleted',
userId,
file: mapFile(deletedFile),
});
return;
}
const existingFile = await workspaceDatabase
.selectFrom('files')
.selectAll()
@@ -565,6 +526,50 @@ class FileService {
this.debug(`Server file ${file.id} has been synced`);
}
public async syncServerFileTombstone(
userId: string,
fileTombstone: SyncFileTombstoneData
) {
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
const deletedFile = await workspaceDatabase
.deleteFrom('files')
.returningAll()
.where('id', '=', fileTombstone.id)
.executeTakeFirst();
await workspaceDatabase
.deleteFrom('file_interactions')
.where('file_id', '=', fileTombstone.id)
.execute();
await workspaceDatabase
.deleteFrom('file_states')
.where('file_id', '=', fileTombstone.id)
.execute();
if (deletedFile) {
// if the file exists in the workspace, we need to delete it
const filePath = path.join(
getWorkspaceFilesDirectoryPath(userId),
`${fileTombstone.id}${deletedFile.extension}`
);
if (fs.existsSync(filePath)) {
fs.rmSync(filePath, { force: true });
}
eventBus.publish({
type: 'file_deleted',
userId,
file: mapFile(deletedFile),
});
}
this.debug(`Server file tombstone ${fileTombstone.id} has been synced`);
}
public async syncServerFileInteraction(
userId: string,
fileInteraction: SyncFileInteractionData

View File

@@ -5,6 +5,7 @@ import {
SyncMessageData,
SyncMessageInteractionData,
SyncMessageReactionData,
SyncMessageTombstoneData,
} from '@colanode/core';
import { fileService } from '@/main/services/file-service';
@@ -24,41 +25,6 @@ class MessageService {
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
if (message.deletedAt) {
const deletedMessage = await workspaceDatabase
.deleteFrom('messages')
.returningAll()
.where('id', '=', message.id)
.executeTakeFirst();
if (!deletedMessage) {
return;
}
await workspaceDatabase
.deleteFrom('message_reactions')
.where('message_id', '=', message.id)
.execute();
await workspaceDatabase
.deleteFrom('message_interactions')
.where('message_id', '=', message.id)
.execute();
await workspaceDatabase
.deleteFrom('texts')
.where('id', '=', message.id)
.execute();
eventBus.publish({
type: 'message_deleted',
userId,
message: mapMessage(deletedMessage),
});
return;
}
const existingMessage = await workspaceDatabase
.selectFrom('messages')
.selectAll()
@@ -154,6 +120,47 @@ class MessageService {
this.debug(`Server message ${message.id} has been synced`);
}
public async syncServerMessageTombstone(
userId: string,
messageTombstone: SyncMessageTombstoneData
) {
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
const deletedMessage = await workspaceDatabase
.deleteFrom('messages')
.returningAll()
.where('id', '=', messageTombstone.id)
.executeTakeFirst();
await workspaceDatabase
.deleteFrom('message_reactions')
.where('message_id', '=', messageTombstone.id)
.execute();
await workspaceDatabase
.deleteFrom('message_interactions')
.where('message_id', '=', messageTombstone.id)
.execute();
await workspaceDatabase
.deleteFrom('texts')
.where('id', '=', messageTombstone.id)
.execute();
if (deletedMessage) {
eventBus.publish({
type: 'message_deleted',
userId,
message: mapMessage(deletedMessage),
});
}
this.debug(
`Server message tombstone ${messageTombstone.id} has been synced`
);
}
public async syncServerMessageReaction(
userId: string,
messageReaction: SyncMessageReactionData

View File

@@ -14,6 +14,8 @@ import { FileSynchronizer } from '@/main/synchronizers/files';
import { EntryInteractionSynchronizer } from '@/main/synchronizers/entry-interactions';
import { FileInteractionSynchronizer } from '@/main/synchronizers/file-interactions';
import { MessageInteractionSynchronizer } from '@/main/synchronizers/message-interactions';
import { FileTombstoneSynchronizer } from '@/main/synchronizers/file-tombstones';
import { MessageTombstoneSynchronizer } from '@/main/synchronizers/message-tombstones';
import { eventBus } from '@/shared/lib/event-bus';
class SyncService {
@@ -207,6 +209,24 @@ class SyncService {
);
}
if (input.type === 'file_tombstones') {
return new FileTombstoneSynchronizer(
userId,
accountId,
input,
workspaceDatabase
);
}
if (input.type === 'message_tombstones') {
return new MessageTombstoneSynchronizer(
userId,
accountId,
input,
workspaceDatabase
);
}
return null;
}
@@ -250,6 +270,16 @@ class SyncService {
type: 'message_interactions',
rootId,
});
await this.initSynchronizer(userId, accountId, workspaceDatabase, {
type: 'file_tombstones',
rootId,
});
await this.initSynchronizer(userId, accountId, workspaceDatabase, {
type: 'message_tombstones',
rootId,
});
}
private removeRootNodeSynchronizers(userId: string, rootId: string) {
@@ -266,48 +296,46 @@ class SyncService {
synchronizer.input.rootId === rootId
) {
this.synchronizers.delete(key);
}
if (
} else if (
synchronizer.input.type === 'messages' &&
synchronizer.input.rootId === rootId
) {
this.synchronizers.delete(key);
}
if (
} else if (
synchronizer.input.type === 'message_reactions' &&
synchronizer.input.rootId === rootId
) {
this.synchronizers.delete(key);
}
if (
} else if (
synchronizer.input.type === 'files' &&
synchronizer.input.rootId === rootId
) {
this.synchronizers.delete(key);
}
if (
} else if (
synchronizer.input.type === 'entry_interactions' &&
synchronizer.input.rootId === rootId
) {
this.synchronizers.delete(key);
}
if (
} else if (
synchronizer.input.type === 'file_interactions' &&
synchronizer.input.rootId === rootId
) {
this.synchronizers.delete(key);
}
if (
} else if (
synchronizer.input.type === 'message_interactions' &&
synchronizer.input.rootId === rootId
) {
this.synchronizers.delete(key);
} else if (
synchronizer.input.type === 'file_tombstones' &&
synchronizer.input.rootId === rootId
) {
this.synchronizers.delete(key);
} else if (
synchronizer.input.type === 'message_tombstones' &&
synchronizer.input.rootId === rootId
) {
this.synchronizers.delete(key);
}
}
}

View File

@@ -0,0 +1,14 @@
import { SyncFileTombstonesInput, SyncFileTombstoneData } from '@colanode/core';
import { BaseSynchronizer } from '@/main/synchronizers/base';
import { fileService } from '@/main/services/file-service';
export class FileTombstoneSynchronizer extends BaseSynchronizer<SyncFileTombstonesInput> {
protected async process(data: SyncFileTombstoneData): Promise<void> {
await fileService.syncServerFileTombstone(this.userId, data);
}
protected get cursorKey(): string {
return `file_tombstones:${this.input.rootId}`;
}
}

View File

@@ -0,0 +1,17 @@
import {
SyncMessageTombstonesInput,
SyncMessageTombstoneData,
} from '@colanode/core';
import { BaseSynchronizer } from '@/main/synchronizers/base';
import { messageService } from '@/main/services/message-service';
export class MessageTombstoneSynchronizer extends BaseSynchronizer<SyncMessageTombstonesInput> {
protected async process(data: SyncMessageTombstoneData): Promise<void> {
await messageService.syncServerMessageTombstone(this.userId, data);
}
protected get cursorKey(): string {
return `message_tombstones:${this.input.rootId}`;
}
}

View File

@@ -337,8 +337,6 @@ const createMessagesTable: Migration = {
.addColumn('created_by', 'varchar(30)', (col) => col.notNull())
.addColumn('updated_at', 'timestamptz')
.addColumn('updated_by', 'varchar(30)')
.addColumn('deleted_at', 'timestamptz')
.addColumn('deleted_by', 'varchar(30)')
.addColumn('version', 'bigint', (col) =>
col.notNull().defaultTo(sql`nextval('messages_version_sequence')`)
)
@@ -481,6 +479,39 @@ const createMessageInteractionsTable: Migration = {
},
};
const createMessageTombstonesTable: Migration = {
up: async (db) => {
await sql`
CREATE SEQUENCE IF NOT EXISTS message_tombstones_version_sequence
START WITH 1000000000
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
`.execute(db);
await db.schema
.createTable('message_tombstones')
.addColumn('id', 'varchar(30)', (col) => col.notNull().primaryKey())
.addColumn('root_id', 'varchar(30)', (col) => col.notNull())
.addColumn('workspace_id', 'varchar(30)', (col) => col.notNull())
.addColumn('deleted_at', 'timestamptz', (col) => col.notNull())
.addColumn('deleted_by', 'varchar(30)', (col) => col.notNull())
.addColumn('version', 'bigint', (col) =>
col
.notNull()
.defaultTo(sql`nextval('message_tombstones_version_sequence')`)
)
.execute();
},
down: async (db) => {
await db.schema.dropTable('message_tombstones').execute();
await sql`DROP SEQUENCE IF EXISTS message_tombstones_version_sequence`.execute(
db
);
},
};
const createFilesTable: Migration = {
up: async (db) => {
await sql`
@@ -509,8 +540,6 @@ const createFilesTable: Migration = {
.addColumn('created_by', 'varchar(30)', (col) => col.notNull())
.addColumn('updated_at', 'timestamptz')
.addColumn('updated_by', 'varchar(30)')
.addColumn('deleted_at', 'timestamptz')
.addColumn('deleted_by', 'varchar(30)')
.addColumn('status', 'integer', (col) => col.notNull())
.addColumn('version', 'bigint', (col) =>
col.notNull().defaultTo(sql`nextval('files_version_sequence')`)
@@ -601,6 +630,39 @@ const createFileInteractionsTable: Migration = {
},
};
const createFileTombstonesTable: Migration = {
up: async (db) => {
await sql`
CREATE SEQUENCE IF NOT EXISTS file_tombstones_version_sequence
START WITH 1000000000
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
`.execute(db);
await db.schema
.createTable('file_tombstones')
.addColumn('id', 'varchar(30)', (col) => col.notNull().primaryKey())
.addColumn('root_id', 'varchar(30)', (col) => col.notNull())
.addColumn('workspace_id', 'varchar(30)', (col) => col.notNull())
.addColumn('deleted_at', 'timestamptz', (col) => col.notNull())
.addColumn('deleted_by', 'varchar(30)', (col) => col.notNull())
.addColumn('version', 'bigint', (col) =>
col
.notNull()
.defaultTo(sql`nextval('file_tombstones_version_sequence')`)
)
.execute();
},
down: async (db) => {
await db.schema.dropTable('file_tombstones').execute();
await sql`DROP SEQUENCE IF EXISTS file_tombstones_version_sequence`.execute(
db
);
},
};
const createEntryPathsTable: Migration = {
up: async (db) => {
await db.schema
@@ -683,11 +745,13 @@ export const databaseMigrations: Record<string, Migration> = {
'00005_create_entries_table': createEntriesTable,
'00006_create_entry_transactions_table': createEntryTransactionsTable,
'00007_create_entry_interactions_table': createEntryInteractionsTable,
'00008_create_messages_table': createMessagesTable,
'00009_create_message_reactions_table': createMessageReactionsTable,
'00010_create_message_interactions_table': createMessageInteractionsTable,
'00011_create_files_table': createFilesTable,
'00012_create_file_interactions_table': createFileInteractionsTable,
'00013_create_collaborations_table': createCollaborationsTable,
'00014_create_entry_paths_table': createEntryPathsTable,
'00008_create_entry_paths_table': createEntryPathsTable,
'00009_create_messages_table': createMessagesTable,
'00010_create_message_reactions_table': createMessageReactionsTable,
'00011_create_message_interactions_table': createMessageInteractionsTable,
'00012_create_message_tombstones_table': createMessageTombstonesTable,
'00013_create_files_table': createFilesTable,
'00014_create_file_interactions_table': createFileInteractionsTable,
'00015_create_file_tombstones_table': createFileTombstonesTable,
'00016_create_collaborations_table': createCollaborationsTable,
};

View File

@@ -162,6 +162,17 @@ export type SelectEntryInteraction = Selectable<EntryInteractionTable>;
export type CreateEntryInteraction = Insertable<EntryInteractionTable>;
export type UpdateEntryInteraction = Updateable<EntryInteractionTable>;
interface EntryPathTable {
ancestor_id: ColumnType<string, string, never>;
descendant_id: ColumnType<string, string, never>;
workspace_id: ColumnType<string, string, never>;
level: ColumnType<number, number, number>;
}
export type SelectEntryPath = Selectable<EntryPathTable>;
export type CreateEntryPath = Insertable<EntryPathTable>;
export type UpdateEntryPath = Updateable<EntryPathTable>;
interface MessageTable {
id: ColumnType<string, string, never>;
type: ColumnType<MessageType, MessageType, MessageType>;
@@ -174,8 +185,6 @@ interface MessageTable {
created_by: ColumnType<string, string, never>;
updated_at: ColumnType<Date | null, Date | null, Date>;
updated_by: ColumnType<string | null, string | null, string>;
deleted_at: ColumnType<Date | null, Date | null, Date>;
deleted_by: ColumnType<string | null, string | null, string>;
version: ColumnType<bigint, never, never>;
}
@@ -213,6 +222,19 @@ export type SelectMessageInteraction = Selectable<MessageInteractionTable>;
export type CreateMessageInteraction = Insertable<MessageInteractionTable>;
export type UpdateMessageInteraction = Updateable<MessageInteractionTable>;
interface MessageTombstoneTable {
id: ColumnType<string, string, never>;
root_id: ColumnType<string, string, never>;
workspace_id: ColumnType<string, string, never>;
deleted_at: ColumnType<Date, Date, Date>;
deleted_by: ColumnType<string, string, never>;
version: ColumnType<bigint, never, never>;
}
export type SelectMessageTombstone = Selectable<MessageTombstoneTable>;
export type CreateMessageTombstone = Insertable<MessageTombstoneTable>;
export type UpdateMessageTombstone = Updateable<MessageTombstoneTable>;
interface FileTable {
id: ColumnType<string, string, never>;
type: ColumnType<FileType, FileType, FileType>;
@@ -229,8 +251,6 @@ interface FileTable {
created_by: ColumnType<string, string, never>;
updated_at: ColumnType<Date | null, Date | null, Date | null>;
updated_by: ColumnType<string | null, string | null, string | null>;
deleted_at: ColumnType<Date | null, Date | null, Date | null>;
deleted_by: ColumnType<string | null, string | null, string | null>;
status: ColumnType<FileStatus, FileStatus, FileStatus>;
version: ColumnType<bigint, never, never>;
}
@@ -255,16 +275,18 @@ export type SelectFileInteraction = Selectable<FileInteractionTable>;
export type CreateFileInteraction = Insertable<FileInteractionTable>;
export type UpdateFileInteraction = Updateable<FileInteractionTable>;
interface EntryPathTable {
ancestor_id: ColumnType<string, string, never>;
descendant_id: ColumnType<string, string, never>;
interface FileTombstoneTable {
id: ColumnType<string, string, never>;
root_id: ColumnType<string, string, never>;
workspace_id: ColumnType<string, string, never>;
level: ColumnType<number, number, number>;
deleted_at: ColumnType<Date, Date, Date>;
deleted_by: ColumnType<string, string, never>;
version: ColumnType<bigint, never, never>;
}
export type SelectEntryPath = Selectable<EntryPathTable>;
export type CreateEntryPath = Insertable<EntryPathTable>;
export type UpdateEntryPath = Updateable<EntryPathTable>;
export type SelectFileTombstone = Selectable<FileTombstoneTable>;
export type CreateFileTombstone = Insertable<FileTombstoneTable>;
export type UpdateFileTombstone = Updateable<FileTombstoneTable>;
export interface DatabaseSchema {
accounts: AccountTable;
@@ -274,11 +296,13 @@ export interface DatabaseSchema {
entries: EntryTable;
entry_transactions: EntryTransactionTable;
entry_interactions: EntryInteractionTable;
collaborations: CollaborationTable;
entry_paths: EntryPathTable;
messages: MessageTable;
message_reactions: MessageReactionTable;
message_interactions: MessageInteractionTable;
message_tombstones: MessageTombstoneTable;
files: FileTable;
file_interactions: FileInteractionTable;
entry_paths: EntryPathTable;
file_tombstones: FileTombstoneTable;
collaborations: CollaborationTable;
}

View File

@@ -8,11 +8,13 @@ import {
MarkFileOpenedMutation,
MarkFileSeenMutation,
} from '@colanode/core';
import { DeleteObjectCommand } from '@aws-sdk/client-s3';
import { database } from '@/data/database';
import { SelectUser } from '@/data/schema';
import { mapEntry } from '@/lib/entries';
import { eventBus } from '@/lib/event-bus';
import { filesStorage, BUCKET_NAMES } from '@/data/storage';
class FileService {
public async createFile(
@@ -109,20 +111,48 @@ class FileService {
return false;
}
const deletedFile = await database
.updateTable('files')
.returningAll()
.set({
deleted_at: new Date(mutation.data.deletedAt),
deleted_by: user.id,
})
.where('id', '=', mutation.data.id)
.executeTakeFirst();
const deletedFile = await database.transaction().execute(async (tx) => {
const deletedFile = await tx
.deleteFrom('files')
.returningAll()
.where('id', '=', mutation.data.id)
.executeTakeFirst();
if (!deletedFile) {
return null;
}
await tx
.deleteFrom('file_interactions')
.where('file_id', '=', deletedFile.id)
.execute();
await tx
.insertInto('file_tombstones')
.values({
id: deletedFile.id,
root_id: deletedFile.root_id,
workspace_id: deletedFile.workspace_id,
deleted_at: new Date(mutation.data.deletedAt),
deleted_by: user.id,
})
.executeTakeFirst();
return deletedFile;
});
if (!deletedFile) {
return false;
}
const path = `files/${deletedFile.workspace_id}/${deletedFile.id}${deletedFile.extension}`;
const command = new DeleteObjectCommand({
Bucket: BUCKET_NAMES.FILES,
Key: path,
});
await filesStorage.send(command);
eventBus.publish({
type: 'file_deleted',
fileId: deletedFile.id,

View File

@@ -104,15 +104,35 @@ class MessageService {
return false;
}
const deletedMessage = await database
.updateTable('messages')
.returningAll()
.set({
deleted_at: new Date(mutation.data.deletedAt),
deleted_by: user.id,
})
.where('id', '=', mutation.data.id)
.executeTakeFirst();
const deletedMessage = await database.transaction().execute(async (tx) => {
const deletedMessage = await tx
.deleteFrom('messages')
.returningAll()
.where('id', '=', mutation.data.id)
.executeTakeFirst();
if (!deletedMessage) {
return null;
}
await tx
.deleteFrom('message_interactions')
.where('message_id', '=', deletedMessage.id)
.execute();
await tx
.insertInto('message_tombstones')
.values({
id: deletedMessage.id,
root_id: deletedMessage.root_id,
workspace_id: deletedMessage.workspace_id,
deleted_at: new Date(mutation.data.deletedAt),
deleted_by: user.id,
})
.executeTakeFirst();
return deletedMessage;
});
if (!deletedMessage) {
return false;

View File

@@ -28,6 +28,8 @@ import { EntryTransactionSynchronizer } from '@/synchronizers/entry-transactions
import { MessageInteractionSynchronizer } from '@/synchronizers/message-interactions';
import { EntryInteractionSynchronizer } from '@/synchronizers/entry-interactions';
import { FileInteractionSynchronizer } from '@/synchronizers/file-interactions';
import { FileTombstoneSynchronizer } from '@/synchronizers/file-tombstones';
import { MessageTombstoneSynchronizer } from '@/synchronizers/message-tombstones';
type SocketUser = {
user: ConnectedUser;
@@ -204,6 +206,20 @@ export class SocketConnection {
message.input,
cursor
);
} else if (message.input.type === 'file_tombstones') {
return new FileTombstoneSynchronizer(
message.id,
user.user,
message.input,
cursor
);
} else if (message.input.type === 'message_tombstones') {
return new MessageTombstoneSynchronizer(
message.id,
user.user,
message.input,
cursor
);
}
return null;

View File

@@ -0,0 +1,88 @@
import {
SynchronizerOutputMessage,
SyncFileTombstonesInput,
SyncFileTombstoneData,
} from '@colanode/core';
import { BaseSynchronizer } from '@/synchronizers/base';
import { Event } from '@/types/events';
import { database } from '@/data/database';
import { SelectFileTombstone } from '@/data/schema';
export class FileTombstoneSynchronizer extends BaseSynchronizer<SyncFileTombstonesInput> {
public async fetchData(): Promise<SynchronizerOutputMessage<SyncFileTombstonesInput> | null> {
const fileTombstones = await this.fetchFileTombstones();
if (fileTombstones.length === 0) {
return null;
}
return this.buildMessage(fileTombstones);
}
public async fetchDataFromEvent(
event: Event
): Promise<SynchronizerOutputMessage<SyncFileTombstonesInput> | null> {
if (!this.shouldFetch(event)) {
return null;
}
const fileTombstones = await this.fetchFileTombstones();
if (fileTombstones.length === 0) {
return null;
}
return this.buildMessage(fileTombstones);
}
private async fetchFileTombstones() {
if (this.status === 'fetching') {
return [];
}
this.status = 'fetching';
const fileTombstones = await database
.selectFrom('file_tombstones')
.selectAll()
.where('root_id', '=', this.input.rootId)
.where('version', '>', this.cursor)
.orderBy('version', 'asc')
.limit(20)
.execute();
this.status = 'pending';
return fileTombstones;
}
private buildMessage(
unsyncedFileTombstones: SelectFileTombstone[]
): SynchronizerOutputMessage<SyncFileTombstonesInput> {
const items: SyncFileTombstoneData[] = unsyncedFileTombstones.map(
(fileTombstone) => ({
id: fileTombstone.id,
rootId: fileTombstone.root_id,
workspaceId: fileTombstone.workspace_id,
deletedAt: fileTombstone.deleted_at.toISOString(),
deletedBy: fileTombstone.deleted_by,
version: fileTombstone.version.toString(),
})
);
return {
type: 'synchronizer_output',
userId: this.user.userId,
id: this.id,
items: items.map((item) => ({
cursor: item.version,
data: item,
})),
};
}
private shouldFetch(event: Event) {
if (event.type === 'file_deleted' && event.rootId === this.input.rootId) {
return true;
}
return false;
}
}

View File

@@ -72,8 +72,6 @@ export class FileSynchronizer extends BaseSynchronizer<SyncFilesInput> {
createdBy: file.created_by,
updatedAt: file.updated_at?.toISOString() ?? null,
updatedBy: file.updated_by ?? null,
deletedAt: file.deleted_at?.toISOString() ?? null,
deletedBy: file.deleted_by ?? null,
version: file.version.toString(),
status: file.status,
}));

View File

@@ -0,0 +1,91 @@
import {
SynchronizerOutputMessage,
SyncMessageTombstonesInput,
SyncMessageTombstoneData,
} from '@colanode/core';
import { BaseSynchronizer } from '@/synchronizers/base';
import { Event } from '@/types/events';
import { database } from '@/data/database';
import { SelectMessageTombstone } from '@/data/schema';
export class MessageTombstoneSynchronizer extends BaseSynchronizer<SyncMessageTombstonesInput> {
public async fetchData(): Promise<SynchronizerOutputMessage<SyncMessageTombstonesInput> | null> {
const messageTombstones = await this.fetchMessageTombstones();
if (messageTombstones.length === 0) {
return null;
}
return this.buildMessage(messageTombstones);
}
public async fetchDataFromEvent(
event: Event
): Promise<SynchronizerOutputMessage<SyncMessageTombstonesInput> | null> {
if (!this.shouldFetch(event)) {
return null;
}
const messageTombstones = await this.fetchMessageTombstones();
if (messageTombstones.length === 0) {
return null;
}
return this.buildMessage(messageTombstones);
}
private async fetchMessageTombstones() {
if (this.status === 'fetching') {
return [];
}
this.status = 'fetching';
const messageTombstones = await database
.selectFrom('message_tombstones')
.selectAll()
.where('root_id', '=', this.input.rootId)
.where('version', '>', this.cursor)
.orderBy('version', 'asc')
.limit(20)
.execute();
this.status = 'pending';
return messageTombstones;
}
private buildMessage(
unsyncedMessageTombstones: SelectMessageTombstone[]
): SynchronizerOutputMessage<SyncMessageTombstonesInput> {
const items: SyncMessageTombstoneData[] = unsyncedMessageTombstones.map(
(messageTombstone) => ({
id: messageTombstone.id,
rootId: messageTombstone.root_id,
workspaceId: messageTombstone.workspace_id,
deletedAt: messageTombstone.deleted_at.toISOString(),
deletedBy: messageTombstone.deleted_by,
version: messageTombstone.version.toString(),
})
);
return {
type: 'synchronizer_output',
userId: this.user.userId,
id: this.id,
items: items.map((item) => ({
cursor: item.version,
data: item,
})),
};
}
private shouldFetch(event: Event) {
if (
event.type === 'message_deleted' &&
event.rootId === this.input.rootId
) {
return true;
}
return false;
}
}

View File

@@ -68,8 +68,6 @@ export class MessageSynchronizer extends BaseSynchronizer<SyncMessagesInput> {
createdBy: message.created_by,
updatedAt: message.updated_at?.toISOString() ?? null,
updatedBy: message.updated_by ?? null,
deletedAt: message.deleted_at?.toISOString() ?? null,
deletedBy: message.deleted_by ?? null,
version: message.version.toString(),
}));

View File

@@ -0,0 +1,22 @@
export type SyncFileTombstonesInput = {
type: 'file_tombstones';
rootId: string;
};
export type SyncFileTombstoneData = {
id: string;
rootId: string;
workspaceId: string;
deletedBy: string;
deletedAt: string;
version: string;
};
declare module '@colanode/core' {
interface SynchronizerMap {
file_tombstones: {
input: SyncFileTombstonesInput;
data: SyncFileTombstoneData;
};
}
}

View File

@@ -22,8 +22,6 @@ export type SyncFileData = {
createdBy: string;
updatedAt: string | null;
updatedBy: string | null;
deletedAt: string | null;
deletedBy: string | null;
version: string;
};

View File

@@ -7,6 +7,8 @@ export * from './message-interactions';
export * from './file-interactions';
export * from './entry-interactions';
export * from './collaborations';
export * from './message-tombstones';
export * from './file-tombstones';
// eslint-disable-next-line @typescript-eslint/no-empty-object-type
export interface SynchronizerMap {}

View File

@@ -0,0 +1,22 @@
export type SyncMessageTombstonesInput = {
type: 'message_tombstones';
rootId: string;
};
export type SyncMessageTombstoneData = {
id: string;
rootId: string;
workspaceId: string;
deletedAt: string;
deletedBy: string;
version: string;
};
declare module '@colanode/core' {
interface SynchronizerMap {
message_tombstones: {
input: SyncMessageTombstonesInput;
data: SyncMessageTombstoneData;
};
}
}

View File

@@ -17,8 +17,6 @@ export type SyncMessageData = {
createdBy: string;
updatedAt: string | null;
updatedBy: string | null;
deletedAt: string | null;
deletedBy: string | null;
version: string;
};