diff --git a/ft_build/sparksLib/bigqueryIndex.ts b/ft_build/sparksLib/bigqueryIndex.ts index 339e62da..aff63926 100644 --- a/ft_build/sparksLib/bigqueryIndex.ts +++ b/ft_build/sparksLib/bigqueryIndex.ts @@ -75,6 +75,68 @@ const significantDifference = (fieldsToSync, change) => { }, false); }; +const transformToSQLValue = (value: any, ftType: string) => { + if (value === null || value === undefined) { + return `null`; + } + + const sanitise = (x: string) => x?.replace?.(/\"/g, '\\"') ?? ""; + + switch (ftType) { + case "SIMPLE_TEXT": + case "LONG_TEXT": + case "EMAIL": + case "PHONE_NUMBER": + case "CODE": + case "RICH_TEXT": + case "ID": + case "SINGLE_SELECT": + // STRING + return `"${sanitise(value)}"`; + case "JSON": + case "FILE": + case "IMAGE": + case "USER": + case "COLOR": + // STRING (json representation) + if (!value.length) { + return `null`; + } + return `"${sanitise(JSON.stringify(value))}"`; + case "MULTI_SELECT": + // STRING (array representation, comma separated) + return `"${sanitise(value.toString())}"`; + case "CHECK_BOX": + // BOOLEAN + return value ? `true` : `false`; + case "NUMBER": + case "PERCENTAGE": + case "RATING": + case "SLIDER": + // NUMERIC + return Number(value); + case "DATE": + case "DATE_TIME": + case "DURATION": + // TIMESTAMP + if (!value?.toDate) { + return `null`; + } + return `timestamp("${value.toDate()}")`; + case "LAST": + case "STATUS": + case "SUB_TABLE": + case "DOCUMENT_SELECT": + case "SERVICE_SELECT": + case "ACTION": + case "DERIVATIVE": + case "AGGREGATE": + default: + // unknown or meaningless to sync + return `null`; + } +}; + const bigqueryIndex = async (payload, sparkContext) => { const { row, objectID, index, fieldsToSync } = payload; @@ -87,21 +149,6 @@ const bigqueryIndex = async (payload, sparkContext) => { const { projectID } = await getSecret("algolia"); console.log(`projectID: ${projectID}, index: ${index}`); - // big query specifig helper functions - type dataType = Record; - - // return keys and values in SQL format - // keys: key1,key2,key3 - // values: "val1","val2","val3" - function formatData(data: dataType) { - return { - keys: Object.keys(data).join(","), - values: Object.values(data) - .map((value) => `"${value.replace(/\"/g, '\\"')}"`) - .join(","), - }; - } - // return if the objectID exists in bool async function exist() { const query = `SELECT objectID FROM ${index} @@ -114,7 +161,10 @@ const bigqueryIndex = async (payload, sparkContext) => { } async function insert(data) { - const { keys, values } = formatData(data); + const keys = Object.keys(data).join(","); + const values = Object.keys(data) + .map((key) => transformToSQLValue(data[key], fieldTypes[key])) + .join(","); const query = `INSERT INTO ${index} (objectID, ${keys}) VALUES ("${objectID}", ${values}) @@ -124,11 +174,10 @@ const bigqueryIndex = async (payload, sparkContext) => { console.log(res); } - async function update(data: dataType) { + async function update(data: Record) { const keys = Object.keys(data); - const values = Object.values(data); const dictValues = Array.from(Array(keys.length).keys()) - .map((i) => `${keys[i]}="${values[i].replace(/\"/g, '\\"')}"`) + .map((i) => `${keys[i]}=${transformToSQLValue(data[i], fieldTypes[i])}`) .join(","); const query = `UPDATE ${index} SET ${dictValues}