88namespace Dtyq \SuperMagic \Application \SuperAgent \Event \Subscribe ;
99
1010use App \Infrastructure \Core \Exception \BusinessException ;
11+ use App \Infrastructure \Util \IdGenerator \IdGenerator ;
1112use Dtyq \SuperMagic \Application \SuperAgent \Service \TaskAppService ;
1213use Dtyq \SuperMagic \Interfaces \SuperAgent \DTO \TopicTaskMessageDTO ;
1314use Hyperf \Amqp \Annotation \Consumer ;
1718use PhpAmqpLib \Message \AMQPMessage ;
1819use PhpAmqpLib \Wire \AMQPTable ;
1920use Throwable ;
20- use App \Infrastructure \Util \IdGenerator \IdGenerator ;
2121
2222/**
2323 * 话题任务消息订阅者.
@@ -36,12 +36,12 @@ class TopicTaskMessageSubscriber extends ConsumerMessage
3636 protected AMQPTable |array $ queueArguments = [];
3737
3838 /**
39- * @var array| null QoS 配置,用于控制预取数量等
39+ * @var null|array QoS 配置,用于控制预取数量等
4040 */
4141 protected ?array $ qos = [
4242 'prefetch_count ' => 1 , // 每次只预取1条消息
4343 'prefetch_size ' => 0 ,
44- 'global ' => false
44+ 'global ' => false ,
4545 ];
4646
4747 /**
@@ -81,7 +81,7 @@ public function consumeMessage($data, AMQPMessage $message): Result
8181 $ actualOriginalTimestamp = null ; // 初始化变量以避免 linter 警告
8282
8383 if ($ originalTimestampFromHeader !== null ) {
84- $ actualOriginalTimestamp = (int )$ originalTimestampFromHeader ; // 确保是整数
84+ $ actualOriginalTimestamp = (int ) $ originalTimestampFromHeader ; // 确保是整数
8585 $ this ->logger ->info (sprintf ('消息已存在原始秒级时间戳: %d (%s), message_id: %s ' , $ actualOriginalTimestamp , date ('Y-m-d H:i:s ' , $ actualOriginalTimestamp ), $ data ['payload ' ]['message_id ' ] ?? 'N/A ' ));
8686 } else {
8787 // 如果生产者没有设置 x-original-timestamp,这通常是一个需要注意的情况。
@@ -121,7 +121,7 @@ public function consumeMessage($data, AMQPMessage $message): Result
121121
122122 $ lockAcquired = (bool ) $ this ->superAgentAppService ->acquireLock ($ lockKey , $ lockOwner , $ lockExpireSeconds );
123123
124- if (!$ lockAcquired ) {
124+ if (! $ lockAcquired ) {
125125 $ this ->logger ->info (sprintf (
126126 '无法获取sandbox %s的锁,该sandbox可能有其他消息正在处理中,将消息重新入队等待处理,原始接收秒级时间: %d (%s), message_id: %s ' ,
127127 $ sandboxId ,
@@ -231,157 +231,4 @@ private function validateMessageFormat($data): void
231231 }
232232 }
233233 }
234-
235- /**
236- * 打印消息详情.
237- *
238- * @param array $data 消息数据
239- */
240- private function logMessageDetails (array $ data ): void
241- {
242- if (isset ($ data ['metadata ' ], $ data ['payload ' ])) {
243- // 新格式消息
244- $ payload = $ data ['payload ' ];
245- $ metadata = $ data ['metadata ' ];
246-
247- // 记录元数据
248- $ this ->logger ->info (sprintf (
249- '话题任务消息元数据 - sandbox_id: %s, agent_user_id: %s ' ,
250- $ metadata ['sandbox_id ' ] ?? '未提供 ' ,
251- $ metadata ['agent_user_id ' ] ?? '未提供 '
252- ));
253-
254- // 记录负载数据
255- $ this ->logger ->info (sprintf (
256- '话题任务消息负载 - message_id: %s, type: %s, task_id: %s, status: %s ' ,
257- $ payload ['message_id ' ] ?? '未提供 ' ,
258- $ payload ['type ' ] ?? '未提供 ' ,
259- $ payload ['task_id ' ] ?? '未提供 ' ,
260- $ payload ['status ' ] ?? '未提供 '
261- ));
262-
263- // 记录消息内容
264- if (isset ($ payload ['content ' ]) && ! empty ($ payload ['content ' ])) {
265- $ this ->logger ->info (sprintf (
266- '话题任务消息内容: %s ' ,
267- $ payload ['content ' ]
268- ));
269- }
270-
271- // 记录步骤信息
272- if (isset ($ payload ['steps ' ]) && ! empty ($ payload ['steps ' ])) {
273- $ this ->logger ->info (sprintf (
274- '话题任务步骤信息: %s ' ,
275- json_encode ($ payload ['steps ' ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES )
276- ));
277- }
278-
279- // 记录工具信息
280- if (isset ($ payload ['tool ' ]) && ! empty ($ payload ['tool ' ])) {
281- $ this ->logger ->info (sprintf (
282- '话题任务工具信息: %s ' ,
283- json_encode ($ payload ['tool ' ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES )
284- ));
285- }
286- } else {
287- // 旧格式消息
288- // 记录消息ID和类型
289- $ this ->logger ->info (sprintf (
290- '话题任务消息详情(旧格式) - message_id: %s, type: %s, task_id: %s, status: %s ' ,
291- $ data ['message_id ' ] ?? '未提供 ' ,
292- $ data ['type ' ] ?? '未提供 ' ,
293- $ data ['task_id ' ] ?? '未提供 ' ,
294- $ data ['status ' ] ?? '未提供 '
295- ));
296-
297- // 记录消息内容
298- if (isset ($ data ['content ' ]) && ! empty ($ data ['content ' ])) {
299- $ this ->logger ->info (sprintf (
300- '话题任务消息内容: %s ' ,
301- $ data ['content ' ]
302- ));
303- }
304-
305- // 记录步骤信息
306- if (isset ($ data ['steps ' ]) && ! empty ($ data ['steps ' ])) {
307- $ this ->logger ->info (sprintf (
308- '话题任务步骤信息: %s ' ,
309- json_encode ($ data ['steps ' ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES )
310- ));
311- }
312-
313- // 记录工具信息
314- if (isset ($ data ['tool ' ]) && ! empty ($ data ['tool ' ])) {
315- $ this ->logger ->info (sprintf (
316- '话题任务工具信息: %s ' ,
317- json_encode ($ data ['tool ' ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES )
318- ));
319- }
320- }
321- }
322-
323- /**
324- * 带重试机制的获取锁.
325- *
326- * @param string $sandboxId 沙箱ID
327- * @param TopicTaskMessageDTO $messageDTO 消息DTO
328- * @return array 包含锁信息的数组,acquired表示是否成功获取锁,owner表示锁的持有者,key表示锁的键名
329- */
330- private function acquireLockWithRetry (?string $ sandboxId , TopicTaskMessageDTO $ messageDTO ): array
331- {
332- $ result = [
333- 'acquired ' => false ,
334- 'owner ' => '' ,
335- 'key ' => '' ,
336- ];
337-
338- if (empty ($ sandboxId )) {
339- return $ result ;
340- }
341-
342- $ lockKey = 'handle_sandbox_message_lock: ' . $ sandboxId ;
343- $ lockOwner = IdGenerator::getUniqueId32 (); // 使用唯一ID作为锁持有者标识
344- $ lockExpireSeconds = 30 ; // 锁的过期时间(秒),消息处理可能需要更长时间
345-
346- $ maxRetries = 3 ;
347- $ retryCount = 0 ;
348- $ baseWaitTime = 1 ; // 基础等待时间(秒)
349-
350- while ($ retryCount <= $ maxRetries ) {
351- $ lockAcquired = (bool ) $ this ->superAgentAppService ->acquireLock ($ lockKey , $ lockOwner , $ lockExpireSeconds );
352-
353- if ($ lockAcquired ) {
354- $ this ->logger ->debug (sprintf ('已获取sandbox %s的锁,持有者: %s ' , $ sandboxId , $ lockOwner ));
355- $ result ['acquired ' ] = true ;
356- $ result ['owner ' ] = $ lockOwner ;
357- $ result ['key ' ] = $ lockKey ;
358- return $ result ;
359- }
360-
361- if ($ retryCount === $ maxRetries ) {
362- $ this ->logger ->error (sprintf (
363- '在重试%d次后仍无法获取sandbox %s的锁,该sandbox可能有其他消息正在处理中,message_id: %s ' ,
364- $ maxRetries ,
365- $ sandboxId ,
366- $ messageDTO ->getPayload ()?->getMessageId()
367- ));
368- // 可以选择将消息重新入队或实现延迟重试策略
369- return $ result ;
370- }
371-
372- $ waitTime = $ baseWaitTime * pow (2 , $ retryCount ); // 指数退避
373- $ this ->logger ->warning (sprintf (
374- '无法获取sandbox %s的锁,该sandbox可能有其他消息正在处理中,message_id: %s,将在%d秒后进行第%d次重试 ' ,
375- $ sandboxId ,
376- $ messageDTO ->getPayload ()?->getMessageId(),
377- $ waitTime ,
378- $ retryCount + 1
379- ));
380-
381- sleep ($ waitTime );
382- $ retryCount ++;
383- }
384-
385- return $ result ;
386- }
387234}
0 commit comments