Skip to content

Commit 0447ccc

Browse files
authored
chore: check fail for empty listpacks in streams (#5704)
Add check fails when we trim or delete list packs in stream code. Push check in low level functions of lp/stream code --------- Signed-off-by: kostas <[email protected]>
1 parent e74bb8c commit 0447ccc

File tree

5 files changed

+35
-6
lines changed

5 files changed

+35
-6
lines changed

src/redis/rax.c

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -890,11 +890,12 @@ int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void **
890890
* do that only if the node is a terminal node, otherwise if the OOM
891891
* happened reallocating a node in the middle, we don't need to free
892892
* anything. */
893+
fprintf(stderr, "OOM during raxGenericInsert");
893894
if (h->size == 0) {
894895
h->isnull = 1;
895896
h->iskey = 1;
896897
rax->numele++; /* Compensate the next remove. */
897-
assert(raxRemove(rax,s,i,NULL) != 0);
898+
checkedRaxRemove(rax, s, i, NULL);
898899
}
899900
errno = ENOMEM;
900901
return 0;
@@ -1941,3 +1942,13 @@ unsigned long raxTouch(raxNode *n) {
19411942
}
19421943
return sum;
19431944
}
1945+
1946+
int checkedRaxRemove(rax *rax, unsigned char *s, size_t len, void **old) {
1947+
int res = raxRemove(rax, s, len, old);
1948+
if(res == 0) {
1949+
// lp freed but node not removed!
1950+
fprintf(stderr, "Error: corrupted listpack found.");
1951+
abort();
1952+
}
1953+
return res;
1954+
}

src/redis/rax.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,8 @@ uint64_t raxSize(rax *rax);
209209
unsigned long raxTouch(raxNode *n);
210210
void raxSetDebugMsg(int onoff);
211211

212+
int checkedRaxRemove(rax *rax, unsigned char *s, size_t len, void **old);
213+
212214
/* Internal API. May be used by the node callback in order to access rax nodes
213215
* in a low level way, so this function is exported as well. */
214216
void raxSetData(raxNode *n, void *data);

src/redis/t_stream.c

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
*/
2929

3030
#include <errno.h>
31+
#include <stdio.h>
3132
#include <string.h>
3233

3334
#include "endianconv.h"
@@ -277,6 +278,13 @@ void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_i
277278
streamIteratorStop(&si);
278279
}
279280

281+
void checkListPackNotEmpty(unsigned char* lp) {
282+
if(lpBytes(lp) == 0) {
283+
fprintf(stderr, "Error: corrupted listpack found.");
284+
abort();
285+
}
286+
}
287+
280288
/* Trim the stream 's' according to args->trim_strategy, and return the
281289
* number of elements removed from the stream. The 'approx' option, if non-zero,
282290
* specifies that the trimming must be performed in a approximated way in
@@ -345,7 +353,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
345353

346354
if (remove_node) {
347355
lpFree(lp);
348-
raxRemove(s->rax_tree,ri.key,ri.key_len,NULL);
356+
checkedRaxRemove(s->rax_tree, ri.key, ri.key_len, NULL);
349357
raxSeek(&ri,">=",ri.key,ri.key_len);
350358
s->length -= entries;
351359
deleted += entries;
@@ -444,6 +452,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
444452

445453
/* Update the listpack with the new pointer. */
446454
raxInsert(s->rax_tree,ri.key,ri.key_len,lp,NULL);
455+
checkListPackNotEmpty(lp);
447456

448457
break; /* If we are here, there was enough to delete in the current
449458
node, so no need to go to the next node. */
@@ -739,7 +748,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
739748
/* If this is the last element in the listpack, we can remove the whole
740749
* node. */
741750
lpFree(lp);
742-
raxRemove(si->stream->rax_tree,si->ri.key,si->ri.key_len,NULL);
751+
checkedRaxRemove(si->stream->rax_tree, si->ri.key, si->ri.key_len, NULL);
743752
} else {
744753
/* In the base case we alter the counters of valid/deleted entries. */
745754
lp = lpReplaceInteger(lp,&p,aux-1);
@@ -750,6 +759,8 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
750759
/* Update the listpack with the new pointer. */
751760
if (si->lp != lp)
752761
raxInsert(si->stream->rax_tree,si->ri.key,si->ri.key_len,lp,NULL);
762+
763+
checkListPackNotEmpty(lp);
753764
}
754765

755766
/* Update the number of entries counter. */

src/server/stream_family.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,8 @@ int StreamAppendItem(stream* s, CmdArgList fields, uint64_t now_ms, streamID* ad
526526
}
527527
lp = lpAppendInteger(lp, 0); /* Master entry zero terminator. */
528528
raxInsert(s->rax_tree, (unsigned char*)&rax_key, sizeof(rax_key), lp, NULL);
529+
// TODO remove this check
530+
CHECK_GT(lpBytes(lp), 0U);
529531
/* The first entry we insert, has obviously the same fields of the
530532
* master entry. */
531533
flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
@@ -610,8 +612,11 @@ int StreamAppendItem(stream* s, CmdArgList fields, uint64_t now_ms, streamID* ad
610612
lp = lpAppendInteger(lp, lp_count);
611613

612614
/* Insert back into the tree in order to update the listpack pointer. */
613-
if (ri.data != lp)
615+
if (ri.data != lp) {
614616
raxInsert(s->rax_tree, (unsigned char*)&rax_key, sizeof(rax_key), lp, NULL);
617+
// TODO remove this
618+
CHECK_GT(lpBytes(lp), 0U);
619+
}
615620
s->length++;
616621
s->entries_added++;
617622
s->last_id = id;

tests/dragonfly/cluster_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3065,7 +3065,7 @@ async def test_cluster_sharded_pub_sub(df_factory: DflyInstanceFactory):
30653065
await c_nodes[0].execute_command("SPUBLISH kostas hello")
30663066
# We need to sleep cause we use DispatchBrief internally. Otherwise we can't really gurantee
30673067
# that the client received the message
3068-
await asyncio.sleep(1)
3068+
await asyncio.sleep(2)
30693069

30703070
# Consume subscription message result from above
30713071
message = consumer.get_sharded_message(target_node=node_a)
@@ -3075,7 +3075,7 @@ async def test_cluster_sharded_pub_sub(df_factory: DflyInstanceFactory):
30753075
assert message == {"type": "message", "pattern": None, "channel": b"kostas", "data": b"hello"}
30763076

30773077
consumer.sunsubscribe("kostas")
3078-
await asyncio.sleep(1)
3078+
await asyncio.sleep(2)
30793079
await c_nodes[0].execute_command("SPUBLISH kostas new_message")
30803080
message = consumer.get_sharded_message(target_node=node_a)
30813081
assert message == {"type": "unsubscribe", "pattern": None, "channel": b"kostas", "data": 0}

0 commit comments

Comments
 (0)