From 09d3faa9b84da1444e2bf624aa33642bbb6e02e1 Mon Sep 17 00:00:00 2001 From: Bobby Date: Mon, 3 May 2021 14:54:13 +1000 Subject: [PATCH] implement bigquery rate limit retry algorithm --- ft_build/sparksLib/bigqueryIndex.ts | 33 +++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/ft_build/sparksLib/bigqueryIndex.ts b/ft_build/sparksLib/bigqueryIndex.ts index dbbe6e8c..dfddb370 100644 --- a/ft_build/sparksLib/bigqueryIndex.ts +++ b/ft_build/sparksLib/bigqueryIndex.ts @@ -335,6 +335,33 @@ const bigqueryIndex = async (payload, sparkContext) => { console.log(res); } + // execute a query, if rate limited, sleep and try again until success + // ATTENTION: cloud function might timeout the function execution time at 60,000ms + const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); + async function executeQuery(query, delayDepth = 1) { + try { + const res = await bigquery.query(query); + console.log(res); + } catch (error) { + if ( + error?.errors?.length === 1 && + error?.errors?.[0]?.reason === "rateLimitExceeded" + ) { + const delay = Math.round( + Math.floor(Math.random() * 3_000 * (delayDepth % 20) + 1000) + ); + console.log(`API error: rateLimitExceeded, try again in ${delay}ms`); + await sleep(delay); + await executeQuery(query, delayDepth + 1); + } else { + console.log(error?.errors ?? error); + } + } + if (delayDepth === 1) { + console.log("Query finished."); + } + } + async function update(data) { const values = Object.keys(data) .map((key) => `${key}=${transformToSQLValue(data[key], fieldTypes[key])}`) @@ -344,8 +371,7 @@ const bigqueryIndex = async (payload, sparkContext) => { WHERE objectID="${objectID}" ;`; console.log(query); - const res = await bigquery.query(query); - console.log(res); + await executeQuery(query); } async function insertOrUpdate(data) { @@ -362,8 +388,7 @@ const bigqueryIndex = async (payload, sparkContext) => { WHERE objectID="${objectID}" ;`; console.log(query); - const res = await bigquery.query(query); - console.log(res); + await executeQuery(query); } // preprocess before starting index logic