Fixes in document and node updates merge jobs (#109)

This commit is contained in:
Hakan Shehu
2025-07-04 06:50:13 +02:00
committed by GitHub
parent e8f56449d5
commit 6c34c188e5
2 changed files with 154 additions and 109 deletions

View File

@@ -39,7 +39,7 @@ export const documentUpdatesMergeHandler: JobHandler<
let mergedGroups = 0;
let deletedUpdates = 0;
let hasMore = true;
const currentCursor = cursor;
let currentCursor = cursor;
while (hasMore) {
const updates = await database
@@ -58,20 +58,22 @@ export const documentUpdatesMergeHandler: JobHandler<
debug(`Processing batch of ${updates.length} updates`);
const documentIds = [
...new Set(updates.map((update) => update.document_id)),
];
const documentsMap = new Map<string, SelectDocumentUpdate[]>();
for (const update of updates) {
const documentUpdates = documentsMap.get(update.document_id) ?? [];
documentUpdates.push(update);
documentsMap.set(update.document_id, documentUpdates);
}
const maxRevision = updates.reduce((max, update) => {
const rev = BigInt(update.revision);
return rev > max ? rev : max;
}, BigInt(0));
for (const documentId of documentIds) {
for (const [documentId, documentUpdates] of documentsMap.entries()) {
const result = await processDocumentUpdates(
documentId,
maxRevision,
cutoffTime,
documentUpdates,
config.jobs.documentUpdatesMerge.mergeWindow
);
mergedGroups += result.mergedGroups;
@@ -79,6 +81,7 @@ export const documentUpdatesMergeHandler: JobHandler<
}
await setCounter(database, 'document.updates.merge.cursor', maxRevision);
currentCursor = maxRevision;
if (updates.length < config.jobs.documentUpdatesMerge.batchSize) {
hasMore = false;
@@ -92,35 +95,56 @@ export const documentUpdatesMergeHandler: JobHandler<
const processDocumentUpdates = async (
documentId: string,
maxRevision: bigint,
cutoffTime: Date,
documentUpdates: SelectDocumentUpdate[],
mergeWindow: number
): Promise<{ mergedGroups: number; deletedUpdates: number }> => {
const allDocumentUpdates = await database
const firstUpdate = documentUpdates[0]!;
const cutoffTime = new Date(
firstUpdate.created_at.getTime() - 60 * 60 * 1000
);
const previousUpdate = await database
.selectFrom('document_updates')
.selectAll()
.where('document_id', '=', documentId)
.where('revision', '<=', maxRevision.toString())
.where('created_at', '<', cutoffTime)
.orderBy('created_at', 'asc')
.execute();
.where('revision', '<', firstUpdate.revision)
.where('created_at', '>=', cutoffTime)
.executeTakeFirst();
if (allDocumentUpdates.length < 2) {
const allUpdates = [...documentUpdates];
if (previousUpdate) {
allUpdates.unshift(previousUpdate);
}
if (allUpdates.length < 2) {
return { mergedGroups: 0, deletedUpdates: 0 };
}
const timeGroups = groupUpdatesByMergeWindow(allDocumentUpdates, mergeWindow);
const orderedUpdates = allUpdates.sort((a, b) => {
const revA = BigInt(a.revision);
const revB = BigInt(b.revision);
if (revA > revB) {
return 1;
} else if (revA < revB) {
return -1;
}
return 0;
});
const groups = groupUpdatesByMergeWindow(orderedUpdates, mergeWindow);
let mergedGroups = 0;
let deletedUpdates = 0;
for (const timeGroup of timeGroups) {
if (timeGroup.length >= 2) {
const success = await mergeUpdatesGroup(documentId, timeGroup);
if (success) {
mergedGroups++;
deletedUpdates += timeGroup.length - 1;
}
for (const group of groups) {
if (group.length < 2) {
continue;
}
const success = await mergeUpdatesGroup(documentId, group);
if (success) {
mergedGroups++;
deletedUpdates += group.length - 1;
}
}
@@ -133,21 +157,17 @@ const groupUpdatesByMergeWindow = (
): SelectDocumentUpdate[][] => {
if (updates.length === 0) return [];
const sortedUpdates = [...updates].sort(
(a, b) => a.created_at.getTime() - b.created_at.getTime()
);
const timeGroups: SelectDocumentUpdate[][] = [];
let currentGroup: SelectDocumentUpdate[] = [sortedUpdates[0]!];
let currentGroup: SelectDocumentUpdate[] = [updates[0]!];
for (let i = 1; i < sortedUpdates.length; i++) {
const currentUpdate = sortedUpdates[i]!;
for (let i = 1; i < updates.length; i++) {
const currentUpdate = updates[i]!;
const lastUpdateInGroup = currentGroup[currentGroup.length - 1]!;
const timeDiff =
currentUpdate.created_at.getTime() -
lastUpdateInGroup.created_at.getTime();
const timeDiffSeconds = timeDiff / 1000;
const timeDiffSeconds = Math.abs(timeDiff / 1000);
if (timeDiffSeconds <= mergeWindow) {
currentGroup.push(currentUpdate);
@@ -171,28 +191,33 @@ const mergeUpdatesGroup = async (
}
try {
const sortedUpdates = [...updates].sort((a, b) => {
const revA = BigInt(a.revision);
const revB = BigInt(b.revision);
if (revA > revB) {
return 1;
} else if (revA < revB) {
return -1;
}
return 0;
});
const updateData = sortedUpdates.map((update) => update.data);
const updateData = updates.map((update) => update.data);
const mergedState = mergeUpdates(updateData);
const lastUpdate = sortedUpdates[sortedUpdates.length - 1]!;
const mergedUpdatesMetadata: UpdateMergeMetadata[] = sortedUpdates
.slice(0, -1) // All except the last
.map((update) => ({
const lastUpdate = updates[updates.length - 1]!;
const updatesToMerge = updates.filter(
(update) => update.id !== lastUpdate.id
);
const updatesToMergeIds = updatesToMerge.map((update) => update.id);
const mergedUpdatesMetadata: UpdateMergeMetadata[] = [];
for (const update of updatesToMerge) {
if (update.merged_updates) {
for (const mergedUpdate of update.merged_updates) {
mergedUpdatesMetadata.push({
id: mergedUpdate.id,
createdAt: mergedUpdate.createdAt,
createdBy: mergedUpdate.createdBy,
});
}
}
mergedUpdatesMetadata.push({
id: update.id,
createdAt: update.created_at.toISOString(),
createdBy: update.created_by,
}));
});
}
await database.transaction().execute(async (trx) => {
await trx
@@ -207,13 +232,10 @@ const mergeUpdatesGroup = async (
.where('id', '=', lastUpdate.id)
.execute();
const updateIdsToDelete = sortedUpdates.slice(0, -1).map((u) => u.id);
if (updateIdsToDelete.length > 0) {
await trx
.deleteFrom('document_updates')
.where('id', 'in', updateIdsToDelete)
.execute();
}
await trx
.deleteFrom('document_updates')
.where('id', 'in', updatesToMergeIds)
.execute();
});
return true;

View File

@@ -39,7 +39,7 @@ export const nodeUpdatesMergeHandler: JobHandler<
let mergedGroups = 0;
let deletedUpdates = 0;
let hasMore = true;
const currentCursor = cursor;
let currentCursor = cursor;
while (hasMore) {
const updates = await database
@@ -58,17 +58,22 @@ export const nodeUpdatesMergeHandler: JobHandler<
debug(`Processing batch of ${updates.length} updates`);
const nodeIds = [...new Set(updates.map((update) => update.node_id))];
const nodesMap = new Map<string, SelectNodeUpdate[]>();
for (const update of updates) {
const nodeUpdates = nodesMap.get(update.node_id) ?? [];
nodeUpdates.push(update);
nodesMap.set(update.node_id, nodeUpdates);
}
const maxRevision = updates.reduce((max, update) => {
const rev = BigInt(update.revision);
return rev > max ? rev : max;
}, BigInt(0));
for (const nodeId of nodeIds) {
for (const [nodeId, nodeUpdates] of nodesMap.entries()) {
const result = await processNodeUpdates(
nodeId,
maxRevision,
cutoffTime,
nodeUpdates,
config.jobs.nodeUpdatesMerge.mergeWindow
);
mergedGroups += result.mergedGroups;
@@ -76,6 +81,7 @@ export const nodeUpdatesMergeHandler: JobHandler<
}
await setCounter(database, 'node.updates.merge.cursor', maxRevision);
currentCursor = maxRevision;
if (updates.length < config.jobs.nodeUpdatesMerge.batchSize) {
hasMore = false;
@@ -89,35 +95,56 @@ export const nodeUpdatesMergeHandler: JobHandler<
const processNodeUpdates = async (
nodeId: string,
maxRevision: bigint,
cutoffTime: Date,
nodeUpdates: SelectNodeUpdate[],
mergeWindow: number
): Promise<{ mergedGroups: number; deletedUpdates: number }> => {
const allNodeUpdates = await database
const firstUpdate = nodeUpdates[0]!;
const cutoffTime = new Date(
firstUpdate.created_at.getTime() - 60 * 60 * 1000
);
const previousUpdate = await database
.selectFrom('node_updates')
.selectAll()
.where('node_id', '=', nodeId)
.where('revision', '<=', maxRevision.toString())
.where('created_at', '<', cutoffTime)
.orderBy('created_at', 'asc')
.execute();
.where('revision', '<', firstUpdate.revision)
.where('created_at', '>=', cutoffTime)
.executeTakeFirst();
if (allNodeUpdates.length < 2) {
const allUpdates = [...nodeUpdates];
if (previousUpdate) {
allUpdates.unshift(previousUpdate);
}
if (allUpdates.length < 2) {
return { mergedGroups: 0, deletedUpdates: 0 };
}
const timeGroups = groupUpdatesByMergeWindow(allNodeUpdates, mergeWindow);
const orderedUpdates = allUpdates.sort((a, b) => {
const revA = BigInt(a.revision);
const revB = BigInt(b.revision);
if (revA > revB) {
return 1;
} else if (revA < revB) {
return -1;
}
return 0;
});
const groups = groupUpdatesByMergeWindow(orderedUpdates, mergeWindow);
let mergedGroups = 0;
let deletedUpdates = 0;
for (const timeGroup of timeGroups) {
if (timeGroup.length >= 2) {
const success = await mergeUpdatesGroup(nodeId, timeGroup);
if (success) {
mergedGroups++;
deletedUpdates += timeGroup.length - 1;
}
for (const group of groups) {
if (group.length < 2) {
continue;
}
const success = await mergeUpdatesGroup(nodeId, group);
if (success) {
mergedGroups++;
deletedUpdates += group.length - 1;
}
}
@@ -130,22 +157,17 @@ const groupUpdatesByMergeWindow = (
): SelectNodeUpdate[][] => {
if (updates.length === 0) return [];
// Sort updates by creation time
const sortedUpdates = [...updates].sort(
(a, b) => a.created_at.getTime() - b.created_at.getTime()
);
const timeGroups: SelectNodeUpdate[][] = [];
let currentGroup: SelectNodeUpdate[] = [sortedUpdates[0]!];
let currentGroup: SelectNodeUpdate[] = [updates[0]!];
for (let i = 1; i < sortedUpdates.length; i++) {
const currentUpdate = sortedUpdates[i]!;
for (let i = 1; i < updates.length; i++) {
const currentUpdate = updates[i]!;
const lastUpdateInGroup = currentGroup[currentGroup.length - 1]!;
const timeDiff =
currentUpdate.created_at.getTime() -
lastUpdateInGroup.created_at.getTime();
const timeDiffSeconds = timeDiff / 1000;
const timeDiffSeconds = Math.abs(timeDiff / 1000);
if (timeDiffSeconds <= mergeWindow) {
currentGroup.push(currentUpdate);
@@ -169,28 +191,32 @@ const mergeUpdatesGroup = async (
}
try {
const sortedUpdates = [...updates].sort((a, b) => {
const revA = BigInt(a.revision);
const revB = BigInt(b.revision);
if (revA > revB) {
return 1;
} else if (revA < revB) {
return -1;
}
return 0;
});
const updateData = sortedUpdates.map((update) => update.data);
const updateData = updates.map((update) => update.data);
const mergedState = mergeUpdates(updateData);
const lastUpdate = sortedUpdates[sortedUpdates.length - 1]!;
const mergedUpdatesMetadata: UpdateMergeMetadata[] = sortedUpdates
.slice(0, -1) // All except the last
.map((update) => ({
const lastUpdate = updates[updates.length - 1]!;
const updatesToMerge = updates.filter(
(update) => update.id !== lastUpdate.id
);
const updatesToMergeIds = updatesToMerge.map((update) => update.id);
const mergedUpdatesMetadata: UpdateMergeMetadata[] = [];
for (const update of updatesToMerge) {
if (update.merged_updates) {
for (const mergedUpdate of update.merged_updates) {
mergedUpdatesMetadata.push({
id: mergedUpdate.id,
createdAt: mergedUpdate.createdAt,
createdBy: mergedUpdate.createdBy,
});
}
}
mergedUpdatesMetadata.push({
id: update.id,
createdAt: update.created_at.toISOString(),
createdBy: update.created_by,
}));
});
}
await database.transaction().execute(async (trx) => {
await trx
@@ -205,13 +231,10 @@ const mergeUpdatesGroup = async (
.where('id', '=', lastUpdate.id)
.execute();
const updateIdsToDelete = sortedUpdates.slice(0, -1).map((u) => u.id);
if (updateIdsToDelete.length > 0) {
await trx
.deleteFrom('node_updates')
.where('id', 'in', updateIdsToDelete)
.execute();
}
await trx
.deleteFrom('node_updates')
.where('id', 'in', updatesToMergeIds)
.execute();
});
return true;