From 56ebcda2e1c18f738b50037d50a89298f4d51d24 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Fri, 21 Nov 2025 10:26:03 -0300 Subject: [PATCH 1/6] admin: add internal v2 API to log a message Adds ability to log a message to the Redpanda server logs via an internal admin API. Use for DT tests. --- .../core/admin/internal/v1/debug.proto | 21 ++++++++ .../redpanda/admin/services/internal/debug.cc | 31 ++++++++++++ .../redpanda/admin/services/internal/debug.h | 3 ++ .../core/admin/internal/v1/debug_pb2.py | 14 ++++-- .../core/admin/internal/v1/debug_pb2.pyi | 49 ++++++++++++++++++- .../admin/internal/v1/debug_pb2_connect.py | 34 +++++++++++++ 6 files changed, 148 insertions(+), 4 deletions(-) diff --git a/proto/redpanda/core/admin/internal/v1/debug.proto b/proto/redpanda/core/admin/internal/v1/debug.proto index 818d0478c67e0..14e9905f9bd1a 100644 --- a/proto/redpanda/core/admin/internal/v1/debug.proto +++ b/proto/redpanda/core/admin/internal/v1/debug.proto @@ -43,6 +43,22 @@ message ThrowStructuredExceptionRequest { } message ThrowStructuredExceptionResponse {} +enum LogLevel { + LOG_LEVEL_UNSPECIFIED = 0; + LOG_LEVEL_TRACE = 1; + LOG_LEVEL_DEBUG = 2; + LOG_LEVEL_INFO = 3; + LOG_LEVEL_WARN = 4; + LOG_LEVEL_ERROR = 5; +} + +message LogMessageRequest { + string message = 1; + LogLevel level = 2; +} + +message LogMessageResponse {} + // The DebugService provides access to internal debugging information and debug // operations for the cluster or node. 
// @@ -71,4 +87,9 @@ service DebugService { authz: SUPERUSER, }; } + rpc LogMessage(LogMessageRequest) returns (LogMessageResponse) { + option (pbgen.rpc) = { + authz: SUPERUSER, + }; + } } diff --git a/src/v/redpanda/admin/services/internal/debug.cc b/src/v/redpanda/admin/services/internal/debug.cc index caf2fb8dd3967..3f5ddfe1cf615 100644 --- a/src/v/redpanda/admin/services/internal/debug.cc +++ b/src/v/redpanda/admin/services/internal/debug.cc @@ -131,4 +131,35 @@ debug_service_impl::stop_stress_fiber( co_return proto::stop_stress_fiber_response{}; } +seastar::future +debug_service_impl::log_message( + serde::pb::rpc::context, proto::admin::log_message_request req) { + auto msg = req.get_message(); + auto level = req.get_level(); + + using enum proto::admin::log_level; + + ss::log_level ss_level = [=]() { + switch (level) { + case trace: + return ss::log_level::trace; + case debug: + return ss::log_level::debug; + case info: + return ss::log_level::info; + case warn: + return ss::log_level::warn; + case error: + return ss::log_level::error; + case unspecified: + default: + throw serde::pb::rpc::invalid_argument_exception( + "Invalid log level specified"); + } + }(); + + log.log(ss_level, "{}", msg); + co_return proto::admin::log_message_response{}; +} + } // namespace admin diff --git a/src/v/redpanda/admin/services/internal/debug.h b/src/v/redpanda/admin/services/internal/debug.h index db77034801003..db0ba7e009e72 100644 --- a/src/v/redpanda/admin/services/internal/debug.h +++ b/src/v/redpanda/admin/services/internal/debug.h @@ -45,6 +45,9 @@ class debug_service_impl : public proto::admin::debug_service { serde::pb::rpc::context, proto::admin::stop_stress_fiber_request) override; + seastar::future log_message( + serde::pb::rpc::context, proto::admin::log_message_request) override; + private: admin::proxy::client _client; ss::sharded& _stress_fiber_manager; diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/debug_pb2.py 
b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/debug_pb2.py index 7cadb161ecca4..cd7cc773df244 100644 --- a/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/debug_pb2.py +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/debug_pb2.py @@ -8,7 +8,7 @@ _sym_db = _symbol_database.Default() from .......proto.redpanda.core.pbgen import options_pb2 as proto_dot_redpanda_dot_core_dot_pbgen_dot_options__pb2 from .......proto.redpanda.core.pbgen import rpc_pb2 as proto_dot_redpanda_dot_core_dot_pbgen_dot_rpc__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n1proto/redpanda/core/admin/internal/v1/debug.proto\x12\x1fredpanda.core.admin.v2.internal\x1a\'proto/redpanda/core/pbgen/options.proto\x1a#proto/redpanda/core/pbgen/rpc.proto"\xdd\x01\n\x17StartStressFiberRequest\x12&\n\x1emin_spins_per_scheduling_point\x18\x01 \x01(\x05\x12&\n\x1emax_spins_per_scheduling_point\x18\x02 \x01(\x05\x12\x13\n\x0bstack_depth\x18\x03 \x01(\x05\x12#\n\x1bmin_ms_per_scheduling_point\x18\x04 \x01(\x05\x12#\n\x1bmax_ms_per_scheduling_point\x18\x05 \x01(\x05\x12\x13\n\x0bfiber_count\x18\x06 \x01(\x05"\x1a\n\x18StartStressFiberResponse"\x18\n\x16StopStressFiberRequest"\x19\n\x17StopStressFiberResponse"\xd5\x01\n\x1fThrowStructuredExceptionRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\x05\x12\x0e\n\x06reason\x18\x02 \x01(\t\x12`\n\x08metadata\x18\x03 \x03(\x0b2N.redpanda.core.admin.v2.internal.ThrowStructuredExceptionRequest.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x028\x01""\n 
ThrowStructuredExceptionResponse2\xd9\x03\n\x0cDebugService\x12\xa7\x01\n\x18ThrowStructuredException\x12@.redpanda.core.admin.v2.internal.ThrowStructuredExceptionRequest\x1aA.redpanda.core.admin.v2.internal.ThrowStructuredExceptionResponse"\x06\xea\x92\x19\x02\x10\x03\x12\x8f\x01\n\x10StartStressFiber\x128.redpanda.core.admin.v2.internal.StartStressFiberRequest\x1a9.redpanda.core.admin.v2.internal.StartStressFiberResponse"\x06\xea\x92\x19\x02\x10\x03\x12\x8c\x01\n\x0fStopStressFiber\x127.redpanda.core.admin.v2.internal.StopStressFiberRequest\x1a8.redpanda.core.admin.v2.internal.StopStressFiberResponse"\x06\xea\x92\x19\x02\x10\x03B\x10\xea\x92\x19\x0cproto::adminb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n1proto/redpanda/core/admin/internal/v1/debug.proto\x12\x1fredpanda.core.admin.v2.internal\x1a\'proto/redpanda/core/pbgen/options.proto\x1a#proto/redpanda/core/pbgen/rpc.proto"\xdd\x01\n\x17StartStressFiberRequest\x12&\n\x1emin_spins_per_scheduling_point\x18\x01 \x01(\x05\x12&\n\x1emax_spins_per_scheduling_point\x18\x02 \x01(\x05\x12\x13\n\x0bstack_depth\x18\x03 \x01(\x05\x12#\n\x1bmin_ms_per_scheduling_point\x18\x04 \x01(\x05\x12#\n\x1bmax_ms_per_scheduling_point\x18\x05 \x01(\x05\x12\x13\n\x0bfiber_count\x18\x06 \x01(\x05"\x1a\n\x18StartStressFiberResponse"\x18\n\x16StopStressFiberRequest"\x19\n\x17StopStressFiberResponse"\xd5\x01\n\x1fThrowStructuredExceptionRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\x05\x12\x0e\n\x06reason\x18\x02 \x01(\t\x12`\n\x08metadata\x18\x03 \x03(\x0b2N.redpanda.core.admin.v2.internal.ThrowStructuredExceptionRequest.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x028\x01""\n ThrowStructuredExceptionResponse"^\n\x11LogMessageRequest\x12\x0f\n\x07message\x18\x01 \x01(\t\x128\n\x05level\x18\x02 
\x01(\x0e2).redpanda.core.admin.v2.internal.LogLevel"\x14\n\x12LogMessageResponse*\x8c\x01\n\x08LogLevel\x12\x19\n\x15LOG_LEVEL_UNSPECIFIED\x10\x00\x12\x13\n\x0fLOG_LEVEL_TRACE\x10\x01\x12\x13\n\x0fLOG_LEVEL_DEBUG\x10\x02\x12\x12\n\x0eLOG_LEVEL_INFO\x10\x03\x12\x12\n\x0eLOG_LEVEL_WARN\x10\x04\x12\x13\n\x0fLOG_LEVEL_ERROR\x10\x052\xd8\x04\n\x0cDebugService\x12\xa7\x01\n\x18ThrowStructuredException\x12@.redpanda.core.admin.v2.internal.ThrowStructuredExceptionRequest\x1aA.redpanda.core.admin.v2.internal.ThrowStructuredExceptionResponse"\x06\xea\x92\x19\x02\x10\x03\x12\x8f\x01\n\x10StartStressFiber\x128.redpanda.core.admin.v2.internal.StartStressFiberRequest\x1a9.redpanda.core.admin.v2.internal.StartStressFiberResponse"\x06\xea\x92\x19\x02\x10\x03\x12\x8c\x01\n\x0fStopStressFiber\x127.redpanda.core.admin.v2.internal.StopStressFiberRequest\x1a8.redpanda.core.admin.v2.internal.StopStressFiberResponse"\x06\xea\x92\x19\x02\x10\x03\x12}\n\nLogMessage\x122.redpanda.core.admin.v2.internal.LogMessageRequest\x1a3.redpanda.core.admin.v2.internal.LogMessageResponse"\x06\xea\x92\x19\x02\x10\x03B\x10\xea\x92\x19\x0cproto::adminb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'proto.redpanda.core.admin.internal.v1.debug_pb2', _globals) @@ -23,6 +23,10 @@ _globals['_DEBUGSERVICE'].methods_by_name['StartStressFiber']._serialized_options = b'\xea\x92\x19\x02\x10\x03' _globals['_DEBUGSERVICE'].methods_by_name['StopStressFiber']._loaded_options = None _globals['_DEBUGSERVICE'].methods_by_name['StopStressFiber']._serialized_options = b'\xea\x92\x19\x02\x10\x03' + _globals['_DEBUGSERVICE'].methods_by_name['LogMessage']._loaded_options = None + _globals['_DEBUGSERVICE'].methods_by_name['LogMessage']._serialized_options = b'\xea\x92\x19\x02\x10\x03' + _globals['_LOGLEVEL']._serialized_start = 840 + _globals['_LOGLEVEL']._serialized_end = 980 
_globals['_STARTSTRESSFIBERREQUEST']._serialized_start = 165 _globals['_STARTSTRESSFIBERREQUEST']._serialized_end = 386 _globals['_STARTSTRESSFIBERRESPONSE']._serialized_start = 388 @@ -37,5 +41,9 @@ _globals['_THROWSTRUCTUREDEXCEPTIONREQUEST_METADATAENTRY']._serialized_end = 683 _globals['_THROWSTRUCTUREDEXCEPTIONRESPONSE']._serialized_start = 685 _globals['_THROWSTRUCTUREDEXCEPTIONRESPONSE']._serialized_end = 719 - _globals['_DEBUGSERVICE']._serialized_start = 722 - _globals['_DEBUGSERVICE']._serialized_end = 1195 \ No newline at end of file + _globals['_LOGMESSAGEREQUEST']._serialized_start = 721 + _globals['_LOGMESSAGEREQUEST']._serialized_end = 815 + _globals['_LOGMESSAGERESPONSE']._serialized_start = 817 + _globals['_LOGMESSAGERESPONSE']._serialized_end = 837 + _globals['_DEBUGSERVICE']._serialized_start = 983 + _globals['_DEBUGSERVICE']._serialized_end = 1583 \ No newline at end of file diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/debug_pb2.pyi b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/debug_pb2.pyi index 4d2282d29d6c1..cbe26e2d29a18 100644 --- a/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/debug_pb2.pyi +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/debug_pb2.pyi @@ -19,6 +19,7 @@ import builtins import collections.abc import google.protobuf.descriptor import google.protobuf.internal.containers +import google.protobuf.internal.enum_type_wrapper import google.protobuf.message import sys import typing @@ -28,6 +29,29 @@ else: import typing_extensions DESCRIPTOR: google.protobuf.descriptor.FileDescriptor +class _LogLevel: + ValueType = typing.NewType('ValueType', builtins.int) + V: typing_extensions.TypeAlias = ValueType + +class _LogLevelEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[_LogLevel.ValueType], builtins.type): + DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor + LOG_LEVEL_UNSPECIFIED: _LogLevel.ValueType + 
LOG_LEVEL_TRACE: _LogLevel.ValueType + LOG_LEVEL_DEBUG: _LogLevel.ValueType + LOG_LEVEL_INFO: _LogLevel.ValueType + LOG_LEVEL_WARN: _LogLevel.ValueType + LOG_LEVEL_ERROR: _LogLevel.ValueType + +class LogLevel(_LogLevel, metaclass=_LogLevelEnumTypeWrapper): + ... +LOG_LEVEL_UNSPECIFIED: LogLevel.ValueType +LOG_LEVEL_TRACE: LogLevel.ValueType +LOG_LEVEL_DEBUG: LogLevel.ValueType +LOG_LEVEL_INFO: LogLevel.ValueType +LOG_LEVEL_WARN: LogLevel.ValueType +LOG_LEVEL_ERROR: LogLevel.ValueType +Global___LogLevel: typing_extensions.TypeAlias = LogLevel + @typing.final class StartStressFiberRequest(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -115,4 +139,27 @@ class ThrowStructuredExceptionResponse(google.protobuf.message.Message): def __init__(self) -> None: ... -Global___ThrowStructuredExceptionResponse: typing_extensions.TypeAlias = ThrowStructuredExceptionResponse \ No newline at end of file +Global___ThrowStructuredExceptionResponse: typing_extensions.TypeAlias = ThrowStructuredExceptionResponse + +@typing.final +class LogMessageRequest(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + MESSAGE_FIELD_NUMBER: builtins.int + LEVEL_FIELD_NUMBER: builtins.int + message: builtins.str + level: Global___LogLevel.ValueType + + def __init__(self, *, message: builtins.str=..., level: Global___LogLevel.ValueType=...) -> None: + ... + + def ClearField(self, field_name: typing.Literal['level', b'level', 'message', b'message']) -> None: + ... +Global___LogMessageRequest: typing_extensions.TypeAlias = LogMessageRequest + +@typing.final +class LogMessageResponse(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + def __init__(self) -> None: + ... 
+Global___LogMessageResponse: typing_extensions.TypeAlias = LogMessageResponse \ No newline at end of file diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/debug_pb2_connect.py b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/debug_pb2_connect.py index 4c4328e6fb5b4..a1ea04440bf97 100644 --- a/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/debug_pb2_connect.py +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/debug_pb2_connect.py @@ -79,6 +79,21 @@ def stop_stress_fiber(self, req: proto.redpanda.core.admin.internal.v1.debug_pb2 raise ConnectProtocolError('missing response message') return msg + def call_log_message(self, req: proto.redpanda.core.admin.internal.v1.debug_pb2.LogMessageRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> UnaryOutput[proto.redpanda.core.admin.internal.v1.debug_pb2.LogMessageResponse]: + """Low-level method to call LogMessage, granting access to errors and metadata""" + url = self.base_url + '/redpanda.core.admin.v2.internal.DebugService/LogMessage' + return self._connect_client.call_unary(url, req, proto.redpanda.core.admin.internal.v1.debug_pb2.LogMessageResponse, extra_headers, timeout_seconds) + + def log_message(self, req: proto.redpanda.core.admin.internal.v1.debug_pb2.LogMessageRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> proto.redpanda.core.admin.internal.v1.debug_pb2.LogMessageResponse: + response = self.call_log_message(req, extra_headers, timeout_seconds) + err = response.error() + if err is not None: + raise err + msg = response.message() + if msg is None: + raise ConnectProtocolError('missing response message') + return msg + class AsyncDebugServiceClient: def __init__(self, base_url: str, http_client: aiohttp.ClientSession, protocol: ConnectProtocol=ConnectProtocol.CONNECT_PROTOBUF): @@ -130,6 +145,21 @@ async def stop_stress_fiber(self, req: 
proto.redpanda.core.admin.internal.v1.deb raise ConnectProtocolError('missing response message') return msg + async def call_log_message(self, req: proto.redpanda.core.admin.internal.v1.debug_pb2.LogMessageRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> UnaryOutput[proto.redpanda.core.admin.internal.v1.debug_pb2.LogMessageResponse]: + """Low-level method to call LogMessage, granting access to errors and metadata""" + url = self.base_url + '/redpanda.core.admin.v2.internal.DebugService/LogMessage' + return await self._connect_client.call_unary(url, req, proto.redpanda.core.admin.internal.v1.debug_pb2.LogMessageResponse, extra_headers, timeout_seconds) + + async def log_message(self, req: proto.redpanda.core.admin.internal.v1.debug_pb2.LogMessageRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> proto.redpanda.core.admin.internal.v1.debug_pb2.LogMessageResponse: + response = await self.call_log_message(req, extra_headers, timeout_seconds) + err = response.error() + if err is not None: + raise err + msg = response.message() + if msg is None: + raise ConnectProtocolError('missing response message') + return msg + @typing.runtime_checkable class DebugServiceProtocol(typing.Protocol): @@ -141,6 +171,9 @@ def start_stress_fiber(self, req: ClientRequest[proto.redpanda.core.admin.intern def stop_stress_fiber(self, req: ClientRequest[proto.redpanda.core.admin.internal.v1.debug_pb2.StopStressFiberRequest]) -> ServerResponse[proto.redpanda.core.admin.internal.v1.debug_pb2.StopStressFiberResponse]: ... + + def log_message(self, req: ClientRequest[proto.redpanda.core.admin.internal.v1.debug_pb2.LogMessageRequest]) -> ServerResponse[proto.redpanda.core.admin.internal.v1.debug_pb2.LogMessageResponse]: + ... 
DEBUG_SERVICE_PATH_PREFIX = '/redpanda.core.admin.v2.internal.DebugService' def wsgi_debug_service(implementation: DebugServiceProtocol) -> WSGIApplication: @@ -148,4 +181,5 @@ def wsgi_debug_service(implementation: DebugServiceProtocol) -> WSGIApplication: app.register_unary_rpc('/redpanda.core.admin.v2.internal.DebugService/ThrowStructuredException', implementation.throw_structured_exception, proto.redpanda.core.admin.internal.v1.debug_pb2.ThrowStructuredExceptionRequest) app.register_unary_rpc('/redpanda.core.admin.v2.internal.DebugService/StartStressFiber', implementation.start_stress_fiber, proto.redpanda.core.admin.internal.v1.debug_pb2.StartStressFiberRequest) app.register_unary_rpc('/redpanda.core.admin.v2.internal.DebugService/StopStressFiber', implementation.stop_stress_fiber, proto.redpanda.core.admin.internal.v1.debug_pb2.StopStressFiberRequest) + app.register_unary_rpc('/redpanda.core.admin.v2.internal.DebugService/LogMessage', implementation.log_message, proto.redpanda.core.admin.internal.v1.debug_pb2.LogMessageRequest) return app \ No newline at end of file From 146925117345be090802df929ce52b2696a1c7be Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Fri, 21 Nov 2025 10:35:13 -0300 Subject: [PATCH 2/6] rptest: self-test for raise_on_bad_logs I am going to make some changes here, so add a self-test. It uses the new log line API to send log lines to one Redpanda node and checks that raise on bad logs catches them. 
--- tests/rptest/tests/services_self_test.py | 69 +++++++++++++++++++----- 1 file changed, 55 insertions(+), 14 deletions(-) diff --git a/tests/rptest/tests/services_self_test.py b/tests/rptest/tests/services_self_test.py index 9c5bc56610d36..6732376ef9587 100644 --- a/tests/rptest/tests/services_self_test.py +++ b/tests/rptest/tests/services_self_test.py @@ -18,6 +18,7 @@ from ducktape.mark.resource import cluster as dt_cluster from ducktape.tests.test import Test, TestContext +from rptest.clients.admin.v2 import Admin as AdminV2, debug_pb from rptest.clients.kubectl import is_redpanda_pod from rptest.clients.rpk import RpkTool from rptest.clients.types import TopicSpec @@ -521,6 +522,22 @@ def _assert_expected_backtrace_contents( ) +def _assert_log_content(test: RedpandaTest, node: ClusterNode, needle: str) -> None: + """ + Assert that the redpanda log contains the expected content. + """ + log_searcher = LogSearchLocal( + test.test_context, + [], + test.redpanda.logger, + test.redpanda.STDOUT_STDERR_CAPTURE, + ) + + lines = list(log_searcher._capture_log(node, f"'{needle}'")) + assert lines, f"Did not find expected string '{needle}' in redpanda log" + test.logger.debug(f"Found matching log line: {lines[0]}") + + class RedpandaServiceSelfTest(RedpandaTest): @cluster(num_nodes=1) @matrix(simple_backtrace=[True, False]) @@ -605,26 +622,50 @@ def _crash_test_impl(self, crash_type: CrashType) -> None: rp._admin.trigger_crash(node, crash_type) # look for this snipptet which will appear at the top of the backtrace # if it was properly decoded - self._assert_log_content(node, "in (anonymous namespace)::trigger_crash") + _assert_log_content(self, node, "in (anonymous namespace)::trigger_crash") + + @cluster(num_nodes=1) + def test_start(self) -> None: + pass + - def _assert_log_content(self, node: ClusterNode, needle: str) -> None: +class RedpandaClusteredServiceSelfTest(RedpandaTest): + """Same as RedpandaServiceSelfTest but uses a 3-broker cluster. 
+ Use this only for tests that need more than one broker.""" + + def __init__(self, test_context: TestContext) -> None: + super().__init__(test_context, num_brokers=3) + + @cluster(num_nodes=3, check_allowed_error_logs=False) + def test_raise_on_bad_logs(self): """ - Assert that the redpanda log contains the expected content. + Test that the LogMessage admin API correctly logs messages and that + ERROR level logs are caught by raise_on_bad_logs. """ - log_searcher = LogSearchLocal( - self.test_context, - [], - self.redpanda.logger, - self.redpanda.STDOUT_STDERR_CAPTURE, + admin_v2 = AdminV2(self.redpanda) + # use the last node for a slightly better test + node = self.redpanda.nodes[2] + + # Create a unique error message that we can search for + test_error_msg = "TEST_LOG_MESSAGE_API_ERROR_MARKER_12345" + + # Use the new LogMessage API to log an error + request = debug_pb.LogMessageRequest( + message=test_error_msg, level=debug_pb.LOG_LEVEL_ERROR ) + admin_v2.debug(node=node).log_message(request) - lines = list(log_searcher._capture_log(node, f"'{needle}'")) - assert lines, f"Did not find expected string '{needle}' in redpanda log" - self.logger.debug(f"Found matching log line: {lines[0]}") + # Verify the error message was logged + _assert_log_content(self, node, test_error_msg) - @cluster(num_nodes=1) - def test_start(self) -> None: - pass + def validate_exception(e: BadLogLines) -> bool: + # should have the marker and also the name of node 2 + exn_str = str(e) + return test_error_msg in exn_str and node.name in exn_str + + # Now verify that raise_on_bad_logs will catch this error + with expect_exception(BadLogLines, validate_exception): + self.redpanda.raise_on_bad_logs(allow_list=[]) class RedpandaServiceSelfRawTest(Test): From c81530092350ea392385086437a8ad10d58c4ddc Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Fri, 21 Nov 2025 11:12:17 -0300 Subject: [PATCH 3/6] rptest: type services/utils.py Fully types at strict. 
This required a couple of touch-up changes in other files, and exposed a bug on the cloud tests side where we passed tuples instead of a list of tuples. --- .../services/openmessaging_benchmark.py | 6 +- tests/rptest/services/redpanda.py | 16 ++- tests/rptest/services/utils.py | 120 +++++++++++------- .../type-checking/type-check-strictness.json | 1 - 4 files changed, 88 insertions(+), 55 deletions(-) diff --git a/tests/rptest/services/openmessaging_benchmark.py b/tests/rptest/services/openmessaging_benchmark.py index 227c32e6d93c8..1b693bd6f0ea7 100644 --- a/tests/rptest/services/openmessaging_benchmark.py +++ b/tests/rptest/services/openmessaging_benchmark.py @@ -26,7 +26,7 @@ RedpandaService, RedpandaServiceCloud, ) -from rptest.services.utils import BadLogLines, VersionAndLines +from rptest.services.utils import BadLogLines, NodeToLines, VersionAndLines from ducktape.tests.test import TestContext @@ -150,7 +150,7 @@ def raise_on_bad_log_lines(self, node: ClusterNode) -> None: def make_vl() -> VersionAndLines: return {"version": None, "lines": []} - bad_lines: dict[ClusterNode, VersionAndLines] = collections.defaultdict(make_vl) + bad_lines: NodeToLines = collections.defaultdict(make_vl) self.logger.info(f"Scanning node {node.account.hostname} log for errors...") for line in node.account.ssh_capture( @@ -420,7 +420,7 @@ def raise_on_bad_log_lines(self, node: ClusterNode) -> None: def make_vl() -> VersionAndLines: return {"version": None, "lines": []} - bad_lines: dict[ClusterNode, VersionAndLines] = collections.defaultdict(make_vl) + bad_lines: NodeToLines = collections.defaultdict(make_vl) self.logger.info(f"Scanning node {node.account.hostname} log for errors...") for line in node.account.ssh_capture( diff --git a/tests/rptest/services/redpanda.py b/tests/rptest/services/redpanda.py index 8f3ab351cd5e1..f0a5f57362ea7 100644 --- a/tests/rptest/services/redpanda.py +++ b/tests/rptest/services/redpanda.py @@ -1294,7 +1294,7 @@ def all_up(self) -> bool:
@abstractmethod def raise_on_bad_logs( - self, allow_list: LogAllowList = (), test_start_time: float | None = None + self, allow_list: LogAllowList = (), test_start_time: float = 0 ) -> None: pass @@ -2340,17 +2340,21 @@ def _get_restart_count(p: dict[str, Any]): # Check if stored pod and loaded one is the same _stored_pod = _get_stored_pod(pod["metadata"]["uid"]) if _stored_pod is None: - raise NodeCrash((_name, "Pod not found among prior stored ones")) + raise NodeCrash([(_name, "Pod not found among prior stored ones")]) # Check if container inside pod stayed the same container_id = _get_container_id(pod["status"]) if _get_container_id(_stored_pod._status) != container_id: - raise NodeCrash((_name, "Pod container mismatch with prior stored one")) + raise NodeCrash( + [(_name, "Pod container mismatch with prior stored one")] + ) # Check that restart count is the same restart_count = _get_restart_count(pod["status"]) if _get_restart_count(_stored_pod._status) != restart_count: - raise NodeCrash((_name, "Pod has been restarted due to possible crash")) + raise NodeCrash( + [(_name, "Pod has been restarted due to possible crash")] + ) # Worth to note that rebuilding stored broker classes # can be skipped in this case since nothing changed now @@ -2397,7 +2401,7 @@ def cluster_healthy(self) -> bool: return self.cluster_unhealthy_reason is not None def raise_on_bad_logs( - self, allow_list: LogAllowList = (), test_start_time: float | None = None + self, allow_list: LogAllowList = (), test_start_time: float = 0 ) -> None: """ Raise a BadLogLines exception if any nodes' logs contain errors @@ -3020,7 +3024,7 @@ def set_skip_if_no_redpanda_log(self, v: bool): self._skip_if_no_redpanda_log = v def raise_on_bad_logs( - self, allow_list: LogAllowList = (), test_start_time: float | None = None + self, allow_list: LogAllowList = (), test_start_time: float = 0 ): """ Raise a BadLogLines exception if any nodes' logs contain errors not diff --git a/tests/rptest/services/utils.py 
b/tests/rptest/services/utils.py index 42172cf30e55f..e526e46e415cf 100644 --- a/tests/rptest/services/utils.py +++ b/tests/rptest/services/utils.py @@ -3,7 +3,7 @@ import re import time from abc import ABC, abstractmethod -from typing import Any, Generator, Iterable, TypedDict +from typing import Any, Generator, Iterable, Mapping, TypedDict from rptest.clients.kubectl import KubectlTool @@ -11,6 +11,7 @@ from ducktape.cluster.cluster import ClusterNode from rptest.services.cloud_broker import CloudBroker +from rptest.services.redpanda_types import CompiledLogAllowList VersionedNodes = Iterable[tuple[str | None, ClusterNode | CloudBroker]] @@ -20,7 +21,7 @@ class VersionAndLines(TypedDict): lines: list[str] -NodeToLines = dict[ClusterNode, VersionAndLines] +NodeToLines = dict[ClusterNode | CloudBroker, VersionAndLines] def assert_int(v: Any) -> int: @@ -34,9 +35,9 @@ def assert_int_or_none(v: Any) -> int | None: class Stopwatch: - def __init__(self): - self._start = 0 - self._end = 0 + def __init__(self) -> None: + self._start: float = 0 + self._end: float = 0 def start(self) -> None: self._start = time.time() @@ -68,7 +69,7 @@ def elapsedf(self, note: str) -> str: class BadLogLines(Exception): - def __init__(self, node_to_lines: NodeToLines): + def __init__(self, node_to_lines: NodeToLines) -> None: self.node_to_lines = node_to_lines self._str = self._make_str(node_to_lines) @@ -92,21 +93,21 @@ def _make_str(node_to_lines: NodeToLines) -> str: summary = ",".join(summary_list) return f'' - def __str__(self): + def __str__(self) -> str: return self._str - def __repr__(self): + def __repr__(self) -> str: return self.__str__() class NodeCrash(Exception): - def __init__(self, crashes): + def __init__(self, crashes: list[tuple[Any, str]]) -> None: self.crashes = crashes # Not legal to construct empty assert len(crashes) - def __str__(self): + def __str__(self) -> str: example = f"{self.crashes[0][0].name}: {self.crashes[0][1]}" if len(self.crashes) == 1: return f"" @@ 
-114,7 +115,7 @@ def __str__(self): names = ",".join([c[0].name for c in self.crashes]) return f"" - def __repr__(self): + def __repr__(self) -> str: return self.__str__() @@ -135,70 +136,76 @@ class LogSearch(ABC): "oversized allocation", ] - def __init__(self, test_context: TestContext, allow_list, logger: Logger) -> None: + def __init__( + self, + test_context: TestContext, + allow_list: CompiledLogAllowList, + logger: Logger, + ) -> None: self._context = test_context self.allow_list = allow_list self.logger = logger - self._raise_on_errors = self._context.globals.get( + self._raise_on_errors: bool = self._context.globals.get( self.RAISE_ON_ERRORS_KEY, True ) # Prepare matching terms - self.match_terms = self.DEFAULT_MATCH_TERMS + self.match_terms: list[str] = self.DEFAULT_MATCH_TERMS if self._raise_on_errors: self.match_terms.append("^ERROR") - self.match_expr = " ".join(f'-e "{t}"' for t in self.match_terms) + self.match_expr: str = " ".join(f'-e "{t}"' for t in self.match_terms) @abstractmethod - def _capture_log(self, x, s) -> Generator[str, None, None]: + def _capture_log(self, node: Any, expr: str) -> Generator[str, None, None]: """Method to get log from host node. Overriden by each child.""" # Fake return type for type hint silence # And proper handling when called directly - for x in []: - yield x + yield from [] @abstractmethod def _get_hostname(self, host: Any) -> str: """Method to get name of the host. Overriden by each child.""" return "" - def _check_if_line_allowed(self, line): + def _check_if_line_allowed(self, line: str) -> bool: for a in self.allow_list: if a.search(line) is not None: self.logger.info(f"Ignoring allow-listed log line '{line}'") return True return False - def _check_memory_leak(self, host): + def _check_memory_leak(self, host: Any) -> bool: # Special case for LeakSanitizer errors, where tiny leaks # are permitted, as they can occur during Seastar shutdown. 
# See https://github.com/redpanda-data/redpanda/issues/3626 for summary_line in self._capture_log(host, "SUMMARY: AddressSanitizer:"): m = re.match( - "SUMMARY: AddressSanitizer: (\d+) byte\(s\) leaked in (\d+) allocation\(s\).", + r"SUMMARY: AddressSanitizer: (\d+) byte\(s\) leaked in (\d+) allocation\(s\).", summary_line.strip(), ) if m and int(m.group(1)) < 1024: - self.logger.warn( + self.logger.warning( f"Ignoring memory leak, small quantity: {summary_line}" ) return True return False - def _check_oversized_allocations(self, line): - m = re.search("oversized allocation: (\d+) byte", line) + def _check_oversized_allocations(self, line: str) -> bool: + m = re.search(r"oversized allocation: (\d+) byte", line) if m and int(m.group(1)) <= self.MAX_ALLOCATION_SIZE: - self.logger.warn( + self.logger.warning( f"Ignoring oversized allocation, {m.group(1)} is less than the max allowable allocation size of {self.MAX_ALLOCATION_SIZE} bytes" ) return True return False - def _search(self, versioned_nodes: VersionedNodes): + def _search(self, versioned_nodes: VersionedNodes) -> NodeToLines: def make_vl() -> VersionAndLines: return {"version": None, "lines": []} - bad_lines: defaultdict[ClusterNode, VersionAndLines] = defaultdict(make_vl) + bad_lines: defaultdict[ClusterNode | CloudBroker, VersionAndLines] = ( + defaultdict(make_vl) + ) test_name = self._context.function_name sw = Stopwatch() for version, node in versioned_nodes: @@ -221,7 +228,7 @@ def make_vl() -> VersionAndLines: if not allowed: bad_lines[node]["version"] = version bad_lines[node]["lines"].append(line) - self.logger.warn( + self.logger.warning( f"[{test_name}] Unexpected log line on {hostname}: {line}" ) self.logger.info( @@ -229,7 +236,7 @@ def make_vl() -> VersionAndLines: ) return dict(bad_lines) - def search_logs(self, versioned_nodes: VersionedNodes): + def search_logs(self, versioned_nodes: VersionedNodes) -> None: """ versioned_nodes is a list of Tuple[version, node] """ @@ -242,22 +249,33 @@ def 
search_logs(self, versioned_nodes: VersionedNodes): class LogSearchLocal(LogSearch): - def __init__(self, test_context, allow_list, logger, targetpath) -> None: + def __init__( + self, + test_context: TestContext, + allow_list: CompiledLogAllowList, + logger: Logger, + targetpath: str, + ) -> None: super().__init__(test_context, allow_list, logger) self.targetpath = targetpath - def _capture_log(self, node, expr) -> Generator[str, None, None]: + def _capture_log(self, node: ClusterNode, expr: str) -> Generator[str, None, None]: cmd = f"grep {expr} {self.targetpath} || true" for line in node.account.ssh_capture(cmd): yield line - def _get_hostname(self, host) -> str: + def _get_hostname(self, host: ClusterNode) -> str: return host.account.hostname class LogSearchCloud(LogSearch): def __init__( - self, test_context, allow_list, logger, kubectl: KubectlTool, test_start_time + self, + test_context: TestContext, + allow_list: CompiledLogAllowList, + logger: Logger, + kubectl: KubectlTool, + test_start_time: float, ) -> None: super().__init__(test_context, allow_list, logger) @@ -265,12 +283,12 @@ def __init__( self.kubectl = kubectl self.test_start_time = test_start_time - def _capture_log(self, pod, expr) -> Generator[str, None, None]: + def _capture_log(self, node: CloudBroker, expr: str) -> Generator[str, None, None]: """Capture log and check test timing. 
If logline produced before test start, ignore it """ - def parse_k8s_time(logline, tz): + def parse_k8s_time(logline: str, tz: str) -> time.struct_time: k8s_time_format = "%Y-%m-%dT%H:%M:%S.%f %z" # containerd has nanoseconds format (9 digits) # python supports only 6 @@ -280,27 +298,39 @@ def parse_k8s_time(logline, tz): return time.strptime(logline_time, k8s_time_format) # Load log, output is in binary form - loglines = [] - tz = "+00:00" + loglines: list[str] = [] + tz: str = "+00:00" try: # get time zone in +00:00 format - tz = pod.nodeshell("date +'%:z'") + # nodeshell return type is not well-typed, cast result to list[str] + tz_result = node.nodeshell("date +'%:z'") # Assume UTC if output is empty # But this should never happen - tz = tz[0] if len(tz) > 0 else "+00:00" + if isinstance(tz_result, list) and len(tz_result) > 0: + tz = str(tz_result[0]) # Find all log files for target pod # Return type without capture is always str, so ignore type - logfiles = pod.nodeshell("find /var/log/pods -type f") + logfiles_result = node.nodeshell("find /var/log/pods -type f") + logfiles = ( + [str(f) for f in logfiles_result] + if isinstance(logfiles_result, list) + else [] + ) for logfile in logfiles: - if pod.name in logfile and "redpanda-configurator" not in logfile: + if node.name in logfile and "redpanda-configurator" not in logfile: self.logger.info(f"Inspecting '{logfile}'") - lines = pod.nodeshell(f"cat {logfile} | grep {expr}") + lines_result = node.nodeshell(f"cat {logfile} | grep {expr}") + lines = ( + [str(line) for line in lines_result] + if isinstance(lines_result, list) + else [] + ) loglines += lines except Exception as e: - self.logger.warning(f"Error getting logs for {pod.name}: {e}") + self.logger.warning(f"Error getting logs for {node.name}: {e}") else: _size = len(loglines) - self.logger.debug(f"Received {_size}B of data from {pod.name}") + self.logger.debug(f"Received {_size}B of data from {node.name}") # check log lines for proper timing. 
# Log lines will have two timing objects: @@ -320,5 +350,5 @@ def parse_k8s_time(logline, tz): continue yield line - def _get_hostname(self, host) -> str: + def _get_hostname(self, host: CloudBroker) -> str: return host.hostname diff --git a/tools/type-checking/type-check-strictness.json b/tools/type-checking/type-check-strictness.json index 5d7896cfdd9f1..374ed59a981e9 100644 --- a/tools/type-checking/type-check-strictness.json +++ b/tools/type-checking/type-check-strictness.json @@ -318,7 +318,6 @@ "rptest/services/tls.py", "rptest/services/transform_verifier_service.py", "rptest/services/trino_service.py", - "rptest/services/utils.py", "rptest/tests/acls_test.py", "rptest/tests/adjacent_segment_merging_test.py", "rptest/tests/admin_api_auth_test.py", From 452b00b46d826d52383a3df64c7203bc565ca5f4 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Fri, 21 Nov 2025 14:44:44 -0300 Subject: [PATCH 4/6] rptest/docker-deps: install ripgrep Install ripgrep from github. In my testing ripgrep is close to 5x faster when searching large logfiles, though it needs to be ripgrep 15, not <= 13 that we'd get on older Ubuntu distros. So install 15.1 from github releases. --- tests/docker/ducktape-deps/tool-pkgs | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/tests/docker/ducktape-deps/tool-pkgs b/tests/docker/ducktape-deps/tool-pkgs index 7797006cfcbde..f3d145fd8f5c3 100644 --- a/tests/docker/ducktape-deps/tool-pkgs +++ b/tests/docker/ducktape-deps/tool-pkgs @@ -4,7 +4,8 @@ # tests, but not for building the early layers of the image themselves. They # are installed relatively late in the Dockerfile, after most of the heavy # "non mainline" layers have forked off. 
-set -e +set -euo pipefail + apt-get update apt-get install -qq \ bind9-dnsutils \ @@ -57,3 +58,22 @@ unset LD_LIBRARY_PATH exec /usr/bin/llvm-symbolizer-$LLVM_VERSION "\$@" EOF chmod +x /usr/local/bin/llvm-symbolizer + +########### +# ripgrep # +########### + +RG_VERSION=15.1.0 +RG_BASE_URL="https://github.com/BurntSushi/ripgrep/releases/download" +if [ $(uname -m) = "aarch64" ]; then + # ripgrep- 15.1.0 -aarch64-unknown-linux-gnu.tar.gz + RG_TARBALL="ripgrep-${RG_VERSION}-aarch64-unknown-linux-gnu.tar.gz" +else + # ripgrep- 15.1.0 -x86_64-unknown-linux-musl.tar.gz + RG_TARBALL="ripgrep-${RG_VERSION}-x86_64-unknown-linux-musl.tar.gz" +fi +mkdir -p /opt/ripgrep +RG_URL="${RG_BASE_URL}/${RG_VERSION}/${RG_TARBALL}" +echo "Downloading ripgrep from ${RG_URL}" +curl -sSL "$RG_URL" | tar -xz -C /opt/ripgrep --strip-components=1 +ln -sf /opt/ripgrep/rg /usr/local/bin/rg From 537009d6e3c3ba4a65006e4b93b7274f05163878 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Fri, 21 Nov 2025 14:51:21 -0300 Subject: [PATCH 5/6] rptest: switch to ripgrep in log search Use ripgrep (rg) instead of grep for log searching in rptest. In my benchmark this results in about a 3x speedup when searching through 3 logs in parallel resulting from creating 10 topics of 1000 partitions. The benchmark is also included in this change, though @ignored so it does not run in CI (it takes a minute or so). This requires translating GNU BRE (used by grep by default) to ERE (used by grep -E, and ripgrep). It was probably a mistake to use BRE in the first place, but it is what it is.
--- tests/rptest/services/utils.py | 69 ++++++++++++++++++++++-- tests/rptest/tests/services_self_test.py | 40 +++++++++++++- 2 files changed, 105 insertions(+), 4 deletions(-) diff --git a/tests/rptest/services/utils.py b/tests/rptest/services/utils.py index e526e46e415cf..3b9ee5355bd50 100644 --- a/tests/rptest/services/utils.py +++ b/tests/rptest/services/utils.py @@ -132,7 +132,7 @@ class LogSearch(ABC): "Exceptional future ignored", "UndefinedBehaviorSanitizer", "Aborting on shard", - "libc++abi: terminating due to uncaught exception", + "terminating due to uncaught exception", "oversized allocation", ] @@ -157,7 +157,10 @@ def __init__( @abstractmethod def _capture_log(self, node: Any, expr: str) -> Generator[str, None, None]: - """Method to get log from host node. Overriden by each child.""" + """Method to get log from host node. Overridden by each child. + + expr is a GNU BRE regex (i.e., the default grep regex style), which means + you need to escape things like +() if you intend them to be metacharacters""" # Fake return type for type hint silence # And proper handling when called directly yield from [] @@ -248,6 +251,63 @@ def search_logs(self, versioned_nodes: VersionedNodes) -> None: raise BadLogLines(bad_loglines) +def _gnu_bre_to_ere(bre_pattern: str) -> str: + r""" + Convert a GNU Basic Regular Expression (BRE) to a GNU Extended Regular + Expression (ERE). + + This function handles two main differences between GNU BRE and ERE: + 1. In BRE, `(`, `)`, `{`, `}`, `+`, `?`, and `|` are literal characters, + whereas in ERE they are special metacharacters. To treat them as + literals in ERE, they must be escaped with a backslash. + 2. In BRE, the escaped versions `\(`, `\)`, `\{`, `\}`, `\+`, `\?`, and + `\|` have special meanings (grouping, intervals, etc.), while in ERE, + the unescaped versions have these special meanings.
+ + The conversion is performed by iterating through the BRE pattern and + applying the following rules: + - Unescaped `(`, `)`, `{`, `}`, `+`, `?`, `|` are escaped. + - Escaped `\(`, `\)`, `\{`, `\}`, `\+`, `\?`, `\|` are unescaped. + - Other characters, including other escaped characters (e.g., `\.`, `\*`), + are kept as they are. + - The logic correctly handles double backslashes (`\\`), ensuring they + are preserved. + """ + + # these are metacharacters in both ERE and GNU BRE but in BRE + # they must be escaped to have their metacharacter meaning + BRE_ESCAPED_METACHARACTERS = set("(){}+?|") + + ere_pattern = "" + i = 0 + while i < len(bre_pattern): + char = bre_pattern[i] + if char == "\\": + if i + 1 < len(bre_pattern): + next_char = bre_pattern[i + 1] + if next_char in BRE_ESCAPED_METACHARACTERS: + # Unescape BRE metacharacters to become ERE metacharacters + ere_pattern += next_char + i += 2 + else: + # Keep other escaped characters as they are (e.g., \\, \*, \.) + ere_pattern += char + next_char + i += 2 + else: + # Trailing backslash + ere_pattern += char + i += 1 + elif char in BRE_ESCAPED_METACHARACTERS: + # Escape ERE metacharacters that are literals in BRE + ere_pattern += "\\" + char + i += 1 + else: + # Keep all other characters + ere_pattern += char + i += 1 + return ere_pattern + + class LogSearchLocal(LogSearch): def __init__( self, @@ -260,7 +320,10 @@ def __init__( self.targetpath = targetpath def _capture_log(self, node: ClusterNode, expr: str) -> Generator[str, None, None]: - cmd = f"grep {expr} {self.targetpath} || true" + if not expr.startswith("-P"): + # some naughty tests use this to force grep/rg to use PCRE + expr = _gnu_bre_to_ere(expr) + cmd = f"rg {expr} {self.targetpath} || true" for line in node.account.ssh_capture(cmd): yield line diff --git a/tests/rptest/tests/services_self_test.py b/tests/rptest/tests/services_self_test.py index 6732376ef9587..42d4f6438c010 100644 --- a/tests/rptest/tests/services_self_test.py +++
b/tests/rptest/tests/services_self_test.py @@ -11,10 +11,11 @@ import signal from subprocess import CalledProcessError from typing import Any, Callable, Iterator +import time from ducktape.cluster.cluster import ClusterNode from ducktape.cluster.remoteaccount import RemoteCommandError -from ducktape.mark import matrix +from ducktape.mark import matrix, ignore from ducktape.mark.resource import cluster as dt_cluster from ducktape.tests.test import Test, TestContext @@ -667,6 +668,43 @@ def validate_exception(e: BadLogLines) -> bool: with expect_exception(BadLogLines, validate_exception): self.redpanda.raise_on_bad_logs(allow_list=[]) + @ignore + @cluster(num_nodes=3, check_allowed_error_logs=False) + def test_bll_bench(self): + """ + Benchmark raise_on_bad_logs: create and delete large topics to generate + log volume, then time how long the bad-log-line scan takes. + + Ignored by default since we don't want to run benchmarks in CI. + """ + # create and delete a 1000-partition topic 10 times + rpk = RpkTool(self.redpanda) + + parts = 1000 + + for i in range(10): + topic_name = f"bll_bench_{i}" + + def _all_partitions_present(): + try: + desc = list(rpk.describe_topic(topic_name)) + return len(desc) == parts + except Exception: + return False + + # 1000 partitions, replication factor 3 (NOTE(review): comment previously said RF 1 to avoid excess resource usage; confirm intent) + rpk.create_topic(topic_name, partitions=parts, replicas=3) + self.redpanda.wait_until( + _all_partitions_present, timeout_sec=30, backoff_sec=1 + ) + rpk.delete_topic(topic_name) + self.logger.warning(f"c d topic {i}") + + start = time.time() + self.redpanda.raise_on_bad_logs(allow_list=[]) + elapsed = time.time() - start + self.logger.warning(f"raise_on_bad_logs elapsed {elapsed:.3f}s") + class RedpandaServiceSelfRawTest(Test): """This 'raw' test inherits only from Test, so that internally it From e62fe62af9d3dde6fc719c6a30574304b35902a3 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Fri, 21 Nov 2025 14:58:19 -0300 Subject: [PATCH 6/6] rptest: speed up BLL check
by checking in parallel We query each node one by one to check the log for bad log lines, but we can do this in parallel: it puts almost no load on the runner, and the work happens on the nodes, so let's speed it up. This results in about a 3x speedup in my benchmark, on top of the 3x speedup from switching to rg (so about 9x overall). --- tests/rptest/services/utils.py | 48 ++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/tests/rptest/services/utils.py b/tests/rptest/services/utils.py index 3b9ee5355bd50..ac0fee0f0f46b 100644 --- a/tests/rptest/services/utils.py +++ b/tests/rptest/services/utils.py @@ -1,9 +1,9 @@ -from collections import defaultdict from logging import Logger import re import time from abc import ABC, abstractmethod -from typing import Any, Generator, Iterable, Mapping, TypedDict +from typing import Any, Generator, Iterable, TypedDict +from concurrent.futures import ThreadPoolExecutor, as_completed from rptest.clients.kubectl import KubectlTool @@ -62,7 +62,7 @@ def elapsed(self) -> float: return time.time() - self._start def elapseds(self) -> str: - return f"{self.elapsed:.2f}s" + return f"{self.elapsed:.3f}s" def elapsedf(self, note: str) -> str: return f"{note}: {self.elapseds()}" @@ -150,7 +150,7 @@ def __init__( ) # Prepare matching terms - self.match_terms: list[str] = self.DEFAULT_MATCH_TERMS + self.match_terms: list[str] = list(self.DEFAULT_MATCH_TERMS) if self._raise_on_errors: self.match_terms.append("^ERROR") self.match_expr: str = " ".join(f'-e "{t}"' for t in self.match_terms) @@ -203,20 +203,18 @@ def _check_oversized_allocations(self, line: str) -> bool: return False def _search(self, versioned_nodes: VersionedNodes) -> NodeToLines: - def make_vl() -> VersionAndLines: - return {"version": None, "lines": []} - - bad_lines: defaultdict[ClusterNode | CloudBroker, VersionAndLines] = ( - defaultdict(make_vl) - ) test_name = self._context.function_name - sw = Stopwatch() - for version, node
in versioned_nodes: - sw.start() + overall_sw = Stopwatch() + overall_sw.start() + + def scan_one( + version: str | None, node: ClusterNode | CloudBroker + ) -> tuple[ClusterNode | CloudBroker, VersionAndLines]: + node_sw = Stopwatch() + node_sw.start() hostname = self._get_hostname(node) self.logger.info(f"Scanning node {hostname} log for errors...") - # Prepare/Build capture func shortcut - # Iterate + vl: VersionAndLines = {"version": version, "lines": []} for line in self._capture_log(node, self.match_expr): line = line.strip() # Check if this line holds error @@ -229,15 +227,27 @@ def make_vl() -> VersionAndLines: allowed = self._check_oversized_allocations(line) # If detected bad lines, log it and add to the list if not allowed: - bad_lines[node]["version"] = version - bad_lines[node]["lines"].append(line) + vl["lines"].append(line) self.logger.warning( f"[{test_name}] Unexpected log line on {hostname}: {line}" ) self.logger.info( - sw.elapsedf(f"##### Time spent to scan bad logs on '{hostname}'") + node_sw.elapsedf(f"Time spent to scan bad logs on '{hostname}'") ) - return dict(bad_lines) + return node, vl + + bad_lines: NodeToLines = {} + + # Run scans in parallel + with ThreadPoolExecutor() as executor: + futures = [executor.submit(scan_one, v, n) for v, n in versioned_nodes] + for fut in as_completed(futures): + node, vl = fut.result() + if vl["lines"]: + bad_lines[node] = vl + + self.logger.info(overall_sw.elapsedf("Time spent to scan bad logs overall")) + return bad_lines def search_logs(self, versioned_nodes: VersionedNodes) -> None: """