Commit d336fcb
feat(yaml): Add BigTable write connector (#35435)
* Refactored BigTableReadSchemaTransformConfiguration
* Changed scope; working on a buffer class to make the BigTable YAML fully connected and user-friendly for mutations
* Finished up a bit of standard_io.yaml (×2)
* Added BigTable test
* Changed some tests for BigTable
* Added a new IT file for simpleWrite and made integration-test debugging changes (×2)
* SetCell mutation test works; checking whether this draft PR passes CI
* Fixed a slight error
* Added many more changes to integrations test.py and BigTableSimpleWriteSchemaTransformProviderIT; testing out new mutations
* BigTableSimpleWriteSchemaTransformProviderIT: finished changes to mutate new user input; all mutations work correctly; added demo code
* Update sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableSimpleWriteSchemaTransformProvider.java (×4; Co-authored-by: Derrick Williams <myutat@gmail.com>)
* Update sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableSimpleWriteSchemaTransformProviderIT.java (Co-authored-by: Derrick Williams <myutat@gmail.com>)
* Changed comments
* Added changes from Derrick's comments
* Added a default schema; may fix the issues
* Added the schema to every test specifically; will run tests to see if it works
* Added a default schema; may fix the issues
* Following formatting tests (×2)
* Following checkstyle tests
* Made schema and test changes (×5)
* Added final test
* Changed timestamp values
* Added all-mutations test (×2)
* Pushed changes for format errors (×2)
* Delete 4
* Pushed changes for format errors (×3)
* Pushed changes for debugging errors (×2)
* Added print to see internal error (will remove) (×3)
* Import fixes (×6)
* Pushed changes for debugging errors (×2)
* Pushed changes for debugging errors; added pulls from other Beam branches
* Made changes to allMutations test (×2)
* Pushed changes for debugging errors; added pulls from other Beam branches (×5)
* New read errors fixed
* Pushed changes for debugging errors; added pulls from other Beam branches
* Consolidated schema transform files; fixed small issues and bugs (×4)
* Pushed changes for debugging errors; added pulls from other Beam branches
* Pushed changes from Ahmed (×8)
* Following checkstyle tests (×2)

---------

Co-authored-by: Derrick Williams <myutat@gmail.com>
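For context, this connector is meant to be reachable from Beam YAML pipelines (the standard_io.yaml changes above). A minimal sketch of what such a pipeline could look like, assuming the transform is registered as `WriteToBigTable` and that the config keys mirror the provider configuration's `projectId`/`instanceId`/`tableId` getters — both the transform name and the exact key spellings here are assumptions, not taken from this commit:

```yaml
pipeline:
  transforms:
    - type: ReadFromCsv
      name: ReadRows
      config:
        path: gs://my-bucket/mutations.csv
    - type: WriteToBigTable
      input: ReadRows
      config:
        project_id: my-gcp-project
        instance_id: my-instance
        table_id: my-table
```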
1 parent 4d764df commit d336fcb

File tree: 6 files changed, +1058 −15 lines

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java

Lines changed: 209 additions & 8 deletions

@@ -19,6 +19,7 @@
 
 import static java.util.Optional.ofNullable;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
@@ -34,16 +35,21 @@
 import java.util.Map;
 import org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteSchemaTransformProvider.BigtableWriteSchemaTransformConfiguration;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
 
 /**
@@ -60,6 +66,13 @@ public class BigtableWriteSchemaTransformProvider
 
   private static final String INPUT_TAG = "input";
 
+  private static final Schema BATCHED_MUTATIONS_SCHEMA =
+      Schema.builder()
+          .addByteArrayField("key")
+          .addArrayField(
+              "mutations", Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.BYTES))
+          .build();
+
   @Override
   protected SchemaTransform from(BigtableWriteSchemaTransformConfiguration configuration) {
     return new BigtableWriteSchemaTransform(configuration);
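To make BATCHED_MUTATIONS_SCHEMA concrete: a row in the batched form carries a byte-array key plus a list of mutations, each encoded as a String-to-bytes map. The stdlib-only sketch below models that shape outside of Beam; `BatchedRow` and `isValid` are hypothetical names, not Beam API, and the requirement that each mutation map carry a "type" entry is an assumption about the map encoding:

```java
import java.util.List;
import java.util.Map;

// Stdlib-only model of the batched-mutations row shape: a row key plus a
// list of mutations, each encoded as a String -> bytes map.
// BatchedRow and isValid are hypothetical names, not Beam API.
public class BatchedRowSketch {
  public record BatchedRow(byte[] key, List<Map<String, byte[]>> mutations) {}

  // Checks the structural invariants this sketch assumes: a non-null key,
  // a non-null mutations list, and a "type" entry in every mutation map.
  public static boolean isValid(BatchedRow row) {
    if (row.key() == null || row.mutations() == null) {
      return false;
    }
    for (Map<String, byte[]> mutation : row.mutations()) {
      if (!mutation.containsKey("type")) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    BatchedRow ok =
        new BatchedRow(
            "row-1".getBytes(),
            List.of(Map.of("type", "SetCell".getBytes(), "family_name", "cf".getBytes())));
    BatchedRow bad = new BatchedRow("row-2".getBytes(), List.of(Map.of()));
    System.out.println(isValid(ok)); // true
    System.out.println(isValid(bad)); // false
  }
}
```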
@@ -135,18 +148,206 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
         String.format(
             "Could not find expected input [%s] to %s.", INPUT_TAG, getClass().getSimpleName()));
 
-    PCollection<Row> beamRowMutations = input.get(INPUT_TAG);
-    PCollection<KV<ByteString, Iterable<Mutation>>> bigtableMutations =
-        beamRowMutations.apply(MapElements.via(new GetMutationsFromBeamRow()));
+    Schema inputSchema = input.getSinglePCollection().getSchema();
 
-    bigtableMutations.apply(
-        BigtableIO.write()
-            .withTableId(configuration.getTableId())
-            .withInstanceId(configuration.getInstanceId())
-            .withProjectId(configuration.getProjectId()));
+    PCollection<KV<ByteString, Iterable<Mutation>>> bigtableMutations = null;
+    if (inputSchema.equals(BATCHED_MUTATIONS_SCHEMA)) {
+      PCollection<Row> beamRowMutations = input.get(INPUT_TAG);
+      bigtableMutations =
+          beamRowMutations.apply(
+              // Inputs with the original schema are sent to the original transform
+              // provider's mutations function.
+              MapElements.via(new GetMutationsFromBeamRow()));
+    } else if (inputSchema.hasField("type")) {
+      validateField(inputSchema, "key", Schema.TypeName.BYTES);
+      validateField(inputSchema, "type", Schema.TypeName.STRING);
+      if (inputSchema.hasField("value")) {
+        validateField(inputSchema, "value", Schema.TypeName.BYTES);
+      }
+      if (inputSchema.hasField("column_qualifier")) {
+        validateField(inputSchema, "column_qualifier", Schema.TypeName.BYTES);
+      }
+      if (inputSchema.hasField("family_name")) {
+        validateField(inputSchema, "family_name", Schema.TypeName.BYTES);
+      }
+      if (inputSchema.hasField("timestamp_micros")) {
+        validateField(inputSchema, "timestamp_micros", Schema.TypeName.INT64);
+      }
+      if (inputSchema.hasField("start_timestamp_micros")) {
+        validateField(inputSchema, "start_timestamp_micros", Schema.TypeName.INT64);
+      }
+      if (inputSchema.hasField("end_timestamp_micros")) {
+        validateField(inputSchema, "end_timestamp_micros", Schema.TypeName.INT64);
+      }
+      bigtableMutations = changeMutationInput(input);
+    } else {
+      throw new RuntimeException(
+          "Input Schema is invalid: "
+              + inputSchema
+              + "\n\nSchema should be formatted in one of two ways:\n"
+              + "\"key\": ByteString\n"
+              + "\"type\": String\n"
+              + "\"value\": ByteString\n"
+              + "\"column_qualifier\": ByteString\n"
+              + "\"family_name\": ByteString\n"
+              + "\"timestamp_micros\": Long\n"
+              + "\"start_timestamp_micros\": Long\n"
+              + "\"end_timestamp_micros\": Long\n"
+              + "\nOR\n"
+              + "\n"
+              + "\"key\": ByteString\n"
+              + "\"mutations\": a map(String, ByteString) of mutations in the mutation schema format");
+    }
 
+    if (bigtableMutations != null) {
+      bigtableMutations.apply(
+          BigtableIO.write()
+              .withTableId(configuration.getTableId())
+              .withInstanceId(configuration.getInstanceId())
+              .withProjectId(configuration.getProjectId()));
+    } else {
+      throw new RuntimeException(
+          "Input schema caused a mutation error; check the error logs and the input schema format");
+    }
     return PCollectionRowTuple.empty(input.getPipeline());
   }
+
+  private void validateField(Schema inputSchema, String field, Schema.TypeName expectedType) {
+    Schema.TypeName actualType = inputSchema.getField(field).getType().getTypeName();
+    checkState(
+        actualType.equals(expectedType),
+        "Schema field '%s' should be of type %s, but was %s.",
+        field,
+        expectedType,
+        actualType);
+  }
+
+  public PCollection<KV<ByteString, Iterable<Mutation>>> changeMutationInput(
+      PCollectionRowTuple inputR) {
+    PCollection<Row> beamRowMutationsList = inputR.getSinglePCollection();
+    // Convert all row inputs into KV<ByteString, Mutation>.
+    PCollection<KV<ByteString, Mutation>> changedBeamRowMutationsList =
+        beamRowMutationsList.apply(
+            MapElements.into(
+                    TypeDescriptors.kvs(
+                        TypeDescriptor.of(ByteString.class), TypeDescriptor.of(Mutation.class)))
+                .via(
+                    (Row input) -> {
+                      ByteString key =
+                          ByteString.copyFrom(
+                              Preconditions.checkStateNotNull(
+                                  input.getBytes("key"),
+                                  "Encountered row with null 'key' property."));
+
+                      Mutation bigtableMutation;
+                      String mutationType =
+                          input.getString("type"); // Direct call, can return null
+                      if (mutationType == null) {
+                        throw new IllegalArgumentException("Mutation type cannot be null.");
+                      }
+                      switch (mutationType) {
+                        case "SetCell":
+                          Mutation.SetCell.Builder setMutation =
+                              Mutation.SetCell.newBuilder()
+                                  .setValue(
+                                      ByteString.copyFrom(
+                                          Preconditions.checkStateNotNull(
+                                              input.getBytes("value"),
+                                              "Encountered SetCell mutation with null 'value' property.")))
+                                  .setColumnQualifier(
+                                      ByteString.copyFrom(
+                                          Preconditions.checkStateNotNull(
+                                              input.getBytes("column_qualifier"),
+                                              "Encountered SetCell mutation with null 'column_qualifier' property.")))
+                                  .setFamilyNameBytes(
+                                      ByteString.copyFrom(
+                                          Preconditions.checkStateNotNull(
+                                              input.getBytes("family_name"),
+                                              "Encountered SetCell mutation with null 'family_name' property.")));
+                          // Use the timestamp if provided, else default to -1
+                          // (current Bigtable server time).
+                          // Timestamp (optional, assuming Long type in the Row schema).
+                          Long timestampMicros = input.getInt64("timestamp_micros");
+                          setMutation.setTimestampMicros(
+                              timestampMicros != null ? timestampMicros : -1);
+
+                          bigtableMutation =
+                              Mutation.newBuilder().setSetCell(setMutation.build()).build();
+                          break;
+                        case "DeleteFromColumn":
+                          // Set the timestamp range if applicable.
+                          Mutation.DeleteFromColumn.Builder deleteMutation =
+                              Mutation.DeleteFromColumn.newBuilder()
+                                  .setColumnQualifier(
+                                      ByteString.copyFrom(
+                                          Preconditions.checkStateNotNull(
+                                              input.getBytes("column_qualifier"),
+                                              "Encountered DeleteFromColumn mutation with null 'column_qualifier' property.")))
+                                  .setFamilyNameBytes(
+                                      ByteString.copyFrom(
+                                          Preconditions.checkStateNotNull(
+                                              input.getBytes("family_name"),
+                                              "Encountered DeleteFromColumn mutation with null 'family_name' property.")));
+
+                          // Timestamp range (optional, assuming Long type in the Row
+                          // schema), used if a start or end timestamp is provided.
+                          Long startTimestampMicros = null;
+                          Long endTimestampMicros = null;
+
+                          if (input.getSchema().hasField("start_timestamp_micros")) {
+                            startTimestampMicros = input.getInt64("start_timestamp_micros");
+                          }
+                          if (input.getSchema().hasField("end_timestamp_micros")) {
+                            endTimestampMicros = input.getInt64("end_timestamp_micros");
+                          }
+
+                          if (startTimestampMicros != null || endTimestampMicros != null) {
+                            TimestampRange.Builder timeRange = TimestampRange.newBuilder();
+                            if (startTimestampMicros != null) {
+                              timeRange.setStartTimestampMicros(startTimestampMicros);
+                            }
+                            if (endTimestampMicros != null) {
+                              timeRange.setEndTimestampMicros(endTimestampMicros);
+                            }
+                            deleteMutation.setTimeRange(timeRange.build());
+                          }
+                          bigtableMutation =
+                              Mutation.newBuilder()
+                                  .setDeleteFromColumn(deleteMutation.build())
+                                  .build();
+                          break;
+                        case "DeleteFromFamily":
+                          bigtableMutation =
+                              Mutation.newBuilder()
+                                  .setDeleteFromFamily(
+                                      Mutation.DeleteFromFamily.newBuilder()
+                                          .setFamilyNameBytes(
+                                              ByteString.copyFrom(
+                                                  Preconditions.checkStateNotNull(
+                                                      input.getBytes("family_name"),
+                                                      "Encountered DeleteFromFamily mutation with null 'family_name' property.")))
+                                          .build())
+                                  .build();
+                          break;
+                        case "DeleteFromRow":
+                          bigtableMutation =
+                              Mutation.newBuilder()
+                                  .setDeleteFromRow(Mutation.DeleteFromRow.newBuilder().build())
+                                  .build();
+                          break;
+                        default:
+                          throw new RuntimeException(
+                              String.format(
+                                  "Unexpected mutation type [%s]: Key value is %s",
+                                  input.getString("type"),
+                                  Arrays.toString(input.getBytes("key"))));
+                      }
+                      return KV.of(key, bigtableMutation);
+                    }));
+    // Group the KVs into a PCollection of KV<ByteString, Iterable<Mutation>>.
+    return changedBeamRowMutationsList.apply(GroupByKey.create());
+  }
 }
 
 public static class GetMutationsFromBeamRow
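A compact, Beam-free sketch of what the new `changeMutationInput` path does per row: read the "type" field, reject null or unknown mutation types, then group the resulting (key, mutation) pairs per key the way `GroupByKey` does. The `MutationDispatchSketch` class and its plain `Map`-based rows are hypothetical stand-ins for Beam `Row`s and Bigtable `Mutation` protos, not the Beam API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Beam-free sketch of the per-row dispatch: check the "type" field against
// the supported mutation kinds, then group the resulting (key, mutation)
// pairs by key, as GroupByKey would.
public class MutationDispatchSketch {
  static final Set<String> KNOWN_TYPES =
      Set.of("SetCell", "DeleteFromColumn", "DeleteFromFamily", "DeleteFromRow");

  // Stands in for building the Bigtable Mutation proto from one input row.
  public static String toMutation(Map<String, String> row) {
    String type = row.get("type");
    if (type == null) {
      throw new IllegalArgumentException("Mutation type cannot be null.");
    }
    if (!KNOWN_TYPES.contains(type)) {
      throw new RuntimeException("Unexpected mutation type [" + type + "]");
    }
    return type;
  }

  // Simulates GroupByKey over (key, mutation) pairs.
  public static Map<String, List<String>> groupByKey(List<Map<String, String>> rows) {
    Map<String, List<String>> grouped = new HashMap<>();
    for (Map<String, String> row : rows) {
      grouped.computeIfAbsent(row.get("key"), k -> new ArrayList<>()).add(toMutation(row));
    }
    return grouped;
  }

  public static void main(String[] args) {
    List<Map<String, String>> rows =
        List.of(
            Map.of("key", "r1", "type", "SetCell"),
            Map.of("key", "r1", "type", "DeleteFromColumn"),
            Map.of("key", "r2", "type", "DeleteFromRow"));
    Map<String, List<String>> grouped = groupByKey(rows);
    System.out.println(grouped.get("r1").size()); // 2
    System.out.println(grouped.get("r2").size()); // 1
  }
}
```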
