diff --git a/.gitignore b/.gitignore
index 4ac6b047c..6be7d0a05 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,3 +19,4 @@ RUNNING_PID
/bin/
/logs
**.vscode
dependency-reduced-pom.xml
+.DS_Store
diff --git a/activity-aggregator/.gitignore b/activity-aggregator/.gitignore
new file mode 100644
index 000000000..ac008a51a
--- /dev/null
+++ b/activity-aggregator/.gitignore
@@ -0,0 +1,6 @@
+target/
+*.class
+*.log
+.idea/
+*.iml
+.DS_Store
diff --git a/activity-aggregator/pom.xml b/activity-aggregator/pom.xml
new file mode 100644
index 000000000..0a0823d57
--- /dev/null
+++ b/activity-aggregator/pom.xml
@@ -0,0 +1,161 @@
+
+
+ 4.0.0
+
+
+ org.sunbird
+ sunbird-lms-service
+ 1.0-SNAPSHOT
+ ../pom.xml
+
+
+ org.sunbird
+ activity-aggregator
+ 1.0-SNAPSHOT
+ jar
+
+ Activity Aggregator
+ Activity aggregation module for content consumption tracking
+
+
+ 11
+ 11
+ 2.13
+ 2.13.12
+ 1.0.3
+ UTF-8
+
+
+
+
+ org.scala-lang
+ scala-library
+ ${scala.maj.version}
+
+
+ org.apache.pekko
+ pekko-actor_${scala.version}
+ ${pekko.version}
+
+
+ org.sunbird
+ course-actors-common
+ 1.0-SNAPSHOT
+
+
+ org.sunbird
+ actor-core
+ 1.0-SNAPSHOT
+
+
+ org.sunbird
+ common-util
+ 0.0.1-SNAPSHOT
+
+
+ org.sunbird
+ cache-utils
+ 0.0.1-SNAPSHOT
+
+
+ org.sunbird
+ enrolment-actor
+ 1.0-SNAPSHOT
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ 2.13.5
+
+
+ org.apache.commons
+ commons-lang3
+ 3.12.0
+
+
+ org.apache.commons
+ commons-collections4
+ 4.4
+
+
+ org.scalatest
+ scalatest_${scala.version}
+ 3.2.15
+ test
+
+
+ org.scalamock
+ scalamock_${scala.version}
+ 5.2.0
+ test
+
+
+ org.apache.pekko
+ pekko-testkit_${scala.version}
+ ${pekko.version}
+ test
+
+
+
+
+ src/main/scala
+ src/test/scala
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+ 4.4.0
+
+ ${scala.maj.version}
+ false
+
+
+
+ scala-compile-first
+ process-resources
+
+ add-source
+ compile
+
+
+
+ scala-test-compile
+ process-test-resources
+
+ testCompile
+
+
+
+
+
+ org.scalatest
+ scalatest-maven-plugin
+ 2.0.0
+
+
+ test
+ test
+
+ test
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+ 3.2.1
+
+
+ attach-sources
+
+ jar
+
+
+
+
+
+
+
diff --git a/activity-aggregator/src/main/scala/org/sunbird/activity/actor/ActivityAggregatorActor.scala b/activity-aggregator/src/main/scala/org/sunbird/activity/actor/ActivityAggregatorActor.scala
new file mode 100644
index 000000000..e5361f2b3
--- /dev/null
+++ b/activity-aggregator/src/main/scala/org/sunbird/activity/actor/ActivityAggregatorActor.scala
@@ -0,0 +1,452 @@
+package org.sunbird.activity.actor
+
+import com.google.gson.Gson
+import org.apache.pekko.actor.Props
+import org.apache.commons.lang3.StringUtils
+import org.sunbird.activity.domain.{CollectionProgress, ContentStatus, TelemetryEvent, UserContentConsumption, UserEnrolmentAgg}
+import org.sunbird.activity.util.{ActivityAggregateUtil, CertificateUtil, ContentSearchUtil, DeDupUtil, RedisUtil}
+import org.sunbird.cache.util.RedisCacheUtil
+import org.sunbird.cassandra.CassandraOperation
+import org.sunbird.common.exception.ProjectCommonException
+import org.sunbird.common.models.util.{JsonKey, LoggerUtil, ProjectUtil}
+import org.sunbird.common.request.{Request, RequestContext}
+import org.sunbird.common.responsecode.ResponseCode
+import org.sunbird.enrolments.BaseEnrolmentActor
+import org.sunbird.helper.ServiceFactory
+import org.sunbird.kafka.client.KafkaClient
+import org.sunbird.learner.util.Util
+
+import java.util
+import javax.inject.Inject
+import scala.collection.JavaConverters._
+
+/**
+ * Pekko actor that turns a user's content-consumption updates into course- and
+ * module-level activity aggregates: it persists consumption rows, recomputes
+ * aggregates, updates enrolment progress, and emits audit (and, on course
+ * completion, certificate) events. Handles the single operation
+ * "updateActivityAggregates" (see onReceive).
+ */
+class ActivityAggregatorActor @Inject()(implicit val cacheUtil: RedisCacheUtil) extends BaseEnrolmentActor {
+
+ // Cassandra access plus keyspace/table metadata for the three tables touched:
+ // enrolments, group activity aggregates, and per-content consumption.
+ private val cassandraOperation: CassandraOperation = ServiceFactory.getInstance
+ private val enrolmentDBInfo = Util.dbInfoMap.get(JsonKey.LEARNER_COURSE_DB)
+ private val activityAggDBInfo = Util.dbInfoMap.get(JsonKey.GROUP_ACTIVITY_DB)
+ private val consumptionDBInfo = Util.dbInfoMap.get(JsonKey.LEARNER_CONTENT_DB)
+
+ // Collaborators for aggregation math, Redis hierarchy lookups, certificate
+ // events, content search, and input de-duplication.
+ private val activityAggUtil = new ActivityAggregateUtil()
+ private val redisUtil = RedisUtil()
+ private val certificateUtil = CertificateUtil()
+ private val contentSearchUtil = ContentSearchUtil()
+ private val deDupUtil = DeDupUtil()
+ private val gson = new Gson()
+
+ // Kafka topic for AUDIT telemetry; falls back to "dev.telemetry.raw" when unset.
+ private val auditEventTopic = ProjectUtil.getConfigValue("kafka_topics_audit_event") match {
+ case value if value != null => value
+ case _ => "dev.telemetry.raw"
+ }
+
+ // When true (default), module-level (child collection) aggregates are computed
+ // in addition to the course-level aggregate.
+ private val moduleAggEnabled = ProjectUtil.getConfigValue("enable_module_aggregation") match {
+ case value if value != null => value.toBoolean
+ case _ => true
+ }
+
+ // When true (default), progress updates are skipped for enrolments whose
+ // stored status is already 2 (see updateCollectionProgress).
+ private val filterCompletedEnrolments = ProjectUtil.getConfigValue("filter_processed_enrolments") match {
+ case value if value != null => value.toBoolean
+ case _ => true
+ }
+
+ // When true, incoming content events are de-duplicated against checksums
+ // stored via DeDupUtil (default: false).
+ private val dedupEnabled = ProjectUtil.getConfigValue("activity_input_dedup_enabled") match {
+ case value if value != null => value.toBoolean
+ case _ => false
+ }
+
+ /**
+ * Actor entry point: dispatches the "updateActivityAggregates" operation and
+ * rejects anything else via the base actor's unsupported-operation handler.
+ */
+ override def onReceive(request: Request): Unit = {
+ request.getOperation match {
+ case "updateActivityAggregates" => updateActivityAggregates(request)
+ case _ => onReceiveUnsupportedOperation(request.getOperation)
+ }
+ }
+
+ /**
+ * Unpacks userId/batchId/courseId and the optional contents list from the
+ * request, runs the aggregation pipeline, and replies with a success response.
+ * On any failure the error is logged and rethrown as a server-error
+ * ProjectCommonException.
+ *
+ * NOTE(review): on failure no reply is tell'd to sender() — the exception is
+ * the failure signal; confirm callers/supervision expect that path.
+ */
+ private def updateActivityAggregates(request: Request): Unit = {
+ val requestContext = request.getRequestContext
+ val userId = request.get(JsonKey.USER_ID).asInstanceOf[String]
+ val batchId = request.get(JsonKey.BATCH_ID).asInstanceOf[String]
+ val courseId = request.get(JsonKey.COURSE_ID).asInstanceOf[String]
+ // contents may legitimately be absent (null) — that triggers a force sync
+ // from the DB in processActivityAggregates.
+ val contentsRaw = request.get(JsonKey.CONTENTS)
+ val contents = if (contentsRaw != null) contentsRaw.asInstanceOf[util.List[util.Map[String, AnyRef]]] else null
+
+ try {
+ processActivityAggregates(userId, batchId, courseId, contents, requestContext)
+ sender().tell(successResponse(), self)
+ } catch {
+ case ex: Exception =>
+ logger.error(requestContext, s"ActivityAggregatorActor failed for userId: $userId, courseId: $courseId", ex)
+ ProjectCommonException.throwServerErrorException(ResponseCode.SERVER_ERROR, ex.getMessage)
+ }
+ }
+
+ /**
+ * Core pipeline. Input `contents` drives one of three modes:
+ * - non-empty list: filter invalid entries, optionally de-duplicate, process;
+ * - null: "force sync" — rebuild the input from existing DB consumption rows;
+ * - empty list: nothing to do.
+ * The surviving input is merged with DB state, persisted, and — when the
+ * course's leaf nodes are known — rolled up into activity aggregates,
+ * enrolment progress, dedup checksums, and audit events.
+ */
+ private def processActivityAggregates(
+ userId: String,
+ batchId: String,
+ courseId: String,
+ contents: util.List[util.Map[String, AnyRef]],
+ requestContext: RequestContext
+ ): Unit = {
+ logger.info(requestContext, s"ActivityAggregatorActor: START processActivityAggregates - userId: $userId, courseId: $courseId, batchId: $batchId")
+
+ // Populated early only in the force-sync branch so the DB is not read twice.
+ var dbUserConsumption: UserContentConsumption = null
+
+ val uniqueContents = if (contents != null && !contents.isEmpty) {
+ val filteredContents = filterValidContents(contents)
+ if (filteredContents.isEmpty) {
+ logger.info(requestContext, s"No valid contents to process for userId: $userId, courseId: $courseId")
+ return
+ }
+
+ val unique = deduplicateContents(filteredContents, userId, batchId, courseId, requestContext)
+ if (unique.isEmpty) {
+ logger.info(requestContext, s"No unique contents after deduplication for userId: $userId, courseId: $courseId")
+ return
+ }
+ unique
+ } else if (contents == null) {
+ // Force sync: replay the user's persisted consumption as if it were input.
+ logger.info(requestContext, s"Contents key missing, fetching from DB (Force Sync) for userId: $userId, courseId: $courseId")
+ dbUserConsumption = getContentStatusFromDB(userId, courseId, batchId, requestContext)
+ if (dbUserConsumption.contents.isEmpty) {
+ logger.info(requestContext, s"No existing consumption in DB to sync for userId: $userId, courseId: $courseId")
+ return
+ }
+ // Re-shape each ContentStatus back into the map form the util layer expects.
+ dbUserConsumption.contents.values.map(c => {
+ val m = new java.util.HashMap[String, AnyRef]()
+ m.put(JsonKey.CONTENT_ID, c.contentId)
+ m.put(JsonKey.STATUS, c.status.asInstanceOf[AnyRef])
+ m.put(JsonKey.LAST_ACCESS_TIME, c.lastAccessTime)
+ m.put(JsonKey.COMPLETED_COUNT, c.completedCount.asInstanceOf[AnyRef])
+ m.put(JsonKey.VIEW_COUNT, c.viewCount.asInstanceOf[AnyRef])
+ m.put(JsonKey.PROGRESS, c.progress.asInstanceOf[AnyRef])
+ if (c.lastUpdatedTime != null) m.put(JsonKey.LAST_UPDATED_TIME, c.lastUpdatedTime)
+ if (c.lastCompletedTime != null) m.put(JsonKey.LAST_COMPLETED_TIME, c.lastCompletedTime)
+ m
+ }).toList
+ } else {
+ logger.info(requestContext, s"Received empty contents list. No processing required.")
+ return
+ }
+
+ val contentStatusMap = activityAggUtil.getContentStatusFromContents(uniqueContents.asJava)
+ val inputUserConsumption = UserContentConsumption(userId, batchId, courseId, contentStatusMap)
+
+ // In the list branches the DB state has not been fetched yet; do it now and
+ // merge so counters (view/completed counts, timestamps) accumulate correctly.
+ if (dbUserConsumption == null) {
+ dbUserConsumption = getContentStatusFromDB(userId, courseId, batchId, requestContext)
+ }
+ val finalUserConsumption = activityAggUtil.mergeConsumptionData(inputUserConsumption, dbUserConsumption)
+
+ updateContentConsumption(finalUserConsumption, requestContext)
+
+ // Fetch leaf nodes and optional nodes from Redis
+ // NOTE(review): both calls pass courseId for both arguments — presumably
+ // (rootId, collectionId) and intentional for the root course; confirm.
+ val leafNodes = redisUtil.getLeafNodes(courseId, courseId, requestContext)
+ val optionalNodes = redisUtil.getOptionalNodes(courseId, courseId, requestContext)
+
+ if (leafNodes.nonEmpty) {
+ val courseAggregations = computeCourseAggregations(finalUserConsumption, courseId, leafNodes, optionalNodes, requestContext)
+ updateActivityAggregates(courseAggregations, requestContext)
+
+ val collectionProgressList = courseAggregations.filter(agg => agg.collectionProgress.nonEmpty).map(agg => agg.collectionProgress.get)
+ val latestRead = activityAggUtil.getLatestReadDetails(userId, batchId, courseId, uniqueContents)
+ updateCollectionProgress(collectionProgressList, leafNodes, optionalNodes, latestRead, requestContext)
+
+ // Record checksums only after successful processing so failed runs can be retried.
+ if (dedupEnabled) {
+ uniqueContents.foreach { content =>
+ val contentId = Option(content.get(JsonKey.CONTENT_ID)).getOrElse(content.get("contentid")).asInstanceOf[String]
+ val status = content.get("status").asInstanceOf[Number].intValue()
+ val checksum = deDupUtil.getMessageId(courseId, batchId, userId, contentId, status)
+ deDupUtil.storeChecksum(checksum, requestContext)
+ }
+ }
+
+ publishContentAuditEvents(finalUserConsumption, requestContext)
+ logger.info(requestContext, s"ActivityAggregatorActor: Successfully processed all aggregates for userId: $userId, courseId: $courseId")
+ } else {
+ // No cached hierarchy — decide whether that is tolerable (retired course) or an error.
+ handleMissingLeafNodes(courseId, requestContext)
+ }
+ }
+
+ /**
+ * Keeps only entries that carry a non-blank content id (under CONTENT_ID or
+ * "contentid") and a status greater than 0; a missing status defaults to 0
+ * and is therefore dropped.
+ *
+ * NOTE(review): logger.info is passed a null RequestContext here — consider
+ * threading the caller's context through for traceability.
+ */
+ private def filterValidContents(contents: util.List[util.Map[String, AnyRef]]): List[util.Map[String, AnyRef]] = {
+ val filtered = contents.asScala.filter(c => {
+ val contentId = Option(c.get(JsonKey.CONTENT_ID)).getOrElse(c.get("contentid")).asInstanceOf[String]
+ val status = c.getOrDefault("status", 0.asInstanceOf[AnyRef]).asInstanceOf[Number].intValue()
+ StringUtils.isNotBlank(contentId) && status > 0
+ }).toList
+ if (contents.size() != filtered.size) {
+ logger.info(null, s"filterValidContents: Filtered ${contents.size()} -> ${filtered.size} valid contents")
+ }
+ filtered
+ }
+
+ /**
+ * Drops entries whose (courseId, batchId, userId, contentId, status) checksum
+ * has already been seen by DeDupUtil. A no-op (returns all input) when
+ * de-duplication is disabled via configuration.
+ */
+ private def deduplicateContents(
+ contents: List[util.Map[String, AnyRef]],
+ userId: String,
+ batchId: String,
+ courseId: String,
+ requestContext: RequestContext
+ ): List[util.Map[String, AnyRef]] = {
+ if (!dedupEnabled) {
+ logger.info(requestContext, s"deduplicateContents: Deduplication disabled, returning all ${contents.size} contents")
+ return contents
+ }
+
+ logger.info(requestContext, s"deduplicateContents: Checking ${contents.size} contents for duplicates")
+ val unique = contents.filter(content => {
+ val contentId = Option(content.get(JsonKey.CONTENT_ID)).getOrElse(content.get("contentid")).asInstanceOf[String]
+ val status = content.get("status").asInstanceOf[Number].intValue()
+ val checksum = deDupUtil.getMessageId(courseId, batchId, userId, contentId, status)
+ deDupUtil.isUniqueEvent(checksum, requestContext)
+ })
+ logger.info(requestContext, s"deduplicateContents: Found ${unique.size} unique contents out of ${contents.size}")
+ unique
+ }
+
+ /**
+ * Persists the merged per-content consumption state as a single Cassandra
+ * batch update: one query per content, each split into primary-key and
+ * non-primary-key column maps by the util layer.
+ *
+ * NOTE(review): the table name "user_content_consumption" is hard-coded here
+ * while the keyspace comes from consumptionDBInfo — confirm
+ * consumptionDBInfo.getTableName is not the intended source.
+ */
+ private def updateContentConsumption(userConsumption: UserContentConsumption, requestContext: RequestContext): Unit = {
+ logger.info(requestContext, s"updateContentConsumption: Creating batch update queries for ${userConsumption.contents.size} records")
+ val queries: java.util.List[java.util.Map[String, java.util.Map[String, Object]]] =
+ new java.util.ArrayList[java.util.Map[String, java.util.Map[String, Object]]]()
+
+ userConsumption.contents.foreach { case (contentId, content) =>
+ val updateMaps = activityAggUtil.createContentConsumptionUpdateMap(
+ userConsumption.userId,
+ userConsumption.courseId,
+ userConsumption.batchId,
+ content
+ )
+ val queryMap: java.util.Map[String, java.util.Map[String, Object]] = new util.HashMap[String, java.util.Map[String, Object]]()
+ queryMap.put(JsonKey.PRIMARY_KEY, updateMaps._1.asInstanceOf[java.util.Map[String, Object]])
+ queryMap.put(JsonKey.NON_PRIMARY_KEY, updateMaps._2.asInstanceOf[java.util.Map[String, Object]])
+ queries.add(queryMap)
+ }
+
+ if (!queries.isEmpty) {
+ logger.info(requestContext, s"updateContentConsumption: Executing batch update with ${queries.size()} queries")
+ cassandraOperation.batchUpdate(consumptionDBInfo.getKeySpace, "user_content_consumption", queries, requestContext)
+ logger.info(requestContext, s"updateContentConsumption: Batch update completed successfully")
+ } else {
+ logger.warn(requestContext, s"updateContentConsumption: No queries to execute")
+ }
+ }
+
+ /**
+ * Computes the course-level activity aggregate and, when module aggregation
+ * is enabled, module (child-collection) aggregates as well. Module detection
+ * works by resolving each consumed content's ancestors from Redis, collecting
+ * every ancestor other than the course itself, and fetching each collection's
+ * required leaf nodes before delegating the math to ActivityAggregateUtil.
+ *
+ * @return course aggregate (if computed) followed by any module aggregates.
+ */
+ private def computeCourseAggregations(
+ userConsumption: UserContentConsumption,
+ courseId: String,
+ leafNodes: List[String],
+ optionalNodes: List[String],
+ requestContext: RequestContext
+ ): List[UserEnrolmentAgg] = {
+ logger.info(requestContext, s"computeCourseAggregations: Computing course-level aggregation for courseId: $courseId")
+ val courseAggOpt = activityAggUtil.computeCourseActivityAgg(userConsumption, leafNodes, optionalNodes, requestContext)
+ val courseAggs = if (courseAggOpt.nonEmpty) {
+ logger.info(requestContext, s"computeCourseAggregations: Course aggregation computed successfully")
+ List(courseAggOpt.get)
+ } else {
+ logger.warn(requestContext, s"computeCourseAggregations: No course aggregation computed")
+ List()
+ }
+
+ if (moduleAggEnabled) {
+ logger.info(requestContext, s"computeCourseAggregations: Module aggregation enabled, computing module-level aggregations")
+ // contentId -> list of ancestor collection ids, per Redis hierarchy cache.
+ val ancestors = userConsumption.contents.map { case (contentId, content) =>
+ val ancestorList = redisUtil.getAncestors(courseId, content.contentId, requestContext)
+ logger.info(requestContext, s"computeCourseAggregations: contentId: $contentId has ${ancestorList.size} ancestors")
+ (contentId, ancestorList)
+ }.toMap
+
+ // Every distinct ancestor except the course itself is a child collection (module).
+ val childCollections = ancestors.values.flatten.filter(a => a != courseId).toList.distinct
+ logger.info(requestContext, s"computeCourseAggregations: Found ${childCollections.size} child collections: ${childCollections.mkString(", ")}")
+
+ val collectionsWithLeafNodes = childCollections.map(collectionId => {
+ val collectionLeafNodes = redisUtil.getRequiredLeafNodes(courseId, collectionId, requestContext)
+ logger.info(requestContext, s"computeCourseAggregations: collectionId: $collectionId has ${collectionLeafNodes.size} required leaf nodes")
+ (collectionId, collectionLeafNodes)
+ }).toMap
+
+ val moduleAggs = activityAggUtil.computeModuleActivityAgg(userConsumption, courseId, ancestors, collectionsWithLeafNodes, requestContext)
+ logger.info(requestContext, s"computeCourseAggregations: Computed ${moduleAggs.size} module aggregations")
+ courseAggs ++ moduleAggs
+ } else {
+ logger.info(requestContext, s"computeCourseAggregations: Module aggregation disabled, returning only course aggregation")
+ courseAggs
+ }
+ }
+
+ /**
+ * Persists the computed activity aggregates (course + modules) to the group
+ * activity table in a single Cassandra batch update. No-op when the list is empty.
+ */
+ private def updateActivityAggregates(courseAggregations: List[UserEnrolmentAgg], requestContext: RequestContext): Unit = {
+ logger.info(requestContext, s"updateActivityAggregates: Creating batch update queries for ${courseAggregations.size} aggregations")
+ val aggQueries = courseAggregations.map { agg =>
+ activityAggUtil.createActivityAggUpdateMap(agg.activityAgg)
+ }.asJava
+
+ if (!aggQueries.isEmpty) {
+ logger.info(requestContext, s"updateActivityAggregates: Executing batch update with ${aggQueries.size()} queries to ${activityAggDBInfo.getTableName}")
+ cassandraOperation.batchUpdate(activityAggDBInfo.getKeySpace, activityAggDBInfo.getTableName, aggQueries, requestContext)
+ logger.info(requestContext, s"updateActivityAggregates: Batch update completed successfully")
+ } else {
+ logger.warn(requestContext, s"updateActivityAggregates: No queries to execute")
+ }
+ }
+
+ /**
+ * Updates the user's enrolment progress for each collection, optionally
+ * skipping enrolments already marked complete, and fires certificate plus
+ * enrol-complete audit events when a collection is newly completed.
+ * Optional nodes are excluded from the completion denominator.
+ */
+ private def updateCollectionProgress(
+ collectionProgressList: List[CollectionProgress],
+ leafNodes: List[String],
+ optionalNodes: List[String],
+ latestRead: util.Map[String, AnyRef],
+ requestContext: RequestContext
+ ): Unit = {
+ collectionProgressList.foreach { progress =>
+ // Status 2 is treated as "already completed" — presumably the enrolment
+ // status enum's completed value; confirm against the status constants.
+ val shouldUpdateProgress = if (filterCompletedEnrolments) {
+ val enrolmentStatus = getEnrolmentStatus(progress.userId, progress.courseId, progress.batchId, requestContext)
+ enrolmentStatus != 2
+ } else true
+
+ if (shouldUpdateProgress) {
+ // Completion is judged against required leaf nodes only.
+ val updatedLeafNodes = leafNodes.diff(optionalNodes)
+ val completionStatus = activityAggUtil.getCompletionStatus(progress.progress, updatedLeafNodes.size)
+
+ val progressUpdateMap = activityAggUtil.createProgressUpdateMap(
+ progress.userId, progress.courseId, progress.batchId,
+ progress.progress,
+ completionStatus,
+ progress.completedOn,
+ progress.contentStatus,
+ latestRead
+ )
+ cassandraOperation.updateRecordV2(enrolmentDBInfo.getKeySpace,
+ enrolmentDBInfo.getTableName, progressUpdateMap._1, progressUpdateMap._2, true, requestContext)
+
+ if (progress.completed) {
+ logger.info(requestContext, s"updateCollectionProgress: Course completed for userId: ${progress.userId}, Publishing certificate event")
+ certificateUtil.publishCertificateIssueEvent(progress.userId, progress.courseId, progress.batchId, requestContext)
+ publishEnrolmentCompleteAuditEvent(progress, requestContext)
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Generates per-content AUDIT telemetry via the util layer and publishes each
+ * event to the configured Kafka topic.
+ */
+ private def publishContentAuditEvents(userConsumption: UserContentConsumption, requestContext: RequestContext): Unit = {
+ val auditEvents = activityAggUtil.generateContentAuditEvents(userConsumption)
+ if (auditEvents.nonEmpty) {
+ logger.info(requestContext, s"publishContentAuditEvents: Publishing ${auditEvents.size} audit events to topic: $auditEventTopic")
+ auditEvents.foreach { event =>
+ publishAuditEvent(event, requestContext)
+ }
+ }
+ }
+
+ /**
+ * Builds and publishes the "enrol-complete" AUDIT event for a completed
+ * enrolment, with batch and course carried as correlation data (cdata) and
+ * the course id as the object rollup.
+ */
+ protected def publishEnrolmentCompleteAuditEvent(progress: CollectionProgress, requestContext: RequestContext): Unit = {
+ import org.sunbird.activity.domain._
+
+ val auditEvent = TelemetryEvent(
+ actor = ActorObject(id = progress.userId, `type` = "User"),
+ edata = EventData(props = Array("status", "completedon"), `type` = "enrol-complete"),
+ context = EventContext(cdata = Array(
+ Map("type" -> "CourseBatch", "id" -> progress.batchId).asJava,
+ Map("type" -> "Course", "id" -> progress.courseId).asJava
+ )),
+ `object` = EventObject(id = progress.userId, `type` = "User", rollup = Map("l1" -> progress.courseId).asJava)
+ )
+ publishAuditEvent(auditEvent, requestContext)
+ }
+
+ /**
+ * Serializes the event with Gson and sends it to the audit Kafka topic.
+ * Publishing is deliberately best-effort: any failure is logged and swallowed
+ * so that a Kafka outage never fails the aggregation pipeline.
+ */
+ protected def publishAuditEvent(event: TelemetryEvent, requestContext: RequestContext): Unit = {
+ try {
+ val eventJson = gson.toJson(event)
+ logger.info(requestContext, s"publishAuditEvent: Publishing event to Kafka - topic: $auditEventTopic")
+ KafkaClient.send(eventJson, auditEventTopic)
+ logger.info(requestContext, s"publishAuditEvent: Event published successfully")
+ } catch {
+ case ex: Exception =>
+ logger.error(requestContext, s"publishAuditEvent: Failed to publish audit event to Kafka topic $auditEventTopic: ${ex.getMessage}", ex)
+ }
+ }
+
+ /**
+ * Called when Redis holds no leaf nodes for the course: consumption against a
+ * Retired collection is tolerated (warn only); any other status is treated as
+ * a broken hierarchy cache and raised as an error.
+ *
+ * NOTE(review): throws a bare Exception — a ProjectCommonException would be
+ * more consistent with the rest of this module's error handling.
+ */
+ private def handleMissingLeafNodes(courseId: String, requestContext: RequestContext): Unit = {
+ val collectionStatus = contentSearchUtil.getCollectionStatus(courseId, requestContext)
+
+ if (StringUtils.equalsIgnoreCase(collectionStatus, "Retired")) {
+ logger.warn(requestContext, s"Contents consumed from retired collection: $courseId", null)
+ } else {
+ val errorMsg = s"Leaf nodes not available for published collection: $courseId (status: $collectionStatus)"
+ logger.error(requestContext, errorMsg, null)
+ throw new Exception(errorMsg)
+ }
+ }
+
+ /**
+ * Looks up the user's enrolment row by (userid, courseid, batchid) and returns
+ * its "status" column as an Int. Returns 0 when the enrolment is missing, the
+ * status column is absent, or Cassandra returns a null response.
+ */
+ private def getEnrolmentStatus(userId: String, courseId: String, batchId: String, requestContext: RequestContext): Int = {
+ logger.info(requestContext, s"getEnrolmentStatus: Querying user_enrolments for userId: $userId, courseId: $courseId, batchId: $batchId")
+ val selectMap = new util.HashMap[String, AnyRef]() {{
+ put("userid", userId)
+ put("courseid", courseId)
+ put("batchid", batchId)
+ }}
+
+ val response = cassandraOperation.getRecordsByProperties(enrolmentDBInfo.getKeySpace, enrolmentDBInfo.getTableName, selectMap, requestContext)
+
+ if (response != null && response.getResult != null) {
+ val result = response.getResult.get(JsonKey.RESPONSE).asInstanceOf[util.List[util.Map[String, AnyRef]]]
+ if (!result.isEmpty) {
+ // The key is effectively unique, so only the first row matters.
+ val enrolment = result.get(0)
+ val status = enrolment.getOrDefault("status", 0.asInstanceOf[AnyRef]).asInstanceOf[Number].intValue()
+ logger.info(requestContext, s"getEnrolmentStatus: Found enrolment with status: $status")
+ return status
+ } else {
+ logger.info(requestContext, s"getEnrolmentStatus: No enrolment found, returning status 0")
+ }
+ } else {
+ logger.warn(requestContext, s"getEnrolmentStatus: Null response from Cassandra")
+ }
+
+ 0
+ }
+
+ /**
+ * Loads the user's persisted per-content consumption for the course/batch and
+ * converts it into a UserContentConsumption (fromInput = false). Column names
+ * are probed in several casings (driver-dependent), rows with a blank content
+ * id are skipped with a warning, and an empty consumption object is returned
+ * when nothing is found.
+ */
+ private def getContentStatusFromDB(userId: String, courseId: String, batchId: String, requestContext: RequestContext): UserContentConsumption = {
+ logger.info(requestContext, s"getContentStatusFromDB: Querying user_content_consumption for userId: $userId, courseId: $courseId, batchId: $batchId")
+ val response = cassandraOperation.getRecordsByProperties(consumptionDBInfo.getKeySpace, "user_content_consumption",
+ new util.HashMap[String, AnyRef]() {{
+ put("userid", userId)
+ put("courseid", courseId)
+ put("batchid", batchId)
+ }}, requestContext)
+
+ if (response != null && response.getResult != null) {
+ val result = response.getResult.get(JsonKey.RESPONSE).asInstanceOf[util.List[util.Map[String, AnyRef]]]
+ logger.info(requestContext, s"getContentStatusFromDB: Found ${result.size()} records in DB")
+
+ if (!result.isEmpty) {
+
+ val consumptionMap = result.asScala.flatMap(row => {
+ // Tolerate lowercase, camelCase and snake_case column names.
+ val contentId = Option(row.get("contentid"))
+ .orElse(Option(row.get("contentId")))
+ .orElse(Option(row.get("content_id")))
+ .map(_.asInstanceOf[String]).getOrElse("")
+
+ if (StringUtils.isNotBlank(contentId)) {
+ val status = Option(row.get("status")).orElse(Option(row.get("Status"))).map(_.asInstanceOf[Number].intValue()).getOrElse(0)
+ val viewCount = Option(row.get("viewcount")).orElse(Option(row.get("viewCount"))).map(_.asInstanceOf[Number].intValue()).getOrElse(0)
+ val completedCount = Option(row.get("completedcount")).orElse(Option(row.get("completedCount"))).map(_.asInstanceOf[Number].intValue()).getOrElse(0)
+ val progress = Option(row.get("progress")).orElse(Option(row.get("Progress"))).map(_.asInstanceOf[Number].intValue()).getOrElse(0)
+
+ val lastAccessTime = activityAggUtil.parseDate(Option(row.get("lastaccesstime")).getOrElse(row.get("lastAccessTime")))
+ val lastCompletedTime = activityAggUtil.parseDate(Option(row.get("lastcompletedtime")).getOrElse(row.get("lastCompletedTime")))
+ val lastUpdatedTime = activityAggUtil.parseDate(Option(row.get("lastupdatedtime")).getOrElse(row.get("lastUpdatedTime")))
+
+ Some(contentId -> ContentStatus(contentId, status, completedCount, viewCount, progress, lastAccessTime, lastCompletedTime, lastUpdatedTime, fromInput = false))
+ } else {
+ logger.warn(requestContext, s"getContentStatusFromDB: Skipping row with missing contentId. Keys: ${row.keySet()}")
+ None
+ }
+ }).toMap
+
+ logger.info(requestContext, s"getContentStatusFromDB: Returning ${consumptionMap.size} content status records")
+ return UserContentConsumption(userId, batchId, courseId, consumptionMap)
+ }
+ }
+
+ logger.info(requestContext, s"getContentStatusFromDB: No existing consumption found, returning empty")
+ UserContentConsumption(userId, batchId, courseId, Map.empty)
+ }
+}
+
+/** Factory for the actor's Props, wiring in the injected RedisCacheUtil. */
+object ActivityAggregatorActor {
+ def props(cacheUtil: RedisCacheUtil): Props = Props(new ActivityAggregatorActor()(cacheUtil))
+}
diff --git a/activity-aggregator/src/main/scala/org/sunbird/activity/domain/Models.scala b/activity-aggregator/src/main/scala/org/sunbird/activity/domain/Models.scala
new file mode 100644
index 000000000..9d19a5fd1
--- /dev/null
+++ b/activity-aggregator/src/main/scala/org/sunbird/activity/domain/Models.scala
@@ -0,0 +1,74 @@
+package org.sunbird.activity.domain
+
+import java.util.Date
+import scala.collection.JavaConverters._
+
+
+/**
+ * Per-content consumption state. fromInput distinguishes freshly received
+ * events from rows rehydrated out of the DB; Date fields are nullable.
+ * NOTE(review): java.util.Date is mutable — avoid mutating these instances.
+ */
+case class ContentStatus(
+ contentId: String,
+ status: Int,
+ completedCount: Int = 0,
+ viewCount: Int = 1,
+ progress: Int = 0,
+ lastAccessTime: Date = null,
+ lastCompletedTime: Date = null,
+ lastUpdatedTime: Date = null,
+ fromInput: Boolean = true,
+ eventsFor: List[String] = List()
+ )
+
+/** A user's full consumption state for one course/batch, keyed by content id. */
+case class UserContentConsumption(userId: String, batchId: String, courseId: String, contents: Map[String, ContentStatus])
+
+/** Snake_case field names presumably mirror the activity-agg table columns — confirm. */
+case class UserActivityAgg(
+ activity_type: String,
+ user_id: String,
+ activity_id: String,
+ context_id: String,
+ aggregates: Map[String, Double],
+ agg_last_updated: Map[String, Long]
+ )
+
+/** Computed enrolment progress for a collection; completed triggers certificate/audit events. */
+case class CollectionProgress(
+ userId: String,
+ batchId: String,
+ courseId: String,
+ progress: Int,
+ completedOn: Date,
+ contentStatus: Map[String, Int],
+ inputContents: List[String] = List(),
+ completed: Boolean = false
+ )
+
+/** Pairs an activity aggregate with its optional collection-level progress. */
+case class UserEnrolmentAgg(activityAgg: UserActivityAgg, collectionProgress: Option[CollectionProgress] = None)
+
+/**
+ * Telemetry event models for audit events
+ */
+case class ActorObject(id: String, `type`: String = "User")
+
+// NOTE(review): Array fields make generated case-class equality compare by
+// reference, not contents — fine for serialization-only use.
+case class EventData(props: Array[String], `type`: String)
+
+// sid/did defaults generate a fresh UUID per event instance.
+case class EventContext(
+ channel: String = "in.sunbird",
+ env: String = "Course",
+ sid: String = java.util.UUID.randomUUID().toString,
+ did: String = java.util.UUID.randomUUID().toString,
+ pdata: java.util.Map[String, String] = Map("ver" -> "3.0", "id" -> "org.sunbird.learning.platform", "pid" -> "activity-aggregator-actor").asJava,
+ cdata: Array[java.util.Map[String, String]]
+ )
+
+case class EventObject(id: String, `type`: String, rollup: java.util.Map[String, String])
+
+// syncts/ets default to the construction-time clock; mid gets a fresh UUID per event.
+case class TelemetryEvent(
+ actor: ActorObject,
+ eid: String = "AUDIT",
+ edata: EventData,
+ ver: String = "3.0",
+ syncts: Long = System.currentTimeMillis(),
+ ets: Long = System.currentTimeMillis(),
+ context: EventContext,
+ mid: String = s"LP.AUDIT.${java.util.UUID.randomUUID().toString}",
+ `object`: EventObject,
+ tags: java.util.List[AnyRef] = new java.util.ArrayList[AnyRef]()
+ )
+
diff --git a/activity-aggregator/src/main/scala/org/sunbird/activity/util/ActivityAggregateUtil.scala b/activity-aggregator/src/main/scala/org/sunbird/activity/util/ActivityAggregateUtil.scala
new file mode 100644
index 000000000..cae6147a5
--- /dev/null
+++ b/activity-aggregator/src/main/scala/org/sunbird/activity/util/ActivityAggregateUtil.scala
@@ -0,0 +1,373 @@
+package org.sunbird.activity.util
+
+import org.apache.commons.collections4.CollectionUtils
+import org.apache.commons.lang3.StringUtils
+import org.sunbird.activity.domain._
+import org.sunbird.common.models.util.{JsonKey, LoggerUtil, ProjectUtil}
+import org.sunbird.common.request.RequestContext
+
+import java.util
+import java.util.Date
+import scala.collection.JavaConverters._
+
+class ActivityAggregateUtil {
+
+ private val logger = new LoggerUtil(classOf[ActivityAggregateUtil])
+ private val formatter = ProjectUtil.getDateFormatter
+
+ /**
+ * Convert raw consumption rows (Java maps from Cassandra/API) into a per-contentId map of
+ * ContentStatus. Rows with a blank contentId or status <= 0 are dropped; duplicate rows for
+ * the same contentId are merged: max of status/progress, sum of view/completed counts, and
+ * the latest access/completion timestamps.
+ *
+ * @param contents raw rows; both JsonKey and lowercase column names are tolerated
+ * @return contentId -> merged ContentStatus (fromInput = true); empty for null/empty input
+ */
+ def getContentStatusFromContents(contents: util.List[util.Map[String, AnyRef]]): Map[String, ContentStatus] = {
+ if (CollectionUtils.isEmpty(contents)) {
+ Map.empty
+ } else {
+ // Normalise each row; viewCount defaults to 1 and completedCount infers 1 from a
+ // completed status (2) when its column is absent.
+ val enrichedContents = contents.asScala.map(content => {
+ val contentId = Option(content.get(JsonKey.CONTENT_ID)).getOrElse(content.get("contentid")).asInstanceOf[String]
+ val status = Option(content.get("status")).map(_.asInstanceOf[Number].intValue()).getOrElse(0)
+ val progress = Option(content.get("progress")).map(_.asInstanceOf[Number].intValue()).getOrElse(0)
+ val viewCount = Option(content.get("viewcount")).map(_.asInstanceOf[Number].intValue()).getOrElse(1)
+ val completedCount = Option(content.get(JsonKey.COMPLETED_COUNT))
+ .orElse(Option(content.get("completedcount")))
+ .map(_.asInstanceOf[Number].intValue())
+ .getOrElse(if (status == 2) 1 else 0)
+
+ val lastAccessTime = parseDate(Option(content.get(JsonKey.LAST_ACCESS_TIME)).getOrElse(content.get(JsonKey.LAST_ACCESS_TIME_KEY)))
+ val lastCompletedTime = parseDate(Option(content.get(JsonKey.LAST_COMPLETED_TIME)).getOrElse(content.get("last_completed_time")))
+ ContentStatus(contentId, status, completedCount, viewCount, progress, lastAccessTime, lastCompletedTime, fromInput = true)
+ }).filter(t => StringUtils.isNotBlank(t.contentId) && t.status > 0)
+ .groupBy(_.contentId)
+ // Collapse any duplicates per contentId into a single merged status.
+ val result = enrichedContents.map { case (contentId, contentList) =>
+ val finalStatus = contentList.map(_.status).max
+ val views = contentList.map(_.viewCount).sum
+ val completion = contentList.map(_.completedCount).sum
+ val maxProgress = contentList.map(_.progress).max
+ val latestAccess = contentList.flatMap(c => Option(c.lastAccessTime)).sortWith(_.after(_)).headOption.orNull
+ val latestCompleted = contentList.flatMap(c => Option(c.lastCompletedTime)).sortWith(_.after(_)).headOption.orNull
+ (contentId, ContentStatus(contentId, finalStatus, completion, views, maxProgress, latestAccess, latestCompleted, fromInput = true))
+ }
+ result
+ }
+ }
+
+ /**
+ * Merge freshly received consumption data with the state persisted for the same enrolment.
+ * Counts (viewCount/completedCount) are additive; status and progress take the max; access
+ * time takes the most recent. DB-only contents are carried over untouched. Also records
+ * which audit actions ("start"/"complete") each content should emit via getEventActions.
+ *
+ * @param inputData consumption arriving with the current request
+ * @param dbData consumption previously stored in Cassandra
+ * @return combined UserContentConsumption keyed by contentId
+ */
+ def mergeConsumptionData(
+ inputData: UserContentConsumption,
+ dbData: UserContentConsumption
+ ): UserContentConsumption = {
+ val dbContents = dbData.contents
+ val processedContents = inputData.contents.map { case (contentId, inputCC) =>
+ val dbCC = dbContents.getOrElse(contentId, ContentStatus(contentId, 0, 0, 0, fromInput = false))
+ val finalStatus = List(inputCC.status, dbCC.status).max
+ val views = inputCC.viewCount + dbCC.viewCount
+ val completion = inputCC.completedCount + dbCC.completedCount
+ val progress = if (finalStatus == 2) 100 else List(inputCC.progress, dbCC.progress).max
+ val lastAccessTime = compareTime(dbCC.lastAccessTime, inputCC.lastAccessTime)
+ // Completion time only applies once status reaches 2; on the first completion the
+ // input's timestamp wins, otherwise keep the later of DB/input timestamps.
+ val lastCompletedTime = if (finalStatus == 2) {
+ if (dbCC.status < 2) compareTime(null, inputCC.lastCompletedTime)
+ else compareTime(dbCC.lastCompletedTime, inputCC.lastCompletedTime)
+ } else null
+ val eventsFor = getEventActions(dbCC, inputCC)
+
+ (contentId, ContentStatus(contentId, finalStatus, completion, views, progress, lastAccessTime, lastCompletedTime, ProjectUtil.getTimeStamp, inputCC.fromInput, eventsFor))
+ }
+
+ // Keep DB contents that the input did not mention.
+ val existingContents = processedContents.keySet
+ val remainingContents = dbData.contents.filterNot { case (key, _) => existingContents.contains(key) }
+ val finalContentsMap = processedContents ++ remainingContents
+ UserContentConsumption(inputData.userId, inputData.batchId, inputData.courseId, finalContentsMap)
+ }
+
+ /**
+ * Decide which audit actions a content transition should emit: "start" on the very first
+ * view (no prior views in the DB) and "complete" on the first completion (DB count is 0
+ * and the input carries a completion).
+ *
+ * @param dbCC persisted content status
+ * @param inputCC incoming content status
+ * @return ordered subset of List("start", "complete")
+ */
+ def getEventActions(dbCC: ContentStatus, inputCC: ContentStatus): List[String] = {
+   val actions = List.newBuilder[String]
+   if (dbCC.viewCount == 0) actions += "start"
+   if (dbCC.completedCount == 0 && inputCC.completedCount > 0) actions += "complete"
+   actions.result()
+ }
+
+ /**
+ * Build AUDIT telemetry events for contents that were flagged with pending actions
+ * ("start"/"complete") during the merge step.
+ *
+ * @param userConsumption merged consumption snapshot carrying per-content eventsFor lists
+ * @return one TelemetryEvent per (content, action) pair; empty when nothing changed
+ */
+ def generateContentAuditEvents(userConsumption: UserContentConsumption): List[TelemetryEvent] = {
+   // Extracted once here and used below (previously computed but unused locals).
+   val userId = userConsumption.userId
+   val courseId = userConsumption.courseId
+   val batchId = userConsumption.batchId
+
+   // Only contents flagged during the merge produce audit events.
+   val contentsForEvents = userConsumption.contents.filter(_._2.eventsFor.nonEmpty).values
+
+   contentsForEvents.flatMap { content =>
+     content.eventsFor.map { action =>
+       // "complete" events report both counters; "start" reports only the view counter.
+       val properties = if (StringUtils.equalsIgnoreCase(action, "complete")) {
+         Array("viewcount", "completedcount")
+       } else {
+         Array("viewcount")
+       }
+
+       TelemetryEvent(
+         actor = ActorObject(id = userId),
+         edata = EventData(props = properties, `type` = action),
+         context = EventContext(cdata = Array(Map("type" -> "CourseBatch", "id" -> batchId).asJava)),
+         `object` = EventObject(id = content.contentId, `type` = "Content", rollup = Map("l1" -> courseId).asJava)
+       )
+     }
+   }.toList
+ }
+
+ /**
+ * Compute the course-level activity aggregate and collection progress for one user.
+ * Optional nodes are excluded from the required leaf set; the course counts as completed
+ * once every remaining leaf node has status 2.
+ *
+ * @param userConsumption merged consumption for the enrolment
+ * @param leafNodes all leaf content ids of the course (from the relation cache)
+ * @param optionalNodes leaf nodes that do not count towards completion
+ * @return None when no leaf nodes are cached for the course; otherwise the aggregate plus
+ *         a CollectionProgress whose completedOn is set only on full completion
+ */
+ def computeCourseActivityAgg(
+ userConsumption: UserContentConsumption,
+ leafNodes: List[String],
+ optionalNodes: List[String],
+ requestContext: RequestContext
+ ): Option[UserEnrolmentAgg] = {
+ val courseId = userConsumption.courseId
+ val userId = userConsumption.userId
+ val contextId = "cb:" + userConsumption.batchId
+
+ logger.info(requestContext, s"computeCourseActivityAgg: courseId: $courseId, userId: $userId, leafNodes: ${leafNodes.size}, optionalNodes: ${optionalNodes.size}")
+
+ if (leafNodes.isEmpty) {
+ logger.warn(requestContext, s"computeCourseActivityAgg: Leaf nodes are not available for courseId: $courseId")
+ None
+ } else {
+ val updatedLeafNodes = leafNodes.diff(optionalNodes)
+ logger.info(requestContext, s"computeCourseActivityAgg: Required leaf nodes (excluding optional): ${updatedLeafNodes.size}")
+
+ val completedContents = userConsumption.contents.filter(cc => cc._2.status == 2).map(cc => cc._2.contentId).toList.distinct
+ logger.info(requestContext, s"computeCourseActivityAgg: User has completed ${completedContents.size} contents")
+
+ val completedCount = updatedLeafNodes.intersect(completedContents).size
+ logger.info(requestContext, s"computeCourseActivityAgg: Completed count: $completedCount / ${updatedLeafNodes.size} required leaf nodes")
+
+ val contentStatus = userConsumption.contents.map(cc => (cc._2.contentId, cc._2.status)).toMap
+ val inputContents = userConsumption.contents.filter(cc => cc._2.fromInput).keys.toList
+
+ val isCompleted = completedCount >= updatedLeafNodes.size
+ logger.info(requestContext, s"computeCourseActivityAgg: Course completed: $isCompleted")
+
+ // completedOn is only stamped when the whole course is done.
+ val collectionProgress = if (isCompleted) {
+ logger.info(requestContext, s"computeCourseActivityAgg: Creating CollectionProgress with completion date")
+ Option(CollectionProgress(userId, userConsumption.batchId, courseId, completedCount, new Date(), contentStatus, inputContents, true))
+ } else {
+ logger.info(requestContext, s"computeCourseActivityAgg: Creating CollectionProgress without completion date")
+ Option(CollectionProgress(userId, userConsumption.batchId, courseId, completedCount, null, contentStatus, inputContents, false))
+ }
+
+ val activityAgg = UserActivityAgg(
+ "Course",
+ userId,
+ courseId,
+ contextId,
+ Map("completedCount" -> completedCount.toDouble),
+ Map("completedCount" -> System.currentTimeMillis())
+ )
+
+ logger.info(requestContext, s"computeCourseActivityAgg: Created UserEnrolmentAgg with completedCount: $completedCount")
+ Option(UserEnrolmentAgg(activityAgg, collectionProgress))
+ }
+ }
+
+ /**
+ * Compute per-module (child collection) completion aggregates for a user, excluding the
+ * root course itself. Modules without cached leaf nodes are silently skipped.
+ *
+ * @param ancestors contentId -> ancestor collection ids (includes the root course)
+ * @param collectionsWithLeafNodes module id -> its leaf-node content ids
+ * @return one UserEnrolmentAgg per resolvable module, with no collection progress
+ */
+ def computeModuleActivityAgg(
+   userConsumption: UserContentConsumption,
+   courseId: String,
+   ancestors: Map[String, List[String]],
+   collectionsWithLeafNodes: Map[String, List[String]],
+   requestContext: RequestContext
+ ): List[UserEnrolmentAgg] = {
+   val contextId = "cb:" + userConsumption.batchId
+   // Contents the user has fully completed (status == 2), deduplicated.
+   val completedByUser = userConsumption.contents.filter(cc => cc._2.status == 2).map(cc => cc._2.contentId).toList.distinct
+   // Every ancestor except the root course is a module needing its own aggregate row.
+   val moduleIds = ancestors.values.flatten.toList.filterNot(StringUtils.equals(_, courseId)).distinct
+   for {
+     moduleId <- moduleIds
+     leafNodes <- collectionsWithLeafNodes.get(moduleId).toList
+   } yield UserEnrolmentAgg(
+     UserActivityAgg(
+       "Course",
+       userConsumption.userId,
+       moduleId,
+       contextId,
+       Map("completedCount" -> leafNodes.intersect(completedByUser).size.toDouble),
+       Map("completedCount" -> System.currentTimeMillis())
+     ),
+     None
+   )
+ }
+
+ /**
+ * Percentage of required leaf nodes completed, capped at 100 once the required
+ * count is reached; 0 when there are no leaf nodes at all.
+ */
+ def getCompletionPercentage(completedCount: Int, leafNodesCount: Int): Int =
+   leafNodesCount match {
+     case 0 => 0
+     case total if completedCount >= total => 100
+     case total => (completedCount * 100) / total
+   }
+
+ /**
+ * Enrolment status derived from completion counts.
+ * 0 = Not Started, 1 = In Progress, 2 = Completed.
+ */
+ def getCompletionStatus(completedCount: Int, leafNodesCount: Int): Int =
+   completedCount match {
+     case 0 => 0
+     case done if done >= leafNodesCount => 2
+     case _ => 1
+   }
+
+ /**
+ * Derive the latest-read details (content id, status and access time) from a batch of
+ * consumption rows, for updating the user_enrolments record.
+ *
+ * @param contents raw consumption rows; rows without a parseable access time are ignored
+ * @return map carrying lastreadcontentid / lastreadcontentstatus / last access time;
+ *         empty when no row has a usable access timestamp
+ */
+ def getLatestReadDetails(userId: String, batchId: String, courseId: String, contents: List[util.Map[String, AnyRef]]): util.Map[String, AnyRef] = {
+   val result = new util.HashMap[String, AnyRef]()
+   if (contents == null || contents.isEmpty) return result
+
+   // Pair each row with its parsed access time; unparseable rows drop out.
+   val contentsWithTime = contents.map(c => {
+     val time = parseDate(Option(c.get(JsonKey.LAST_ACCESS_TIME)).getOrElse(c.get(JsonKey.LAST_ACCESS_TIME_KEY)))
+     (c, time)
+   }).filter(_._2 != null)
+
+   if (contentsWithTime.isEmpty) return result
+
+   val lastAccessContent = contentsWithTime.maxBy(_._2)._1
+
+   // Use the same column-name fallbacks as getContentStatusFromContents; previously the
+   // id and access time were read from a single column and could come back null.
+   result.put("lastreadcontentid", Option(lastAccessContent.get(JsonKey.CONTENT_ID)).getOrElse(lastAccessContent.get("contentid")))
+   result.put("lastreadcontentstatus", Option(lastAccessContent.get("status")).getOrElse(1).asInstanceOf[AnyRef])
+   result.put(JsonKey.LAST_CONTENT_ACCESS_TIME, Option(lastAccessContent.get(JsonKey.LAST_ACCESS_TIME)).getOrElse(lastAccessContent.get(JsonKey.LAST_ACCESS_TIME_KEY)))
+   result
+ }
+
+ /**
+ * Flatten a UserActivityAgg into the column map used for a user_activity_agg upsert.
+ */
+ def createActivityAggMap(activityAgg: UserActivityAgg): util.Map[String, AnyRef] = {
+   // Plain HashMap instead of double-brace initialization: the anonymous-subclass idiom
+   // allocates an extra class and retains a reference to the enclosing instance.
+   val row = new util.HashMap[String, AnyRef]()
+   row.put("activity_type", activityAgg.activity_type)
+   row.put("user_id", activityAgg.user_id)
+   row.put("activity_id", activityAgg.activity_id)
+   row.put("context_id", activityAgg.context_id)
+   row.put("aggregates", activityAgg.aggregates.asJava)
+   row.put("agg_last_updated", activityAgg.agg_last_updated.asJava)
+   row
+ }
+
+ /**
+ * Build the {PRIMARY_KEY -> ..., NON_PRIMARY_KEY -> ...} structure expected by the
+ * Cassandra batchUpdate helper for the user_activity_agg table.
+ */
+ def createActivityAggUpdateMap(activityAgg: UserActivityAgg): util.Map[String, util.Map[String, AnyRef]] = {
+   // Plain HashMaps instead of double-brace initialization (anonymous subclasses that
+   // capture the enclosing instance and add a synthetic class per call site).
+   val primaryKey = new util.HashMap[String, AnyRef]()
+   primaryKey.put("activity_type", activityAgg.activity_type)
+   primaryKey.put("activity_id", activityAgg.activity_id)
+   primaryKey.put("user_id", activityAgg.user_id)
+   primaryKey.put("context_id", activityAgg.context_id)
+
+   val nonPrimaryKey = new util.HashMap[String, AnyRef]()
+   nonPrimaryKey.put("aggregates", activityAgg.aggregates.asJava)
+   nonPrimaryKey.put("agg_last_updated", activityAgg.agg_last_updated.asJava)
+
+   val update = new util.HashMap[String, util.Map[String, AnyRef]]()
+   update.put(JsonKey.PRIMARY_KEY, primaryKey)
+   update.put(JsonKey.NON_PRIMARY_KEY, nonPrimaryKey)
+   update
+ }
+
+ /**
+ * Build the (where, set) column maps for updating a user_enrolments row with progress.
+ *
+ * @param completedOn completion date; written only when non-null
+ * @param latestRead extra last-read columns merged into the update when present
+ * @return tuple of (primary-key map, update-column map)
+ */
+ def createProgressUpdateMap(
+   userId: String,
+   courseId: String,
+   batchId: String,
+   progress: Int,
+   status: Int,
+   completedOn: Date,
+   contentStatus: Map[String, Int],
+   latestRead: util.Map[String, AnyRef]
+ ): (util.Map[String, AnyRef], util.Map[String, AnyRef]) = {
+   val whereColumns = new util.HashMap[String, AnyRef]()
+   whereColumns.put("userid", userId)
+   whereColumns.put("courseid", courseId)
+   whereColumns.put("batchid", batchId)
+
+   val setColumns = new util.HashMap[String, AnyRef]()
+   setColumns.put("progress", Int.box(progress))
+   setColumns.put("status", Int.box(status))
+   setColumns.put("contentstatus", contentStatus.asJava)
+   setColumns.put("datetime", Long.box(System.currentTimeMillis()))
+   Option(completedOn).foreach(setColumns.put("completedon", _))
+   Option(latestRead).filterNot(_.isEmpty).foreach(setColumns.putAll(_))
+
+   (whereColumns, setColumns)
+ }
+
+ /**
+ * Build the (where, set) column maps for upserting one user_content_consumption row.
+ * "lastcompletedtime" is written only when present. NOTE(review): "datetime" stores the
+ * raw Date while "lastupdatedtime" stores the formatted string — presumably intentional;
+ * confirm against the table schema.
+ */
+ def createContentConsumptionUpdateMap(
+   userId: String,
+   courseId: String,
+   batchId: String,
+   content: ContentStatus
+ ): (util.Map[String, AnyRef], util.Map[String, AnyRef]) = {
+   // Plain HashMaps instead of double-brace initialization (anonymous inner classes that
+   // capture the enclosing instance); the conditional put is clearer outside an initializer.
+   val selectMap = new util.HashMap[String, AnyRef]()
+   selectMap.put("userid", userId)
+   selectMap.put("courseid", courseId)
+   selectMap.put("batchid", batchId)
+   selectMap.put("contentid", content.contentId)
+
+   val updateMap = new util.HashMap[String, AnyRef]()
+   updateMap.put("viewcount", Int.box(content.viewCount))
+   updateMap.put("completedcount", Int.box(content.completedCount))
+   updateMap.put("status", Int.box(content.status))
+   updateMap.put("progress", Int.box(content.progress))
+   updateMap.put("lastupdatedtime", formatDate(content.lastUpdatedTime))
+   updateMap.put("lastaccesstime", formatDate(content.lastAccessTime))
+   if (content.lastCompletedTime != null) {
+     updateMap.put("lastcompletedtime", formatDate(content.lastCompletedTime))
+   }
+   updateMap.put("datetime", content.lastUpdatedTime)
+   (selectMap, updateMap)
+ }
+
+ /**
+ * Parse a date-bearing value: Date instances pass through; non-blank strings other than
+ * the literal "null" are parsed with the shared formatter; anything else — including a
+ * parse failure — yields null.
+ * NOTE(review): formatter comes from ProjectUtil.getDateFormatter; if it is a
+ * SimpleDateFormat it is not thread-safe — confirm single-threaded (actor) use.
+ */
+ def parseDate(dateString: AnyRef): Date = {
+   dateString match {
+     case d: Date => d
+     case s: String if StringUtils.isNotBlank(s) && !StringUtils.equalsIgnoreCase("null", s) =>
+       scala.util.Try(formatter.parse(s)).getOrElse(null)
+     case _ => null
+   }
+ }
+
+ /** Format a Date with the shared formatter; null-safe (null in, null out). */
+ def formatDate(date: Date): String =
+   Option(date).map(d => formatter.format(d)).orNull
+
+ /**
+ * Pick the later of two timestamps. A lone null loses to the non-null side; when both are
+ * null the current timestamp (ProjectUtil.getTimeStamp) is used as a fallback.
+ */
+ def compareTime(existingTime: Date, inputTime: Date): Date =
+   (Option(existingTime), Option(inputTime)) match {
+     case (None, None) => ProjectUtil.getTimeStamp
+     case (None, Some(incoming)) => incoming
+     case (Some(existing), None) => existing
+     case (Some(existing), Some(incoming)) => if (incoming.after(existing)) incoming else existing
+   }
+}
diff --git a/activity-aggregator/src/main/scala/org/sunbird/activity/util/CertificateUtil.scala b/activity-aggregator/src/main/scala/org/sunbird/activity/util/CertificateUtil.scala
new file mode 100644
index 000000000..a237ef279
--- /dev/null
+++ b/activity-aggregator/src/main/scala/org/sunbird/activity/util/CertificateUtil.scala
@@ -0,0 +1,25 @@
+package org.sunbird.activity.util
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.sunbird.common.models.util.ProjectUtil
+import org.sunbird.common.request.RequestContext
+import org.sunbird.kafka.client.KafkaClient
+
+import java.util.UUID
+
+class CertificateUtil {
+
+  /**
+   * Publish a BE_JOB_REQUEST event to the certificate-instruction Kafka topic asking the
+   * certificate generator to auto-issue a certificate for the given user/course/batch.
+   *
+   * @param requestContext currently unused; kept for interface parity with other utils
+   */
+  def publishCertificateIssueEvent(userId: String, courseId: String, batchId: String, requestContext: RequestContext): Unit = {
+    // Removed the unused ObjectMapper field; the event payload is built by hand, so keep
+    // field order/content stable for downstream consumers.
+    val topic = ProjectUtil.getConfigValue("kafka_topics_certificate_instruction")
+    val ets = System.currentTimeMillis
+    val mid = s"LP.$ets.${UUID.randomUUID}"
+    val event = s"""{"eid": "BE_JOB_REQUEST","ets": $ets,"mid": "$mid","actor": {"id": "Course Certificate Generator","type": "System"},"context": {"pdata": {"ver": "1.0","id": "org.sunbird.platform"}},"object": {"id": "${batchId}_$courseId","type": "CourseCertificateGeneration"},"edata": {"userIds": ["$userId"],"action": "issue-certificate","iteration": 1, "trigger": "auto-issue","batchId": "$batchId","reIssue": false,"courseId": "$courseId"}}"""
+    KafkaClient.send(event, topic)
+  }
+}
+
+object CertificateUtil {
+ // Factory mirroring the other util companions; eases construction/mocking in tests.
+ def apply(): CertificateUtil = new CertificateUtil()
+}
diff --git a/activity-aggregator/src/main/scala/org/sunbird/activity/util/ContentSearchUtil.scala b/activity-aggregator/src/main/scala/org/sunbird/activity/util/ContentSearchUtil.scala
new file mode 100644
index 000000000..48a3f896f
--- /dev/null
+++ b/activity-aggregator/src/main/scala/org/sunbird/activity/util/ContentSearchUtil.scala
@@ -0,0 +1,50 @@
+package org.sunbird.activity.util
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.sunbird.common.models.util.{HttpUtil, ProjectUtil}
+import org.sunbird.common.request.RequestContext
+
+import java.util
+import scala.collection.JavaConverters._
+
+class ContentSearchUtil {
+
+  private val mapper = new ObjectMapper()
+
+  /**
+   * Query the composite search service for the status of a collection.
+   *
+   * @param collectionId identifier of the collection to look up
+   * @return the "status" field of the first matching collection (Live/Unlisted/Retired)
+   * @throws Exception when the HTTP call fails or no matching collection exists
+   */
+  def getDBStatus(collectionId: String): String = {
+    val requestBody = s"""{
+      | "request": {
+      | "filters": {
+      | "objectType": "Collection",
+      | "identifier": "$collectionId",
+      | "status": ["Live", "Unlisted", "Retired"]
+      | },
+      | "fields": ["status"]
+      | }
+      |}""".stripMargin
+
+    val searchBasePath = ProjectUtil.getConfigValue("service_search_base_path")
+    val searchAPIURL = searchBasePath + "/v3/search"
+    val response = HttpUtil.doPostRequest(searchAPIURL, requestBody, new util.HashMap[String, String]())
+
+    if (response != null && response.getStatusCode == 200) {
+      val responseBody = mapper.readValue(response.getBody, classOf[util.Map[String, AnyRef]])
+      val result = responseBody.getOrDefault("result", new util.HashMap[String, AnyRef]()).asInstanceOf[util.Map[String, AnyRef]]
+      val count = result.getOrDefault("count", 0.asInstanceOf[AnyRef]).asInstanceOf[Number].intValue()
+      if (count > 0) {
+        val list = result.getOrDefault("content", new util.ArrayList[util.Map[String, AnyRef]]()).asInstanceOf[util.List[util.Map[String, AnyRef]]]
+        list.asScala.head.get("status").asInstanceOf[String]
+      } else throw new Exception(s"There are no published or retired collection with id: $collectionId")
+    } else {
+      // Fixed garbled error message; include the HTTP status so operators can tell
+      // 4xx/5xx responses apart from connectivity failures.
+      throw new Exception("Search service returned an error response, status: " + (if (response != null) response.getStatusCode else "null"))
+    }
+  }
+
+  /** Resolve the collection's status; requestContext is reserved for future logging. */
+  def getCollectionStatus(collectionId: String, requestContext: RequestContext): String = {
+    getDBStatus(collectionId)
+  }
+}
+
+object ContentSearchUtil {
+ // Factory mirroring the other util companions; eases construction/mocking in tests.
+ def apply(): ContentSearchUtil = new ContentSearchUtil()
+}
diff --git a/activity-aggregator/src/main/scala/org/sunbird/activity/util/DeDupUtil.scala b/activity-aggregator/src/main/scala/org/sunbird/activity/util/DeDupUtil.scala
new file mode 100644
index 000000000..c2575293e
--- /dev/null
+++ b/activity-aggregator/src/main/scala/org/sunbird/activity/util/DeDupUtil.scala
@@ -0,0 +1,58 @@
+package org.sunbird.activity.util
+
+import org.sunbird.cache.util.RedisCacheUtil
+import org.sunbird.common.models.util.ProjectUtil
+import org.sunbird.common.request.RequestContext
+
+import java.security.MessageDigest
+
+class DeDupUtil(implicit cacheUtil: RedisCacheUtil) {
+
+  /** Parse an int config value; missing/blank/non-numeric values fall back to the default. */
+  private def intConfig(key: String, default: Int): Int =
+    Option(ProjectUtil.getConfigValue(key)).map(_.trim).filter(_.nonEmpty)
+      .flatMap(v => scala.util.Try(v.toInt).toOption)
+      .getOrElse(default)
+
+  // Redis DB index used for de-duplication checksums (default 3). Previously a blank or
+  // non-numeric config value threw NumberFormatException during class initialisation.
+  private val deDupRedisIndex = intConfig("dedup_redis_index", 3)
+
+  // Checksum TTL in seconds (default 604800 = 7 days).
+  private val deDupExpirySec = intConfig("dedup_redis_expiry", 604800)
+
+  // Feature flag; invalid values disable dedup instead of crashing initialisation.
+  private val dedupEnabled = Option(ProjectUtil.getConfigValue("activity_input_dedup_enabled"))
+    .flatMap(v => scala.util.Try(v.trim.toBoolean).toOption)
+    .getOrElse(false)
+
+  /** Deterministic MD5 checksum (uppercase hex) identifying one consumption event. */
+  def getMessageId(courseId: String, batchId: String, userId: String, contentId: String, status: Int): String = {
+    val key = Array(courseId, batchId, userId, contentId, status).mkString("|")
+    MessageDigest.getInstance("MD5").digest(key.getBytes).map("%02X".format(_)).mkString
+  }
+
+  /** True when the checksum has not been seen before (or dedup is disabled). */
+  def isUniqueEvent(checksum: String, requestContext: RequestContext): Boolean = {
+    if (!dedupEnabled) {
+      true
+    } else {
+      val jedis = cacheUtil.getConnection(deDupRedisIndex)
+      try {
+        !jedis.exists(checksum)
+      } finally {
+        jedis.close()
+      }
+    }
+  }
+
+  /** Record the checksum with a TTL so replays inside the window are ignored. */
+  def storeChecksum(checksum: String, requestContext: RequestContext): Unit = {
+    if (dedupEnabled) {
+      val jedis = cacheUtil.getConnection(deDupRedisIndex)
+      try {
+        jedis.setex(checksum, deDupExpirySec, "1")
+      } finally {
+        jedis.close()
+      }
+    }
+  }
+}
+
+object DeDupUtil {
+ // Factory requiring an implicit Redis connection provider.
+ def apply()(implicit cacheUtil: RedisCacheUtil): DeDupUtil = new DeDupUtil()
+}
diff --git a/activity-aggregator/src/main/scala/org/sunbird/activity/util/RedisUtil.scala b/activity-aggregator/src/main/scala/org/sunbird/activity/util/RedisUtil.scala
new file mode 100644
index 000000000..e74836ade
--- /dev/null
+++ b/activity-aggregator/src/main/scala/org/sunbird/activity/util/RedisUtil.scala
@@ -0,0 +1,73 @@
+package org.sunbird.activity.util
+
+import org.apache.commons.collections.CollectionUtils
+import org.sunbird.cache.util.RedisCacheUtil
+import org.sunbird.common.models.util.{LoggerUtil, ProjectUtil}
+import org.sunbird.common.request.RequestContext
+
+import scala.collection.JavaConverters._
+
+class RedisUtil(implicit val cacheUtil: RedisCacheUtil) {
+
+  private val logger = new LoggerUtil(classOf[RedisUtil])
+
+  // Redis DB index of the relation cache; blank/invalid config falls back to 10 instead of
+  // throwing NumberFormatException during class initialisation.
+  private val relationCacheDb = Option(ProjectUtil.getConfigValue("redis_relation_cache_index"))
+    .map(_.trim).filter(_.nonEmpty)
+    .flatMap(v => scala.util.Try(v.toInt).toOption)
+    .getOrElse(10)
+
+  /**
+   * Read a list value from the relation cache.
+   *
+   * @return cached entries, or empty on a miss or any Redis error (best-effort read)
+   */
+  def readFromCache(key: String, requestContext: RequestContext): List[String] = {
+    logger.info(requestContext, s"RedisUtil.readFromCache: key: $key, dbIndex: $relationCacheDb")
+    try {
+      val list = cacheUtil.getList(key, relationCacheDb)
+      // Emptiness is checked on the Scala list directly; the previous round-trip through
+      // .asJava + CollectionUtils.isEmpty allocated a wrapper and NPE'd on a null list.
+      if (list == null || list.isEmpty) {
+        logger.info(requestContext, s"RedisUtil: No data found in Redis for key: $key, dbIndex: $relationCacheDb")
+        List.empty
+      } else {
+        logger.info(requestContext, s"RedisUtil: Found ${list.size} items for key: $key")
+        list
+      }
+    } catch {
+      case ex: Exception =>
+        logger.error(requestContext, s"RedisUtil: Error reading from Redis for key: $key, dbIndex: $relationCacheDb", ex)
+        List.empty
+    }
+  }
+
+  /** Distinct leaf-node content ids cached under "<courseId>:<collectionId>:leafnodes". */
+  def getLeafNodes(courseId: String, collectionId: String, requestContext: RequestContext): List[String] = {
+    val key = s"$courseId:$collectionId:leafnodes"
+    logger.info(requestContext, s"RedisUtil: Getting leaf nodes for courseId: $courseId, collectionId: $collectionId")
+    val nodes = readFromCache(key, requestContext).distinct
+    logger.info(requestContext, s"RedisUtil: Retrieved ${nodes.size} distinct leaf nodes")
+    nodes
+  }
+
+  /** Distinct optional-node content ids cached under "<courseId>:<collectionId>:optionalnodes". */
+  def getOptionalNodes(courseId: String, collectionId: String, requestContext: RequestContext): List[String] = {
+    val key = s"$courseId:$collectionId:optionalnodes"
+    logger.info(requestContext, s"RedisUtil: Getting optional nodes for courseId: $courseId, collectionId: $collectionId")
+    val nodes = readFromCache(key, requestContext).distinct
+    logger.info(requestContext, s"RedisUtil: Retrieved ${nodes.size} distinct optional nodes")
+    nodes
+  }
+
+  /** Ancestor collection ids cached under "<courseId>:<contentId>:ancestors". */
+  def getAncestors(courseId: String, contentId: String, requestContext: RequestContext): List[String] = {
+    val key = s"$courseId:$contentId:ancestors"
+    logger.info(requestContext, s"RedisUtil: Getting ancestors for courseId: $courseId, contentId: $contentId")
+    val ancestors = readFromCache(key, requestContext)
+    logger.info(requestContext, s"RedisUtil: Retrieved ${ancestors.size} ancestors")
+    ancestors
+  }
+
+  /** Leaf nodes minus optional nodes — the set that must be completed for course completion. */
+  def getRequiredLeafNodes(courseId: String, collectionId: String, requestContext: RequestContext): List[String] = {
+    logger.info(requestContext, s"RedisUtil: Getting required leaf nodes (excluding optional) for courseId: $courseId, collectionId: $collectionId")
+    val leafNodes = getLeafNodes(courseId, collectionId, requestContext)
+    val optionalNodes = getOptionalNodes(courseId, collectionId, requestContext)
+    val required = leafNodes.diff(optionalNodes)
+    logger.info(requestContext, s"RedisUtil: Required leaf nodes: ${required.size} (total: ${leafNodes.size}, optional: ${optionalNodes.size})")
+    required
+  }
+}
+
+object RedisUtil {
+ // Factory requiring an implicit Redis connection provider.
+ def apply()(implicit cacheUtil: RedisCacheUtil): RedisUtil = new RedisUtil()
+}
diff --git a/activity-aggregator/src/test/scala/org/sunbird/activity/actor/ActivityAggregatorActorTest.scala b/activity-aggregator/src/test/scala/org/sunbird/activity/actor/ActivityAggregatorActorTest.scala
new file mode 100644
index 000000000..a5f11e322
--- /dev/null
+++ b/activity-aggregator/src/test/scala/org/sunbird/activity/actor/ActivityAggregatorActorTest.scala
@@ -0,0 +1,433 @@
+package org.sunbird.activity.actor
+
+import org.apache.pekko.actor.{ActorSystem, Props}
+import org.apache.pekko.testkit.{TestKit, TestProbe}
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.flatspec.AnyFlatSpecLike
+import org.scalatest.matchers.should.Matchers
+import org.sunbird.activity.util.{CertificateUtil, ContentSearchUtil, DeDupUtil, RedisUtil}
+import org.sunbird.activity.domain.{CollectionProgress, TelemetryEvent}
+import org.sunbird.cache.util.RedisCacheUtil
+import org.sunbird.cassandra.CassandraOperation
+import org.sunbird.common.models.response.Response
+import org.sunbird.common.models.util.JsonKey
+import org.sunbird.common.request.{Request, RequestContext}
+
+import java.util
+import scala.concurrent.duration._
+
+class ActivityAggregatorActorTest
+ extends TestKit(ActorSystem("ActivityAggregatorActorTest"))
+ with AnyFlatSpecLike
+ with Matchers
+ with MockFactory
+ with BeforeAndAfterAll {
+
+ override def afterAll(): Unit = {
+ TestKit.shutdownActorSystem(system)
+ }
+
+ implicit val cacheUtil: RedisCacheUtil = mock[RedisCacheUtil]
+
+ "ActivityAggregatorActor" should "process valid contents successfully" in {
+ val cassandraOperation = mock[CassandraOperation]
+ val redisUtil = mock[RedisUtil]
+ val deDupUtil = mock[DeDupUtil]
+ val contentSearchUtil = mock[ContentSearchUtil]
+ val certificateUtil = mock[CertificateUtil]
+ val probe = TestProbe()
+
+ val emptyConsumptionResponse = createEmptyResponse()
+
+ // Redis Expectations
+ (redisUtil.getLeafNodes _).expects(*, *, *).returning(List("content1", "content2")).anyNumberOfTimes()
+ (redisUtil.getOptionalNodes _).expects(*, *, *).returning(List.empty).anyNumberOfTimes()
+ (redisUtil.getAncestors _).expects(*, *, *).returning(List("course456")).anyNumberOfTimes()
+
+ // DeDup Expectations
+ (deDupUtil.isUniqueEvent _).expects(*, *).returning(true).anyNumberOfTimes()
+ (deDupUtil.storeChecksum _).expects(*, *).returning(()).anyNumberOfTimes()
+
+ // ContentSearchUtil Expectations
+ (contentSearchUtil.getCollectionStatus _).expects(*, *).returning("Live").anyNumberOfTimes()
+
+ // Certificate Util Expectations (Mock Kafka calls)
+ (certificateUtil.publishCertificateIssueEvent _).expects(*, *, *, *).returning(()).anyNumberOfTimes()
+
+ // Cassandra Expectations
+ (cassandraOperation.getRecordsByProperties(_: String, _: String, _: util.Map[String, Object], _: RequestContext))
+ .expects(*, "user_content_consumption", *, *)
+ .returning(emptyConsumptionResponse)
+ .anyNumberOfTimes()
+
+ (cassandraOperation.getRecordsByProperties(_: String, _: String, _: util.Map[String, Object], _: RequestContext))
+ .expects(*, "user_enrolments", *, *)
+ .returning(createEnrolmentResponse())
+ .anyNumberOfTimes()
+
+ (cassandraOperation.batchUpdate(_: String, _: String, _: util.List[util.Map[String, util.Map[String, Object]]], _: RequestContext))
+ .expects(*, "user_content_consumption", *, *)
+ .returning(new Response())
+ .anyNumberOfTimes()
+
+ (cassandraOperation.batchUpdate(_: String, _: String, _: util.List[util.Map[String, util.Map[String, Object]]], _: RequestContext))
+ .expects(*, "user_activity_agg", *, *)
+ .returning(new Response())
+ .anyNumberOfTimes()
+
+ (cassandraOperation.updateRecordV2(_: String, _: String, _: util.Map[String, Object], _: util.Map[String, Object], _: Boolean, _: RequestContext))
+ .expects(*, *, *, *, *, *)
+ .returning(new Response())
+ .anyNumberOfTimes()
+
+ val actor = system.actorOf(Props(new TestableActivityAggregatorActor(cassandraOperation, redisUtil, deDupUtil, contentSearchUtil, certificateUtil)))
+
+ val request = createUpdateRequest(
+ userId = "user123",
+ courseId = "course456",
+ batchId = "batch789",
+ contents = createContentsList()
+ )
+
+ probe.send(actor, request)
+ probe.expectMsgType[Response](5.seconds)
+ }
+
+ it should "handle missing contents key (Force Sync)" in {
+ val cassandraOperation = mock[CassandraOperation]
+ val redisUtil = mock[RedisUtil]
+ val deDupUtil = mock[DeDupUtil]
+ val contentSearchUtil = mock[ContentSearchUtil]
+ val certificateUtil = mock[CertificateUtil]
+ val probe = TestProbe()
+
+ val consumptionResponse = createConsumptionResponse()
+
+ (redisUtil.getLeafNodes _).expects(*, *, *).returning(List("content1", "content2")).anyNumberOfTimes()
+ (redisUtil.getOptionalNodes _).expects(*, *, *).returning(List.empty).anyNumberOfTimes()
+ (redisUtil.getAncestors _).expects(*, *, *).returning(List("course456")).anyNumberOfTimes()
+
+ (cassandraOperation.getRecordsByProperties(_: String, _: String, _: util.Map[String, Object], _: RequestContext))
+ .expects(*, "user_content_consumption", *, *)
+ .returning(consumptionResponse)
+ .anyNumberOfTimes()
+
+ (cassandraOperation.getRecordsByProperties(_: String, _: String, _: util.Map[String, Object], _: RequestContext))
+ .expects(*, "user_enrolments", *, *)
+ .returning(createEnrolmentResponse())
+ .anyNumberOfTimes()
+
+ (cassandraOperation.batchUpdate(_: String, _: String, _: util.List[util.Map[String, util.Map[String, Object]]], _: RequestContext))
+ .expects(*, *, *, *)
+ .returning(new Response())
+ .anyNumberOfTimes()
+
+ (cassandraOperation.updateRecordV2(_: String, _: String, _: util.Map[String, Object], _: util.Map[String, Object], _: Boolean, _: RequestContext))
+ .expects(*, *, *, *, *, *)
+ .returning(new Response())
+ .anyNumberOfTimes()
+
+ val actor = system.actorOf(Props(new TestableActivityAggregatorActor(cassandraOperation, redisUtil, deDupUtil, contentSearchUtil, certificateUtil)))
+
+ val request = createUpdateRequest(
+ userId = "user123",
+ courseId = "course456",
+ batchId = "batch789",
+ contents = null // Force Sync
+ )
+
+ probe.send(actor, request)
+ probe.expectMsgType[Response](5.seconds)
+ }
+
+ it should "handle empty contents list gracefully" in {
+ val cassandraOperation = mock[CassandraOperation]
+ val redisUtil = mock[RedisUtil]
+ val deDupUtil = mock[DeDupUtil]
+ val contentSearchUtil = mock[ContentSearchUtil]
+ val certificateUtil = mock[CertificateUtil]
+ val probe = TestProbe()
+
+ val actor = system.actorOf(Props(new TestableActivityAggregatorActor(cassandraOperation, redisUtil, deDupUtil, contentSearchUtil, certificateUtil)))
+
+ val request = createUpdateRequest(
+ userId = "user123",
+ courseId = "course456",
+ batchId = "batch789",
+ contents = new util.ArrayList[util.Map[String, AnyRef]]()
+ )
+
+ probe.send(actor, request)
+ probe.expectMsgType[Response](5.seconds)
+ }
+
it should "filter out invalid contents" in {
  // Contents with status 0 are invalid and must be dropped; the valid entry
  // should still be processed end to end.
  val cassandraOps = mock[CassandraOperation]
  val redis = mock[RedisUtil]
  val deDup = mock[DeDupUtil]
  val contentSearch = mock[ContentSearchUtil]
  val certUtil = mock[CertificateUtil]
  val sender = TestProbe()

  // Course structure lookups served from the Redis cache.
  (redis.getLeafNodes _).expects(*, *, *).returning(List("content1", "content2")).anyNumberOfTimes()
  (redis.getOptionalNodes _).expects(*, *, *).returning(List.empty).anyNumberOfTimes()
  (redis.getAncestors _).expects(*, *, *).returning(List("course456")).anyNumberOfTimes()

  // Every event is treated as unique (no de-duplication short-circuit).
  (deDup.isUniqueEvent _).expects(*, *).returning(true).anyNumberOfTimes()
  (deDup.storeChecksum _).expects(*, *).returning(()).anyNumberOfTimes()

  (cassandraOps.getRecordsByProperties(_: String, _: String, _: util.Map[String, Object], _: RequestContext))
    .expects(*, *, *, *)
    .returning(createEmptyResponse())
    .anyNumberOfTimes()

  (cassandraOps.batchUpdate(_: String, _: String, _: util.List[util.Map[String, util.Map[String, Object]]], _: RequestContext))
    .expects(*, *, *, *)
    .returning(new Response())
    .anyNumberOfTimes()

  (cassandraOps.updateRecordV2(_: String, _: String, _: util.Map[String, Object], _: util.Map[String, Object], _: Boolean, _: RequestContext))
    .expects(*, *, *, *, *, *)
    .returning(new Response())
    .anyNumberOfTimes()

  val actorRef = system.actorOf(
    Props(new TestableActivityAggregatorActor(cassandraOps, redis, deDup, contentSearch, certUtil)))

  // One invalid (status 0) and one valid (status 2) content entry.
  val invalid = new util.HashMap[String, AnyRef]()
  invalid.put("contentId", "content1")
  invalid.put("status", Int.box(0))

  val valid = new util.HashMap[String, AnyRef]()
  valid.put("contentId", "content2")
  valid.put("status", Int.box(2))

  val mixedContents = new util.ArrayList[util.Map[String, AnyRef]]()
  mixedContents.add(invalid)
  mixedContents.add(valid)

  val req = createUpdateRequest(
    userId = "user123",
    courseId = "course456",
    batchId = "batch789",
    contents = mixedContents
  )

  sender.send(actorRef, req)
  sender.expectMsgType[Response](5.seconds)
}
+
it should "handle DB with no existing consumption for Force Sync" in {
  // Force Sync (null contents) against an empty consumption table should
  // still complete with a Response and only read the table once.
  val cassandraOps = mock[CassandraOperation]
  val redis = mock[RedisUtil]
  val deDup = mock[DeDupUtil]
  val contentSearch = mock[ContentSearchUtil]
  val certUtil = mock[CertificateUtil]
  val sender = TestProbe()

  (redis.getLeafNodes _).expects(*, *, *).returning(List("content1", "content2")).anyNumberOfTimes()
  (redis.getOptionalNodes _).expects(*, *, *).returning(List.empty).anyNumberOfTimes()
  (redis.getAncestors _).expects(*, *, *).returning(List("course456")).anyNumberOfTimes()

  // Exactly one consumption lookup, returning no rows.
  (cassandraOps.getRecordsByProperties(_: String, _: String, _: util.Map[String, Object], _: RequestContext))
    .expects(*, "user_content_consumption", *, *)
    .returning(createEmptyResponse())
    .once()

  val actorRef = system.actorOf(
    Props(new TestableActivityAggregatorActor(cassandraOps, redis, deDup, contentSearch, certUtil)))

  // A null contents list triggers the Force Sync path.
  val req = createUpdateRequest(
    userId = "user123",
    courseId = "course456",
    batchId = "batch789",
    contents = null
  )

  sender.send(actorRef, req)
  sender.expectMsgType[Response](5.seconds)
}
+
it should "merge input consumption with DB consumption correctly" in {
  // An incoming "completed" event for content1 must be merged with the
  // partially-viewed record already stored in Cassandra.
  val cassandraOps = mock[CassandraOperation]
  val redis = mock[RedisUtil]
  val deDup = mock[DeDupUtil]
  val contentSearch = mock[ContentSearchUtil]
  val certUtil = mock[CertificateUtil]
  val sender = TestProbe()

  (redis.getLeafNodes _).expects(*, *, *).returning(List("content1", "content2")).anyNumberOfTimes()
  (redis.getOptionalNodes _).expects(*, *, *).returning(List.empty).anyNumberOfTimes()
  (redis.getAncestors _).expects(*, *, *).returning(List("course456")).anyNumberOfTimes()

  (deDup.isUniqueEvent _).expects(*, *).returning(true).anyNumberOfTimes()
  (deDup.storeChecksum _).expects(*, *).returning(()).anyNumberOfTimes()

  // DB already holds content1 as in-progress with two views.
  val existing = new util.HashMap[String, AnyRef]()
  existing.put("contentid", "content1")
  existing.put("status", Int.box(1))
  existing.put("viewcount", Int.box(2))
  existing.put("completedcount", Int.box(0))
  val dbRows = new util.ArrayList[util.Map[String, AnyRef]]()
  dbRows.add(existing)
  val dbConsumptionResponse = new Response()
  dbConsumptionResponse.put("response", dbRows)

  (cassandraOps.getRecordsByProperties(_: String, _: String, _: util.Map[String, Object], _: RequestContext))
    .expects(*, "user_content_consumption", *, *)
    .returning(dbConsumptionResponse)
    .anyNumberOfTimes()

  (cassandraOps.getRecordsByProperties(_: String, _: String, _: util.Map[String, Object], _: RequestContext))
    .expects(*, "user_enrolments", *, *)
    .returning(createEnrolmentResponse())
    .anyNumberOfTimes()

  (cassandraOps.batchUpdate(_: String, _: String, _: util.List[util.Map[String, util.Map[String, Object]]], _: RequestContext))
    .expects(*, *, *, *)
    .returning(new Response())
    .anyNumberOfTimes()

  (cassandraOps.updateRecordV2(_: String, _: String, _: util.Map[String, Object], _: util.Map[String, Object], _: Boolean, _: RequestContext))
    .expects(*, *, *, *, *, *)
    .returning(new Response())
    .anyNumberOfTimes()

  val actorRef = system.actorOf(
    Props(new TestableActivityAggregatorActor(cassandraOps, redis, deDup, contentSearch, certUtil)))

  // Incoming payload reports content1 as completed (status 2).
  val incoming = new util.HashMap[String, AnyRef]()
  incoming.put("contentId", "content1")
  incoming.put("status", Int.box(2))
  val payload = new util.ArrayList[util.Map[String, AnyRef]]()
  payload.add(incoming)

  val req = createUpdateRequest(
    userId = "user123",
    courseId = "course456",
    batchId = "batch789",
    contents = payload
  )

  sender.send(actorRef, req)
  sender.expectMsgType[Response](5.seconds)
}
+
+ // Helper methods
/** Builds a RequestContext populated with fixed test identifiers. */
private def createRequestContext(): RequestContext =
  new RequestContext("channel", "pdataId", "env", "did", "sid", "pid", "pver", null)
+
/**
 * Builds an "updateActivityAggregates" request for the given enrolment keys.
 * A null contents argument is omitted from the payload, which the actor
 * interprets as a Force Sync.
 */
private def createUpdateRequest(
    userId: String,
    courseId: String,
    batchId: String,
    contents: util.List[util.Map[String, AnyRef]]
): Request = {
  val payload = new util.HashMap[String, AnyRef]()
  payload.put(JsonKey.USER_ID, userId)
  payload.put(JsonKey.COURSE_ID, courseId)
  payload.put(JsonKey.BATCH_ID, batchId)
  Option(contents).foreach(c => payload.put(JsonKey.CONTENTS, c))

  val request = new Request()
  request.setOperation("updateActivityAggregates")
  request.setRequestContext(createRequestContext())
  request.setRequest(payload)
  request
}
+
/** Two well-formed content entries: one completed, one in progress. */
private def createContentsList(): util.List[util.Map[String, AnyRef]] = {
  // Local factory for a single content map in the API payload shape.
  def entry(id: String, status: Int, lastAccess: String): util.Map[String, AnyRef] = {
    val m = new util.HashMap[String, AnyRef]()
    m.put("contentId", id)
    m.put("status", Int.box(status))
    m.put("lastAccessTime", lastAccess)
    m
  }

  val contents = new util.ArrayList[util.Map[String, AnyRef]]()
  contents.add(entry("content1", 2, "2024-01-01 10:00:00:000+0000"))
  contents.add(entry("content2", 1, "2024-01-01 11:00:00:000+0000"))
  contents
}
+
/** Response whose "response" key holds an empty row list. */
private def createEmptyResponse(): Response = {
  val empty = new Response()
  empty.put("response", new util.ArrayList[util.Map[String, AnyRef]]())
  empty
}
+
/** Response with one fully-completed consumption row for content1. */
private def createConsumptionResponse(): Response = {
  val row = new util.HashMap[String, AnyRef]()
  row.put("contentid", "content1")
  row.put("status", Int.box(2))
  row.put("viewcount", Int.box(3))
  row.put("completedcount", Int.box(1))
  row.put("progress", Int.box(100))

  val rows = new util.ArrayList[util.Map[String, AnyRef]]()
  rows.add(row)

  val response = new Response()
  response.put("response", rows)
  response
}
+
/** Response with one active (status 1) enrolment row at 50% progress. */
private def createEnrolmentResponse(): Response = {
  val row = new util.HashMap[String, AnyRef]()
  row.put("userid", "user123")
  row.put("courseid", "course456")
  row.put("batchid", "batch789")
  row.put("status", Int.box(1))
  row.put("progress", Int.box(50))

  val rows = new util.ArrayList[util.Map[String, AnyRef]]()
  rows.add(row)

  val response = new Response()
  response.put("response", rows)
  response
}
+
/**
 * ActivityAggregatorActor variant whose collaborators are swapped for mocks
 * via reflection just before each message is handled, and whose Kafka
 * publishing hooks are stubbed out so tests never touch Kafka.
 */
class TestableActivityAggregatorActor(
    cassandraOp: CassandraOperation,
    redisUtil: RedisUtil,
    deDupUtil: DeDupUtil,
    contentSearchUtil: ContentSearchUtil,
    certificateUtil: CertificateUtil
) extends ActivityAggregatorActor {

  override def onReceive(request: Request): Unit = {
    // Inject mocks before delegating so the real actor logic runs against them.
    setField("cassandraOperation", cassandraOp)
    setField("redisUtil", redisUtil)
    setField("deDupUtil", deDupUtil)
    setField("contentSearchUtil", contentSearchUtil)
    setField("certificateUtil", certificateUtil)
    super.onReceive(request)
  }

  /**
   * Overwrites a (possibly private) field with the given value.
   *
   * Fix: the original looked only at fields declared directly on
   * ActivityAggregatorActor, so any injected collaborator declared on a
   * superclass failed with NoSuchFieldException. We now walk up the class
   * hierarchy until the field is found.
   */
  def setField(fieldName: String, value: AnyRef): Unit = {
    var owner: Class[_] = classOf[ActivityAggregatorActor]
    var field: java.lang.reflect.Field = null
    while (field == null && owner != null) {
      try field = owner.getDeclaredField(fieldName)
      catch { case _: NoSuchFieldException => owner = owner.getSuperclass }
    }
    if (field == null) throw new NoSuchFieldException(fieldName)
    field.setAccessible(true)
    field.set(this, value)
  }

  // Stub: swallow audit events instead of publishing to Kafka.
  override def publishAuditEvent(event: TelemetryEvent, requestContext: RequestContext): Unit = {
    logger.info(requestContext, s"Mock: publishAuditEvent called with event: ${event.eid}")
  }

  // Stub: swallow enrolment-complete events instead of publishing to Kafka.
  override def publishEnrolmentCompleteAuditEvent(progress: CollectionProgress, requestContext: RequestContext): Unit = {
    logger.info(requestContext, s"Mock: publishEnrolmentCompleteAuditEvent called for userId: ${progress.userId}")
  }
}
+}
diff --git a/activity-aggregator/src/test/scala/org/sunbird/activity/util/ActivityAggregateUtilTest.scala b/activity-aggregator/src/test/scala/org/sunbird/activity/util/ActivityAggregateUtilTest.scala
new file mode 100644
index 000000000..0247ccbe7
--- /dev/null
+++ b/activity-aggregator/src/test/scala/org/sunbird/activity/util/ActivityAggregateUtilTest.scala
@@ -0,0 +1,247 @@
+package org.sunbird.activity.util
+
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+import org.sunbird.activity.domain.{ContentStatus, UserContentConsumption}
+import org.sunbird.common.request.RequestContext
+
+import java.util
+import java.util.Date
+
class ActivityAggregateUtilTest extends AnyFlatSpec with Matchers {

  // Unit under test.
  val aggregator = new ActivityAggregateUtil()

  // Shared context for calls that take a RequestContext.
  implicit val requestContext: RequestContext =
    new RequestContext("channel", "pdataId", "env", "did", "sid", "pid", "pver", null)

  /** Builds a ContentStatus, defaulting the three timestamp slots to null. */
  private def cs(contentId: String, status: Int, completedCount: Int, viewCount: Int,
                 progress: Int, input: Boolean): ContentStatus =
    ContentStatus(contentId, status, completedCount, viewCount, progress, null, null, null, fromInput = input)

  /** Builds the java.util map shape the API layer hands in for one content. */
  private def consumedContent(contentId: String, status: Int, progress: Integer = null): util.Map[String, AnyRef] = {
    val entry = new util.HashMap[String, AnyRef]()
    entry.put("contentId", contentId)
    entry.put("status", Int.box(status))
    if (progress != null) entry.put("progress", progress)
    entry
  }

  "getContentStatusFromContents" should "convert Java list to ContentStatus map" in {
    val payload = new util.ArrayList[util.Map[String, AnyRef]]()
    payload.add(consumedContent("content1", 2, 100))

    val statuses = aggregator.getContentStatusFromContents(payload)

    statuses.size shouldBe 1
    statuses.contains("content1") shouldBe true
    statuses("content1").status shouldBe 2
    statuses("content1").completedCount shouldBe 1
  }

  it should "filter out contents with status 0" in {
    val payload = new util.ArrayList[util.Map[String, AnyRef]]()
    payload.add(consumedContent("content1", 0))
    payload.add(consumedContent("content2", 2))

    val statuses = aggregator.getContentStatusFromContents(payload)

    statuses.size shouldBe 1
    statuses.contains("content2") shouldBe true
    statuses.contains("content1") shouldBe false
  }

  it should "merge multiple entries for same content" in {
    // Two consumption events for the same content id.
    val payload = new util.ArrayList[util.Map[String, AnyRef]]()
    payload.add(consumedContent("content1", 1))
    payload.add(consumedContent("content1", 2))

    val statuses = aggregator.getContentStatusFromContents(payload)

    statuses.size shouldBe 1
    statuses("content1").status shouldBe 2    // the highest status wins
    statuses("content1").viewCount shouldBe 2 // views accumulate across entries
  }

  "mergeConsumptionData" should "merge input and DB data correctly" in {
    val fromApi = UserContentConsumption("user1", "batch1", "course1",
      Map("content1" -> cs("content1", 2, 1, 1, 100, input = true)))
    val fromDb = UserContentConsumption("user1", "batch1", "course1",
      Map("content1" -> cs("content1", 1, 0, 2, 50, input = false)))

    val merged = aggregator.mergeConsumptionData(fromApi, fromDb)

    merged.contents("content1").status shouldBe 2         // max of the two
    merged.contents("content1").viewCount shouldBe 3      // 1 + 2
    merged.contents("content1").completedCount shouldBe 1 // 1 + 0
    merged.contents("content1").progress shouldBe 100     // completed => 100
  }

  it should "preserve DB contents not in input" in {
    val fromApi = UserContentConsumption("user1", "batch1", "course1",
      Map("content1" -> cs("content1", 2, 1, 1, 100, input = true)))
    val fromDb = UserContentConsumption("user1", "batch1", "course1",
      Map(
        "content1" -> cs("content1", 1, 0, 2, 50, input = false),
        "content2" -> cs("content2", 2, 1, 3, 100, input = false)
      ))

    val merged = aggregator.mergeConsumptionData(fromApi, fromDb)

    merged.contents.size shouldBe 2
    merged.contents.contains("content1") shouldBe true
    merged.contents.contains("content2") shouldBe true
    merged.contents("content2").viewCount shouldBe 3 // DB-only record untouched
  }

  "getEventActions" should "generate start event for first view" in {
    val stored = cs("content1", 0, 0, 0, 0, input = false)
    val incoming = cs("content1", 1, 0, 1, 10, input = true)

    aggregator.getEventActions(stored, incoming) should contain("start")
  }

  it should "generate complete event for first completion" in {
    val stored = cs("content1", 1, 0, 2, 50, input = false)
    val incoming = cs("content1", 2, 1, 1, 100, input = true)

    aggregator.getEventActions(stored, incoming) should contain("complete")
  }

  it should "not generate start event for subsequent views" in {
    val stored = cs("content1", 1, 0, 3, 50, input = false)
    val incoming = cs("content1", 1, 0, 1, 50, input = true)

    aggregator.getEventActions(stored, incoming) should not contain "start"
  }

  "computeCourseActivityAgg" should "calculate progress correctly" in {
    val consumption = UserContentConsumption("user1", "batch1", "course1", Map(
      "content1" -> cs("content1", 2, 1, 1, 100, input = true),
      "content2" -> cs("content2", 1, 0, 1, 50, input = true),
      "content3" -> cs("content3", 2, 1, 1, 100, input = true)
    ))

    val result = aggregator.computeCourseActivityAgg(
      consumption,
      List("content1", "content2", "content3", "content4"),
      List.empty[String],
      requestContext)

    result.isDefined shouldBe true
    // content1 and content3 are completed out of the four leaf nodes.
    result.get.activityAgg.aggregates("completedCount") shouldBe 2.0
  }

  it should "exclude optional nodes from calculation" in {
    val consumption = UserContentConsumption("user1", "batch1", "course1", Map(
      "content1" -> cs("content1", 2, 1, 1, 100, input = true),
      "content2" -> cs("content2", 1, 0, 1, 50, input = true)
    ))

    val result = aggregator.computeCourseActivityAgg(
      consumption,
      List("content1", "content2", "content3"),
      List("content3"), // content3 is optional
      requestContext)

    result.isDefined shouldBe true
    val progressInfo = result.get.collectionProgress
    progressInfo.isDefined shouldBe true
    // Only content1/content2 are required, and only content1 is completed,
    // so the completed-count-based progress is 1.
    progressInfo.get.progress shouldBe 1
  }

  it should "mark course as completed when all required contents are done" in {
    val consumption = UserContentConsumption("user1", "batch1", "course1", Map(
      "content1" -> cs("content1", 2, 1, 1, 100, input = true),
      "content2" -> cs("content2", 2, 1, 1, 100, input = true)
    ))

    val result = aggregator.computeCourseActivityAgg(
      consumption, List("content1", "content2"), List.empty[String], requestContext)

    result.isDefined shouldBe true
    val progressInfo = result.get.collectionProgress.get
    progressInfo.completed shouldBe true
    progressInfo.completedOn should not be null
  }

  "getCompletionPercentage" should "calculate percentage correctly" in {
    aggregator.getCompletionPercentage(5, 10) shouldBe 50
    aggregator.getCompletionPercentage(10, 10) shouldBe 100
    aggregator.getCompletionPercentage(0, 10) shouldBe 0
  }

  it should "handle division by zero" in {
    aggregator.getCompletionPercentage(5, 0) shouldBe 0
  }

  "getCompletionStatus" should "return correct status codes" in {
    aggregator.getCompletionStatus(0, 10) shouldBe 0  // not started
    aggregator.getCompletionStatus(5, 10) shouldBe 1  // in progress
    aggregator.getCompletionStatus(10, 10) shouldBe 2 // completed
    aggregator.getCompletionStatus(15, 10) shouldBe 2 // over-completed caps at completed
  }

  "compareTime" should "return latest timestamp" in {
    val earlier = new Date(1000000)
    val later = new Date(2000000)

    aggregator.compareTime(earlier, later) shouldBe later
    aggregator.compareTime(later, earlier) shouldBe later
  }

  it should "handle null timestamps" in {
    val known = new Date()

    aggregator.compareTime(null, known) shouldBe known
    aggregator.compareTime(known, null) shouldBe known
    aggregator.compareTime(null, null) should not be null
  }

  "parseDate" should "parse date string correctly" in {
    aggregator.parseDate("2024-01-01 10:00:00:000+0000") should not be null
  }

  it should "handle null gracefully" in {
    aggregator.parseDate(null) shouldBe null
    aggregator.parseDate("null") shouldBe null
    aggregator.parseDate("") shouldBe null
  }

  it should "handle Date object passthrough" in {
    val now = new Date()
    aggregator.parseDate(now) shouldBe now
  }
}
diff --git a/course-mw/enrolment-actor/src/main/scala/org/sunbird/enrolments/ContentConsumptionActor.scala b/course-mw/enrolment-actor/src/main/scala/org/sunbird/enrolments/ContentConsumptionActor.scala
index b46ca66c1..c160f4e56 100644
--- a/course-mw/enrolment-actor/src/main/scala/org/sunbird/enrolments/ContentConsumptionActor.scala
+++ b/course-mw/enrolment-actor/src/main/scala/org/sunbird/enrolments/ContentConsumptionActor.scala
@@ -1,5 +1,6 @@
package org.sunbird.enrolments
+import org.apache.pekko.actor.ActorRef
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.commons.collections4.{CollectionUtils, MapUtils}
import org.apache.commons.lang3.StringUtils
@@ -20,7 +21,6 @@ import com.datastax.driver.core.{UDTValue, UserType}
import java.util
import java.util.{Date, TimeZone, UUID}
import javax.inject.{Inject, Named}
-import org.apache.pekko.actor.ActorRef
import scala.collection.JavaConverters._
import scala.collection.convert.ImplicitConversions._
@@ -28,7 +28,10 @@ case class InternalContentConsumption(courseId: String, batchId: String, content
def validConsumption() = StringUtils.isNotBlank(courseId) && StringUtils.isNotBlank(batchId) && StringUtils.isNotBlank(contentId)
}
-class ContentConsumptionActor @Inject() (@Named("assessment-aggregator-actor") assessmentAggregator: ActorRef) extends BaseEnrolmentActor {
+class ContentConsumptionActor @Inject() (
+ @Named("activity-aggregator-actor") activityAggregatorActor: ActorRef,
+ @Named("assessment-aggregator-actor") assessmentAggregatorActor: ActorRef
+) extends BaseEnrolmentActor {
private val mapper = new ObjectMapper
private var cassandraOperation = ServiceFactory.getInstance
private var pushTokafkaEnabled: Boolean = true //TODO: to be removed once all are in scala
@@ -193,12 +196,18 @@ class ContentConsumptionActor @Inject() (@Named("assessment-aggregator-actor") a
val existingContent = existingContents.getOrElse(inputContent.get("contentId").asInstanceOf[String], new java.util.HashMap[String, AnyRef])
CassandraUtil.changeCassandraColumnMapping(processContentConsumption(inputContent, existingContent, userId))
})
- // First push the event to kafka and then update cassandra user_content_consumption table
- pushInstructionEvent(requestContext, userId, batchId, courseId, contents.asJava)
cassandraOperation.batchInsertLogged(consumptionDBInfo.getKeySpace, consumptionDBInfo.getTableName, contents, requestContext)
val updateData = getLatestReadDetails(userId, batchId, contents)
cassandraOperation.updateRecordV2(enrolmentDBInfo.getKeySpace, enrolmentDBInfo.getTableName, updateData._1, updateData._2, true, requestContext)
contentIds.map(id => responseMessage.put(id,JsonKey.SUCCESS))
+ val useActivityAggregator = ProjectUtil.getConfigValue("enable_activity_aggregator_actor")
+ if (StringUtils.isNotBlank(useActivityAggregator) && useActivityAggregator.equalsIgnoreCase("true")) {
+ logger.info(requestContext, s"ContentConsumptionActor: Routing to ActivityAggregatorActor for userId: $userId, batchId: $batchId, courseId: $courseId")
+ callActivityAggregatorActor(requestContext, userId, batchId, courseId, entry._2.asJava)
+ } else {
+ logger.info(requestContext, s"ContentConsumptionActor: Using legacy Kafka workflow for userId: $userId, batchId: $batchId, courseId: $courseId")
+ pushInstructionEvent(requestContext, userId, batchId, courseId, contents.asJava)
+ }
} else {
logger.info(requestContext, "ContentConsumptionActor: addContent : User Id is invalid : " + userId)
@@ -242,7 +251,7 @@ class ContentConsumptionActor @Inject() (@Named("assessment-aggregator-actor") a
val attemptId = AssessmentAuditRecorder.record(assessment, questionUDTType, requestContext)
assessment.put(JsonKey.ATTEMPT_ID, attemptId)
val request = createAssessmentRequest(assessment, requestContext)
- assessmentAggregator ! request
+ assessmentAggregatorActor ! request
logger.info(requestContext, s"Assessment sent to aggregator (async): attemptId=$attemptId")
} else {
logger.info(requestContext, "Using Kafka-based assessment aggregation")
@@ -402,6 +411,20 @@ class ContentConsumptionActor @Inject() (@Named("assessment-aggregator-actor") a
InstructionEventGenerator.pushInstructionEvent(userId, topic, data)
}
/**
 * Fire-and-forget handoff of a consumption batch to the ActivityAggregatorActor,
 * which recomputes the user's activity aggregates for this enrolment.
 */
@throws[Exception]
private def callActivityAggregatorActor(requestContext: RequestContext, userId: String, batchId: String, courseId: String, contents: java.util.List[java.util.Map[String, AnyRef]]): Unit = {
  logger.info(requestContext, s"ContentConsumptionActor: Calling ActivityAggregatorActor for userId: $userId, batchId: $batchId, courseId: $courseId")

  val aggregatorRequest = new Request()
  aggregatorRequest.setOperation("updateActivityAggregates")
  aggregatorRequest.setRequestContext(requestContext)
  aggregatorRequest.put(JsonKey.BATCH_ID, batchId)
  aggregatorRequest.put(JsonKey.USER_ID, userId)
  aggregatorRequest.put(JsonKey.COURSE_ID, courseId)
  aggregatorRequest.put(JsonKey.CONTENTS, contents)
  activityAggregatorActor ! aggregatorRequest
}
+
def getConsumption(request: Request): Unit = {
val userId = request.get(JsonKey.USER_ID).asInstanceOf[String]
val batchId = request.get(JsonKey.BATCH_ID).asInstanceOf[String]
diff --git a/course-mw/enrolment-actor/src/test/scala/org/sunbird/enrolments/CourseConsumptionActorTest.scala b/course-mw/enrolment-actor/src/test/scala/org/sunbird/enrolments/CourseConsumptionActorTest.scala
index e8dde089d..825ce2f43 100644
--- a/course-mw/enrolment-actor/src/test/scala/org/sunbird/enrolments/CourseConsumptionActorTest.scala
+++ b/course-mw/enrolment-actor/src/test/scala/org/sunbird/enrolments/CourseConsumptionActorTest.scala
@@ -22,8 +22,12 @@ import scala.concurrent.duration.FiniteDuration
class CourseConsumptionActorTest extends FlatSpec with Matchers with MockFactory {
implicit val ec: ExecutionContext = ExecutionContext.global
val system = ActorSystem.create("system")
-
- def createMockAssessmentAggregator() = TestProbe()(system).ref
+ val mockActivityAggregatorActor = system.actorOf(Props(new org.apache.pekko.actor.Actor {
+ def receive = { case _ => }
+ }))
+ val mockAssessmentAggregatorActor = system.actorOf(Props(new org.apache.pekko.actor.Actor {
+ def receive = { case _ => }
+ }))
"get Consumption" should "return success on not giving contentIds" in {
val cassandraOperation = mock[CassandraOperation]
@@ -42,8 +46,8 @@ class CourseConsumptionActorTest extends FlatSpec with Matchers with MockFactory
put("contentId", "do_789")
}})
}})
- ((requestContext: RequestContext, keyspace: _root_.scala.Predef.String, table: _root_.scala.Predef.String, filters: _root_.java.util.Map[_root_.scala.Predef.String, AnyRef], fields: _root_.java.util.List[_root_.scala.Predef.String]) => cassandraOperation.getRecords(keyspace, table, filters, fields, requestContext)).expects(*,*,*,*,*).returns(response)
- val result = callActor(getStateReadRequest(), Props(new ContentConsumptionActor(createMockAssessmentAggregator()).setCassandraOperation(cassandraOperation, false)))
+ ((keyspace: _root_.scala.Predef.String, table: _root_.scala.Predef.String, filters: _root_.java.util.Map[_root_.scala.Predef.String, AnyRef], fields: _root_.java.util.List[_root_.scala.Predef.String], requestContext: RequestContext) => cassandraOperation.getRecords(keyspace, table, filters, fields, requestContext)).expects(*,*,*,*,*).returns(response)
+ val result = callActor(getStateReadRequest(), Props(new ContentConsumptionActor(mockActivityAggregatorActor, mockAssessmentAggregatorActor).setCassandraOperation(cassandraOperation, false)))
assert(null!= result)
}
@@ -51,8 +55,8 @@ class CourseConsumptionActorTest extends FlatSpec with Matchers with MockFactory
val cassandraOperation = mock[CassandraOperation]
val response = new Response()
response.put("response", new java.util.ArrayList[java.util.Map[String, AnyRef]])
- ((requestContext: RequestContext, keyspace: _root_.scala.Predef.String, table: _root_.scala.Predef.String, filters: _root_.java.util.Map[_root_.scala.Predef.String, AnyRef], fields: _root_.java.util.List[_root_.scala.Predef.String]) => cassandraOperation.getRecords(keyspace, table, filters, fields, requestContext)).expects(*,*,*,*,*).returns(response)
- val result = callActor(getStateReadRequest(), Props(new ContentConsumptionActor(createMockAssessmentAggregator()).setCassandraOperation(cassandraOperation, false)))
+ ((keyspace: _root_.scala.Predef.String, table: _root_.scala.Predef.String, filters: _root_.java.util.Map[_root_.scala.Predef.String, AnyRef], fields: _root_.java.util.List[_root_.scala.Predef.String], requestContext: RequestContext) => cassandraOperation.getRecords(keyspace, table, filters, fields, requestContext)).expects(*,*,*,*,*).returns(response)
+ val result = callActor(getStateReadRequest(), Props(new ContentConsumptionActor(mockActivityAggregatorActor, mockAssessmentAggregatorActor).setCassandraOperation(cassandraOperation, false)))
println("RRRR ="+result.getResult)
assert(null!= result)
}
@@ -80,7 +84,7 @@ class CourseConsumptionActorTest extends FlatSpec with Matchers with MockFactory
(cassandraOperation.getRecords(_: String, _: String, _: java.util.Map[String, AnyRef], _: java.util.List[String], _:RequestContext)).expects(*,*,*,*,*).returns(response)
(cassandraOperation.batchInsertLogged(_: String, _: String, _: java.util.List[java.util.Map[String, AnyRef]], _:RequestContext)).expects(*,*,*,*)
(cassandraOperation.updateRecordV2(_: String, _: String, _: java.util.Map[String, AnyRef], _: java.util.Map[String, AnyRef], _: Boolean, _:RequestContext)).expects("sunbird_courses", "user_enrolments",*,*,true,*)
- val result = callActor(getStateUpdateRequest(), Props(new ContentConsumptionActor(createMockAssessmentAggregator()).setCassandraOperation(cassandraOperation, false).setEsService(esService)))
+ val result = callActor(getStateUpdateRequest(), Props(new ContentConsumptionActor(mockActivityAggregatorActor, mockAssessmentAggregatorActor).setCassandraOperation(cassandraOperation, false).setEsService(esService)))
assert(null!= result)
}
@@ -103,7 +107,7 @@ class CourseConsumptionActorTest extends FlatSpec with Matchers with MockFactory
}})
}})
(cassandraOperation.getRecords(_: String, _: String, _: java.util.Map[String, AnyRef], _: java.util.List[String], _:RequestContext)).expects(*,*,*,*,*).returns(response)
- val result = callActor(getEnrolmentSyncRequest(), Props(new ContentConsumptionActor(createMockAssessmentAggregator()).setCassandraOperation(cassandraOperation, false).setEsService(esService)))
+ val result = callActor(getEnrolmentSyncRequest(), Props(new ContentConsumptionActor(mockActivityAggregatorActor, mockAssessmentAggregatorActor).setCassandraOperation(cassandraOperation, false).setEsService(esService)))
}
@@ -113,7 +117,7 @@ class CourseConsumptionActorTest extends FlatSpec with Matchers with MockFactory
val response = new Response()
response.put("response", new java.util.ArrayList[java.util.Map[String, AnyRef]]())
(cassandraOperation.getRecords(_: String, _: String, _: java.util.Map[String, AnyRef], _: java.util.List[String], _:RequestContext)).expects(*,*,*,*,*).returns(response)
- val result = callActorForFailure(getEnrolmentSyncRequest(), Props(new ContentConsumptionActor(createMockAssessmentAggregator()).setCassandraOperation(cassandraOperation, false).setEsService(esService)))
+ val result = callActorForFailure(getEnrolmentSyncRequest(), Props(new ContentConsumptionActor(mockActivityAggregatorActor, mockAssessmentAggregatorActor).setCassandraOperation(cassandraOperation, false).setEsService(esService)))
assert(null!= result)
assert(ResponseCode.CLIENT_ERROR.getResponseCode == result.getResponseCode)
}
@@ -127,7 +131,7 @@ class CourseConsumptionActorTest extends FlatSpec with Matchers with MockFactory
(cassandraOperation.getRecords(_: String, _: String, _: java.util.Map[String, AnyRef], _: java.util.List[String], _:RequestContext)).expects(*,*,*,*,*).returns(response)
(cassandraOperation.batchInsertLogged(_: String, _: String, _: java.util.List[java.util.Map[String, AnyRef]], _:RequestContext)).expects(*,*,*,*)
(cassandraOperation.updateRecordV2(_: String, _: String, _: java.util.Map[String, AnyRef], _: java.util.Map[String, AnyRef], _: Boolean, _:RequestContext)).expects("sunbird_courses", "user_enrolments",*,*,true,*)
- val result = callActorForFailure(getAssementUpdateRequest(), Props(new ContentConsumptionActor(createMockAssessmentAggregator()).setCassandraOperation(cassandraOperation, false).setEsService(esService)))
+ val result = callActorForFailure(getAssementUpdateRequest(), Props(new ContentConsumptionActor(mockActivityAggregatorActor, mockAssessmentAggregatorActor).setCassandraOperation(cassandraOperation, false).setEsService(esService)))
assert(result.getResponseCode == ResponseCode.CLIENT_ERROR.getResponseCode)
}
@@ -247,9 +251,9 @@ class CourseConsumptionActorTest extends FlatSpec with Matchers with MockFactory
put("contentId", "do_789")
}})
}})
- (cassandraOperation.getRecords(_: String, _: String, _: java.util.Map[String, AnyRef], _: java.util.List[String], _:RequestContext)).expects(*,*,*,*,*).returns(response)
+ ((keyspace: _root_.scala.Predef.String, table: _root_.scala.Predef.String, filters: _root_.java.util.Map[_root_.scala.Predef.String, AnyRef], fields: _root_.java.util.List[_root_.scala.Predef.String], requestContext: RequestContext) => cassandraOperation.getRecords(keyspace, table, filters, fields, requestContext)).expects(*, *, *, *, *).returns(response)
(cassandraOperation.getRecordsWithLimit(_: String, _: String, _: java.util.Map[String, AnyRef], _: java.util.List[String], _: Integer, _: RequestContext)).expects(*, *, *, *, *, *).returns(response).anyNumberOfTimes()
- val result = callActor(getStateReadRequestWithFields(), Props(new ContentConsumptionActor(createMockAssessmentAggregator()).setCassandraOperation(cassandraOperation, false)))
+ val result = callActor(getStateReadRequestWithFields(), Props(new ContentConsumptionActor(mockActivityAggregatorActor, mockAssessmentAggregatorActor).setCassandraOperation(cassandraOperation, false)))
println("result : " + result)
assert(null!= result)
}
@@ -287,8 +291,8 @@ class CourseConsumptionActorTest extends FlatSpec with Matchers with MockFactory
}
})
- ((requestContext: RequestContext, keyspace: _root_.scala.Predef.String, table: _root_.scala.Predef.String, filters: _root_.java.util.Map[_root_.scala.Predef.String, AnyRef], fields: _root_.java.util.List[_root_.scala.Predef.String]) => cassandraOperation.getRecords(keyspace, table, filters, fields, requestContext)).expects(*, *, *, *, *).returns(response)
- val result = callActor(getStateReadRequestWithProgressField(), Props(new ContentConsumptionActor(createMockAssessmentAggregator()).setCassandraOperation(cassandraOperation, false)))
+ ((keyspace: _root_.scala.Predef.String, table: _root_.scala.Predef.String, filters: _root_.java.util.Map[_root_.scala.Predef.String, AnyRef], fields: _root_.java.util.List[_root_.scala.Predef.String], requestContext: RequestContext) => cassandraOperation.getRecords(keyspace, table, filters, fields, requestContext)).expects(*, *, *, *, *).returns(response)
+ val result = callActor(getStateReadRequestWithProgressField(), Props(new ContentConsumptionActor(mockActivityAggregatorActor, mockAssessmentAggregatorActor).setCassandraOperation(cassandraOperation, false)))
result.getResult().get("response").toString.shouldEqual("[{progressDetails={key1=val1, key2=val2}, contentId=do_456, batchId=0123, courseId=do_123, collectionId=do_123, progressdetails={}}]")
assert(null != result)
@@ -338,7 +342,7 @@ class CourseConsumptionActorTest extends FlatSpec with Matchers with MockFactory
(cassandraOperation.getRecords(_: String, _: String, _: java.util.Map[String, AnyRef], _: java.util.List[String], _:RequestContext)).expects(*, *, *, *, *).returns(response)
(cassandraOperation.batchInsertLogged(_: String, _: String, _: java.util.List[java.util.Map[String, AnyRef]], _:RequestContext)).expects(*, *, *, *)
(cassandraOperation.updateRecordV2(_: String, _: String, _: java.util.Map[String, AnyRef], _: java.util.Map[String, AnyRef], _: Boolean, _:RequestContext)).expects("sunbird_courses", "user_enrolments", *, *, true, *)
- val result = callActor(getStateUpdateRequestWithProgress(), Props(new ContentConsumptionActor(createMockAssessmentAggregator()).setCassandraOperation(cassandraOperation, false).setEsService(esService)))
+ val result = callActor(getStateUpdateRequestWithProgress(), Props(new ContentConsumptionActor(mockActivityAggregatorActor, mockAssessmentAggregatorActor).setCassandraOperation(cassandraOperation, false).setEsService(esService)))
assert(null != result)
}
diff --git a/course-mw/sunbird-util/cache-utils/src/main/scala/org/sunbird/cache/util/RedisCacheUtil.scala b/course-mw/sunbird-util/cache-utils/src/main/scala/org/sunbird/cache/util/RedisCacheUtil.scala
index c5431c84c..7e60fe03b 100644
--- a/course-mw/sunbird-util/cache-utils/src/main/scala/org/sunbird/cache/util/RedisCacheUtil.scala
+++ b/course-mw/sunbird-util/cache-utils/src/main/scala/org/sunbird/cache/util/RedisCacheUtil.scala
@@ -236,6 +236,25 @@ class RedisCacheUtil {
} finally returnConnection(jedis)
}
+ /**
+ * Fetches the members of the Redis set stored at the given key, reading from a specific database.
+ *
+ * @param key redis key whose set members are read (via SMEMBERS)
+ * @param database redis database index to connect to
+ * @return the set members as a List (empty when the key does not exist)
+ */
+ def getList(key: String, database: Int): List[String] = {
+ val jedis = getConnection(database)
+ try {
+ val data = jedis.smembers(key).asScala.toList
+ data
+ } catch {
+ case e: Exception =>
+ logger.error(null, "Exception Occurred While Fetching List Data from Redis Cache for Key : " + key + " from database: " + database + " | Exception is:", e)
+ throw e
+ } finally returnConnection(jedis)
+ }
+
/**
* This method returns list data from cache for a given key
*
diff --git a/course-mw/sunbird-util/sunbird-platform-core/common-util/src/main/java/org/sunbird/common/models/util/ActorOperations.java b/course-mw/sunbird-util/sunbird-platform-core/common-util/src/main/java/org/sunbird/common/models/util/ActorOperations.java
index 5dd589867..458c7e1f0 100644
--- a/course-mw/sunbird-util/sunbird-platform-core/common-util/src/main/java/org/sunbird/common/models/util/ActorOperations.java
+++ b/course-mw/sunbird-util/sunbird-platform-core/common-util/src/main/java/org/sunbird/common/models/util/ActorOperations.java
@@ -174,7 +174,8 @@ public enum ActorOperations {
ONDEMAND_START_SCHEDULER("onDemandStartScheduler"),
GROUP_ACTIVITY_AGGREGATES("groupActivityAggregates"),
SUBMIT_JOB_REQUEST("submitJobRequest"),
- LIST_JOB_REQUEST("listJobRequest");
+ LIST_JOB_REQUEST("listJobRequest"),
+ UPDATE_ACTIVITY_AGGREGATES("updateActivityAggregates");
private String value;
diff --git a/course-mw/sunbird-util/sunbird-platform-core/common-util/src/main/java/org/sunbird/common/models/util/JsonKey.java b/course-mw/sunbird-util/sunbird-platform-core/common-util/src/main/java/org/sunbird/common/models/util/JsonKey.java
index 481db5810..2badeb7f9 100644
--- a/course-mw/sunbird-util/sunbird-platform-core/common-util/src/main/java/org/sunbird/common/models/util/JsonKey.java
+++ b/course-mw/sunbird-util/sunbird-platform-core/common-util/src/main/java/org/sunbird/common/models/util/JsonKey.java
@@ -78,6 +78,8 @@ public final class JsonKey {
public static final String CONTENTS = "contents";
public static final String CONTEXT = "context";
public static final String CORRELATED_OBJECTS = "correlatedObjects";
+ public static final String COMPLETED_COUNT = "completedcount";
+ public static final String VIEW_COUNT = "viewcount";
public static final String COUNT = "count";
public static final String COUNTRY = "country";
public static final String COUNTRY_CODE = "countryCode";
diff --git a/pom.xml b/pom.xml
index fb598624a..93d87c8fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -169,6 +169,7 @@
course-mw
service
assessment-aggregator
+ activity-aggregator
@@ -184,6 +185,7 @@
core
course-mw
service
+ activity-aggregator
diff --git a/service/app/controllers/activityaggregate/ActivityAggregateController.java b/service/app/controllers/activityaggregate/ActivityAggregateController.java
new file mode 100644
index 000000000..7ce8ac16c
--- /dev/null
+++ b/service/app/controllers/activityaggregate/ActivityAggregateController.java
@@ -0,0 +1,46 @@
+package controllers.activityaggregate;
+
+import controllers.BaseController;
+import controllers.activityaggregate.validator.ActivityAggregateRequestValidator;
+import org.apache.pekko.actor.ActorRef;
+import org.sunbird.common.models.util.ActorOperations;
+import org.sunbird.common.request.Request;
+import play.mvc.Http;
+import play.mvc.Result;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+public class ActivityAggregateController extends BaseController {
+
+ private final ActorRef activityAggregatorActor;
+ private final ActivityAggregateRequestValidator validator = new ActivityAggregateRequestValidator();
+
+ @Inject
+ public ActivityAggregateController(
+ @Named("activity-aggregator-actor") ActorRef activityAggregatorActor) {
+ this.activityAggregatorActor = activityAggregatorActor;
+ }
+
+ public CompletionStage updateActivityAggregates(Http.Request httpRequest) {
+ try {
+ Request request = createAndInitRequest(
+ ActorOperations.UPDATE_ACTIVITY_AGGREGATES.getValue(),
+ httpRequest.body().asJson(),
+ httpRequest);
+
+ validator.validateUpdateActivityAggregates(request);
+
+ return actorResponseHandler(
+ activityAggregatorActor,
+ request,
+ timeout,
+ null,
+ httpRequest);
+ } catch (Exception e) {
+ return CompletableFuture.completedFuture(createCommonExceptionResponse(e, httpRequest));
+ }
+ }
+}
diff --git a/service/app/controllers/activityaggregate/validator/ActivityAggregateRequestValidator.java b/service/app/controllers/activityaggregate/validator/ActivityAggregateRequestValidator.java
new file mode 100644
index 000000000..7a2243eec
--- /dev/null
+++ b/service/app/controllers/activityaggregate/validator/ActivityAggregateRequestValidator.java
@@ -0,0 +1,91 @@
+package controllers.activityaggregate.validator;
+
+import org.sunbird.common.exception.ProjectCommonException;
+import org.sunbird.common.models.util.JsonKey;
+import org.sunbird.common.responsecode.ResponseCode;
+import org.sunbird.common.request.Request;
+
+import java.util.List;
+import java.util.Map;
+
+public class ActivityAggregateRequestValidator {
+
+ public void validateUpdateActivityAggregates(Request request) {
+ if (request.getRequest() == null || request.getRequest().isEmpty()) {
+ throw new ProjectCommonException(
+ ResponseCode.mandatoryParamsMissing.getErrorCode(),
+ ResponseCode.mandatoryParamsMissing.getErrorMessage(),
+ ResponseCode.CLIENT_ERROR.getResponseCode()
+ );
+ }
+
+ String userId = (String) request.get(JsonKey.USER_ID);
+ if (userId == null || userId.trim().isEmpty()) {
+ throw new ProjectCommonException(
+ ResponseCode.mandatoryParamsMissing.getErrorCode(),
+ "userId is mandatory",
+ ResponseCode.CLIENT_ERROR.getResponseCode()
+ );
+ }
+
+ String courseId = (String) request.get(JsonKey.COURSE_ID);
+ if (courseId == null || courseId.trim().isEmpty()) {
+ throw new ProjectCommonException(
+ ResponseCode.mandatoryParamsMissing.getErrorCode(),
+ "courseId is mandatory",
+ ResponseCode.CLIENT_ERROR.getResponseCode()
+ );
+ }
+
+ String batchId = (String) request.get(JsonKey.BATCH_ID);
+ if (batchId == null || batchId.trim().isEmpty()) {
+ throw new ProjectCommonException(
+ ResponseCode.mandatoryParamsMissing.getErrorCode(),
+ "batchId is mandatory",
+ ResponseCode.CLIENT_ERROR.getResponseCode()
+ );
+ }
+
+ List