Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions tycho-indexer/src/services/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,6 @@ impl WsActor {
let handle = ctx.add_stream(stream);
actor.subscriptions.insert(subscription_id, handle);
actor.compression_enabled.insert(subscription_id, compression);
debug!("Added subscription to hashmap");
gauge!("websocket_extractor_subscriptions_active", "subscription_id" => subscription_id.to_string()).increment(1);
counter!(
"websocket_extractor_subscriptions_metadata",
Expand Down Expand Up @@ -451,12 +450,11 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsActor {
self.heartbeat = Instant::now();
}
Ok(ws::Message::Text(text)) => {
debug!(actor_id = %self.id, text = %text, "Websocket text message received");
trace!(actor_id = %self.id, text = %text, "Websocket text message received");

// Try to deserialize the message to a Message enum
match serde_json::from_str::<Command>(&text) {
Ok(message) => {
debug!(actor_id = %self.id, "Parsed command successfully");
// Handle the message based on its variant
match message {
Command::Subscribe {
Expand All @@ -465,18 +463,18 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsActor {
compression,
partial_blocks,
} => {
debug!(actor_id = %self.id, %extractor_id, "Message handler: Processing subscribe request");
trace!(actor_id = %self.id, %extractor_id, "Message handler: Processing subscribe request");
self.subscribe(
ctx,
&extractor_id.clone().into(),
include_state,
compression,
partial_blocks,
);
debug!(actor_id = %self.id, %extractor_id, "Message handler: Subscribe method completed");
trace!(actor_id = %self.id, %extractor_id, "Message handler: Subscribe method completed");
}
Command::Unsubscribe { subscription_id } => {
debug!(%subscription_id, "Unsubscribing from subscription");
trace!(%subscription_id, "Unsubscribing from subscription");
self.unsubscribe(ctx, subscription_id);
}
}
Expand All @@ -495,11 +493,11 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsActor {
}
}
Ok(ws::Message::Binary(bin)) => {
debug!("Websocket binary message received");
trace!("Websocket binary message received");
ctx.binary(bin)
}
Ok(ws::Message::Close(reason)) => {
debug!(reason = ?reason, "Websocket close message received");
trace!(reason = ?reason, "Websocket close message received");
ctx.close(reason);
ctx.stop()
}
Expand Down
Loading