add firetable type support for bigquery spark

This commit is contained in:
Bobby
2021-04-30 01:46:30 +10:00
parent 764a4b513b
commit 2dfa1192fd

View File

@@ -75,6 +75,68 @@ const significantDifference = (fieldsToSync, change) => {
}, false);
};
const transformToSQLValue = (value: any, ftType: string) => {
if (value === null || value === undefined) {
return `null`;
}
const sanitise = (x: string) => x?.replace?.(/\"/g, '\\"') ?? "";
switch (ftType) {
case "SIMPLE_TEXT":
case "LONG_TEXT":
case "EMAIL":
case "PHONE_NUMBER":
case "CODE":
case "RICH_TEXT":
case "ID":
case "SINGLE_SELECT":
// STRING
return `"${sanitise(value)}"`;
case "JSON":
case "FILE":
case "IMAGE":
case "USER":
case "COLOR":
// STRING (json representation)
if (!value.length) {
return `null`;
}
return `"${sanitise(JSON.stringify(value))}"`;
case "MULTI_SELECT":
// STRING (array representation, comma separated)
return `"${sanitise(value.toString())}"`;
case "CHECK_BOX":
// BOOLEAN
return value ? `true` : `false`;
case "NUMBER":
case "PERCENTAGE":
case "RATING":
case "SLIDER":
// NUMERIC
return Number(value);
case "DATE":
case "DATE_TIME":
case "DURATION":
// TIMESTAMP
if (!value?.toDate) {
return `null`;
}
return `timestamp("${value.toDate()}")`;
case "LAST":
case "STATUS":
case "SUB_TABLE":
case "DOCUMENT_SELECT":
case "SERVICE_SELECT":
case "ACTION":
case "DERIVATIVE":
case "AGGREGATE":
default:
// unknown or meaningless to sync
return `null`;
}
};
const bigqueryIndex = async (payload, sparkContext) => {
const { row, objectID, index, fieldsToSync } = payload;
@@ -87,21 +149,6 @@ const bigqueryIndex = async (payload, sparkContext) => {
const { projectID } = await getSecret("algolia");
console.log(`projectID: ${projectID}, index: ${index}`);
// big query specifig helper functions
type dataType = Record<string, string>;
// return keys and values in SQL format
// keys: key1,key2,key3
// values: "val1","val2","val3"
function formatData(data: dataType) {
return {
keys: Object.keys(data).join(","),
values: Object.values(data)
.map((value) => `"${value.replace(/\"/g, '\\"')}"`)
.join(","),
};
}
// return if the objectID exists in bool
async function exist() {
const query = `SELECT objectID FROM ${index}
@@ -114,7 +161,10 @@ const bigqueryIndex = async (payload, sparkContext) => {
}
async function insert(data) {
const { keys, values } = formatData(data);
const keys = Object.keys(data).join(",");
const values = Object.keys(data)
.map((key) => transformToSQLValue(data[key], fieldTypes[key]))
.join(",");
const query = `INSERT INTO ${index}
(objectID, ${keys})
VALUES ("${objectID}", ${values})
@@ -124,11 +174,10 @@ const bigqueryIndex = async (payload, sparkContext) => {
console.log(res);
}
async function update(data: dataType) {
async function update(data: Record<string, string>) {
const keys = Object.keys(data);
const values = Object.values(data);
const dictValues = Array.from(Array(keys.length).keys())
.map((i) => `${keys[i]}="${values[i].replace(/\"/g, '\\"')}"`)
.map((i) => `${keys[i]}=${transformToSQLValue(data[i], fieldTypes[i])}`)
.join(",");
const query = `UPDATE ${index}
SET ${dictValues}