Skip to content

Commit e6c9faa

Browse files
authored
Add Iceberg Quickstart page (#37739)
Squashed commits: add iceberg quickstart; add license; spotless; adjustments; lint; add missing io; add yaml hadoop config; small fixes, and add support for python Timestamp; add public data quickstart; yaml instruction; fix spotbugs; tweaks; address comments; remove unused dep.
1 parent 4769488 commit e6c9faa

File tree

13 files changed

+878
-12
lines changed

13 files changed

+878
-12
lines changed
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.examples.iceberg.snippets;
19+
20+
// [START iceberg_schema_and_row]
21+
import java.math.BigDecimal;
22+
import java.time.Instant;
23+
import java.time.LocalDate;
24+
import java.time.LocalTime;
25+
import java.util.Arrays;
26+
import org.apache.beam.sdk.schemas.Schema;
27+
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
28+
import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
29+
import org.apache.beam.sdk.values.Row;
30+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
31+
import org.joda.time.DateTime;
32+
33+
public class IcebergBeamSchemaAndRow {
34+
public Row createRow() {
35+
Schema nestedSchema =
36+
Schema.builder().addStringField("nested_field").addInt32Field("nested_field_2").build();
37+
Schema beamSchema =
38+
Schema.builder()
39+
.addBooleanField("boolean_field")
40+
.addInt32Field("int_field")
41+
.addInt64Field("long_field")
42+
.addFloatField("float_field")
43+
.addDoubleField("double_field")
44+
.addDecimalField("numeric_field")
45+
.addByteArrayField("bytes_field")
46+
.addStringField("string_field")
47+
.addLogicalTypeField("time_field", SqlTypes.TIME)
48+
.addLogicalTypeField("date_field", SqlTypes.DATE)
49+
.addLogicalTypeField("timestamp_field", Timestamp.MICROS)
50+
.addDateTimeField("timestamptz_field")
51+
.addArrayField("array_field", Schema.FieldType.INT32)
52+
.addMapField("map_field", Schema.FieldType.STRING, Schema.FieldType.INT32)
53+
.addRowField("struct_field", nestedSchema)
54+
.build();
55+
56+
Row beamRow =
57+
Row.withSchema(beamSchema)
58+
.withFieldValues(
59+
ImmutableMap.<String, Object>builder()
60+
.put("boolean_field", true)
61+
.put("int_field", 1)
62+
.put("long_field", 2L)
63+
.put("float_field", 3.4f)
64+
.put("double_field", 4.5d)
65+
.put("numeric_field", new BigDecimal(67))
66+
.put("bytes_field", new byte[] {1, 2, 3})
67+
.put("string_field", "value")
68+
.put("time_field", LocalTime.now())
69+
.put("date_field", LocalDate.now())
70+
.put("timestamp_field", Instant.now())
71+
.put("timestamptz_field", DateTime.now())
72+
.put("array_field", Arrays.asList(1, 2, 3))
73+
.put("map_field", ImmutableMap.of("a", 1, "b", 2))
74+
.put(
75+
"struct_field",
76+
Row.withSchema(nestedSchema).addValues("nested_value", 123).build())
77+
.build())
78+
.build();
79+
80+
return beamRow;
81+
}
82+
}
83+
// [END iceberg_schema_and_row]
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.examples.iceberg.snippets;
19+
20+
import java.util.Arrays;
21+
import java.util.Map;
22+
import org.apache.beam.sdk.Pipeline;
23+
import org.apache.beam.sdk.managed.Managed;
24+
import org.apache.beam.sdk.schemas.Schema;
25+
import org.apache.beam.sdk.schemas.transforms.AddFields;
26+
import org.apache.beam.sdk.schemas.transforms.Group;
27+
import org.apache.beam.sdk.transforms.Create;
28+
import org.apache.beam.sdk.transforms.MapElements;
29+
import org.apache.beam.sdk.transforms.Mean;
30+
import org.apache.beam.sdk.transforms.Sum;
31+
import org.apache.beam.sdk.values.PCollection;
32+
import org.apache.beam.sdk.values.Row;
33+
import org.apache.beam.sdk.values.TypeDescriptors;
34+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
35+
36+
@SuppressWarnings("all")
37+
public class Quickstart {
38+
static final String PROJECT_ID = "apache-beam-testing";
39+
static final String BUCKET_NAME = "my-bucket";
40+
41+
public static void main(String[] args) {
42+
// [START hadoop_catalog_props]
43+
Map<String, String> catalogProps =
44+
ImmutableMap.of(
45+
"type", "hadoop",
46+
"warehouse", "file:///tmp/beam-iceberg-local-quickstart");
47+
// [END hadoop_catalog_props]
48+
}
49+
50+
public static void publicDatasets() {
51+
// [START biglake_public_catalog_props]
52+
Map<String, String> catalogProps =
53+
ImmutableMap.of(
54+
"type", "rest",
55+
"uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog",
56+
"warehouse", "gs://biglake-public-nyc-taxi-iceberg",
57+
"header.x-goog-user-project", PROJECT_ID,
58+
"rest.auth.type", "google",
59+
"io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO",
60+
"header.X-Iceberg-Access-Delegation", "vended-credentials");
61+
// [END biglake_public_catalog_props]
62+
63+
// [START biglake_public_query]
64+
Pipeline p = Pipeline.create();
65+
66+
// Set up query properties:
67+
Map<String, Object> config =
68+
ImmutableMap.of(
69+
"table",
70+
"public_data.nyc_taxicab",
71+
"catalog_properties",
72+
catalogProps,
73+
"filter",
74+
"data_file_year = 2021 AND tip_amount > 100",
75+
"keep",
76+
Arrays.asList("passenger_count", "total_amount", "trip_distance"));
77+
78+
// Read Iceberg records
79+
PCollection<Row> icebergRows =
80+
p.apply(Managed.read("iceberg").withConfig(config)).getSinglePCollection();
81+
82+
// Perform further analysis on records
83+
PCollection<Row> result =
84+
icebergRows
85+
.apply(AddFields.<Row>create().field("num_trips", Schema.FieldType.INT32, 1))
86+
.apply(
87+
Group.<Row>byFieldNames("passenger_count")
88+
.aggregateField("num_trips", Sum.ofIntegers(), "num_trips")
89+
.aggregateField("total_amount", Mean.of(), "avg_fare")
90+
.aggregateField("trip_distance", Mean.of(), "avg_distance"));
91+
92+
// Print to console
93+
result.apply(
94+
MapElements.into(TypeDescriptors.voids())
95+
.via(
96+
row -> {
97+
System.out.println(row);
98+
return null;
99+
}));
100+
101+
// Execute
102+
p.run().waitUntilFinish();
103+
// [END biglake_public_query]
104+
}
105+
106+
public static void other() {
107+
// [START biglake_catalog_props]
108+
Map<String, String> catalogProps =
109+
ImmutableMap.of(
110+
"type", "rest",
111+
"uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog",
112+
"warehouse", "gs://" + BUCKET_NAME,
113+
"header.x-goog-user-project", PROJECT_ID,
114+
"rest.auth.type", "google",
115+
"io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO",
116+
"header.X-Iceberg-Access-Delegation", "vended-credentials");
117+
// [END biglake_catalog_props]
118+
119+
// [START managed_iceberg_config]
120+
Map<String, Object> managedConfig =
121+
ImmutableMap.of("table", "my_db.my_table", "catalog_properties", catalogProps);
122+
123+
// Note: The table will get created when inserting data (see below)
124+
// [END managed_iceberg_config]
125+
126+
// [START managed_iceberg_insert]
127+
Schema inputSchema =
128+
Schema.builder().addInt64Field("id").addStringField("name").addInt32Field("age").build();
129+
130+
Pipeline p = Pipeline.create();
131+
p.apply(
132+
Create.of(
133+
Row.withSchema(inputSchema).addValues(1, "Mark", 34).build(),
134+
Row.withSchema(inputSchema).addValues(2, "Omar", 24).build(),
135+
Row.withSchema(inputSchema).addValues(3, "Rachel", 27).build()))
136+
.apply(Managed.write("iceberg").withConfig(managedConfig));
137+
138+
p.run();
139+
// [END managed_iceberg_insert]
140+
141+
// [START managed_iceberg_read]
142+
Pipeline q = Pipeline.create();
143+
PCollection<Row> rows =
144+
q.apply(Managed.read("iceberg").withConfig(managedConfig)).getSinglePCollection();
145+
146+
rows.apply(
147+
MapElements.into(TypeDescriptors.voids())
148+
.via(
149+
row -> {
150+
System.out.println(row);
151+
return null;
152+
}));
153+
154+
q.run();
155+
// [END managed_iceberg_read]
156+
}
157+
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@
3232
import java.util.Map;
3333
import java.util.Optional;
3434
import java.util.UUID;
35+
import java.util.concurrent.TimeUnit;
3536
import java.util.stream.Collectors;
3637
import org.apache.beam.sdk.schemas.Schema;
3738
import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric;
39+
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
3840
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
3941
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
4042
import org.apache.beam.sdk.util.Preconditions;
@@ -75,6 +77,7 @@ private IcebergUtils() {}
7577
.put(SqlTypes.TIME.getIdentifier(), Types.TimeType.get())
7678
.put(SqlTypes.DATETIME.getIdentifier(), Types.TimestampType.withoutZone())
7779
.put(SqlTypes.UUID.getIdentifier(), Types.UUIDType.get())
80+
.put(MicrosInstant.IDENTIFIER, Types.TimestampType.withZone())
7881
.build();
7982

8083
private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
@@ -294,6 +297,13 @@ public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final Schema s
294297

295298
/** Converts a Beam {@link Row} to an Iceberg {@link Record}. */
296299
public static Record beamRowToIcebergRecord(org.apache.iceberg.Schema schema, Row row) {
300+
if (row.getSchema().getFieldCount() != schema.columns().size()) {
301+
throw new IllegalStateException(
302+
String.format(
303+
"Beam Row schema and Iceberg schema have different sizes.%n\tBeam Row columns: %s%n\tIceberg schema columns: %s",
304+
row.getSchema().getFieldNames(),
305+
schema.columns().stream().map(Types.NestedField::name).collect(Collectors.toList())));
306+
}
297307
return copyRowIntoRecord(GenericRecord.create(schema), row);
298308
}
299309

@@ -419,7 +429,11 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row
419429
private static Object getIcebergTimestampValue(Object beamValue, boolean shouldAdjustToUtc) {
420430
// timestamptz
421431
if (shouldAdjustToUtc) {
422-
if (beamValue instanceof LocalDateTime) { // SqlTypes.DATETIME
432+
if (beamValue instanceof java.time.Instant) { // MicrosInstant
433+
java.time.Instant instant = (java.time.Instant) beamValue;
434+
return DateTimeUtil.timestamptzFromNanos(
435+
TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano());
436+
} else if (beamValue instanceof LocalDateTime) { // SqlTypes.DATETIME
423437
return OffsetDateTime.of((LocalDateTime) beamValue, ZoneOffset.UTC);
424438
} else if (beamValue instanceof Instant) { // FieldType.DATETIME
425439
return DateTimeUtil.timestamptzFromMicros(((Instant) beamValue).getMillis() * 1000L);
@@ -434,7 +448,11 @@ private static Object getIcebergTimestampValue(Object beamValue, boolean shouldA
434448
}
435449

436450
// timestamp
437-
if (beamValue instanceof LocalDateTime) { // SqlType.DATETIME
451+
if (beamValue instanceof java.time.Instant) { // MicrosInstant
452+
java.time.Instant instant = (java.time.Instant) beamValue;
453+
return DateTimeUtil.timestampFromNanos(
454+
TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano());
455+
} else if (beamValue instanceof LocalDateTime) { // SqlType.DATETIME
438456
return beamValue;
439457
} else if (beamValue instanceof Instant) { // FieldType.DATETIME
440458
return DateTimeUtil.timestampFromMicros(((Instant) beamValue).getMillis() * 1000L);

0 commit comments

Comments
 (0)