diff --git a/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml b/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml
index d72688774dae..d7449233aab5 100644
--- a/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml
+++ b/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml
@@ -60,4 +60,56 @@ pipelines:
               - {label: "389a", rank: 2}
     options:
       project: "apache-beam-testing"
-      temp_location: "{TEMP_DIR}"
\ No newline at end of file
+      temp_location: "{TEMP_DIR}"
+
+  - name: read_cdc_batch  # bounded CDC read over an explicit timestamp window
+    pipeline:
+      type: chain
+      transforms:
+        - type: ReadFromIcebergCDC
+          config:
+            table: db.labels
+            catalog_name: hadoop_catalog
+            catalog_properties:
+              type: hadoop
+              warehouse: "{TEMP_DIR}"
+            from_timestamp: 1762819200000  # presumably epoch millis (2025-11-11T00:00:00Z) - confirm unit
+            to_timestamp: 2078352000000  # presumably epoch millis (2035-11-11T00:00:00Z) - confirm unit
+            filter: '"label" = ''11a'' or "rank" = 1'
+            keep:
+              - label
+              - rank
+        - type: AssertEqual
+          config:
+            elements:
+              - {label: "11a", rank: 0}
+              - {label: "37a", rank: 1}
+    options:
+      project: "apache-beam-testing"
+      temp_location: "{TEMP_DIR}"
+
+  - name: read_cdc_streaming  # same read and assertion as above, with streaming enabled
+    pipeline:
+      type: chain
+      transforms:
+        - type: ReadFromIcebergCDC
+          config:
+            table: db.labels
+            catalog_name: hadoop_catalog
+            catalog_properties:
+              type: hadoop
+              warehouse: "{TEMP_DIR}"
+            streaming: true
+            to_timestamp: 2078352000000  # only an upper bound is set; presumably epoch millis - confirm unit
+            filter: '"label" = ''11a'' or "rank" = 1'
+            keep:
+              - label
+              - rank
+        - type: AssertEqual
+          config:
+            elements:
+              - {label: "11a", rank: 0}
+              - {label: "37a", rank: 1}
+    options:
+      project: "apache-beam-testing"
+      temp_location: "{TEMP_DIR}"
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml
index 66f0c124b4cf..458d3d63e436 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -403,3 +403,31 @@
         'WriteToBigTable': 'beam:schematransform:org.apache.beam:bigtable_write:v1'
       config:
         gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
+
+#IcebergCDC
+- type: renaming
+  transforms:
+    'ReadFromIcebergCDC': 'ReadFromIcebergCDC'
+  config:
+    mappings:  # identity mapping: every exposed option keeps its own name
+      'ReadFromIcebergCDC':
+        table: 'table'
+        catalog_name: 'catalog_name'
+        catalog_properties: 'catalog_properties'
+        config_properties: 'config_properties'
+        drop: 'drop'
+        filter: 'filter'
+        from_snapshot: 'from_snapshot'
+        from_timestamp: 'from_timestamp'
+        keep: 'keep'
+        poll_interval_seconds: 'poll_interval_seconds'
+        starting_strategy: 'starting_strategy'
+        streaming: 'streaming'
+        to_snapshot: 'to_snapshot'
+        to_timestamp: 'to_timestamp'
+    underlying_provider:  # the wrapped provider that supplies the transform by URN
+      type: beamJar
+      transforms:
+        'ReadFromIcebergCDC': 'beam:schematransform:org.apache.beam:iceberg_cdc_read:v1'
+      config:
+        gradle_target: 'sdks:java:io:expansion-service:shadowJar'