-
Notifications
You must be signed in to change notification settings - Fork 4.5k
MongoDB Draft Pr #35794
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MongoDB Draft Pr #35794
Changes from all commits
ad54a58
0edf81d
18c9395
a25033e
0e4fb14
c048dcf
4ebc7c8
3bb3dfc
a8b8196
cf8bd8f
a06d7c6
5760278
5cb0dd7
f2640ae
69467c5
121ddf6
82d3612
14ca717
c36d864
d6366dd
fcb5d03
38ff386
a15e4ff
c759a07
8e19a81
b6d0157
7ea3f76
a35ced7
50bb5a3
426519d
3152094
84a3cfd
1ca2527
954355b
ab18e18
80a732e
16f1064
3c9c582
b842ac9
0ed5da1
cea5987
b6498c8
5f6992d
bdc9cff
37abe22
5cb46df
b1fae9c
4866acc
8ac0fda
32bfbe8
b217de2
1ad3a32
364a761
5338470
c1bc8c6
fad8ae8
4315c4f
9e4514c
64a7303
1fc5366
680678b
6fa20ab
2b2af72
8c96d22
373b87f
9bd071c
74b6dc3
01f84da
54b6ad1
c46ef26
b4fab07
c600ea0
2d30e08
221e558
9cb6c32
a9f77eb
c0596f3
1db6821
80544d0
5753f18
6cd69d5
a53045c
7874226
92a0ff9
54b9900
5b58815
ca12b07
81aa2ed
417bfea
fbf74a5
8aad18a
d3f17bd
d0d12ae
16030c6
15a8bd2
85c1392
2cdd808
636df03
0ab4db4
204ff4d
a712320
2e09dd7
f28eea9
d26b45d
0b6e855
bfa8431
78afb0d
df653fc
ff8bb26
80509a2
98642a3
9a7c16e
a12cdbd
de860a4
5293f96
bc1d637
159a9ac
4ba3104
ff6449d
dda6544
215587d
b140513
9fd3658
0651ec8
764d51b
b423787
2b0806d
77e4dd3
b4ad9e4
4b9ce38
f56e50d
19d4223
3386f94
2fd82ea
3059504
f3b957a
e0a2bd5
e7a44bc
af0e80b
188222f
7a0f7a2
6db0800
ef8b856
669b80d
0bf8e3b
afb690a
8d2db97
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,193 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.sdk.io.mongodb; | ||
|
|
||
| import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; | ||
|
|
||
| import com.google.auto.service.AutoService; | ||
| import com.google.auto.value.AutoValue; | ||
| import java.io.Serializable; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import org.apache.beam.sdk.schemas.AutoValueSchema; | ||
| import org.apache.beam.sdk.schemas.annotations.DefaultSchema; | ||
| import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; | ||
| import org.apache.beam.sdk.schemas.transforms.SchemaTransform; | ||
| import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; | ||
| import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; | ||
| import org.apache.beam.sdk.transforms.DoFn; | ||
| import org.apache.beam.sdk.transforms.ParDo; | ||
| import org.apache.beam.sdk.values.PCollection; | ||
| import org.apache.beam.sdk.values.PCollectionRowTuple; | ||
| import org.apache.beam.sdk.values.Row; | ||
| import org.bson.Document; | ||
| import org.checkerframework.checker.nullness.qual.Nullable; | ||
|
|
||
| /** | ||
| * An implementation of {@link TypedSchemaTransformProvider} for reading from MongoDB. | ||
| * | ||
| * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We | ||
| * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam | ||
| * repository. | ||
| */ | ||
| @AutoService(SchemaTransformProvider.class) | ||
| public class MongoDbReadSchemaTransformProvider | ||
| extends TypedSchemaTransformProvider< | ||
| MongoDbReadSchemaTransformProvider.MongoDbReadSchemaTransformConfiguration> { | ||
|
|
||
| private static final String OUTPUT_TAG = "output"; | ||
|
|
||
| @Override | ||
| protected Class<MongoDbReadSchemaTransformConfiguration> configurationClass() { | ||
| return MongoDbReadSchemaTransformConfiguration.class; | ||
| } | ||
|
|
||
| @Override | ||
| protected SchemaTransform from(MongoDbReadSchemaTransformConfiguration configuration) { | ||
| return new MongoDbReadSchemaTransform(configuration); | ||
| } | ||
|
|
||
| @Override | ||
| public String identifier() { | ||
| // Return a unique URN for the transform. | ||
| return "beam:schematransform:org.apache.beam:mongodb_read:v1"; | ||
| } | ||
|
|
||
| @Override | ||
| public List<String> inputCollectionNames() { | ||
| // A read transform does not have an input PCollection. | ||
| return Collections.emptyList(); | ||
| } | ||
|
|
||
| @Override | ||
| public List<String> outputCollectionNames() { | ||
| // The primary output is a PCollection of Rows. | ||
| // Error handling could be added later with a second "errors" output tag. | ||
| return Collections.singletonList(OUTPUT_TAG); | ||
| } | ||
|
|
||
| /** Configuration class for the MongoDB Read transform. */ | ||
| @DefaultSchema(AutoValueSchema.class) | ||
| @AutoValue | ||
| public abstract static class MongoDbReadSchemaTransformConfiguration implements Serializable { | ||
|
|
||
| @SchemaFieldDescription("The connection URI for the MongoDB server.") | ||
| public abstract String getUri(); | ||
|
|
||
| @SchemaFieldDescription("The MongoDB database to read from.") | ||
| public abstract String getDatabase(); | ||
|
|
||
| @SchemaFieldDescription("The MongoDB collection to read from.") | ||
| public abstract String getCollection(); | ||
|
|
||
| @SchemaFieldDescription( | ||
| "An optional BSON filter to apply to the read. This should be a valid JSON string.") | ||
| @Nullable | ||
| public abstract String getFilter(); | ||
|
|
||
| public void validate() { | ||
| checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified."); | ||
| checkArgument( | ||
| getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified."); | ||
| checkArgument( | ||
| getCollection() != null && !getCollection().isEmpty(), | ||
| "MongoDB collection must be specified."); | ||
| } | ||
|
|
||
| public static Builder builder() { | ||
| return new AutoValue_MongoDbReadSchemaTransformProvider_MongoDbReadSchemaTransformConfiguration | ||
| .Builder(); | ||
| } | ||
|
|
||
| /** Builder for the {@link MongoDbReadSchemaTransformConfiguration}. */ | ||
| @AutoValue.Builder | ||
| public abstract static class Builder { | ||
| public abstract Builder setUri(String uri); | ||
|
|
||
| public abstract Builder setDatabase(String database); | ||
|
|
||
| public abstract Builder setCollection(String collection); | ||
|
|
||
| public abstract Builder setFilter(String filter); | ||
|
|
||
| public abstract MongoDbReadSchemaTransformConfiguration build(); | ||
| } | ||
| } | ||
|
|
||
| /** The {@link SchemaTransform} that performs the read operation. */ | ||
| private static class MongoDbReadSchemaTransform extends SchemaTransform { | ||
| private final MongoDbReadSchemaTransformConfiguration configuration; | ||
|
|
||
| MongoDbReadSchemaTransform(MongoDbReadSchemaTransformConfiguration configuration) { | ||
| configuration.validate(); | ||
| this.configuration = configuration; | ||
| } | ||
|
|
||
| @Override | ||
| public PCollectionRowTuple expand(PCollectionRowTuple input) { | ||
| // A read transform does not have an input, so we start with the pipeline. | ||
| PCollection<Document> mongoDocs = | ||
| input | ||
| .getPipeline() | ||
| .apply( | ||
| "ReadFromMongoDb", | ||
| MongoDbIO.read() | ||
| .withUri(configuration.getUri()) | ||
| .withDatabase(configuration.getDatabase()) | ||
| .withCollection(configuration.getCollection())); | ||
| // TODO: Add support for .withFilter() if it exists in your MongoDbIO, | ||
| // using configuration.getFilter(). | ||
|
|
||
| // Convert the BSON Document objects into Beam Row objects. | ||
| PCollection<Row> beamRows = | ||
| mongoDocs.apply("ConvertToBeamRows", ParDo.of(new MongoDocumentToRowFn())); | ||
|
|
||
| return PCollectionRowTuple.of(OUTPUT_TAG, beamRows); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * A {@link DoFn} to convert a MongoDB {@link Document} to a Beam {@link Row}. | ||
| * | ||
| * <p>This is a critical step to ensure data is in a schema-aware format. | ||
| */ | ||
| private static class MongoDocumentToRowFn extends DoFn<Document, Row> { | ||
| // TODO: Define the Beam Schema that corresponds to your MongoDB documents. | ||
| // This could be made dynamic based on an inferred schema or a user-provided schema. | ||
| // For this skeleton, we assume a static schema. | ||
| // public static final Schema OUTPUT_SCHEMA = Schema.builder()...build(); | ||
|
|
||
| @ProcessElement | ||
| public void processElement(@Element Document doc, OutputReceiver<Row> out) { | ||
| // Here you will convert the BSON document to a Beam Row. | ||
| // This requires you to know the target schema. | ||
|
|
||
| // Example pseudo-code: | ||
| // Row.Builder rowBuilder = Row.withSchema(OUTPUT_SCHEMA); | ||
| // for (Map.Entry<String, Object> entry : doc.entrySet()) { | ||
| // rowBuilder.addValue(entry.getValue()); | ||
| // } | ||
| // out.output(rowBuilder.build()); | ||
|
|
||
| // For a robust implementation, you would handle data type conversions | ||
| // between BSON types and Beam schema types. | ||
| throw new UnsupportedOperationException( | ||
| "MongoDocumentToRowFn must be implemented to convert MongoDB Documents to Beam Rows."); | ||
| } | ||
arnavarora2004 marked this conversation as resolved.
Show resolved
Hide resolved
Comment on lines
+169
to
+191
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,184 @@ | ||||||||||||||||||||||||||||||
| /* | ||||||||||||||||||||||||||||||
| * Licensed to the Apache Software Foundation (ASF) under one | ||||||||||||||||||||||||||||||
| * or more contributor license agreements. See the NOTICE file | ||||||||||||||||||||||||||||||
| * distributed with this work for additional information | ||||||||||||||||||||||||||||||
| * regarding copyright ownership. The ASF licenses this file | ||||||||||||||||||||||||||||||
| * to you under the Apache License, Version 2.0 (the | ||||||||||||||||||||||||||||||
| * "License"); you may not use this file except in compliance | ||||||||||||||||||||||||||||||
| * with the License. You may obtain a copy of the License at | ||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||
| * Unless required by applicable law or agreed to in writing, software | ||||||||||||||||||||||||||||||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||||||||||||||||||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||||||||||||||||||||
| * See the License for the specific language governing permissions and | ||||||||||||||||||||||||||||||
| * limitations under the License. | ||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||
| package org.apache.beam.sdk.io.mongodb; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| import com.google.auto.service.AutoService; | ||||||||||||||||||||||||||||||
| import com.google.auto.value.AutoValue; | ||||||||||||||||||||||||||||||
| import java.io.Serializable; | ||||||||||||||||||||||||||||||
| import java.util.Collections; | ||||||||||||||||||||||||||||||
| import java.util.List; | ||||||||||||||||||||||||||||||
| import org.apache.beam.sdk.schemas.AutoValueSchema; | ||||||||||||||||||||||||||||||
| import org.apache.beam.sdk.schemas.Schema.Field; | ||||||||||||||||||||||||||||||
| import org.apache.beam.sdk.schemas.annotations.DefaultSchema; | ||||||||||||||||||||||||||||||
| import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; | ||||||||||||||||||||||||||||||
| import org.apache.beam.sdk.schemas.transforms.SchemaTransform; | ||||||||||||||||||||||||||||||
| import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; | ||||||||||||||||||||||||||||||
| import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; | ||||||||||||||||||||||||||||||
| import org.apache.beam.sdk.transforms.DoFn; | ||||||||||||||||||||||||||||||
| import org.apache.beam.sdk.transforms.ParDo; | ||||||||||||||||||||||||||||||
| import org.apache.beam.sdk.values.PCollection; | ||||||||||||||||||||||||||||||
| import org.apache.beam.sdk.values.PCollectionRowTuple; | ||||||||||||||||||||||||||||||
| import org.apache.beam.sdk.values.Row; | ||||||||||||||||||||||||||||||
| import org.bson.Document; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||
| * An implementation of {@link TypedSchemaTransformProvider} for writing to MongoDB. | ||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||
| * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We | ||||||||||||||||||||||||||||||
| * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam | ||||||||||||||||||||||||||||||
| * repository. | ||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||
| @AutoService(SchemaTransformProvider.class) | ||||||||||||||||||||||||||||||
| public class MongoDbWriteSchemaTransformProvider | ||||||||||||||||||||||||||||||
| extends TypedSchemaTransformProvider< | ||||||||||||||||||||||||||||||
| MongoDbWriteSchemaTransformProvider.MongoDbWriteSchemaTransformConfiguration> { | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| private static final String INPUT_TAG = "input"; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||
| protected SchemaTransform from(MongoDbWriteSchemaTransformConfiguration configuration) { | ||||||||||||||||||||||||||||||
| return new MongoDbWriteSchemaTransform(configuration); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||
| public String identifier() { | ||||||||||||||||||||||||||||||
| return "beam:schematransform:org.apache.beam:mongodb_write:v1"; | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||
| public List<String> inputCollectionNames() { | ||||||||||||||||||||||||||||||
| return Collections.singletonList(INPUT_TAG); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| /** Configuration class for the MongoDB Write transform. */ | ||||||||||||||||||||||||||||||
| @DefaultSchema(AutoValueSchema.class) | ||||||||||||||||||||||||||||||
| @AutoValue | ||||||||||||||||||||||||||||||
| public abstract static class MongoDbWriteSchemaTransformConfiguration implements Serializable { | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| @SchemaFieldDescription("The connection URI for the MongoDB server.") | ||||||||||||||||||||||||||||||
| public abstract String getUri(); | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| @SchemaFieldDescription("The MongoDB database to write to.") | ||||||||||||||||||||||||||||||
| public abstract String getDatabase(); | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| @SchemaFieldDescription("The MongoDB collection to write to.") | ||||||||||||||||||||||||||||||
| public abstract String getCollection(); | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // @SchemaFieldDescription("The number of documents to include in each batch write.") | ||||||||||||||||||||||||||||||
| // @Nullable | ||||||||||||||||||||||||||||||
| // public abstract Long getBatchSize(); | ||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||
| // @SchemaFieldDescription("Whether the writes should be performed in an ordered manner.") | ||||||||||||||||||||||||||||||
| // @Nullable | ||||||||||||||||||||||||||||||
| // public abstract Boolean getOrdered(); | ||||||||||||||||||||||||||||||
|
Comment on lines
+84
to
+90
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The configuration properties
Suggested change
|
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| public void validate() { | ||||||||||||||||||||||||||||||
| checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified."); | ||||||||||||||||||||||||||||||
| checkArgument( | ||||||||||||||||||||||||||||||
| getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified."); | ||||||||||||||||||||||||||||||
| checkArgument( | ||||||||||||||||||||||||||||||
| getCollection() != null && !getCollection().isEmpty(), | ||||||||||||||||||||||||||||||
| "MongoDB collection must be specified."); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| public static Builder builder() { | ||||||||||||||||||||||||||||||
| return new AutoValue_MongoDbWriteSchemaTransformProvider_MongoDbWriteSchemaTransformConfiguration | ||||||||||||||||||||||||||||||
| .Builder(); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| @AutoValue.Builder | ||||||||||||||||||||||||||||||
| public abstract static class Builder { | ||||||||||||||||||||||||||||||
| public abstract Builder setUri(String uri); | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| public abstract Builder setDatabase(String database); | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| public abstract Builder setCollection(String collection); | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| public abstract Builder setBatchSize(Long batchSize); | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| public abstract Builder setOrdered(Boolean ordered); | ||||||||||||||||||||||||||||||
|
Comment on lines
+114
to
+116
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The builder includes |
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| public abstract MongoDbWriteSchemaTransformConfiguration build(); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| /** The {@link SchemaTransform} that performs the write operation. */ | ||||||||||||||||||||||||||||||
| private static class MongoDbWriteSchemaTransform extends SchemaTransform { | ||||||||||||||||||||||||||||||
| private final MongoDbWriteSchemaTransformConfiguration configuration; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| MongoDbWriteSchemaTransform(MongoDbWriteSchemaTransformConfiguration configuration) { | ||||||||||||||||||||||||||||||
| configuration.validate(); | ||||||||||||||||||||||||||||||
| this.configuration = configuration; | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||
| public PCollectionRowTuple expand(PCollectionRowTuple input) { | ||||||||||||||||||||||||||||||
| PCollection<Row> rows = input.get(INPUT_TAG); | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| PCollection<Document> documents = | ||||||||||||||||||||||||||||||
| rows.apply("ConvertToDocument", ParDo.of(new RowToBsonDocumentFn())); | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| MongoDbIO.Write write = | ||||||||||||||||||||||||||||||
| MongoDbIO.write() | ||||||||||||||||||||||||||||||
| .withUri(configuration.getUri()) | ||||||||||||||||||||||||||||||
| .withDatabase(configuration.getDatabase()) | ||||||||||||||||||||||||||||||
| .withCollection(configuration.getCollection()); | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // if (configuration.getBatchSize() != null) { | ||||||||||||||||||||||||||||||
| // write = write.withBatchSize(configuration.getBatchSize()); | ||||||||||||||||||||||||||||||
| // } | ||||||||||||||||||||||||||||||
| // if (configuration.getOrdered() != null) { | ||||||||||||||||||||||||||||||
| // write = write.withOrdered(configuration.getOrdered()); | ||||||||||||||||||||||||||||||
| // } | ||||||||||||||||||||||||||||||
|
Comment on lines
+144
to
+149
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Suggested change
|
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| documents.apply("WriteToMongo", write); | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| return PCollectionRowTuple.empty(input.getPipeline()); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| /** A {@link DoFn} to convert a Beam {@link Row} to a MongoDB {@link Document}. */ | ||||||||||||||||||||||||||||||
| private static class RowToMongoDocumentFn extends DoFn<Row, Document> { | ||||||||||||||||||||||||||||||
| @ProcessElement | ||||||||||||||||||||||||||||||
| public void processElement(@Element Row row, OutputReceiver<Document> out) { | ||||||||||||||||||||||||||||||
| Document doc = new Document(); | ||||||||||||||||||||||||||||||
| for (int i = 0; i < row.getSchema().getFieldCount(); i++) { | ||||||||||||||||||||||||||||||
| String fieldName = row.getSchema().getField(i).getName(); | ||||||||||||||||||||||||||||||
| Object value = row.getValue(i); | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| if (value != null) { | ||||||||||||||||||||||||||||||
| doc.append(fieldName, value); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| out.output(doc); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
Comment on lines
+158
to
+172
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Comment on lines
+158
to
+172
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||||||||||||||||||
| /** Converts a Beam {@link Row} to a BSON {@link Document}. */ | ||||||||||||||||||||||||||||||
| static class RowToBsonDocumentFn extends DoFn<Row, Document> { | ||||||||||||||||||||||||||||||
| @ProcessElement | ||||||||||||||||||||||||||||||
| public void processElement(@Element Row row, OutputReceiver<Document> out) { | ||||||||||||||||||||||||||||||
| Document doc = new Document(); | ||||||||||||||||||||||||||||||
| for (Field field : row.getSchema().getFields()) { | ||||||||||||||||||||||||||||||
| doc.append(field.getName(), row.getValue(field.getName())); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| out.output(doc); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
filterfrom the configuration is not being used. TheMongoDbIO.read()transform should be configured with the filter provided inconfiguration.getFilter(). You'll need to parse the filter string into a BsonDocument.