Skip to content

Commit e7bd1b5

Browse files
Release redis connection immediately when using transaction or pipeline with callbacks. (#7394)
Co-authored-by: 李铭昕 <[email protected]>
1 parent 9fa4e43 commit e7bd1b5

File tree

3 files changed

+238
-14
lines changed

3 files changed

+238
-14
lines changed

src/Redis.php

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,8 @@ public function __call($name, $arguments)
7171
}
7272
// Should storage the connection to coroutine context, then use defer() to release the connection.
7373
Context::set($this->getContextKey(), $connection);
74-
defer(function () use ($connection) {
75-
Context::set($this->getContextKey(), null);
76-
$connection->release();
74+
defer(function () {
75+
$this->releaseContextConnection();
7776
});
7877
} else {
7978
// Release the connection after command executed.
@@ -85,6 +84,20 @@ public function __call($name, $arguments)
8584
return $result;
8685
}
8786

87+
/**
88+
* Release the connection stored in coroutine context.
89+
*/
90+
private function releaseContextConnection(): void
91+
{
92+
$contextKey = $this->getContextKey();
93+
$connection = Context::get($contextKey);
94+
95+
if ($connection) {
96+
Context::set($contextKey, null);
97+
$connection->release();
98+
}
99+
}
100+
88101
/**
89102
* Define the commands that need same connection to execute.
90103
* When these commands executed, the connection will storage to coroutine context.

src/Traits/MultiExec.php

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212

1313
namespace Hyperf\Redis\Traits;
1414

15+
use Hyperf\Context\Context;
16+
use Hyperf\Redis\Redis as HyperfRedis;
1517
use Redis;
1618
use RedisCluster;
1719

@@ -26,9 +28,7 @@ trait MultiExec
2628
*/
2729
public function pipeline(?callable $callback = null)
2830
{
29-
$pipeline = $this->__call('pipeline', []);
30-
31-
return is_null($callback) ? $pipeline : tap($pipeline, $callback)->exec();
31+
return $this->executeMultiExec('pipeline', $callback);
3232
}
3333

3434
/**
@@ -38,8 +38,33 @@ public function pipeline(?callable $callback = null)
3838
*/
3939
public function transaction(?callable $callback = null)
4040
{
41-
$transaction = $this->__call('multi', []);
41+
return $this->executeMultiExec('multi', $callback);
42+
}
43+
44+
/**
45+
* Execute multi-exec commands with optional callback.
46+
*
47+
* @return array|Redis|RedisCluster
48+
*/
49+
private function executeMultiExec(string $command, ?callable $callback = null)
50+
{
51+
if (is_null($callback)) {
52+
return $this->__call($command, []);
53+
}
54+
55+
if (! $this instanceof HyperfRedis) {
56+
return tap($this->__call($command, []), $callback)->exec();
57+
}
58+
59+
$hasExistingConnection = Context::has($this->getContextKey());
60+
$instance = $this->__call($command, []);
4261

43-
return is_null($callback) ? $transaction : tap($transaction, $callback)->exec();
62+
try {
63+
return tap($instance, $callback)->exec();
64+
} finally {
65+
if (! $hasExistingConnection) {
66+
$this->releaseContextConnection();
67+
}
68+
}
4469
}
4570
}

tests/RedisProxyTest.php

Lines changed: 192 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Hyperf\Context\ApplicationContext;
1717
use Hyperf\Contract\ConfigInterface;
1818
use Hyperf\Contract\StdoutLoggerInterface;
19+
use Hyperf\Coroutine\Waiter;
1920
use Hyperf\Di\Container;
2021
use Hyperf\Engine\Channel as Chan;
2122
use Hyperf\Pool\Channel;
@@ -30,6 +31,7 @@
3031
use PHPUnit\Framework\TestCase;
3132
use Psr\EventDispatcher\EventDispatcherInterface;
3233
use RedisCluster;
34+
use Throwable;
3335

3436
use function Hyperf\Coroutine\go;
3537

@@ -213,14 +215,195 @@ public function testRedisPipeline()
213215
$this->assertSame([['C', 'D'], true], $chan2->pop());
214216
}
215217

218+
public function testPipelineCallbackAndSelect()
219+
{
220+
$redis = $this->getRedis();
221+
(new Waiter())->wait(function () use ($redis) {
222+
$redis->select(1);
223+
224+
$redis->set('concurrent_pipeline_test_callback_and_select_value', $id = uniqid(), 600);
225+
226+
$key = 'concurrent_pipeline_test_callback_and_select';
227+
228+
$redis->pipeline(function (\Redis $pipe) use ($key) {
229+
$pipe->set($key, "value_{$key}");
230+
$pipe->incr("{$key}_counter");
231+
$pipe->get($key);
232+
$pipe->get("{$key}_counter");
233+
});
234+
235+
$this->assertSame($id, $redis->get('concurrent_pipeline_test_callback_and_select_value'));
236+
});
237+
}
238+
239+
public function testPipelineCallbackAndPipeline()
240+
{
241+
$redis = $this->getRedis();
242+
(new Waiter())->wait(function () use ($redis) {
243+
$r = $redis->pipeline();
244+
245+
$redis->set('concurrent_pipeline_test_callback_and_select_value', $id = uniqid(), 600);
246+
247+
$key = 'concurrent_pipeline_test_callback_and_select';
248+
249+
$redis->pipeline(function (\Redis $pipe) use ($key) {
250+
$pipe->set($key, "value_{$key}");
251+
$pipe->incr("{$key}_counter");
252+
$pipe->get($key);
253+
$pipe->get("{$key}_counter");
254+
});
255+
256+
go(static function () use ($redis) {
257+
$redis->select(1);
258+
$redis->set('xxx', 'x');
259+
$redis->set('xxx', 'x');
260+
$redis->set('xxx', 'x');
261+
});
262+
263+
$r->set('xxxxxx', 'x');
264+
$r->set('xxxxxx', 'x');
265+
$r->set('xxxxxx', 'x');
266+
$r->set('xxxxxx', 'x');
267+
268+
$this->assertTrue(true);
269+
});
270+
}
271+
272+
public function testConcurrentPipelineCallbacksWithLimitedConnectionPool()
273+
{
274+
$redis = $this->getRedis([], 3); // max_connections = 3
275+
276+
$concurrentOperations = 20; // More than max_connections
277+
$channels = [];
278+
279+
for ($i = 0; $i < $concurrentOperations; ++$i) {
280+
$channels[$i] = new Chan(1);
281+
}
282+
283+
// Start concurrent coroutines using pipeline with callbacks
284+
for ($i = 0; $i < $concurrentOperations; ++$i) {
285+
go(function () use ($redis, $channels, $i) {
286+
try {
287+
$key = "concurrent_pipeline_test_{$i}";
288+
289+
$results = $redis->pipeline(function (\Redis $pipe) use ($key) {
290+
$pipe->set($key, "value_{$key}");
291+
$pipe->incr("{$key}_counter");
292+
$pipe->get($key);
293+
$pipe->get("{$key}_counter");
294+
});
295+
296+
// Simulate work after callback
297+
sleep(1);
298+
299+
$this->assertCount(4, $results);
300+
$this->assertTrue($results[0]);
301+
$this->assertSame(1, $results[1]);
302+
$this->assertSame("value_{$key}", $results[2]);
303+
$this->assertSame('1', $results[3]);
304+
305+
$channels[$i]->push(['success' => true, 'operation' => 'pipeline']);
306+
} catch (Throwable $e) {
307+
$channels[$i]->push(['success' => false, 'error' => $e->getMessage()]);
308+
}
309+
});
310+
}
311+
312+
$successCount = 0;
313+
for ($i = 0; $i < $concurrentOperations; ++$i) {
314+
$result = $channels[$i]->pop(10.0);
315+
$this->assertNotFalse($result, "Operation {$i} timed out - possible connection pool exhaustion");
316+
317+
if ($result['success']) {
318+
++$successCount;
319+
} else {
320+
$this->fail("Concurrent operation {$i} failed: " . $result['error']);
321+
}
322+
}
323+
324+
$this->assertSame(
325+
$concurrentOperations,
326+
$successCount,
327+
"All {$concurrentOperations} concurrent pipeline operations should succeed with only 3 max connections"
328+
);
329+
330+
// Clean up
331+
for ($i = 0; $i < $concurrentOperations; ++$i) {
332+
$redis->del("concurrent_pipeline_test_{$i}");
333+
$redis->del("concurrent_pipeline_test_{$i}_counter");
334+
}
335+
}
336+
337+
public function testConcurrentTransactionCallbacksWithLimitedConnectionPool()
338+
{
339+
$redis = $this->getRedis([], 3); // max_connections = 3
340+
341+
$concurrentOperations = 20; // More than max_connections
342+
$channels = [];
343+
344+
for ($i = 0; $i < $concurrentOperations; ++$i) {
345+
$channels[$i] = new Chan(1);
346+
}
347+
348+
// Start concurrent coroutines using transaction with callbacks
349+
for ($i = 0; $i < $concurrentOperations; ++$i) {
350+
go(function () use ($redis, $channels, $i) {
351+
try {
352+
$key = "concurrent_transaction_test_{$i}";
353+
354+
$results = $redis->transaction(function (\Redis $transaction) use ($key) {
355+
$transaction->set($key, "tx_value_{$key}");
356+
$transaction->incr("{$key}_counter");
357+
$transaction->get($key);
358+
});
359+
360+
// Simulate work after callback
361+
sleep(1);
362+
363+
$this->assertCount(3, $results);
364+
$this->assertTrue($results[0]);
365+
$this->assertSame(1, $results[1]);
366+
$this->assertSame("tx_value_{$key}", $results[2]);
367+
368+
$channels[$i]->push(['success' => true, 'operation' => 'transaction']);
369+
} catch (Throwable $e) {
370+
$channels[$i]->push(['success' => false, 'error' => $e->getMessage()]);
371+
}
372+
});
373+
}
374+
375+
$successCount = 0;
376+
for ($i = 0; $i < $concurrentOperations; ++$i) {
377+
$result = $channels[$i]->pop(10.0);
378+
$this->assertNotFalse($result, "Transaction operation {$i} timed out - possible connection pool exhaustion");
379+
380+
if ($result['success']) {
381+
++$successCount;
382+
} else {
383+
$this->fail("Concurrent transaction {$i} failed: " . $result['error']);
384+
}
385+
}
386+
387+
$this->assertSame(
388+
$concurrentOperations,
389+
$successCount,
390+
"All {$concurrentOperations} concurrent transaction operations should succeed with only 3 max connections"
391+
);
392+
393+
// Clean up
394+
for ($i = 0; $i < $concurrentOperations; ++$i) {
395+
$redis->del("concurrent_transaction_test_{$i}");
396+
$redis->del("concurrent_transaction_test_{$i}_counter");
397+
}
398+
}
399+
216400
/**
217401
* @param mixed $options
218402
* @return \Redis|Redis
219403
*/
220-
private function getRedis($options = [])
404+
private function getRedis($options = [], int $maxConnections = 30)
221405
{
222406
$container = Mockery::mock(Container::class);
223-
$container->shouldReceive('has')->with(StdoutLoggerInterface::class)->andReturnFalse();
224407
$container->shouldReceive('get')->once()->with(ConfigInterface::class)->andReturn(new Config([
225408
'redis' => [
226409
'default' => [
@@ -231,7 +414,7 @@ private function getRedis($options = [])
231414
'options' => $options,
232415
'pool' => [
233416
'min_connections' => 1,
234-
'max_connections' => 30,
417+
'max_connections' => $maxConnections,
235418
'connect_timeout' => 10.0,
236419
'wait_timeout' => 3.0,
237420
'heartbeat' => -1,
@@ -240,18 +423,21 @@ private function getRedis($options = [])
240423
],
241424
],
242425
]));
243-
$pool = new RedisPool($container, 'default');
244426
$frequency = Mockery::mock(LowFrequencyInterface::class);
245427
$frequency->shouldReceive('isLowFrequency')->andReturn(false);
246428
$container->shouldReceive('make')->with(Frequency::class, Mockery::any())->andReturn($frequency);
247-
$container->shouldReceive('make')->with(RedisPool::class, ['name' => 'default'])->andReturn($pool);
248-
$container->shouldReceive('make')->with(Channel::class, ['size' => 30])->andReturn(new Channel(30));
429+
$container->shouldReceive('make')->with(Channel::class, Mockery::any())->andReturnUsing(function ($class, $args) {
430+
return new Channel($args['size']);
431+
});
249432
$container->shouldReceive('make')->with(PoolOption::class, Mockery::any())->andReturnUsing(function ($class, $args) {
250433
return new PoolOption(...array_values($args));
251434
});
252435
$container->shouldReceive('has')->with(StdoutLoggerInterface::class)->andReturnFalse();
253436
$container->shouldReceive('has')->with(EventDispatcherInterface::class)->andReturnFalse();
254437

438+
$pool = new RedisPool($container, 'default');
439+
$container->shouldReceive('make')->with(RedisPool::class, ['name' => 'default'])->andReturn($pool);
440+
255441
ApplicationContext::setContainer($container);
256442

257443
$factory = new PoolFactory($container);

0 commit comments

Comments
 (0)