Skip to content

Commit bee03e4

Browse files
author
peter
committed
feat: 增加队列并行消费能力
1 parent 1964f3d commit bee03e4

File tree

11 files changed

+112
-15
lines changed

11 files changed

+112
-15
lines changed

README.md

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,14 @@
2020
本项目设计时考虑了多商户多店铺的情况,所以大部分表都需要有`merchant_id``merchant_store_id`字段。项目初始化时请把相应配置填入`amazon_app`表中(**后续会调整该表结构,把不同地区的refresh_token放到amazon_app_region中**)。
2121

2222

23-
2423
### 常用命令
2524
```
25+
# 刷新Token缓存
26+
27+
crontab:amazon:refresh-app-token
28+
29+
30+
2631
# 创建报告
2732
### 强制创建销售与流量报告, 时间范围为2023-12-01到2023-12-20,循环创建每一天的报告
2833
> php bin/hyperf.php amazon:report:create 1 1 GET_SALES_AND_TRAFFIC_REPORT --report_start_date=2023-12-01 --report_end_date=2023-12-20 --is_range_date=1 --is_force_create=1
@@ -34,9 +39,9 @@
3439
> php bin/hyperf.php amazon:report:action
3540
3641
# 获取周期报告
37-
>
42+
> php bin/hyperf.php amazon:report:gets 1 1 us-east-1 GET_DATE_RANGE_FINANCIAL_TRANSACTION_DATA
3843
# 拉取周期报告
39-
>
44+
>
4045
# 处理周期报告
4146
>
4247

app/Command/Amazon/Report/ReportGetDocument.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,6 @@ public function configure(): void
3939
*/
4040
public function handle(): void
4141
{
42-
(new AmazonGetReportDocumentQueue())->pop();
42+
(new AmazonGetReportDocumentQueue())->coPop(30);//并行请求的数量
4343
}
4444
}

app/Command/Crontab/Amazon/RefreshAppToken.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use App\Model\AmazonAppRegionModel;
1616
use App\Util\AmazonApp;
1717
use App\Util\AmazonSDK;
18+
use App\Util\Constants;
1819
use Hyperf\Command\Annotation\Command;
1920
use Hyperf\Command\Command as HyperfCommand;
2021
use Hyperf\Context\ApplicationContext;
@@ -56,6 +57,7 @@ public function handle(): void
5657
$amazonAppRegionCollections = AmazonAppRegionModel::query()
5758
->where('merchant_id', $merchant_id)
5859
->where('merchant_store_id', $merchant_store_id)
60+
->where('status', Constants::STATUS_ACTIVE)
5961
->get();
6062
if ($amazonAppRegionCollections->isEmpty()) {
6163
return false;

app/Command/Crontab/Schedules.php

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@ public function configure(): void
2828
public function handle(): void
2929
{
3030

31-
set_time_limit(0);
32-
ini_set('memory_limit', '1024M');
33-
3431
$console = ApplicationContext::getContainer()->get(ConsoleLog::class);
3532

3633
// 指令输出

app/Queue/AbstractQueue.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ abstract public function push(QueueDataInterface $queueData): bool;
4343

4444
abstract public function pop(): bool;
4545

46+
abstract public function coPop(): bool;
47+
4648
abstract public function handleQueueData(QueueDataInterface $queueData): bool;
4749

4850
abstract public function len(): int;

app/Queue/AmazonGetReportDocumentQueue.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public function handleQueueData(QueueDataInterface $queueData): bool
103103
})();
104104
} elseif (file_exists($file_path)) {
105105
$log = sprintf('Get Document 报告已存在,直接进入队列. report_type: %s report_document_id: %s file_path:%s merchant_id: %s merchant_store_id: %s', $report_type, $report_document_id, $file_path, $merchant_id, $merchant_store_id);
106-
$console->highlight($log);
106+
$console->warning($log);
107107
goto end;
108108
} else {
109109
$retry = 10;

app/Queue/Queue.php

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@
1313
use App\Queue\Data\QueueData;
1414
use App\Queue\Data\QueueDataInterface;
1515
use App\Util\Log\QueueLog;
16+
use Hyperf\Collection\Collection;
1617
use Hyperf\Context\ApplicationContext;
1718
use Hyperf\Contract\StdoutLoggerInterface;
19+
use Hyperf\Coroutine\Parallel;
20+
use Hyperf\Engine\Coroutine;
1821
use Psr\Container\ContainerExceptionInterface;
1922
use Psr\Container\NotFoundExceptionInterface;
2023

@@ -64,11 +67,10 @@ public function pop(): bool
6467
$timeout = $this->timeout;
6568

6669
$retryInterval = $this->retryInterval; // 消息重试次数
67-
6870
while (true) {
6971
try {
7072
$pop = $this->redis->brpop($this->queue_name, $timeout);
71-
if (is_array($pop)) {
73+
if (empty($pop)) {
7274
pcntl_signal_dispatch();
7375
$console->info(sprintf('进程[%s] pid:%s 队列为空,自动退出', cli_get_process_title(), $pid));
7476
break;
@@ -124,6 +126,93 @@ public function pop(): bool
124126
return true;
125127
}
126128

129+
public function coPop(int $parallel_num = 100): bool
130+
{
131+
132+
$console = ApplicationContext::getContainer()->get(StdoutLoggerInterface::class);
133+
$logger = ApplicationContext::getContainer()->get(QueueLog::class);
134+
135+
if ($parallel_num < 1) {
136+
$console->error(sprintf('队列:%s 并行消费数量不能小于1。当前并行数量为:%s', $this->queue_name, $parallel_num));
137+
return false;
138+
}
139+
140+
$pid = posix_getpid();
141+
142+
$process_title = $this->queue_name . '-' . $pid;
143+
cli_set_process_title($process_title);
144+
145+
$timeout = $this->timeout;
146+
147+
$retryInterval = $this->retryInterval; // 消息重试次数
148+
149+
while ($this->len()) {
150+
$collections = new Collection();
151+
152+
while (true) {
153+
try {
154+
$pop = $this->redis->brpop($this->queue_name, $timeout);
155+
if (empty($pop)) {
156+
break;
157+
}
158+
$collections->push($pop[1]);
159+
} catch (\RedisException $exception) {
160+
$logger->error(sprintf('队列:%s 连接Redis异常.%s', $this->queue_name, $exception->getMessage()));
161+
break;
162+
}
163+
164+
if ($collections->count() >= $parallel_num) {
165+
break;
166+
}
167+
}
168+
169+
$count = $collections->count();
170+
171+
$console->notice(sprintf('进程[%s] pid:%s 消费数据长度%s', cli_get_process_title(), $pid, $count));
172+
if ($count === 0) {
173+
return true;
174+
}
175+
176+
$class = $this->getQueueDataClass();
177+
178+
$parallel = new Parallel($parallel_num);
179+
180+
$that = $this;
181+
/**
182+
* @var QueueData $dataObject
183+
*/
184+
$collections->each(function ($item) use ($that, $class, $parallel, $logger, $console) {
185+
/**
186+
* @var QueueData $dataObject
187+
*/
188+
$dataObject = new $class();
189+
190+
$arr = $dataObject->toArr($item);
191+
$dataObject->parse($arr);
192+
193+
$parallel->add(function () use ($that, $dataObject, $item, $console) {
194+
195+
$t1 = microtime(true);
196+
197+
$handle = $that->handleQueueData($dataObject);
198+
if ($handle === false) {
199+
}
200+
201+
$t2 = microtime(true);
202+
203+
$co_id = Coroutine::id();//当前协程id
204+
205+
$console->info(sprintf('队列:%s 消费数据. data:%s 耗时:%s 秒 co_id:%s', $this->queue_name, json_encode($item, JSON_THROW_ON_ERROR), round($t2 - $t1, 3), $co_id));
206+
207+
return $co_id;
208+
});
209+
});
210+
211+
$results = $parallel->wait();
212+
}
213+
return true;
214+
}
215+
127216
/**
128217
* @throws \RedisException
129218
*/

app/Util/AmazonApp.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ public static function tok(int $merchant_id, int $merchant_store_id, callable $f
9494
$amazonAppRegionCollections = AmazonAppRegionModel::query()
9595
->where('merchant_id', $merchant_id)
9696
->where('merchant_store_id', $merchant_store_id)
97+
->where('status', Constants::STATUS_ACTIVE)
9798
->get();
9899
if ($amazonAppRegionCollections->isEmpty()) {
99100
return false;
@@ -170,6 +171,7 @@ public static function tok2(int $merchant_id, int $merchant_store_id, string $re
170171
->where('merchant_id', $merchant_id)
171172
->where('merchant_store_id', $merchant_store_id)
172173
->where('region', $region)
174+
->where('status', Constants::STATUS_ACTIVE)
173175
->firstOrFail();
174176
} catch (ModelNotFoundException $modelNotFoundException) {
175177
$log = sprintf('Amazon App SDK构建失败,AmazonAppRegion查询失败. 错误信息:%s merchant_id:%s merchant_store_id:%s region:%s ', $modelNotFoundException->getMessage(), $merchant_id, $merchant_store_id, $region);

app/Util/AmazonSDK.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ public function getRefreshToken(): string
272272
public function getSdk(string $region, bool $force_refresh = false): SellingPartnerSDK
273273
{
274274
$factory = new Psr17Factory();
275-
$client = new Curl($factory);
275+
$client = new Curl($factory, ['timeout' => 10]);// 超时时间 单位:秒
276276

277277
$sts = new STSClient(
278278
$client,

config/autoload/databases.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
'prefix' => env('DB_PREFIX', ''),
3131
'pool' => [
3232
'min_connections' => 1,
33-
'max_connections' => 10,
33+
'max_connections' => 50,
3434
'connect_timeout' => 10.0,
35-
'wait_timeout' => 3.0,
35+
'wait_timeout' => 10.0,
3636
'heartbeat' => -1,
3737
'max_idle_time' => (float) env('DB_MAX_IDLE_TIME', 60),
3838
],

0 commit comments

Comments
 (0)