Skip to content

Commit b9444d0

Browse files
committed
PLUGIN-1893 : Adding flag via field for backward compatibility of timestamp
1 parent 8ffb8d7 commit b9444d0

File tree

7 files changed

+66
-15
lines changed

7 files changed

+66
-15
lines changed

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ protected DBConnectorPath getDBConnectorPath(String path) {
112112

113113
@Override
114114
protected SchemaReader getSchemaReader(String sessionID) {
115-
return new OracleSourceSchemaReader(sessionID);
115+
return new OracleSourceSchemaReader(sessionID, config.getTreatAsOldTimestampConn());
116116
}
117117

118118
@Override

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnectorConfig.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import io.cdap.plugin.db.TransactionIsolationLevel;
2323
import io.cdap.plugin.db.connector.AbstractDBSpecificConnectorConfig;
2424

25-
import java.util.HashMap;
26-
import java.util.Map;
2725
import java.util.Properties;
2826
import javax.annotation.Nullable;
2927

@@ -43,12 +41,12 @@ public OracleConnectorConfig(String host, int port, String user, String password
4341

4442
public OracleConnectorConfig(String host, int port, String user, String password, String jdbcPluginName,
4543
String connectionArguments, String connectionType, String database) {
46-
this(host, port, user, password, jdbcPluginName, connectionArguments, connectionType, database, null, null);
44+
this(host, port, user, password, jdbcPluginName, connectionArguments, connectionType, database, null, null, null);
4745
}
4846

4947
public OracleConnectorConfig(String host, int port, String user, String password, String jdbcPluginName,
5048
String connectionArguments, String connectionType, String database,
51-
String role, Boolean useSSL) {
49+
String role, Boolean useSSL, String treatAsOldTimestampConn) {
5250

5351
this.host = host;
5452
this.port = port;
@@ -60,6 +58,7 @@ public OracleConnectorConfig(String host, int port, String user, String password
6058
this.database = database;
6159
this.role = role;
6260
this.useSSL = useSSL;
61+
this.treatAsOldTimestampConn = treatAsOldTimestampConn;
6362
}
6463

6564
@Override
@@ -86,6 +85,11 @@ public String getConnectionString() {
8685
@Nullable
8786
public Boolean useSSL;
8887

88+
@Name(OracleConstants.TREAT_AS_OLD_TIMESTAMP_CONN)
89+
@Description("A hidden field to handle timestamp as CDAP's timestamp micros or string as per old behavior.")
90+
@Nullable
91+
public String treatAsOldTimestampConn;
92+
8993
@Override
9094
protected int getDefaultPort() {
9195
return 1521;
@@ -108,6 +112,10 @@ public Boolean getSSlMode() {
108112
return useSSL != null && useSSL;
109113
}
110114

115+
public Boolean getTreatAsOldTimestampConn() {
116+
return Boolean.parseBoolean(treatAsOldTimestampConn);
117+
}
118+
111119
@Override
112120
public Properties getConnectionArgumentsProperties() {
113121
Properties prop = super.getConnectionArgumentsProperties();

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConstants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ private OracleConstants() {
4343
public static final String TNS_CONNECTION_TYPE = "tns";
4444
public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel";
4545
public static final String USE_SSL = "useSSL";
46+
public static final String TREAT_AS_OLD_TIMESTAMP = "treatAsOldTimestamp";
47+
public static final String TREAT_AS_OLD_TIMESTAMP_CONN = "treatAsOldTimestampConn";
4648

4749
/**
4850
* Constructs the Oracle connection string based on the provided connection type, host, port, and database.

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,18 @@ protected String createConnectionString() {
6363

6464
@Override
6565
protected SchemaReader getSchemaReader() {
66-
return new OracleSourceSchemaReader();
66+
//PLUGIN-1893 : Based on field/properties from Oracle source and Oracle connection we will pass the flag to control
67+
//handle schema to make it backward compatible.
68+
boolean treatAsOldTimestamp = oracleSourceConfig.getConnection().getTreatAsOldTimestampConn();
69+
70+
// If a user has modified the Oracle Source's properties, then it will take precedence.
71+
// In case of default value (which is false), the Conn's value should be considered.
72+
if (sourceConfig.getTreatAsOldTimestamp().equalsIgnoreCase("true") ||
73+
sourceConfig.getTreatAsOldTimestamp().equalsIgnoreCase("false")) {
74+
treatAsOldTimestamp = Boolean.parseBoolean(sourceConfig.getTreatAsOldTimestamp());
75+
}
76+
77+
return new OracleSourceSchemaReader(null, treatAsOldTimestamp);
6778
}
6879

6980
@Override
@@ -123,20 +134,27 @@ public static class OracleSourceConfig extends AbstractDBSpecificSourceConfig {
123134
@Nullable
124135
private Integer defaultRowPrefetch;
125136

137+
@Name(OracleConstants.TREAT_AS_OLD_TIMESTAMP)
138+
@Description("A hidden field to handle timestamp as CDAP's timestamp micros or string as per old behavior.")
139+
@Nullable
140+
private String treatAsOldTimestamp;
141+
126142
public OracleSourceConfig(String host, int port, String user, String password, String jdbcPluginName,
127143
String connectionArguments, String connectionType, String database, String role,
128144
int defaultBatchValue, int defaultRowPrefetch,
129145
String importQuery, Integer numSplits, int fetchSize,
130-
String boundingQuery, String splitBy, Boolean useSSL) {
146+
String boundingQuery, String splitBy, Boolean useSSL, String treatAsOldTimestampConn,
147+
String treatAsOldTimestamp) {
131148
this.connection = new OracleConnectorConfig(host, port, user, password, jdbcPluginName, connectionArguments,
132-
connectionType, database, role, useSSL);
149+
connectionType, database, role, useSSL, treatAsOldTimestampConn);
133150
this.defaultBatchValue = defaultBatchValue;
134151
this.defaultRowPrefetch = defaultRowPrefetch;
135152
this.fetchSize = fetchSize;
136153
this.importQuery = importQuery;
137154
this.numSplits = numSplits;
138155
this.boundingQuery = boundingQuery;
139156
this.splitBy = splitBy;
157+
this.treatAsOldTimestamp = treatAsOldTimestamp;
140158
}
141159

142160
@Override
@@ -163,6 +181,10 @@ public OracleConnectorConfig getConnection() {
163181
return connection;
164182
}
165183

184+
public String getTreatAsOldTimestamp() {
185+
return treatAsOldTimestamp == null ? "" : treatAsOldTimestamp;
186+
}
187+
166188
@Override
167189
public void validate(FailureCollector collector) {
168190
ConfigUtil.validateConnection(this, useConnection, connection, collector);

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,14 @@ public class OracleSourceSchemaReader extends CommonSchemaReader {
6565
);
6666

6767
private final String sessionID;
68+
private final Boolean isTimestampOldBehavior;
6869

6970
public OracleSourceSchemaReader() {
70-
this(null);
71+
this(null, false);
7172
}
72-
73-
public OracleSourceSchemaReader(String sessionID) {
74-
super();
73+
public OracleSourceSchemaReader(String sessionID, boolean isTimestampOldBehavior) {
7574
this.sessionID = sessionID;
75+
this.isTimestampOldBehavior = isTimestampOldBehavior;
7676
}
7777

7878
@Override
@@ -81,10 +81,12 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
8181

8282
switch (sqlType) {
8383
case TIMESTAMP_TZ:
84-
return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
85-
case Types.TIMESTAMP:
84+
return isTimestampOldBehavior ? Schema.of(Schema.Type.STRING) : Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
8685
case TIMESTAMP_LTZ:
87-
return Schema.of(Schema.LogicalType.DATETIME);
86+
return isTimestampOldBehavior ? Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)
87+
: Schema.of(Schema.LogicalType.DATETIME);
88+
case Types.TIMESTAMP:
89+
return isTimestampOldBehavior ? super.getSchema(metadata, index) : Schema.of(Schema.LogicalType.DATETIME);
8890
case BINARY_FLOAT:
8991
return Schema.of(Schema.Type.FLOAT);
9092
case BINARY_DOUBLE:

oracle-plugin/widgets/Oracle-batchsource.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,15 @@
246246
"default": "40",
247247
"min": "1"
248248
}
249+
},
250+
{
251+
"widget-type": "hidden",
252+
"label": "Treat as old timestamp.",
253+
"name": "treatAsOldTimestamp",
254+
"description": "If edited, then this will take precedence over other properties and the edited value shall be true or false only",
255+
"widget-attributes": {
256+
"default": "default-no"
257+
}
249258
}
250259
]
251260
}

oracle-plugin/widgets/Oracle-connector.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,14 @@
129129
}
130130
]
131131
}
132+
},
133+
{
134+
"widget-type": "hidden",
135+
"label": "Treat as old timestamp",
136+
"name": "treatAsOldTimestampConn",
137+
"widget-attributes": {
138+
"default": "false"
139+
}
132140
}
133141
]
134142
},

0 commit comments

Comments
 (0)