Skip to content

[Python][Flaky Test] TestSyncPubSub.test_sync_pubsub_exact_happy_path #4970

@xShinnRyuu

Description

@xShinnRyuu

Test Name

TestSyncPubSub.test_sync_pubsub_exact_happy_path[MethodTesting.Async-True]

Test Location

tests/sync_tests/test_sync_pubsub.py:211

Failure Permalink

https://github.com/valkey-io/valkey-glide/actions/runs/19319472569/job/55354545999#step:8:383

System Information

aarch64-apple-darwin

Language and Version

Python 3.9

Engine Version

6.2

Logs

__ TestSyncPubSub.test_sync_pubsub_exact_happy_path[MethodTesting.Async-True] __

self = <test_sync_pubsub.TestSyncPubSub object at 0x11ff3f550>
request = <FixtureRequest for <Function test_sync_pubsub_exact_happy_path[MethodTesting.Async-True]>>
cluster_mode = True, method = <MethodTesting.Async: 0>

    @pytest.mark.parametrize("cluster_mode", [True, False])
    @pytest.mark.parametrize(
        "method", [MethodTesting.Async, MethodTesting.Sync, MethodTesting.Callback]
    )
    def test_sync_pubsub_exact_happy_path(
        self,
        request,
        cluster_mode: bool,
        method: MethodTesting,
    ):
        """
        Tests the basic happy path for exact PUBSUB functionality.
    
        This test covers the basic PUBSUB flow using three different methods:
        Async, Sync, and Callback. It verifies that a message published to a
        specific channel is correctly received by a subscriber.
        """
        listening_client, publishing_client = None, None
        try:
            channel = get_random_string(10)
            message = get_random_string(5)
    
            callback, context = None, None
            callback_messages: List[PubSubMsg] = []
            if method == MethodTesting.Callback:
                callback = new_message
                context = callback_messages
    
            pub_sub = create_pubsub_subscription(
                cluster_mode,
                {GlideClusterClientConfiguration.PubSubChannelModes.Exact: {channel}},
                {GlideClientConfiguration.PubSubChannelModes.Exact: {channel}},
                callback=callback,
                context=context,
            )
    
>           listening_client, publishing_client = create_two_clients_with_pubsub(
                request, cluster_mode, pub_sub
            )

tests/sync_tests/test_sync_pubsub.py:211: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
tests/sync_tests/test_sync_pubsub.py:71: in create_two_clients_with_pubsub
    client1 = create_sync_client(
tests/sync_tests/conftest.py:160: in create_sync_client
    return SyncGlideClusterClient.create(config)
.env/lib/python3.9/site-packages/glide_sync/glide_client.py:84: in create
    self._create_core_client()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <glide_sync.glide_client.GlideClusterClient object at 0x11fc09280>

    def _create_core_client(self):
        # This check is needed in case a fork happens after the client already closed
        # In that case the registered fork function will kick in even if the
        # client already closed, and recreate it anyway.
        if self._is_closed:
            return
        conn_req = self._config._create_a_protobuf_conn_request(
            cluster_mode=type(self._config) is GlideClusterClientConfiguration
        )
        conn_req_bytes = conn_req.SerializeToString()
        client_type = self._ffi.new(
            "ClientType*",
            {
                "_type": self._ffi.cast("ClientTypeEnum", FFIClientTypeEnum.Sync),
            },
        )
    
        if self._config._is_pubsub_configured():
            # If in subscribed mode, create a callback that will be called by the FFI layer
            # for handling push notifications. This callback would either call the user callback (if provided),
            # or append the messaged to the the `_pubsub_queue`
            python_callback = self._create_push_handle_callback()
            pubsub_callback = self._ffi.callback("PubSubCallback", python_callback)
            # Store reference to prevent garbage collection
            self._pubsub_callback_ref = pubsub_callback
        else:
            pubsub_callback = self._ffi.cast("PubSubCallback", 0)
    
        client_response_ptr = self._lib.create_client(
            conn_req_bytes,
            len(conn_req_bytes),
            client_type,
            pubsub_callback,
        )
    
        Logger.log(Level.INFO, "connection info", "new connection established")
    
        # Handle the connection response
        if client_response_ptr != self._ffi.NULL:
            client_response = self._try_ffi_cast(
                "ConnectionResponse*", client_response_ptr
            )
            if client_response.conn_ptr != self._ffi.NULL:
                self._core_client = client_response.conn_ptr
            else:
                error_message = (
                    self._ffi.string(client_response.connection_error_message).decode(
                        ENCODING
                    )
                    if client_response.connection_error_message != self._ffi.NULL
                    else "Unknown error"
                )
>               raise ClosingError(error_message)
E               glide_shared.exceptions.ClosingError: connection attempt timed out

.env/lib/python3.9/site-packages/glide_sync/glide_client.py:140: ClosingError

Glide Version

main

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions