Skip to content

Commit 5436c91

Browse files
liaobwRock-520
authored andcommitted
增强话题任务消息发布和订阅功能,添加AMQP消息属性以支持原始时间戳,优化队列参数设置,改进消息处理逻辑以确保消息顺序性和优先级管理。
1 parent 67caae4 commit 5436c91

File tree

2 files changed

+103
-94
lines changed

2 files changed

+103
-94
lines changed

backend/super-magic-module/src/Application/SuperAgent/Event/Publish/TopicTaskMessagePublisher.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
use Dtyq\SuperMagic\Domain\SuperAgent\Event\TopicTaskMessageEvent;
1111
use Hyperf\Amqp\Annotation\Producer;
1212
use Hyperf\Amqp\Message\ProducerMessage;
13+
use PhpAmqpLib\Message\AMQPMessage;
14+
use PhpAmqpLib\Wire\AMQPTable;
1315

1416
/**
1517
* 话题任务消息发布器.
@@ -23,5 +25,13 @@ class TopicTaskMessagePublisher extends ProducerMessage
2325
public function __construct(TopicTaskMessageEvent $event)
2426
{
2527
$this->payload = $event->toArray();
28+
29+
// 设置 AMQP 消息属性,包括原始时间戳
30+
$this->properties = [
31+
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, // 保持消息持久化
32+
'application_headers' => new AMQPTable([
33+
'x-original-timestamp' => time(), // 设置原始时间戳(秒级)
34+
]),
35+
];
2636
}
2737
}

backend/super-magic-module/src/Application/SuperAgent/Event/Subscribe/TopicTaskMessageSubscriber.php

Lines changed: 93 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,45 @@
1515
use Hyperf\Amqp\Result;
1616
use Hyperf\Contract\StdoutLoggerInterface;
1717
use PhpAmqpLib\Message\AMQPMessage;
18+
use PhpAmqpLib\Wire\AMQPTable;
1819
use Throwable;
1920
use App\Infrastructure\Util\IdGenerator\IdGenerator;
2021

2122
/**
2223
* 话题任务消息订阅者.
2324
*/
24-
#[Consumer(exchange: 'super_magic_topic_task_message', routingKey: 'super_magic_topic_task_message', queue: 'super_magic_topic_task_message', nums: 1)]
25+
#[Consumer(
26+
exchange: 'super_magic_topic_task_message',
27+
routingKey: 'super_magic_topic_task_message',
28+
queue: 'super_magic_topic_task_message',
29+
nums: 3
30+
)]
2531
class TopicTaskMessageSubscriber extends ConsumerMessage
2632
{
33+
/**
34+
* @var AMQPTable|array 队列参数,用于设置优先级等
35+
*/
36+
protected AMQPTable|array $queueArguments = [];
37+
38+
/**
39+
* @var array|null QoS 配置,用于控制预取数量等
40+
*/
41+
protected ?array $qos = [
42+
'prefetch_count' => 1, // 每次只预取1条消息
43+
'prefetch_size' => 0,
44+
'global' => false
45+
];
46+
2747
/**
2848
* 构造函数.
2949
*/
3050
public function __construct(
3151
private readonly TaskAppService $superAgentAppService,
3252
private readonly StdoutLoggerInterface $logger
3353
) {
54+
// 设置队列优先级参数
55+
// 注意:AMQPTable 的值需要是 AMQP 规范的类型,例如 ['S', 'value'] for string, ['I', value] for integer
56+
$this->queueArguments['x-max-priority'] = ['I', 10]; // 设置最高优先级为10
3457
}
3558

3659
/**
@@ -49,69 +72,110 @@ public function consumeMessage($data, AMQPMessage $message): Result
4972
json_encode($data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)
5073
));
5174

75+
// 获取消息属性并检查秒级时间戳
76+
$messageProperties = $message->get_properties();
77+
$applicationHeaders = $messageProperties['application_headers'] ?? new AMQPTable([]);
78+
// 直接从原生数据中获取,如果不存在则为 null
79+
$originalTimestampFromHeader = $applicationHeaders->getNativeData()['x-original-timestamp'] ?? null;
80+
81+
$currentTimeForLog = time(); // 当前处理时间,主要用于日志和可能的本地逻辑
82+
$actualOriginalTimestamp = null; // 初始化变量以避免 linter 警告
83+
84+
if ($originalTimestampFromHeader !== null) {
85+
$actualOriginalTimestamp = (int)$originalTimestampFromHeader; // 确保是整数
86+
$this->logger->info(sprintf('消息已存在原始秒级时间戳: %d (%s), message_id: %s', $actualOriginalTimestamp, date('Y-m-d H:i:s', $actualOriginalTimestamp), $data['payload']['message_id'] ?? 'N/A'));
87+
} else {
88+
// 如果生产者没有设置 x-original-timestamp,这通常是一个需要注意的情况。
89+
$actualOriginalTimestamp = $currentTimeForLog;
90+
$this->logger->warning(sprintf(
91+
'消息未找到 x-original-timestamp 头部,将使用当前时间作为本次处理的原始时间戳参考: %d (%s). 请确保生产者已设置此头部. Message ID: %s',
92+
$actualOriginalTimestamp,
93+
date('Y-m-d H:i:s', $actualOriginalTimestamp),
94+
$data['payload']['message_id'] ?? 'N/A'
95+
));
96+
// 不再尝试修改消息的 application_headers,因为这对于 REQUEUE 后的消息通常无效
97+
}
98+
5299
// 验证消息格式
53100
$this->validateMessageFormat($data);
54101

55-
// 打印消息详情,用于测试和验证
56-
$this->logMessageDetails($data);
102+
// 打印消息详情,用于测试和验证 (根据需要取消注释)
103+
// $this->logMessageDetails($data);
57104

58105
// 创建DTO
59106
$messageDTO = TopicTaskMessageDTO::fromArray($data);
60107

61108
// 获取sandboxId用于锁定
62109
$sandboxId = $messageDTO->getMetadata()?->getSandboxId();
63110
if (empty($sandboxId)) {
64-
$this->logger->warning('缺少有效的sandboxId,无法加锁保证消息顺序性', [
111+
$this->logger->warning('缺少有效的sandboxId,无法加锁保证消息顺序性,将直接处理消息', [
65112
'message_id' => $messageDTO->getPayload()?->getMessageId(),
66113
'message' => json_encode($data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES),
67114
]);
115+
return Result::ACK;
68116
}
69117

70-
// 获取锁并处理消息
71-
$lockInfo = $this->acquireLockWithRetry($sandboxId, $messageDTO);
72-
$lockAcquired = $lockInfo['acquired'];
73-
$lockOwner = $lockInfo['owner'];
74-
$lockKey = $lockInfo['key'];
118+
// 尝试获取锁
119+
$lockKey = 'handle_sandbox_message_lock:' . $sandboxId;
120+
$lockOwner = IdGenerator::getUniqueId32();
121+
$lockExpireSeconds = 30;
75122

76-
// 如果无法获取锁,直接返回ACK确认消息
77-
if (!empty($sandboxId) && !$lockAcquired) {
78-
return Result::ACK;
123+
$lockAcquired = (bool) $this->superAgentAppService->acquireLock($lockKey, $lockOwner, $lockExpireSeconds);
124+
125+
if (!$lockAcquired) {
126+
$this->logger->info(sprintf(
127+
'无法获取sandbox %s的锁,该sandbox可能有其他消息正在处理中,将消息重新入队等待处理,原始接收秒级时间: %d (%s), message_id: %s',
128+
$sandboxId,
129+
$actualOriginalTimestamp, // 使用 actualOriginalTimestamp
130+
date('Y-m-d H:i:s', $actualOriginalTimestamp),
131+
$messageDTO->getPayload()?->getMessageId()
132+
));
133+
return Result::REQUEUE;
79134
}
80-
135+
136+
$this->logger->info(sprintf(
137+
'已获取sandbox %s的锁,持有者: %s,开始处理消息,原始接收秒级时间: %d (%s), message_id: %s',
138+
$sandboxId,
139+
$lockOwner,
140+
$actualOriginalTimestamp, // 使用 actualOriginalTimestamp
141+
date('Y-m-d H:i:s', $actualOriginalTimestamp),
142+
$messageDTO->getPayload()?->getMessageId()
143+
));
144+
81145
try {
82-
// 调用应用层服务处理消息
83146
$this->superAgentAppService->handleTopicTaskMessage($messageDTO);
84-
85-
// 返回ACK确认消息已处理
86147
return Result::ACK;
87148
} finally {
88-
// 释放锁
89-
if ($lockAcquired && !empty($sandboxId)) {
90-
if ($this->superAgentAppService->releaseLock($lockKey, $lockOwner)) {
91-
$this->logger->debug(sprintf('已释放sandbox %s的锁,持有者: %s', $sandboxId, $lockOwner));
92-
} else {
93-
$this->logger->error(sprintf('释放sandbox %s的锁失败,持有者: %s,可能需要人工干预', $sandboxId, $lockOwner));
94-
}
149+
if ($this->superAgentAppService->releaseLock($lockKey, $lockOwner)) {
150+
$this->logger->info(sprintf(
151+
'已释放sandbox %s的锁,持有者: %s, message_id: %s',
152+
$sandboxId,
153+
$lockOwner,
154+
$messageDTO->getPayload()?->getMessageId()
155+
));
156+
} else {
157+
$this->logger->error(sprintf(
158+
'释放sandbox %s的锁失败,持有者: %s,可能需要人工干预, message_id: %s',
159+
$sandboxId,
160+
$lockOwner,
161+
$messageDTO->getPayload()?->getMessageId()
162+
));
95163
}
96164
}
97165
} catch (BusinessException $e) {
98-
// 业务异常,记录错误信息
99166
$this->logger->error(sprintf(
100167
'处理话题任务消息失败,业务异常: %s, 消息内容: %s',
101168
$e->getMessage(),
102169
json_encode($data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)
103170
));
104-
105-
return Result::ACK; // 即使出错也确认消息,避免消息堆积
171+
return Result::ACK;
106172
} catch (Throwable $e) {
107-
// 其他异常,记录错误信息
108173
$this->logger->error(sprintf(
109174
'处理话题任务消息失败,系统异常: %s, 消息内容: %s',
110175
$e->getMessage(),
111176
json_encode($data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)
112177
));
113-
114-
return Result::ACK; // 即使出错也确认消息,避免消息堆积
178+
return Result::ACK;
115179
}
116180
}
117181

@@ -258,69 +322,4 @@ private function logMessageDetails(array $data): void
258322
}
259323
}
260324
}
261-
262-
/**
263-
* 带重试机制的获取锁.
264-
*
265-
* @param string $sandboxId 沙箱ID
266-
* @param TopicTaskMessageDTO $messageDTO 消息DTO
267-
* @return array 包含锁信息的数组,acquired表示是否成功获取锁,owner表示锁的持有者,key表示锁的键名
268-
*/
269-
private function acquireLockWithRetry(?string $sandboxId, TopicTaskMessageDTO $messageDTO): array
270-
{
271-
$result = [
272-
'acquired' => false,
273-
'owner' => '',
274-
'key' => '',
275-
];
276-
277-
if (empty($sandboxId)) {
278-
return $result;
279-
}
280-
281-
$lockKey = 'handle_sandbox_message_lock:' . $sandboxId;
282-
$lockOwner = IdGenerator::getUniqueId32(); // 使用唯一ID作为锁持有者标识
283-
$lockExpireSeconds = 30; // 锁的过期时间(秒),消息处理可能需要更长时间
284-
285-
$maxRetries = 3;
286-
$retryCount = 0;
287-
$baseWaitTime = 1; // 基础等待时间(秒)
288-
289-
while ($retryCount <= $maxRetries) {
290-
$lockAcquired = (bool) $this->superAgentAppService->acquireLock($lockKey, $lockOwner, $lockExpireSeconds);
291-
292-
if ($lockAcquired) {
293-
$this->logger->debug(sprintf('已获取sandbox %s的锁,持有者: %s', $sandboxId, $lockOwner));
294-
$result['acquired'] = true;
295-
$result['owner'] = $lockOwner;
296-
$result['key'] = $lockKey;
297-
return $result;
298-
}
299-
300-
if ($retryCount === $maxRetries) {
301-
$this->logger->error(sprintf(
302-
'在重试%d次后仍无法获取sandbox %s的锁,该sandbox可能有其他消息正在处理中,message_id: %s',
303-
$maxRetries,
304-
$sandboxId,
305-
$messageDTO->getPayload()?->getMessageId()
306-
));
307-
// 可以选择将消息重新入队或实现延迟重试策略
308-
return $result;
309-
}
310-
311-
$waitTime = $baseWaitTime * pow(2, $retryCount); // 指数退避
312-
$this->logger->warning(sprintf(
313-
'无法获取sandbox %s的锁,该sandbox可能有其他消息正在处理中,message_id: %s,将在%d秒后进行第%d次重试',
314-
$sandboxId,
315-
$messageDTO->getPayload()?->getMessageId(),
316-
$waitTime,
317-
$retryCount + 1
318-
));
319-
320-
sleep($waitTime);
321-
$retryCount++;
322-
}
323-
324-
return $result;
325-
}
326325
}

0 commit comments

Comments
 (0)