-
-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathrain_lab_telegram.py
More file actions
226 lines (181 loc) · 7.61 KB
/
rain_lab_telegram.py
File metadata and controls
226 lines (181 loc) · 7.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
"""Telegram gateway for R.A.I.N. Lab.
This module adds a phone-friendly interface on top of the existing async
``run_rain_lab`` entry point. It intentionally does not change existing CLI
behavior in ``rain_lab.py``.
"""
from __future__ import annotations
import asyncio
import io
import logging
import os
import re
from dataclasses import dataclass
from typing import Optional
from dotenv import load_dotenv
from telegram import Update
from telegram.constants import ChatAction
from telegram.ext import (
Application,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)
from rain_lab import run_rain_lab
# Telegram hard limit is 4096 characters. Keep a little safety margin.
TELEGRAM_MESSAGE_LIMIT = 3900
MAX_TEXT_MESSAGES_BEFORE_FILE = 3
AGENT_PREFIX_RE = re.compile(r"^\s*@(?P<agent>james|elena|jasmine|luca)\b\s*[:\-]?\s*", re.IGNORECASE)
@dataclass(slots=True)
class RouteDecision:
"""Normalized routing information for an incoming user message."""
mode: str
query: str
agent: Optional[str] = None
def _normalize_agent_name(agent: str) -> str:
"""Convert user-facing tags into canonical agent names."""
canonical = agent.strip().lower()
mapping = {
"james": "James",
"elena": "Elena",
"jasmine": "Jasmine",
"luca": "Luca",
}
return mapping.get(canonical, "James")
def _route_message(text: str) -> RouteDecision:
"""Route user text to chat/meeting mode and optional agent override."""
cleaned = text.strip()
# Meeting routing is keyword based per requirement.
if "/meeting" in cleaned.lower() or "meeting" in cleaned.lower():
query = cleaned.replace("/meeting", "").strip() or "Open research discussion"
return RouteDecision(mode="rlm", query=query)
# Agent mention routing (chat mode).
match = AGENT_PREFIX_RE.match(cleaned)
if match:
agent = _normalize_agent_name(match.group("agent"))
query = cleaned[match.end() :].strip() or "Continue"
return RouteDecision(mode="chat", query=query, agent=agent)
# Default behavior: normal chat to James.
return RouteDecision(mode="chat", query=cleaned, agent="James")
def _split_message(text: str, limit: int = TELEGRAM_MESSAGE_LIMIT) -> list[str]:
"""Split long text at natural boundaries while respecting Telegram limits."""
normalized = text.strip()
if len(normalized) <= limit:
return [normalized]
chunks: list[str] = []
remaining = normalized
while len(remaining) > limit:
# Prefer newline boundaries first, then sentence boundary, then hard cut.
cut = remaining.rfind("\n", 0, limit)
if cut < int(limit * 0.5):
cut = remaining.rfind(". ", 0, limit)
if cut < int(limit * 0.5):
cut = limit
chunk = remaining[:cut].strip()
if chunk:
chunks.append(chunk)
remaining = remaining[cut:].strip()
if remaining:
chunks.append(remaining)
return chunks
async def _send_long_response(update: Update, response_text: str) -> None:
"""Send response as one or many messages, with optional .txt fallback."""
chunks = _split_message(response_text)
if len(chunks) > MAX_TEXT_MESSAGES_BEFORE_FILE:
# Bonus behavior: send as a .txt if text would be too spammy.
payload = io.BytesIO(response_text.encode("utf-8"))
payload.name = "rain_lab_response.txt"
await update.message.reply_document(
document=payload,
caption="Response is long, so I attached it as a text file.",
)
return
for index, chunk in enumerate(chunks):
if index < len(chunks) - 1:
await update.message.reply_text(f"{chunk}\n\n_Continued in next message…_", parse_mode="Markdown")
else:
await update.message.reply_text(chunk)
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle /start with a concise onboarding message."""
del context
message = (
"🧠 *Welcome to R.A.I.N. Lab Telegram Gateway*\n\n"
"I can route your prompt to the team:\n"
"• *James* (default): systems + synthesis\n"
"• *Elena*: engineering + implementation rigor\n"
"• *Jasmine*: theory + conceptual depth\n"
"• *Luca*: critical analysis + edge cases\n\n"
"Usage:\n"
"• Send normal text → James in chat mode\n"
"• `@Elena design a robust test harness` → direct agent chat\n"
"• `/meeting Quantum resonance` or any message containing `meeting` → full multi-agent mode\n"
"• `/help` for more examples"
)
await update.message.reply_text(message, parse_mode="Markdown")
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle /help with command examples."""
del context
message = (
"📘 *Commands & Examples*\n\n"
"• Normal chat:\n"
" `Summarize today's strongest hypothesis`\n\n"
"• Agent routing:\n"
" `@James pressure-test this idea`\n"
" `@Elena propose an implementation plan`\n"
" `@Jasmine derive a conceptual model`\n"
" `@Luca find flaws in this argument`\n\n"
"• Meeting mode:\n"
" `/meeting Evaluate recursive self-critique`\n"
" `Run a meeting on robust local RAG pipelines`"
)
await update.message.reply_text(message, parse_mode="Markdown")
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Main message handler: route text and call the async R.A.I.N. entrypoint."""
del context
if not update.message or not update.message.text:
return
user_text = update.message.text.strip()
route = _route_message(user_text)
logging.info(
"Incoming message chat_id=%s mode=%s agent=%s query_preview=%s",
update.effective_chat.id if update.effective_chat else "unknown",
route.mode,
route.agent,
route.query[:120],
)
await update.message.chat.send_action(action=ChatAction.TYPING)
try:
response = await run_rain_lab(
query=route.query,
mode=route.mode,
agent=route.agent,
recursive_depth=1,
)
response = response.strip() or "(No response returned.)"
await _send_long_response(update, response)
except Exception:
logging.exception("run_rain_lab failed")
await update.message.reply_text(
"Sorry — R.A.I.N. Lab hit an internal error while processing that request. "
"Please try again in a moment."
)
async def main() -> None:
"""Entrypoint for starting Telegram long polling in local/dev workflows."""
load_dotenv()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
)
token = os.getenv("TELEGRAM_BOT_TOKEN")
if not token:
raise RuntimeError("Missing TELEGRAM_BOT_TOKEN. Add it to your environment or .env file.")
application = Application.builder().token(token).build()
application.add_handler(CommandHandler("start", start_command))
application.add_handler(CommandHandler("help", help_command))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
logging.info("Starting R.A.I.N. Lab Telegram gateway (polling mode)")
# run_polling() is blocking. Wrap it in a worker thread so main() remains async,
# which keeps this module compatible with asyncio.run(main()).
await asyncio.to_thread(application.run_polling, allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
asyncio.run(main())