implement BigQuery rate limit retry algorithm

Bobby
2021-05-03 14:54:13 +10:00
parent ebaa6d73c9
commit 09d3faa9b8


@@ -335,6 +335,33 @@ const bigqueryIndex = async (payload, sparkContext) => {
console.log(res);
}
// execute a query; if rate limited, sleep and retry until it succeeds
// ATTENTION: the Cloud Function may time out at 60,000ms of execution, so a long retry chain can be cut short
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
async function executeQuery(query, delayDepth = 1) {
try {
const res = await bigquery.query(query);
console.log(res);
} catch (error) {
if (
error?.errors?.length === 1 &&
error?.errors?.[0]?.reason === "rateLimitExceeded"
) {
// random backoff: jitter of up to 3,000 ms * (delayDepth % 20), plus a 1,000 ms floor
const delay = Math.floor(
Math.random() * 3_000 * (delayDepth % 20) + 1_000
);
console.log(`API error: rateLimitExceeded, try again in ${delay}ms`);
await sleep(delay);
await executeQuery(query, delayDepth + 1);
} else {
console.log(error?.errors ?? error);
}
}
if (delayDepth === 1) {
console.log("Query finished.");
}
}
async function update(data) {
const values = Object.keys(data)
.map((key) => `${key}=${transformToSQLValue(data[key], fieldTypes[key])}`)
@@ -344,8 +371,7 @@ const bigqueryIndex = async (payload, sparkContext) => {
WHERE objectID="${objectID}"
;`;
console.log(query);
const res = await bigquery.query(query);
console.log(res);
await executeQuery(query);
}
async function insertOrUpdate(data) {
@@ -362,8 +388,7 @@ const bigqueryIndex = async (payload, sparkContext) => {
WHERE objectID="${objectID}"
;`;
console.log(query);
const res = await bigquery.query(query);
console.log(res);
await executeQuery(query);
}
// preprocess before starting index logic
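
The ATTENTION comment above flags the 60,000ms Cloud Function limit, while the retry in executeQuery is unbounded. A minimal sketch of one way to cap it, reusing the same error check and delay formula; the maxAttempts parameter is hypothetical and not part of this commit, and it assumes the bigquery client and sleep helper already in scope above:

// Sketch only: bounded variant of the retry, assuming `bigquery` and `sleep` exist as defined above.
async function executeQueryBounded(query, maxAttempts = 10) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const res = await bigquery.query(query);
      console.log(res);
      console.log("Query finished.");
      return;
    } catch (error) {
      const rateLimited =
        error?.errors?.length === 1 &&
        error?.errors?.[0]?.reason === "rateLimitExceeded";
      if (!rateLimited || attempt === maxAttempts) {
        // non-rate-limit error, or retry budget exhausted: log and give up
        console.log(error?.errors ?? error);
        return;
      }
      // same jittered backoff as above: up to 3,000 ms scaled by attempt (mod 20), plus a 1,000 ms floor
      const delay = Math.floor(Math.random() * 3_000 * (attempt % 20) + 1_000);
      console.log(`API error: rateLimitExceeded, try again in ${delay}ms`);
      await sleep(delay);
    }
  }
}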