feat(yaml): Add BigTable write connector #35435
Changes from 92 commits
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -34,16 +34,21 @@ | |||
| import java.util.Map; | ||||
| import org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteSchemaTransformProvider.BigtableWriteSchemaTransformConfiguration; | ||||
| import org.apache.beam.sdk.schemas.AutoValueSchema; | ||||
| import org.apache.beam.sdk.schemas.Schema; | ||||
| import org.apache.beam.sdk.schemas.annotations.DefaultSchema; | ||||
| import org.apache.beam.sdk.schemas.transforms.SchemaTransform; | ||||
| import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; | ||||
| import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; | ||||
| import org.apache.beam.sdk.transforms.GroupByKey; | ||||
| import org.apache.beam.sdk.transforms.MapElements; | ||||
| import org.apache.beam.sdk.transforms.SimpleFunction; | ||||
| import org.apache.beam.sdk.util.Preconditions; | ||||
| import org.apache.beam.sdk.values.KV; | ||||
| import org.apache.beam.sdk.values.PCollection; | ||||
| import org.apache.beam.sdk.values.PCollectionRowTuple; | ||||
| import org.apache.beam.sdk.values.Row; | ||||
| import org.apache.beam.sdk.values.TypeDescriptor; | ||||
| import org.apache.beam.sdk.values.TypeDescriptors; | ||||
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs; | ||||
|
|
||||
| /** | ||||
|
|
@@ -135,18 +140,186 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { | |||
| String.format( | ||||
| "Could not find expected input [%s] to %s.", INPUT_TAG, getClass().getSimpleName())); | ||||
|
|
||||
| PCollection<Row> beamRowMutations = input.get(INPUT_TAG); | ||||
| PCollection<KV<ByteString, Iterable<Mutation>>> bigtableMutations = | ||||
| beamRowMutations.apply(MapElements.via(new GetMutationsFromBeamRow())); | ||||
| Schema testOriginialSchema = | ||||
| Schema.builder() | ||||
| .addByteArrayField("key") | ||||
| .addArrayField( | ||||
| "mutations", | ||||
| Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.BYTES)) | ||||
| .build(); | ||||
|
|
||||
| bigtableMutations.apply( | ||||
| BigtableIO.write() | ||||
| .withTableId(configuration.getTableId()) | ||||
| .withInstanceId(configuration.getInstanceId()) | ||||
| .withProjectId(configuration.getProjectId())); | ||||
| Schema inputSchema = input.getSinglePCollection().getSchema(); | ||||
|
|
||||
| System.out.println("Input Schema for BigTableMutations: " + inputSchema); | ||||
|
||||
| System.out.println("Input Schema for BigTableMutations: " + inputSchema); |
I was thinking we keep it so users can compare their schema against the valid schema config if theirs is wrong or they are getting errors.
We should log the input schema only when it's relevant (e.g. when it's invalid and causing failures). It's not very useful when everything is working as intended, and will probably just be noisy.
ahmedabu98 marked this conversation as resolved.
delete or log?
It's a log. I turned it into a thrown error.
This should throw an error, right? Not just print?
Just saw that you throw a RuntimeException below. I think it can be thrown here instead, with the message you have here.
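For illustration, here is a minimal sketch of what the thread converges on: stay silent when the input schema is valid, and throw at the point of detection with a message showing both the expected and the received schema. It is not the PR's code. To stay dependency-free it uses a plain `Map` of field name to type name as a stand-in for Beam's `Schema`, and the names `SchemaCheck`, `EXPECTED`, and `validate` are hypothetical. It throws `IllegalArgumentException`, a `RuntimeException` subtype, which is an assumption about what fits here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaCheck {
  // Hypothetical stand-in for the expected input schema: field name -> type name.
  // Mirrors the "key" (bytes) and "mutations" (array of string-to-bytes maps) fields in the diff.
  static final Map<String, String> EXPECTED = new LinkedHashMap<>();

  static {
    EXPECTED.put("key", "BYTES");
    EXPECTED.put("mutations", "ARRAY<MAP<STRING, BYTES>>");
  }

  /** Throws if the given schema differs from EXPECTED; prints nothing when it matches. */
  static void validate(Map<String, String> actual) {
    if (!EXPECTED.equals(actual)) {
      // The schema is surfaced only on failure, inside the exception message.
      throw new IllegalArgumentException(
          String.format("Invalid input schema. Expected %s but received %s.", EXPECTED, actual));
    }
  }

  public static void main(String[] args) {
    // Valid input: no output, no exception.
    validate(new LinkedHashMap<>(EXPECTED));

    // Invalid input: the exception message carries both schemas for comparison.
    Map<String, String> bad = new LinkedHashMap<>();
    bad.put("key", "STRING");
    try {
      validate(bad);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Putting the schema in the exception message gives users the comparison the author wanted without adding output to successful runs.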