Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
148 commits
Select commit Hold shift + click to select a range
ad54a58
Refactored BigTableReadSchemaTransformConfiguration
arnavarora2004 May 21, 2025
0edf81d
changed scope, working on buffer class for making BigTable yaml fully…
arnavarora2004 May 22, 2025
18c9395
Finished up a bit of standard_io.yaml
arnavarora2004 May 22, 2025
a25033e
Finished up a bit of standard_io.yaml
arnavarora2004 May 27, 2025
0e4fb14
Merge branch 'apache:master' into master
arnavarora2004 Jun 4, 2025
c048dcf
Added bigTable test
arnavarora2004 Jun 4, 2025
4ebc7c8
Merge branch 'master' of github.com:arnavarora2004/ArnavBeamWork
arnavarora2004 Jun 4, 2025
3bb3dfc
changed some tests for BigTable
arnavarora2004 Jun 4, 2025
a8b8196
Added new IT file for simpleWrite and also made changes integration t…
arnavarora2004 Jun 5, 2025
cf8bd8f
Added new IT file for simpleWrite and also made changes integration t…
arnavarora2004 Jun 5, 2025
a06d7c6
SetCell mutation test works, I want to see if this draft PR works CI …
arnavarora2004 Jun 12, 2025
5760278
Fixed a slight error
arnavarora2004 Jun 12, 2025
5cb0dd7
Merge branch 'apache:master' into master
arnavarora2004 Jun 25, 2025
f2640ae
Added way more changes to integrations test.py, BigTableSimpleWriteSc…
arnavarora2004 Jun 25, 2025
69467c5
Merge branch 'master' of github.com:arnavarora2004/ArnavBeamWork
arnavarora2004 Jun 25, 2025
121ddf6
BigTableSimpleWriteSchemaTransformProviderIT finished changes to muta…
arnavarora2004 Jun 25, 2025
82d3612
Merge branch 'apache:master' into master
arnavarora2004 Jun 25, 2025
14ca717
Update sdks/java/io/google-cloud-platform/src/main/java/org/apache/be…
arnavarora2004 Jun 25, 2025
c36d864
Update sdks/java/io/google-cloud-platform/src/main/java/org/apache/be…
arnavarora2004 Jun 25, 2025
d6366dd
Update sdks/java/io/google-cloud-platform/src/main/java/org/apache/be…
arnavarora2004 Jun 25, 2025
fcb5d03
Merge branch 'apache:master' into master
arnavarora2004 Jun 26, 2025
38ff386
Update sdks/java/io/google-cloud-platform/src/main/java/org/apache/be…
arnavarora2004 Jun 26, 2025
a15e4ff
Update sdks/java/io/google-cloud-platform/src/test/java/org/apache/be…
arnavarora2004 Jun 26, 2025
c759a07
changed comments
arnavarora2004 Jun 26, 2025
8e19a81
Merge branch 'master' of github.com:arnavarora2004/ArnavBeamWork
arnavarora2004 Jun 26, 2025
b6d0157
Added changes from derrick comments
arnavarora2004 Jun 26, 2025
7ea3f76
Merge branch 'apache:master' into master
arnavarora2004 Jun 27, 2025
a35ced7
Merge branch 'apache:master' into master
arnavarora2004 Jun 30, 2025
50bb5a3
Added default schema maybe fixes the issues
arnavarora2004 Jun 30, 2025
426519d
Added schema to every test specifically, will run tests to see if it works
arnavarora2004 Jun 30, 2025
3152094
Added default schema maybe fixes the issues
arnavarora2004 Jul 2, 2025
84a3cfd
Merge branch 'apache:master' into master
arnavarora2004 Jul 2, 2025
1ca2527
Following formatting tests
arnavarora2004 Jul 2, 2025
954355b
Merge branch 'apache:master' into master
arnavarora2004 Jul 2, 2025
ab18e18
Following formatting tests
arnavarora2004 Jul 2, 2025
80a732e
Following checkstyle tests
arnavarora2004 Jul 2, 2025
16f1064
Merge branch 'apache:master' into master
arnavarora2004 Jul 7, 2025
3c9c582
Made schema and test changes
arnavarora2004 Jul 7, 2025
b842ac9
Made schema and test changes
arnavarora2004 Jul 7, 2025
0ed5da1
Merge branch 'apache:master' into master
arnavarora2004 Jul 7, 2025
cea5987
Made schema and test changes
arnavarora2004 Jul 7, 2025
b6498c8
Made schema and test changes
arnavarora2004 Jul 8, 2025
5f6992d
Made schema and test changes
arnavarora2004 Jul 9, 2025
bdc9cff
Merge branch 'apache:master' into master
arnavarora2004 Jul 9, 2025
37abe22
Added final test
arnavarora2004 Jul 9, 2025
5cb46df
changed timestamp values
arnavarora2004 Jul 10, 2025
b1fae9c
added all mutations test
arnavarora2004 Jul 10, 2025
4866acc
added all mutations test
arnavarora2004 Jul 10, 2025
8ac0fda
pushed changes to format errors
arnavarora2004 Jul 10, 2025
32bfbe8
Merge branch 'apache:master' into master
arnavarora2004 Jul 10, 2025
b217de2
pushed changes to format errors
arnavarora2004 Jul 10, 2025
1ad3a32
Delete 4
arnavarora2004 Jul 10, 2025
364a761
pushed changes to format errors
arnavarora2004 Jul 10, 2025
5338470
pushed changes to format errors
arnavarora2004 Jul 10, 2025
c1bc8c6
pushed changes to format errors
arnavarora2004 Jul 10, 2025
fad8ae8
Merge branch 'apache:master' into master
arnavarora2004 Jul 14, 2025
4315c4f
pushed changes to debugging errors
arnavarora2004 Jul 14, 2025
9e4514c
pushed changes to debugging errors
arnavarora2004 Jul 14, 2025
64a7303
Merge branch 'apache:master' into master
arnavarora2004 Jul 14, 2025
1fc5366
to see internal error added print(will remove)
arnavarora2004 Jul 14, 2025
680678b
to see internal error added print(will remove)
arnavarora2004 Jul 14, 2025
6fa20ab
to see internal error added print(will remove)
arnavarora2004 Jul 14, 2025
2b2af72
import fixes
arnavarora2004 Jul 14, 2025
8c96d22
import fixes
arnavarora2004 Jul 14, 2025
373b87f
import fixes
arnavarora2004 Jul 14, 2025
9bd071c
import fixes
arnavarora2004 Jul 14, 2025
74b6dc3
import fixes
arnavarora2004 Jul 14, 2025
01f84da
import fixes
arnavarora2004 Jul 14, 2025
54b6ad1
pushed changes to debugging errors
arnavarora2004 Jul 14, 2025
c46ef26
pushed changes to debugging errors
arnavarora2004 Jul 14, 2025
b4fab07
Merge branch 'apache:master' into master
arnavarora2004 Jul 14, 2025
c600ea0
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 15, 2025
2d30e08
Merge branch 'apache:master' into master
arnavarora2004 Jul 15, 2025
221e558
made changes to allMutations test
arnavarora2004 Jul 15, 2025
9cb6c32
made changes to allMutations test
arnavarora2004 Jul 15, 2025
a9f77eb
Merge branch 'apache:master' into master
arnavarora2004 Jul 15, 2025
c0596f3
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 16, 2025
1db6821
Merge branch 'apache:master' into master
arnavarora2004 Jul 16, 2025
80544d0
Merge branch 'master' of github.com:arnavarora2004/ArnavBeamWork
arnavarora2004 Jul 16, 2025
5753f18
Merge branch 'apache:master' into master
arnavarora2004 Jul 16, 2025
6cd69d5
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 16, 2025
a53045c
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 16, 2025
7874226
Merge branch 'apache:master' into master
arnavarora2004 Jul 16, 2025
92a0ff9
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 16, 2025
54b9900
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 16, 2025
5b58815
new read errors fixed
arnavarora2004 Jul 16, 2025
ca12b07
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 16, 2025
81aa2ed
consolidated schema transform files, fixed small issues and bugs
arnavarora2004 Jul 16, 2025
417bfea
consolidated schema transform files, fixed small issues and bugs
arnavarora2004 Jul 17, 2025
fbf74a5
consolidated schema transform files, fixed small issues and bugs
arnavarora2004 Jul 17, 2025
8aad18a
consolidated schema transform files, fixed small issues and bugs
arnavarora2004 Jul 17, 2025
d3f17bd
pushed changes to debugging errors, added pulls from other beam
arnavarora2004 Jul 17, 2025
d0d12ae
Merge branch 'apache:master' into master
arnavarora2004 Jul 17, 2025
16030c6
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
15a8bd2
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
85c1392
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
2cdd808
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
636df03
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
0ab4db4
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
204ff4d
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
a712320
Merge branch 'apache:master' into master
arnavarora2004 Jul 17, 2025
2e09dd7
Merge branch 'master' of github.com:arnavarora2004/ArnavBeamWork
arnavarora2004 Jul 17, 2025
f28eea9
pushed changes from ahmed
arnavarora2004 Jul 17, 2025
d26b45d
Following checkstyle tests
arnavarora2004 Jul 17, 2025
0b6e855
Following checkstyle tests
arnavarora2004 Jul 17, 2025
bfa8431
Merge branch 'apache:master' into master
arnavarora2004 Jul 23, 2025
78afb0d
Merge branch 'apache:master' into master
arnavarora2004 Jul 24, 2025
df653fc
Merge branch 'apache:master' into master
arnavarora2004 Jul 25, 2025
ff8bb26
pushed new changes to BigTableRead, making it work with new functiona…
arnavarora2004 Jul 25, 2025
80509a2
pushed new changes to BigTableRead, making it work with new functiona…
arnavarora2004 Jul 28, 2025
98642a3
pushed new changes to BigTableRead, making it work with new functiona…
arnavarora2004 Jul 28, 2025
9a7c16e
Update sdks/java/io/google-cloud-platform/src/main/java/org/apache/be…
arnavarora2004 Jul 28, 2025
a12cdbd
pushed new changes to BigTableRead, making it work with new functiona…
arnavarora2004 Jul 28, 2025
de860a4
Merge branch 'master' of github.com:arnavarora2004/ArnavBeamWork
arnavarora2004 Jul 28, 2025
5293f96
pushed new changes to BigTableRead, making it work with new functiona…
arnavarora2004 Jul 28, 2025
bc1d637
Merge branch 'apache:master' into master
arnavarora2004 Jul 28, 2025
159a9ac
Merge branch 'apache:master' into master
arnavarora2004 Jul 29, 2025
4ba3104
pushed new changes to BigTableRead, making it work with new functiona…
arnavarora2004 Jul 29, 2025
ff6449d
pushed new changes to BigTableRead, making it work with new functiona…
arnavarora2004 Jul 29, 2025
dda6544
Merge branch 'apache:master' into master
arnavarora2004 Jul 30, 2025
215587d
new mongo files in branch
arnavarora2004 Jul 30, 2025
b140513
fixed family_name to string
arnavarora2004 Jul 30, 2025
9fd3658
fixed family_name to string
arnavarora2004 Jul 30, 2025
0651ec8
fixed family_name to string
arnavarora2004 Jul 30, 2025
764d51b
Merge branch 'apache:master' into master
arnavarora2004 Jul 30, 2025
b423787
Merge branch 'apache:master' into mongodb
arnavarora2004 Jul 30, 2025
2b0806d
fixed family_name to string
arnavarora2004 Jul 30, 2025
77e4dd3
fixed family_name to string
arnavarora2004 Jul 30, 2025
b4ad9e4
fixed family_name to string
arnavarora2004 Jul 31, 2025
4b9ce38
Merge branch 'apache:master' into master
arnavarora2004 Jul 31, 2025
f56e50d
Merge branch 'mongodb' into master
arnavarora2004 Jul 31, 2025
19d4223
Merge pull request #2 from arnavarora2004/master
arnavarora2004 Jul 31, 2025
3386f94
fixed family_name to string
arnavarora2004 Jul 31, 2025
2fd82ea
fixed family_name to string
arnavarora2004 Jul 31, 2025
3059504
fixed commit issues
arnavarora2004 Jul 31, 2025
f3b957a
Merge branch 'apache:master' into mongodb
arnavarora2004 Jul 31, 2025
e0a2bd5
Merge branch 'apache:master' into master
arnavarora2004 Jul 31, 2025
e7a44bc
fixed commit issues
arnavarora2004 Jul 31, 2025
af0e80b
Merge branch 'apache:master' into master
arnavarora2004 Aug 1, 2025
188222f
Merge branch 'apache:master' into master
arnavarora2004 Aug 5, 2025
7a0f7a2
Merge branch 'apache:master' into mongodb
arnavarora2004 Aug 5, 2025
6db0800
added new stuff to provider
arnavarora2004 Aug 6, 2025
ef8b856
Merge branch 'apache:master' into master
arnavarora2004 Aug 6, 2025
669b80d
commented assert test, everything should work now
arnavarora2004 Aug 6, 2025
0bf8e3b
commented assert test, everything should work now
arnavarora2004 Aug 6, 2025
afb690a
new changes, fixed some gradle stuff
arnavarora2004 Aug 6, 2025
d60c2f9
new branch
arnavarora2004 Aug 6, 2025
1783506
Merge branch 'master' into mongoWrite
arnavarora2004 Aug 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class BigtableReadSchemaTransformProvider
"column_families",
Schema.FieldType.STRING,
Schema.FieldType.map(
Schema.FieldType.STRING,
Schema.FieldType.BYTES,
Schema.FieldType.array(Schema.FieldType.row(CELL_SCHEMA))))
.build();
public static final Schema FLATTENED_ROW_SCHEMA =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.mongodb;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.bson.Document;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* An implementation of {@link TypedSchemaTransformProvider} for reading from MongoDB.
*
* <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
* provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
* repository.
*/
@AutoService(SchemaTransformProvider.class)
public class MongoDbReadSchemaTransformProvider
extends TypedSchemaTransformProvider<
MongoDbReadSchemaTransformProvider.MongoDbReadSchemaTransformConfiguration> {

private static final String OUTPUT_TAG = "output";

@Override
protected Class<MongoDbReadSchemaTransformConfiguration> configurationClass() {
return MongoDbReadSchemaTransformConfiguration.class;
}

@Override
protected SchemaTransform from(MongoDbReadSchemaTransformConfiguration configuration) {
return new MongoDbReadSchemaTransform(configuration);
}

@Override
public String identifier() {
// Return a unique URN for the transform.
return "beam:schematransform:org.apache.beam:mongodb_read:v1";
}

@Override
public List<String> inputCollectionNames() {
// A read transform does not have an input PCollection.
return Collections.emptyList();
}

@Override
public List<String> outputCollectionNames() {
// The primary output is a PCollection of Rows.
// Error handling could be added later with a second "errors" output tag.
return Collections.singletonList(OUTPUT_TAG);
}

/** Configuration class for the MongoDB Read transform. */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract static class MongoDbReadSchemaTransformConfiguration implements Serializable {

@SchemaFieldDescription("The connection URI for the MongoDB server.")
public abstract String getUri();

@SchemaFieldDescription("The MongoDB database to read from.")
public abstract String getDatabase();

@SchemaFieldDescription("The MongoDB collection to read from.")
public abstract String getCollection();

@SchemaFieldDescription(
"An optional BSON filter to apply to the read. This should be a valid JSON string.")
@Nullable
public abstract String getFilter();

public void validate() {
checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified.");
checkArgument(
getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified.");
checkArgument(
getCollection() != null && !getCollection().isEmpty(),
"MongoDB collection must be specified.");
}

public static Builder builder() {
return new AutoValue_MongoDbReadSchemaTransformProvider_MongoDbReadSchemaTransformConfiguration
.Builder();
}

/** Builder for the {@link MongoDbReadSchemaTransformConfiguration}. */
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setUri(String uri);

public abstract Builder setDatabase(String database);

public abstract Builder setCollection(String collection);

public abstract Builder setFilter(String filter);

public abstract MongoDbReadSchemaTransformConfiguration build();
}
}

/** The {@link SchemaTransform} that performs the read operation. */
private static class MongoDbReadSchemaTransform extends SchemaTransform {
private final MongoDbReadSchemaTransformConfiguration configuration;

MongoDbReadSchemaTransform(MongoDbReadSchemaTransformConfiguration configuration) {
configuration.validate();
this.configuration = configuration;
}

@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
// A read transform does not have an input, so we start with the pipeline.
PCollection<Document> mongoDocs =
input
.getPipeline()
.apply(
"ReadFromMongoDb",
MongoDbIO.read()
.withUri(configuration.getUri())
.withDatabase(configuration.getDatabase())
.withCollection(configuration.getCollection()));
// TODO: Add support for .withFilter() if it exists in your MongoDbIO,
// using configuration.getFilter().

// Convert the BSON Document objects into Beam Row objects.
PCollection<Row> beamRows =
mongoDocs.apply("ConvertToBeamRows", ParDo.of(new MongoDocumentToRowFn()));

return PCollectionRowTuple.of(OUTPUT_TAG, beamRows);
}
}

/**
* A {@link DoFn} to convert a MongoDB {@link Document} to a Beam {@link Row}.
*
* <p>This is a critical step to ensure data is in a schema-aware format.
*/
private static class MongoDocumentToRowFn extends DoFn<Document, Row> {
// TODO: Define the Beam Schema that corresponds to your MongoDB documents.
// This could be made dynamic based on an inferred schema or a user-provided schema.
// For this skeleton, we assume a static schema.
// public static final Schema OUTPUT_SCHEMA = Schema.builder()...build();

@ProcessElement
public void processElement(@Element Document doc, OutputReceiver<Row> out) {
// Here you will convert the BSON document to a Beam Row.
// This requires you to know the target schema.

// Example pseudo-code:
// Row.Builder rowBuilder = Row.withSchema(OUTPUT_SCHEMA);
// for (Map.Entry<String, Object> entry : doc.entrySet()) {
// rowBuilder.addValue(entry.getValue());
// }
// out.output(rowBuilder.build());

// For a robust implementation, you would handle data type conversions
// between BSON types and Beam schema types.
throw new UnsupportedOperationException(
"MongoDocumentToRowFn must be implemented to convert MongoDB Documents to Beam Rows.");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.mongodb;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.bson.Document;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* An implementation of {@link TypedSchemaTransformProvider} for writing to MongoDB.
*
* <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
* provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
* repository.
*/
@AutoService(SchemaTransformProvider.class)
public class MongoDbWriteSchemaTransformProvider
extends TypedSchemaTransformProvider<
MongoDbWriteSchemaTransformProvider.MongoDbWriteSchemaTransformConfiguration> {

private static final String INPUT_TAG = "input";

@Override
protected SchemaTransform from(MongoDbWriteSchemaTransformConfiguration configuration) {
return new MongoDbWriteSchemaTransform(configuration);
}

@Override
public String identifier() {
return "beam:schematransform:org.apache.beam:mongodb_write:v1";
}

@Override
public List<String> inputCollectionNames() {
return Collections.singletonList(INPUT_TAG);
}

/** Configuration class for the MongoDB Write transform. */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract static class MongoDbWriteSchemaTransformConfiguration implements Serializable {

@SchemaFieldDescription("The connection URI for the MongoDB server.")
public abstract String getUri();

@SchemaFieldDescription("The MongoDB database to write to.")
public abstract String getDatabase();

@SchemaFieldDescription("The MongoDB collection to write to.")
public abstract String getCollection();

// @SchemaFieldDescription("The number of documents to include in each batch write.")
// @Nullable
// public abstract Long getBatchSize();
//
// @SchemaFieldDescription("Whether the writes should be performed in an ordered manner.")
// @Nullable
// public abstract Boolean getOrdered();

public void validate() {
checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified.");
checkArgument(
getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified.");
checkArgument(
getCollection() != null && !getCollection().isEmpty(),
"MongoDB collection must be specified.");
}

public static Builder builder() {
return new AutoValue_MongoDbWriteSchemaTransformProvider_MongoDbWriteSchemaTransformConfiguration
.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setUri(String uri);

public abstract Builder setDatabase(String database);

public abstract Builder setCollection(String collection);

public abstract Builder setBatchSize(Long batchSize);

public abstract Builder setOrdered(Boolean ordered);

public abstract MongoDbWriteSchemaTransformConfiguration build();
}
}

/** The {@link SchemaTransform} that performs the write operation. */
private static class MongoDbWriteSchemaTransform extends SchemaTransform {
private final MongoDbWriteSchemaTransformConfiguration configuration;

MongoDbWriteSchemaTransform(MongoDbWriteSchemaTransformConfiguration configuration) {
configuration.validate();
this.configuration = configuration;
}

@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
PCollection<Row> rows = input.get(INPUT_TAG);

PCollection<Document> documents =
rows.apply("ConvertToDocument", ParDo.of(new RowToBsonDocumentFn()));

MongoDbIO.Write write =
MongoDbIO.write()
.withUri(configuration.getUri())
.withDatabase(configuration.getDatabase())
.withCollection(configuration.getCollection());

// if (configuration.getBatchSize() != null) {
// write = write.withBatchSize(configuration.getBatchSize());
// }
// if (configuration.getOrdered() != null) {
// write = write.withOrdered(configuration.getOrdered());
// }

documents.apply("WriteToMongo", write);

return PCollectionRowTuple.empty(input.getPipeline());
}
}

/** A {@link DoFn} to convert a Beam {@link Row} to a MongoDB {@link Document}. */
private static class RowToMongoDocumentFn extends DoFn<Row, Document> {
@ProcessElement
public void processElement(@Element Row row, OutputReceiver<Document> out) {
Document doc = new Document();
for (int i = 0; i < row.getSchema().getFieldCount(); i++) {
String fieldName = row.getSchema().getField(i).getName();
Object value = row.getValue(i);

if (value != null) {
doc.append(fieldName, value);
}
}
out.output(doc);
}
}
/** Converts a Beam {@link Row} to a BSON {@link Document}. */
static class RowToBsonDocumentFn extends DoFn<Row, Document> {
@ProcessElement
public void processElement(@Element Row row, OutputReceiver<Document> out) {
Document doc = new Document();
for (Field field : row.getSchema().getFields()) {
doc.append(field.getName(), row.getValue(field.getName()));
}
out.output(doc);
}
}
}
Loading
Loading