Skip to content

Commit 8860ff4

Browse files
committed
Added Throttle::join loop to avoid overloading throttle.
1 parent d292c0c commit 8860ff4

File tree

3 files changed

+9
-1
lines changed

3 files changed

+9
-1
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
"require": {
1212
"php": "^7.1",
1313
"amphp/amp": "^2",
14-
"async/throttle": "^2",
14+
"async/throttle": "dev-join",
1515
"psr/cache": "^1",
1616
"psr/container": "^1",
1717
"scriptfusion/retry": "^2.1",

src/Connector/ImportConnector.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ public function fetchAsync(AsyncDataSource $source): Promise
103103
$this->maxFetchAttempts,
104104
function () use ($source): Promise {
105105
return call(function () use ($source): \Generator {
106+
while (!yield $this->throttle->join()) {
107+
// Throttle is choked. Wait for free slot.
108+
}
109+
106110
yield $this->throttle->await($response = $this->connector->fetchAsync($source));
107111

108112
return yield $response;

test/FixtureFactory.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
namespace ScriptFUSIONTest;
55

6+
use Amp\Success;
67
use ScriptFUSION\Async\Throttle\Throttle;
78
use ScriptFUSION\Porter\Connector\Connector;
89
use ScriptFUSION\Porter\Connector\ImportConnector;
@@ -26,6 +27,9 @@ public static function buildImportConnector(
2627
$maxFetchAttempts,
2728
$mustCache,
2829
\Mockery::mock(Throttle::class)
30+
->shouldReceive('join')
31+
->andReturn(new Success(true))
32+
->getMock()
2933
);
3034
}
3135
}

0 commit comments

Comments
 (0)