Skip to content

Commit 332fe03

Browse files
authored
[Managed Iceberg] unbounded source (#33504)
1 parent 9e1cf5a commit 332fe03

34 files changed

+2783
-377
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"modification": 3
3+
"modification": 4
44
}

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666

6767
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
6868
* [Java] Use API compatible with both com.google.cloud.bigdataoss:util 2.x and 3.x in BatchLoads ([#34105](https://github.com/apache/beam/pull/34105))
69+
* [IcebergIO] Added new CDC source for batch and streaming, available as `Managed.ICEBERG_CDC` ([#33504](https://github.com/apache/beam/pull/33504))
6970
* [IcebergIO] Address edge case where bundle retry following a successful data commit results in data duplication ([#34264](https://github.com/apache/beam/pull/34264))
7071

7172
## New Features / Improvements

contributor-docs/discussion-docs/2025.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,9 @@ limitations under the License.
1515
# List Of Documents Submitted To dev@beam.apache.org In 2025
1616
| No. | Author | Subject | Date (UTC) |
1717
|---|---|---|---|
18-
| 1 | Danny McCormick | [Beam Python Dependency Extras](https://docs.google.com/document/d/1c84Gc-cZRCfrU8f7kWGsNR2o8oSRjCM-dGHO9KvPWPw) | 2025-01-27 17:50:00 |
19-
| 2 | Danny McCormick | [How vLLM Model Handler Works (Plus a Summary of Model Memory Management in Beam ML)](https://docs.google.com/document/d/1UB4umrtnp1Eg45fiUB3iLS7kPK3BE6pcf0YRDkA289Q) | 2025-01-31 17:50:00 |
18+
| 1 | Kenneth Knowles | [Apache Beam Release Acceptance Criteria - Google Sheets](https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw) | 2025-01-13 10:54:22 |
19+
| 2 | Danny McCormick | [Apache Beam Vendored Dependencies Release Guide](https://s.apache.org/beam-release-vendored-artifacts) | 2025-01-13 15:00:51 |
20+
| 3 | Danny McCormick | [Beam Python & ML Dependency Extras](https://docs.google.com/document/d/1c84Gc-cZRCfrU8f7kWGsNR2o8oSRjCM-dGHO9KvPWPw) | 2025-01-27 15:33:36 |
21+
| 4 | Danny McCormick | [How vLLM Model Handler Works (Plus a Summary of Model Memory Management in Beam ML)](https://docs.google.com/document/d/1UB4umrtnp1Eg45fiUB3iLS7kPK3BE6pcf0YRDkA289Q) | 2025-01-31 11:56:59 |
22+
| 5 | Shunping Huang | [Improve Logging Dependencies in Beam Java SDK](https://docs.google.com/document/d/1IkbiM4m8D-aB3NYI1aErFZHt6M7BQ-8eCULh284Davs) | 2025-02-04 15:13:14 |
23+
| 6 | Ahmed Abualsaud | [Iceberg Incremental Source design](https://s.apache.org/beam-iceberg-incremental-source) | 2025-03-03 14:52:42 |

model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ message ManagedTransforms {
7474
"beam:schematransform:org.apache.beam:bigquery_storage_read:v1"];
7575
BIGQUERY_WRITE = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) =
7676
"beam:schematransform:org.apache.beam:bigquery_write:v1"];
77+
ICEBERG_CDC_READ = 6 [(org.apache.beam.model.pipeline.v1.beam_urn) =
78+
"beam:schematransform:org.apache.beam:iceberg_cdc_read:v1"];
7779
}
7880
}
7981

sdks/java/io/iceberg/build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ dependencies {
5050
implementation library.java.slf4j_api
5151
implementation library.java.joda_time
5252
implementation "org.apache.parquet:parquet-column:$parquet_version"
53+
implementation "org.apache.parquet:parquet-hadoop:$parquet_version"
5354
implementation "org.apache.orc:orc-core:$orc_version"
5455
implementation "org.apache.iceberg:iceberg-core:$iceberg_version"
5556
implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
@@ -88,7 +89,7 @@ dependencies {
8889
testImplementation library.java.google_api_services_bigquery
8990

9091
testRuntimeOnly library.java.slf4j_jdk14
91-
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
92+
testImplementation project(path: ":runners:direct-java", configuration: "shadow")
9293
testRuntimeOnly project(path: ":runners:google-cloud-dataflow-java")
9394
hadoopVersions.each {kv ->
9495
"hadoopVersion$kv.key" "org.apache.hadoop:hadoop-client:$kv.value"

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public String apply(FileWriteResult input) {
8282
.apply(
8383
"Append metadata updates to tables",
8484
ParDo.of(new AppendFilesToTablesDoFn(catalogConfig, manifestFilePrefix)))
85-
.setCoder(KvCoder.of(StringUtf8Coder.of(), SnapshotInfo.CODER));
85+
.setCoder(KvCoder.of(StringUtf8Coder.of(), SnapshotInfo.getCoder()));
8686
}
8787

8888
private static class AppendFilesToTablesDoFn
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.iceberg;
19+
20+
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
21+
22+
import java.io.IOException;
23+
import java.util.List;
24+
import java.util.concurrent.ExecutionException;
25+
import org.apache.beam.sdk.metrics.Counter;
26+
import org.apache.beam.sdk.metrics.Metrics;
27+
import org.apache.beam.sdk.transforms.DoFn;
28+
import org.apache.beam.sdk.values.KV;
29+
import org.apache.iceberg.CombinedScanTask;
30+
import org.apache.iceberg.DataOperations;
31+
import org.apache.iceberg.IncrementalAppendScan;
32+
import org.apache.iceberg.Table;
33+
import org.apache.iceberg.io.CloseableIterable;
34+
import org.checkerframework.checker.nullness.qual.Nullable;
35+
import org.joda.time.Instant;
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
38+
39+
/**
40+
* Scans the given snapshot and creates multiple {@link ReadTask}s. Each task represents a portion
41+
* of a data file that was appended within the snapshot range.
42+
*/
43+
class CreateReadTasksDoFn
44+
extends DoFn<KV<String, List<SnapshotInfo>>, KV<ReadTaskDescriptor, ReadTask>> {
45+
private static final Logger LOG = LoggerFactory.getLogger(CreateReadTasksDoFn.class);
46+
private static final Counter totalScanTasks =
47+
Metrics.counter(CreateReadTasksDoFn.class, "totalScanTasks");
48+
private final IcebergScanConfig scanConfig;
49+
50+
CreateReadTasksDoFn(IcebergScanConfig scanConfig) {
51+
this.scanConfig = scanConfig;
52+
}
53+
54+
@Setup
55+
public void setup() {
56+
TableCache.setup(scanConfig);
57+
}
58+
59+
@ProcessElement
60+
public void process(
61+
@Element KV<String, List<SnapshotInfo>> element,
62+
OutputReceiver<KV<ReadTaskDescriptor, ReadTask>> out)
63+
throws IOException, ExecutionException {
64+
// force refresh because the table must be updated before scanning snapshots
65+
Table table = TableCache.getRefreshed(element.getKey());
66+
67+
// scan snapshots individually and assign commit timestamp to files
68+
for (SnapshotInfo snapshot : element.getValue()) {
69+
@Nullable Long fromSnapshot = snapshot.getParentId();
70+
long toSnapshot = snapshot.getSnapshotId();
71+
72+
if (!DataOperations.APPEND.equals(snapshot.getOperation())) {
73+
LOG.info(
74+
"Skipping non-append snapshot of operation '{}'. Sequence number: {}, id: {}",
75+
snapshot.getOperation(),
76+
snapshot.getSequenceNumber(),
77+
snapshot.getSnapshotId());
78+
}
79+
80+
LOG.info("Planning to scan snapshot {}", toSnapshot);
81+
IncrementalAppendScan scan = table.newIncrementalAppendScan().toSnapshot(toSnapshot);
82+
if (fromSnapshot != null) {
83+
scan = scan.fromSnapshotExclusive(fromSnapshot);
84+
}
85+
86+
createAndOutputReadTasks(scan, snapshot, out);
87+
}
88+
}
89+
90+
private void createAndOutputReadTasks(
91+
IncrementalAppendScan scan,
92+
SnapshotInfo snapshot,
93+
OutputReceiver<KV<ReadTaskDescriptor, ReadTask>> out)
94+
throws IOException {
95+
int numTasks = 0;
96+
try (CloseableIterable<CombinedScanTask> combinedScanTasks = scan.planTasks()) {
97+
for (CombinedScanTask combinedScanTask : combinedScanTasks) {
98+
ReadTask task = ReadTask.builder().setCombinedScanTask(combinedScanTask).build();
99+
ReadTaskDescriptor descriptor =
100+
ReadTaskDescriptor.builder()
101+
.setTableIdentifierString(checkStateNotNull(snapshot.getTableIdentifierString()))
102+
.build();
103+
104+
out.outputWithTimestamp(
105+
KV.of(descriptor, task), Instant.ofEpochMilli(snapshot.getTimestampMillis()));
106+
numTasks += combinedScanTask.tasks().size();
107+
}
108+
}
109+
totalScanTasks.inc(numTasks);
110+
LOG.info("Snapshot {} produced {} read tasks.", snapshot.getSnapshotId(), numTasks);
111+
}
112+
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -51,26 +51,25 @@ public static Builder builder() {
5151
}
5252

5353
public org.apache.iceberg.catalog.Catalog catalog() {
54-
if (cachedCatalog != null) {
55-
return cachedCatalog;
54+
if (cachedCatalog == null) {
55+
String catalogName = getCatalogName();
56+
if (catalogName == null) {
57+
catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();
58+
}
59+
Map<String, String> catalogProps = getCatalogProperties();
60+
if (catalogProps == null) {
61+
catalogProps = Maps.newHashMap();
62+
}
63+
Map<String, String> confProps = getConfigProperties();
64+
if (confProps == null) {
65+
confProps = Maps.newHashMap();
66+
}
67+
Configuration config = new Configuration();
68+
for (Map.Entry<String, String> prop : confProps.entrySet()) {
69+
config.set(prop.getKey(), prop.getValue());
70+
}
71+
cachedCatalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
5672
}
57-
String catalogName = getCatalogName();
58-
if (catalogName == null) {
59-
catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();
60-
}
61-
Map<String, String> catalogProps = getCatalogProperties();
62-
if (catalogProps == null) {
63-
catalogProps = Maps.newHashMap();
64-
}
65-
Map<String, String> confProps = getConfigProperties();
66-
if (confProps == null) {
67-
confProps = Maps.newHashMap();
68-
}
69-
Configuration config = new Configuration();
70-
for (Map.Entry<String, String> prop : confProps.entrySet()) {
71-
config.set(prop.getKey(), prop.getValue());
72-
}
73-
cachedCatalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
7473
return cachedCatalog;
7574
}
7675

0 commit comments

Comments
 (0)