Skip to content

Commit 4633f88

Browse files
committed
fix(firestore-bigquery-changetracker): create table fix (#2060)
* fix(firestore-bigquery-export): initialization fix * chore(firestore-bigquery-export): bump changetracker version
1 parent 77976bb commit 4633f88

File tree

8 files changed

+317
-1850
lines changed

8 files changed

+317
-1850
lines changed

firestore-bigquery-export/firestore-bigquery-change-tracker/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"url": "github.com/firebase/extensions.git",
66
"directory": "firestore-bigquery-export/firestore-bigquery-change-tracker"
77
},
8-
"version": "1.1.32",
8+
"version": "1.1.33",
99
"description": "Core change-tracker library for Cloud Firestore Collection BigQuery Exports",
1010
"main": "./lib/index.js",
1111
"scripts": {

firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/e2e.test.ts

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,31 @@ let dataset: Dataset;
2828
let table: Table;
2929
let view: Table;
3030
describe("e2e", () => {
31+
describe("initialization", () => {
32+
beforeEach(async () => {
33+
randomID = (Math.random() + 1).toString(36).substring(7);
34+
datasetId = `dataset_${randomID}`;
35+
tableId = `table_${randomID}`;
36+
tableId_raw = `${tableId}_raw_changelog`;
37+
dataset = bq.dataset(datasetId);
38+
});
39+
40+
afterEach(async () => {
41+
await deleteTable({
42+
datasetId,
43+
});
44+
});
45+
test("successfully creates a dataset and table", async () => {
46+
await changeTracker({
47+
datasetId,
48+
tableId,
49+
}).record([event]);
50+
51+
const [metadata] = await dataset.table(tableId_raw).getMetadata();
52+
53+
expect(metadata).toBeDefined();
54+
});
55+
});
3156
describe("Partitioning", () => {
3257
beforeEach(async () => {
3358
randomID = (Math.random() + 1).toString(36).substring(7);
@@ -108,13 +133,14 @@ describe("e2e", () => {
108133

109134
const [metadata] = await dataset.table(`${tableId_raw}`).getMetadata();
110135

136+
expect(metadata.timePartitioning).toBeDefined();
137+
111138
const [changeLogRows] = await getBigQueryTableData(
112139
process.env.PROJECT_ID,
113140
datasetId,
114141
tableId
115142
);
116143

117-
expect(metadata.timePartitioning).toBeDefined();
118144
expect(changeLogRows[0].created.value).toBe(
119145
BigQuery.timestamp(created.toDate()).value
120146
);
@@ -369,6 +395,10 @@ describe("e2e", () => {
369395
});
370396

371397
test("does not update an existing non partitioned table, that has a valid schema with timePartitioning only", async () => {
398+
const tableExists = await dataset.table(tableId_raw).exists();
399+
400+
expect(tableExists[0]).toBe(true);
401+
372402
await changeTracker({
373403
datasetId,
374404
tableId,
@@ -380,7 +410,7 @@ describe("e2e", () => {
380410
expect(metadata.timePartitioning).toBeUndefined();
381411

382412
expect(consoleLogSpyWarn).toBeCalledWith(
383-
`Cannot partition an existing table ${datasetId}_${tableId_raw}`
413+
`Did not add partitioning to schema: Partition field not provided`
384414
);
385415
expect(consoleLogSpy).toBeCalledWith(
386416
`BigQuery dataset already exists: ${datasetId}`

firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/index.ts

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import {
4242
import { Partitioning } from "./partitioning";
4343
import { Clustering } from "./clustering";
4444
import { tableRequiresUpdate, viewRequiresUpdate } from "./checkUpdates";
45+
import { parseErrorMessage, waitForInitialization } from "./utils";
4546

4647
export { RawChangelogSchema, RawChangelogViewSchema } from "./schema";
4748

@@ -203,27 +204,10 @@ export class FirestoreBigQueryEventHistoryTracker
203204
* A half a second delay is added per check while the function
204205
* continually re-checks until the referenced dataset and table become available.
205206
*/
206-
private async waitForInitialization() {
207-
return new Promise((resolve) => {
208-
let handle = setInterval(async () => {
209-
try {
210-
const dataset = this.bigqueryDataset();
211-
const changelogName = this.rawChangeLogTableName();
212-
const table = dataset.table(changelogName);
213-
214-
const [datasetExists] = await dataset.exists();
215-
const [tableExists] = await table.exists();
216-
217-
if (datasetExists && tableExists) {
218-
clearInterval(handle);
219-
return resolve(table);
220-
}
221-
} catch (ex) {
222-
clearInterval(handle);
223-
logs.failedToInitializeWait(ex.message);
224-
}
225-
}, 5000);
226-
});
207+
private async _waitForInitialization() {
208+
const dataset = this.bigqueryDataset();
209+
const changelogName = this.rawChangeLogTableName();
210+
return waitForInitialization({ dataset, changelogName });
227211
}
228212

229213
/**
@@ -279,17 +263,39 @@ export class FirestoreBigQueryEventHistoryTracker
279263
if (this._initialized) {
280264
return;
281265
}
266+
try {
267+
await this.initializeDataset();
268+
} catch (error) {
269+
const message = parseErrorMessage(error, "initializing dataset");
270+
throw new Error(`Error initializing dataset: ${message}`);
271+
}
282272

283-
await this.initializeDataset();
284-
285-
await this.initializeRawChangeLogTable();
273+
try {
274+
await this.initializeRawChangeLogTable();
275+
} catch (error) {
276+
const message = parseErrorMessage(
277+
error,
278+
"initializing raw change log table"
279+
);
280+
throw new Error(`Error initializing raw change log table: ${message}`);
281+
}
286282

287-
await this.initializeLatestView();
283+
try {
284+
await this.initializeLatestView();
285+
} catch (error) {
286+
const message = parseErrorMessage(error, "initializing latest view");
287+
throw new Error(`Error initializing latest view: ${message}`);
288+
}
289+
await this._waitForInitialization();
288290

289291
this._initialized = true;
290-
} catch (ex) {
291-
await this.waitForInitialization();
292-
this._initialized = true;
292+
} catch (error) {
293+
const message = parseErrorMessage(
294+
error,
295+
"initializing BigQuery resources"
296+
);
297+
console.error("Error initializing BigQuery resources: ", message);
298+
throw error;
293299
}
294300
}
295301

@@ -396,7 +402,6 @@ export class FirestoreBigQueryEventHistoryTracker
396402
kmsKeyName: this.config.kmsKeyName,
397403
};
398404
}
399-
400405
//Add partitioning
401406
await partitioning.addPartitioningToSchema(schema.fields);
402407

firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/partitioning.ts

Lines changed: 55 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import * as firebase from "firebase-admin";
44

55
import * as logs from "../logs";
66
import * as bigquery from "@google-cloud/bigquery";
7-
7+
import * as functions from "firebase-functions";
88
import { getNewPartitionField } from "./schema";
99
import { BigQuery, TableMetadata } from "@google-cloud/bigquery";
1010

@@ -135,18 +135,22 @@ export class Partitioning {
135135
}
136136

137137
private async isTablePartitioned() {
138+
const [tableExists] = await this.table.exists();
139+
140+
if (!this.table || !tableExists) return false;
141+
138142
/* Return true if partition metadata already exists */
139143
const [metadata] = await this.table.getMetadata();
140-
if (!!metadata.timePartitioning) {
144+
if (metadata.timePartitioning) {
141145
logs.cannotPartitionExistingTable(this.table);
142-
return Promise.resolve(true);
146+
return true;
143147
}
144148

145149
/** Find schema fields **/
146150
const schemaFields = await this.metaDataSchemaFields();
147151

148152
/** Return false if no schema exists */
149-
if (!schemaFields) return Promise.resolve(false);
153+
if (!schemaFields) return false;
150154

151155
/* Return false if time partition field not found */
152156
return schemaFields.some(
@@ -156,11 +160,11 @@ export class Partitioning {
156160

157161
async isValidPartitionForExistingTable(): Promise<boolean> {
158162
/** Return false if partition type option has not been set */
159-
if (!this.isPartitioningEnabled()) return Promise.resolve(false);
163+
if (!this.isPartitioningEnabled()) return false;
160164

161165
/* Return false if table is already partitioned */
162166
const isPartitioned = await this.isTablePartitioned();
163-
if (isPartitioned) return Promise.resolve(false);
167+
if (isPartitioned) return false;
164168

165169
return this.hasValidCustomPartitionConfig();
166170
}
@@ -242,44 +246,54 @@ export class Partitioning {
242246
return fields.map(($) => $.name).includes(timePartitioningField);
243247
}
244248

245-
async addPartitioningToSchema(fields = []): Promise<void> {
246-
/** Return if partition type option has not been set */
247-
if (!this.isPartitioningEnabled()) return;
248-
249-
/** Return if class has invalid table reference */
250-
if (!this.hasValidTableReference()) return;
251-
252-
/** Return if table is already partitioned **/
253-
if (await this.isTablePartitioned()) return;
254-
255-
/** Return if partition config is invalid */
256-
if (!this.hasValidCustomPartitionConfig()) return;
257-
258-
/** Return if an invalid partition type has been requested */
259-
if (!this.hasValidTimePartitionType()) return;
260-
261-
/** Return if an invalid partition option has been requested */
262-
if (!this.hasValidTimePartitionOption()) return;
263-
264-
/** Return if invalid partitioning and field type combination */
265-
if (this.hasHourAndDatePartitionConfig()) return;
266-
267-
/** Return if partition field has not been provided */
268-
if (!this.config.timePartitioningField) return;
269-
270-
/** Return if field already exists on schema */
271-
if (this.customFieldExists(fields)) return;
249+
private async shouldAddPartitioningToSchema(): Promise<{
250+
proceed: boolean;
251+
message: string;
252+
}> {
253+
if (!this.isPartitioningEnabled()) {
254+
return { proceed: false, message: "Partitioning not enabled" };
255+
}
256+
if (!this.hasValidTableReference()) {
257+
return { proceed: false, message: "Invalid table reference" };
258+
}
259+
if (!this.hasValidCustomPartitionConfig()) {
260+
return { proceed: false, message: "Invalid partition config" };
261+
}
262+
if (!this.hasValidTimePartitionType()) {
263+
return { proceed: false, message: "Invalid partition type" };
264+
}
265+
if (!this.hasValidTimePartitionOption()) {
266+
return { proceed: false, message: "Invalid partition option" };
267+
}
268+
if (this.hasHourAndDatePartitionConfig()) {
269+
return {
270+
proceed: false,
271+
message: "Invalid partitioning and field type combination",
272+
};
273+
}
274+
if (this.customFieldExists()) {
275+
return { proceed: false, message: "Field already exists on schema" };
276+
}
277+
if (await this.isTablePartitioned()) {
278+
return { proceed: false, message: "Table is already partitioned" };
279+
}
280+
if (!this.config.timePartitioningField) {
281+
return { proceed: false, message: "Partition field not provided" };
282+
}
283+
return { proceed: true, message: "" };
284+
}
272285

273-
/** Add new partitioning field **/
286+
async addPartitioningToSchema(fields = []): Promise<void> {
287+
const { proceed, message } = await this.shouldAddPartitioningToSchema();
288+
if (!proceed) {
289+
functions.logger.warn(`Did not add partitioning to schema: ${message}`);
290+
return;
291+
}
292+
// Add new partitioning field
274293
fields.push(getNewPartitionField(this.config));
275-
276-
/** Log successful addition of partition column */
277-
logs.addPartitionFieldColumn(
278-
this.table.id,
279-
this.config.timePartitioningField
294+
functions.logger.log(
295+
`Added new partition field: ${this.config.timePartitioningField} to table ID: ${this.table.id}`
280296
);
281-
282-
return;
283297
}
284298

285299
async updateTableMetadata(options: bigquery.TableMetadata): Promise<void> {

0 commit comments

Comments
 (0)