Merged
Changes from 88 commits
105 commits
ad54a58
Refactored BigTableReadSchemaTransformConfiguration
arnavarora2004 May 21, 2025
0edf81d
changed scope, working on buffer class for making BigTable yaml fully…
arnavarora2004 May 22, 2025
18c9395
Finished up a bit of standard_io.yaml
arnavarora2004 May 22, 2025
a25033e
Finished up a bit of standard_io.yaml
arnavarora2004 May 27, 2025
0e4fb14
Merge branch 'apache:master' into master
arnavarora2004 Jun 4, 2025
c048dcf
Added bigTable test
arnavarora2004 Jun 4, 2025
4ebc7c8
Merge branch 'master' of github.com:arnavarora2004/ArnavBeamWork
arnavarora2004 Jun 4, 2025
3bb3dfc
changed some tests for BigTable
arnavarora2004 Jun 4, 2025
a8b8196
Added new IT file for simpleWrite and also made changes integration t…
arnavarora2004 Jun 5, 2025
cf8bd8f
Added new IT file for simpleWrite and also made changes integration t…
arnavarora2004 Jun 5, 2025
a06d7c6
SetCell mutation test works, I want to see if this draft PR works CI …
arnavarora2004 Jun 12, 2025
5760278
Fixed a slight error
arnavarora2004 Jun 12, 2025
5cb0dd7
Merge branch 'apache:master' into master
arnavarora2004 Jun 25, 2025
f2640ae
Added way more changes to integrations test.py, BigTableSimpleWriteSc…
arnavarora2004 Jun 25, 2025
69467c5
Merge branch 'master' of github.com:arnavarora2004/ArnavBeamWork
arnavarora2004 Jun 25, 2025
121ddf6
BigTableSimpleWriteSchemaTransformProviderIT finished changes to muta…
arnavarora2004 Jun 25, 2025
82d3612
Merge branch 'apache:master' into master
arnavarora2004 Jun 25, 2025
14ca717
Update sdks/java/io/google-cloud-platform/src/main/java/org/apache/be…
arnavarora2004 Jun 25, 2025
c36d864
Update sdks/java/io/google-cloud-platform/src/main/java/org/apache/be…
arnavarora2004 Jun 25, 2025
d6366dd
Update sdks/java/io/google-cloud-platform/src/main/java/org/apache/be…
arnavarora2004 Jun 25, 2025
fcb5d03
Merge branch 'apache:master' into master
arnavarora2004 Jun 26, 2025
38ff386
Update sdks/java/io/google-cloud-platform/src/main/java/org/apache/be…
arnavarora2004 Jun 26, 2025
a15e4ff
Update sdks/java/io/google-cloud-platform/src/test/java/org/apache/be…
arnavarora2004 Jun 26, 2025
c759a07
changed comments
arnavarora2004 Jun 26, 2025
8e19a81
Merge branch 'master' of github.com:arnavarora2004/ArnavBeamWork
arnavarora2004 Jun 26, 2025
b6d0157
Added changes from derrick comments
arnavarora2004 Jun 26, 2025
7ea3f76
Merge branch 'apache:master' into master
arnavarora2004 Jun 27, 2025
a35ced7
Merge branch 'apache:master' into master
arnavarora2004 Jun 30, 2025
50bb5a3
Added default schema maybe fixes the issues
arnavarora2004 Jun 30, 2025
426519d
Added schema to every test specificly, will run tests to see if it works
arnavarora2004 Jun 30, 2025
3152094
Added default schema maybe fixes the issues
arnavarora2004 Jul 2, 2025
84a3cfd
Merge branch 'apache:master' into master
arnavarora2004 Jul 2, 2025
1ca2527
Following formatting tests
arnavarora2004 Jul 2, 2025
954355b
Merge branch 'apache:master' into master
arnavarora2004 Jul 2, 2025
ab18e18
Following formatting tests
arnavarora2004 Jul 2, 2025
80a732e
Following checkstyle tests
arnavarora2004 Jul 2, 2025
16f1064
Merge branch 'apache:master' into master
arnavarora2004 Jul 7, 2025
3c9c582
Made schema and test changes
arnavarora2004 Jul 7, 2025
b842ac9
Made schema and test changes
arnavarora2004 Jul 7, 2025
0ed5da1
Merge branch 'apache:master' into master
arnavarora2004 Jul 7, 2025
cea5987
Made schema and test changes
arnavarora2004 Jul 7, 2025
b6498c8
Made schema and test changes
arnavarora2004 Jul 8, 2025
5f6992d
Made schema and test changes
arnavarora2004 Jul 9, 2025
bdc9cff
Merge branch 'apache:master' into master
arnavarora2004 Jul 9, 2025
37abe22
Added final test
arnavarora2004 Jul 9, 2025
5cb46df
changed timestamp values
arnavarora2004 Jul 10, 2025
b1fae9c
added all mutations test
arnavarora2004 Jul 10, 2025
4866acc
added all mutations test
arnavarora2004 Jul 10, 2025
8ac0fda
pushed changes to format errors
arnavarora2004 Jul 10, 2025
32bfbe8
Merge branch 'apache:master' into master
arnavarora2004 Jul 10, 2025
b217de2
pushed changes to format errors
arnavarora2004 Jul 10, 2025
1ad3a32
Delete 4
arnavarora2004 Jul 10, 2025
364a761
pushed changes to format errors
arnavarora2004 Jul 10, 2025
5338470
pushed changes to format errors
arnavarora2004 Jul 10, 2025
c1bc8c6
pushed changes to format errors
arnavarora2004 Jul 10, 2025
fad8ae8
Merge branch 'apache:master' into master
arnavarora2004 Jul 14, 2025
4315c4f
pushed changes to debugging errors
arnavarora2004 Jul 14, 2025
9e4514c
pushed changes to debugging errors
arnavarora2004 Jul 14, 2025
64a7303
Merge branch 'apache:master' into master
arnavarora2004 Jul 14, 2025
1fc5366
to see internal error added print(will remove)
arnavarora2004 Jul 14, 2025
680678b
to see internal error added print(will remove)
arnavarora2004 Jul 14, 2025
6fa20ab
to see internal error added print(will remove)
arnavarora2004 Jul 14, 2025
2b2af72
import fixes
arnavarora2004 Jul 14, 2025
8c96d22
import fixes
arnavarora2004 Jul 14, 2025
373b87f
import fixes
arnavarora2004 Jul 14, 2025
9bd071c
import fixes
arnavarora2004 Jul 14, 2025
74b6dc3
import fixes
arnavarora2004 Jul 14, 2025
01f84da
import fixes
arnavarora2004 Jul 14, 2025
54b6ad1
pushed changes to debugging errors
arnavarora2004 Jul 14, 2025
c46ef26
pushed changes to debugging errors
arnavarora2004 Jul 14, 2025
b4fab07
Merge branch 'apache:master' into master
arnavarora2004 Jul 14, 2025
c600ea0
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 15, 2025
2d30e08
Merge branch 'apache:master' into master
arnavarora2004 Jul 15, 2025
221e558
made changes to allMutations test
arnavarora2004 Jul 15, 2025
9cb6c32
made changes to allMutations test
arnavarora2004 Jul 15, 2025
a9f77eb
Merge branch 'apache:master' into master
arnavarora2004 Jul 15, 2025
c0596f3
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 16, 2025
1db6821
Merge branch 'apache:master' into master
arnavarora2004 Jul 16, 2025
80544d0
Merge branch 'master' of github.com:arnavarora2004/ArnavBeamWork
arnavarora2004 Jul 16, 2025
5753f18
Merge branch 'apache:master' into master
arnavarora2004 Jul 16, 2025
6cd69d5
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 16, 2025
a53045c
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 16, 2025
7874226
Merge branch 'apache:master' into master
arnavarora2004 Jul 16, 2025
92a0ff9
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 16, 2025
54b9900
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 16, 2025
5b58815
new read errors fixed
arnavarora2004 Jul 16, 2025
ca12b07
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 16, 2025
81aa2ed
consolidated schema transform files, fixed small issues and bugs
arnavarora2004 Jul 16, 2025
417bfea
consolidated schema transform files, fixed small issues and bugs
arnavarora2004 Jul 17, 2025
fbf74a5
consolidated schema transform files, fixed small issues and bugs
arnavarora2004 Jul 17, 2025
8aad18a
consolidated schema transform files, fixed small issues and bugs
arnavarora2004 Jul 17, 2025
d3f17bd
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 17, 2025
d0d12ae
Merge branch 'apache:master' into master
arnavarora2004 Jul 17, 2025
16030c6
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
15a8bd2
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
85c1392
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
2cdd808
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
636df03
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
0ab4db4
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
204ff4d
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
a712320
Merge branch 'apache:master' into master
arnavarora2004 Jul 17, 2025
2e09dd7
Merge branch 'master' of github.com:arnavarora2004/ArnavBeamWork
arnavarora2004 Jul 17, 2025
f28eea9
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
d26b45d
Following checkstyle tests
arnavarora2004 Jul 17, 2025
0b6e855
Following checkstyle tests
arnavarora2004 Jul 17, 2025
@@ -34,16 +34,21 @@
import java.util.Map;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteSchemaTransformProvider.BigtableWriteSchemaTransformConfiguration;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;

/**
@@ -67,7 +72,7 @@ protected SchemaTransform from(BigtableWriteSchemaTransformConfiguration configu

@Override
public String identifier() {
- return "beam:schematransform:org.apache.beam:bigtable_write:v1";
+ return "beam:schematransform:org.apache.beam:bigtable_simple_write:v1";
}

@Override
@@ -135,18 +140,197 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
String.format(
"Could not find expected input [%s] to %s.", INPUT_TAG, getClass().getSimpleName()));

- PCollection<Row> beamRowMutations = input.get(INPUT_TAG);
- PCollection<KV<ByteString, Iterable<Mutation>>> bigtableMutations =
- beamRowMutations.apply(MapElements.via(new GetMutationsFromBeamRow()));
+ Schema testOriginialSchema =
+ Schema.builder()
+ .addByteArrayField("key")
+ .addArrayField(
+ "mutations",
+ Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.BYTES))
+ .build();

- bigtableMutations.apply(
- BigtableIO.write()
- .withTableId(configuration.getTableId())
- .withInstanceId(configuration.getInstanceId())
- .withProjectId(configuration.getProjectId()));
+ Schema inputSchema = input.getSinglePCollection().getSchema();

+ System.out.println("Input Schema for BigTableMutations: " + inputSchema);
Collaborator

Suggested change
System.out.println("Input Schema for BigTableMutations: " + inputSchema);

Contributor Author

I was thinking we keep it so users can compare their schema against the valid schema config in case theirs is wrong or they are getting errors.

Contributor

We should log the input schema only if it's relevant (e.g. if it's invalid and causing failures). It's not very useful when everything is working as intended (WAI), and will probably just be noisy.
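
A minimal sketch of the "log only on failure" idea from the comment above. It is not Beam code: a plain `String` stands in for the Beam `Schema`, `java.util.logging` stands in for whichever logger the project uses, and `SchemaLogSketch` and `runWithSchemaContext` are hypothetical names chosen for illustration.

```java
import java.util.function.Supplier;
import java.util.logging.Logger;

/** Hypothetical sketch: mention the input schema only when a step fails. */
final class SchemaLogSketch {
  private static final Logger LOG = Logger.getLogger(SchemaLogSketch.class.getName());

  /**
   * Runs the given step. Nothing is logged on success; if the step throws,
   * the schema description is logged once and the exception is rethrown.
   */
  static <T> T runWithSchemaContext(String schemaDescription, Supplier<T> step) {
    try {
      return step.get();
    } catch (RuntimeException e) {
      LOG.warning("Step failed for input schema: " + schemaDescription);
      throw e;
    }
  }
}
```

The successful path stays silent, which addresses the noise concern, while the failing path still gives the user the schema to compare against the expected format.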


PCollection<KV<ByteString, Iterable<Mutation>>> bigtableMutations = null;
if (inputSchema.equals(testOriginialSchema)) {
PCollection<Row> beamRowMutations = input.get(INPUT_TAG);
bigtableMutations =
beamRowMutations.apply(
// Inputs with the original schema are sent to the original transform provider's
// mutation function
MapElements.via(
new BigtableWriteSchemaTransformProvider.GetMutationsFromBeamRow()));
} else if (inputSchema.hasField("type")) {
// // validate early doesn't work for all mutations IT test but it does help
// if (inputSchema.hasField("column_qualifier")) {
// Schema.FieldType columnQualifierType =
// inputSchema.getField("column_qualifier").getType();
// checkState(
// columnQualifierType.equals(Schema.FieldType.STRING)
// || columnQualifierType.equals(Schema.FieldType.BYTES),
// "column_qualifier should be of type STRING or BYTES");
// }
// // new schema inputs get sent to the new transform provider mutation function
bigtableMutations = changeMutationInput(input);
} else {
System.out.println(
Collaborator

@derrickaw derrickaw Jul 16, 2025

delete or log?

Contributor Author

It's a log; I turned it into a thrown error.

Contributor

This should be throwing an error, right? Not just printing?

Contributor

Just saw you throw a RuntimeException below. I think that can be thrown here instead with the message you have here
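
A rough sketch of what throwing in the fallback branch could look like, so that no `null` result has to be checked later. It is a simplification under stated assumptions: a set of field names stands in for the Beam `Schema`, and `SchemaDispatchSketch`, `Path`, and `choosePath` are hypothetical names, not part of this PR.

```java
import java.util.Set;

/** Hypothetical sketch: pick a conversion path or fail fast, with no null fallthrough. */
final class SchemaDispatchSketch {
  enum Path {
    ORIGINAL, // schema is exactly {key, mutations}
    SIMPLE // schema carries a "type" field, one mutation per row
  }

  static Path choosePath(Set<String> fieldNames) {
    if (fieldNames.equals(Set.of("key", "mutations"))) {
      return Path.ORIGINAL;
    } else if (fieldNames.contains("type")) {
      return Path.SIMPLE;
    }
    // Neither format matched: raise the error here instead of printing.
    throw new IllegalArgumentException(
        "Input schema is invalid; expected either {key, mutations} or a schema with a"
            + " 'type' field, got: "
            + fieldNames);
  }
}
```

Because every branch either returns a path or throws, the later `bigtableMutations != null` check would no longer be needed in a version structured this way.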

"Inputted Schema is Invalid; the schema should be formatted in one of two ways:\n"
+ "\"key\": ByteString\n"
+ "\"type\": String\n"
+ "\"value\": ByteString\n"
+ "\"column_qualifier\": ByteString\n"
+ "\"family_name\": ByteString\n"
+ "\"timestamp_micros\": Long\n"
+ "\"start_timestamp_micros\": Long\n"
+ "\"end_timestamp_micros\": Long\n"
+ "OR\n"
+ "\n"
+ "\"key\": ByteString\n"
+ "\"mutations\": array of map(String, ByteString) entries in the mutation schema format");
}

if (bigtableMutations != null) {
bigtableMutations.apply(
BigtableIO.write()
.withTableId(configuration.getTableId())
.withInstanceId(configuration.getInstanceId())
.withProjectId(configuration.getProjectId()));
} else {
// No mutations were produced, so fail instead of silently writing nothing.
checkArgument(
false,
"Inputted Schema caused mutation error, check error logs and input schema format");
}
return PCollectionRowTuple.empty(input.getPipeline());
}

public PCollection<KV<ByteString, Iterable<Mutation>>> changeMutationInput(
PCollectionRowTuple inputR) {
PCollection<Row> beamRowMutationsList = inputR.getSinglePCollection();
// convert all row inputs into KV<ByteString, Mutation>
PCollection<KV<ByteString, Mutation>> changedBeamRowMutationsList =
beamRowMutationsList.apply(
MapElements.into(
TypeDescriptors.kvs(
TypeDescriptor.of(ByteString.class), TypeDescriptor.of(Mutation.class)))
.via(
(Row input) -> {
@SuppressWarnings("nullness")
ByteString key =
ByteString.copyFrom(
Preconditions.checkStateNotNull(
input.getBytes("key"),
"Encountered row with incorrect 'key' property."));

Mutation bigtableMutation;
String mutationType =
input.getString("type"); // Direct call, can return null
if (mutationType == null) {
throw new IllegalArgumentException("Mutation type cannot be null.");
}
switch (mutationType) {
case "SetCell":
@SuppressWarnings("nullness")
Mutation.SetCell.Builder setMutation =
Mutation.SetCell.newBuilder()
.setValue(
ByteString.copyFrom(
Preconditions.checkStateNotNull(
input.getBytes("value"),
"Encountered SetCell mutation with incorrect 'value' property.")))
.setColumnQualifier(
ByteString.copyFrom(
Preconditions.checkStateNotNull(
input.getBytes("column_qualifier"),
"Encountered SetCell mutation with incorrect 'column_qualifier' property.")))
.setFamilyNameBytes(
ByteString.copyFrom(
Preconditions.checkStateNotNull(
input.getBytes("family_name"),
"Encountered SetCell mutation with incorrect 'family_name' property.")));
// Use the timestamp if provided; otherwise default to -1 (current Bigtable server
// time). The field is optional and assumed to be a Long in the Row schema.
Long timestampMicros = input.getInt64("timestamp_micros");
setMutation.setTimestampMicros(
timestampMicros != null ? timestampMicros : -1);

bigtableMutation =
Mutation.newBuilder().setSetCell(setMutation.build()).build();
break;
case "DeleteFromColumn":
// set timestamp range if applicable
@SuppressWarnings("nullness")
Mutation.DeleteFromColumn.Builder deleteMutation =
Mutation.DeleteFromColumn.newBuilder()
.setColumnQualifier(
ByteString.copyFrom(
Preconditions.checkStateNotNull(
input.getBytes("column_qualifier"),
"Encountered DeleteFromColumn mutation with incorrect 'column_qualifier' property.")))
.setFamilyNameBytes(
ByteString.copyFrom(
Preconditions.checkStateNotNull(
input.getBytes("family_name"),
"Encountered DeleteFromColumn mutation with incorrect 'family_name' property.")));

// if start or end timestamp provided
// Timestamp Range (optional, assuming Long type in Row schema)
Long startTimestampMicros = null;
Long endTimestampMicros = null;

if (input.getSchema().hasField("start_timestamp_micros")) {
startTimestampMicros = input.getInt64("start_timestamp_micros");
}
if (input.getSchema().hasField("end_timestamp_micros")) {
endTimestampMicros = input.getInt64("end_timestamp_micros");
}

if (startTimestampMicros != null || endTimestampMicros != null) {
TimestampRange.Builder timeRange = TimestampRange.newBuilder();
if (startTimestampMicros != null) {
timeRange.setStartTimestampMicros(startTimestampMicros);
}
if (endTimestampMicros != null) {
timeRange.setEndTimestampMicros(endTimestampMicros);
}
deleteMutation.setTimeRange(timeRange.build());
}
bigtableMutation =
Mutation.newBuilder()
.setDeleteFromColumn(deleteMutation.build())
.build();
break;
case "DeleteFromFamily":
bigtableMutation =
Mutation.newBuilder()
.setDeleteFromFamily(
Mutation.DeleteFromFamily.newBuilder()
.setFamilyNameBytes(
ByteString.copyFrom(
Preconditions.checkStateNotNull(
input.getBytes("family_name"),
"Encountered DeleteFromFamily mutation with incorrect 'family_name' property.")))
.build())
.build();
break;
case "DeleteFromRow":
bigtableMutation =
Mutation.newBuilder()
.setDeleteFromRow(Mutation.DeleteFromRow.newBuilder().build())
.build();
break;
default:
throw new RuntimeException(
String.format("Unexpected mutation type [%s]: %s", mutationType, input));
}
return KV.of(key, bigtableMutation);
}));
// now we need to make the KV into a PCollection of KV<ByteString, Iterable<Mutation>>
return changedBeamRowMutationsList.apply(GroupByKey.create());
}
}
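
For readers unfamiliar with the final `GroupByKey` step in `changeMutationInput`, here is a plain-Java sketch of the same shape change: one (row key, mutation) pair per input row becomes one entry per row key with all of its mutations. Strings stand in for `ByteString` and `Mutation`, and `GroupMutationsSketch` and `groupByRowKey` are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: collect per-row mutations under their row key. */
final class GroupMutationsSketch {
  /** Groups (rowKey, mutation) pairs into rowKey -> mutations, in encounter order. */
  static Map<String, List<String>> groupByRowKey(List<Map.Entry<String, String>> pairs) {
    Map<String, List<String>> grouped = new LinkedHashMap<>();
    for (Map.Entry<String, String> pair : pairs) {
      // Create the list on first sight of a key, then append the mutation.
      grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
    }
    return grouped;
  }
}
```

One difference worth keeping in mind: this sketch preserves encounter order within each key, whereas Beam's `GroupByKey` makes no ordering guarantee for the values of a key, which may matter when a row receives, say, a `DeleteFromRow` and a `SetCell` in the same batch.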

public static class GetMutationsFromBeamRow