Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
get_lib_version,
safe_str,
str_if_bytes,
truncate_command_for_exception,
)

PubSubHandler = Callable[[Dict[str, str]], Awaitable[None]]
Expand Down Expand Up @@ -1508,7 +1509,10 @@ def annotate_exception(
self, exception: Exception, number: int, command: Iterable[object]
) -> None:
cmd = " ".join(map(safe_str, command))
msg = f"Command # {number} ({cmd}) of pipeline caused error: {exception.args}"
msg = (
f"Command # {number} ({truncate_command_for_exception(cmd)}) "
"of pipeline caused error: {exception.args}"
)
exception.args = (msg,) + exception.args[1:]

async def parse_response(
Expand Down
13 changes: 8 additions & 5 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
TryAgainError,
)
from redis.typing import AnyKeyT, EncodableT, KeyT
from redis.utils import deprecated_function, get_lib_version, safe_str, str_if_bytes
from redis.utils import deprecated_function, get_lib_version, safe_str, str_if_bytes, truncate_command_for_exception

TargetNodesT = TypeVar(
"TargetNodesT", str, "ClusterNode", List["ClusterNode"], Dict[Any, "ClusterNode"]
Expand Down Expand Up @@ -1161,7 +1161,9 @@ def get_node(
return self.nodes_cache.get(node_name)
else:
raise DataError(
"get_node requires one of the following: 1. node name 2. host and port"
"get_node requires one of the following: "
"1. node name "
"2. host and port"
)

def set_nodes(
Expand Down Expand Up @@ -1343,7 +1345,7 @@ async def initialize(self) -> None:
if len(disagreements) > 5:
raise RedisClusterException(
f"startup_nodes could not agree on a valid "
f"slots cache: {', '.join(disagreements)}"
f'slots cache: {", ".join(disagreements)}'
)

# Validate if all slots are covered or if we should try next startup node
Expand Down Expand Up @@ -1598,8 +1600,9 @@ async def _execute(
if isinstance(result, Exception):
command = " ".join(map(safe_str, cmd.args))
msg = (
f"Command # {cmd.position + 1} ({command}) of pipeline "
f"caused error: {result.args}"
f"Command # {cmd.position + 1} "
f"({truncate_command_for_exception(command)}) "
f"of pipeline caused error: {result.args}"
)
result.args = (msg,) + result.args[1:]
raise result
Expand Down
4 changes: 3 additions & 1 deletion redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
get_lib_version,
safe_str,
str_if_bytes,
truncate_command_for_exception,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -1521,7 +1522,8 @@ def raise_first_error(self, commands, response):
def annotate_exception(self, exception, number, command):
cmd = " ".join(map(safe_str, command))
msg = (
f"Command # {number} ({cmd}) of pipeline caused error: {exception.args[0]}"
f"Command # {number} ({truncate_command_for_exception(cmd)}) of pipeline "
f"caused error: {exception.args[0]}"
)
exception.args = (msg,) + exception.args[1:]

Expand Down
6 changes: 4 additions & 2 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
merge_result,
safe_str,
str_if_bytes,
truncate_command_for_exception,
)


Expand Down Expand Up @@ -1635,7 +1636,7 @@ def initialize(self):
if len(disagreements) > 5:
raise RedisClusterException(
f"startup_nodes could not agree on a valid "
f"slots cache: {', '.join(disagreements)}"
f'slots cache: {", ".join(disagreements)}'
)

fully_covered = self.check_slots_coverage(tmp_slots)
Expand Down Expand Up @@ -2054,7 +2055,8 @@ def annotate_exception(self, exception, number, command):
"""
cmd = " ".join(map(safe_str, command))
msg = (
f"Command # {number} ({cmd}) of pipeline caused error: {exception.args[0]}"
f"Command # {number} ({truncate_command_for_exception(cmd)}) of pipeline "
f"caused error: {exception.args[0]}"
)
exception.args = (msg,) + exception.args[1:]

Expand Down
6 changes: 6 additions & 0 deletions redis/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,9 @@ def ensure_string(key):
return key
else:
raise TypeError("Key must be either a string or bytes")


def truncate_command_for_exception(self, command, max_length=100):
if len(command) > max_length:
return command[: max_length - 3] + "..."
return command
30 changes: 25 additions & 5 deletions tests/test_asyncio/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ async def get_mocked_redis_client(
with mock.patch.object(ClusterNode, "execute_command") as execute_command_mock:

async def execute_command(*_args, **_kwargs):

if _args[0] == "CLUSTER SLOTS":
if cluster_slots_raise_error:
raise ResponseError()
Expand Down Expand Up @@ -1576,23 +1577,23 @@ async def test_cluster_bitop_not_empty_string(self, r: RedisCluster) -> None:

@skip_if_server_version_lt("2.6.0")
async def test_cluster_bitop_not(self, r: RedisCluster) -> None:
test_str = b"\xaa\x00\xff\x55"
test_str = b"\xAA\x00\xFF\x55"
correct = ~0xAA00FF55 & 0xFFFFFFFF
await r.set("{foo}a", test_str)
await r.bitop("not", "{foo}r", "{foo}a")
assert int(binascii.hexlify(await r.get("{foo}r")), 16) == correct

@skip_if_server_version_lt("2.6.0")
async def test_cluster_bitop_not_in_place(self, r: RedisCluster) -> None:
test_str = b"\xaa\x00\xff\x55"
test_str = b"\xAA\x00\xFF\x55"
correct = ~0xAA00FF55 & 0xFFFFFFFF
await r.set("{foo}a", test_str)
await r.bitop("not", "{foo}a", "{foo}a")
assert int(binascii.hexlify(await r.get("{foo}a")), 16) == correct

@skip_if_server_version_lt("2.6.0")
async def test_cluster_bitop_single_string(self, r: RedisCluster) -> None:
test_str = b"\x01\x02\xff"
test_str = b"\x01\x02\xFF"
await r.set("{foo}a", test_str)
await r.bitop("and", "{foo}res1", "{foo}a")
await r.bitop("or", "{foo}res2", "{foo}a")
Expand All @@ -1603,8 +1604,8 @@ async def test_cluster_bitop_single_string(self, r: RedisCluster) -> None:

@skip_if_server_version_lt("2.6.0")
async def test_cluster_bitop_string_operands(self, r: RedisCluster) -> None:
await r.set("{foo}a", b"\x01\x02\xff\xff")
await r.set("{foo}b", b"\x01\x02\xff")
await r.set("{foo}a", b"\x01\x02\xFF\xFF")
await r.set("{foo}b", b"\x01\x02\xFF")
await r.bitop("and", "{foo}res1", "{foo}a", "{foo}b")
await r.bitop("or", "{foo}res2", "{foo}a", "{foo}b")
await r.bitop("xor", "{foo}res3", "{foo}a", "{foo}b")
Expand Down Expand Up @@ -2802,6 +2803,25 @@ async def test_asking_error(self, r: RedisCluster) -> None:
assert ask_node._free.pop().read_response.await_count
assert res == ["MOCK_OK"]

async def test_error_is_truncated(self, r) -> None:
"""
Test that an error from the pipeline is truncated correctly.
"""
key = "a" * 5000

async with r.pipeline() as pipe:
pipe.set(key, 1)
pipe.llen(key)
pipe.expire(key, 100)

with pytest.raises(Exception) as ex:
await pipe.execute()

expected = (
"Command # 2 (LLEN " + ("a" * 92) + "...) of pipeline caused error: "
)
assert str(ex.value).startswith(expected)

async def test_moved_redirection_on_slave_with_default(
self, r: RedisCluster
) -> None:
Expand Down
15 changes: 15 additions & 0 deletions tests/test_asyncio/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,21 @@ async def test_exec_error_in_no_transaction_pipeline_unicode_command(self, r):

assert await r.get(key) == b"1"

async def test_exec_error_in_pipeline_truncated(self, r):
key = "a" * 5000
await r.set(key, 1)
async with r.pipeline(transaction=False) as pipe:
pipe.llen(key)
pipe.expire(key, 100)

with pytest.raises(redis.ResponseError) as ex:
await pipe.execute()

expected = (
"Command # 1 (LLEN " + ("a" * 92) + "...) of pipeline caused error: "
)
assert str(ex.value).startswith(expected)

async def test_pipeline_with_bitfield(self, r):
async with r.pipeline() as pipe:
pipe.set("a", "1")
Expand Down
29 changes: 24 additions & 5 deletions tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1692,23 +1692,23 @@ def test_cluster_bitop_not_empty_string(self, r):

@skip_if_server_version_lt("2.6.0")
def test_cluster_bitop_not(self, r):
test_str = b"\xaa\x00\xff\x55"
test_str = b"\xAA\x00\xFF\x55"
correct = ~0xAA00FF55 & 0xFFFFFFFF
r["{foo}a"] = test_str
r.bitop("not", "{foo}r", "{foo}a")
assert int(binascii.hexlify(r["{foo}r"]), 16) == correct

@skip_if_server_version_lt("2.6.0")
def test_cluster_bitop_not_in_place(self, r):
test_str = b"\xaa\x00\xff\x55"
test_str = b"\xAA\x00\xFF\x55"
correct = ~0xAA00FF55 & 0xFFFFFFFF
r["{foo}a"] = test_str
r.bitop("not", "{foo}a", "{foo}a")
assert int(binascii.hexlify(r["{foo}a"]), 16) == correct

@skip_if_server_version_lt("2.6.0")
def test_cluster_bitop_single_string(self, r):
test_str = b"\x01\x02\xff"
test_str = b"\x01\x02\xFF"
r["{foo}a"] = test_str
r.bitop("and", "{foo}res1", "{foo}a")
r.bitop("or", "{foo}res2", "{foo}a")
Expand All @@ -1719,8 +1719,8 @@ def test_cluster_bitop_single_string(self, r):

@skip_if_server_version_lt("2.6.0")
def test_cluster_bitop_string_operands(self, r):
r["{foo}a"] = b"\x01\x02\xff\xff"
r["{foo}b"] = b"\x01\x02\xff"
r["{foo}a"] = b"\x01\x02\xFF\xFF"
r["{foo}b"] = b"\x01\x02\xFF"
r.bitop("and", "{foo}res1", "{foo}a", "{foo}b")
r.bitop("or", "{foo}res2", "{foo}a", "{foo}b")
r.bitop("xor", "{foo}res3", "{foo}a", "{foo}b")
Expand Down Expand Up @@ -3260,6 +3260,25 @@ def raise_ask_error():
assert ask_node.redis_connection.connection.read_response.called
assert res == ["MOCK_OK"]

def test_error_is_truncated(self, r):
"""
Test that an error from the pipeline is truncated correctly.
"""
key = "a" * 5000

with r.pipeline() as pipe:
pipe.set(key, 1)
pipe.llen(key)
pipe.expire(key, 100)

with pytest.raises(Exception) as ex:
pipe.execute()

expected = (
"Command # 2 (LLEN " + ("a" * 92) + "...) of pipeline caused error: "
)
assert str(ex.value).startswith(expected)

def test_return_previously_acquired_connections(self, r):
# in order to ensure that a pipeline will make use of connections
# from different nodes
Expand Down
15 changes: 15 additions & 0 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,21 @@ def test_exec_error_in_no_transaction_pipeline_unicode_command(self, r):

assert r[key] == b"1"

def test_exec_error_in_pipeline_truncated(self, r):
key = "a" * 5000
r[key] = 1
with r.pipeline(transaction=False) as pipe:
pipe.llen(key)
pipe.expire(key, 100)

with pytest.raises(redis.ResponseError) as ex:
pipe.execute()

expected = (
"Command # 1 (LLEN " + ("a" * 92) + "...) of pipeline caused error: "
)
assert str(ex.value).startswith(expected)

def test_pipeline_with_bitfield(self, r):
with r.pipeline() as pipe:
pipe.set("a", "1")
Expand Down
Loading