4 changes: 2 additions & 2 deletions etl-api/tests/pipelines.rs
@@ -1209,11 +1209,11 @@ async fn deleting_pipeline_removes_table_schemas_from_source_database() {

// Insert table schemas using production schema
let table_schema_id_1 = sqlx::query_scalar::<_, i64>(
"INSERT INTO etl.table_schemas (pipeline_id, table_id, schema_name, table_name) VALUES ($1, $2, 'public', 'test_users') RETURNING id"
"INSERT INTO etl.table_schemas (pipeline_id, table_id, schema_name, table_name, schema_version) VALUES ($1, $2, 'public', 'test_users', 0) RETURNING id"
).bind(pipeline_id).bind(table1_oid).fetch_one(&source_db_pool).await.unwrap();

let table_schema_id_2 = sqlx::query_scalar::<_, i64>(
"INSERT INTO etl.table_schemas (pipeline_id, table_id, schema_name, table_name) VALUES ($1, $2, 'public', 'test_orders') RETURNING id"
"INSERT INTO etl.table_schemas (pipeline_id, table_id, schema_name, table_name, schema_version) VALUES ($1, $2, 'public', 'test_orders', 0) RETURNING id"
).bind(pipeline_id).bind(table2_oid).fetch_one(&source_db_pool).await.unwrap();

// Insert multiple columns for each table to test CASCADE behavior
1 change: 1 addition & 0 deletions etl-destinations/Cargo.toml
@@ -46,6 +46,7 @@ uuid = { workspace = true, optional = true, features = ["v4"] }

[dev-dependencies]
etl = { workspace = true, features = ["test-utils"] }
etl-postgres = { workspace = true, features = ["test-utils", "sqlx"] }
etl-telemetry = { workspace = true }

base64 = { workspace = true }
224 changes: 180 additions & 44 deletions etl-destinations/src/bigquery/client.rs
@@ -67,6 +67,22 @@ pub struct BigQueryClient {
}

impl BigQueryClient {
/// Quotes a single BigQuery identifier, escaping embedded backticks.
fn quote_identifier(identifier: &str) -> String {
format!("`{}`", identifier.replace('`', "``"))
}

/// Quotes a dotted BigQuery path (e.g. project.dataset.table), escaping each segment.
fn quote_qualified_identifiers(parts: &[&str]) -> String {
let escaped = parts
.iter()
.map(|part| part.replace('`', "``"))
.collect::<Vec<_>>()
.join(".");

format!("`{escaped}`")
}
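// Editorial sketch (not part of the change): expected output of the quoting
// helpers for some hypothetical identifiers. A backtick inside a segment is
// doubled, so the quoted path remains one valid BigQuery identifier:
//
//   quote_identifier("na`me")                          => `na``me`
//   quote_qualified_identifiers(&["p", "d", "events"]) => `p.d.events`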

/// Creates a new [`BigQueryClient`] from a service account key file.
///
/// Authenticates with BigQuery using the service account key at the specified file path.
@@ -106,7 +122,11 @@ impl BigQueryClient {
dataset_id: &BigQueryDatasetId,
table_id: &BigQueryTableId,
) -> String {
format!("`{}.{}.{}`", self.project_id, dataset_id, table_id)
Self::quote_qualified_identifiers(&[
&self.project_id,
dataset_id.as_str(),
table_id.as_str(),
])
}

/// Creates a table in BigQuery if it doesn't already exist, otherwise efficiently truncates
@@ -259,6 +279,129 @@ impl BigQueryClient {
Ok(())
}

/// Adds a column to an existing BigQuery table if it does not already exist.
pub async fn add_column(
&self,
dataset_id: &BigQueryDatasetId,
table_id: &BigQueryTableId,
column_schema: &ColumnSchema,
) -> EtlResult<()> {
let full_table_name = self.full_table_name(dataset_id, table_id);
let column_definition = Self::column_spec(column_schema);
let query =
format!("alter table {full_table_name} add column if not exists {column_definition}");

let _ = self.query(QueryRequest::new(query)).await?;

Ok(())
}
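// Sketch of the DDL emitted above, with hypothetical names; the
// `if not exists` guard keeps the call idempotent if a relation change
// is replayed:
//
//   alter table `proj.dataset.users` add column if not exists `age` int64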

/// Drops a column from an existing BigQuery table if it exists.
pub async fn drop_column(
&self,
dataset_id: &BigQueryDatasetId,
table_id: &BigQueryTableId,
column_name: &str,
) -> EtlResult<()> {
let full_table_name = self.full_table_name(dataset_id, table_id);
let column_identifier = Self::quote_identifier(column_name);
let query =
format!("alter table {full_table_name} drop column if exists {column_identifier}");

let _ = self.query(QueryRequest::new(query)).await?;

Ok(())
}
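// The drop side is symmetric (hypothetical names):
//
//   alter table `proj.dataset.users` drop column if exists `age`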

/// Alters the data type of an existing column in a BigQuery table.
pub async fn alter_column_type(
&self,
dataset_id: &BigQueryDatasetId,
table_id: &BigQueryTableId,
column_schema: &ColumnSchema,
) -> EtlResult<()> {
let full_table_name = self.full_table_name(dataset_id, table_id);
let column_identifier = Self::quote_identifier(&column_schema.name);
let column_type = Self::postgres_to_bigquery_type(&column_schema.typ);
let query = format!(
"alter table {full_table_name} alter column {column_identifier} set data type {column_type}"
);

let _ = self.query(QueryRequest::new(query)).await?;

Ok(())
}
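// Sketch of the emitted statement (hypothetical names). Note that BigQuery
// only accepts coercible type changes here, e.g. int64 to numeric:
//
//   alter table `proj.dataset.users` alter column `age` set data type numeric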

/// Synchronizes the primary key definition for a BigQuery table with the provided schema.
pub async fn sync_primary_key(
&self,
dataset_id: &BigQueryDatasetId,
table_id: &BigQueryTableId,
column_schemas: &[ColumnSchema],
) -> EtlResult<()> {
let primary_columns: Vec<&ColumnSchema> = column_schemas
.iter()
.filter(|column| column.primary)
.collect();

let has_primary_key = self.has_primary_key(dataset_id, table_id).await?;
if has_primary_key {
self.drop_primary_key(dataset_id, table_id).await?;
}

if primary_columns.is_empty() {
return Ok(());
}

let columns = primary_columns
.iter()
.map(|column| Self::quote_identifier(&column.name))
.collect::<Vec<_>>()
.join(",");

let full_table_name = self.full_table_name(dataset_id, table_id);
let query =
format!("alter table {full_table_name} add primary key ({columns}) not enforced");

let _ = self.query(QueryRequest::new(query)).await?;

Ok(())
}
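// For a schema whose only primary-key column is `id`, the method above issues
// (hypothetical names):
//
//   alter table `proj.dataset.users` drop primary key
//   alter table `proj.dataset.users` add primary key (`id`) not enforced
//
// Dropping first matters because BigQuery allows at most one primary key
// constraint per table.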

async fn has_primary_key(
&self,
dataset_id: &BigQueryDatasetId,
table_id: &BigQueryTableId,
) -> EtlResult<bool> {
let info_schema_table = Self::quote_qualified_identifiers(&[
&self.project_id,
dataset_id.as_str(),
"INFORMATION_SCHEMA",
"TABLE_CONSTRAINTS",
]);
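// Note: `table_id` is interpolated into the query below as a SQL string
// literal, which relies on BigQuery table IDs not containing single quotes.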
let table_literal = table_id;
let query = format!(
"select constraint_name from {info_schema_table} where table_name = '{table_literal}' and constraint_type = 'PRIMARY KEY'",
);

let result_set = self.query(QueryRequest::new(query)).await?;

Ok(result_set.row_count() > 0)
}

async fn drop_primary_key(
&self,
dataset_id: &BigQueryDatasetId,
table_id: &BigQueryTableId,
) -> EtlResult<()> {
let full_table_name = self.full_table_name(dataset_id, table_id);
let query = format!("alter table {full_table_name} drop primary key");

let _ = self.query(QueryRequest::new(query)).await?;

Ok(())
}

/// Checks whether a table exists in the BigQuery dataset.
///
/// Returns `true` if the table exists, `false` otherwise.
@@ -401,8 +544,8 @@ impl BigQueryClient {
/// Generates SQL column specification for CREATE TABLE statements.
fn column_spec(column_schema: &ColumnSchema) -> String {
let mut column_spec = format!(
"`{}` {}",
column_schema.name,
"{} {}",
Self::quote_identifier(&column_schema.name),
Self::postgres_to_bigquery_type(&column_schema.typ)
);

@@ -420,7 +563,7 @@ impl BigQueryClient {
let identity_columns: Vec<String> = column_schemas
.iter()
.filter(|s| s.primary)
.map(|c| format!("`{}`", c.name))
.map(|c| Self::quote_identifier(&c.name))
.collect();

if identity_columns.is_empty() {
@@ -498,7 +641,7 @@ impl BigQueryClient {
/// Converts Postgres column schemas to a BigQuery [`TableDescriptor`].
///
/// Maps data types and nullability to BigQuery column specifications, setting
/// appropriate column modes and automatically adding CDC special columns.
/// appropriate column modes, and automatically adding CDC special columns.
pub fn column_schemas_to_table_descriptor(
column_schemas: &[ColumnSchema],
use_cdc_sequence_column: bool,
@@ -773,33 +916,32 @@ mod tests {

#[test]
fn test_column_spec() {
let column_schema = ColumnSchema::new("test_col".to_string(), Type::TEXT, -1, true, false);
let column_schema = ColumnSchema::new("test_col".to_string(), Type::TEXT, -1, false);
let spec = BigQueryClient::column_spec(&column_schema);
assert_eq!(spec, "`test_col` string");

let not_null_column = ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true);
let not_null_column = ColumnSchema::new("id".to_string(), Type::INT4, -1, true);
let not_null_spec = BigQueryClient::column_spec(&not_null_column);
assert_eq!(not_null_spec, "`id` int64 not null");
assert_eq!(not_null_spec, "`id` int64");

let array_column =
ColumnSchema::new("tags".to_string(), Type::TEXT_ARRAY, -1, false, false);
let array_column = ColumnSchema::new("tags".to_string(), Type::TEXT_ARRAY, -1, false);
let array_spec = BigQueryClient::column_spec(&array_column);
assert_eq!(array_spec, "`tags` array<string>");
}

#[test]
fn test_add_primary_key_clause() {
let columns_with_pk = vec![
ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true),
ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false),
ColumnSchema::new("id".to_string(), Type::INT4, -1, true),
ColumnSchema::new("name".to_string(), Type::TEXT, -1, false),
];
let pk_clause = BigQueryClient::add_primary_key_clause(&columns_with_pk);
assert_eq!(pk_clause, ", primary key (`id`) not enforced");

let columns_with_composite_pk = vec![
ColumnSchema::new("tenant_id".to_string(), Type::INT4, -1, false, true),
ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true),
ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false),
ColumnSchema::new("tenant_id".to_string(), Type::INT4, -1, true),
ColumnSchema::new("id".to_string(), Type::INT4, -1, true),
ColumnSchema::new("name".to_string(), Type::TEXT, -1, false),
];
let composite_pk_clause =
BigQueryClient::add_primary_key_clause(&columns_with_composite_pk);
Expand All @@ -809,8 +951,8 @@ mod tests {
);

let columns_no_pk = vec![
ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false),
ColumnSchema::new("age".to_string(), Type::INT4, -1, true, false),
ColumnSchema::new("name".to_string(), Type::TEXT, -1, false),
ColumnSchema::new("age".to_string(), Type::INT4, -1, false),
];
let no_pk_clause = BigQueryClient::add_primary_key_clause(&columns_no_pk);
assert_eq!(no_pk_clause, "");
@@ -819,14 +961,14 @@ mod tests {
#[test]
fn test_create_columns_spec() {
let columns = vec![
ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true),
ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false),
ColumnSchema::new("active".to_string(), Type::BOOL, -1, false, false),
ColumnSchema::new("id".to_string(), Type::INT4, -1, true),
ColumnSchema::new("name".to_string(), Type::TEXT, -1, false),
ColumnSchema::new("active".to_string(), Type::BOOL, -1, false),
];
let spec = BigQueryClient::create_columns_spec(&columns);
assert_eq!(
spec,
"(`id` int64 not null,`name` string,`active` bool not null, primary key (`id`) not enforced)"
"(`id` int64,`name` string,`active` bool, primary key (`id`) not enforced)"
);
}

@@ -839,10 +981,10 @@ mod tests {
#[test]
fn test_column_schemas_to_table_descriptor() {
let columns = vec![
ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true),
ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false),
ColumnSchema::new("active".to_string(), Type::BOOL, -1, false, false),
ColumnSchema::new("tags".to_string(), Type::TEXT_ARRAY, -1, false, false),
ColumnSchema::new("id".to_string(), Type::INT4, -1, true),
ColumnSchema::new("name".to_string(), Type::TEXT, -1, false),
ColumnSchema::new("active".to_string(), Type::BOOL, -1, false),
ColumnSchema::new("tags".to_string(), Type::TEXT_ARRAY, -1, false),
];

let descriptor = BigQueryClient::column_schemas_to_table_descriptor(&columns, true);
@@ -857,7 +999,7 @@ mod tests {
));
assert!(matches!(
descriptor.field_descriptors[0].mode,
ColumnMode::Required
ColumnMode::Nullable
));

assert_eq!(descriptor.field_descriptors[1].name, "name");
@@ -877,7 +1019,7 @@ mod tests {
));
assert!(matches!(
descriptor.field_descriptors[2].mode,
ColumnMode::Required
ColumnMode::Nullable
));

// Check array column
@@ -922,12 +1064,12 @@ mod tests {
#[test]
fn test_column_schemas_to_table_descriptor_complex_types() {
let columns = vec![
ColumnSchema::new("uuid_col".to_string(), Type::UUID, -1, true, false),
ColumnSchema::new("json_col".to_string(), Type::JSON, -1, true, false),
ColumnSchema::new("bytea_col".to_string(), Type::BYTEA, -1, true, false),
ColumnSchema::new("numeric_col".to_string(), Type::NUMERIC, -1, true, false),
ColumnSchema::new("date_col".to_string(), Type::DATE, -1, true, false),
ColumnSchema::new("time_col".to_string(), Type::TIME, -1, true, false),
ColumnSchema::new("uuid_col".to_string(), Type::UUID, -1, false),
ColumnSchema::new("json_col".to_string(), Type::JSON, -1, false),
ColumnSchema::new("bytea_col".to_string(), Type::BYTEA, -1, false),
ColumnSchema::new("numeric_col".to_string(), Type::NUMERIC, -1, false),
ColumnSchema::new("date_col".to_string(), Type::DATE, -1, false),
ColumnSchema::new("time_col".to_string(), Type::TIME, -1, false),
];

let descriptor = BigQueryClient::column_schemas_to_table_descriptor(&columns, true);
@@ -979,16 +1121,16 @@ mod tests {
let table_id = "test_table";

let columns = vec![
ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true),
ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false),
ColumnSchema::new("id".to_string(), Type::INT4, -1, true),
ColumnSchema::new("name".to_string(), Type::TEXT, -1, false),
];

// Simulate the query generation logic
let full_table_name = format!("`{project_id}.{dataset_id}.{table_id}`");
let columns_spec = BigQueryClient::create_columns_spec(&columns);
let query = format!("create or replace table {full_table_name} {columns_spec}");

let expected_query = "create or replace table `test-project.test_dataset.test_table` (`id` int64 not null,`name` string, primary key (`id`) not enforced)";
let expected_query = "create or replace table `test-project.test_dataset.test_table` (`id` int64,`name` string, primary key (`id`) not enforced)";
assert_eq!(query, expected_query);
}

@@ -999,13 +1141,7 @@ mod tests {
let table_id = "test_table";
let max_staleness_mins = 15;

let columns = vec![ColumnSchema::new(
"id".to_string(),
Type::INT4,
-1,
false,
true,
)];
let columns = vec![ColumnSchema::new("id".to_string(), Type::INT4, -1, true)];

// Simulate the query generation logic with staleness
let full_table_name = format!("`{project_id}.{dataset_id}.{table_id}`");
@@ -1015,7 +1151,7 @@ mod tests {
"create or replace table {full_table_name} {columns_spec} {max_staleness_option}"
);

let expected_query = "create or replace table `test-project.test_dataset.test_table` (`id` int64 not null, primary key (`id`) not enforced) options (max_staleness = interval 15 minute)";
let expected_query = "create or replace table `test-project.test_dataset.test_table` (`id` int64, primary key (`id`) not enforced) options (max_staleness = interval 15 minute)";
assert_eq!(query, expected_query);
}
}