BigQuery Spark update

This commit is contained in:
Shams mosowi
2021-06-24 11:51:34 +10:00
parent 20a2cdf4bc
commit 7f957cb678

View File

@@ -163,10 +163,10 @@ const transformToSQLType = (ftType: string) => {
};
const bigqueryIndex = async (payload, sparkContext) => {
const { row, objectID, index, fieldsToSync, projectID } = payload;
const { objectID, index, fieldsToSync, projectID } = payload;
const { triggerType, change, fieldTypes } = sparkContext;
const record = rowReducer(fieldsToSync, row);
const record = rowReducer(fieldsToSync, sparkContext.row);
const { BigQuery } = require("@google-cloud/bigquery");
const bigquery = new BigQuery();
@@ -396,7 +396,7 @@ const bigqueryIndex = async (payload, sparkContext) => {
await preprocessSchema();
// only proceed with fields that have known types
const typeKnownReccord = getTypeKnownRecord(record);
const typeKnownRecord = getTypeKnownRecord(record);
switch (triggerType) {
case "delete":
@@ -406,13 +406,13 @@ const bigqueryIndex = async (payload, sparkContext) => {
if (
significantDifference([...fieldsToSync, "_ft_forcedUpdateAt"], change)
) {
await insertOrUpdate(typeKnownReccord);
await insertOrUpdate(typeKnownRecord);
} else {
console.log("significantDifference is false, no update needed.");
}
break;
case "create":
await insertOrUpdate(typeKnownReccord);
await insertOrUpdate(typeKnownRecord);
break;
default:
break;