@@ -37,7 +37,6 @@ public enum CQL {
37
37
38
38
private boolean isJobMigrateRowsFromFile = false ;
39
39
private final PropertyHelper propertyHelper ;
40
- private boolean isInitialized = false ;
41
40
42
41
////////////////////////////////////////////////
43
42
@@ -46,9 +45,6 @@ public CqlHelper() {
46
45
}
47
46
48
47
public void initialize () {
49
- if (null == originSession || originSession .isClosed () || null == targetSession || targetSession .isClosed ()) {
50
- throw new RuntimeException ("Origin and/or Target sessions are either not set, or are cloasd" );
51
- }
52
48
53
49
readConsistencyLevel = Util .mapToConsistencyLevel (propertyHelper .getString (KnownProperties .READ_CL ));
54
50
writeConsistencyLevel = Util .mapToConsistencyLevel (propertyHelper .getString (KnownProperties .WRITE_CL ));;
@@ -81,8 +77,6 @@ public void initialize() {
81
77
logger .info ("PARAM -- ORIGIN SELECT Query used: {}" , cqlMap .get (CQL .ORIGIN_SELECT ));
82
78
logger .info ("PARAM -- TARGET INSERT Query used: {}" , cqlMap .get (CQL .TARGET_INSERT ));
83
79
logger .info ("PARAM -- TARGET SELECT Query used: {}" , cqlMap .get (CQL .TARGET_SELECT_ORIGIN_BY_PK ));
84
-
85
- isInitialized = true ;
86
80
}
87
81
88
82
public BoundStatement bindInsert (PreparedStatement insertStatement , Row originRow , Row targetRow ) {
@@ -139,31 +133,31 @@ private String cqlOriginSelect() {
139
133
final StringBuilder selectTTLWriteTimeCols = new StringBuilder ();
140
134
if (null != getTtlCols ()) {
141
135
getTtlCols ().forEach (col -> {
142
- selectTTLWriteTimeCols .append (",ttl (" + getOriginColumnNames ().get (col ) + ")" );
136
+ selectTTLWriteTimeCols .append (",TTL (" + getOriginColumnNames ().get (col ) + ")" );
143
137
});
144
138
}
145
139
if (null != getWriteTimeStampCols ()) {
146
140
getWriteTimeStampCols ().forEach (col -> {
147
- selectTTLWriteTimeCols .append (",writetime (" + getOriginColumnNames ().get (col ) + ")" );
141
+ selectTTLWriteTimeCols .append (",WRITETIME (" + getOriginColumnNames ().get (col ) + ")" );
148
142
});
149
143
}
150
144
151
145
String fullSelectQuery ;
152
146
if (!isJobMigrateRowsFromFile ) {
153
147
String partitionKey = propertyHelper .getAsString (KnownProperties .ORIGIN_PARTITION_KEY ).trim ();
154
- fullSelectQuery = "select " + propertyHelper .getAsString (KnownProperties .ORIGIN_COLUMN_NAMES ) + selectTTLWriteTimeCols + " from " + getOriginKeyspaceTable () +
155
- " where token (" + partitionKey + ") >= ? and token (" + partitionKey + ") <= ? " +
148
+ fullSelectQuery = "SELECT " + propertyHelper .getAsString (KnownProperties .ORIGIN_COLUMN_NAMES ) + selectTTLWriteTimeCols + " FROM " + getOriginKeyspaceTable () +
149
+ " WHERE TOKEN (" + partitionKey + ") >= ? AND TOKEN (" + partitionKey + ") <= ? " +
156
150
getOriginFilterCondition () + " ALLOW FILTERING" ;
157
151
} else {
158
152
String keyBinds = "" ;
159
153
for (String key : propertyHelper .getStringList (KnownProperties .TARGET_PRIMARY_KEY )) {
160
154
if (keyBinds .isEmpty ()) {
161
- keyBinds = key + "= ?" ;
155
+ keyBinds = key + "=?" ;
162
156
} else {
163
- keyBinds += " and " + key + "= ?" ;
157
+ keyBinds += " AND " + key + "=?" ;
164
158
}
165
159
}
166
- fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + getOriginKeyspaceTable () + " where " + keyBinds ;
160
+ fullSelectQuery = "SELECT " + selectCols + selectTTLWriteTimeCols + " FROM " + getOriginKeyspaceTable () + " WHERE " + keyBinds ;
167
161
}
168
162
return fullSelectQuery ;
169
163
}
@@ -178,10 +172,10 @@ private String cqlTargetInsert() {
178
172
if (insertBinds .isEmpty ()) {
179
173
insertBinds = "?" ;
180
174
} else {
181
- insertBinds += ", ?" ;
175
+ insertBinds += ",?" ;
182
176
}
183
177
}
184
- targetInsertQuery = "insert into " + getTargetKeyspaceTable () + " (" + propertyHelper .getAsString (KnownProperties .TARGET_COLUMN_NAMES ) + ") VALUES (" + insertBinds + ")" ;
178
+ targetInsertQuery = "INSERT INTO " + getTargetKeyspaceTable () + " (" + propertyHelper .getAsString (KnownProperties .TARGET_COLUMN_NAMES ) + ") VALUES (" + insertBinds + ")" ;
185
179
if (null != getTtlCols () && !getTtlCols ().isEmpty ()) {
186
180
targetInsertQuery += " USING TTL ?" ;
187
181
if (null != getWriteTimeStampCols () && !getWriteTimeStampCols ().isEmpty ()) {
@@ -198,13 +192,13 @@ private String cqlTargetSelectOriginByPK() {
198
192
String keyBinds = "" ;
199
193
for (String key : propertyHelper .getStringList (KnownProperties .TARGET_PRIMARY_KEY )) {
200
194
if (keyBinds .isEmpty ()) {
201
- keyBinds = key + "= ?" ;
195
+ keyBinds = key + "=?" ;
202
196
} else {
203
- keyBinds += " and " + key + "= ?" ;
197
+ keyBinds += " AND " + key + "=?" ;
204
198
}
205
199
}
206
200
207
- return "select " + propertyHelper .getAsString (KnownProperties .ORIGIN_COLUMN_NAMES ) + " from " + getTargetKeyspaceTable () + " where " + keyBinds ;
201
+ return "SELECT " + propertyHelper .getAsString (KnownProperties .ORIGIN_COLUMN_NAMES ) + " FROM " + getTargetKeyspaceTable () + " WHERE " + keyBinds ;
208
202
}
209
203
210
204
public long getLargestWriteTimeStamp (Row row ) {
@@ -313,9 +307,6 @@ public void setTargetSession(CqlSession targetSession) {
313
307
314
308
// Getters
315
309
public String getCql (CQL cql ) {
316
- if (!isInitialized )
317
- throw new RuntimeException ("CqlHelper not initialized. Call initialize() first." );
318
-
319
310
if (!cqlMap .containsKey (cql )) {
320
311
switch (cql ) {
321
312
case ORIGIN_SELECT :
@@ -333,8 +324,7 @@ public String getCql(CQL cql) {
333
324
}
334
325
335
326
public PreparedStatement getPreparedStatement (CQL cql ) {
336
- if (!isInitialized )
337
- throw new RuntimeException ("CqlHelper not initialized. Call initialize() first." );
327
+ abendIfSessionsNotSet ();
338
328
339
329
if (!preparedStatementMap .containsKey (cql )) {
340
330
switch (cql ) {
@@ -351,10 +341,12 @@ public PreparedStatement getPreparedStatement(CQL cql) {
351
341
}
352
342
353
343
public CqlSession getOriginSession () {
344
+ abendIfSessionsNotSet ();
354
345
return originSession ;
355
346
}
356
347
357
348
public CqlSession getTargetSession () {
349
+ abendIfSessionsNotSet ();
358
350
return targetSession ;
359
351
}
360
352
@@ -450,4 +442,10 @@ private String getTargetKeyspaceTable() {
450
442
private String getOriginFilterCondition () {
451
443
return propertyHelper .getString (KnownProperties .ORIGIN_FILTER_CONDITION );
452
444
}
445
+
446
+ private void abendIfSessionsNotSet () {
447
+ if (null == originSession || originSession .isClosed () || null == targetSession || targetSession .isClosed ()) {
448
+ throw new RuntimeException ("Origin and/or Target sessions are either not set, or are closed" );
449
+ }
450
+ }
453
451
}
0 commit comments