Add trace logs for desktop services

This commit is contained in:
Hakan Shehu
2024-11-30 02:25:21 +01:00
parent 2f1155c005
commit b566603bbc
11 changed files with 351 additions and 15 deletions

View File

@@ -17,7 +17,7 @@ class AccountService {
private readonly logger = createLogger('account-service');
async syncAccounts() {
this.logger.info('Syncing accounts');
this.logger.debug('Syncing all accounts');
const accounts = await databaseService.appDatabase
.selectFrom('accounts')
@@ -32,7 +32,7 @@ class AccountService {
}
private async syncAccount(account: SelectAccount) {
this.logger.debug(`Syncing account ${account.email}`);
this.logger.trace(`Syncing account ${account.email}`);
const server = await databaseService.appDatabase
.selectFrom('servers')
@@ -48,7 +48,7 @@ class AccountService {
}
if (!serverService.isAvailable(server.domain)) {
this.logger.debug(
this.logger.trace(
`Server ${server.domain} is not available for syncing account ${account.email}`
);
return;
@@ -62,6 +62,8 @@ class AccountService {
}
);
this.logger.trace(`Account sync response status code: ${status}`);
if (status >= 400 && status < 500) {
this.logger.info(`Account ${account.email} is not valid, logging out...`);
await this.logoutAccount(account);
@@ -91,6 +93,8 @@ class AccountService {
if (!updatedAccount) {
this.logger.warn(`Failed to update account ${account.email} after sync`);
return;
} else {
this.logger.trace(`Updated account ${account.email} after sync`);
}
eventBus.publish({
@@ -125,6 +129,10 @@ class AccountService {
`Failed to create workspace ${workspace.id} for account ${account.email}`
);
return;
} else {
this.logger.trace(
`Created workspace ${workspace.id} for account ${account.email} after sync`
);
}
eventBus.publish({
@@ -151,6 +159,10 @@ class AccountService {
`Failed to update workspace ${currentWorkspace.user_id} for account ${account.email}`
);
return;
} else {
this.logger.trace(
`Updated workspace ${currentWorkspace.user_id} for account ${account.email} after sync`
);
}
eventBus.publish({
@@ -172,6 +184,8 @@ class AccountService {
}
public async logoutAccount(account: SelectAccount): Promise<boolean> {
this.logger.debug(`Logging out account ${account.email}`);
const workspaces = await databaseService.appDatabase
.selectFrom('workspaces')
.select(['user_id'])
@@ -194,7 +208,10 @@ class AccountService {
.executeTakeFirst();
if (!deletedAccount) {
this.logger.warn(`Failed to delete account ${account.email}`);
return false;
} else {
this.logger.trace(`Deleted account ${account.email}`);
}
eventBus.publish({
@@ -216,7 +233,7 @@ class AccountService {
}
public async syncDeletedTokens() {
this.logger.info('Syncing deleted tokens');
this.logger.debug('Syncing deleted tokens');
const deletedTokens = await databaseService.appDatabase
.selectFrom('deleted_tokens')
@@ -248,6 +265,10 @@ class AccountService {
token: deletedToken.token,
});
this.logger.trace(
`Deleted token logout response status code: ${status}`
);
if (status !== 200) {
return;
}
@@ -281,6 +302,8 @@ class AccountService {
if (!deletedWorkspace) {
this.logger.warn(`Failed to delete workspace ${userId}`);
return false;
} else {
this.logger.trace(`Deleted workspace ${userId}`);
}
await databaseService.deleteWorkspaceDatabase(userId);
@@ -288,6 +311,7 @@ class AccountService {
if (fs.existsSync(workspaceDir)) {
fs.rmSync(workspaceDir, { recursive: true });
}
this.logger.trace(`Deleted workspace directory ${workspaceDir}`);
eventBus.publish({
type: 'workspace_deleted',

View File

@@ -4,12 +4,19 @@ import {
} from '@colanode/core';
import { databaseService } from '@/main/data/database-service';
import { eventBus } from '@/shared/lib/event-bus';
import { createLogger } from '@/main/logger';
class CollaborationService {
private readonly logger = createLogger('collaboration-service');
public async applyServerCollaboration(
userId: string,
collaboration: ServerCollaboration
) {
this.logger.trace(
`Applying server collaboration: ${collaboration.nodeId} for user ${userId}`
);
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
@@ -47,6 +54,10 @@ class CollaborationService {
userId: string,
revocation: ServerCollaborationRevocation
) {
this.logger.trace(
`Applying server collaboration revocation: ${revocation.nodeId} for user ${userId}`
);
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);

View File

@@ -10,14 +10,13 @@ class CommandService {
public async executeCommand<T extends CommandInput>(
input: T
): Promise<CommandMap[T['type']]['output']> {
this.logger.debug(`Executing command: ${input.type}`);
this.logger.trace(`Executing command: ${input.type}`);
const handler = commandHandlerMap[
input.type
] as unknown as CommandHandler<T>;
if (!handler) {
this.logger.warn(`No handler found for command type: ${input.type}`);
throw new Error(`No handler found for command type: ${input.type}`);
}

View File

@@ -139,7 +139,7 @@ class FileService {
}
public async syncFiles() {
this.logger.info('Syncing files');
this.logger.debug('Syncing files');
const workspaces = await databaseService.appDatabase
.selectFrom('workspaces')

View File

@@ -1,5 +1,4 @@
import { MutationInput } from '@/shared/mutations';
import { MutationMap } from '@/shared/mutations';
import { mutationHandlerMap } from '@/main/mutations';
import { MutationHandler } from '@/main/types';
@@ -15,10 +14,9 @@ class MutationService {
input.type
] as unknown as MutationHandler<T>;
this.logger.debug(`Executing mutation: ${input.type}`);
this.logger.trace(`Executing mutation: ${input.type}`);
if (!handler) {
this.logger.warn(`No handler found for mutation type: ${input.type}`);
throw new Error(`No handler found for mutation type: ${input.type}`);
}

View File

@@ -1,3 +1,4 @@
import { sql } from 'kysely';
import {
Node,
NodeAttributes,
@@ -28,8 +29,8 @@ import {
} from '@/main/data/workspace/schema';
import { eventBus } from '@/shared/lib/event-bus';
import { SelectWorkspace } from '@/main/data/app/schema';
import { sql } from 'kysely';
import { interactionService } from '@/main/services/interaction-service';
import { createLogger } from '@/main/logger';
export type CreateNodeInput = {
id: string;
@@ -39,6 +40,8 @@ export type CreateNodeInput = {
};
class NodeService {
private readonly logger = createLogger('node-service');
public async fetchNode(nodeId: string, userId: string): Promise<Node | null> {
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
@@ -60,6 +63,7 @@ class NodeService {
userId: string,
input: CreateNodeInput | CreateNodeInput[]
) {
this.logger.trace(`Creating ${Array.isArray(input) ? 'nodes' : 'node'}`);
const workspace = await this.fetchWorkspace(userId);
const inputs = Array.isArray(input) ? input : [input];
@@ -179,6 +183,10 @@ class NodeService {
});
for (const createdNode of createdNodes) {
this.logger.trace(
`Created node ${createdNode.id} with type ${createdNode.type}`
);
eventBus.publish({
type: 'node_created',
userId,
@@ -187,6 +195,10 @@ class NodeService {
}
for (const createdTransaction of createdNodeTransactions) {
this.logger.trace(
`Created transaction ${createdTransaction.id} for node ${createdTransaction.node_id} with operation ${createdTransaction.operation}`
);
eventBus.publish({
type: 'node_transaction_created',
userId,
@@ -203,6 +215,10 @@ class NodeService {
}
for (const createdUpload of createdUploads) {
this.logger.trace(
`Created upload ${createdUpload.upload_id} for node ${createdUpload.node_id}`
);
eventBus.publish({
type: 'upload_created',
userId,
@@ -211,6 +227,10 @@ class NodeService {
}
for (const createdDownload of createdDownloads) {
this.logger.trace(
`Created download ${createdDownload.upload_id} for node ${createdDownload.node_id}`
);
eventBus.publish({
type: 'download_created',
userId,
@@ -240,6 +260,8 @@ class NodeService {
userId: string,
updater: (attributes: NodeAttributes) => NodeAttributes
): Promise<boolean> {
this.logger.trace(`Updating node ${nodeId}`);
const workspace = await this.fetchWorkspace(userId);
const workspaceDatabase =
@@ -339,14 +361,24 @@ class NodeService {
});
if (updatedNode) {
this.logger.trace(
`Updated node ${updatedNode.id} with type ${updatedNode.type}`
);
eventBus.publish({
type: 'node_updated',
userId,
node: mapNode(updatedNode),
});
} else {
this.logger.trace(`Failed to update node ${nodeId}`);
}
if (createdTransaction) {
this.logger.trace(
`Created transaction ${createdTransaction.id} for node ${nodeId}`
);
eventBus.publish({
type: 'node_transaction_created',
userId,
@@ -360,6 +392,8 @@ class NodeService {
'lastReceivedTransactionId',
createdTransaction.id
);
} else {
this.logger.trace(`Failed to create transaction for node ${nodeId}`);
}
return updatedNode !== undefined;
@@ -429,19 +463,31 @@ class NodeService {
});
if (deletedNode) {
this.logger.trace(
`Deleted node ${deletedNode.id} with type ${deletedNode.type}`
);
eventBus.publish({
type: 'node_deleted',
userId,
node: mapNode(deletedNode),
});
} else {
this.logger.trace(`Failed to delete node ${nodeId}`);
}
if (createdTransaction) {
this.logger.trace(
`Created transaction ${createdTransaction.id} for node ${nodeId}`
);
eventBus.publish({
type: 'node_transaction_created',
userId,
transaction: mapTransaction(createdTransaction),
});
} else {
this.logger.trace(`Failed to create transaction for node ${nodeId}`);
}
}
@@ -552,6 +598,10 @@ class NodeService {
userId: string,
transaction: ServerNodeCreateTransaction
) {
this.logger.trace(
`Applying server create transaction ${transaction.id} for node ${transaction.nodeId}`
);
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
@@ -568,6 +618,9 @@ class NodeService {
existingTransaction.version === version &&
existingTransaction.server_created_at === transaction.serverCreatedAt
) {
this.logger.trace(
`Server create transaction ${transaction.id} for node ${transaction.nodeId} is already synced`
);
return;
}
@@ -581,6 +634,9 @@ class NodeService {
.where('id', '=', transaction.id)
.execute();
this.logger.trace(
`Server create transaction ${transaction.id} for node ${transaction.nodeId} has been synced`
);
return;
}
@@ -625,6 +681,10 @@ class NodeService {
});
if (createdNode) {
this.logger.trace(
`Created node ${createdNode.id} with type ${createdNode.type} with transaction ${transaction.id}`
);
eventBus.publish({
type: 'node_created',
userId,
@@ -639,6 +699,10 @@ class NodeService {
transaction.id
);
} else {
this.logger.trace(
`Server create transaction ${transaction.id} for node ${transaction.nodeId} is incomplete`
);
eventBus.publish({
type: 'node_transaction_incomplete',
userId,
@@ -667,6 +731,9 @@ class NodeService {
existingTransaction.version === version &&
existingTransaction.server_created_at === transaction.serverCreatedAt
) {
this.logger.trace(
`Server update transaction ${transaction.id} for node ${transaction.nodeId} is already synced`
);
return;
}
@@ -680,6 +747,9 @@ class NodeService {
.where('id', '=', transaction.id)
.execute();
this.logger.trace(
`Server update transaction ${transaction.id} for node ${transaction.nodeId} has been synced`
);
return;
}
@@ -737,6 +807,10 @@ class NodeService {
});
if (updatedNode) {
this.logger.trace(
`Updated node ${updatedNode.id} with type ${updatedNode.type} with transaction ${transaction.id}`
);
eventBus.publish({
type: 'node_updated',
userId,
@@ -751,6 +825,10 @@ class NodeService {
transaction.id
);
} else {
this.logger.trace(
`Server update transaction ${transaction.id} for node ${transaction.nodeId} is incomplete`
);
eventBus.publish({
type: 'node_transaction_incomplete',
userId,
@@ -763,6 +841,10 @@ class NodeService {
userId: string,
transaction: ServerNodeDeleteTransaction
) {
this.logger.trace(
`Applying server delete transaction ${transaction.id} for node ${transaction.nodeId}`
);
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
@@ -794,6 +876,10 @@ class NodeService {
});
if (result) {
this.logger.trace(
`Deleted node ${result.id} with type ${result.type} with transaction ${transaction.id}`
);
eventBus.publish({
type: 'node_deleted',
userId,

View File

@@ -28,12 +28,11 @@ class QueryService {
public async executeQuery<T extends QueryInput>(
input: T
): Promise<QueryMap[T['type']]['output']> {
this.logger.debug(`Executing query: ${input.type}`);
this.logger.trace(`Executing query: ${input.type}`);
const handler = queryHandlerMap[input.type] as unknown as QueryHandler<T>;
if (!handler) {
this.logger.warn(`No handler found for query type: ${input.type}`);
throw new Error(`No handler found for query type: ${input.type}`);
}
@@ -53,7 +52,6 @@ class QueryService {
const handler = queryHandlerMap[input.type] as unknown as QueryHandler<T>;
if (!handler) {
this.logger.warn(`No handler found for query type: ${input.type}`);
throw new Error(`No handler found for query type: ${input.type}`);
}

View File

@@ -18,6 +18,8 @@ class ServerService {
private readonly logger = createLogger('server-service');
public async syncServers() {
this.logger.trace('Syncing servers');
const rows = await databaseService.appDatabase
.selectFrom('servers')
.selectAll()
@@ -57,7 +59,7 @@ class ServerService {
});
}
this.logger.info(
this.logger.trace(
`Server ${server.domain} is ${isAvailable ? 'available' : 'unavailable'}`
);
@@ -85,6 +87,8 @@ class ServerService {
}
public async fetchServerConfig(domain: string) {
this.logger.trace(`Fetching server config for ${domain}`);
const baseUrl = this.buildApiBaseUrl(domain);
const configUrl = `${baseUrl}/v1/config`;
try {

View File

@@ -4,8 +4,11 @@ import { Message } from '@colanode/core';
import { SelectAccount } from '@/main/data/app/schema';
import { syncService } from '@/main/services/sync-service';
import { eventBus } from '@/shared/lib/event-bus';
import { createLogger } from '@/main/logger';
export class SocketConnection {
private readonly logger = createLogger('socket-connection');
private readonly synapseUrl: string;
private readonly account: SelectAccount;
private socket: WebSocket | null;
@@ -21,6 +24,10 @@ export class SocketConnection {
}
public init(): void {
this.logger.trace(
`Initializing socket connection for account ${this.account.id}`
);
if (this.isConnected()) {
return;
}
@@ -47,6 +54,10 @@ export class SocketConnection {
return;
}
const message: Message = JSON.parse(data);
this.logger.trace(
`Received message of type ${message.type} for account ${this.account.id}`
);
if (message.type === 'node_transactions_batch') {
syncService.syncServerTransactions(message);
} else if (message.type === 'collaboration_revocations_batch') {
@@ -59,6 +70,10 @@ export class SocketConnection {
};
this.socket.onopen = () => {
this.logger.trace(
`Socket connection for account ${this.account.id} opened`
);
this.backoffCalculator.reset();
eventBus.publish({
type: 'socket_connection_opened',
@@ -67,10 +82,18 @@ export class SocketConnection {
};
this.socket.onerror = () => {
this.logger.trace(
`Socket connection for account ${this.account.id} errored`
);
this.backoffCalculator.increaseError();
};
this.socket.onclose = () => {
this.logger.trace(
`Socket connection for account ${this.account.id} closed`
);
this.backoffCalculator.increaseError();
};
}
@@ -81,6 +104,10 @@ export class SocketConnection {
public sendMessage(message: Message): boolean {
if (this.socket && this.isConnected()) {
this.logger.trace(
`Sending message of type ${message.type} for account ${this.account.id}`
);
this.socket.send(JSON.stringify(message));
return true;
}
@@ -90,6 +117,10 @@ export class SocketConnection {
public close(): void {
if (this.socket) {
this.logger.trace(
`Closing socket connection for account ${this.account.id}`
);
this.socket.close();
}
}

View File

@@ -3,8 +3,10 @@ import { databaseService } from '@/main/data/database-service';
import { Message } from '@colanode/core';
import { serverService } from '@/main/services/server-service';
import { eventBus } from '@/shared/lib/event-bus';
import { createLogger } from '@/main/logger';
class SocketService {
private readonly logger = createLogger('socket-service');
private readonly sockets: Map<string, SocketConnection> = new Map();
constructor() {
@@ -31,6 +33,8 @@ class SocketService {
}
public async checkConnections() {
this.logger.trace('Checking socket connections');
const accounts = await databaseService.appDatabase
.selectFrom('accounts')
.selectAll()
@@ -40,6 +44,10 @@ class SocketService {
// Update accounts map
for (const account of accounts) {
if (!serverService.isAvailable(account.server)) {
this.logger.trace(
`Server ${account.server} is not available, skipping socket connection`
);
continue;
}

View File

@@ -74,6 +74,8 @@ class SyncService {
}
public async syncAllWorkspaces() {
this.logger.trace('Syncing all workspaces');
const workspaces = await databaseService.appDatabase
.selectFrom('workspaces')
.select(['user_id'])
@@ -94,6 +96,8 @@ class SyncService {
}
public async syncLocalPendingTransactions(userId: string) {
this.logger.trace(`Syncing local pending transactions for user ${userId}`);
if (!this.localPendingTransactionStates.has(userId)) {
this.localPendingTransactionStates.set(userId, {
isSyncing: false,
@@ -103,6 +107,9 @@ class SyncService {
const syncState = this.localPendingTransactionStates.get(userId)!;
if (syncState.isSyncing) {
this.logger.trace(
`Syncing of local pending transactions already in progress for user ${userId}, scheduling sync`
);
syncState.scheduledSync = true;
return;
}
@@ -117,6 +124,9 @@ class SyncService {
);
} finally {
syncState.isSyncing = false;
this.logger.trace(
`Syncing of local pending transactions completed for user ${userId}`
);
if (syncState.scheduledSync) {
syncState.scheduledSync = false;
@@ -126,6 +136,8 @@ class SyncService {
}
public async syncLocalPendingInteractions(userId: string) {
this.logger.trace(`Syncing local pending interactions for user ${userId}`);
if (!this.localPendingInteractionStates.has(userId)) {
this.localPendingInteractionStates.set(userId, {
isSyncing: false,
@@ -135,6 +147,9 @@ class SyncService {
const syncState = this.localPendingInteractionStates.get(userId)!;
if (syncState.isSyncing) {
this.logger.trace(
`Syncing of local pending interactions already in progress for user ${userId}, scheduling sync`
);
syncState.scheduledSync = true;
return;
}
@@ -149,6 +164,9 @@ class SyncService {
);
} finally {
syncState.isSyncing = false;
this.logger.trace(
`Syncing of local pending interactions completed for user ${userId}`
);
if (syncState.scheduledSync) {
syncState.scheduledSync = false;
@@ -158,6 +176,10 @@ class SyncService {
}
public async syncLocalIncompleteTransactions(userId: string) {
this.logger.trace(
`Syncing local incomplete transactions for user ${userId}`
);
if (!this.localIncompleteTransactionStates.has(userId)) {
this.localIncompleteTransactionStates.set(userId, {
isSyncing: false,
@@ -167,6 +189,9 @@ class SyncService {
const syncState = this.localIncompleteTransactionStates.get(userId)!;
if (syncState.isSyncing) {
this.logger.trace(
`Syncing of local incomplete transactions already in progress for user ${userId}, scheduling sync`
);
syncState.scheduledSync = true;
return;
}
@@ -181,6 +206,9 @@ class SyncService {
);
} finally {
syncState.isSyncing = false;
this.logger.trace(
`Syncing of local incomplete transactions completed for user ${userId}`
);
if (syncState.scheduledSync) {
syncState.scheduledSync = false;
@@ -190,7 +218,12 @@ class SyncService {
}
public async syncServerTransactions(message: NodeTransactionsBatchMessage) {
this.logger.trace(`Syncing server transactions for user ${message.userId}`);
if (this.syncingTransactions.has(message.userId)) {
this.logger.trace(
`Syncing of server transactions already in progress for user ${message.userId}, skipping`
);
return;
}
@@ -211,13 +244,24 @@ class SyncService {
`Error syncing server transactions for user ${message.userId}`
);
} finally {
this.logger.trace(
`Syncing of server transactions completed for user ${message.userId}`
);
this.syncingTransactions.delete(message.userId);
this.requireNodeTransactions(message.userId);
}
}
public async syncServerCollaborations(message: CollaborationsBatchMessage) {
this.logger.trace(
`Syncing server collaborations for user ${message.userId}`
);
if (this.syncingCollaborations.has(message.userId)) {
this.logger.trace(
`Syncing of server collaborations already in progress for user ${message.userId}, skipping`
);
return;
}
@@ -249,7 +293,12 @@ class SyncService {
public async syncServerRevocations(
message: CollaborationRevocationsBatchMessage
) {
this.logger.trace(`Syncing server revocations for user ${message.userId}`);
if (this.syncingRevocations.has(message.userId)) {
this.logger.trace(
`Syncing of server revocations already in progress for user ${message.userId}, skipping`
);
return;
}
@@ -273,13 +322,22 @@ class SyncService {
`Error syncing server revocations for user ${message.userId}`
);
} finally {
this.logger.trace(
`Syncing of server revocations completed for user ${message.userId}`
);
this.syncingRevocations.delete(message.userId);
this.requireCollaborationRevocations(message.userId);
}
}
public async syncServerInteractions(message: InteractionsBatchMessage) {
this.logger.trace(`Syncing server interactions for user ${message.userId}`);
if (this.syncingInteractions.has(message.userId)) {
this.logger.trace(
`Syncing of server interactions already in progress for user ${message.userId}, skipping`
);
return;
}
@@ -303,12 +361,18 @@ class SyncService {
`Error syncing server interactions for user ${message.userId}`
);
} finally {
this.logger.trace(
`Syncing of server interactions completed for user ${message.userId}`
);
this.syncingInteractions.delete(message.userId);
this.requireInteractions(message.userId);
}
}
private async syncIncompleteTransactions(userId: string) {
this.logger.trace(`Syncing incomplete transactions for user ${userId}`);
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
@@ -319,6 +383,9 @@ class SyncService {
.execute();
if (incompleteTransactions.length === 0) {
this.logger.trace(
`No incomplete transactions found for user ${userId}, skipping`
);
return;
}
@@ -338,10 +405,14 @@ class SyncService {
.executeTakeFirst();
if (!workspace) {
this.logger.trace(`No workspace found for user ${userId}, skipping`);
return;
}
if (!serverService.isAvailable(workspace.domain)) {
this.logger.trace(
`Server ${workspace.domain} is not available, skipping`
);
return;
}
@@ -357,6 +428,10 @@ class SyncService {
for (const [nodeId, transactions] of Object.entries(groupedByNodeId)) {
try {
this.logger.trace(
`Syncing incomplete transactions for node ${nodeId} for user ${userId}`
);
const { data } = await httpClient.get<GetNodeTransactionsOutput>(
`/v1/nodes/${workspace.workspace_id}/transactions/${nodeId}`,
{
@@ -366,6 +441,10 @@ class SyncService {
);
if (data.transactions.length === 0) {
this.logger.trace(
`No transactions found for node ${nodeId} for user ${userId}, deleting`
);
await workspaceDatabase
.deleteFrom('node_transactions')
.where(
@@ -384,6 +463,10 @@ class SyncService {
);
if (!synced) {
this.logger.trace(
`Failed to sync transactions for node ${nodeId} for user ${userId}, incrementing retry count`
);
await workspaceDatabase
.updateTable('node_transactions')
.set({ retry_count: sql`retry_count + 1` })
@@ -394,6 +477,10 @@ class SyncService {
)
.execute();
} else {
this.logger.trace(
`Successfully synced transactions for node ${nodeId} for user ${userId}, resetting retry count`
);
await workspaceDatabase
.updateTable('node_transactions')
.set({ retry_count: sql`retry_count + 1` })
@@ -415,6 +502,8 @@ class SyncService {
}
private async syncMissingNodes(userId: string) {
this.logger.trace(`Syncing missing nodes for user ${userId}`);
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
@@ -426,6 +515,7 @@ class SyncService {
.execute();
if (missingNodes.length === 0) {
this.logger.trace(`No missing nodes found for user ${userId}, skipping`);
return;
}
@@ -445,15 +535,23 @@ class SyncService {
.executeTakeFirst();
if (!workspace) {
this.logger.trace(`No workspace found for user ${userId}, skipping`);
return;
}
if (!serverService.isAvailable(workspace.domain)) {
this.logger.trace(
`Server ${workspace.domain} is not available, skipping`
);
return;
}
for (const node of missingNodes) {
try {
this.logger.trace(
`Syncing missing node ${node.node_id} for user ${userId}`
);
const { data } = await httpClient.get<GetNodeTransactionsOutput>(
`/v1/nodes/${workspace.workspace_id}/transactions/${node.node_id}`,
{
@@ -477,6 +575,8 @@ class SyncService {
}
private async checkForMissingNode(userId: string, nodeId: string) {
this.logger.trace(`Checking for missing node ${nodeId} for user ${userId}`);
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
@@ -487,6 +587,7 @@ class SyncService {
.executeTakeFirst();
if (node) {
this.logger.trace(`Node ${nodeId} for user ${userId} found, skipping`);
return;
}
@@ -506,10 +607,14 @@ class SyncService {
.executeTakeFirst();
if (!workspace) {
this.logger.trace(`No workspace found for user ${userId}, skipping`);
return;
}
if (!serverService.isAvailable(workspace.domain)) {
this.logger.trace(
`Server ${workspace.domain} is not available, skipping`
);
return;
}
@@ -532,6 +637,8 @@ class SyncService {
}
private async sendLocalTransactions(userId: string) {
this.logger.trace(`Sending local pending transactions for user ${userId}`);
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
@@ -547,6 +654,10 @@ class SyncService {
return;
}
this.logger.trace(
`Sending ${unsyncedTransactions.length} local pending transactions for user ${userId}`
);
const workspace = await databaseService.appDatabase
.selectFrom('workspaces')
.innerJoin('accounts', 'workspaces.account_id', 'accounts.id')
@@ -563,10 +674,16 @@ class SyncService {
.executeTakeFirst();
if (!workspace) {
this.logger.trace(
`No workspace found for user ${userId}, skipping sending local pending transactions`
);
return;
}
if (!serverService.isAvailable(workspace.domain)) {
this.logger.trace(
`Server ${workspace.domain} is not available, skipping sending local pending transactions`
);
return;
}
@@ -595,6 +712,10 @@ class SyncService {
}
if (syncedTransactionIds.length > 0) {
this.logger.trace(
`Marking ${syncedTransactionIds.length} local pending transactions as sent for user ${userId}`
);
await workspaceDatabase
.updateTable('node_transactions')
.set({ status: 'sent' })
@@ -604,6 +725,10 @@ class SyncService {
}
if (unsyncedTransactionIds.length > 0) {
this.logger.trace(
`Marking ${unsyncedTransactionIds.length} local pending transactions as failed for user ${userId}`
);
await workspaceDatabase
.updateTable('node_transactions')
.set((eb) => ({ retry_count: eb('retry_count', '+', 1) }))
@@ -614,6 +739,8 @@ class SyncService {
}
private async sendLocalInteractions(userId: string) {
this.logger.trace(`Sending local pending interactions for user ${userId}`);
const workspaceDatabase =
await databaseService.getWorkspaceDatabase(userId);
@@ -624,6 +751,9 @@ class SyncService {
.executeTakeFirst();
if (!workspace) {
this.logger.trace(
`No workspace found for user ${userId}, skipping sending local pending interactions`
);
return;
}
@@ -641,10 +771,17 @@ class SyncService {
.execute();
if (interactionEvents.length === 0) {
this.logger.trace(
`No local pending interactions found for user ${userId}, stopping sync`
);
hasMore = false;
break;
}
this.logger.trace(
`Sending ${interactionEvents.length} local pending interactions for user ${userId}`
);
const groupedByNodeId: Record<string, SelectInteractionEvent[]> = {};
for (const event of interactionEvents) {
groupedByNodeId[event.node_id] = [
@@ -683,6 +820,10 @@ class SyncService {
}
if (sentEventIds.length > 0) {
this.logger.trace(
`Marking ${sentEventIds.length} local pending interactions as sent for user ${userId}`
);
await workspaceDatabase
.updateTable('interaction_events')
.set({ sent_at: new Date().toISOString() })
@@ -693,6 +834,8 @@ class SyncService {
}
private async requireNodeTransactions(userId: string) {
this.logger.trace(`Requiring node transactions for user ${userId}`);
const workspaceWithCursor = await databaseService.appDatabase
.selectFrom('workspaces as w')
.leftJoin('workspace_cursors as wc', 'w.user_id', 'wc.user_id')
@@ -706,6 +849,9 @@ class SyncService {
.executeTakeFirst();
if (!workspaceWithCursor) {
this.logger.trace(
`No workspace found for user ${userId}, skipping requiring node transactions`
);
return;
}
@@ -720,6 +866,8 @@ class SyncService {
}
private async requireCollaborations(userId: string) {
this.logger.trace(`Requiring collaborations for user ${userId}`);
const workspaceWithCursor = await databaseService.appDatabase
.selectFrom('workspaces as w')
.leftJoin('workspace_cursors as wc', 'w.user_id', 'wc.user_id')
@@ -733,6 +881,9 @@ class SyncService {
.executeTakeFirst();
if (!workspaceWithCursor) {
this.logger.trace(
`No workspace found for user ${userId}, skipping requiring collaborations`
);
return;
}
@@ -747,6 +898,8 @@ class SyncService {
}
private async requireCollaborationRevocations(userId: string) {
this.logger.trace(`Requiring collaboration revocations for user ${userId}`);
const workspaceWithCursor = await databaseService.appDatabase
.selectFrom('workspaces as w')
.leftJoin('workspace_cursors as wc', 'w.user_id', 'wc.user_id')
@@ -755,6 +908,9 @@ class SyncService {
.executeTakeFirst();
if (!workspaceWithCursor) {
this.logger.trace(
`No workspace found for user ${userId}, skipping requiring collaboration revocations`
);
return;
}
@@ -769,6 +925,8 @@ class SyncService {
}
private async requireInteractions(userId: string) {
this.logger.trace(`Requiring interactions for user ${userId}`);
const workspaceWithCursor = await databaseService.appDatabase
.selectFrom('workspaces as w')
.leftJoin('workspace_cursors as wc', 'w.user_id', 'wc.user_id')
@@ -782,6 +940,9 @@ class SyncService {
.executeTakeFirst();
if (!workspaceWithCursor) {
this.logger.trace(
`No workspace found for user ${userId}, skipping requiring interactions`
);
return;
}
@@ -796,6 +957,10 @@ class SyncService {
}
private async updateNodeTransactionCursor(userId: string, cursor: bigint) {
this.logger.trace(
`Updating node transaction cursor for user ${userId} to ${cursor}`
);
await databaseService.appDatabase
.insertInto('workspace_cursors')
.values({
@@ -813,6 +978,10 @@ class SyncService {
}
private async updateCollaborationCursor(userId: string, cursor: bigint) {
this.logger.trace(
`Updating collaboration cursor for user ${userId} to ${cursor}`
);
await databaseService.appDatabase
.insertInto('workspace_cursors')
.values({
@@ -833,6 +1002,10 @@ class SyncService {
userId: string,
cursor: bigint
) {
this.logger.trace(
`Updating collaboration revocation cursor for user ${userId} to ${cursor}`
);
await databaseService.appDatabase
.insertInto('workspace_cursors')
.values({
@@ -850,6 +1023,10 @@ class SyncService {
}
private async updateInteractionCursor(userId: string, cursor: bigint) {
this.logger.trace(
`Updating interaction cursor for user ${userId} to ${cursor}`
);
await databaseService.appDatabase
.insertInto('workspace_cursors')
.values({