Skip to content
Merged
Show file tree
Hide file tree
Changes from 99 commits
Commits
Show all changes
105 commits
Select commit Hold shift + click to select a range
ad54a58
Refactored BigTableReadSchemaTransformConfiguration
arnavarora2004 May 21, 2025
0edf81d
changed scope, working on buffer class for making BigTable yaml fully…
arnavarora2004 May 22, 2025
18c9395
Finished up a bit of standard_io.yaml
arnavarora2004 May 22, 2025
a25033e
Finished up a bit of standard_io.yaml
arnavarora2004 May 27, 2025
0e4fb14
Merge branch 'apache:master' into master
arnavarora2004 Jun 4, 2025
c048dcf
Added bigTable test
arnavarora2004 Jun 4, 2025
4ebc7c8
Merge branch 'master' of github.com:arnavarora2004/ArnavBeamWork
arnavarora2004 Jun 4, 2025
3bb3dfc
changed some tests for BigTable
arnavarora2004 Jun 4, 2025
a8b8196
Added new IT file for simpleWrite and also made changes integration t…
arnavarora2004 Jun 5, 2025
cf8bd8f
Added new IT file for simpleWrite and also made changes integration t…
arnavarora2004 Jun 5, 2025
a06d7c6
SetCell mutation test works, I want to see if this draft PR works CI …
arnavarora2004 Jun 12, 2025
5760278
Fixed a slight error
arnavarora2004 Jun 12, 2025
5cb0dd7
Merge branch 'apache:master' into master
arnavarora2004 Jun 25, 2025
f2640ae
Added way more changes to integrations test.py, BigTableSimpleWriteSc…
arnavarora2004 Jun 25, 2025
69467c5
Merge branch 'master' of github.com:arnavarora2004/ArnavBeamWork
arnavarora2004 Jun 25, 2025
121ddf6
BigTableSimpleWriteSchemaTransformProviderIT finished changes to muta…
arnavarora2004 Jun 25, 2025
82d3612
Merge branch 'apache:master' into master
arnavarora2004 Jun 25, 2025
14ca717
Update sdks/java/io/google-cloud-platform/src/main/java/org/apache/be…
arnavarora2004 Jun 25, 2025
c36d864
Update sdks/java/io/google-cloud-platform/src/main/java/org/apache/be…
arnavarora2004 Jun 25, 2025
d6366dd
Update sdks/java/io/google-cloud-platform/src/main/java/org/apache/be…
arnavarora2004 Jun 25, 2025
fcb5d03
Merge branch 'apache:master' into master
arnavarora2004 Jun 26, 2025
38ff386
Update sdks/java/io/google-cloud-platform/src/main/java/org/apache/be…
arnavarora2004 Jun 26, 2025
a15e4ff
Update sdks/java/io/google-cloud-platform/src/test/java/org/apache/be…
arnavarora2004 Jun 26, 2025
c759a07
changed comments
arnavarora2004 Jun 26, 2025
8e19a81
Merge branch 'master' of github.com:arnavarora2004/ArnavBeamWork
arnavarora2004 Jun 26, 2025
b6d0157
Added changes from derrick comments
arnavarora2004 Jun 26, 2025
7ea3f76
Merge branch 'apache:master' into master
arnavarora2004 Jun 27, 2025
a35ced7
Merge branch 'apache:master' into master
arnavarora2004 Jun 30, 2025
50bb5a3
Added default schema maybe fixes the issues
arnavarora2004 Jun 30, 2025
426519d
Added schema to every test specificly, will run tests to see if it works
arnavarora2004 Jun 30, 2025
3152094
Added default schema maybe fixes the issues
arnavarora2004 Jul 2, 2025
84a3cfd
Merge branch 'apache:master' into master
arnavarora2004 Jul 2, 2025
1ca2527
Following formatting tests
arnavarora2004 Jul 2, 2025
954355b
Merge branch 'apache:master' into master
arnavarora2004 Jul 2, 2025
ab18e18
Following formatting tests
arnavarora2004 Jul 2, 2025
80a732e
Following checkstyle tests
arnavarora2004 Jul 2, 2025
16f1064
Merge branch 'apache:master' into master
arnavarora2004 Jul 7, 2025
3c9c582
Made schema and test changes
arnavarora2004 Jul 7, 2025
b842ac9
Made schema and test changes
arnavarora2004 Jul 7, 2025
0ed5da1
Merge branch 'apache:master' into master
arnavarora2004 Jul 7, 2025
cea5987
Made schema and test changes
arnavarora2004 Jul 7, 2025
b6498c8
Made schema and test changes
arnavarora2004 Jul 8, 2025
5f6992d
Made schema and test changes
arnavarora2004 Jul 9, 2025
bdc9cff
Merge branch 'apache:master' into master
arnavarora2004 Jul 9, 2025
37abe22
Added final test
arnavarora2004 Jul 9, 2025
5cb46df
changed timestamp values
arnavarora2004 Jul 10, 2025
b1fae9c
added all mutations test
arnavarora2004 Jul 10, 2025
4866acc
added all mutations test
arnavarora2004 Jul 10, 2025
8ac0fda
pushed changes to format errors
arnavarora2004 Jul 10, 2025
32bfbe8
Merge branch 'apache:master' into master
arnavarora2004 Jul 10, 2025
b217de2
pushed changes to format errors
arnavarora2004 Jul 10, 2025
1ad3a32
Delete 4
arnavarora2004 Jul 10, 2025
364a761
pushed changes to format errors
arnavarora2004 Jul 10, 2025
5338470
pushed changes to format errors
arnavarora2004 Jul 10, 2025
c1bc8c6
pushed changes to format errors
arnavarora2004 Jul 10, 2025
fad8ae8
Merge branch 'apache:master' into master
arnavarora2004 Jul 14, 2025
4315c4f
pushed changes to debugging errors
arnavarora2004 Jul 14, 2025
9e4514c
pushed changes to debugging errors
arnavarora2004 Jul 14, 2025
64a7303
Merge branch 'apache:master' into master
arnavarora2004 Jul 14, 2025
1fc5366
to see internal error added print(will remove)
arnavarora2004 Jul 14, 2025
680678b
to see internal error added print(will remove)
arnavarora2004 Jul 14, 2025
6fa20ab
to see internal error added print(will remove)
arnavarora2004 Jul 14, 2025
2b2af72
import fixes
arnavarora2004 Jul 14, 2025
8c96d22
import fixes
arnavarora2004 Jul 14, 2025
373b87f
import fixes
arnavarora2004 Jul 14, 2025
9bd071c
import fixes
arnavarora2004 Jul 14, 2025
74b6dc3
import fixes
arnavarora2004 Jul 14, 2025
01f84da
import fixes
arnavarora2004 Jul 14, 2025
54b6ad1
pushed changes to debugging errors
arnavarora2004 Jul 14, 2025
c46ef26
pushed changes to debugging errors
arnavarora2004 Jul 14, 2025
b4fab07
Merge branch 'apache:master' into master
arnavarora2004 Jul 14, 2025
c600ea0
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 15, 2025
2d30e08
Merge branch 'apache:master' into master
arnavarora2004 Jul 15, 2025
221e558
made changes to allMutations test
arnavarora2004 Jul 15, 2025
9cb6c32
made changes to allMutations test
arnavarora2004 Jul 15, 2025
a9f77eb
Merge branch 'apache:master' into master
arnavarora2004 Jul 15, 2025
c0596f3
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 16, 2025
1db6821
Merge branch 'apache:master' into master
arnavarora2004 Jul 16, 2025
80544d0
Merge branch 'master' of github.com:arnavarora2004/ArnavBeamWork
arnavarora2004 Jul 16, 2025
5753f18
Merge branch 'apache:master' into master
arnavarora2004 Jul 16, 2025
6cd69d5
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 16, 2025
a53045c
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 16, 2025
7874226
Merge branch 'apache:master' into master
arnavarora2004 Jul 16, 2025
92a0ff9
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 16, 2025
54b9900
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 16, 2025
5b58815
new read errors fixed
arnavarora2004 Jul 16, 2025
ca12b07
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 16, 2025
81aa2ed
consolidated schema transform files, fixed small issues and bugs
arnavarora2004 Jul 16, 2025
417bfea
consolidated schema transform files, fixed small issues and bugs
arnavarora2004 Jul 17, 2025
fbf74a5
consolidated schema transform files, fixed small issues and bugs
arnavarora2004 Jul 17, 2025
8aad18a
consolidated schema transform files, fixed small issues and bugs
arnavarora2004 Jul 17, 2025
d3f17bd
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 17, 2025
d0d12ae
Merge branch 'apache:master' into master
arnavarora2004 Jul 17, 2025
16030c6
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
15a8bd2
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
85c1392
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
2cdd808
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
636df03
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
0ab4db4
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
204ff4d
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
a712320
Merge branch 'apache:master' into master
arnavarora2004 Jul 17, 2025
2e09dd7
Merge branch 'master' of github.com:arnavarora2004/ArnavBeamWork
arnavarora2004 Jul 17, 2025
f28eea9
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
d26b45d
Following checkstyle tests
arnavarora2004 Jul 17, 2025
0b6e855
Following checkstyle tests
arnavarora2004 Jul 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static java.util.Optional.ofNullable;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
Expand All @@ -34,16 +35,21 @@
import java.util.Map;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteSchemaTransformProvider.BigtableWriteSchemaTransformConfiguration;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;

/**
Expand All @@ -60,6 +66,13 @@ public class BigtableWriteSchemaTransformProvider

private static final String INPUT_TAG = "input";

private static final Schema BATCHED_MUTATIONS_SCHEMA =
Schema.builder()
.addByteArrayField("key")
.addArrayField(
"mutations", Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.BYTES))
.build();

@Override
protected SchemaTransform from(BigtableWriteSchemaTransformConfiguration configuration) {
return new BigtableWriteSchemaTransform(configuration);
Expand Down Expand Up @@ -135,18 +148,206 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
String.format(
"Could not find expected input [%s] to %s.", INPUT_TAG, getClass().getSimpleName()));

PCollection<Row> beamRowMutations = input.get(INPUT_TAG);
PCollection<KV<ByteString, Iterable<Mutation>>> bigtableMutations =
beamRowMutations.apply(MapElements.via(new GetMutationsFromBeamRow()));
Schema inputSchema = input.getSinglePCollection().getSchema();

bigtableMutations.apply(
BigtableIO.write()
.withTableId(configuration.getTableId())
.withInstanceId(configuration.getInstanceId())
.withProjectId(configuration.getProjectId()));
PCollection<KV<ByteString, Iterable<Mutation>>> bigtableMutations = null;
if (inputSchema.equals(BATCHED_MUTATIONS_SCHEMA)) {
PCollection<Row> beamRowMutations = input.get(INPUT_TAG);
bigtableMutations =
beamRowMutations.apply(
// Original schema inputs gets sent out to the original transform provider mutations
// function
MapElements.via(new GetMutationsFromBeamRow()));
} else if (inputSchema.hasField("type")) {
validateField(inputSchema, "key", Schema.TypeName.BYTES);
validateField(inputSchema, "type", Schema.TypeName.STRING);
if (inputSchema.hasField("value")) {
validateField(inputSchema, "value", Schema.TypeName.BYTES);
}
if (inputSchema.hasField("column_qualifier")) {
validateField(inputSchema, "column_qualifier", Schema.TypeName.BYTES);
}
if (inputSchema.hasField("family_name")) {
validateField(inputSchema, "family_name", Schema.TypeName.BYTES);
}
if (inputSchema.hasField("timestamp_micros")) {
validateField(inputSchema, "timestamp_micros", Schema.TypeName.INT64);
}
if (inputSchema.hasField("start_timestamp_micros")) {
validateField(inputSchema, "start_timestamp_micros", Schema.TypeName.INT64);
}
if (inputSchema.hasField("end_timestamp_micros")) {
validateField(inputSchema, "end_timestamp_micros", Schema.TypeName.INT64);
}
bigtableMutations = changeMutationInput(input);
} else {
throw new RuntimeException(
"Input Schema is invalid: "
+ inputSchema
+ "\n\nSchema should be formatted in one of two ways:\n "
+ "key\": ByteString\n"
+ "\"type\": String\n"
+ "\"value\": ByteString\n"
+ "\"column_qualifier\": ByteString\n"
+ "\"family_name\": ByteString\n"
+ "\"timestamp_micros\": Long\n"
+ "\"start_timestamp_micros\": Long\n"
+ "\"end_timestamp_micros\": Long\n"
+ "\nOR\n"
+ "\n"
+ "\"key\": ByteString\n"
+ "(\"mutations\", contains map(String, ByteString) of mutations in the mutation schema format");
}

if (bigtableMutations != null) {
bigtableMutations.apply(
BigtableIO.write()
.withTableId(configuration.getTableId())
.withInstanceId(configuration.getInstanceId())
.withProjectId(configuration.getProjectId()));
} else {
throw new RuntimeException(
"Inputted Schema caused mutation error, check error logs and input schema format");
}
return PCollectionRowTuple.empty(input.getPipeline());
}

private void validateField(Schema inputSchema, String field, Schema.TypeName expectedType) {
Schema.TypeName actualType = inputSchema.getField(field).getType().getTypeName();
checkState(
actualType.equals(expectedType),
"Schema field '%s' should be of type %s, but was %s.",
field,
expectedType,
actualType);
}

public PCollection<KV<ByteString, Iterable<Mutation>>> changeMutationInput(
PCollectionRowTuple inputR) {
PCollection<Row> beamRowMutationsList = inputR.getSinglePCollection();
// convert all row inputs into KV<ByteString, Mutation>
PCollection<KV<ByteString, Mutation>> changedBeamRowMutationsList =
beamRowMutationsList.apply(
MapElements.into(
TypeDescriptors.kvs(
TypeDescriptor.of(ByteString.class), TypeDescriptor.of(Mutation.class)))
.via(
(Row input) -> {
ByteString key =
ByteString.copyFrom(
Preconditions.checkStateNotNull(
input.getBytes("key"),
"Encountered row with null 'key' property."));

Mutation bigtableMutation;
String mutationType =
input.getString("type"); // Direct call, can return null
if (mutationType == null) {
throw new IllegalArgumentException("Mutation type cannot be null.");
}
switch (mutationType) {
case "SetCell":
Mutation.SetCell.Builder setMutation =
Mutation.SetCell.newBuilder()
.setValue(
ByteString.copyFrom(
Preconditions.checkStateNotNull(
input.getBytes("value"),
"Encountered SetCell mutation with null 'value' property.")))
.setColumnQualifier(
ByteString.copyFrom(
Preconditions.checkStateNotNull(
input.getBytes("column_qualifier"),
"Encountered SetCell mutation with null 'column_qualifier' property. ")))
.setFamilyNameBytes(
ByteString.copyFrom(
Preconditions.checkStateNotNull(
input.getBytes("family_name"),
"Encountered SetCell mutation with null 'family_name' property.")));
// Use timestamp if provided, else default to -1 (current
// Bigtable
// server time)
// Timestamp (optional, assuming Long type in Row schema)
Long timestampMicros = input.getInt64("timestamp_micros");
setMutation.setTimestampMicros(
timestampMicros != null ? timestampMicros : -1);

bigtableMutation =
Mutation.newBuilder().setSetCell(setMutation.build()).build();
break;
case "DeleteFromColumn":
// set timestamp range if applicable
Mutation.DeleteFromColumn.Builder deleteMutation =
Mutation.DeleteFromColumn.newBuilder()
.setColumnQualifier(
ByteString.copyFrom(
Preconditions.checkStateNotNull(
input.getBytes("column_qualifier"),
"Encountered DeleteFromColumn mutation with null 'column_qualifier' property.")))
.setFamilyNameBytes(
ByteString.copyFrom(
Preconditions.checkStateNotNull(
input.getBytes("family_name"),
"Encountered DeleteFromColumn mutation with null 'family_name' property.")));

// if start or end timestamp provided
// Timestamp Range (optional, assuming Long type in Row schema)
Long startTimestampMicros = null;
Long endTimestampMicros = null;

if (input.getSchema().hasField("start_timestamp_micros")) {
startTimestampMicros = input.getInt64("start_timestamp_micros");
}
if (input.getSchema().hasField("end_timestamp_micros")) {
endTimestampMicros = input.getInt64("end_timestamp_micros");
}

if (startTimestampMicros != null || endTimestampMicros != null) {
TimestampRange.Builder timeRange = TimestampRange.newBuilder();
if (startTimestampMicros != null) {
timeRange.setStartTimestampMicros(startTimestampMicros);
}
if (endTimestampMicros != null) {
timeRange.setEndTimestampMicros(endTimestampMicros);
}
deleteMutation.setTimeRange(timeRange.build());
}
bigtableMutation =
Mutation.newBuilder()
.setDeleteFromColumn(deleteMutation.build())
.build();
break;
case "DeleteFromFamily":
bigtableMutation =
Mutation.newBuilder()
.setDeleteFromFamily(
Mutation.DeleteFromFamily.newBuilder()
.setFamilyNameBytes(
ByteString.copyFrom(
Preconditions.checkStateNotNull(
input.getBytes("family_name"),
"Encountered DeleteFromFamily mutation with null 'family_name' property.")))
.build())
.build();
break;
case "DeleteFromRow":
bigtableMutation =
Mutation.newBuilder()
.setDeleteFromRow(Mutation.DeleteFromRow.newBuilder().build())
.build();
break;
default:
throw new RuntimeException(
String.format(
"Unexpected mutation type [%s]: Key value is %s",
((input.getString("type"))),
Arrays.toString(input.getBytes("key"))));
}
return KV.of(key, bigtableMutation);
}));
// now we need to make the KV into a PCollection of KV<ByteString, Iterable<Mutation>>
return changedBeamRowMutationsList.apply(GroupByKey.create());
}
}

public static class GetMutationsFromBeamRow
Expand Down
Loading
Loading