Skip to content

Commit 0c536e5

Browse files
committed
Fix Predis stream in cluster
1 parent 6c0ed26 commit 0c536e5

File tree

2 files changed

+109
-5
lines changed

2 files changed

+109
-5
lines changed

src/Dashboards/Redis/Compatibility/Cluster/PredisCluster.php

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use RobiNN\Pca\Dashboards\Redis\Compatibility\RedisCompatibilityInterface;
1717
use RobiNN\Pca\Dashboards\Redis\Compatibility\RedisJson;
1818
use RobiNN\Pca\Dashboards\Redis\Compatibility\RedisModules;
19+
use Throwable;
1920

2021
class PredisCluster extends PredisClient implements RedisCompatibilityInterface {
2122
use RedisJson;
@@ -198,11 +199,118 @@ public function listRem(string $key, string $value, int $count): int {
198199
return $this->lrem($key, $count, $value);
199200
}
200201

202+
private function getKeyNode(string $key): ?PredisClient {
203+
$cluster = $this->getConnection();
204+
205+
if (!method_exists($cluster, 'getClusterStrategy') || !method_exists($cluster, 'getConnectionBySlot')) {
206+
return null;
207+
}
208+
209+
$slot = $cluster->getClusterStrategy()->getSlotByKey($key);
210+
211+
if ($slot === null) {
212+
return null;
213+
}
214+
215+
try {
216+
$connection = $cluster->getConnectionBySlot((string) $slot);
217+
} catch (Throwable) {
218+
return null;
219+
}
220+
221+
foreach ($this->nodes as $node) {
222+
$nodeConn = $node->getConnection();
223+
if (
224+
method_exists($nodeConn, 'getParameters') &&
225+
$nodeConn->getParameters()->host === $connection->getParameters()->host &&
226+
$nodeConn->getParameters()->port === $connection->getParameters()->port
227+
) {
228+
return $node;
229+
}
230+
}
231+
232+
return null;
233+
}
234+
201235
/**
202236
* @param array<string, string> $messages
203237
*/
204238
public function streamAdd(string $key, string $id, array $messages): string {
205-
return $this->xadd($key, $messages, $id);
239+
$args = [$key, $id];
240+
foreach ($messages as $field => $value) {
241+
$args[] = $field;
242+
$args[] = $value;
243+
}
244+
245+
$node = $this->getKeyNode($key);
246+
if (!$node) {
247+
return '';
248+
}
249+
250+
$raw = $node->executeRaw(array_merge(['XADD'], $args));
251+
252+
return $raw !== false && $raw !== null ? (string) $raw : '';
253+
}
254+
255+
/**
256+
* @return array<string, array<string, string>>
257+
*/
258+
public function xrange(string $key, string $start, string $end, ?int $count = null): array {
259+
$args = [$key, $start, $end];
260+
if ($count !== null) {
261+
$args[] = 'COUNT';
262+
$args[] = (string) $count;
263+
}
264+
265+
$node = $this->getKeyNode($key);
266+
if (!$node) {
267+
return [];
268+
}
269+
270+
$raw = $node->executeRaw(array_merge(['XRANGE'], $args));
271+
272+
if (!is_array($raw)) {
273+
return [];
274+
}
275+
276+
return $this->parseStreamEntries($raw);
277+
}
278+
279+
/**
280+
* @param list<array{string, list<string>}> $entries
281+
*
282+
* @return array<string, array<string, string>>
283+
*/
284+
private function parseStreamEntries(array $entries): array {
285+
$result = [];
286+
foreach ($entries as [$id, $fields]) {
287+
$assoc = [];
288+
for ($i = 0, $iMax = count($fields); $i < $iMax; $i += 2) {
289+
$assoc[$fields[$i]] = $fields[$i + 1];
290+
}
291+
$result[$id] = $assoc;
292+
}
293+
294+
return $result;
295+
}
296+
297+
/**
298+
* @param string|array<int, string> ...$id
299+
*/
300+
public function xdel(string $key, string|array ...$id): int {
301+
if (count($id) === 1 && is_array($id[0])) {
302+
$id = $id[0];
303+
}
304+
305+
$node = $this->getKeyNode($key);
306+
if (!$node) {
307+
return 0;
308+
}
309+
310+
$args = array_merge([$key], $id);
311+
$raw = $node->executeRaw(array_merge(['XDEL'], $args));
312+
313+
return is_int($raw) ? $raw : 0;
206314
}
207315

208316
public function rawcommand(string $command, mixed ...$arguments): mixed {

tests/Dashboards/Redis/RedisTestCase.php

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,6 @@ public function testHashType(): void {
241241
* @throws Exception
242242
*/
243243
public function testStreamType(): void {
244-
if (self::$is_cluster && $this->client === 'predis') {
245-
$this->markTestSkipped('Skipped Predis stream test');
246-
}
247-
248244
$this->dashboard->store('stream', 'pu-test-type-stream', '', '', [
249245
'stream_id' => '1670541476219-0',
250246
'stream_fields' => ['field1' => 'stvalue1', 'field2' => 'stvalue2'],

0 commit comments

Comments
 (0)