-
Notifications
You must be signed in to change notification settings - Fork 4.5k
feat(yaml): Add BigTable write connector #35435
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 50 commits
ad54a58
0edf81d
18c9395
a25033e
0e4fb14
c048dcf
4ebc7c8
3bb3dfc
a8b8196
cf8bd8f
a06d7c6
5760278
5cb0dd7
f2640ae
69467c5
121ddf6
82d3612
14ca717
c36d864
d6366dd
fcb5d03
38ff386
a15e4ff
c759a07
8e19a81
b6d0157
7ea3f76
a35ced7
50bb5a3
426519d
3152094
84a3cfd
1ca2527
954355b
ab18e18
80a732e
16f1064
3c9c582
b842ac9
0ed5da1
cea5987
b6498c8
5f6992d
bdc9cff
37abe22
5cb46df
b1fae9c
4866acc
8ac0fda
32bfbe8
b217de2
1ad3a32
364a761
5338470
c1bc8c6
fad8ae8
4315c4f
9e4514c
64a7303
1fc5366
680678b
6fa20ab
2b2af72
8c96d22
373b87f
9bd071c
74b6dc3
01f84da
54b6ad1
c46ef26
b4fab07
c600ea0
2d30e08
221e558
9cb6c32
a9f77eb
c0596f3
1db6821
80544d0
5753f18
6cd69d5
a53045c
7874226
92a0ff9
54b9900
5b58815
ca12b07
81aa2ed
417bfea
fbf74a5
8aad18a
d3f17bd
d0d12ae
16030c6
15a8bd2
85c1392
2cdd808
636df03
0ab4db4
204ff4d
a712320
2e09dd7
f28eea9
d26b45d
0b6e855
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,211 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.sdk.io.gcp.bigtable; | ||
|
|
||
| import static java.util.Optional.ofNullable; | ||
| import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; | ||
|
|
||
| import com.google.auto.service.AutoService; | ||
| import com.google.bigtable.v2.Mutation; | ||
| import com.google.bigtable.v2.TimestampRange; | ||
| import com.google.protobuf.ByteString; | ||
| import java.util.Objects; | ||
| import org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteSchemaTransformProvider.BigtableWriteSchemaTransformConfiguration; | ||
| import org.apache.beam.sdk.schemas.transforms.SchemaTransform; | ||
| import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; | ||
| import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; | ||
| import org.apache.beam.sdk.transforms.GroupByKey; | ||
| import org.apache.beam.sdk.transforms.MapElements; | ||
| import org.apache.beam.sdk.values.KV; | ||
| import org.apache.beam.sdk.values.PCollection; | ||
| import org.apache.beam.sdk.values.PCollectionRowTuple; | ||
| import org.apache.beam.sdk.values.Row; | ||
| import org.apache.beam.sdk.values.TypeDescriptor; | ||
| import org.apache.beam.sdk.values.TypeDescriptors; | ||
|
|
||
| /** | ||
| * An implementation of {@link TypedSchemaTransformProvider} for Bigtable Write jobs configured via | ||
| * {@link BigtableWriteSchemaTransformConfiguration}. | ||
| */ | ||
| @AutoService(SchemaTransformProvider.class) | ||
| public class BigtableSimpleWriteSchemaTransformProvider | ||
arnavarora2004 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| extends TypedSchemaTransformProvider<BigtableWriteSchemaTransformConfiguration> { | ||
|
|
||
| private static final String INPUT_TAG = "input"; | ||
|
|
||
| @Override | ||
| protected SchemaTransform from(BigtableWriteSchemaTransformConfiguration configuration) { | ||
| return new BigtableSimpleWriteSchemaTransform(configuration); | ||
| } | ||
|
|
||
| @Override | ||
| public String identifier() { | ||
| return "beam:schematransform:org.apache.beam:bigtable_simple_write:v1"; | ||
| } | ||
|
|
||
| /** | ||
| * A {@link SchemaTransform} for Bigtable writes, configured with {@link | ||
| * BigtableWriteSchemaTransformConfiguration} and instantiated by {@link | ||
| * BigtableWriteSchemaTransformProvider}. | ||
| */ | ||
| private static class BigtableSimpleWriteSchemaTransform extends SchemaTransform { | ||
| private final BigtableWriteSchemaTransformConfiguration configuration; | ||
|
|
||
| BigtableSimpleWriteSchemaTransform(BigtableWriteSchemaTransformConfiguration configuration) { | ||
| configuration.validate(); | ||
| this.configuration = configuration; | ||
| } | ||
|
|
||
| @Override | ||
| public PCollectionRowTuple expand(PCollectionRowTuple input) { | ||
| checkArgument( | ||
arnavarora2004 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| input.has(INPUT_TAG), | ||
| String.format( | ||
| "Could not find expected input [%s] to %s.", INPUT_TAG, getClass().getSimpleName())); | ||
|
|
||
| PCollection<KV<ByteString, Iterable<Mutation>>> bigtableMutations = | ||
| changeMutationInput(input); | ||
|
|
||
| bigtableMutations.apply( | ||
| BigtableIO.write() | ||
| .withTableId(configuration.getTableId()) | ||
| .withInstanceId(configuration.getInstanceId()) | ||
| .withProjectId(configuration.getProjectId())); | ||
|
|
||
| return PCollectionRowTuple.empty(input.getPipeline()); | ||
| } | ||
|
|
||
| public PCollection<KV<ByteString, Iterable<Mutation>>> changeMutationInput( | ||
| PCollectionRowTuple inputR) { | ||
| PCollection<Row> beamRowMutationsList = inputR.getSinglePCollection(); | ||
| // convert all row inputs into KV<ByteString, Mutation> | ||
| PCollection<KV<ByteString, Mutation>> changedBeamRowMutationsList = | ||
| beamRowMutationsList.apply( | ||
| MapElements.into( | ||
| TypeDescriptors.kvs( | ||
| TypeDescriptor.of(ByteString.class), TypeDescriptor.of(Mutation.class))) | ||
| .via( | ||
| (Row input) -> { | ||
| @SuppressWarnings("nullness") | ||
| ByteString key = | ||
| ByteString.copyFrom(((Objects.requireNonNull(input.getBytes("key"))))); | ||
|
|
||
| Mutation bigtableMutation; | ||
| String mutationType = | ||
| input.getString("type"); // Direct call, can return null | ||
| if (mutationType == null) { | ||
| throw new IllegalArgumentException("Mutation type cannot be null."); | ||
|
||
| } | ||
| switch (mutationType) { | ||
| case "SetCell": | ||
| @SuppressWarnings("nullness") | ||
| Mutation.SetCell.Builder setMutation = | ||
| Mutation.SetCell.newBuilder() | ||
| .setValue( | ||
| ByteString.copyFrom( | ||
| ((Objects.requireNonNull(input.getBytes("value")))))) | ||
arnavarora2004 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| .setColumnQualifier( | ||
| ByteString.copyFrom( | ||
| ((Objects.requireNonNull( | ||
| input.getBytes("column_qualifier")))))) | ||
| .setFamilyNameBytes( | ||
| ByteString.copyFrom( | ||
| (Objects.requireNonNull( | ||
| input.getBytes("family_name"))))); | ||
| // Use timestamp if provided, else default to -1 (current | ||
| // Bigtable | ||
| // server time) | ||
| // Timestamp (optional, assuming Long type in Row schema) | ||
| Long timestampMicros = input.getInt64("timestamp_micros"); | ||
| setMutation.setTimestampMicros( | ||
| timestampMicros != null ? timestampMicros : -1); | ||
|
|
||
| bigtableMutation = | ||
| Mutation.newBuilder().setSetCell(setMutation.build()).build(); | ||
| break; | ||
| case "DeleteFromColumn": | ||
| // set timestamp range if applicable | ||
| @SuppressWarnings("nullness") | ||
| Mutation.DeleteFromColumn.Builder deleteMutation = | ||
| Mutation.DeleteFromColumn.newBuilder() | ||
| .setColumnQualifier( | ||
| ByteString.copyFrom( | ||
| Objects.requireNonNull( | ||
| input.getBytes("column_qualifier")))) | ||
| .setFamilyNameBytes( | ||
| ByteString.copyFrom( | ||
| ofNullable(input.getBytes("family_name")).get())); | ||
|
|
||
| // if start or end timestamp provided | ||
| // Timestamp Range (optional, assuming Long type in Row schema) | ||
| Long startTimestampMicros = null; | ||
| Long endTimestampMicros = null; | ||
|
|
||
| if (input.getSchema().hasField("start_timestamp_micros")) { | ||
| startTimestampMicros = input.getInt64("start_timestamp_micros"); | ||
| } | ||
| if (input.getSchema().hasField("end_timestamp_micros")) { | ||
| endTimestampMicros = input.getInt64("end_timestamp_micros"); | ||
| } | ||
|
|
||
| if (startTimestampMicros != null || endTimestampMicros != null) { | ||
| TimestampRange.Builder timeRange = TimestampRange.newBuilder(); | ||
| if (startTimestampMicros != null) { | ||
| timeRange.setStartTimestampMicros(startTimestampMicros); | ||
| } | ||
| if (endTimestampMicros != null) { | ||
| timeRange.setEndTimestampMicros(endTimestampMicros); | ||
| } | ||
| deleteMutation.setTimeRange(timeRange.build()); | ||
| } | ||
| bigtableMutation = | ||
| Mutation.newBuilder() | ||
| .setDeleteFromColumn(deleteMutation.build()) | ||
| .build(); | ||
| break; | ||
| case "DeleteFromFamily": | ||
| bigtableMutation = | ||
| Mutation.newBuilder() | ||
| .setDeleteFromFamily( | ||
| Mutation.DeleteFromFamily.newBuilder() | ||
| .setFamilyNameBytes( | ||
| ByteString.copyFrom( | ||
| ofNullable(input.getBytes("family_name")) | ||
| .get())) | ||
| .build()) | ||
| .build(); | ||
| break; | ||
| case "DeleteFromRow": | ||
| bigtableMutation = | ||
| Mutation.newBuilder() | ||
| .setDeleteFromRow(Mutation.DeleteFromRow.newBuilder().build()) | ||
| .build(); | ||
| break; | ||
| default: | ||
| throw new RuntimeException( | ||
| String.format( | ||
| "Unexpected mutation type [%s]: %s", | ||
| ((input.getString("type"))), input)); | ||
| } | ||
| return KV.of(key, bigtableMutation); | ||
| })); | ||
| // now we need to make the KV into a PCollection of KV<ByteString, Iterable<Mutation>> | ||
| return changedBeamRowMutationsList.apply(GroupByKey.create()); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let us update CHANGES.md