diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java
index 2ed75d7bc7e0..48049b1b6c21 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java
@@ -68,7 +68,7 @@ public class BigtableReadSchemaTransformProvider
               "column_families",
               Schema.FieldType.STRING,
               Schema.FieldType.map(
-                  Schema.FieldType.STRING,
+                  Schema.FieldType.BYTES,
                   Schema.FieldType.array(Schema.FieldType.row(CELL_SCHEMA))))
           .build();
   public static final Schema FLATTENED_ROW_SCHEMA =
@@ -210,6 +210,7 @@ public void processElement(
       if (Boolean.FALSE.equals(configuration.getFlatten())) {
         // Non-flattening logic (original behavior): one output row per Bigtable row.
+        Map<String, Map<String, List<Row>>> families = new HashMap<>();
         for (Family fam : bigtableRow.getFamiliesList()) {
           Map<String, List<Row>> columns = new HashMap<>();
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java
new file mode 100644
index 000000000000..b8e566ed7239
--- /dev/null
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.bson.Document;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for reading from MongoDB.
+ *
+ * <p>Internal only: This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class MongoDbReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        MongoDbReadSchemaTransformProvider.MongoDbReadSchemaTransformConfiguration> {
+
+  private static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected Class<MongoDbReadSchemaTransformConfiguration> configurationClass() {
+    return MongoDbReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected SchemaTransform from(MongoDbReadSchemaTransformConfiguration configuration) {
+    return new MongoDbReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    // Return a unique URN for the transform.
+    return "beam:schematransform:org.apache.beam:mongodb_read:v1";
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    // A read transform does not have an input PCollection.
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    // The primary output is a PCollection of Rows.
+    // Error handling could be added later with a second "errors" output tag.
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /** Configuration class for the MongoDB Read transform. */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class MongoDbReadSchemaTransformConfiguration implements Serializable {
+
+    @SchemaFieldDescription("The connection URI for the MongoDB server.")
+    public abstract String getUri();
+
+    @SchemaFieldDescription("The MongoDB database to read from.")
+    public abstract String getDatabase();
+
+    @SchemaFieldDescription("The MongoDB collection to read from.")
+    public abstract String getCollection();
+
+    @SchemaFieldDescription(
+        "An optional BSON filter to apply to the read. This should be a valid JSON string.")
+    @Nullable
+    public abstract String getFilter();
+
+    public void validate() {
+      checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified.");
+      checkArgument(
+          getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified.");
+      checkArgument(
+          getCollection() != null && !getCollection().isEmpty(),
+          "MongoDB collection must be specified.");
+    }
+
+    public static Builder builder() {
+      return new AutoValue_MongoDbReadSchemaTransformProvider_MongoDbReadSchemaTransformConfiguration
+          .Builder();
+    }
+
+    /** Builder for the {@link MongoDbReadSchemaTransformConfiguration}. */
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setUri(String uri);
+
+      public abstract Builder setDatabase(String database);
+
+      public abstract Builder setCollection(String collection);
+
+      public abstract Builder setFilter(String filter);
+
+      public abstract MongoDbReadSchemaTransformConfiguration build();
+    }
+  }
+
+  /** The {@link SchemaTransform} that performs the read operation. */
+  private static class MongoDbReadSchemaTransform extends SchemaTransform {
+    private final MongoDbReadSchemaTransformConfiguration configuration;
+
+    MongoDbReadSchemaTransform(MongoDbReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      // A read transform does not have an input, so we start with the pipeline.
+      PCollection<Document> mongoDocs =
+          input
+              .getPipeline()
+              .apply(
+                  "ReadFromMongoDb",
+                  MongoDbIO.read()
+                      .withUri(configuration.getUri())
+                      .withDatabase(configuration.getDatabase())
+                      .withCollection(configuration.getCollection()));
+      // TODO: Apply configuration.getFilter() (e.g. via MongoDbIO.Read#withQueryFn).
+
+      // Convert the BSON Document objects into Beam Row objects.
+      PCollection<Row> beamRows =
+          mongoDocs.apply("ConvertToBeamRows", ParDo.of(new MongoDocumentToRowFn()));
+
+      return PCollectionRowTuple.of(OUTPUT_TAG, beamRows);
+    }
+  }
+
+  /**
+   * A {@link DoFn} to convert a MongoDB {@link Document} to a Beam {@link Row}.
+   *
+   * <p>This is a critical step to ensure data is in a schema-aware format.
+   */
+  private static class MongoDocumentToRowFn extends DoFn<Document, Row> {
+    // TODO: Define the Beam Schema that corresponds to your MongoDB documents.
+    // This could be made dynamic based on an inferred schema or a user-provided schema.
+    // For this skeleton, we assume a static schema.
+    // public static final Schema OUTPUT_SCHEMA = Schema.builder()...build();
+
+    @ProcessElement
+    public void processElement(@Element Document doc, OutputReceiver<Row> out) {
+      // Here you will convert the BSON document to a Beam Row.
+      // This requires you to know the target schema.
+
+      // Example pseudo-code:
+      // Row.Builder rowBuilder = Row.withSchema(OUTPUT_SCHEMA);
+      // for (Map.Entry<String, Object> entry : doc.entrySet()) {
+      //   rowBuilder.addValue(entry.getValue());
+      // }
+      // out.output(rowBuilder.build());
+
+      // For a robust implementation, you would handle data type conversions
+      // between BSON types and Beam schema types.
+      throw new UnsupportedOperationException(
+          "MongoDocumentToRowFn must be implemented to convert MongoDB Documents to Beam Rows.");
+    }
+  }
+}
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbWriteSchemaTransformProvider.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbWriteSchemaTransformProvider.java
new file mode 100644
index 000000000000..7554489229fb
--- /dev/null
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbWriteSchemaTransformProvider.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.bson.Document;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for writing to MongoDB.
+ *
+ * <p>Internal only: This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class MongoDbWriteSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        MongoDbWriteSchemaTransformProvider.MongoDbWriteSchemaTransformConfiguration> {
+
+  private static final String INPUT_TAG = "input";
+
+  @Override
+  protected SchemaTransform from(MongoDbWriteSchemaTransformConfiguration configuration) {
+    return new MongoDbWriteSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:mongodb_write:v1";
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  /** Configuration class for the MongoDB Write transform. */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class MongoDbWriteSchemaTransformConfiguration implements Serializable {
+
+    @SchemaFieldDescription("The connection URI for the MongoDB server.")
+    public abstract String getUri();
+
+    @SchemaFieldDescription("The MongoDB database to write to.")
+    public abstract String getDatabase();
+
+    @SchemaFieldDescription("The MongoDB collection to write to.")
+    public abstract String getCollection();
+
+    // @SchemaFieldDescription("The number of documents to include in each batch write.")
+    // @Nullable
+    // public abstract Long getBatchSize();
+    //
+    // @SchemaFieldDescription("Whether the writes should be performed in an ordered manner.")
+    // @Nullable
+    // public abstract Boolean getOrdered();
+
+    public void validate() {
+      checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified.");
+      checkArgument(
+          getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified.");
+      checkArgument(
+          getCollection() != null && !getCollection().isEmpty(),
+          "MongoDB collection must be specified.");
+    }
+
+    public static Builder builder() {
+      return new AutoValue_MongoDbWriteSchemaTransformProvider_MongoDbWriteSchemaTransformConfiguration
+          .Builder();
+    }
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setUri(String uri);
+
+      public abstract Builder setDatabase(String database);
+
+      public abstract Builder setCollection(String collection);
+
+      public abstract Builder setBatchSize(Long batchSize);
+
+      public abstract Builder setOrdered(Boolean ordered);
+
+      public abstract MongoDbWriteSchemaTransformConfiguration build();
+    }
+  }
+
+  /** The {@link SchemaTransform} that performs the write operation. */
+  private static class MongoDbWriteSchemaTransform extends SchemaTransform {
+    private final MongoDbWriteSchemaTransformConfiguration configuration;
+
+    MongoDbWriteSchemaTransform(MongoDbWriteSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      PCollection<Row> rows = input.get(INPUT_TAG);
+
+      PCollection<Document> documents =
+          rows.apply("ConvertToDocument", ParDo.of(new RowToBsonDocumentFn()));
+
+      MongoDbIO.Write write =
+          MongoDbIO.write()
+              .withUri(configuration.getUri())
+              .withDatabase(configuration.getDatabase())
+              .withCollection(configuration.getCollection());
+
+      // if (configuration.getBatchSize() != null) {
+      //   write = write.withBatchSize(configuration.getBatchSize());
+      // }
+      // if (configuration.getOrdered() != null) {
+      //   write = write.withOrdered(configuration.getOrdered());
+      // }
+
+      documents.apply("WriteToMongo", write);
+
+      return PCollectionRowTuple.empty(input.getPipeline());
+    }
+  }
+
+  /** A {@link DoFn} to convert a Beam {@link Row} to a MongoDB {@link Document}.
+   */
+  private static class RowToMongoDocumentFn extends DoFn<Row, Document> {
+    @ProcessElement
+    public void processElement(@Element Row row, OutputReceiver<Document> out) {
+      Document doc = new Document();
+      for (int i = 0; i < row.getSchema().getFieldCount(); i++) {
+        String fieldName = row.getSchema().getField(i).getName();
+        Object value = row.getValue(i);
+
+        if (value != null) {
+          doc.append(fieldName, value);
+        }
+      }
+      out.output(doc);
+    }
+  }
+
+  /** Converts a Beam {@link Row} to a BSON {@link Document}. */
+  static class RowToBsonDocumentFn extends DoFn<Row, Document> {
+    @ProcessElement
+    public void processElement(@Element Row row, OutputReceiver<Document> out) {
+      Document doc = new Document();
+      for (Field field : row.getSchema().getFields()) {
+        doc.append(field.getName(), row.getValue(field.getName()));
+      }
+      out.output(doc);
+    }
+  }
+}
diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py
index 38fa2689268e..62368c126336 100644
--- a/sdks/python/apache_beam/yaml/integration_tests.py
+++ b/sdks/python/apache_beam/yaml/integration_tests.py
@@ -45,6 +45,7 @@
 from testcontainers.core.waiting_utils import wait_for_logs
 from testcontainers.google import PubSubContainer
 from testcontainers.kafka import KafkaContainer
+from testcontainers.mongodb import MongoDbContainer
 from testcontainers.mssql import SqlServerContainer
 from testcontainers.mysql import MySqlContainer
 from testcontainers.postgres import PostgresContainer
@@ -201,6 +202,60 @@ def temp_bigtable_table(project, prefix='yaml_bt_it_'):
         _LOGGER.warning("Failed to clean up instance")
 
 
+@contextlib.contextmanager
+def temp_mongodb_table():
+  """
+  Provides a temporary MongoDB instance.
+
+  Starts a MongoDB container, creates a unique database
+  and collection name for test isolation, and yields them as a dictionary.
+
+  This allows YAML test files to get connection details without hardcoding them.
+  Example usage in a YAML test file's fixture section:
+
+    fixtures:
+      - name: mongo_vars
+        type: apache_beam.yaml.integration_tests.temp_mongodb_table
+
+  Then, in the pipeline definition, you can use placeholders like:
+    - uri: ${mongo_vars.URI}
+    - database: ${mongo_vars.DATABASE}
+    - collection: ${mongo_vars.COLLECTION}
+  """
+  _LOGGER.info("Setting up MongoDB fixture...")
+  # Initialize and start the MongoDB container.
+  # This will pull the 'mongo:7.0.7' image if it's not available locally.
+  mongo_container = MongoDbContainer("mongo:7.0.7")
+  try:
+    mongo_container.start()
+
+    # Get the dynamically generated connection URI.
+    mongo_uri = mongo_container.get_connection_url()
+
+    # Generate a unique database and collection name for this test run to ensure
+    # isolation between different test files.
+    db_name = f'db_{uuid.uuid4().hex}'
+    collection_name = f'collection_{uuid.uuid4().hex}'
+
+    _LOGGER.info(
+        "MongoDB container started. URI: [%s], DB: [%s], Collection: [%s]",
+        mongo_uri,
+        db_name,
+        collection_name)
+
+    yield {
+        'URI': mongo_uri,
+        'DATABASE': db_name,
+        'COLLECTION': collection_name,
+    }
+
+  finally:
+    # This block executes after the test suite finishes.
+    _LOGGER.info("Tearing down MongoDB fixture...")
+    mongo_container.stop()
+    _LOGGER.info("MongoDB container stopped.")
+
+
 @contextlib.contextmanager
 def temp_sqlite_database(prefix='yaml_jdbc_it_'):
   """Context manager to provide a temporary SQLite database via JDBC for
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml
index 3d619c187076..71388dfa6bfa 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -397,3 +397,26 @@
     'WriteToBigTable': 'beam:schematransform:org.apache.beam:bigtable_write:v1'
   config:
     gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
+
+# MongoDB
+- type: renaming
+  transforms:
+    'ReadFromMongoDB': 'ReadFromMongoDB'
+    'WriteToMongoDB': 'WriteToMongoDB'
+  config:
+    mappings:
+      'ReadFromMongoDB':
+        connection_uri: "uri"
+        database: "database"
+        collection: "collection"
+      'WriteToMongoDB':
+        connection_uri: "uri"
+        database: "database"
+        collection: "collection"
+    underlying_provider:
+      type: beamJar
+      transforms:
+        'ReadFromMongoDB': 'beam:schematransform:org.apache.beam:mongodb_read:v1'
+        'WriteToMongoDB': 'beam:schematransform:org.apache.beam:mongodb_write:v1'
+      config:
+        gradle_target: ':sdks:java:io:mongodb:shadowJar'
diff --git a/sdks/python/apache_beam/yaml/tests/mongodb.yaml b/sdks/python/apache_beam/yaml/tests/mongodb.yaml
new file mode 100644
index 000000000000..e69de29bb2d1