Skip to content

Commit 5602959

Browse files
committed
fix: add message_id to reactions, retry persistence race, validate SSE fields
Address PR review feedback: - Thread message_id through OutboundResponse::Reaction and ApiEvent::Reaction so the frontend can target the exact message instead of scanning backwards (coderabbitai) - Make log_reaction async with a retry loop (5 attempts, 25ms backoff) to handle the race where the fire-and-forget user message INSERT hasn't landed yet (tembo) - Guard the frontend SSE reaction handler against missing channel_id/emoji fields before mutating state (tembo)
1 parent 4feb063 commit 5602959

File tree

13 files changed

+74
-41
lines changed

13 files changed

+74
-41
lines changed

interface/src/api/client.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ export interface ReactionEvent {
114114
agent_id: string;
115115
channel_id: string;
116116
emoji: string;
117+
message_id: string | null;
117118
}
118119

119120
export type ApiEvent =

interface/src/hooks/useChannelLiveState.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -570,20 +570,24 @@ export function useChannelLiveState(channels: ChannelInfo[]) {
570570
}, []);
571571

572572
const handleReaction = useCallback((data: unknown) => {
573-
const event = data as ReactionEvent;
573+
const event = data as Partial<ReactionEvent>;
574+
if (typeof event.channel_id !== "string" || typeof event.emoji !== "string") return;
575+
const channelId = event.channel_id;
576+
const emoji = event.emoji;
577+
const messageId = event.message_id ?? null;
574578
setLiveStates((prev) => {
575-
const state = prev[event.channel_id];
579+
const state = prev[channelId];
576580
if (!state) return prev;
577-
// Find the most recent user message to attach the reaction to
578581
const timeline = [...state.timeline];
582+
// Target by message_id when available, fall back to latest user message
579583
for (let i = timeline.length - 1; i >= 0; i--) {
580584
const item = timeline[i];
581-
if (item.type === "message" && item.role === "user") {
582-
timeline[i] = { ...item, reaction: event.emoji };
583-
break;
584-
}
585+
if (item.type !== "message" || item.role !== "user") continue;
586+
if (messageId && item.id !== messageId) continue;
587+
timeline[i] = { ...item, reaction: emoji };
588+
break;
585589
}
586-
return { ...prev, [event.channel_id]: { ...state, timeline } };
590+
return { ...prev, [channelId]: { ...state, timeline } };
587591
});
588592
}, []);
589593

src/api/state.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,11 +213,13 @@ pub enum ApiEvent {
213213
/// "created", "updated", or "deleted".
214214
action: String,
215215
},
216-
/// An emoji reaction added to the latest message.
216+
/// An emoji reaction added to a message.
217217
Reaction {
218218
agent_id: String,
219219
channel_id: String,
220220
emoji: String,
221+
/// The conversation_messages ID the reaction was attached to.
222+
message_id: Option<String>,
221223
},
222224
}
223225

src/conversation/history.rs

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -104,29 +104,51 @@ impl ConversationLogger {
104104
});
105105
}
106106

107-
/// Set a reaction emoji on the most recent user message in a channel. Fire-and-forget.
108-
pub fn log_reaction(&self, channel_id: &ChannelId, emoji: &str) {
109-
let pool = self.pool.clone();
110-
let channel_id = channel_id.to_string();
111-
let emoji = emoji.to_string();
112-
113-
tokio::spawn(async move {
114-
if let Err(error) = sqlx::query(
115-
"UPDATE conversation_messages SET reaction = ? \
116-
WHERE id = ( \
117-
SELECT id FROM conversation_messages \
118-
WHERE channel_id = ? AND role = 'user' \
119-
ORDER BY created_at DESC LIMIT 1 \
120-
)",
107+
/// Set a reaction emoji on the most recent user message in a channel.
108+
///
109+
/// Returns the message ID the reaction was attached to, or `None` if no
110+
/// matching user message was found. Retries a few times to handle the race
111+
/// where the user message INSERT (fire-and-forget) hasn't landed yet.
112+
pub async fn log_reaction(&self, channel_id: &ChannelId, emoji: &str) -> Option<String> {
113+
let channel_id_str = channel_id.to_string();
114+
115+
for attempt in 0..5u32 {
116+
// Find the target message first so we can return its ID.
117+
let target_id: Option<String> = sqlx::query_scalar(
118+
"SELECT id FROM conversation_messages \
119+
WHERE channel_id = ? AND role = 'user' \
120+
ORDER BY created_at DESC LIMIT 1",
121121
)
122-
.bind(&emoji)
123-
.bind(&channel_id)
124-
.execute(&pool)
122+
.bind(&channel_id_str)
123+
.fetch_optional(&self.pool)
125124
.await
125+
.ok()
126+
.flatten();
127+
128+
let Some(message_id) = target_id else {
129+
if attempt < 4 {
130+
tokio::time::sleep(std::time::Duration::from_millis(25)).await;
131+
continue;
132+
}
133+
tracing::warn!("no user message found for reaction");
134+
return None;
135+
};
136+
137+
match sqlx::query("UPDATE conversation_messages SET reaction = ? WHERE id = ?")
138+
.bind(emoji)
139+
.bind(&message_id)
140+
.execute(&self.pool)
141+
.await
126142
{
127-
tracing::warn!(%error, "failed to persist reaction");
143+
Ok(_) => return Some(message_id),
144+
Err(error) => {
145+
tracing::warn!(%error, "failed to persist reaction");
146+
return None;
147+
}
128148
}
129-
});
149+
}
150+
151+
None
130152
}
131153

132154
/// Load recent messages for a channel (oldest first).

src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,8 @@ pub enum OutboundResponse {
381381
caption: Option<String>,
382382
},
383383
/// Add a reaction emoji to the triggering message.
384-
Reaction(String),
384+
/// The optional message_id identifies the specific message being reacted to.
385+
Reaction(String, Option<String>),
385386
/// Remove a reaction emoji from the triggering message.
386387
/// No-op on platforms that don't support reaction removal.
387388
RemoveReaction(String),

src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -972,11 +972,12 @@ async fn run(
972972
is_typing: false,
973973
}).ok();
974974
}
975-
spacebot::OutboundResponse::Reaction(emoji) => {
975+
spacebot::OutboundResponse::Reaction(emoji, message_id) => {
976976
api_event_tx.send(spacebot::api::ApiEvent::Reaction {
977977
agent_id: sse_agent_id.clone(),
978978
channel_id: sse_channel_id.clone(),
979979
emoji: emoji.clone(),
980+
message_id: message_id.clone(),
980981
}).ok();
981982
}
982983
_ => {}

src/messaging/discord.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ impl Messaging for DiscordAdapter {
286286
.await
287287
.context("failed to send file attachment")?;
288288
}
289-
OutboundResponse::Reaction(emoji) => {
289+
OutboundResponse::Reaction(emoji, _) => {
290290
let message_id = message
291291
.metadata
292292
.get("discord_message_id")

src/messaging/email.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@ impl Messaging for EmailAdapter {
423423
)
424424
.await?;
425425
}
426-
OutboundResponse::Reaction(_)
426+
OutboundResponse::Reaction(..)
427427
| OutboundResponse::RemoveReaction(_)
428428
| OutboundResponse::Status(_) => {}
429429
OutboundResponse::Ephemeral { text, .. } => {
@@ -506,7 +506,7 @@ impl Messaging for EmailAdapter {
506506
self.send_email(&recipient, "Spacebot message", text, None, Vec::new(), None)
507507
.await?;
508508
}
509-
OutboundResponse::Reaction(_)
509+
OutboundResponse::Reaction(..)
510510
| OutboundResponse::RemoveReaction(_)
511511
| OutboundResponse::Status(_)
512512
| OutboundResponse::StreamStart

src/messaging/slack.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -933,7 +933,7 @@ impl Messaging for SlackAdapter {
933933
.context("failed to complete slack file upload")?;
934934
}
935935

936-
OutboundResponse::Reaction(emoji) => {
936+
OutboundResponse::Reaction(emoji, _) => {
937937
let ts =
938938
extract_message_ts(message).context("missing slack_message_ts for reaction")?;
939939
let req = SlackApiReactionsAddRequest::new(
@@ -1517,7 +1517,7 @@ fn variant_name(response: &OutboundResponse) -> &'static str {
15171517
OutboundResponse::Text(_) => "Text",
15181518
OutboundResponse::ThreadReply { .. } => "ThreadReply",
15191519
OutboundResponse::File { .. } => "File",
1520-
OutboundResponse::Reaction(_) => "Reaction",
1520+
OutboundResponse::Reaction(..) => "Reaction",
15211521
OutboundResponse::RemoveReaction(_) => "RemoveReaction",
15221522
OutboundResponse::Ephemeral { .. } => "Ephemeral",
15231523
OutboundResponse::RichMessage { .. } => "RichMessage",

src/messaging/telegram.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ impl Messaging for TelegramAdapter {
403403
}
404404
}
405405
}
406-
OutboundResponse::Reaction(emoji) => {
406+
OutboundResponse::Reaction(emoji, _) => {
407407
let message_id = self.extract_message_id(message)?;
408408

409409
let reaction = ReactionType::Emoji {

0 commit comments

Comments
 (0)