37 changes: 36 additions & 1 deletion sdks/python/apache_beam/yaml/examples/testing/examples_test.py
@@ -679,7 +679,9 @@ def _kafka_test_preprocessor(
'test_anomaly_scoring_yaml',
'test_wordCountInclude_yaml',
'test_wordCountImport_yaml',
'test_iceberg_to_alloydb_yaml',
'test_iceberg_to_iceberg_streaming_yaml',
'test_iceberg_to_iceberg_batch_yaml'
])
def _io_write_test_preprocessor(
test_spec: dict, expected: List[str], env: TestEnvironment):
@@ -792,6 +794,37 @@ def _iceberg_io_read_test_preprocessor(
return test_spec


@YamlExamplesTestSuite.register_test_preprocessor([
'test_iceberg_to_iceberg_streaming_yaml',
'test_iceberg_to_iceberg_batch_yaml'
])
def _iceberg_cdc_read_test_preprocessor(
test_spec: dict, expected: List[str], env: TestEnvironment):
"""
Preprocessor for tests that involve reading CDC events from Iceberg.

This preprocessor replaces any ReadFromIcebergCDC transform with a Create
transform whose elements come from the predefined in-memory INPUT_TABLES
dictionary. This allows the test to verify the pipeline's correctness
without depending on externally stored Iceberg tables.
"""
if pipeline := test_spec.get('pipeline', None):
for transform in pipeline.get('transforms', []):
if transform.get('type', '') == 'ReadFromIcebergCDC':
config = transform['config']
db_name, table_name = config['table'].split('.')

transform['type'] = 'Create'
transform['config'] = {
k: v
for k, v in config.items() if k.startswith('__')
}
transform['config']['elements'] = INPUT_TABLES[(
config['catalog_name'], db_name, table_name)]

return test_spec
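
# For illustration only, using names from the examples below: a source
# transform such as
#
#   - type: ReadFromIcebergCDC
#     config:
#       table: shipment_dataset_bronze.shipments
#       catalog_name: shipment_data
#
# is rewritten by the preprocessor above into roughly
#
#   - type: Create
#     config:
#       elements: INPUT_TABLES[
#           ('shipment_data', 'shipment_dataset_bronze', 'shipments')]
#
# with any internal '__'-prefixed config keys carried over unchanged.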


@YamlExamplesTestSuite.register_test_preprocessor([
'test_spanner_read_yaml',
'test_enrich_spanner_with_bigquery_yaml',
@@ -1318,6 +1351,8 @@ def _jinja_preprocessor(raw_spec_string: str, test_name: str):
('orders-test', 'order-database', 'orders'): input_data.
spanner_orders_data(),
('db', 'users', 'NY'): input_data.iceberg_dynamic_destinations_users_data(),
('shipment_data', 'shipment_dataset_bronze', 'shipments'): input_data.
shipment_data_filtered(),
('BigTable', 'beam-test', 'bigtable-enrichment-test'): input_data.
bigtable_data(),
('BigQuery', 'ALL_TEST', 'customers'): input_data.bigquery_data(),
17 changes: 17 additions & 0 deletions sdks/python/apache_beam/yaml/examples/testing/input_data.py
@@ -196,6 +196,23 @@ def shipments_data():
}]


def shipment_data_filtered():
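# Shipments rows that satisfy the filter used by the Iceberg-to-Iceberg
# examples: "customer_id" = 'C1' AND "shipment_cost" > 0.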
return [{
'customer_id': 'C1',
'shipment_date': '2023-05-01',
'shipment_cost': 150.0,
'customer_name': 'Alice',
'customer_email': '[email protected]'
},
{
'customer_id': 'C1',
'shipment_date': '2023-05-10',
'shipment_cost': 20.0,
'customer_name': 'Alice',
'customer_email': '[email protected]'
}]


def bigtable_data():
return [{
'product_id': '1', 'product_name': 'pixel 5', 'product_stock': '2'
@@ -0,0 +1,62 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# A pipeline that reads records from a Medallion Bronze Iceberg dataset in
# batch mode and writes them to a Silver Iceberg dataset.

pipeline:
type: chain
transforms:
# Step 1: Read records from the Bronze Iceberg table in batch mode
# (optional snapshot/time bounds are shown, commented out, below)
- type: ReadFromIcebergCDC
name: ReadFromBronzeDataset
config:
table: "shipment_dataset_bronze.shipments"
catalog_name: "shipment_data"
catalog_properties:
type: "rest"
uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog"
warehouse: "gs://biglake_shipment_data"
header.x-goog-user-project: "apache-beam-testing"
rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager"
rest-metrics-reporting-enabled: "false"
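      # Optional bounds for a batch read (keys supported per the
      # ReadFromIcebergCDC mapping in standard_io.yaml; values are
      # placeholders):
      # from_timestamp: 1683000000000
      # to_timestamp: 1684000000000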
filter: '"customer_id" IS NOT NULL AND "customer_id" = ''C1'' AND "shipment_cost" > 0'
keep:
- "customer_id"
- "shipment_date"
- "shipment_cost"
- "customer_name"
- "customer_email"

# Step 2: Write records out to Iceberg
- type: WriteToIceberg
name: WriteToSilverDataset
config:
table: "shipment_dataset_silver.shipments"
catalog_name: "shipment_data"
catalog_properties:
type: "rest"
uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog"
warehouse: "gs://biglake_shipment_data"
header.x-goog-user-project: "apache-beam-testing"
rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager"
rest-metrics-reporting-enabled: "false"

# Expected:
# Row(customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='[email protected]')
# Row(customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='[email protected]')
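
# To run this example (assuming access to the BigLake catalog above; the
# file name is illustrative):
#   python -m apache_beam.yaml.main \
#     --yaml_pipeline_file=iceberg_to_iceberg_batch.yaml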
@@ -0,0 +1,65 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# A pipeline that continuously reads append CDC events from a Medallion
# Bronze Iceberg dataset and writes them to a Silver Iceberg dataset.

pipeline:
type: chain
transforms:
# Step 1: Continuously read CDC append records from the Bronze Iceberg table
- type: ReadFromIcebergCDC
name: ReadFromBronzeDataset
config:
table: "shipment_dataset_bronze.shipments"
catalog_name: "shipment_data"
catalog_properties:
type: "rest"
uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog"
warehouse: "gs://biglake_shipment_data"
header.x-goog-user-project: "apache-beam-testing"
rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager"
rest-metrics-reporting-enabled: "false"
poll_interval_seconds: 30
streaming: true
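      # Optionally control where the stream starts (key supported per the
      # ReadFromIcebergCDC mapping in standard_io.yaml; value is
      # illustrative):
      # starting_strategy: "earliest"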
filter: '"customer_id" IS NOT NULL AND "customer_id" = ''C1'' AND "shipment_cost" > 0'
keep:
- "customer_id"
- "shipment_date"
- "shipment_cost"
- "customer_name"
- "customer_email"

# Step 2: Write records out to Iceberg
- type: WriteToIceberg
name: WriteToSilverDataset
config:
table: "shipment_dataset_silver.shipments"
catalog_name: "shipment_data"
catalog_properties:
type: "rest"
uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog"
warehouse: "gs://biglake_shipment_data"
header.x-goog-user-project: "apache-beam-testing"
rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager"
rest-metrics-reporting-enabled: "false"
triggering_frequency_seconds: 30

# Expected:
# Row(customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='[email protected]')
# Row(customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='[email protected]')
@@ -60,4 +60,27 @@ pipelines:
- {label: "389a", rank: 2}
options:
project: "apache-beam-testing"
temp_location: "{TEMP_DIR}"

- name: read_cdc
pipeline:
type: chain
transforms:
- type: ReadFromIcebergCDC
config:
table: db.labels
catalog_name: hadoop_catalog
catalog_properties:
type: hadoop
warehouse: "{TEMP_DIR}"
filter: '"label" = ''11a'''
keep:
- label
- rank
- type: AssertEqual
config:
elements:
- {label: "11a", rank: 0}
options:
project: "apache-beam-testing"
temp_location: "{TEMP_DIR}"
28 changes: 28 additions & 0 deletions sdks/python/apache_beam/yaml/standard_io.yaml
@@ -403,3 +403,31 @@
'WriteToBigTable': 'beam:schematransform:org.apache.beam:bigtable_write:v1'
config:
gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'

# IcebergCDC
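# Identity mapping: each YAML config key below is passed through unchanged
# to the Java 'iceberg_cdc_read' schematransform declared under
# underlying_provider.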
- type: renaming
transforms:
'ReadFromIcebergCDC': 'ReadFromIcebergCDC'
config:
mappings:
'ReadFromIcebergCDC':
table: 'table'
catalog_name: 'catalog_name'
catalog_properties: 'catalog_properties'
config_properties: 'config_properties'
drop: 'drop'
filter: 'filter'
from_snapshot: 'from_snapshot'
from_timestamp: 'from_timestamp'
keep: 'keep'
poll_interval_seconds: 'poll_interval_seconds'
starting_strategy: 'starting_strategy'
streaming: 'streaming'
to_snapshot: 'to_snapshot'
to_timestamp: 'to_timestamp'
underlying_provider:
type: beamJar
transforms:
'ReadFromIcebergCDC': 'beam:schematransform:org.apache.beam:iceberg_cdc_read:v1'
config:
gradle_target: 'sdks:java:io:expansion-service:shadowJar'