Skip to content

Commit c0954b7

Browse files
authored
add batch import yaml IT (#37976)
1 parent 01b2f21 commit c0954b7

File tree

2 files changed

+91
-8
lines changed

2 files changed

+91
-8
lines changed

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -557,20 +557,15 @@ public CommitFilesDoFn(IcebergCatalogConfig catalogConfig, String identifier) {
557557
this.identifier = identifier;
558558
}
559559

560-
@StartBundle
561-
public void start() {
562-
if (table == null) {
563-
table = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
564-
}
565-
}
566-
567560
@ProcessElement
568561
public void process(
569562
@Element KV<Void, Iterable<SerializableDataFile>> files,
570563
@AlwaysFetched @StateId("lastCommitTimestamp") ValueState<Long> lastCommitTimestamp,
571564
OutputReceiver<Row> output) {
572565
String commitId = commitHash(files.getValue());
573-
Table table = checkStateNotNull(this.table);
566+
if (table == null) {
567+
table = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
568+
}
574569
table.refresh();
575570

576571
if (shouldSkip(commitId, lastCommitTimestamp.read())) {
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
fixtures:
19+
- name: TEMP_DIR
20+
type: "tempfile.TemporaryDirectory"
21+
22+
23+
pipelines:
24+
# Pipeline 1: Write dummy Parquet files
25+
- pipeline:
26+
type: chain
27+
transforms:
28+
- type: Create
29+
config:
30+
elements:
31+
- {label: "11a", rank: 0, bool: true}
32+
- {label: "37a", rank: 1, bool: false}
33+
- {label: "389a", rank: 2, bool: false}
34+
- {label: "3821b", rank: 3, bool: true}
35+
- {label: "990c", rank: 4, bool: true}
36+
- {label: "1024d", rank: 5, bool: false}
37+
- type: WriteToParquet
38+
config:
39+
path: "{TEMP_DIR}/data/data"
40+
file_name_suffix: ".parquet"
41+
num_shards: 6
42+
43+
# Pipeline 2: Register our generated files in our Iceberg table
44+
- pipeline:
45+
type: chain
46+
transforms:
47+
- type: ReadMatchFiles
48+
config:
49+
file_pattern: "{TEMP_DIR}/data/data*.parquet"
50+
- type: MapToFields
51+
config:
52+
language: python
53+
fields:
54+
path:
55+
callable: "lambda row: row.path"
56+
output_type: string
57+
- type: IcebergAddFiles
58+
config:
59+
table: "default.table"
60+
catalog_properties:
61+
type: "hadoop"
62+
warehouse: "{TEMP_DIR}/dir"
63+
64+
providers:
65+
- type: python
66+
config: { }
67+
transforms:
68+
ReadMatchFiles: 'apache_beam.io.fileio.MatchFiles'
69+
70+
# Pipeline 3: Read from Iceberg and verify the contents
71+
- pipeline:
72+
type: chain
73+
transforms:
74+
- type: ReadFromIceberg
75+
config:
76+
table: "default.table"
77+
catalog_properties:
78+
type: "hadoop"
79+
warehouse: "{TEMP_DIR}/dir"
80+
- type: AssertEqual
81+
config:
82+
elements:
83+
- {label: "11a", rank: 0, bool: true}
84+
- {label: "37a", rank: 1, bool: false}
85+
- {label: "389a", rank: 2, bool: false}
86+
- {label: "3821b", rank: 3, bool: true}
87+
- {label: "990c", rank: 4, bool: true}
88+
- {label: "1024d", rank: 5, bool: false}

0 commit comments

Comments
 (0)