Skip to content

Commit f5f753c

Browse files
committed
Added Throttle support for async imports only.
1 parent 0c3e70a commit f5f753c

File tree

5 files changed

+39
-8
lines changed

5 files changed

+39
-8
lines changed

composer.json

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@
1010
"license": "LGPL-3.0",
1111
"require": {
1212
"php": "^7.1",
13-
"scriptfusion/static-class": "^1",
14-
"scriptfusion/retry": "^2.1",
15-
"scriptfusion/retry-exception-handlers": "^1.2",
1613
"amphp/amp": "^2",
14+
"async/throttle": "^2",
15+
"psr/cache": "^1",
1716
"psr/container": "^1",
18-
"psr/cache": "^1"
17+
"scriptfusion/retry": "^2.1",
18+
"scriptfusion/retry-exception-handlers": "^1.2",
19+
"scriptfusion/static-class": "^1"
1920
},
2021
"require-dev": {
2122
"amphp/phpunit-util": "^1.1",

src/Connector/ImportConnector.php

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
namespace ScriptFUSION\Porter\Connector;
55

66
use Amp\Promise;
7+
use ScriptFUSION\Async\Throttle\Throttle;
78
use ScriptFUSION\Porter\Cache\CacheUnavailableException;
89
use ScriptFUSION\Porter\Connector\Recoverable\RecoverableException;
910
use ScriptFUSION\Porter\Connector\Recoverable\RecoverableExceptionHandler;
1011
use ScriptFUSION\Porter\Connector\Recoverable\StatelessRecoverableExceptionHandler;
12+
use function Amp\call;
1113
use function Amp\Promise\all;
1214
use function ScriptFUSION\Retry\retry;
1315
use function ScriptFUSION\Retry\retryAsync;
@@ -40,6 +42,8 @@ final class ImportConnector implements ConnectorWrapper
4042

4143
private $maxFetchAttempts;
4244

45+
private $throttle;
46+
4347
/**
4448
* @param Connector|AsyncConnector $connector Wrapped connector.
4549
* @param RecoverableExceptionHandler $recoverableExceptionHandler User's recoverable exception handler.
@@ -50,7 +54,8 @@ public function __construct(
5054
$connector,
5155
RecoverableExceptionHandler $recoverableExceptionHandler,
5256
int $maxFetchAttempts,
53-
bool $mustCache
57+
bool $mustCache,
58+
?Throttle $throttle
5459
) {
5560
if ($mustCache && !$connector instanceof CachingConnector) {
5661
throw CacheUnavailableException::createUnsupported();
@@ -64,6 +69,7 @@ public function __construct(
6469
);
6570
$this->userExceptionHandler = $recoverableExceptionHandler;
6671
$this->maxFetchAttempts = $maxFetchAttempts;
72+
$this->throttle = $throttle;
6773
}
6874

6975
/**
@@ -96,7 +102,11 @@ public function fetchAsync(AsyncDataSource $source): Promise
96102
return retryAsync(
97103
$this->maxFetchAttempts,
98104
function () use ($source): Promise {
99-
return $this->connector->fetchAsync($source);
105+
return call(function () use ($source): \Generator {
106+
yield $this->throttle->await($response = $this->connector->fetchAsync($source));
107+
108+
return yield $response;
109+
});
100110
},
101111
$this->createExceptionHandler()
102112
);

src/Connector/ImportConnectorFactory.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
namespace ScriptFUSION\Porter\Connector;
55

6+
use ScriptFUSION\Porter\Specification\AsyncImportSpecification;
67
use ScriptFUSION\Porter\Specification\Specification;
78
use ScriptFUSION\StaticClass;
89

@@ -22,7 +23,8 @@ public static function create($connector, Specification $specification): ImportC
2223
$connector,
2324
$specification->getRecoverableExceptionHandler(),
2425
$specification->getMaxFetchAttempts(),
25-
$specification->mustCache()
26+
$specification->mustCache(),
27+
$specification instanceof AsyncImportSpecification ? $specification->getThrottle() : null
2628
);
2729
}
2830
}

src/Specification/AsyncImportSpecification.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
namespace ScriptFUSION\Porter\Specification;
55

6+
use ScriptFUSION\Async\Throttle\Throttle;
67
use ScriptFUSION\Porter\Connector\Recoverable\ExponentialAsyncDelayRecoverableExceptionHandler;
78
use ScriptFUSION\Porter\Connector\Recoverable\RecoverableExceptionHandler;
89
use ScriptFUSION\Porter\Provider\Resource\AsyncResource;
@@ -15,6 +16,9 @@ class AsyncImportSpecification extends Specification
1516
{
1617
private $asyncResource;
1718

19+
/** @var Throttle */
20+
private $throttle;
21+
1822
/**
1923
* Initializes this instance with the specified asynchronous resource.
2024
*
@@ -53,4 +57,16 @@ protected static function createDefaultRecoverableExceptionHandler(): Recoverabl
5357
{
5458
return new ExponentialAsyncDelayRecoverableExceptionHandler;
5559
}
60+
61+
final public function getThrottle(): Throttle
62+
{
63+
return $this->throttle ?? $this->throttle = new Throttle();
64+
}
65+
66+
final public function setThrottle(Throttle $throttle): self
67+
{
68+
$this->throttle = $throttle;
69+
70+
return $this;
71+
}
5672
}

test/FixtureFactory.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
namespace ScriptFUSIONTest;
55

6+
use ScriptFUSION\Async\Throttle\Throttle;
67
use ScriptFUSION\Porter\Connector\Connector;
78
use ScriptFUSION\Porter\Connector\ImportConnector;
89
use ScriptFUSION\Porter\Connector\Recoverable\RecoverableExceptionHandler;
@@ -23,7 +24,8 @@ public static function buildImportConnector(
2324
$connector,
2425
$recoverableExceptionHandler ?: \Mockery::spy(RecoverableExceptionHandler::class),
2526
$maxFetchAttempts,
26-
$mustCache
27+
$mustCache,
28+
\Mockery::mock(Throttle::class)
2729
);
2830
}
2931
}

0 commit comments

Comments
 (0)