Skip to content

feat(yaml): Add BigTable write connector#35435

Merged
liferoad merged 105 commits intoapache:masterfrom
arnavarora2004:master
Jul 18, 2025
Merged

feat(yaml): Add BigTable write connector#35435
liferoad merged 105 commits intoapache:masterfrom
arnavarora2004:master

Conversation

@arnavarora2004
Copy link
Copy Markdown
Contributor

@arnavarora2004 arnavarora2004 commented Jun 25, 2025

@damccorm @derrickaw @fozzie15 @ahmedabu98

I added the BigTable Connector for BeamYaml

added new logic for bigtable yaml with the option to simplify the user input and make it more readable than the old logic,

all mutations work correctly

added tests to integration_test.py (I commented out some bugs in integration_test.py from new commits on the main branch, will remove if no more errors)

added a new test IT class called BigTableSimpleWriteSchemaTransformProviderIT

all logic works, please let me know if anything looks off/funny and if anything can be improved


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

…am/sdk/io/gcp/bigtable/BigtableSimpleWriteSchemaTransformProvider.java

Co-authored-by: Derrick Williams <myutat@gmail.com>
Comment on lines +51 to +66
# # Deletes all cells in a specific column, optionally within a time range.
# - {key: 'row2',
# type: 'DeleteFromColumn',
# family_name: "cf1",
# column_qualifier: "cq1",
# start_timestamp_micros: 2000,
# end_timestamp_micros: 5000 }
#
# # Deletes all cells in a specific column family.
# - {key: 'row3',
# type: 'DeleteFromFamily',
# family_name: "cf2" }
#
# # Deletes all cells in a specific row.
# - {key: 'row4',
# type: 'DeleteFromRow' }
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use this or create another pipeline?

Comment on lines +106 to +137
# - pipeline:
# type: chain
# transforms:
# - type: ReadFromBigTable
# config:
# project: 'apache-beam-testing'
# instance: 'bt-write-tests'
# table: 'test-table'
# - type: MapToFields # Convert bytes back to strings for comparison
# name: ConvertBytesToStrings
# config:
# language: python
# fields:
# key:
# callable: |
# def convert_to_string(row):
# return row.key.decode('utf-8') if row.key is not None else None
# family_name:
# callable: |
# def convert_to_string(row):
# return row.family_name.decode('utf-8') if row.family_name is not None else None
# column_qualifier:
# callable: |
# def convert_to_string(row):
# return row.column_qualifier.decode('utf-8') if row.column_qualifier is not None else None
# value:
# callable: |
# def convert_to_string(row):
# return row.value.decode('utf-8') if row.value is not None else None
# - type: AssertEqual
# config:
# elements:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use ReadFromBigTable?

.withProjectId(configuration.getProjectId()));
Schema inputSchema = input.getSinglePCollection().getSchema();

System.out.println("Input Schema for BigTableMutations: " + inputSchema);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should log the input schema only if it's relevant (e.g. if it's invalid and causing failures). It's not very useful when everything is WAI (and probably will just be noisy)

// // new schema inputs get sent to the new transform provider mutation function
bigtableMutations = changeMutationInput(input);
} else {
System.out.println(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be throwing an error right? not just printing?

// // new schema inputs get sent to the new transform provider mutation function
bigtableMutations = changeMutationInput(input);
} else {
System.out.println(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just saw you throw a RuntimeException below. I think that can be thrown here instead with the message you have here

Comment on lines +382 to +385
'ReadFromBigTable':
project: 'project_id'
instance: 'instance_id'
table: 'table_id'
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's not expose ReadFromBigtable just yet

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay i put comments around it, do i comment out the read from bigtable in underlying provider or keep that so the IT tests dont fail?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove it too. It should not be exposed to users

Do you mean IT tests are failing without it? How come?

project: 'project_id'
instance: 'instance_id'
table: 'table_id'
Rows: "rows"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

underlying_provider:
type: beamJar
transforms:
'ReadFromBigTable': 'beam:schematransform:org.apache.beam:bigtable_read:v1'
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cleanup here too

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any more updates ahmed?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the only test failing is Execution failed for task ':sdks:python:test-suites:tox:py39:testPy39CloudCoverage'.
I think its ready to merge, up to @ahmedabu98 and @derrickaw now,

Copy link
Copy Markdown
Contributor

@ahmedabu98 ahmedabu98 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for sticking through it @arnava2004! LGTM

Copy link
Copy Markdown
Collaborator

@derrickaw derrickaw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

@liferoad liferoad merged commit d336fcb into apache:master Jul 18, 2025
103 of 105 checks passed
@derrickaw
Copy link
Copy Markdown
Collaborator

#33902

@derrickaw
Copy link
Copy Markdown
Collaborator

#28672

@derrickaw derrickaw mentioned this pull request Sep 24, 2025
15 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants