diff --git a/mariadb-plugin/docs/Mariadb-batchsink.md b/mariadb-plugin/docs/Mariadb-batchsink.md
index 11176c0db..e4541fe67 100644
--- a/mariadb-plugin/docs/Mariadb-batchsink.md
+++ b/mariadb-plugin/docs/Mariadb-batchsink.md
@@ -60,41 +60,39 @@ connections.
Data Types Mapping
----------
- +--------------------------------+-----------------------+------------------------------------+
- | MariaDB Data Type | CDAP Schema Data Type | Comment |
- +--------------------------------+-----------------------+------------------------------------+
- | TINYINT | int | |
- | BOOLEAN, BOOL | boolean | |
- | SMALLINT | int | |
- | MEDIUMINT | int | |
- | INT, INTEGER | int | |
- | BIGINT | long | |
- | DECIMAL, DEC, NUMERIC, FIXED | decimal | |
- | FLOAT | float | |
- | DOUBLE, DOUBLE PRECISION, REAL | decimal | |
- | BIT | boolean | |
- | CHAR | string | |
- | VARCHAR | string | |
- | BINARY | bytes | |
- | CHAR BYTE | bytes | |
- | VARBINARY | bytes | |
- | TINYBLOB | bytes | |
- | BLOB | bytes | |
- | MEDIUMBLOB | bytes | |
- | LONGBLOB | bytes | |
- | TINYTEXT | string | |
- | TEXT | string | |
- | MEDIUMTEXT | string | |
- | LONGTEXT | string | |
- | JSON | string | In MariaDB it is alias to LONGTEXT |
- | ENUM | string | Mapping to String by default |
- | SET | string | |
- | DATE | date | |
- | TIME | time_micros | |
- | DATETIME | timestamp_micros | |
- | TIMESTAMP | timestamp_micros | |
- | YEAR | date | |
- +--------------------------------+-----------------------+------------------------------------+
+ | MariaDB Data Type | CDAP Schema Data Type | Comment |
+ |--------------------------------|-----------------------|---------------------------------------------------------|
+ | TINYINT | int | |
+ | BOOLEAN, BOOL | boolean | |
+ | SMALLINT | int | |
+ | MEDIUMINT | int | |
+ | INT, INTEGER | int | |
+ | BIGINT | long | |
+ | DECIMAL, DEC, NUMERIC, FIXED | decimal | |
+ | FLOAT | float | |
+ | DOUBLE, DOUBLE PRECISION, REAL | decimal | |
+ | BIT | boolean | |
+ | CHAR | string | |
+ | VARCHAR | string | |
+ | BINARY | bytes | |
+ | CHAR BYTE | bytes | |
+ | VARBINARY | bytes | |
+ | TINYBLOB | bytes | |
+ | BLOB | bytes | |
+ | MEDIUMBLOB | bytes | |
+ | LONGBLOB | bytes | |
+ | TINYTEXT | string | |
+ | TEXT | string | |
+ | MEDIUMTEXT | string | |
+ | LONGTEXT | string | |
+ | JSON | string | In MariaDB it is alias to LONGTEXT |
+ | ENUM | string | Mapping to String by default |
+ | SET | string | |
+ | DATE | date | |
+ | TIME | time_micros | |
+ | DATETIME | timestamp_micros | |
+ | TIMESTAMP | timestamp_micros | |
+ | YEAR | int | Users can manually set output schema to map it to Date. |
Example
-------
diff --git a/mariadb-plugin/docs/Mariadb-batchsource.md b/mariadb-plugin/docs/Mariadb-batchsource.md
index 2b1fe3944..713af2ee8 100644
--- a/mariadb-plugin/docs/Mariadb-batchsource.md
+++ b/mariadb-plugin/docs/Mariadb-batchsource.md
@@ -78,43 +78,39 @@ with the tradeoff of higher memory usage.
Data Types Mapping
----------
-
- +--------------------------------+-----------------------+------------------------------------+
- | MariaDB Data Type | CDAP Schema Data Type | Comment |
- +--------------------------------+-----------------------+------------------------------------+
- | TINYINT | int | |
- | BOOLEAN, BOOL | boolean | |
- | SMALLINT | int | |
- | MEDIUMINT | int | |
- | INT, INTEGER | int | |
- | BIGINT | long | |
- | DECIMAL, DEC, NUMERIC, FIXED | decimal | |
- | FLOAT | float | |
- | DOUBLE, DOUBLE PRECISION, REAL | decimal | |
- | BIT | boolean | |
- | CHAR | string | |
- | VARCHAR | string | |
- | BINARY | bytes | |
- | CHAR BYTE | bytes | |
- | VARBINARY | bytes | |
- | TINYBLOB | bytes | |
- | BLOB | bytes | |
- | MEDIUMBLOB | bytes | |
- | LONGBLOB | bytes | |
- | TINYTEXT | string | |
- | TEXT | string | |
- | MEDIUMTEXT | string | |
- | LONGTEXT | string | |
- | JSON | string | In MariaDB it is alias to LONGTEXT |
- | ENUM | string | Mapping to String by default |
- | SET | string | |
- | DATE | date | |
- | TIME | time_micros | |
- | DATETIME | timestamp_micros | |
- | TIMESTAMP | timestamp_micros | |
- | YEAR | date | |
- +--------------------------------+-----------------------+------------------------------------+
-
+ | MariaDB Data Type | CDAP Schema Data Type | Comment |
+ |--------------------------------|-----------------------|---------------------------------------------------------|
+ | TINYINT | int | |
+ | BOOLEAN, BOOL | boolean | |
+ | SMALLINT | int | |
+ | MEDIUMINT | int | |
+ | INT, INTEGER | int | |
+ | BIGINT | long | |
+ | DECIMAL, DEC, NUMERIC, FIXED | decimal | |
+ | FLOAT | float | |
+ | DOUBLE, DOUBLE PRECISION, REAL | decimal | |
+ | BIT | boolean | |
+ | CHAR | string | |
+ | VARCHAR | string | |
+ | BINARY | bytes | |
+ | CHAR BYTE | bytes | |
+ | VARBINARY | bytes | |
+ | TINYBLOB | bytes | |
+ | BLOB | bytes | |
+ | MEDIUMBLOB | bytes | |
+ | LONGBLOB | bytes | |
+ | TINYTEXT | string | |
+ | TEXT | string | |
+ | MEDIUMTEXT | string | |
+ | LONGTEXT | string | |
+ | JSON | string | In MariaDB it is alias to LONGTEXT |
+ | ENUM | string | Mapping to String by default |
+ | SET | string | |
+ | DATE | date | |
+ | TIME | time_micros | |
+ | DATETIME | timestamp_micros | |
+ | TIMESTAMP | timestamp_micros | |
+ | YEAR | int | Users can manually set output schema to map it to Date. |
Example
------
diff --git a/mariadb-plugin/pom.xml b/mariadb-plugin/pom.xml
index b82cc7c24..e95253277 100644
--- a/mariadb-plugin/pom.xml
+++ b/mariadb-plugin/pom.xml
@@ -83,6 +83,11 @@
RELEASE
compile
+
+ io.cdap.plugin
+ mysql-plugin
+ ${project.version}
+
diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbDBRecord.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbDBRecord.java
new file mode 100644
index 000000000..94498c787
--- /dev/null
+++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbDBRecord.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.mariadb;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.plugin.db.ColumnType;
+import io.cdap.plugin.mysql.MysqlDBRecord;
+import java.util.List;
+
+/**
+ * Writable class for MariaDB Source/Sink.
+ */
+public class MariadbDBRecord extends MysqlDBRecord {
+
+ /**
+ * Used in map-reduce. Do not remove.
+ */
+ @SuppressWarnings("unused")
+ public MariadbDBRecord() {
+ // Required by Hadoop DBRecordReader to create an instance
+ }
+
+ public MariadbDBRecord(StructuredRecord record, List columnTypes) {
+ super(record, columnTypes);
+ }
+}
diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSchemaReader.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSchemaReader.java
new file mode 100644
index 000000000..37ac12a93
--- /dev/null
+++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSchemaReader.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.mariadb;
+
+
+import io.cdap.plugin.mysql.MysqlSchemaReader;
+import java.util.Map;
+
+/**
+ * Schema reader for mapping Maria DB type
+ */
+public class MariadbSchemaReader extends MysqlSchemaReader {
+
+ public MariadbSchemaReader (String sessionID) {
+ super(sessionID);
+ }
+
+ public MariadbSchemaReader (String sessionID, Map connectionArguments) {
+ super(sessionID, connectionArguments);
+ }
+
+}
diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSink.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSink.java
index ab20f3c5d..9dbf0c7d4 100644
--- a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSink.java
+++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSink.java
@@ -19,10 +19,14 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.etl.api.batch.BatchSink;
+import io.cdap.plugin.db.DBRecord;
+import io.cdap.plugin.db.SchemaReader;
import io.cdap.plugin.db.config.DBSpecificSinkConfig;
import io.cdap.plugin.db.sink.AbstractDBSink;
+import io.cdap.plugin.mysql.MysqlDBRecord;
import java.util.Map;
import javax.annotation.Nullable;
@@ -45,6 +49,17 @@ public MariadbSink(MariadbSinkConfig mariadbSinkConfig) {
this.mariadbSinkConfig = mariadbSinkConfig;
}
+ @Override
+ protected DBRecord getDBRecord(StructuredRecord output) {
+ return new MariadbDBRecord(output, columnTypes);
+ }
+
+ @Override
+ protected SchemaReader getSchemaReader() {
+ return new MariadbSchemaReader(null);
+ }
+
+
/**
* MariaDB Sink Config.
*/
diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSource.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSource.java
index d5ffcb290..4a4d689bb 100644
--- a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSource.java
+++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSource.java
@@ -20,9 +20,16 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.etl.api.batch.BatchSource;
+import io.cdap.cdap.etl.api.batch.BatchSourceContext;
+import io.cdap.plugin.common.Asset;
+import io.cdap.plugin.common.LineageRecorder;
+import io.cdap.plugin.db.SchemaReader;
import io.cdap.plugin.db.config.DBSpecificSourceConfig;
import io.cdap.plugin.db.source.AbstractDBSource;
+import io.cdap.plugin.util.DBUtils;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
@@ -53,10 +60,36 @@ protected String createConnectionString() {
mariadbSourceConfig.host, mariadbSourceConfig.port, mariadbSourceConfig.database);
}
+ @Override
+ protected Class extends DBWritable> getDBRecordType() {
+ return MariadbDBRecord.class;
+ }
+
+ @Override
+ protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
+ String fqn = DBUtils.constructFQN("mariadb",
+ mariadbSourceConfig.host,
+ mariadbSourceConfig.port,
+ mariadbSourceConfig.database,
+ mariadbSourceConfig.getReferenceName());
+ Asset asset = Asset.builder(mariadbSourceConfig.getReferenceName()).setFqn(fqn).build();
+ return new LineageRecorder(context, asset);
+ }
+
+ @Override
+ protected SchemaReader getSchemaReader() {
+ return new MariadbSchemaReader(null, mariadbSourceConfig.getConnectionArguments());
+ }
+
/**
* MaraiDB source mariadbSourceConfig.
*/
public static class MariadbSourceConfig extends DBSpecificSourceConfig {
+ private static final String JDBC_PROPERTY_CONNECT_TIMEOUT = "connectTimeout";
+ private static final String JDBC_PROPERTY_SOCKET_TIMEOUT = "socketTimeout";
+ private static final String JDBC_REWRITE_BATCHED_STATEMENTS = "rewriteBatchedStatements";
+
+ private static final String MARIADB_TINYINT1_IS_BIT = "tinyInt1isBit";
@Name(MariadbConstants.AUTO_RECONNECT)
@Description("Should the driver try to re-establish stale and/or dead connections")
@@ -116,5 +149,18 @@ public Map getDBSpecificArguments() {
public List getInitQueries() {
return MariadbUtil.composeDbInitQueries(useAnsiQuotes);
}
+
+ @Override
+ public Map getConnectionArguments() {
+ Map arguments = new HashMap<>(super.getConnectionArguments());
+ // the unit below is millisecond
+ arguments.putIfAbsent(JDBC_PROPERTY_CONNECT_TIMEOUT, "20000");
+ arguments.putIfAbsent(JDBC_PROPERTY_SOCKET_TIMEOUT, "20000");
+ arguments.putIfAbsent(JDBC_REWRITE_BATCHED_STATEMENTS, "true");
+ // MariaDB property to ensure that TINYINT(1) type data is not converted to MariaDB Bit/Boolean type in the
+ // ResultSet.
+ arguments.putIfAbsent(MARIADB_TINYINT1_IS_BIT, "false");
+ return arguments;
+ }
}
}