-
Notifications
You must be signed in to change notification settings - Fork 113
Expand file tree
/
Copy pathdingtalkapp.py
More file actions
140 lines (120 loc) · 6.46 KB
/
dingtalkapp.py
File metadata and controls
140 lines (120 loc) · 6.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import asyncio, json, os, sys, threading, time
import requests
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from agentmain import GeneraticAgent
from chatapp_common import AgentChatMixin, ensure_single_instance, public_access, redirect_log, require_runtime, split_text
from llmcore import mykeys
try:
from dingtalk_stream import AckMessage, CallbackHandler, Credential, DingTalkStreamClient
from dingtalk_stream.chatbot import ChatbotMessage
except Exception:
print("Please install dingtalk-stream to use DingTalk: pip install dingtalk-stream")
sys.exit(1)
agent = GeneraticAgent(); agent.verbose = False
CLIENT_ID = str(mykeys.get("dingtalk_client_id", "") or "").strip()
CLIENT_SECRET = str(mykeys.get("dingtalk_client_secret", "") or "").strip()
ALLOWED = {str(x).strip() for x in mykeys.get("dingtalk_allowed_users", []) if str(x).strip()}
USER_TASKS = {}
class DingTalkApp(AgentChatMixin):
label, source, split_limit = "DingTalk", "dingtalk", 1800
def __init__(self):
super().__init__(agent, USER_TASKS)
self.client, self.access_token, self.token_expiry, self.background_tasks = None, None, 0, set()
async def _get_access_token(self):
if self.access_token and time.time() < self.token_expiry:
return self.access_token
def _fetch():
resp = requests.post("https://api.dingtalk.com/v1.0/oauth2/accessToken", json={"appKey": CLIENT_ID, "appSecret": CLIENT_SECRET}, timeout=20)
resp.raise_for_status()
return resp.json()
try:
data = await asyncio.to_thread(_fetch)
self.access_token = data.get("accessToken")
self.token_expiry = time.time() + int(data.get("expireIn", 7200)) - 60
return self.access_token
except Exception as e:
print(f"[DingTalk] token error: {e}")
return None
async def _send_batch_message(self, chat_id, msg_key, msg_param):
token = await self._get_access_token()
if not token:
return False
headers = {"x-acs-dingtalk-access-token": token}
if chat_id.startswith("group:"):
url = "https://api.dingtalk.com/v1.0/robot/groupMessages/send"
payload = {"robotCode": CLIENT_ID, "openConversationId": chat_id[6:], "msgKey": msg_key, "msgParam": json.dumps(msg_param, ensure_ascii=False)}
else:
url = "https://api.dingtalk.com/v1.0/robot/oToMessages/batchSend"
payload = {"robotCode": CLIENT_ID, "userIds": [chat_id], "msgKey": msg_key, "msgParam": json.dumps(msg_param, ensure_ascii=False)}
def _post():
resp = requests.post(url, json=payload, headers=headers, timeout=20)
body = resp.text
if resp.status_code != 200:
raise RuntimeError(f"HTTP {resp.status_code}: {body[:300]}")
result = resp.json() if "json" in resp.headers.get("content-type", "") else {}
errcode = result.get("errcode")
if errcode not in (None, 0):
raise RuntimeError(f"API errcode={errcode}: {body[:300]}")
return True
try:
return await asyncio.to_thread(_post)
except Exception as e:
print(f"[DingTalk] send error: {e}")
return False
async def send_text(self, chat_id, content):
for part in split_text(content, self.split_limit):
await self._send_batch_message(chat_id, "sampleMarkdown", {"text": part, "title": "Agent Reply"})
async def on_message(self, content, sender_id, sender_name, conversation_type=None, conversation_id=None):
try:
if not content:
return
if not public_access(ALLOWED) and sender_id not in ALLOWED:
print(f"[DingTalk] unauthorized user: {sender_id}")
return
is_group = conversation_type == "2" and conversation_id
chat_id = f"group:{conversation_id}" if is_group else sender_id
print(f"[DingTalk] message from {sender_name} ({sender_id}): {content}")
if content.startswith("/"):
return await self.handle_command(chat_id, content)
task = asyncio.create_task(self.run_agent(chat_id, content))
self.background_tasks.add(task)
task.add_done_callback(self.background_tasks.discard)
except Exception:
import traceback
print("[DingTalk] handle_message error")
traceback.print_exc()
async def start(self):
self.client = DingTalkStreamClient(Credential(CLIENT_ID, CLIENT_SECRET))
self.client.register_callback_handler(ChatbotMessage.TOPIC, _DingTalkHandler(self))
print("[DingTalk] bot starting...")
while True:
try:
await self.client.start()
except Exception as e:
print(f"[DingTalk] stream error: {e}")
print("[DingTalk] reconnect in 5s...")
await asyncio.sleep(5)
class _DingTalkHandler(CallbackHandler):
def __init__(self, app):
super().__init__()
self.app = app
async def process(self, message):
try:
chatbot_msg = ChatbotMessage.from_dict(message.data)
text = getattr(getattr(chatbot_msg, "text", None), "content", "") or ""
extensions = getattr(chatbot_msg, "extensions", None) or {}
recognition = ((extensions.get("content") or {}).get("recognition") or "").strip() if isinstance(extensions, dict) else ""
if not (text := text.strip()):
text = recognition or str((message.data.get("text", {}) or {}).get("content", "") or "").strip()
sender_id = str(getattr(chatbot_msg, "sender_staff_id", None) or getattr(chatbot_msg, "sender_id", None) or "unknown")
sender_name = getattr(chatbot_msg, "sender_nick", None) or "Unknown"
await self.app.on_message(text, sender_id, sender_name, message.data.get("conversationType"), message.data.get("conversationId") or message.data.get("openConversationId"))
except Exception as e:
print(f"[DingTalk] callback error: {e}")
return AckMessage.STATUS_OK, "OK"
if __name__ == "__main__":
_LOCK_SOCK = ensure_single_instance(19530, "DingTalk")
require_runtime(agent, "DingTalk", dingtalk_client_id=CLIENT_ID, dingtalk_client_secret=CLIENT_SECRET)
redirect_log(__file__, "dingtalkapp.log", "DingTalk", ALLOWED)
threading.Thread(target=agent.run, daemon=True).start()
asyncio.run(DingTalkApp().start())