
Commit 1f653eb

irjudson and claude committed
Add custom SQL query support and proxy configuration
Implements two new features for enhanced BigQuery sync capabilities:

1. Custom SQL Query Support (Issue #24)
   - Add optional customQuery field to table configuration
   - Support @lastTimestamp variable substitution for incremental sync
   - Automatically wrap custom queries with partitioning logic (MOD-based)
   - Enable JOINs, transformations, and complex WHERE clauses
   - Maintain distributed workload partitioning across cluster

2. Proxy Configuration (Issue #23)
   - Add optional proxy configuration at bigquery level
   - Support HTTP/HTTPS proxies with authentication
   - Configure via proxy.enabled and proxy.url fields
   - Plugin-level configuration (does not affect Harper replication)
   - Uses environment variables respected by BigQuery SDK

All existing tests pass (91/91) plus 18 new tests for these features.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 9148276 commit 1f653eb

File tree

6 files changed: +636 additions, -45 deletions

config.yaml

Lines changed: 88 additions & 3 deletions
@@ -30,9 +30,15 @@ bigquery:
   credentials: service-account-key.json
   location: US
 
+  # Optional: Proxy configuration (applies to all BigQuery API calls)
+  # Uncomment to enable proxy for corporate networks
+  # proxy:
+  #   enabled: false
+  #   url: "http://proxy.vz.com:8080"
+
   # Define tables to sync (each syncs independently)
   tables:
-    # High-frequency vessel position tracking
+    # Example 1: Standard table sync with column selection
     - id: vessel_positions
       dataset: maritime_tracking
       table: vessel_positions
@@ -44,7 +50,7 @@ bigquery:
         catchupBatchSize: 1000
         steadyBatchSize: 500
 
-    # Port arrival/departure events
+    # Example 2: Standard table sync with all columns
     - id: port_events
       dataset: maritime_tracking
       table: port_events
@@ -58,7 +64,7 @@ bigquery:
       # Optional: Enable streaming inserts for lower latency (has cost implications)
       # useStreamingAPIs: true
 
-    # Vessel metadata updates
+    # Example 3: Standard table sync with wildcard columns
     - id: vessel_metadata
       dataset: maritime_tracking
       table: vessel_metadata
@@ -70,6 +76,37 @@ bigquery:
         catchupBatchSize: 100
         steadyBatchSize: 10
 
+    # Example 4: Custom SQL query with JOINs and transformations
+    # Uncomment to use custom query instead of standard table sync
+    # - id: enriched_vessel_events
+    #   customQuery: |
+    #     SELECT
+    #       e.event_time as timestamp,
+    #       e.vessel_mmsi,
+    #       v.vessel_name,
+    #       v.vessel_type,
+    #       e.event_type,
+    #       e.port_id,
+    #       p.port_name,
+    #       CONCAT(p.port_name, ', ', p.country) as location,
+    #       e.latitude,
+    #       e.longitude
+    #     FROM `maritime_tracking.port_events` e
+    #     LEFT JOIN `maritime_tracking.vessel_metadata` v ON e.vessel_mmsi = v.mmsi
+    #     LEFT JOIN `maritime_tracking.ports` p ON e.port_id = p.id
+    #     WHERE e.event_time > TIMESTAMP(@lastTimestamp)
+    #       AND e.event_type IN ('ARRIVAL', 'DEPARTURE')
+    #   timestampColumn: timestamp
+    #   targetTable: EnrichedVesselEvents
+    #   sync:
+    #     initialBatchSize: 5000
+    #     catchupBatchSize: 500
+    #     steadyBatchSize: 100
+    #
+    # Note: customQuery must include @lastTimestamp variable for incremental sync
+    # The query will be automatically wrapped with partitioning, ORDER BY, and LIMIT
+    # Do NOT include ORDER BY or LIMIT in your custom query
+
   # BigQuery Insert Method Configuration
   # Two options available (configurable per table):
   #
@@ -89,6 +126,54 @@ bigquery:
   # - id: port_events
   #   useStreamingAPIs: true  # Real-time events benefit from low latency
 
+  # Custom SQL Query Configuration
+  # You can use custom SQL queries instead of standard table sync for advanced use cases:
+  #
+  # Use cases:
+  # - JOIN multiple BigQuery tables
+  # - Apply transformations (CONCAT, CAST, etc.)
+  # - Filter data with complex WHERE conditions
+  # - Aggregate or compute derived fields
+  #
+  # Requirements:
+  # - Must include @lastTimestamp variable in WHERE clause for incremental sync
+  # - timestampColumn must still be specified (refers to column in query RESULTS)
+  # - Do NOT include ORDER BY or LIMIT (added automatically with partitioning)
+  #
+  # Example:
+  # - id: my_custom_sync
+  #   customQuery: |
+  #     SELECT
+  #       t1.timestamp,
+  #       t1.id,
+  #       t2.name,
+  #       CONCAT(t1.city, ', ', t1.country) as location
+  #     FROM `dataset.table1` t1
+  #     LEFT JOIN `dataset.table2` t2 ON t1.id = t2.id
+  #     WHERE t1.timestamp > TIMESTAMP(@lastTimestamp)
+  #       AND t1.status = 'active'
+  #   timestampColumn: timestamp
+  #   targetTable: MyCustomData
+  #
+  # The plugin automatically wraps your query with:
+  # - Distributed partitioning (MOD-based sharding)
+  # - ORDER BY timestampColumn ASC
+  # - LIMIT for batch control
+
+  # Proxy Configuration
+  # Configure HTTP/HTTPS proxy for BigQuery API calls in corporate networks:
+  #
+  # proxy:
+  #   enabled: true
+  #   url: "http://proxy.company.com:8080"
+  #
+  # Supported formats:
+  # - HTTP proxy: "http://proxy.company.com:8080"
+  # - HTTPS proxy: "https://proxy.company.com:8443"
+  # - Authenticated: "http://username:password@proxy.company.com:8080"
+  #
+  # Note: Proxy applies to ALL BigQuery API calls but does NOT affect Harper replication
+
 # Maritime vessel data synthesizer configuration
 # When bigquery.tables is present, synthesizer uses multi-table orchestrator
 # to generate data for ALL tables (vessel_positions, port_events, vessel_metadata).
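
The config comments above describe how a customQuery is wrapped, but the QueryBuilder change itself is not part of this excerpt. The sketch below illustrates the shape of the final query only: the MOD-based shard expression, the node index/count names, and binding @lastTimestamp as a BigQuery named parameter are assumptions, not the plugin's actual implementation.

```js
// Illustrative sketch of the wrapping behavior documented above (not the plugin's QueryBuilder).
import { BigQuery } from '@google-cloud/bigquery';

function buildWrappedQuery({ customQuery, timestampColumn, nodeIndex, nodeCount, batchSize }) {
  // Wrap the user's query with MOD-based sharding, ordering, and a batch limit --
  // which is why the custom query itself must not contain ORDER BY or LIMIT.
  return `
    SELECT * FROM (
      ${customQuery}
    )
    WHERE MOD(ABS(FARM_FINGERPRINT(CAST(${timestampColumn} AS STRING))), ${nodeCount}) = ${nodeIndex}
    ORDER BY ${timestampColumn} ASC
    LIMIT ${batchSize}
  `;
}

// Example run, binding @lastTimestamp as a named query parameter.
const bq = new BigQuery({ projectId: 'my-project', keyFilename: 'service-account-key.json' });
const [rows] = await bq.query({
  query: buildWrappedQuery({
    customQuery: `
      SELECT t1.timestamp, t1.id, t2.name
      FROM \`dataset.table1\` t1
      LEFT JOIN \`dataset.table2\` t2 ON t1.id = t2.id
      WHERE t1.timestamp > TIMESTAMP(@lastTimestamp) AND t1.status = 'active'
    `,
    timestampColumn: 'timestamp',
    nodeIndex: 0,
    nodeCount: 3,
    batchSize: 500,
  }),
  params: { lastTimestamp: '2024-01-01T00:00:00Z' },
});
console.log(`Fetched ${rows.length} rows for this node's partition`);
```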

src/bigquery-client.js

Lines changed: 53 additions & 7 deletions
@@ -15,44 +15,90 @@ export class BigQueryClient {
   * @param {Object} config - Configuration object
   * @param {Object} config.bigquery - BigQuery configuration
   * @param {string} config.bigquery.projectId - GCP project ID
-  * @param {string} config.bigquery.dataset - BigQuery dataset name
-  * @param {string} config.bigquery.table - BigQuery table name
+  * @param {string} config.bigquery.dataset - BigQuery dataset name (optional if customQuery)
+  * @param {string} config.bigquery.table - BigQuery table name (optional if customQuery)
   * @param {string} config.bigquery.timestampColumn - Timestamp column name
   * @param {string} config.bigquery.credentials - Path to credentials file
   * @param {string} config.bigquery.location - BigQuery location (e.g., 'US', 'EU')
   * @param {Array<string>} config.bigquery.columns - Columns to select (defaults to ['*'])
+  * @param {string} config.bigquery.customQuery - Optional custom SQL query
+  * @param {Object} config.bigquery.proxy - Optional proxy configuration
   */
  constructor(config) {
    logger.info('[BigQueryClient] Constructor called - initializing BigQuery client');
    logger.debug(
-      `[BigQueryClient] Config - projectId: ${config.bigquery.projectId}, dataset: ${config.bigquery.dataset}, table: ${config.bigquery.table}, location: ${config.bigquery.location}`
+      `[BigQueryClient] Config - projectId: ${config.bigquery.projectId}, ` +
+        `${config.bigquery.customQuery ? 'customQuery mode' : `dataset: ${config.bigquery.dataset}, table: ${config.bigquery.table}`}, ` +
+        `location: ${config.bigquery.location}`
    );
 
    this.config = config;
-    this.client = new BigQuery({
+
+    // Configure BigQuery client options
+    const clientOptions = {
      projectId: config.bigquery.projectId,
      keyFilename: config.bigquery.credentials,
      location: config.bigquery.location,
-    });
+    };
+
+    // Add proxy configuration if enabled
+    // BigQuery client uses gaxios/google-auth-library which respects HTTP_PROXY/HTTPS_PROXY
+    // These environment variables only affect BigQuery API calls, not Harper's replication
+    if (config.bigquery.proxy?.enabled) {
+      const proxyUrl = config.bigquery.proxy.url;
+      logger.info(`[BigQueryClient] Configuring proxy: ${proxyUrl}`);
+
+      // Set proxy environment variables for BigQuery client
+      // The google-cloud libraries (gaxios/google-auth-library) respect these variables
+      // This is the standard Node.js approach for proxy configuration
+      if (!process.env.HTTP_PROXY) {
+        process.env.HTTP_PROXY = proxyUrl;
+      }
+      if (!process.env.HTTPS_PROXY) {
+        process.env.HTTPS_PROXY = proxyUrl;
+      }
+      if (!process.env.http_proxy) {
+        process.env.http_proxy = proxyUrl;
+      }
+      if (!process.env.https_proxy) {
+        process.env.https_proxy = proxyUrl;
+      }
+
+      logger.info(
+        `[BigQueryClient] Proxy configured for BigQuery API calls: ${proxyUrl}. ` +
+          `This affects only BigQuery SDK requests, not Harper replication.`
+      );
+    }
+
+    this.client = new BigQuery(clientOptions);
 
    this.dataset = config.bigquery.dataset;
    this.table = config.bigquery.table;
    this.timestampColumn = config.bigquery.timestampColumn;
    this.columns = config.bigquery.columns || ['*'];
+    this.customQuery = config.bigquery.customQuery;
 
    // Retry configuration with exponential backoff
    this.maxRetries = config.bigquery.maxRetries || 5;
    this.initialRetryDelay = config.bigquery.initialRetryDelay || 1000; // 1 second
 
-    // Initialize query builder with column selection
+    // Initialize query builder with column selection or custom query
    this.queryBuilder = new QueryBuilder({
      dataset: this.dataset,
      table: this.table,
      timestampColumn: this.timestampColumn,
      columns: this.columns,
+      customQuery: this.customQuery,
    });
 
-    logger.info(`[BigQueryClient] Client initialized successfully with columns: ${this.queryBuilder.getColumnList()}`);
+    if (this.customQuery) {
+      logger.info(`[BigQueryClient] Client initialized with custom query, timestamp column: ${this.timestampColumn}`);
+    } else {
+      logger.info(
+        `[BigQueryClient] Client initialized successfully with columns: ${this.queryBuilder.getColumnList()}`
+      );
+    }
+
    logger.info(
      `[BigQueryClient] Retry configuration - maxRetries: ${this.maxRetries}, initialDelay: ${this.initialRetryDelay}ms`
    );
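
A minimal usage sketch of the constructor above with the new customQuery and proxy options. The values and the import path are placeholders; only the config shape comes from this diff, and (per the code above) the env vars are set only if not already present, so existing HTTP_PROXY/HTTPS_PROXY settings win.

```js
// Sketch: constructing the client with the options added in this commit.
import { BigQueryClient } from './src/bigquery-client.js';

const client = new BigQueryClient({
  bigquery: {
    projectId: 'my-project',
    credentials: 'service-account-key.json',
    location: 'US',
    timestampColumn: 'timestamp',
    // Custom query mode: dataset/table/columns can be omitted
    customQuery: `
      SELECT t1.timestamp, t1.id
      FROM \`dataset.table1\` t1
      WHERE t1.timestamp > TIMESTAMP(@lastTimestamp)
    `,
    // Plugin-level proxy: the constructor exports HTTP_PROXY/HTTPS_PROXY (and lower-case
    // variants) before creating the SDK client; per the comments above this affects
    // BigQuery SDK requests, not Harper replication.
    proxy: {
      enabled: true,
      url: 'http://proxy.company.com:8080',
    },
  },
});
```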

src/config-loader.js

Lines changed: 51 additions & 10 deletions
@@ -7,7 +7,12 @@ import { readFileSync } from 'fs';
 import { parse } from 'yaml';
 import { fileURLToPath } from 'url';
 import { dirname, join } from 'path';
-import { validateFullConfig as _validateFullConfig, validateAndNormalizeColumns } from './validators.js';
+import {
+  validateFullConfig as _validateFullConfig,
+  validateAndNormalizeColumns,
+  validateProxyConfig,
+  validateCustomQuery,
+} from './validators.js';
 
 const __filename = fileURLToPath(import.meta.url);
 const __dirname = dirname(__filename);
@@ -126,6 +131,7 @@ function normalizeConfig(config) {
      projectId: legacyBigQueryConfig.projectId,
      credentials: legacyBigQueryConfig.credentials,
      location: legacyBigQueryConfig.location,
+      proxy: legacyBigQueryConfig.proxy, // Preserve proxy config if present
      tables: [tableConfig],
    },
    sync: {
@@ -167,23 +173,44 @@ function validateMultiTableConfig(config) {
      log.error('[ConfigLoader.validateMultiTableConfig] Missing required field: table.id');
      throw new Error('Missing required field: table.id');
    }
-    if (!table.dataset) {
-      log.error(`[ConfigLoader.validateMultiTableConfig] Missing 'dataset' for table: ${table.id}`);
-      throw new Error(`Missing required field 'dataset' for table: ${table.id}`);
-    }
-    if (!table.table) {
-      log.error(`[ConfigLoader.validateMultiTableConfig] Missing 'table' for table: ${table.id}`);
-      throw new Error(`Missing required field 'table' for table: ${table.id}`);
-    }
+
+    // timestampColumn is always required (for checkpoint tracking)
    if (!table.timestampColumn) {
      log.error(`[ConfigLoader.validateMultiTableConfig] Missing 'timestampColumn' for table: ${table.id}`);
      throw new Error(`Missing required field 'timestampColumn' for table: ${table.id}`);
    }
+
    if (!table.targetTable) {
      log.error(`[ConfigLoader.validateMultiTableConfig] Missing 'targetTable' for table: ${table.id}`);
      throw new Error(`Missing required field 'targetTable' for table: ${table.id}`);
    }
 
+    // Check if using customQuery or standard table config
+    if (table.customQuery) {
+      // Custom query mode - validate the query
+      validateCustomQuery(table.customQuery, table.timestampColumn);
+      log.debug(`[ConfigLoader.validateMultiTableConfig] Table ${table.id} uses customQuery`);
+    } else {
+      // Standard mode - dataset, table, columns required
+      if (!table.dataset) {
+        log.error(`[ConfigLoader.validateMultiTableConfig] Missing 'dataset' for table: ${table.id}`);
+        throw new Error(
+          `Missing required field 'dataset' for table: ${table.id}. ` +
+            `Either provide dataset/table/columns OR use customQuery.`
+        );
+      }
+      if (!table.table) {
+        log.error(`[ConfigLoader.validateMultiTableConfig] Missing 'table' for table: ${table.id}`);
+        throw new Error(
+          `Missing required field 'table' for table: ${table.id}. ` +
+            `Either provide dataset/table/columns OR use customQuery.`
+        );
+      }
+      log.debug(
+        `[ConfigLoader.validateMultiTableConfig] Table ${table.id} uses standard sync (${table.dataset}.${table.table})`
+      );
+    }
+
    // Check for duplicate IDs
    if (tableIds.has(table.id)) {
      log.error(`[ConfigLoader.validateMultiTableConfig] Duplicate table ID: ${table.id}`);
@@ -292,9 +319,20 @@ export function getPluginConfig(config = null) {
  }
 
  // Config is already normalized to multi-table format by loadConfig
+  // Validate proxy config if present
+  if (fullConfig.bigquery.proxy) {
+    validateProxyConfig(fullConfig.bigquery.proxy);
+  }
+
  // Validate and normalize columns for each table
  const tablesWithNormalizedColumns = fullConfig.bigquery.tables.map((table) => {
-    const normalizedColumns = validateAndNormalizeColumns(table.columns, table.timestampColumn);
+    // Validate custom query if present
+    if (table.customQuery) {
+      validateCustomQuery(table.customQuery, table.timestampColumn);
+    }
+
+    const hasCustomQuery = !!table.customQuery;
+    const normalizedColumns = validateAndNormalizeColumns(table.columns, table.timestampColumn, hasCustomQuery);
 
    return {
      ...table,
@@ -307,6 +345,7 @@ export function getPluginConfig(config = null) {
      projectId: fullConfig.bigquery.projectId,
      credentials: fullConfig.bigquery.credentials,
      location: fullConfig.bigquery.location || 'US',
+      proxy: fullConfig.bigquery.proxy, // Pass through proxy config
      tables: tablesWithNormalizedColumns,
    },
    sync: fullConfig.sync,
@@ -336,8 +375,10 @@ export function getTableConfig(tableId, config = null) {
      table: tableConfig.table,
      timestampColumn: tableConfig.timestampColumn,
      columns: tableConfig.columns,
+      customQuery: tableConfig.customQuery, // Include custom query if present
      credentials: fullConfig.bigquery.credentials,
      location: fullConfig.bigquery.location,
+      proxy: fullConfig.bigquery.proxy, // Include proxy config
    },
    sync: {
      ...fullConfig.sync,
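
validators.js is among the six changed files but its diff is not shown in this excerpt. The sketch below is a hedged guess at checks that would satisfy the requirements documented in config.yaml (@lastTimestamp required, no ORDER BY/LIMIT, proxy.url must be an http/https URL); the real validateCustomQuery and validateProxyConfig may differ.

```js
// Sketch only: plausible implementations of the validators imported by config-loader.js above.
export function validateCustomQuery(customQuery, timestampColumn) {
  if (typeof customQuery !== 'string' || !customQuery.trim()) {
    throw new Error('customQuery must be a non-empty SQL string');
  }
  if (!customQuery.includes('@lastTimestamp')) {
    throw new Error('customQuery must reference @lastTimestamp for incremental sync');
  }
  if (/\border\s+by\b/i.test(customQuery) || /\blimit\s+\d+/i.test(customQuery)) {
    throw new Error('customQuery must not contain ORDER BY or LIMIT; they are added automatically');
  }
  if (!timestampColumn) {
    throw new Error('timestampColumn is required when using customQuery');
  }
}

export function validateProxyConfig(proxy) {
  if (!proxy.enabled) {
    return; // Proxy disabled: nothing to validate
  }
  if (!proxy.url) {
    throw new Error('proxy.url is required when proxy.enabled is true');
  }
  let parsed;
  try {
    parsed = new URL(proxy.url);
  } catch {
    throw new Error(`Invalid proxy.url: ${proxy.url}`);
  }
  if (parsed.protocol !== 'http:' && parsed.protocol !== 'https:') {
    throw new Error(`Unsupported proxy protocol: ${parsed.protocol} (use http or https)`);
  }
}
```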
