Skip to content

Commit f8fa351

Browse files
committed
Added ThrottledConnector and accompanying ThrottlePrecedenceHierarchyTest.
ThrottledConnector specifies a connector that may be throttled. The connector's throttle is used only when not overriden by the Specification's throttle (if specified).
1 parent a04dc68 commit f8fa351

File tree

9 files changed

+156
-12
lines changed

9 files changed

+156
-12
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -515,8 +515,8 @@ Porter is published under the open source GNU Lesser General Public License v3.0
515515
[Version image]: https://poser.pugx.org/scriptfusion/porter/version "Latest version"
516516
[Downloads]: https://packagist.org/packages/scriptfusion/porter
517517
[Downloads image]: https://poser.pugx.org/scriptfusion/porter/downloads "Total downloads"
518-
[Build]: http://travis-ci.org/ScriptFUSION/Porter
519-
[Build image]: https://travis-ci.org/ScriptFUSION/Porter.svg?branch=master "Build status"
518+
[Build]: http://travis-ci.com/ScriptFUSION/Porter
519+
[Build image]: https://travis-ci.com/ScriptFUSION/Porter.svg?branch=master "Build status"
520520
[MSI image]: https://badge.stryker-mutator.io/github.com/ScriptFUSION/Porter/master
521521
[Coverage]: https://codecov.io/gh/ScriptFUSION/Porter
522522
[Coverage image]: https://codecov.io/gh/ScriptFUSION/Porter/branch/master/graphs/badge.svg "Test coverage"

src/Connector/ImportConnector.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,10 @@ final class ImportConnector implements ConnectorWrapper
4747
/**
4848
* @param Connector|AsyncConnector $connector Wrapped connector.
4949
* @param RecoverableExceptionHandler $recoverableExceptionHandler User's recoverable exception handler.
50-
* @param int $maxFetchAttempts
50+
* @param int $maxFetchAttempts Maximum fetch attempts.
5151
* @param bool $mustCache True if the response must be cached, otherwise false.
52+
* @param Throttle|null $throttle Connection throttle invoked each time the connector fetches data. May be null
53+
* for synchronous imports only.
5254
*/
5355
public function __construct(
5456
$connector,

src/Connector/ImportConnectorFactory.php

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33

44
namespace ScriptFUSION\Porter\Connector;
55

6+
use ScriptFUSION\Async\Throttle\NullThrottle;
67
use ScriptFUSION\Porter\Specification\AsyncImportSpecification;
78
use ScriptFUSION\Porter\Specification\Specification;
89
use ScriptFUSION\StaticClass;
910

11+
/**
12+
* @internal For internal use only.
13+
*/
1014
final class ImportConnectorFactory
1115
{
1216
use StaticClass;
@@ -19,12 +23,20 @@ final class ImportConnectorFactory
1923
*/
2024
public static function create($connector, Specification $specification): ImportConnector
2125
{
26+
if ($specification instanceof AsyncImportSpecification) {
27+
$throttle = $specification->getThrottle();
28+
29+
if ($throttle instanceof NullThrottle && $connector instanceof ThrottledConnector) {
30+
$throttle = $connector->getThrottle();
31+
}
32+
}
33+
2234
return new ImportConnector(
2335
$connector,
2436
$specification->getRecoverableExceptionHandler(),
2537
$specification->getMaxFetchAttempts(),
2638
$specification->mustCache(),
27-
$specification instanceof AsyncImportSpecification ? $specification->getThrottle() : null
39+
$throttle ?? null
2840
);
2941
}
3042
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace ScriptFUSION\Porter\Connector;
5+
6+
use ScriptFUSION\Async\Throttle\Throttle;
7+
8+
/**
9+
* Specifies a connector that is rate-limited by a connection throttle.
10+
*
11+
* Currently only supported for async connectors.
12+
*/
13+
interface ThrottledConnector
14+
{
15+
/**
16+
* Gets the connection throttle, invoked each time the connector fetches data.
17+
*
18+
* @return Throttle Connection throttle.
19+
*/
20+
public function getThrottle(): Throttle;
21+
}

src/Specification/AsyncImportSpecification.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ final public function getAsyncResource(): AsyncResource
5252

5353
final public function addTransformer(AsyncTransformer $transformer): self
5454
{
55-
return parent::addAnyTransformer($transformer);
55+
return $this->addAnyTransformer($transformer);
5656
}
5757

5858
protected static function createDefaultRecoverableExceptionHandler(): RecoverableExceptionHandler

src/Specification/ImportSpecification.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ final public function getResource(): ProviderResource
5353
*/
5454
final public function addTransformer(Transformer $transformer): self
5555
{
56-
return parent::addAnyTransformer($transformer);
56+
return $this->addAnyTransformer($transformer);
5757
}
5858

5959
protected static function createDefaultRecoverableExceptionHandler(): RecoverableExceptionHandler

test/FixtureFactory.php

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33

44
namespace ScriptFUSIONTest;
55

6-
use Amp\Success;
7-
use ScriptFUSION\Async\Throttle\Throttle;
86
use ScriptFUSION\Porter\Connector\Connector;
97
use ScriptFUSION\Porter\Connector\ImportConnector;
108
use ScriptFUSION\Porter\Connector\Recoverable\RecoverableExceptionHandler;
@@ -26,10 +24,7 @@ public static function buildImportConnector(
2624
$recoverableExceptionHandler ?: \Mockery::spy(RecoverableExceptionHandler::class),
2725
$maxFetchAttempts,
2826
$mustCache,
29-
\Mockery::mock(Throttle::class)
30-
->shouldReceive('join')
31-
->andReturn(new Success(true))
32-
->getMock()
27+
MockFactory::mockThrottle()
3328
);
3429
}
3530
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace ScriptFUSIONTest\Functional;
5+
6+
use Amp\PHPUnit\AsyncTestCase;
7+
use Amp\Success;
8+
use Mockery\Adapter\Phpunit\MockeryPHPUnitIntegration;
9+
use ScriptFUSION\Porter\Connector\AsyncConnector;
10+
use ScriptFUSION\Porter\Connector\AsyncDataSource;
11+
use ScriptFUSION\Porter\Connector\ImportConnectorFactory;
12+
use ScriptFUSION\Porter\Connector\ThrottledConnector;
13+
use ScriptFUSION\Porter\Specification\AsyncImportSpecification;
14+
use ScriptFUSIONTest\MockFactory;
15+
16+
/**
17+
* Tests the throttle hierarchy of precedence. Only applies to async imports.
18+
*
19+
* Specification throttle (preferred) > Connector throttle (default).
20+
*
21+
* @see ImportConnectorFactory
22+
*/
23+
final class ThrottlePrecedenceHierarchyTest extends AsyncTestCase
24+
{
25+
use MockeryPHPUnitIntegration;
26+
27+
private $specificationThrottle;
28+
private $connectorThrottle;
29+
private $specification;
30+
private $provider;
31+
32+
protected function setUp(): void
33+
{
34+
parent::setUp();
35+
36+
$this->specificationThrottle = MockFactory::mockThrottle();
37+
$this->connectorThrottle = MockFactory::mockThrottle();
38+
39+
$this->specification = new AsyncImportSpecification(MockFactory::mockResource(
40+
$this->provider = MockFactory::mockProvider()
41+
));
42+
}
43+
44+
/**
45+
* Tests that when the connector is non-throttling, the specification's throttle is used.
46+
*/
47+
public function testNonThrottledConnector(): \Generator
48+
{
49+
$this->specification->setThrottle($this->specificationThrottle);
50+
$this->specificationThrottle->expects('await')->once()->andReturn(new Success());
51+
$this->connectorThrottle->expects('await')->never();
52+
53+
$connector = ImportConnectorFactory::create($this->provider->getAsyncConnector(), $this->specification);
54+
55+
yield $connector->fetchAsync(\Mockery::mock(AsyncDataSource::class));
56+
}
57+
58+
/**
59+
* Tests that when the connector is throttled, and the specification's throttle is not set, the connector's
60+
* throttle is used.
61+
*/
62+
public function testThrottledConnector(): \Generator
63+
{
64+
$this->specificationThrottle->expects('await')->never();
65+
$this->connectorThrottle->expects('await')->once()->andReturn(new Success());
66+
67+
$connector = ImportConnectorFactory::create($this->mockThrottledConnector(), $this->specification);
68+
69+
yield $connector->fetchAsync(\Mockery::mock(AsyncDataSource::class));
70+
}
71+
72+
/**
73+
* Tests that when both the connector is throttled and the specification's throttle are set, the specification's
74+
* throttle overrides that of the connector.
75+
*/
76+
public function testThrottledConnectorOverride(): \Generator
77+
{
78+
$this->specification->setThrottle($this->specificationThrottle);
79+
$this->specificationThrottle->expects('await')->once()->andReturn(new Success());
80+
$this->connectorThrottle->expects('await')->never();
81+
82+
$connector = ImportConnectorFactory::create($this->mockThrottledConnector(), $this->specification);
83+
84+
yield $connector->fetchAsync(\Mockery::mock(AsyncDataSource::class));
85+
}
86+
87+
/**
88+
* @return AsyncConnector|ThrottledConnector
89+
*/
90+
private function mockThrottledConnector(): AsyncConnector
91+
{
92+
return \Mockery::mock(AsyncConnector::class, ThrottledConnector::class)
93+
->shouldReceive('getThrottle')
94+
->andReturn($this->connectorThrottle)
95+
->shouldReceive('fetchAsync')
96+
->andReturn(MockFactory::mockPromise())
97+
->getMock()
98+
;
99+
}
100+
}

test/MockFactory.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
use Amp\Iterator;
88
use Amp\Producer;
99
use Amp\Promise;
10+
use Amp\Success;
1011
use Mockery\MockInterface;
12+
use ScriptFUSION\Async\Throttle\Throttle;
1113
use ScriptFUSION\Porter\Connector\AsyncConnector;
1214
use ScriptFUSION\Porter\Connector\AsyncDataSource;
1315
use ScriptFUSION\Porter\Connector\Connector;
@@ -95,6 +97,18 @@ public static function mockSingleRecordResource(Provider $provider)
9597
return self::mockResource($provider, null, true);
9698
}
9799

100+
/**
101+
* @return Throttle|MockInterface
102+
*/
103+
public static function mockThrottle()
104+
{
105+
return \Mockery::mock(Throttle::class)
106+
->shouldReceive('join')
107+
->andReturn(new Success(true))
108+
->getMock()
109+
;
110+
}
111+
98112
/**
99113
* @return Promise|MockInterface
100114
*/

0 commit comments

Comments
 (0)