Skip to content

Commit c7218f2

Browse files
committed
Add thread lock/close, mention/reply notifications, and locked field propagation
- Add 'locked' column to Conversation model with migration - Add PUT /v1/conversation/{id}/lock endpoint for lock/unlock - Update can_speak() to block non-admin/owner messages in locked threads - Add locked field to get_threads() and get_conversations_with_detail() responses - Add ThreadResponse.locked field to Models.py - Parse <@userid> mentions and [uid:userId] reply targets in messages - Send targeted 'mention' and 'reply' WebSocket notifications to mentioned/replied users - Avoid double-notifying users who are both mentioned and replied to
1 parent d16a80b commit c7218f2

File tree

4 files changed

+149
-4
lines changed

4 files changed

+149
-4
lines changed

agixt/Conversations.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1070,6 +1070,7 @@ def get_conversations_with_detail(self):
10701070
"parent_id": (
10711071
str(conversation.parent_id) if conversation.parent_id else None
10721072
),
1073+
"locked": getattr(conversation, "locked", False) or False,
10731074
}
10741075

10751076
# Sort by updated_at descending (most recent first)
@@ -4063,8 +4064,9 @@ def can_speak(self, user_id: str) -> bool:
40634064
"""
40644065
Check if a user can speak (send messages) in this conversation.
40654066
Users with 'observer' participant role cannot speak.
4067+
Locked threads only allow owner/admin to speak.
40664068
Returns True if the user can speak, False if they are muted/observer.
4067-
Non-group conversations always allow speaking.
4069+
Non-group conversations always allow speaking (unless locked).
40684070
If user has no participant record, allow speaking (they may be the owner).
40694071
"""
40704072
conversation_id = self.get_conversation_id()
@@ -4077,7 +4079,26 @@ def can_speak(self, user_id: str) -> bool:
40774079
.filter(Conversation.id == conversation_id)
40784080
.first()
40794081
)
4080-
if not conversation or conversation.conversation_type != "group":
4082+
if not conversation:
4083+
return True
4084+
4085+
# Check if conversation is locked (closed thread)
4086+
if getattr(conversation, "locked", False):
4087+
# Only owners and admins can speak in locked conversations
4088+
participant = (
4089+
session.query(ConversationParticipant)
4090+
.filter(
4091+
ConversationParticipant.conversation_id == conversation_id,
4092+
ConversationParticipant.user_id == user_id,
4093+
ConversationParticipant.status == "active",
4094+
)
4095+
.first()
4096+
)
4097+
if not participant or participant.role not in ("owner", "admin"):
4098+
return False
4099+
return True
4100+
4101+
if conversation.conversation_type != "group":
40814102
return True
40824103
participant = (
40834104
session.query(ConversationParticipant)
@@ -4666,6 +4687,7 @@ def get_threads(self, conversation_id=None):
46664687
if last_msg_time and hasattr(last_msg_time, "isoformat")
46674688
else str(last_msg_time) if last_msg_time else None
46684689
),
4690+
"locked": getattr(thread, "locked", False) or False,
46694691
}
46704692
)
46714693

agixt/DB.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1626,6 +1626,9 @@ class Conversation(Base):
16261626
Boolean, nullable=False, default=False
16271627
) # If True, only explicitly invited users can join; if False, all company members auto-join
16281628
description = Column(Text, nullable=True, default=None) # Channel topic/description
1629+
locked = Column(
1630+
Boolean, nullable=False, default=False
1631+
) # If True, only admins/owners can send messages (used to close/lock threads)
16291632
created_at = Column(DateTime, server_default=func.now())
16301633
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())
16311634
user_id = Column(
@@ -4173,6 +4176,7 @@ def migrate_conversation_table():
41734176
columns_to_add = [
41744177
("pin_order", "INTEGER"),
41754178
("category", "VARCHAR"),
4179+
("locked", "BOOLEAN DEFAULT 0" if DATABASE_TYPE == "sqlite" else "BOOLEAN DEFAULT FALSE"),
41764180
]
41774181

41784182
if DATABASE_TYPE == "sqlite":

agixt/Models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1246,6 +1246,7 @@ class ThreadResponse(BaseModel):
12461246
updated_at: Optional[str] = None
12471247
message_count: int = 0
12481248
last_message_at: Optional[str] = None
1249+
locked: bool = False
12491250

12501251

12511252
class ThreadListResponse(BaseModel):

agixt/endpoints/Conversation.py

Lines changed: 120 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2959,9 +2959,11 @@ async def notify_conversation_participants_message_added(
29592959
role: str,
29602960
):
29612961
"""Notify ALL participants of a conversation when a new message is added.
2962-
This ensures DM recipients and group channel members get notifications."""
2962+
This ensures DM recipients and group channel members get notifications.
2963+
Also sends targeted 'mention' and 'reply' notifications to @mentioned and replied-to users."""
29632964
try:
29642965
from DB import get_session, ConversationParticipant
2966+
import re
29652967

29662968
preview = message[:100] + "..." if len(message) > 100 else message
29672969
notification_data = {
@@ -2977,6 +2979,22 @@ async def notify_conversation_participants_message_added(
29772979
},
29782980
}
29792981

2982+
# Parse @mentions: <@userId> format
2983+
mentioned_user_ids = set()
2984+
mention_pattern = re.compile(r"<@([0-9a-f-]{36})>")
2985+
for match in mention_pattern.finditer(message):
2986+
uid = match.group(1)
2987+
if uid != sender_user_id: # Don't notify sender about their own mentions
2988+
mentioned_user_ids.add(uid)
2989+
2990+
# Parse reply-to: [uid:userId] format
2991+
replied_to_user_ids = set()
2992+
uid_pattern = re.compile(r"\[uid:([0-9a-f-]{36})\]")
2993+
for match in uid_pattern.finditer(message):
2994+
uid = match.group(1)
2995+
if uid != sender_user_id: # Don't notify sender about replying to themselves
2996+
replied_to_user_ids.add(uid)
2997+
29802998
with get_session() as session:
29812999
participants = (
29823000
session.query(ConversationParticipant)
@@ -2989,11 +3007,46 @@ async def notify_conversation_participants_message_added(
29893007
)
29903008
participant_user_ids = [str(p.user_id) for p in participants if p.user_id]
29913009

2992-
# Notify all participants (including sender for their own SWR cache updates)
3010+
# Notify all participants with the base message_added notification
29933011
for user_id in participant_user_ids:
29943012
await user_notification_manager.broadcast_to_user(
29953013
user_id, notification_data
29963014
)
3015+
3016+
# Send targeted mention notifications to @mentioned users
3017+
for uid in mentioned_user_ids:
3018+
mention_notification = {
3019+
"type": "mention",
3020+
"data": {
3021+
"conversation_id": conversation_id,
3022+
"conversation_name": conversation_name,
3023+
"message_id": message_id,
3024+
"message_preview": preview,
3025+
"role": role,
3026+
"sender_user_id": sender_user_id,
3027+
"timestamp": datetime.now().isoformat(),
3028+
},
3029+
}
3030+
await user_notification_manager.broadcast_to_user(uid, mention_notification)
3031+
3032+
# Send targeted reply notifications to replied-to users
3033+
for uid in replied_to_user_ids:
3034+
if uid not in mentioned_user_ids: # Don't double-notify if also mentioned
3035+
reply_notification = {
3036+
"type": "reply",
3037+
"data": {
3038+
"conversation_id": conversation_id,
3039+
"conversation_name": conversation_name,
3040+
"message_id": message_id,
3041+
"message_preview": preview,
3042+
"role": role,
3043+
"sender_user_id": sender_user_id,
3044+
"timestamp": datetime.now().isoformat(),
3045+
},
3046+
}
3047+
await user_notification_manager.broadcast_to_user(
3048+
uid, reply_notification
3049+
)
29973050
except Exception as e:
29983051
logging.warning(f"Failed to notify conversation participants: {e}")
29993052
# Fallback: at least notify the sender
@@ -4199,3 +4252,68 @@ async def update_channel(
41994252
raise HTTPException(status_code=500, detail="Failed to update channel")
42004253
finally:
42014254
session.close()
4255+
4256+
4257+
@app.put(
4258+
"/v1/conversation/{conversation_id}/lock",
4259+
summary="Lock or Unlock a Conversation/Thread",
4260+
description="Locks or unlocks a conversation or thread. When locked, only owners and admins can send messages. Useful for closing threads.",
4261+
tags=["Group Chat"],
4262+
dependencies=[Depends(verify_api_key)],
4263+
)
4264+
async def lock_conversation(
4265+
conversation_id: str,
4266+
body: dict,
4267+
user=Depends(verify_api_key),
4268+
authorization: str = Header(None),
4269+
):
4270+
auth = MagicalAuth(token=authorization)
4271+
conversation_name = get_conversation_name_by_id(
4272+
conversation_id=conversation_id, user_id=auth.user_id
4273+
)
4274+
if not conversation_name:
4275+
raise HTTPException(status_code=404, detail="Conversation not found")
4276+
from DB import get_session, Conversation, ConversationParticipant
4277+
4278+
session = get_session()
4279+
try:
4280+
conversation = (
4281+
session.query(Conversation)
4282+
.filter(Conversation.id == conversation_id)
4283+
.first()
4284+
)
4285+
if not conversation:
4286+
raise HTTPException(status_code=404, detail="Conversation not found")
4287+
4288+
# Only owners and admins can lock/unlock
4289+
participant = (
4290+
session.query(ConversationParticipant)
4291+
.filter(
4292+
ConversationParticipant.conversation_id == conversation_id,
4293+
ConversationParticipant.user_id == auth.user_id,
4294+
ConversationParticipant.status == "active",
4295+
)
4296+
.first()
4297+
)
4298+
if not participant or participant.role not in ("owner", "admin"):
4299+
raise HTTPException(
4300+
status_code=403,
4301+
detail="Only owners and admins can lock/unlock conversations",
4302+
)
4303+
4304+
locked = body.get("locked", True)
4305+
conversation.locked = locked
4306+
session.commit()
4307+
return {
4308+
"id": str(conversation.id),
4309+
"locked": conversation.locked,
4310+
"message": f"Conversation {'locked' if locked else 'unlocked'} successfully",
4311+
}
4312+
except HTTPException:
4313+
raise
4314+
except Exception as e:
4315+
session.rollback()
4316+
logging.error(f"Error locking conversation: {e}")
4317+
raise HTTPException(status_code=500, detail="Failed to lock conversation")
4318+
finally:
4319+
session.close()

0 commit comments

Comments
 (0)