Skip to content

Commit 3483b91

Browse files
committed
Optimize data collection with a threaded architecture
Introduce a threaded architecture. Main thread: handles TDLib updates (authorization, new messages). Background thread: classifies messages as spam or ham. This lets the collector receive and classify messages concurrently, so classification no longer blocks message handling.
1 parent b04a117 commit 3483b91

File tree

2 files changed

+158
-129
lines changed

2 files changed

+158
-129
lines changed

app/services/tdlib_client.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ module TDJson
2424
end
2525
end
2626

27-
class TDClient
27+
class TdlibClient
2828
def initialize
2929
@client = TDJson.td_json_client_create
3030
@request_queue = {}

lib/tasks/telegram_data_collector.rake

Lines changed: 157 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -22,69 +22,31 @@ namespace :telegram do
2222
next
2323
end
2424

25-
client = TDClient.new
25+
@message_queue = Queue.new
26+
@processed_hashes = Set.new
27+
# Load existing message hash into memory
28+
TrainedMessage.pluck(:message_hash).compact.each { |hash| @processed_hashes.add(hash) }
29+
30+
# Start background processor thread
31+
client = TdlibClient.new
32+
processor_thread = Thread.new { process_message_queue(client) }
2633

2734
# Set log level
2835
client.send({ "@type" => "setLogVerbosityLevel", "new_verbosity_level" => 2 })
2936

3037
begin
38+
# Main message loop
3139
state = nil
3240
loop do
3341
update = client.receive(1.0)
3442
next unless update
3543

3644
case update["@type"]
3745
when "updateAuthorizationState"
38-
new_state = update["authorization_state"]["@type"]
39-
puts "Authorization state: #{new_state}"
40-
state = new_state
41-
42-
case new_state
43-
when "authorizationStateWaitTdlibParameters"
44-
puts "Setting TDLib parameters..."
45-
params = {
46-
"@type" => "setTdlibParameters",
47-
"api_id" => Rails.application.credentials.dig(:tdlib_app_id),
48-
"api_hash" => Rails.application.credentials.dig(:tdlib_app_hash_id),
49-
"database_directory" => "./tdlib-db",
50-
"files_directory" => "./tdlib-files",
51-
"use_file_database" => true,
52-
"use_chat_info_database" => true,
53-
"use_message_database" => true,
54-
"use_secret_chats" => true,
55-
"system_language_code" => "en",
56-
"device_model" => "Ruby TD Client",
57-
"application_version" => "1.0"
58-
}
59-
client.send(params)
60-
61-
when "authorizationStateWaitPhoneNumber"
62-
puts "Please enter your phone number (e.g. +15551234567):"
63-
phone = STDIN.gets.strip
64-
client.send({
65-
"@type" => "setAuthenticationPhoneNumber",
66-
"phone_number" => phone
67-
})
68-
69-
when "authorizationStateWaitCode"
70-
puts "Please enter the code from Telegram/SMS:"
71-
code = STDIN.gets.strip
72-
client.send({
73-
"@type" => "checkAuthenticationCode",
74-
"code" => code
75-
})
76-
77-
when "authorizationStateReady"
78-
puts "Authorization successful! Listening for messages..."
79-
80-
when "authorizationStateClosed"
81-
puts "Authorization closed. Exiting."
82-
break
83-
else
84-
puts "Unhandled authorization state: #{new_state}"
85-
end
46+
state = handle_update_authorization_state(update, client)
47+
break if state == "authorizationStateClosed"
8648
when "updateNewMessage"
87-
handleUpdateNewMessage(update, client)
49+
handle_message_update(update)
8850
else
8951
# ignore other updates
9052
end
@@ -96,100 +58,167 @@ namespace :telegram do
9658
client&.close
9759
end
9860
end
99-
end
100-
101-
def process_message(message_content, group_id, group_name, user_id, user_name)
102-
# Memoize the classifier to avoid creating it twice
103-
classifier = SpamClassifierService.new(group_id, group_name)
10461

105-
message_hash = Digest::SHA256.hexdigest(message_content.to_s)
106-
existing_message = TrainedMessage.find_by(message_hash: message_hash)
107-
108-
if existing_message
109-
puts "Message already exists, skipping"
110-
return
62+
# Reacts to a TDLib "updateAuthorizationState" update and drives the login flow.
#
# Prints the new state, then answers the states that need a reply: sends the
# TDLib parameters, or prompts on STDIN for the phone number / login code and
# forwards the answer through the client. Other states are only logged.
#
# @param update [Hash] the update; reads update["authorization_state"]["@type"]
# @param client [#send] object whose #send accepts a TDLib request hash
# @return [String] the new authorization state type, so the caller can leave
#   its receive loop on "authorizationStateClosed"
def handle_update_authorization_state(update, client)
  new_state = update["authorization_state"]["@type"]
  puts "Authorization state: #{new_state}"

  case new_state
  when "authorizationStateWaitTdlibParameters"
    puts "Setting TDLib parameters..."
    client.send({
      "@type" => "setTdlibParameters",
      "api_id" => Rails.application.credentials.dig(:tdlib_app_id),
      "api_hash" => Rails.application.credentials.dig(:tdlib_app_hash_id),
      "database_directory" => "./tdlib-db",
      "files_directory" => "./tdlib-files",
      "use_file_database" => true,
      "use_chat_info_database" => true,
      "use_message_database" => true,
      "use_secret_chats" => true,
      "system_language_code" => "en",
      "device_model" => "Ruby TD Client",
      "application_version" => "1.0"
    })

  when "authorizationStateWaitPhoneNumber"
    puts "Please enter your phone number (e.g. +15551234567):"
    # STDIN.gets returns nil at end of input (e.g. a non-interactive run);
    # to_s keeps that from raising NoMethodError on nil.
    phone = STDIN.gets.to_s.strip
    client.send({
      "@type" => "setAuthenticationPhoneNumber",
      "phone_number" => phone
    })

  when "authorizationStateWaitCode"
    puts "Please enter the code from Telegram/SMS:"
    code = STDIN.gets.to_s.strip
    client.send({
      "@type" => "checkAuthenticationCode",
      "code" => code
    })

  when "authorizationStateReady"
    puts "Authorization successful! Listening for messages..."

  when "authorizationStateClosed"
    puts "Authorization closed. Exiting."

  else
    puts "Unhandled authorization state: #{new_state}"
  end

  new_state
end
112112

113-
# Process message content
114-
train_message_if_needed(
115-
classifier,
116-
message_content,
117-
:message_content,
118-
group_id,
119-
group_name,
120-
user_id,
121-
user_name
122-
)
123-
124-
# Process user name
125-
train_message_if_needed(
126-
classifier,
127-
user_name,
128-
:user_name,
129-
group_id,
130-
group_name,
131-
user_id,
132-
user_name
133-
)
134-
end
113+
# Filters an incoming message update and queues it for the background
# processor.
#
# Only content of type "messageText" is considered. A text whose SHA256 digest
# is already in @processed_hashes is dropped; anything else is pushed onto
# @message_queue as { content:, update: }.
#
# @param update [Hash] reads update["message"]["content"], falling back to
#   update["content"] and update["new_content"]
# @return [void]
def handle_message_update(update)
  message = update["message"] || update
  content = message["content"] || update["new_content"]

  # An update without any content used to raise NoMethodError on nil here.
  return unless content && content["@type"] == "messageText"

  message_content = content.dig("text", "text")
  # Digest::SHA256.hexdigest raises TypeError on nil, so skip text-less updates.
  return if message_content.nil?

  # Fast in-memory duplicate check
  message_hash = Digest::SHA256.hexdigest(message_content)
  return if @processed_hashes.include?(message_hash)

  # Hand the update over to the processing queue (push does not block)
  @message_queue.push({ content: message_content, update: update })
end
146131

147-
if should_create
148-
TrainedMessage.create!(
149-
group_id: group_id,
150-
group_name: group_name,
151-
message: text_to_classify,
152-
message_type: is_spam ? :maybe_spam : :maybe_ham,
153-
sender_user_name: user_name || "Telegram collector",
154-
training_target: training_target,
155-
sender_chat_id: user_id
156-
)
132+
# Worker loop for the background thread: takes entries off @message_queue and
# hands each entry's :update to handle_update_new_message.
#
# The loop ends once @message_queue has been closed and drained; while the
# queue stays open this method does not return.
#
# @param client [Object] passed through unchanged to handle_update_new_message
# @return [void]
def process_message_queue(client)
  # Queue#pop blocks until an entry is available and returns nil only after
  # the queue was closed and emptied. The previous pop(timeout:) + rescue
  # ThreadError never fired: a timed-out pop returns nil instead of raising.
  while (message = @message_queue.pop)
    begin
      handle_update_new_message(message[:update], client)
    rescue StandardError => e
      # A single bad message must not kill the worker thread, otherwise every
      # later message would pile up in the queue unprocessed.
      puts "Failed to process queued message: #{e.class}: #{e.message}"
    end
  end
end
158-
end
159145

160-
def handleUpdateNewMessage(update, client)
161-
message = update["message"]
162-
chat_id = message["chat_id"]
163-
sender_id = message["sender_id"]["user_id"] rescue nil
164-
content = message["content"]
165-
user_name = "Unknown User"
166-
message_content = content["text"]["text"] if content["@type"] == "messageText"
167-
168-
# Send asynchronous requests for chat and user data
169-
client.send_async({ "@type" => "getChat", "chat_id" => chat_id }) do |chat_update|
170-
chat = chat_update
171-
group_name = chat["title"] || "Unknown Group"
172-
173-
if sender_id
174-
client.send_async({ "@type" => "getUser", "user_id" => sender_id }) do |user_update|
175-
user = user_update
176-
user_name = user["first_name"]
177-
user_name += " " + user["last_name"] if user["last_name"]
178-
user_name = user["username"] if user["username"]
146+
# Runs the training check twice for one incoming message: first on the message
# text (target :message_content), then on the sender's name (target
# :user_name). Both calls share the same group and sender details.
def process_message(message_content, group_id, group_name, user_id, user_name)
  shared_details = [group_id, group_name, user_id, user_name]

  train_message_if_needed(message_content, :message_content, *shared_details)
  train_message_if_needed(user_name, :user_name, *shared_details)
end
152+
153+
# Classifies a text and stores it as a TrainedMessage when doing so helps keep
# the spam/ham counts for the given training target balanced.
#
# Texts whose SHA256 digest already exists in TrainedMessage are skipped. The
# digest of every text known to be stored is kept in @processed_hashes.
#
# @param text_to_classify [String, nil] text to classify (nil is hashed as "")
# @param training_target [Symbol] e.g. :message_content or :user_name
# @param group_id [Object] stored as group_id and passed to the classifier
# @param group_name [String] stored as group_name and passed to the classifier
# @param user_id [Object] stored as sender_chat_id
# @param user_name [String, nil] stored as sender_user_name
#   ("Telegram collector" when nil)
# @return [Object, nil] the result of TrainedMessage.create!, or nil when
#   nothing was stored
# @raise whatever TrainedMessage.create! raises when the record cannot be saved
def train_message_if_needed(text_to_classify, training_target, group_id, group_name, user_id, user_name)
  text_hash = Digest::SHA256.hexdigest(text_to_classify.to_s)

  if TrainedMessage.find_by(message_hash: text_hash)
    # Cache the hash so the in-memory duplicate check can reject this text
    # next time without another database lookup.
    @processed_hashes.add(text_hash)
    puts "Trained message already exists, skipping"
    return
  end

  # A new classifier is built on every call (it is not memoized).
  classifier = SpamClassifierService.new(group_id, group_name)
  is_spam, _, _ = classifier.classify(text_to_classify)

  puts "#{training_target} classified result: #{is_spam ? 'maybe_spam' : 'maybe_ham'}"

  spam_count = TrainedMessage.where(message_type: [ :spam, :maybe_spam ], training_target: training_target).count
  ham_count = TrainedMessage.where(message_type: [ :ham, :maybe_ham ], training_target: training_target).count

  # Balance the dataset: only store the class that is currently not ahead.
  should_create = (spam_count > ham_count && !is_spam) || (spam_count <= ham_count && is_spam)
  return unless should_create

  # NOTE(review): message_hash is not passed here; presumably the model
  # derives it from `message` — confirm against TrainedMessage.
  record = TrainedMessage.create!(
    group_id: group_id,
    group_name: group_name,
    message: text_to_classify,
    message_type: is_spam ? :maybe_spam : :maybe_ham,
    sender_user_name: user_name || "Telegram collector",
    training_target: training_target,
    sender_chat_id: user_id
  )

  # Mark the text as processed only after the insert succeeded; if create!
  # raises, the text stays eligible for a later attempt.
  @processed_hashes.add(text_hash)
  record
end
179187

188+
# Resolves the chat title and sender name for a queued new-message update and
# passes the message text on to process_message.
#
# Only "messageText" content with a non-blank text is processed; for anything
# else no TDLib request is sent. The chat is looked up with "getChat" and,
# when the sender is a user, the user with "getUser"; the work continues in
# the blocks given to client.send_async.
#
# NOTE(review): process_message runs inside the send_async callbacks. Which
# thread invokes them depends on the client implementation (not visible
# here) — confirm that classification really stays off the main receive loop.
#
# @param update [Hash] reads update["message"] (chat_id, sender_id, content)
# @param client [#send_async] takes a request hash and yields the response
# @return [void]
def handle_update_new_message(update, client)
  message = update["message"]
  # A queued update without a "message" used to raise NoMethodError on nil.
  return unless message

  content = message["content"]
  return unless content && content["@type"] == "messageText"

  message_content = content.dig("text", "text")
  # Nothing to train on: skip the two TDLib round-trips entirely.
  return if message_content.blank?

  chat_id = message["chat_id"]
  # nil when the sender is not a user (e.g. channel posts)
  sender_id = message.dig("sender_id", "user_id")

  client.send_async({ "@type" => "getChat", "chat_id" => chat_id }) do |chat|
    group_name = chat["title"] || "Unknown Group"

    if sender_id
      client.send_async({ "@type" => "getUser", "user_id" => sender_id }) do |user|
        user_name = telegram_user_display_name(user)
        process_message(message_content, chat_id, group_name, sender_id, user_name)
        puts "Group(#{chat_id}): #{group_name} | User: #{user_name} | Text: #{message_content}"
        puts "----------------------"
      end
    else
      # Handle cases where there's no sender_id (e.g., channel posts)
      user_name = "Unknown User"
      process_message(message_content, chat_id, group_name, sender_id, user_name)
      puts "Group: #{group_name} | User: #{user_name} | Text: #{message_content}"
      puts "----------------------"
    end
  end
end

# Picks a display name from a getUser response: the username when present,
# otherwise "first_name last_name" built from the non-empty parts, otherwise
# "Unknown User". Missing and empty fields are treated alike, so an empty
# last_name adds no trailing space and an empty username does not override
# the name.
def telegram_user_display_name(user)
  username = user["username"].to_s
  return username unless username.empty?

  full_name = [user["first_name"], user["last_name"]].map(&:to_s).reject(&:empty?).join(" ")
  full_name.empty? ? "Unknown User" : full_name
end
195224
end

0 commit comments

Comments
 (0)