-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTelegramService.ts
More file actions
456 lines (390 loc) · 12.3 KB
/
TelegramService.ts
File metadata and controls
456 lines (390 loc) · 12.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
/**
* Telegram Service
* Bot startup with command handling using MCP Telegram sampling
*/
import {
type BaseSessionService,
createSamplingHandler,
type EnhancedRunner,
extractTextFromContent,
type LlmRequest,
type MemoryService,
} from "@iqai/adk";
import { createClawdAgent } from "../agents/agent.js";
import { getTelegramAgent } from "../agents/telegram-agent/agent.js";
import { getConfig, getRawConfig } from "../config/index.js";
import { createLogger } from "../lib/logger.js";
import { createPendingPair, isAllowed } from "../lib/pairing.js";
import { setSchedulerDeps } from "../tools/scheduleTools.js";
import {
broadcastToTelegram,
initScheduler,
stopScheduler,
} from "./SchedulerService.js";
// Logger created with the "Telegram" name; shared by every function in this module.
const log = createLogger("Telegram");
/**
 * Pull the sender id, chat id and message body out of the text of an MCP
 * sampling request.
 *
 * The expected layout is a run of `key: value` header rows, for example
 * "NEW TELEGRAM MESSAGE FROM:\nuser_id: 123\nchat_id: 456\n...\ncontent: message text".
 * Header rows are only read up to the first `content:` row; everything from
 * that row onward is the message body.
 *
 * @param content - Raw text of the sampling request.
 * @returns `userId` and `chatId` (each "unknown" when its header row is
 *   absent) together with the trimmed `messageText`.
 */
function parseTelegramMessage(content: string): {
  userId: string;
  chatId: string;
  messageText: string;
} {
  const USER_KEY = "user_id:";
  const CHAT_KEY = "chat_id:";
  const BODY_KEY = "content:";

  const rows = content.split("\n");
  let userId = "unknown";
  let chatId = "unknown";
  let bodyRow = -1;

  // Header scan: later duplicates of a key overwrite earlier ones, and the
  // scan ends at the first `content:` row.
  for (const [idx, raw] of rows.entries()) {
    const row = raw.trim();
    if (row.startsWith(USER_KEY)) {
      userId = row.slice(USER_KEY.length).trim();
    } else if (row.startsWith(CHAT_KEY)) {
      chatId = row.slice(CHAT_KEY.length).trim();
    } else if (row.startsWith(BODY_KEY)) {
      bodyRow = idx;
      break;
    }
  }

  if (bodyRow >= 0) {
    // The text after `content:` on its own row is the first body line.
    const head = rows[bodyRow].trim().slice(BODY_KEY.length).trim();
    const tail = rows.slice(bodyRow + 1);
    return { userId, chatId, messageText: [head, ...tail].join("\n").trim() };
  }

  // Fallback when there is no `content:` row: the body starts after the first
  // blank row (row 0 is never treated as the separator), or at row 0 if no
  // blank row exists.
  const gap = rows.findIndex((raw, idx) => idx > 0 && raw.trim() === "");
  const start = gap === -1 ? 0 : gap + 1;
  return { userId, chatId, messageText: rows.slice(start).join("\n").trim() };
}
/**
 * Return the text of the most recent entry in a request's `contents`.
 *
 * @param request - Sampling request to read from.
 * @returns Whatever `extractTextFromContent` yields for the last content
 *   entry, or "" when `contents` is missing or empty.
 */
function getTextFromRequest(request: LlmRequest): string {
  const history = request.contents || [];
  if (history.length > 0) {
    return extractTextFromContent(history[history.length - 1]);
  }
  return "";
}
/**
 * Public bot commands visible to all users (including unapproved).
 * This is the default command list of registerBotCommands; the array order
 * is the order sent to Telegram.
 */
const PUBLIC_BOT_COMMANDS = [
{ command: "start", description: "Start the bot and pair" },
{ command: "help", description: "Show available commands" },
];
/**
 * Full bot commands visible only to approved/paired users.
 * Sent by registerUserCommands with a per-chat scope, so it adds /new and
 * /reset on top of the public list for that one chat.
 */
const FULL_BOT_COMMANDS = [
{ command: "start", description: "Start the bot and pair" },
{ command: "new", description: "Save session and start fresh" },
{ command: "reset", description: "Clear session without saving" },
{ command: "help", description: "Show available commands" },
];
/**
 * Chat ids for which the full command list has already been registered.
 * registerUserCommands adds an id only after Telegram reports success, and
 * the message handler checks this set before triggering a registration.
 * The set lives in module memory only, so it starts empty on every process start.
 */
const registeredChats = new Set<string>();
/**
 * Publish a command list to Telegram at the three global scopes
 * (`default`, `all_private_chats`, `all_group_chats`).
 *
 * Called at startup with the public commands so that every user, including
 * unapproved ones, sees /start and /help in the command menu. Existing
 * commands are deleted at each scope first so stale entries do not linger.
 *
 * This is best-effort: failures are logged as warnings and never thrown.
 * The "Registered ..." info line is only written when every scope was
 * accepted, so the log does not claim success after a rejection.
 *
 * @param botToken - Telegram bot token used to build the Bot API URL.
 * @param commandList - Commands to publish; defaults to the public list.
 */
async function registerBotCommands(
  botToken: string,
  commandList: typeof PUBLIC_BOT_COMMANDS = PUBLIC_BOT_COMMANDS,
): Promise<void> {
  const api = `https://api.telegram.org/bot${botToken}`;
  const headers = { "Content-Type": "application/json" };
  const scopes = [
    { type: "default" },
    { type: "all_private_chats" },
    { type: "all_group_chats" },
  ];
  try {
    // Delete commands at all scopes to clear stale cache
    for (const scope of scopes) {
      await fetch(`${api}/deleteMyCommands`, {
        method: "POST",
        headers,
        body: JSON.stringify({ scope }),
      });
    }

    // Set commands at all scopes, counting the ones Telegram rejects
    let failedScopes = 0;
    for (const scope of scopes) {
      const response = await fetch(`${api}/setMyCommands`, {
        method: "POST",
        headers,
        body: JSON.stringify({ commands: commandList, scope }),
      });
      const result = (await response.json()) as {
        ok: boolean;
        description?: string;
      };
      if (!result.ok) {
        failedScopes += 1;
        log.warn(
          `Failed to register commands for scope ${scope.type}: ${result.description}`,
        );
      }
    }

    if (failedScopes === 0) {
      log.info(
        `Registered ${commandList.length} bot commands: ${commandList.map((c) => `/${c.command}`).join(", ")}`,
      );
    } else {
      log.warn(
        `Bot commands were accepted for only ${scopes.length - failedScopes} of ${scopes.length} scopes`,
      );
    }
  } catch (error) {
    log.warn(`Could not register bot commands: ${error}`);
  }
}
/**
 * Register the full command list for one chat (an approved user).
 *
 * Uses Telegram's per-chat command scope so that this chat's menu also shows
 * /new and /reset. On success the chat id is recorded in `registeredChats`.
 * This is best-effort: failures are logged as warnings and never thrown.
 *
 * The chat id is validated before any request is made. A value that is
 * neither an integer nor an `@username` (for example the "unknown"
 * placeholder produced when a message has no `chat_id` header) would
 * otherwise serialise as `chat_id: null` and be rejected by Telegram on
 * every message.
 *
 * @param botToken - Telegram bot token used to build the Bot API URL.
 * @param chatId - Target chat: a numeric id as a string, or an `@username`.
 */
async function registerUserCommands(
  botToken: string,
  chatId: string,
): Promise<void> {
  const rawId = chatId.trim();
  const numericId = Number(rawId);
  let scopeChatId: number | string;
  if (rawId !== "" && Number.isSafeInteger(numericId)) {
    scopeChatId = numericId;
  } else if (rawId.startsWith("@")) {
    // Telegram also accepts a supergroup username in place of a numeric id
    scopeChatId = rawId;
  } else {
    log.warn(
      `Could not register user commands: "${chatId}" is not a valid chat id`,
    );
    return;
  }

  const api = `https://api.telegram.org/bot${botToken}`;
  const headers = { "Content-Type": "application/json" };
  const scope = { type: "chat", chat_id: scopeChatId };
  try {
    // Clear existing per-chat commands first
    await fetch(`${api}/deleteMyCommands`, {
      method: "POST",
      headers,
      body: JSON.stringify({ scope }),
    });
    // Set full command list for this specific chat
    const response = await fetch(`${api}/setMyCommands`, {
      method: "POST",
      headers,
      body: JSON.stringify({ commands: FULL_BOT_COMMANDS, scope }),
    });
    const result = (await response.json()) as {
      ok: boolean;
      description?: string;
    };
    if (!result.ok) {
      log.warn(
        `Failed to register user commands for chat ${chatId}: ${result.description}`,
      );
    } else {
      // Keyed by the caller's original string, which is what callers look up
      registeredChats.add(chatId);
      log.info(`Registered full commands for chat ${chatId}`);
    }
  } catch (error) {
    log.warn(`Could not register user commands for chat ${chatId}: ${error}`);
  }
}
/**
 * Signature shared by every slash-command handler.
 * A handler receives the chat id and user id parsed from the incoming
 * message and resolves to the reply text sent back to the user.
 */
type CommandHandler = (chatId: string, userId: string) => Promise<string>;
/**
 * Build the reply shown to a user who is not paired yet.
 *
 * Requests a pending pairing code through `createPendingPair` for the
 * "telegram" channel and embeds it in the instructions. When no code is
 * returned, a "too many pending requests" notice is produced instead.
 *
 * @param userId - Telegram user id requesting pairing.
 * @param username - Optional username forwarded to `createPendingPair`.
 * @param chatId - Chat the request came from.
 * @returns The message text to send back.
 */
function getPairingResponse(
  userId: string,
  username: string | undefined,
  chatId: string,
): string {
  const pairingCode = createPendingPair("telegram", userId, username, chatId);
  if (pairingCode) {
    return `π Pairing required!
Your code: ${pairingCode}
Ask the bot owner to run:
adk-claw pairing approve telegram ${pairingCode}
Code expires in 1 hour.`;
  }
  return `β οΈ Too many pending pairing requests. Please try again later.
Contact the bot owner if this persists.`;
}
/**
 * Format the time elapsed between a unix timestamp and now.
 *
 * @param startTimestamp - Start time in seconds since the unix epoch.
 * @returns `"<h>h <m>m"` once at least an hour has passed, otherwise `"<m>m"`.
 *   A start time in the future (clock skew) or a non-finite value is reported
 *   as `"0m"` rather than a negative or `NaN` duration.
 */
function formatDuration(startTimestamp: number): string {
  const elapsedMs = Date.now() - startTimestamp * 1000;
  // Clamp so callers never see "-5m" or "NaNm"
  const minutes =
    Number.isFinite(elapsedMs) && elapsedMs > 0
      ? Math.floor(elapsedMs / 60000)
      : 0;
  const hours = Math.floor(minutes / 60);
  return hours > 0 ? `${hours}h ${minutes % 60}m` : `${minutes}m`;
}
/**
 * Build the slash-command table, bound to the ADK session lifecycle.
 *
 * Handlers use `sessionService.createSession` / `deleteSession` and
 * `runner.setSession` to create and destroy real ADK sessions instead of
 * keeping local state.
 *
 * `/new` and `/reset` change the runner's current session, so they are
 * wrapped in a pairing check: a user for whom `isAllowed("telegram", userId)`
 * is false receives the pairing instructions and the session is left
 * untouched. The check lives here because the message handler dispatches
 * commands before it performs its own pairing check.
 *
 * @param deps.runner - Runner whose current session the commands operate on.
 * @param deps.sessionService - Service used to create and delete sessions.
 * @param deps.memoryService - Optional; when present `/new` stores the
 *   outgoing session in memory if it has events.
 * @returns Map from command text (e.g. "/start") to its handler.
 */
function createCommands(deps: {
  runner: EnhancedRunner;
  sessionService: BaseSessionService;
  memoryService?: MemoryService;
}): Record<string, CommandHandler> {
  const { runner, sessionService, memoryService } = deps;

  // Gate for handlers that mutate the session: unpaired users get the
  // pairing reply instead of running the command.
  const requirePaired =
    (handler: CommandHandler): CommandHandler =>
    async (chatId, userId) => {
      if (!isAllowed("telegram", userId)) {
        return getPairingResponse(userId, undefined, chatId);
      }
      return handler(chatId, userId);
    };

  return {
    "/start": async (_chatId, userId) => {
      const config = getRawConfig();
      // Check if user is already allowed
      if (isAllowed("telegram", userId)) {
        return `π Welcome back!
I'm ${config.agent.name}, your personal AI assistant.
Commands:
/new - Save session & start fresh
/reset - Clear session without saving
/help - Show available commands`;
      }
      // User needs pairing
      return getPairingResponse(userId, undefined, _chatId);
    },
    "/new": requirePaired(async (_chatId, _userId) => {
      const currentSession = runner.getSession();
      // Save current session to memory if it has events
      if (memoryService && currentSession.events.length > 0) {
        await memoryService.addSessionToMemory(currentSession);
      }
      // Derive summary from ADK session events
      const eventCount = currentSession.events.length;
      let summary = "No conversation to save.";
      if (eventCount > 0) {
        const firstTimestamp = currentSession.events[0].timestamp;
        const duration = formatDuration(firstTimestamp);
        summary = `${eventCount} events over ${duration}`;
      }
      // Create a new ADK session and swap it into the runner
      const newSession = await sessionService.createSession(
        currentSession.appName,
        currentSession.userId,
      );
      runner.setSession(newSession);
      return `β
Session saved to memory.
π Summary: ${summary}
π New session started!`;
    }),
    "/reset": requirePaired(async (_chatId, _userId) => {
      const currentSession = runner.getSession();
      // Delete the old session without saving to memory
      await sessionService.deleteSession(
        currentSession.appName,
        currentSession.userId,
        currentSession.id,
      );
      // Create a fresh ADK session
      const newSession = await sessionService.createSession(
        currentSession.appName,
        currentSession.userId,
      );
      runner.setSession(newSession);
      return "π Session cleared (not saved). Fresh start!";
    }),
    "/help": async (_chatId, _userId) => {
      return `π Available commands:
/start - Start the bot and pair
/new - Save session & start fresh
/reset - Clear session without saving
/help - Show this help message
Just send a message to chat with me!`;
    },
  };
}
/**
 * Boot the Telegram bot on top of MCP sampling and block until shutdown.
 *
 * Steps, in order: create the agent for the "telegram" channel, build the
 * command table, wire a sampling handler that routes each incoming message,
 * initialise the Telegram agent, publish the public bot commands, start the
 * scheduler and hand its dependencies to the schedule tools, then keep the
 * process alive.
 *
 * @throws Error when Telegram is not enabled in the config.
 */
export async function startTelegramBot(): Promise<void> {
  const config = getConfig();
  if (!config.telegramEnabled) {
    throw new Error("Telegram is not enabled in config");
  }
  log.info(`Starting ${config.agentName} Telegram bot...`);

  // Agent wired for the Telegram channel
  const {
    runner,
    sessionService,
    session: agentSession,
    memoryService,
  } = await createClawdAgent({ channel: "telegram" });

  // Slash commands operate on the same runner and session service
  const commands = createCommands({ runner, sessionService, memoryService });

  // Routes one incoming message: command, pairing prompt, or agent reply
  const respondToMessage = async (request: LlmRequest): Promise<string> => {
    const { userId, chatId, messageText } = parseTelegramMessage(
      getTextFromRequest(request),
    );
    log.info(
      `Message from userId=${userId} chatId=${chatId}: "${messageText.slice(0, 50)}..."`,
    );

    // Slash commands: first word, without any "@botname" suffix, lower-cased
    const text = messageText.trim();
    if (text.startsWith("/")) {
      const cmd = text.split(" ")[0].split("@")[0].toLowerCase();
      const handler = commands[cmd];
      return handler
        ? await handler(chatId, userId)
        : `Unknown command: ${cmd}\n\nType /help for available commands.`;
    }

    // Plain messages from unpaired users get the pairing instructions
    if (!isAllowed("telegram", userId)) {
      return getPairingResponse(userId, undefined, chatId);
    }

    // First message from an approved chat: publish the full command menu.
    // Not awaited; registerUserCommands logs its own failures.
    if (!registeredChats.has(chatId)) {
      void registerUserCommands(config.telegramBotToken, chatId);
    }

    // Ask the agent (ADK tracks events in the session automatically)
    try {
      return await runner.ask(messageText);
    } catch (error) {
      log.error(
        `Failed to get response for chatId=${chatId}: ${error instanceof Error ? error.message : error}`,
      );
      return "β οΈ Sorry, I'm having trouble right now. Please try again in a moment.";
    }
  };
  const samplingHandler = createSamplingHandler(respondToMessage);

  // Initialize Telegram agent with sampling
  await getTelegramAgent(samplingHandler);

  // Register commands after MCP init so they aren't overwritten
  await registerBotCommands(config.telegramBotToken);

  // Scheduler shares the runner with the chat handler
  const scheduler = await initScheduler(
    runner,
    sessionService,
    agentSession.appName,
  );

  // Let the schedule tools manage jobs dynamically
  setSchedulerDeps({
    scheduler,
    runner,
    sessionService,
    appName: agentSession.appName,
    broadcastToTelegram,
  });

  log.info(`${config.agentName} is now listening on Telegram`);
  log.info("DM your bot to start chatting. Press Ctrl+C to stop");

  // Keep process alive
  await keepAlive();
}
/**
 * Keep the process alive until SIGINT or SIGTERM arrives.
 *
 * On either signal the scheduler is stopped and the process exits with
 * code 0. If stopping the scheduler fails, the error is logged and the
 * process exits with code 1, so a rejected `stopScheduler()` cannot escape
 * as an unhandled rejection inside a signal listener.
 *
 * @returns A promise that never settles; the function ends only through
 *   `process.exit`.
 */
async function keepAlive(): Promise<void> {
  const shutdown = async (signal: string): Promise<void> => {
    log.info(`Shutting down (${signal})...`);
    try {
      await stopScheduler();
    } catch (error) {
      log.error(
        `Scheduler did not stop cleanly: ${error instanceof Error ? error.message : error}`,
      );
      process.exit(1);
    }
    process.exit(0);
  };
  process.on("SIGINT", () => void shutdown("SIGINT"));
  process.on("SIGTERM", () => void shutdown("SIGTERM"));
  await new Promise(() => {
    // A pending timer keeps the event loop from draining
    setInterval(() => {}, 1000);
  });
}