Skip to content

Commit 066d60a

Browse files
committed
Implement publishing progress to kafka
1 parent 816dd6e commit 066d60a

File tree

3 files changed

+43
-9
lines changed

3 files changed

+43
-9
lines changed

app/background_tasks/kafka_batch_update_points_task.rb

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,50 @@
11
# frozen_string_literal: true
22

3-
require "kafka"
4-
3+
require 'kafka'
54

65
class KafkaBatchUpdatePointsTask
76
def initialize
87
seed_brokers = SiteSetting.value('kafka_seed_brokers')
9-
@kafka = seed_brokers && Kafka.new(seed_brokers, client_id: "tmc-server")
8+
@service_id = SiteSetting.value('kafka_service_id')
9+
@kafka = seed_brokers && Kafka.new(seed_brokers, client_id: 'tmc-server')
1010
end
1111

1212
def run
13-
return unless @kafka
13+
return unless @kafka && @service_id
1414
producer = @kafka.producer
15-
task = KafkaBatchUpdatePoints.first
16-
course = task.course
17-
puts "Batch publishing points for course #{course.name}."
18-
15+
KafkaBatchUpdatePoints.all.each do |task|
16+
course = task.course
17+
Rails.logger.info("Batch publishing points for course #{course.name} with moocfi id: #{course.moocfi_id}.")
18+
if !course.moocfi_id
19+
Rails.logger.error 'Cannot publish points because moocfi id is not specified'
20+
next
21+
end
22+
points_per_user = AwardedPoint.count_per_user_in_course_with_sheet(course, course.gdocs_sheets)
23+
Rails.logger.info("Found points for #{parts.keys.length} users")
24+
available_points = AvailablePoint.course_sheet_points(course, parts)
25+
points_per_user.each do |username, points_by_group|
26+
user = User.find_by(login: username)
27+
Rails.logger.info("Publishing points for user #{user.id}")
28+
progress = points_by_group.map do |group_name, awareded_points|
29+
max_points = available_points[group_name] || 0
30+
{
31+
group: group_name,
32+
n_points: awareded_points,
33+
max_points: max_points,
34+
progress: (awareded_points / max_points.to_f).floor(2)
35+
}
36+
end
37+
message = {
38+
timestamp: Time.zone.now.iso8601,
39+
user_id: user.id,
40+
course_id: course.moocfi_id,
41+
service_id: @service_id,
42+
progress: progress,
43+
message_format_version: 1
44+
}
45+
producer.deliver_message(message, topic: 'user-course-progress')
46+
end
47+
end
1948
end
2049

2150
def wait_delay
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
class AddMoocfiIdToCourse < ActiveRecord::Migration
2+
def change
3+
add_column :courses, :moocfi_id, :string
4+
end
5+
end

db/schema.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
#
1212
# It's strongly recommended that you check this file into your version control system.
1313

14-
ActiveRecord::Schema.define(version: 20200112230122) do
14+
ActiveRecord::Schema.define(version: 20200112230124) do
1515
# These are extensions that must be enabled in order to support this database
1616
enable_extension "plpgsql"
1717

0 commit comments

Comments
 (0)