Skip to content
This repository was archived by the owner on Dec 28, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,27 @@ jobs:
run: ./mvnw -B -T 1 clean test -D"license.skipAddThirdParty"=true -pl seatunnel-ci-tools -am --no-snapshot-updates
env:
MAVEN_OPTS: -Xmx512m
- name: Check for .class files in git
run: |
echo "Checking for .class files tracked by git..."

# Find all .class files tracked by git
CLASS_FILES=$(git ls-files '*.class')

if [ -n "$CLASS_FILES" ]; then
echo "ERROR: The following .class files are tracked by git:"
echo "$CLASS_FILES"
echo ""
echo "Please remove .class files from the repository."
echo "These files should not be committed. You can remove them using:"
echo " git rm --cached <file>.class"
echo " git commit -m 'Remove .class files'"
echo ""
echo "Also, consider adding '*.class' to .gitignore if not already present."
exit 1
else
echo "No .class files found in git repository."
fi

helm-chart-check:
name: Check Helm Chart Syntax
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Package Files #
*.jar
*.class
*.zip
*.tar.gz

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ public void checkConnectorOptionExist() {

private Set<String> buildWhiteList() {
Set<String> whiteList = new HashSet<>();
whiteList.add("MongodbSinkOptions");
whiteList.add("PostgresIncrementalSourceOptions");
whiteList.add("SqlServerIncrementalSourceOptions");
whiteList.add("OracleIncrementalSourceOptions");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,29 @@
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbBaseOptions;

import com.google.auto.service.AutoService;

import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;

@AutoService(Factory.class)
public class MongodbCatalogFactory implements CatalogFactory {
@Override
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
return new MongodbCatalog(
catalogName, options.get(MongodbConfig.URI), options.get(MongodbConfig.DATABASE));
catalogName,
options.get(MongodbBaseOptions.URI),
options.get(MongodbBaseOptions.DATABASE));
}

@Override
public String factoryIdentifier() {
return CONNECTOR_IDENTITY;
return MongodbBaseOptions.CONNECTOR_IDENTITY;
}

@Override
public OptionRule optionRule() {
return OptionRule.builder().required(MongodbConfig.URI, MongodbConfig.DATABASE).build();
return OptionRule.builder()
.required(MongodbBaseOptions.URI, MongodbBaseOptions.DATABASE)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.mongodb.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;

public class MongodbBaseOptions {

public static final String ENCODE_VALUE_FIELD = "_value";

public static final JsonWriterSettings DEFAULT_JSON_WRITER_SETTINGS =
JsonWriterSettings.builder().outputMode(JsonMode.EXTENDED).build();

public static final String CONNECTOR_IDENTITY = "MongoDB";

public static final Option<String> URI =
Options.key("uri")
.stringType()
.noDefaultValue()
.withDescription("The MongoDB connection uri.");

public static final Option<String> DATABASE =
Options.key("database")
.stringType()
.noDefaultValue()
.withDescription("The name of MongoDB database to read or write.");

public static final Option<String> COLLECTION =
Options.key("collection")
.stringType()
.noDefaultValue()
.withDescription("The name of MongoDB collection to read or write.");
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.mongodb.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.sink.DataSaveMode;

import java.util.Arrays;
import java.util.List;

import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;

public class MongodbSinkOptions extends MongodbBaseOptions {

public static final Option<Integer> BUFFER_FLUSH_MAX_ROWS =
Options.key("buffer-flush.max-rows")
.intType()
.defaultValue(1000)
.withDescription(
"Specifies the maximum number of buffered rows per batch request.");

public static final Option<Long> BUFFER_FLUSH_INTERVAL =
Options.key("buffer-flush.interval")
.longType()
.defaultValue(30000L)
.withDescription(
"Specifies the maximum interval of buffered rows per batch request, the unit is millisecond.");

public static final Option<Integer> RETRY_MAX =
Options.key("retry.max")
.intType()
.defaultValue(3)
.withDescription(
"Specifies the max number of retry if writing records to database failed.");

public static final Option<Long> RETRY_INTERVAL =
Options.key("retry.interval")
.longType()
.defaultValue(1000L)
.withDescription(
"Specifies the retry time interval if writing records to database failed.");

public static final Option<Boolean> UPSERT_ENABLE =
Options.key("upsert-enable")
.booleanType()
.defaultValue(false)
.withDescription("Whether to write documents via upsert mode.");

public static final Option<List<String>> PRIMARY_KEY =
Options.key("primary-key")
.listType()
.noDefaultValue()
.withDescription(
"The primary keys for upsert/update. Keys are in csv format for properties.")
.withFallbackKeys("upsert-key");

public static final Option<Boolean> TRANSACTION =
Options.key("transaction").booleanType().defaultValue(false).withDescription(".");

public static final Option<DataSaveMode> DATA_SAVE_MODE =
Options.key("data_save_mode")
.singleChoice(
DataSaveMode.class,
Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS))
.defaultValue(APPEND_DATA)
.withDescription("The save mode of collection data");
}
Loading
Loading