From e77ff5ad1034ca0c19b30f15f35a5548df66e401 Mon Sep 17 00:00:00 2001 From: Bobby Date: Fri, 30 Apr 2021 17:26:41 +1000 Subject: [PATCH] create dataset/table if not exist in bigquery index spark --- ft_build/sparksLib/bigqueryIndex.ts | 38 ++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/ft_build/sparksLib/bigqueryIndex.ts b/ft_build/sparksLib/bigqueryIndex.ts index 8a1aa01c..2cab97d2 100644 --- a/ft_build/sparksLib/bigqueryIndex.ts +++ b/ft_build/sparksLib/bigqueryIndex.ts @@ -142,7 +142,40 @@ const bigqueryIndex = async (payload, sparkContext) => { const bigquery = new BigQuery(); const { projectID } = await getSecret("algolia"); - console.log(`projectID: ${projectID}, index: ${index}`); + const tableFullName = `${projectID}.firetable.${index}`; + console.log( + `projectID: ${projectID}, index: ${index}, tableFullName: ${tableFullName}` + ); + + // create dataset with exact name "firetable" if not exists + async function preprocessDataset() { + const dataset = bigquery.dataset("firetable"); + const res = await dataset.exists(); + const exists = res[0]; + if (!exists) { + console.log("Dataset 'firetable' does not exist, creating dataset..."); + await dataset.create(); + console.log("Dataset 'firetable' created."); + } else { + console.log("Dataset 'firetable' exists."); + } + } + + async function preprocessTable() { + const dataset = bigquery.dataset("firetable"); + const table = dataset.table(index); + const res = await table.exists(); + const exists = res[0]; + if (!exists) { + console.log( + `Table '${index}' does not exist in dataset 'firetable', creating dataset...` + ); + await table.create(); + console.log(`Table '${index}' created in dataset 'firetable'.`); + } else { + console.log(`Table 'firetable' exists in 'firetable'.`); + } + } // return if the objectID exists in bool async function exist() { @@ -200,6 +233,9 @@ const bigqueryIndex = async (payload, sparkContext) => { console.log(res); } + // preprocess before starting index logic + await preprocessDataset(); + await preprocessTable(); switch (triggerType) { case "delete": await remove();