diff --git a/data-pipeline-flink/device-profile-updater/src/main/scala/org/sunbird/dp/deviceprofile/domain/DeviceProfile.scala b/data-pipeline-flink/device-profile-updater/src/main/scala/org/sunbird/dp/deviceprofile/domain/DeviceProfile.scala index 19028dc344..75ddcbb041 100644 --- a/data-pipeline-flink/device-profile-updater/src/main/scala/org/sunbird/dp/deviceprofile/domain/DeviceProfile.scala +++ b/data-pipeline-flink/device-profile-updater/src/main/scala/org/sunbird/dp/deviceprofile/domain/DeviceProfile.scala @@ -31,6 +31,7 @@ class DeviceProfile() { private var apiLastUpdatedOn: Long = 0L private val gson = new Gson private val `type` = new TypeToken[util.HashMap[String, String]]() {}.getType + private var userSelectedRole: String = _ this.countryCode = "" this.country = "" @@ -47,6 +48,7 @@ class DeviceProfile() { this.firstAccess = 0L this.userDeclaredOn = 0L this.apiLastUpdatedOn = 0L + this.userSelectedRole = "" def toMap(config: DeviceProfileUpdaterConfig): util.Map[String, String] = { val values = new util.HashMap[String, String] @@ -65,6 +67,7 @@ class DeviceProfile() { values.put(config.firstAccess, DeviceProfile.getValueOrDefault(String.valueOf(this.firstAccess), "")) values.put(config.userDeclaredOn, DeviceProfile.getValueOrDefault(String.valueOf(this.userDeclaredOn), "")) values.put(config.apiLastUpdatedOn, DeviceProfile.getValueOrDefault(String.valueOf(this.apiLastUpdatedOn), "")) + values.put(config.userSelectedRole, DeviceProfile.getValueOrDefault(this.userSelectedRole, "")) values } @@ -84,6 +87,7 @@ class DeviceProfile() { this.firstAccess = map.getOrDefault("first_access", "0").asInstanceOf[Number].longValue() this.userDeclaredOn = map.getOrDefault(config.apiLastUpdatedOn, "0").asInstanceOf[Number].longValue() this.apiLastUpdatedOn = map.getOrDefault(config.apiLastUpdatedOn, "0").asInstanceOf[Number].longValue() + this.userSelectedRole = map.getOrDefault(config.userSelectedRole, "") this } } diff --git a/data-pipeline-flink/device-profile-updater/src/main/scala/org/sunbird/dp/deviceprofile/functions/DeviceProfileUpdaterFunction.scala b/data-pipeline-flink/device-profile-updater/src/main/scala/org/sunbird/dp/deviceprofile/functions/DeviceProfileUpdaterFunction.scala index 9a3bcea499..2ca6e6e03d 100644 --- a/data-pipeline-flink/device-profile-updater/src/main/scala/org/sunbird/dp/deviceprofile/functions/DeviceProfileUpdaterFunction.scala +++ b/data-pipeline-flink/device-profile-updater/src/main/scala/org/sunbird/dp/deviceprofile/functions/DeviceProfileUpdaterFunction.scala @@ -100,6 +100,13 @@ class DeviceProfileUpdaterFunction(config: DeviceProfileUpdaterConfig, */ val postgresQuery = String.format("INSERT INTO %s (api_last_updated_on,updated_date,%s) VALUES(?,?,%s?) ON CONFLICT(device_id) DO UPDATE SET (api_last_updated_on,updated_date,%s)=(?,?,%s?);", config.postgresTable, columns, values, columns, values) + logger.debug("postgresQuery:::::::::::::::::::" + postgresQuery); + logger.info("postgresQuery:::::::::::::::::::" + postgresQuery); + logger.debug("columns:::::::::::::::::::" + columns); + logger.info("columns:::::::::::::::::::" + columns); + logger.debug("values:::::::::::::::::::" + values); + logger.info("values:::::::::::::::::::" + values); + val preparedStatement = postgresConnect.getConnection.prepareStatement(postgresQuery) preparedStatement.setTimestamp(1, new Timestamp(lastUpdatedDate)) // Adding api_last_updated_on as timestamp to index 1 of preparestatement diff --git a/data-pipeline-flink/device-profile-updater/src/main/scala/org/sunbird/dp/deviceprofile/task/DeviceProfileUpdaterConfig.scala b/data-pipeline-flink/device-profile-updater/src/main/scala/org/sunbird/dp/deviceprofile/task/DeviceProfileUpdaterConfig.scala index df84b5aeb6..6759419fbd 100644 --- a/data-pipeline-flink/device-profile-updater/src/main/scala/org/sunbird/dp/deviceprofile/task/DeviceProfileUpdaterConfig.scala +++ b/data-pipeline-flink/device-profile-updater/src/main/scala/org/sunbird/dp/deviceprofile/task/DeviceProfileUpdaterConfig.scala @@ -54,9 +54,10 @@ class DeviceProfileUpdaterConfig(override val config: Config) extends BaseJobCon val userDeclaredOn = "user_declared_on" val apiLastUpdatedOn = "api_last_updated_on" val deviceId = "device_id" + val userSelectedRole = "user_selected_role" val fields = List(countryCode, country, stateCode, state, city, districtCustom, stateCustom, stateCustomCode, - userDeclaredDistrict, uaSpec, deviceSpec, firstAccess, userDeclaredOn, apiLastUpdatedOn + userDeclaredDistrict, uaSpec, deviceSpec, firstAccess, userDeclaredOn, apiLastUpdatedOn, userSelectedRole ) // Consumers