
Commit 4114f7c

arnavarora2004, derrickaw, and ahmedabu98 authored
Add BigTableRead connector and implement new flatten feature (#35696)
* Refactored BigTableReadSchemaTransformConfiguration
* changed scope, working on buffer class for making BigTable yaml fully connected and actually look good on user end for mutations
* Finished up a bit of standard_io.yaml (×2)
* Added bigTable test
* changed some tests for BigTable
* Added new IT file for simpleWrite and also made changes for integration test debugging (×2)
* SetCell mutation test works, I want to see if this draft PR works CI test wise
* Fixed a slight error
* Added way more changes to integrations test.py, BigTableSimpleWriteSchemaTransformProviderIT, and testing out new mutations etc
* BigTableSimpleWriteSchemaTransformProviderIT finished changes to mutated new user input, all mutations work correctly, put demo code for it
* Update sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableSimpleWriteSchemaTransformProvider.java (×4; Co-authored-by: Derrick Williams <[email protected]>)
* Update sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableSimpleWriteSchemaTransformProviderIT.java (Co-authored-by: Derrick Williams <[email protected]>)
* changed comments
* Added changes from derrick comments
* Added default schema, maybe fixes the issues
* Added schema to every test specifically, will run tests to see if it works
* Added default schema, maybe fixes the issues
* Following formatting tests (×2)
* Following checkstyle tests
* Made schema and test changes (×5)
* Added final test
* changed timestamp values
* added all mutations test (×2)
* pushed changes to format errors (×2)
* Delete 4
* pushed changes to format errors (×3)
* pushed changes to debugging errors (×2)
* to see internal error added print (will remove) (×3)
* import fixes (×6)
* pushed changes to debugging errors (×2)
* pushed changes to debugging errors, added pulls from other beam
* made changes to allMutations test (×2)
* pushed changes to debugging errors, added pulls from other beam (×5)
* new read errors fixed
* pushed changes to debugging errors, added pulls from other beam
* consolidated schema transform files, fixed small issues and bugs (×4)
* pushed changes to debugging errors, added pulls from other beam
* pushed changes from ahmed (×8)
* Following checkstyle tests (×2)
* pushed new changes to BigTableRead, making it work with the new functionality of allowing flatten (defaulted to true)
* pushed new changes to BigTableRead, making it work with the new functionality of allowing flatten (defaulted to true); added a new test in the IT and fixed formatting (×2)
* Update sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java (Co-authored-by: Ahmed Abualsaud <[email protected]>)
* pushed new changes to BigTableRead, making it work with the new functionality of allowing flatten (defaulted to true); added a new test in the IT and fixed formatting (×4)
* new mongo files in branch
* fixed family_name to string (×8)
* fixed commit issues
* commented out assert test, everything should work now

---------

Co-authored-by: Derrick Williams <[email protected]>
Co-authored-by: Ahmed Abualsaud <[email protected]>
1 parent e0cbfe1 commit 4114f7c

8 files changed: +425 −166 lines

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java

Lines changed: 100 additions & 34 deletions
```diff
@@ -24,7 +24,8 @@
 import com.google.bigtable.v2.Cell;
 import com.google.bigtable.v2.Column;
 import com.google.bigtable.v2.Family;
-import java.nio.ByteBuffer;
+import com.google.protobuf.ByteString;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -37,11 +38,12 @@
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * An implementation of {@link TypedSchemaTransformProvider} for Bigtable Read jobs configured via
@@ -69,6 +71,13 @@ public class BigtableReadSchemaTransformProvider
                   Schema.FieldType.STRING,
                   Schema.FieldType.array(Schema.FieldType.row(CELL_SCHEMA))))
           .build();
+  public static final Schema FLATTENED_ROW_SCHEMA =
+      Schema.builder()
+          .addByteArrayField("key")
+          .addStringField("family_name")
+          .addByteArrayField("column_qualifier")
+          .addArrayField("cells", Schema.FieldType.row(CELL_SCHEMA))
+          .build();
 
   @Override
   protected SchemaTransform from(BigtableReadSchemaTransformConfiguration configuration) {
@@ -88,7 +97,7 @@ public List<String> outputCollectionNames() {
   /** Configuration for reading from Bigtable. */
   @DefaultSchema(AutoValueSchema.class)
   @AutoValue
-  public abstract static class BigtableReadSchemaTransformConfiguration {
+  public abstract static class BigtableReadSchemaTransformConfiguration implements Serializable {
     /** Instantiates a {@link BigtableReadSchemaTransformConfiguration.Builder} instance. */
     public void validate() {
       String emptyStringMessage =
@@ -100,7 +109,8 @@ public void validate() {
 
     public static Builder builder() {
       return new AutoValue_BigtableReadSchemaTransformProvider_BigtableReadSchemaTransformConfiguration
-          .Builder();
+          .Builder()
+          .setFlatten(true);
     }
 
     public abstract String getTableId();
@@ -109,6 +119,8 @@ public static Builder builder() {
 
     public abstract String getProjectId();
 
+    public abstract @Nullable Boolean getFlatten();
+
     /** Builder for the {@link BigtableReadSchemaTransformConfiguration}. */
     @AutoValue.Builder
     public abstract static class Builder {
@@ -118,6 +130,8 @@ public abstract static class Builder {
 
       public abstract Builder setProjectId(String projectId);
 
+      public abstract Builder setFlatten(Boolean flatten);
+
       /** Builds a {@link BigtableReadSchemaTransformConfiguration} instance. */
       public abstract BigtableReadSchemaTransformConfiguration build();
     }
@@ -152,45 +166,97 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
               .withInstanceId(configuration.getInstanceId())
               .withProjectId(configuration.getProjectId()));
 
+      Schema outputSchema =
+          Boolean.FALSE.equals(configuration.getFlatten()) ? ROW_SCHEMA : FLATTENED_ROW_SCHEMA;
+
       PCollection<Row> beamRows =
-          bigtableRows.apply(MapElements.via(new BigtableRowToBeamRow())).setRowSchema(ROW_SCHEMA);
+          bigtableRows
+              .apply("ConvertToBeamRows", ParDo.of(new BigtableRowConverterDoFn(configuration)))
+              .setRowSchema(outputSchema);
 
       return PCollectionRowTuple.of(OUTPUT_TAG, beamRows);
     }
   }
 
-  public static class BigtableRowToBeamRow extends SimpleFunction<com.google.bigtable.v2.Row, Row> {
-    @Override
-    public Row apply(com.google.bigtable.v2.Row bigtableRow) {
-      // The collection of families is represented as a Map of column families.
-      // Each column family is represented as a Map of columns.
-      // Each column is represented as a List of cells
-      // Each cell is represented as a Beam Row consisting of value and timestamp_micros
-      Map<String, Map<String, List<Row>>> families = new HashMap<>();
-
-      for (Family fam : bigtableRow.getFamiliesList()) {
-        // Map of column qualifier to list of cells
-        Map<String, List<Row>> columns = new HashMap<>();
-        for (Column col : fam.getColumnsList()) {
-          List<Row> cells = new ArrayList<>();
-          for (Cell cell : col.getCellsList()) {
-            Row cellRow =
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue("value", ByteBuffer.wrap(cell.getValue().toByteArray()))
-                    .withFieldValue("timestamp_micros", cell.getTimestampMicros())
+  /**
+   * A {@link DoFn} that converts a Bigtable {@link com.google.bigtable.v2.Row} to a Beam {@link
+   * Row}. It supports both a nested representation and a flattened representation where each column
+   * becomes a separate output element.
+   */
+  private static class BigtableRowConverterDoFn extends DoFn<com.google.bigtable.v2.Row, Row> {
+    private final BigtableReadSchemaTransformConfiguration configuration;
+
+    BigtableRowConverterDoFn(BigtableReadSchemaTransformConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    private List<Row> convertCells(List<Cell> bigtableCells) {
+      List<Row> beamCells = new ArrayList<>();
+      for (Cell cell : bigtableCells) {
+        Row cellRow =
+            Row.withSchema(CELL_SCHEMA)
+                .withFieldValue("value", cell.getValue().toByteArray())
+                .withFieldValue("timestamp_micros", cell.getTimestampMicros())
+                .build();
+        beamCells.add(cellRow);
+      }
+      return beamCells;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element com.google.bigtable.v2.Row bigtableRow, OutputReceiver<Row> out) {
+      // The builder defaults flatten to true. We check for an explicit false setting to disable it.
+
+      if (Boolean.FALSE.equals(configuration.getFlatten())) {
+        // Non-flattening logic (original behavior): one output row per Bigtable row.
+        Map<String, Map<String, List<Row>>> families = new HashMap<>();
+        for (Family fam : bigtableRow.getFamiliesList()) {
+          Map<String, List<Row>> columns = new HashMap<>();
+          for (Column col : fam.getColumnsList()) {
+
+            List<Cell> bigTableCells = col.getCellsList();
+
+            List<Row> cells = convertCells(bigTableCells);
+
+            columns.put(col.getQualifier().toStringUtf8(), cells);
+          }
+          families.put(fam.getName(), columns);
+        }
+        Row beamRow =
+            Row.withSchema(ROW_SCHEMA)
+                .withFieldValue("key", bigtableRow.getKey().toByteArray())
+                .withFieldValue("column_families", families)
+                .build();
+        out.output(beamRow);
+      } else {
+        // Flattening logic (new behavior): one output row per column qualifier.
+        byte[] key = bigtableRow.getKey().toByteArray();
+        for (Family fam : bigtableRow.getFamiliesList()) {
+          String familyName = fam.getName();
+          for (Column col : fam.getColumnsList()) {
+            ByteString qualifierName = col.getQualifier();
+            List<Row> cells = new ArrayList<>();
+            for (Cell cell : col.getCellsList()) {
+              Row cellRow =
+                  Row.withSchema(CELL_SCHEMA)
+                      .withFieldValue("value", cell.getValue().toByteArray())
+                      .withFieldValue("timestamp_micros", cell.getTimestampMicros())
+                      .build();
+              cells.add(cellRow);
+            }
+
+            Row flattenedRow =
+                Row.withSchema(FLATTENED_ROW_SCHEMA)
+                    .withFieldValue("key", key)
+                    .withFieldValue("family_name", familyName)
+                    .withFieldValue("column_qualifier", qualifierName.toByteArray())
+                    .withFieldValue("cells", cells)
                     .build();
-            cells.add(cellRow);
+            out.output(flattenedRow);
           }
-          columns.put(col.getQualifier().toStringUtf8(), cells);
         }
-        families.put(fam.getName(), columns);
       }
-      Row beamRow =
-          Row.withSchema(ROW_SCHEMA)
-              .withFieldValue("key", ByteBuffer.wrap(bigtableRow.getKey().toByteArray()))
-              .withFieldValue("column_families", families)
-              .build();
-      return beamRow;
     }
   }
 }
```
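As a usage note, here is a minimal sketch of how a pipeline author might exercise the new `flatten` option. The `setTableId` and `setInstanceId` setters are assumed from the AutoValue getters shown in the diff, and the project/instance/table IDs are placeholders, not values from this commit.

```java
// Hedged sketch: builds the read configuration with the new flatten flag.
// setTableId/setInstanceId are assumed to exist (AutoValue generates a setter
// per abstract getter); the IDs below are placeholders for illustration.
BigtableReadSchemaTransformConfiguration config =
    BigtableReadSchemaTransformConfiguration.builder()
        .setProjectId("my-project")
        .setInstanceId("my-instance")
        .setTableId("my-table")
        .setFlatten(false) // opt out of the new default; omit for flattened output
        .build();

// With flatten unset or true (the default set in builder()), the transform
// emits one Row per column qualifier using FLATTENED_ROW_SCHEMA:
//   key (bytes), family_name (string), column_qualifier (bytes), cells (array<Row>)
// With setFlatten(false), it emits one Row per Bigtable row using ROW_SCHEMA:
//   key (bytes) plus a map of column families to columns to cells.
```

Note that because `builder()` now calls `.setFlatten(true)`, leaving the option unset yields the flattened output, a behavior change relative to the old nested-only representation.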

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java

Lines changed: 14 additions & 17 deletions
```diff
@@ -168,7 +168,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
       validateField(inputSchema, "column_qualifier", Schema.TypeName.BYTES);
     }
     if (inputSchema.hasField("family_name")) {
-      validateField(inputSchema, "family_name", Schema.TypeName.BYTES);
+      validateField(inputSchema, "family_name", Schema.TypeName.STRING);
     }
     if (inputSchema.hasField("timestamp_micros")) {
       validateField(inputSchema, "timestamp_micros", Schema.TypeName.INT64);
@@ -189,7 +189,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
                 + "\"type\": String\n"
                 + "\"value\": ByteString\n"
                 + "\"column_qualifier\": ByteString\n"
-                + "\"family_name\": ByteString\n"
+                + "\"family_name\": String\n"
                 + "\"timestamp_micros\": Long\n"
                 + "\"start_timestamp_micros\": Long\n"
                 + "\"end_timestamp_micros\": Long\n"
@@ -259,11 +259,10 @@ public PCollection<KV<ByteString, Iterable<Mutation>>> changeMutationInput(
                         Preconditions.checkStateNotNull(
                             input.getBytes("column_qualifier"),
                             "Encountered SetCell mutation with null 'column_qualifier' property. ")))
-                .setFamilyNameBytes(
-                    ByteString.copyFrom(
-                        Preconditions.checkStateNotNull(
-                            input.getBytes("family_name"),
-                            "Encountered SetCell mutation with null 'family_name' property.")));
+                .setFamilyName(
+                    Preconditions.checkStateNotNull(
+                        input.getString("family_name"),
+                        "Encountered SetCell mutation with null 'family_name' property."));
         // Use timestamp if provided, else default to -1 (current
         // Bigtable
         // server time)
@@ -284,11 +283,10 @@ public PCollection<KV<ByteString, Iterable<Mutation>>> changeMutationInput(
                         Preconditions.checkStateNotNull(
                             input.getBytes("column_qualifier"),
                             "Encountered DeleteFromColumn mutation with null 'column_qualifier' property.")))
-                .setFamilyNameBytes(
-                    ByteString.copyFrom(
-                        Preconditions.checkStateNotNull(
-                            input.getBytes("family_name"),
-                            "Encountered DeleteFromColumn mutation with null 'family_name' property.")));
+                .setFamilyName(
+                    Preconditions.checkStateNotNull(
+                        input.getString("family_name"),
+                        "Encountered DeleteFromColumn mutation with null 'family_name' property."));
 
         // if start or end timestamp provided
         // Timestamp Range (optional, assuming Long type in Row schema)
@@ -322,11 +320,10 @@ public PCollection<KV<ByteString, Iterable<Mutation>>> changeMutationInput(
             Mutation.newBuilder()
                 .setDeleteFromFamily(
                     Mutation.DeleteFromFamily.newBuilder()
-                        .setFamilyNameBytes(
-                            ByteString.copyFrom(
-                                Preconditions.checkStateNotNull(
-                                    input.getBytes("family_name"),
-                                    "Encountered DeleteFromFamily mutation with null 'family_name' property.")))
+                        .setFamilyName(
+                            Preconditions.checkStateNotNull(
+                                input.getString("family_name"),
+                                "Encountered DeleteFromFamily mutation with null 'family_name' property."))
                 .build())
             .build();
         break;
```
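To make the write-side contract change concrete, here is a minimal sketch of a SetCell mutation row under the new schema, where `family_name` is validated as STRING rather than BYTES. The `mutationSchema` below is illustrative, assembled from the field names and types listed in the provider's error message above; it is not the provider's own schema declaration.

```java
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

// Hedged sketch: an input row describing a SetCell mutation. Field names
// follow the error message in the diff; the schema itself is illustrative.
Schema mutationSchema =
    Schema.builder()
        .addStringField("type")
        .addByteArrayField("value")
        .addByteArrayField("column_qualifier")
        .addStringField("family_name")
        .addInt64Field("timestamp_micros")
        .build();

Row setCell =
    Row.withSchema(mutationSchema)
        .withFieldValue("type", "SetCell")
        .withFieldValue("value", "v1".getBytes(StandardCharsets.UTF_8))
        .withFieldValue("column_qualifier", "col_a".getBytes(StandardCharsets.UTF_8))
        .withFieldValue("family_name", "cf1") // plain String now, no ByteString.copyFrom(...)
        .withFieldValue("timestamp_micros", 1_000_000L)
        .build();
```

On the provider side this lets the mutation builders call `setFamilyName(String)` directly instead of wrapping bytes with `ByteString.copyFrom(...)` for `setFamilyNameBytes`, as the diff above shows for SetCell, DeleteFromColumn, and DeleteFromFamily.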
