
MongoDB Draft PR #35794

Closed
arnavarora2004 wants to merge 147 commits into apache:master from arnavarora2004:mongodb

Conversation

@arnavarora2004
Contributor


I added the MongoDB Read and Write SchemaTransforms in Java only, as advised by my mentor.

This is a draft, opened just to run tests.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI, or the workflows README for a list of phrases that trigger workflows.

arnavarora2004 and others added 30 commits May 21, 2025 11:43
… connected and actually look good on user end for mutations
…hemaTransformProviderIT, and testing out new mutations etc
…ted new user input, all mutations work correctly, put demo code for it
…am/sdk/io/gcp/bigtable/BigtableSimpleWriteSchemaTransformProvider.java

Co-authored-by: Derrick Williams <myutat@gmail.com>
…am/sdk/io/gcp/bigtable/BigtableSimpleWriteSchemaTransformProvider.java

Co-authored-by: Derrick Williams <myutat@gmail.com>
…am/sdk/io/gcp/bigtable/BigtableSimpleWriteSchemaTransformProvider.java

Co-authored-by: Derrick Williams <myutat@gmail.com>
…am/sdk/io/gcp/bigtable/BigtableSimpleWriteSchemaTransformProvider.java

Co-authored-by: Derrick Williams <myutat@gmail.com>
…am/sdk/io/gcp/bigtable/BigtableSimpleWriteSchemaTransformProviderIT.java

Co-authored-by: Derrick Williams <myutat@gmail.com>
@gemini-code-assist (bot) left a comment

Summary of Changes

Hello @arnavarora2004, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed.

This pull request introduces initial Java-based Schema Transforms for MongoDB Read and Write operations, laying the groundwork for integrating MongoDB with Beam's Schema-aware capabilities. Concurrently, it enhances the existing Bigtable Schema Transforms by adding a new 'flattened' output mode for reads and standardizing family_name handling in writes, along with corresponding updates to Python bindings and integration tests.

Highlights

  • MongoDB Schema Transforms (Java): New MongoDbReadSchemaTransformProvider and MongoDbWriteSchemaTransformProvider have been introduced, enabling schema-aware interactions for reading from and writing to MongoDB. These transforms facilitate conversion between Beam Row objects and MongoDB Documents.
  • Bigtable Read Flattening Option: The BigtableReadSchemaTransform now includes an optional flatten parameter. When enabled, it provides a flattened output where each column qualifier from Bigtable is represented as a separate Beam Row, offering a more granular data view. Corresponding Python bindings and tests have been updated.
  • Bigtable Write family_name Standardization: The BigtableWriteSchemaTransform has been updated to consistently expect the family_name field as a String type, simplifying its usage and aligning with common string representations for column families.
  • Python Integration Test Enhancements: A new temp_mongodb_table context manager has been added to the Python integration tests, providing a robust way to spin up temporary MongoDB instances for testing purposes. Existing Bigtable YAML tests were also updated to reflect schema changes and test the new flattening feature.
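The `temp_mongodb_table` fixture named above is a setup/teardown context manager. The following is a hypothetical Python sketch of that pattern only — the real fixture in the PR would use a pymongo `MongoClient` against a live instance; `FakeMongoDatabase` here is a stand-in invented purely so the example is self-contained.

```python
import uuid
from contextlib import contextmanager

# Stand-in for a real MongoDB client handle (the actual fixture would use
# pymongo); a dict of collection-name -> documents is enough to show the
# create/drop lifecycle that the context manager guarantees.
class FakeMongoDatabase:
  def __init__(self):
    self.collections = {}

  def create_collection(self, name):
    self.collections[name] = []

  def drop_collection(self, name):
    self.collections.pop(name, None)

@contextmanager
def temp_mongodb_table(database, prefix='beam_it_'):
  """Creates a uniquely named collection and drops it on exit."""
  name = prefix + uuid.uuid4().hex
  database.create_collection(name)
  try:
    yield name
  finally:
    # Teardown runs even if the test body raises, so no temp
    # collections leak between integration tests.
    database.drop_collection(name)
```

The `finally` block is the point of the pattern: a failed assertion inside the `with` body still drops the temporary collection.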

@arnavarora2004
Contributor Author

/gemini review

@gemini-code-assist (bot) left a comment

Code Review

This pull request introduces new schema transforms for MongoDB (Read and Write) and enhances the Bigtable read transform with a flatten option. The changes span both Java and Python code, including new providers, tests, and YAML configurations.

My review has identified a few issues. There is a critical copy-paste error in the YAML configuration for the new MongoDB transforms that needs to be fixed. I also found a high-severity bug in a type check within a core Python class (pvalue.py) and noted that the MongoDB read transform is incomplete, which is expected for a draft but important to track.

Additionally, I've provided several medium-severity suggestions to improve code quality by addressing code duplication, removing dead code, and fixing inconsistencies in both production and test files. Overall, this is a good start, but the identified issues should be addressed.

Comment on lines +681 to +682
elif type(other) == type(NamedTuple):
other_dict = other._asdict()
Severity: high

The check type(other) == type(NamedTuple) is incorrect for identifying an instance of a NamedTuple. NamedTuple is a factory function, not a class type that instances will have. An instance of a NamedTuple is a subclass of tuple. A better way to check if other is a NamedTuple-like object is to check if it's a tuple and has the _asdict method.

Suggested change
- elif type(other) == type(NamedTuple):
-   other_dict = other._asdict()
+ elif isinstance(other, tuple) and hasattr(other, '_asdict'):
+   other_dict = other._asdict()
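The reviewer's point can be checked directly in plain Python: `typing.NamedTuple` is a factory, so `type(NamedTuple)` is never the type of any instance, while every generated class subclasses `tuple` and carries `_asdict`. A minimal demonstration:

```python
from typing import NamedTuple

class Point(NamedTuple):
  x: int
  y: int

p = Point(1, 2)

# The original check never matches an actual NamedTuple instance:
print(type(p) == type(NamedTuple))  # False
# The suggested check does:
print(isinstance(p, tuple) and hasattr(p, '_asdict'))  # True
```

Note the suggested check also matches plain `collections.namedtuple` instances, which is usually the desired behavior here.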

Comment on lines +239 to +247
List<Row> cells = new ArrayList<>();
for (Cell cell : col.getCellsList()) {
Row cellRow =
Row.withSchema(CELL_SCHEMA)
.withFieldValue("value", cell.getValue().toByteArray())
.withFieldValue("timestamp_micros", cell.getTimestampMicros())
.build();
cells.add(cellRow);
}
Severity: medium

There is duplicated logic for converting Bigtable cells to Beam Rows. The else block for flattened output re-implements the logic already present in the convertCells helper method. To improve maintainability and reduce code duplication, you should call the existing convertCells method here.

Suggested change
- List<Row> cells = new ArrayList<>();
- for (Cell cell : col.getCellsList()) {
-   Row cellRow =
-       Row.withSchema(CELL_SCHEMA)
-           .withFieldValue("value", cell.getValue().toByteArray())
-           .withFieldValue("timestamp_micros", cell.getTimestampMicros())
-           .build();
-   cells.add(cellRow);
- }
+ List<Row> cells = convertCells(col.getCellsList());

Comment on lines +140 to +224
public void testRead() {
int numRows = 20;
List<Row> expectedRows = new ArrayList<>();
for (int i = 1; i <= numRows; i++) {
String key = "key" + i;
byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
String valueA = "value a" + i;
byte[] valueABytes = valueA.getBytes(StandardCharsets.UTF_8);
String valueB = "value b" + i;
byte[] valueBBytes = valueB.getBytes(StandardCharsets.UTF_8);
String valueC = "value c" + i;
byte[] valueCBytes = valueC.getBytes(StandardCharsets.UTF_8);
String valueD = "value d" + i;
byte[] valueDBytes = valueD.getBytes(StandardCharsets.UTF_8);
long timestamp = 1000L * i;

try {
for (int i = 1; i <= numRows; i++) {
String key = "key" + i;
String valueA = "value a" + i;
String valueB = "value b" + i;
String valueC = "value c" + i;
String valueD = "value d" + i;
long timestamp = 1000L * i;

RowMutation rowMutation =
RowMutation.create(tableId, key)
.setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA)
.setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB)
.setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC)
.setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD);
dataClient.mutateRow(rowMutation);

// Set up expected Beam Row
Map<String, List<Row>> columns1 = new HashMap<>();
columns1.put(
"a",
Arrays.asList(
Row.withSchema(CELL_SCHEMA)
.withFieldValue(
"value", ByteBuffer.wrap(valueA.getBytes(StandardCharsets.UTF_8)))
.withFieldValue("timestamp_micros", timestamp)
.build()));
columns1.put(
"b",
Arrays.asList(
Row.withSchema(CELL_SCHEMA)
.withFieldValue(
"value", ByteBuffer.wrap(valueB.getBytes(StandardCharsets.UTF_8)))
.withFieldValue("timestamp_micros", timestamp)
.build()));

Map<String, List<Row>> columns2 = new HashMap<>();
columns2.put(
"c",
Arrays.asList(
Row.withSchema(CELL_SCHEMA)
.withFieldValue(
"value", ByteBuffer.wrap(valueC.getBytes(StandardCharsets.UTF_8)))
.withFieldValue("timestamp_micros", timestamp)
.build()));
columns2.put(
"d",
Arrays.asList(
Row.withSchema(CELL_SCHEMA)
.withFieldValue(
"value", ByteBuffer.wrap(valueD.getBytes(StandardCharsets.UTF_8)))
.withFieldValue("timestamp_micros", timestamp)
.build()));

Map<String, Map<String, List<Row>>> families = new HashMap<>();
families.put(COLUMN_FAMILY_NAME_1, columns1);
families.put(COLUMN_FAMILY_NAME_2, columns2);

Row expectedRow =
Row.withSchema(ROW_SCHEMA)
.withFieldValue("key", ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8)))
.withFieldValue("column_families", families)
.build();

expectedRows.add(expectedRow);
}
LOG.info("Finished writing {} rows to table {}", numRows, tableId);
} catch (NotFoundException e) {
throw new RuntimeException("Failed to write to table", e);
RowMutation rowMutation =
RowMutation.create(tableId, key)
.setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA)
.setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB)
.setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC)
.setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD);
dataClient.mutateRow(rowMutation);

// Set up expected Beam Row
Map<String, List<Row>> columns1 = new HashMap<>();
columns1.put(
"a",
Arrays.asList(
Row.withSchema(CELL_SCHEMA)
.withFieldValue("value", valueABytes)
.withFieldValue("timestamp_micros", timestamp)
.build()));
columns1.put(
"b",
Arrays.asList(
Row.withSchema(CELL_SCHEMA)
.withFieldValue("value", valueBBytes)
.withFieldValue("timestamp_micros", timestamp)
.build()));

Map<String, List<Row>> columns2 = new HashMap<>();
columns2.put(
"c",
Arrays.asList(
Row.withSchema(CELL_SCHEMA)
.withFieldValue("value", valueCBytes)
.withFieldValue("timestamp_micros", timestamp)
.build()));
columns2.put(
"d",
Arrays.asList(
Row.withSchema(CELL_SCHEMA)
.withFieldValue("value", valueDBytes)
.withFieldValue("timestamp_micros", timestamp)
.build()));

Map<String, Map<String, List<Row>>> families = new HashMap<>();
families.put(COLUMN_FAMILY_NAME_1, columns1);
families.put(COLUMN_FAMILY_NAME_2, columns2);

Row expectedRow =
Row.withSchema(ROW_SCHEMA)
.withFieldValue("key", keyBytes)
.withFieldValue("column_families", families)
.build();

expectedRows.add(expectedRow);
}
return expectedRows;
LOG.info("Finished writing {} rows to table {}", numRows, tableId);

BigtableReadSchemaTransformConfiguration config =
BigtableReadSchemaTransformConfiguration.builder()
.setTableId(tableId)
.setInstanceId(instanceId)
.setProjectId(projectId)
.setFlatten(false)
.build();

SchemaTransform transform = new BigtableReadSchemaTransformProvider().from(config);

PCollection<Row> rows = PCollectionRowTuple.empty(p).apply(transform).get("output");

PAssert.that(rows).containsInAnyOrder(expectedRows);
p.run().waitUntilFinish();
Severity: medium

The test setup logic, specifically writing data to Bigtable, is duplicated between testRead() and testReadFlatten(). To improve code reuse and make the tests cleaner, consider extracting the data writing part into a separate helper method. This method could take numRows as a parameter and be called from both test methods.

Comment on lines +114 to +116
public abstract Builder setBatchSize(Long batchSize);

public abstract Builder setOrdered(Boolean ordered);
Severity: medium

The builder includes setBatchSize and setOrdered methods, but the corresponding getters in MongoDbWriteSchemaTransformConfiguration and their usage in MongoDbWriteSchemaTransform are commented out. This is inconsistent. If these features are not yet supported, these methods should be removed from the builder to avoid confusion.

Comment on lines +158 to +172
private static class RowToMongoDocumentFn extends DoFn<Row, Document> {
@ProcessElement
public void processElement(@Element Row row, OutputReceiver<Document> out) {
Document doc = new Document();
for (int i = 0; i < row.getSchema().getFieldCount(); i++) {
String fieldName = row.getSchema().getField(i).getName();
Object value = row.getValue(i);

if (value != null) {
doc.append(fieldName, value);
}
}
out.output(doc);
}
}
Severity: medium

The RowToMongoDocumentFn class appears to be unused. The expand method uses RowToBsonDocumentFn for the conversion. This class should be removed to avoid dead code.

@arnavarora2004
Contributor Author

/gemini review

@gemini-code-assist (bot) left a comment

Code Review

This pull request introduces new SchemaTransforms for reading from and writing to MongoDB, along with supporting test fixtures and YAML configurations. While this is a good first step, there are several critical issues that need to be addressed. The read transform is currently non-functional due to an unimplemented DoFn, and the write transform has partially implemented features that should be completed. Additionally, the YAML configuration contains a critical copy-paste error that will prevent the transforms from being correctly recognized. I've provided detailed comments and suggestions to fix these issues.

Comment on lines +169 to +191
private static class MongoDocumentToRowFn extends DoFn<Document, Row> {
// TODO: Define the Beam Schema that corresponds to your MongoDB documents.
// This could be made dynamic based on an inferred schema or a user-provided schema.
// For this skeleton, we assume a static schema.
// public static final Schema OUTPUT_SCHEMA = Schema.builder()...build();

@ProcessElement
public void processElement(@Element Document doc, OutputReceiver<Row> out) {
// Here you will convert the BSON document to a Beam Row.
// This requires you to know the target schema.

// Example pseudo-code:
// Row.Builder rowBuilder = Row.withSchema(OUTPUT_SCHEMA);
// for (Map.Entry<String, Object> entry : doc.entrySet()) {
// rowBuilder.addValue(entry.getValue());
// }
// out.output(rowBuilder.build());

// For a robust implementation, you would handle data type conversions
// between BSON types and Beam schema types.
throw new UnsupportedOperationException(
"MongoDocumentToRowFn must be implemented to convert MongoDB Documents to Beam Rows.");
}
Severity: critical

The MongoDocumentToRowFn is not implemented and throws an UnsupportedOperationException. This makes the entire MongoDbReadSchemaTransformProvider non-functional. The logic to convert a BSON Document to a Beam Row needs to be implemented. This will likely involve handling schema inference or requiring a user-provided schema.
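For orientation only, the shape of the missing conversion can be sketched in Python (the real fix must be the Java DoFn above, building a Beam `Row` via `Row.withSchema`). The schema and field names below are hypothetical; the sketch assumes a known target schema, a nullable-fields policy, and simple type coercion for BSON-side values:

```python
# Hypothetical target schema: field name -> Python type standing in for the
# corresponding Beam schema field type.
SCHEMA = {'name': str, 'age': int, 'score': float}

def document_to_row(doc, schema=SCHEMA):
  """Convert a MongoDB document (a dict) to a schema-conforming row dict."""
  row = {}
  for field, expected_type in schema.items():
    value = doc.get(field)
    if value is None:
      # Fields absent from the document become nulls, assuming the
      # schema marks them nullable.
      row[field] = None
    else:
      # Coerce loosely typed BSON values (e.g. int32 vs int64) to the
      # declared schema type; a real implementation would also handle
      # nested documents, arrays, and BSON-specific types.
      row[field] = expected_type(value)
  return row
```

The hard parts the Java implementation must still decide — schema inference versus a user-provided schema, and the full BSON-to-Beam type mapping — are exactly what the review comment flags.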

Comment on lines +419 to +420
'ReadFromBigTable': 'beam:schematransform:org.apache.beam:mongodb_read:v1'
'WriteToBigTable': 'beam:schematransform:org.apache.beam:mongodb_write:v1'
Severity: critical

There appears to be a copy-paste error. The transform names ReadFromBigTable and WriteToBigTable are used for the MongoDB provider. They should be ReadFromMongoDB and WriteToMongoDB to match the transform names defined for this provider.

        'ReadFromMongoDB': 'beam:schematransform:org.apache.beam:mongodb_read:v1'
        'WriteToMongoDB': 'beam:schematransform:org.apache.beam:mongodb_write:v1'

Comment on lines +144 to +154
PCollection<Document> mongoDocs =
input
.getPipeline()
.apply(
"ReadFromMongoDb",
MongoDbIO.read()
.withUri(configuration.getUri())
.withDatabase(configuration.getDatabase())
.withCollection(configuration.getCollection()));
// TODO: Add support for .withFilter() if it exists in your MongoDbIO,
// using configuration.getFilter().
Severity: high

The filter from the configuration is not being used. The MongoDbIO.read() transform should be configured with the filter provided in configuration.getFilter(). You'll need to parse the filter string into a Bson Document.

Suggested change
- PCollection<Document> mongoDocs =
-     input
-         .getPipeline()
-         .apply(
-             "ReadFromMongoDb",
-             MongoDbIO.read()
-                 .withUri(configuration.getUri())
-                 .withDatabase(configuration.getDatabase())
-                 .withCollection(configuration.getCollection()));
- // TODO: Add support for .withFilter() if it exists in your MongoDbIO,
- // using configuration.getFilter().
+ MongoDbIO.Read read =
+     MongoDbIO.read()
+         .withUri(configuration.getUri())
+         .withDatabase(configuration.getDatabase())
+         .withCollection(configuration.getCollection());
+ if (configuration.getFilter() != null && !configuration.getFilter().isEmpty()) {
+   read =
+       read.withQueryFn(
+           FindQuery.create().withFilters(Document.parse(configuration.getFilter())));
+ }
+ PCollection<Document> mongoDocs = input.getPipeline().apply("ReadFromMongoDb", read);
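On the Java side the filter is parsed with `Document.parse` and pushed to the server via `FindQuery.withFilters`; the semantics of a simple `{"field": value}` filter string can be illustrated with a small Python sketch. This is illustrative only — `apply_filter` is a hypothetical helper, and real MongoDB query operators (`$gt`, `$in`, ...) are not modeled:

```python
import json

def apply_filter(documents, filter_str):
  """Keep documents whose fields equal every key/value in the filter."""
  if not filter_str:
    # No filter configured: pass everything through unchanged.
    return list(documents)
  # json.loads mirrors what Document.parse does on the Java side for a
  # plain JSON filter string.
  criteria = json.loads(filter_str)
  return [
      doc for doc in documents
      if all(doc.get(k) == v for k, v in criteria.items())
  ]
```

Guarding on an empty or missing filter, as in the suggested Java change, keeps the unfiltered read path identical to the current behavior.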

Comment on lines +84 to +90
// @SchemaFieldDescription("The number of documents to include in each batch write.")
// @Nullable
// public abstract Long getBatchSize();
//
// @SchemaFieldDescription("Whether the writes should be performed in an ordered manner.")
// @Nullable
// public abstract Boolean getOrdered();
Severity: high

The configuration properties batchSize and ordered are commented out, but their builder methods exist. This functionality should be enabled to allow users to configure these important write options. Please uncomment these properties and their usage in the MongoDbWriteSchemaTransform.expand method.

Suggested change
- // @SchemaFieldDescription("The number of documents to include in each batch write.")
- // @Nullable
- // public abstract Long getBatchSize();
- //
- // @SchemaFieldDescription("Whether the writes should be performed in an ordered manner.")
- // @Nullable
- // public abstract Boolean getOrdered();
+ @SchemaFieldDescription("The number of documents to include in each batch write.")
+ @Nullable
+ public abstract Long getBatchSize();
+
+ @SchemaFieldDescription("Whether the writes should be performed in an ordered manner.")
+ @Nullable
+ public abstract Boolean getOrdered();

Comment on lines +144 to +149
// if (configuration.getBatchSize() != null) {
// write = write.withBatchSize(configuration.getBatchSize());
// }
// if (configuration.getOrdered() != null) {
// write = write.withOrdered(configuration.getOrdered());
// }
Severity: high

The batchSize and ordered configurations are not being applied to the MongoDbIO.Write transform. Please uncomment this block to enable this functionality, assuming the corresponding configuration properties are also uncommented.

Suggested change
- // if (configuration.getBatchSize() != null) {
- //   write = write.withBatchSize(configuration.getBatchSize());
- // }
- // if (configuration.getOrdered() != null) {
- //   write = write.withOrdered(configuration.getOrdered());
- // }
+ if (configuration.getBatchSize() != null) {
+   write = write.withBatchSize(configuration.getBatchSize());
+ }
+ if (configuration.getOrdered() != null) {
+   write = write.withOrdered(configuration.getOrdered());
+ }

Comment on lines +158 to +172
private static class RowToMongoDocumentFn extends DoFn<Row, Document> {
@ProcessElement
public void processElement(@Element Row row, OutputReceiver<Document> out) {
Document doc = new Document();
for (int i = 0; i < row.getSchema().getFieldCount(); i++) {
String fieldName = row.getSchema().getField(i).getName();
Object value = row.getValue(i);

if (value != null) {
doc.append(fieldName, value);
}
}
out.output(doc);
}
}
Severity: medium

The class RowToMongoDocumentFn appears to be unused. The transform uses RowToBsonDocumentFn. Please remove the unused RowToMongoDocumentFn class to improve code clarity.
