Skip to content

Commit f8ce192

Browse files
committed
feat(TopicTaskMessageSubscriber): update queue parameters to allow 3 message processing and add retry mechanism for lock acquisition
2 parents bdd68a3 + 3a53958 commit f8ce192

File tree

1 file changed

+76
-14
lines changed

1 file changed

+76
-14
lines changed

backend/super-magic-module/src/Application/SuperAgent/Event/Subscribe/TopicTaskMessageSubscriber.php

Lines changed: 76 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@
2323
* 话题任务消息订阅者.
2424
*/
2525
#[Consumer(
26-
exchange: 'super_magic_topic_task_message',
27-
routingKey: 'super_magic_topic_task_message',
28-
queue: 'super_magic_topic_task_message',
29-
nums: 1
26+
exchange: 'super_magic_topic_task_message',
27+
routingKey: 'super_magic_topic_task_message',
28+
queue: 'super_magic_topic_task_message',
29+
nums: 3
3030
)]
3131
class TopicTaskMessageSubscriber extends ConsumerMessage
3232
{
@@ -77,7 +77,6 @@ public function consumeMessage($data, AMQPMessage $message): Result
7777
$applicationHeaders = $messageProperties['application_headers'] ?? new AMQPTable([]);
7878
// 直接从原生数据中获取,如果不存在则为 null
7979
$originalTimestampFromHeader = $applicationHeaders->getNativeData()['x-original-timestamp'] ?? null;
80-
8180
$currentTimeForLog = time(); // 当前处理时间,主要用于日志和可能的本地逻辑
8281
$actualOriginalTimestamp = null; // 初始化变量以避免 linter 警告
8382

@@ -91,11 +90,11 @@ public function consumeMessage($data, AMQPMessage $message): Result
9190
'消息未找到 x-original-timestamp 头部,将使用当前时间作为本次处理的原始时间戳参考: %d (%s). 请确保生产者已设置此头部. Message ID: %s',
9291
$actualOriginalTimestamp,
9392
date('Y-m-d H:i:s', $actualOriginalTimestamp),
94-
$data['payload']['message_id'] ?? 'N/A'
93+
$data['payload']['message_id'] ?? 'N/A'
9594
));
9695
// 不再尝试修改消息的 application_headers,因为这对于 REQUEUE 后的消息通常无效
9796
}
98-
97+
9998
// 验证消息格式
10099
$this->validateMessageFormat($data);
101100

@@ -104,24 +103,24 @@ public function consumeMessage($data, AMQPMessage $message): Result
104103

105104
// 创建DTO
106105
$messageDTO = TopicTaskMessageDTO::fromArray($data);
107-
106+
108107
// 获取sandboxId用于锁定
109108
$sandboxId = $messageDTO->getMetadata()?->getSandboxId();
110109
if (empty($sandboxId)) {
111110
$this->logger->warning('缺少有效的sandboxId,无法加锁保证消息顺序性,将直接处理消息', [
112111
'message_id' => $messageDTO->getPayload()?->getMessageId(),
113112
'message' => json_encode($data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES),
114113
]);
115-
return Result::ACK;
114+
return Result::ACK;
116115
}
117116

118117
// 尝试获取锁
119118
$lockKey = 'handle_sandbox_message_lock:' . $sandboxId;
120119
$lockOwner = IdGenerator::getUniqueId32();
121-
$lockExpireSeconds = 30;
122-
120+
$lockExpireSeconds = 30;
121+
123122
$lockAcquired = (bool) $this->superAgentAppService->acquireLock($lockKey, $lockOwner, $lockExpireSeconds);
124-
123+
125124
if (!$lockAcquired) {
126125
$this->logger->info(sprintf(
127126
'无法获取sandbox %s的锁,该sandbox可能有其他消息正在处理中,将消息重新入队等待处理,原始接收秒级时间: %d (%s), message_id: %s',
@@ -132,7 +131,6 @@ public function consumeMessage($data, AMQPMessage $message): Result
132131
));
133132
return Result::REQUEUE;
134133
}
135-
136134
$this->logger->info(sprintf(
137135
'已获取sandbox %s的锁,持有者: %s,开始处理消息,原始接收秒级时间: %d (%s), message_id: %s',
138136
$sandboxId,
@@ -141,7 +139,6 @@ public function consumeMessage($data, AMQPMessage $message): Result
141139
date('Y-m-d H:i:s', $actualOriginalTimestamp),
142140
$messageDTO->getPayload()?->getMessageId()
143141
));
144-
145142
try {
146143
$this->superAgentAppService->handleTopicTaskMessage($messageDTO);
147144
return Result::ACK;
@@ -322,4 +319,69 @@ private function logMessageDetails(array $data): void
322319
}
323320
}
324321
}
322+
323+
/**
324+
* 带重试机制的获取锁.
325+
*
326+
* @param string $sandboxId 沙箱ID
327+
* @param TopicTaskMessageDTO $messageDTO 消息DTO
328+
* @return array 包含锁信息的数组,acquired表示是否成功获取锁,owner表示锁的持有者,key表示锁的键名
329+
*/
330+
private function acquireLockWithRetry(?string $sandboxId, TopicTaskMessageDTO $messageDTO): array
331+
{
332+
$result = [
333+
'acquired' => false,
334+
'owner' => '',
335+
'key' => '',
336+
];
337+
338+
if (empty($sandboxId)) {
339+
return $result;
340+
}
341+
342+
$lockKey = 'handle_sandbox_message_lock:' . $sandboxId;
343+
$lockOwner = IdGenerator::getUniqueId32(); // 使用唯一ID作为锁持有者标识
344+
$lockExpireSeconds = 30; // 锁的过期时间(秒),消息处理可能需要更长时间
345+
346+
$maxRetries = 3;
347+
$retryCount = 0;
348+
$baseWaitTime = 1; // 基础等待时间(秒)
349+
350+
while ($retryCount <= $maxRetries) {
351+
$lockAcquired = (bool) $this->superAgentAppService->acquireLock($lockKey, $lockOwner, $lockExpireSeconds);
352+
353+
if ($lockAcquired) {
354+
$this->logger->debug(sprintf('已获取sandbox %s的锁,持有者: %s', $sandboxId, $lockOwner));
355+
$result['acquired'] = true;
356+
$result['owner'] = $lockOwner;
357+
$result['key'] = $lockKey;
358+
return $result;
359+
}
360+
361+
if ($retryCount === $maxRetries) {
362+
$this->logger->error(sprintf(
363+
'在重试%d次后仍无法获取sandbox %s的锁,该sandbox可能有其他消息正在处理中,message_id: %s',
364+
$maxRetries,
365+
$sandboxId,
366+
$messageDTO->getPayload()?->getMessageId()
367+
));
368+
// 可以选择将消息重新入队或实现延迟重试策略
369+
return $result;
370+
}
371+
372+
$waitTime = $baseWaitTime * pow(2, $retryCount); // 指数退避
373+
$this->logger->warning(sprintf(
374+
'无法获取sandbox %s的锁,该sandbox可能有其他消息正在处理中,message_id: %s,将在%d秒后进行第%d次重试',
375+
$sandboxId,
376+
$messageDTO->getPayload()?->getMessageId(),
377+
$waitTime,
378+
$retryCount + 1
379+
));
380+
381+
sleep($waitTime);
382+
$retryCount++;
383+
}
384+
385+
return $result;
386+
}
325387
}

0 commit comments

Comments
 (0)