Skip to content

Commit 983138a

Browse files
authored
Merge pull request #217 from Sunbird-Lern/yugabyte_migration
Issue #SBCOSS-00: Replace QueryBuilder.batch() with BatchStatement for YugabyteDB compatibility
2 parents 7ea4c2c + c5c2146 commit 983138a

File tree

7 files changed

+49
-35
lines changed

7 files changed

+49
-35
lines changed

lms-jobs/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/functions/ActivityAggregatesFunction.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import java.lang.reflect.Type
44
import java.util.concurrent.TimeUnit
55
import com.datastax.driver.core.Row
66
import com.datastax.driver.core.querybuilder.{QueryBuilder, Select, Update}
7+
import com.datastax.driver.core.BatchStatement
78
import com.google.gson.Gson
89
import com.google.gson.reflect.TypeToken
910
import com.twitter.storehaus.cache.TTLCache
@@ -270,18 +271,19 @@ class ActivityAggregatesFunction(config: ActivityAggregateUpdaterConfig, httpUti
270271
}
271272

272273
/**
273-
* Method to update the specific table in a batch format.
274+
* Method to update the specific table using YugabyteDB batch operations.
275+
* Uses BatchStatement and cassandraUtil.update() for proper execution.
274276
*/
275277
def updateDB(batchSize: Int, queriesList: List[Update.Where])(implicit metrics: Metrics): Unit = {
276278
val groupedQueries = queriesList.grouped(batchSize).toList
277279
groupedQueries.foreach(queries => {
278-
val cqlBatch = QueryBuilder.batch()
279-
queries.map(query => cqlBatch.add(query))
280-
val result = cassandraUtil.upsert(cqlBatch.toString)
280+
val batch = new BatchStatement()
281+
queries.foreach(query => batch.add(query))
282+
val result = cassandraUtil.update(batch) // Use update() instead of upsert()
281283
if (result) {
282284
metrics.incCounter(config.dbUpdateCount)
283285
} else {
284-
val msg = "Database update has failed: " + cqlBatch.toString
286+
val msg = "Database update has failed: " + batch.toString
285287
logger.info(msg)
286288
throw new Exception(msg)
287289
}

lms-jobs/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/functions/CollectionProgressCompleteFunction.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package org.sunbird.job.aggregate.functions
33
import java.util.UUID
44

55
import com.datastax.driver.core.querybuilder.{QueryBuilder, Select, Update}
6+
import com.datastax.driver.core.BatchStatement
67
import com.google.gson.Gson
78
import org.apache.flink.api.common.typeinfo.TypeInformation
89
import org.apache.flink.configuration.Configuration
@@ -103,21 +104,22 @@ class CollectionProgressCompleteFunction(config: ActivityAggregateUpdaterConfig)
103104
}
104105

105106
/**
106-
* Method to update the specific table in a batch format.
107+
* Method to update the specific table using YugabyteDB batch operations.
108+
* Uses BatchStatement and cassandraUtil.update() for proper execution.
107109
*/
108110
def updateDB(batchSize: Int, queriesList: List[Update.Where])(implicit metrics: Metrics): Unit = {
109111
val groupedQueries = queriesList.grouped(batchSize).toList
110112
groupedQueries.foreach(queries => {
111-
val cqlBatch = QueryBuilder.batch()
112-
queries.map(query => cqlBatch.add(query))
113+
val batch = new BatchStatement()
114+
queries.foreach(query => batch.add(query))
113115
logger.info("is cassandra cluster available =>"+(null !=cassandraUtil.session))
114-
val result = cassandraUtil.upsert(cqlBatch.toString)
116+
val result = cassandraUtil.update(batch) // Use update() instead of upsert()
115117
logger.info("result after update => "+result)
116118
if (result) {
117119
metrics.incCounter(config.dbUpdateCount)
118120
metrics.incCounter(config.enrolmentCompleteCount)
119121
} else {
120-
val msg = "Database update has failed" + cqlBatch.toString
122+
val msg = "Database update has failed: " + batch.toString
121123
logger.error(msg)
122124
throw new Exception(msg)
123125
}

lms-jobs/activity-aggregate-updater/src/main/scala/org/sunbird/job/aggregate/functions/CollectionProgressUpdateFunction.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.sunbird.job.aggregate.functions
22

33
import com.datastax.driver.core.querybuilder.{QueryBuilder, Select, Update}
4+
import com.datastax.driver.core.BatchStatement
45
import org.apache.flink.api.common.typeinfo.TypeInformation
56
import org.apache.flink.configuration.Configuration
67
import org.apache.flink.streaming.api.functions.ProcessFunction
@@ -76,18 +77,19 @@ class CollectionProgressUpdateFunction(config: ActivityAggregateUpdaterConfig)(i
7677
}
7778

7879
/**
79-
* Method to update the specific table in a batch format.
80+
* Method to update the specific table using YugabyteDB batch operations.
81+
* Uses BatchStatement and cassandraUtil.update() for proper execution.
8082
*/
8183
def updateDB(batchSize: Int, queriesList: List[Update.Where])(implicit metrics: Metrics): Unit = {
8284
val groupedQueries = queriesList.grouped(batchSize).toList
8385
groupedQueries.foreach(queries => {
84-
val cqlBatch = QueryBuilder.batch()
85-
queries.map(query => cqlBatch.add(query))
86-
val result = cassandraUtil.upsert(cqlBatch.toString)
86+
val batch = new BatchStatement()
87+
queries.foreach(query => batch.add(query))
88+
val result = cassandraUtil.update(batch) // Use update() instead of upsert()
8789
if (result) {
8890
metrics.incCounter(config.dbUpdateCount)
8991
} else {
90-
val msg = "Database update has failed" + cqlBatch.toString
92+
val msg = "Database update has failed: " + batch.toString
9193
logger.error(msg)
9294
throw new Exception(msg)
9395
}

lms-jobs/enrolment-reconciliation/src/main/scala/org/sunbird/job/recounciliation/functions/EnrolmentReconciliationFn.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package org.sunbird.job.recounciliation.functions
22

33
import com.datastax.driver.core.Row
44
import com.datastax.driver.core.querybuilder.{QueryBuilder, Select, Update}
5+
import com.datastax.driver.core.BatchStatement
56
import com.google.gson.Gson
67
import com.google.gson.reflect.TypeToken
78
import com.twitter.storehaus.cache.TTLCache
@@ -298,18 +299,19 @@ class EnrolmentReconciliationFn(config: EnrolmentReconciliationConfig, httpUtil
298299
}
299300

300301
/**
301-
* Method to update the specific table in a batch format.
302+
* Method to update the specific table using YugabyteDB batch operations.
303+
* Uses BatchStatement and cassandraUtil.update() for proper execution.
302304
*/
303305
def updateDB(batchSize: Int, queriesList: List[Update.Where])(implicit metrics: Metrics): Unit = {
304306
val groupedQueries = queriesList.grouped(batchSize).toList
305307
groupedQueries.foreach(queries => {
306-
val cqlBatch = QueryBuilder.batch()
307-
queries.map(query => cqlBatch.add(query))
308-
val result = cassandraUtil.upsert(cqlBatch.toString)
308+
val batch = new BatchStatement()
309+
queries.foreach(query => batch.add(query))
310+
val result = cassandraUtil.update(batch) // Use update() instead of upsert()
309311
if (result) {
310312
metrics.incCounter(config.dbUpdateCount)
311313
} else {
312-
val msg = "Database update has failed: " + cqlBatch.toString
314+
val msg = "Database update has failed: " + batch.toString
313315
logger.error(msg)
314316
throw new Exception(msg)
315317
}

lms-jobs/enrolment-reconciliation/src/main/scala/org/sunbird/job/recounciliation/functions/ProgressCompleteFunction.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.sunbird.job.recounciliation.functions
22

33
import com.datastax.driver.core.querybuilder.{QueryBuilder, Select, Update}
4+
import com.datastax.driver.core.BatchStatement
45
import com.google.gson.Gson
56
import org.apache.flink.api.common.typeinfo.TypeInformation
67
import org.apache.flink.configuration.Configuration
@@ -84,19 +85,20 @@ class ProgressCompleteFunction(config: EnrolmentReconciliationConfig)(implicit v
8485
}
8586

8687
/**
87-
* Method to update the specific table in a batch format.
88+
* Method to update the specific table using YugabyteDB batch operations.
89+
* Uses BatchStatement and cassandraUtil.update() for proper execution.
8890
*/
8991
def updateDB(batchSize: Int, queriesList: List[Update.Where])(implicit metrics: Metrics): Unit = {
9092
val groupedQueries = queriesList.grouped(batchSize).toList
9193
groupedQueries.foreach(queries => {
92-
val cqlBatch = QueryBuilder.batch()
93-
queries.map(query => cqlBatch.add(query))
94-
val result = cassandraUtil.upsert(cqlBatch.toString)
94+
val batch = new BatchStatement()
95+
queries.foreach(query => batch.add(query))
96+
val result = cassandraUtil.update(batch) // Use update() instead of upsert()
9597
if (result) {
9698
metrics.incCounter(config.dbUpdateCount)
9799
metrics.incCounter(config.enrolmentCompleteCount)
98100
} else {
99-
val msg = "Database update has failed" + cqlBatch.toString
101+
val msg = "Database update has failed: " + batch.toString
100102
logger.error(msg)
101103
throw new Exception(msg)
102104
}

lms-jobs/enrolment-reconciliation/src/main/scala/org/sunbird/job/recounciliation/functions/ProgressUpdateFunction.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.sunbird.job.recounciliation.functions
22

33
import com.datastax.driver.core.querybuilder.{QueryBuilder, Select, Update}
4+
import com.datastax.driver.core.BatchStatement
45
import org.apache.flink.api.common.typeinfo.TypeInformation
56
import org.apache.flink.configuration.Configuration
67
import org.apache.flink.streaming.api.functions.ProcessFunction
@@ -65,18 +66,19 @@ class ProgressUpdateFunction(config: EnrolmentReconciliationConfig)(implicit val
6566
}
6667

6768
/**
68-
* Method to update the specific table in a batch format.
69+
* Method to update the specific table using YugabyteDB batch operations.
70+
* Uses BatchStatement and cassandraUtil.update() for proper execution.
6971
*/
7072
def updateDB(batchSize: Int, queriesList: List[Update.Where])(implicit metrics: Metrics): Unit = {
7173
val groupedQueries = queriesList.grouped(batchSize).toList
7274
groupedQueries.foreach(queries => {
73-
val cqlBatch = QueryBuilder.batch()
74-
queries.map(query => cqlBatch.add(query))
75-
val result = cassandraUtil.upsert(cqlBatch.toString)
75+
val batch = new BatchStatement()
76+
queries.foreach(query => batch.add(query))
77+
val result = cassandraUtil.update(batch) // Use update() instead of upsert()
7678
if (result) {
7779
metrics.incCounter(config.dbUpdateCount)
7880
} else {
79-
val msg = "Database update has failed" + cqlBatch.toString
81+
val msg = "Database update has failed: " + batch.toString
8082
logger.error(msg)
8183
throw new Exception(msg)
8284
}

user-org-jobs/user-ownership-transfer/src/main/scala/org/sunbird/job/ownershiptransfer/functions/UserOwnershipTransferFunction.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.sunbird.job.ownershiptransfer.functions
22

33
import com.datastax.driver.core.querybuilder.{QueryBuilder, Update}
4+
import com.datastax.driver.core.BatchStatement
45
import org.apache.commons.lang3.StringUtils
56
import org.apache.flink.api.common.typeinfo.TypeInformation
67
import org.apache.flink.configuration.Configuration
@@ -130,19 +131,20 @@ class UserOwnershipTransferFunction(config: UserOwnershipTransferConfig, httpUti
130131

131132

132133
/**
133-
* Method to update the specific table in a batch format.
134+
* Method to update the specific table using YugabyteDB batch operations.
135+
* Uses BatchStatement and cassandraUtil.update() for proper execution.
134136
*/
135137
def updateDB(batchSize: Int, queriesList: List[Update.Where])(implicit metrics: Metrics): Unit = {
136138
val groupedQueries = queriesList.grouped(batchSize).toList
137139
groupedQueries.foreach(queries => {
138-
val cqlBatch = QueryBuilder.batch()
139-
queries.map(query => cqlBatch.add(query))
140-
val result = cassandraUtil.upsert(cqlBatch.toString)
140+
val batch = new BatchStatement()
141+
queries.foreach(query => batch.add(query))
142+
val result = cassandraUtil.update(batch) // Use update() instead of upsert()
141143
if (result) {
142144
metrics.incCounter(config.dbUpdateCount)
143145
logger.info("DB update successful")
144146
} else {
145-
val msg = "Database update has failed: " + cqlBatch.toString
147+
val msg = "Database update has failed: " + batch.toString
146148
logger.info(msg)
147149
throw new Exception(msg)
148150
}

0 commit comments

Comments
 (0)