88namespace Dtyq \SuperMagic \Application \SuperAgent \Event \Subscribe ;
99
1010use App \Infrastructure \Core \Exception \BusinessException ;
11+ use App \Infrastructure \Util \IdGenerator \IdGenerator ;
1112use Dtyq \SuperMagic \Application \SuperAgent \Service \TaskAppService ;
1213use Dtyq \SuperMagic \Interfaces \SuperAgent \DTO \TopicTaskMessageDTO ;
1314use Hyperf \Amqp \Annotation \Consumer ;
1718use PhpAmqpLib \Message \AMQPMessage ;
1819use PhpAmqpLib \Wire \AMQPTable ;
1920use Throwable ;
20- use App \Infrastructure \Util \IdGenerator \IdGenerator ;
2121
2222/**
2323 * 话题任务消息订阅者.
@@ -36,12 +36,12 @@ class TopicTaskMessageSubscriber extends ConsumerMessage
3636 protected AMQPTable |array $ queueArguments = [];
3737
3838 /**
39- * @var array| null QoS 配置,用于控制预取数量等
39+ * @var null|array QoS 配置,用于控制预取数量等
4040 */
4141 protected ?array $ qos = [
4242 'prefetch_count ' => 1 , // 每次只预取1条消息
4343 'prefetch_size ' => 0 ,
44- 'global ' => false
44+ 'global ' => false ,
4545 ];
4646
4747 /**
@@ -81,7 +81,7 @@ public function consumeMessage($data, AMQPMessage $message): Result
8181 $ actualOriginalTimestamp = null ; // 初始化变量以避免 linter 警告
8282
8383 if ($ originalTimestampFromHeader !== null ) {
84- $ actualOriginalTimestamp = (int )$ originalTimestampFromHeader ; // 确保是整数
84+ $ actualOriginalTimestamp = (int ) $ originalTimestampFromHeader ; // 确保是整数
8585 $ this ->logger ->info (sprintf ('消息已存在原始秒级时间戳: %d (%s), message_id: %s ' , $ actualOriginalTimestamp , date ('Y-m-d H:i:s ' , $ actualOriginalTimestamp ), $ data ['payload ' ]['message_id ' ] ?? 'N/A ' ));
8686 } else {
8787 // 如果生产者没有设置 x-original-timestamp,这通常是一个需要注意的情况。
@@ -121,7 +121,7 @@ public function consumeMessage($data, AMQPMessage $message): Result
121121
122122 $ lockAcquired = (bool ) $ this ->superAgentAppService ->acquireLock ($ lockKey , $ lockOwner , $ lockExpireSeconds );
123123
124- if (!$ lockAcquired ) {
124+ if (! $ lockAcquired ) {
125125 $ this ->logger ->info (sprintf (
126126 '无法获取sandbox %s的锁,该sandbox可能有其他消息正在处理中,将消息重新入队等待处理,原始接收秒级时间: %d (%s), message_id: %s ' ,
127127 $ sandboxId ,
@@ -232,157 +232,4 @@ private function validateMessageFormat($data): void
232232 }
233233 }
234234 }
235-
236- /**
237- * 打印消息详情.
238- *
239- * @param array $data 消息数据
240- */
241- private function logMessageDetails (array $ data ): void
242- {
243- if (isset ($ data ['metadata ' ], $ data ['payload ' ])) {
244- // 新格式消息
245- $ payload = $ data ['payload ' ];
246- $ metadata = $ data ['metadata ' ];
247-
248- // 记录元数据
249- $ this ->logger ->info (sprintf (
250- '话题任务消息元数据 - sandbox_id: %s, agent_user_id: %s ' ,
251- $ metadata ['sandbox_id ' ] ?? '未提供 ' ,
252- $ metadata ['agent_user_id ' ] ?? '未提供 '
253- ));
254-
255- // 记录负载数据
256- $ this ->logger ->info (sprintf (
257- '话题任务消息负载 - message_id: %s, type: %s, task_id: %s, status: %s ' ,
258- $ payload ['message_id ' ] ?? '未提供 ' ,
259- $ payload ['type ' ] ?? '未提供 ' ,
260- $ payload ['task_id ' ] ?? '未提供 ' ,
261- $ payload ['status ' ] ?? '未提供 '
262- ));
263-
264- // 记录消息内容
265- if (isset ($ payload ['content ' ]) && ! empty ($ payload ['content ' ])) {
266- $ this ->logger ->info (sprintf (
267- '话题任务消息内容: %s ' ,
268- $ payload ['content ' ]
269- ));
270- }
271-
272- // 记录步骤信息
273- if (isset ($ payload ['steps ' ]) && ! empty ($ payload ['steps ' ])) {
274- $ this ->logger ->info (sprintf (
275- '话题任务步骤信息: %s ' ,
276- json_encode ($ payload ['steps ' ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES )
277- ));
278- }
279-
280- // 记录工具信息
281- if (isset ($ payload ['tool ' ]) && ! empty ($ payload ['tool ' ])) {
282- $ this ->logger ->info (sprintf (
283- '话题任务工具信息: %s ' ,
284- json_encode ($ payload ['tool ' ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES )
285- ));
286- }
287- } else {
288- // 旧格式消息
289- // 记录消息ID和类型
290- $ this ->logger ->info (sprintf (
291- '话题任务消息详情(旧格式) - message_id: %s, type: %s, task_id: %s, status: %s ' ,
292- $ data ['message_id ' ] ?? '未提供 ' ,
293- $ data ['type ' ] ?? '未提供 ' ,
294- $ data ['task_id ' ] ?? '未提供 ' ,
295- $ data ['status ' ] ?? '未提供 '
296- ));
297-
298- // 记录消息内容
299- if (isset ($ data ['content ' ]) && ! empty ($ data ['content ' ])) {
300- $ this ->logger ->info (sprintf (
301- '话题任务消息内容: %s ' ,
302- $ data ['content ' ]
303- ));
304- }
305-
306- // 记录步骤信息
307- if (isset ($ data ['steps ' ]) && ! empty ($ data ['steps ' ])) {
308- $ this ->logger ->info (sprintf (
309- '话题任务步骤信息: %s ' ,
310- json_encode ($ data ['steps ' ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES )
311- ));
312- }
313-
314- // 记录工具信息
315- if (isset ($ data ['tool ' ]) && ! empty ($ data ['tool ' ])) {
316- $ this ->logger ->info (sprintf (
317- '话题任务工具信息: %s ' ,
318- json_encode ($ data ['tool ' ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES )
319- ));
320- }
321- }
322- }
323-
324- /**
325- * 带重试机制的获取锁.
326- *
327- * @param string $sandboxId 沙箱ID
328- * @param TopicTaskMessageDTO $messageDTO 消息DTO
329- * @return array 包含锁信息的数组,acquired表示是否成功获取锁,owner表示锁的持有者,key表示锁的键名
330- */
331- private function acquireLockWithRetry (?string $ sandboxId , TopicTaskMessageDTO $ messageDTO ): array
332- {
333- $ result = [
334- 'acquired ' => false ,
335- 'owner ' => '' ,
336- 'key ' => '' ,
337- ];
338-
339- if (empty ($ sandboxId )) {
340- return $ result ;
341- }
342-
343- $ lockKey = 'handle_sandbox_message_lock: ' . $ sandboxId ;
344- $ lockOwner = IdGenerator::getUniqueId32 (); // 使用唯一ID作为锁持有者标识
345- $ lockExpireSeconds = 30 ; // 锁的过期时间(秒),消息处理可能需要更长时间
346-
347- $ maxRetries = 3 ;
348- $ retryCount = 0 ;
349- $ baseWaitTime = 1 ; // 基础等待时间(秒)
350-
351- while ($ retryCount <= $ maxRetries ) {
352- $ lockAcquired = (bool ) $ this ->superAgentAppService ->acquireLock ($ lockKey , $ lockOwner , $ lockExpireSeconds );
353-
354- if ($ lockAcquired ) {
355- $ this ->logger ->debug (sprintf ('已获取sandbox %s的锁,持有者: %s ' , $ sandboxId , $ lockOwner ));
356- $ result ['acquired ' ] = true ;
357- $ result ['owner ' ] = $ lockOwner ;
358- $ result ['key ' ] = $ lockKey ;
359- return $ result ;
360- }
361-
362- if ($ retryCount === $ maxRetries ) {
363- $ this ->logger ->error (sprintf (
364- '在重试%d次后仍无法获取sandbox %s的锁,该sandbox可能有其他消息正在处理中,message_id: %s ' ,
365- $ maxRetries ,
366- $ sandboxId ,
367- $ messageDTO ->getPayload ()?->getMessageId()
368- ));
369- // 可以选择将消息重新入队或实现延迟重试策略
370- return $ result ;
371- }
372-
373- $ waitTime = $ baseWaitTime * pow (2 , $ retryCount ); // 指数退避
374- $ this ->logger ->warning (sprintf (
375- '无法获取sandbox %s的锁,该sandbox可能有其他消息正在处理中,message_id: %s,将在%d秒后进行第%d次重试 ' ,
376- $ sandboxId ,
377- $ messageDTO ->getPayload ()?->getMessageId(),
378- $ waitTime ,
379- $ retryCount + 1
380- ));
381-
382- sleep ($ waitTime );
383- $ retryCount ++;
384- }
385-
386- return $ result ;
387- }
388235}
0 commit comments