Skip to content

Commit 390e165

Browse files
committed
Add support of CONNECTION_METADATA_CHANGE event
This event was introduced into the core recently, it is sent whenever `system.conneciton_metadata` table is updated.
1 parent f8d7508 commit 390e165

File tree

2 files changed

+81
-2
lines changed

2 files changed

+81
-2
lines changed

cassandra/protocol.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -956,7 +956,8 @@ def send_body(self, f, protocol_version):
956956
known_event_types = frozenset((
957957
'TOPOLOGY_CHANGE',
958958
'STATUS_CHANGE',
959-
'SCHEMA_CHANGE'
959+
'SCHEMA_CHANGE',
960+
'CONNECTION_METADATA_CHANGE'
960961
))
961962

962963

@@ -987,6 +988,14 @@ def recv_body(cls, f, protocol_version, *args):
987988
return cls(event_type=event_type, event_args=read_method(f, protocol_version))
988989
raise NotSupportedError('Unknown event type %r' % event_type)
989990

991+
@classmethod
992+
def recv_connection_metadata_change(cls, f, protocol_version):
993+
# "UPDATE_NODES"
994+
change_type = read_string(f)
995+
connection_ids = read_stringlist(f)
996+
host_ids = read_stringlist(f)
997+
return dict(change_type=change_type, connection_ids=connection_ids, host_ids=host_ids)
998+
990999
@classmethod
9911000
def recv_topology_change(cls, f, protocol_version):
9921001
# "NEW_NODE" or "REMOVED_NODE"

tests/integration/standard/test_control_connection.py

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,17 @@
1414
#
1515
#
1616
#
17+
from threading import Event
18+
1719
from cassandra import InvalidRequest
1820

1921
import unittest
22+
import requests
2023

2124

2225
from cassandra.protocol import ConfigurationException
23-
from tests.integration import use_singledc, PROTOCOL_VERSION, TestCluster, greaterthanorequalcass40
26+
from tests.integration import use_singledc, PROTOCOL_VERSION, TestCluster, greaterthanorequalcass40, \
27+
xfail_scylla_version_lt
2428
from tests.integration.datatype_utils import update_datatypes
2529

2630

@@ -127,3 +131,69 @@ def test_control_connection_port_discovery(self):
127131
for host in hosts:
128132
assert 9042 == host.broadcast_rpc_port
129133
assert 7000 == host.broadcast_port
134+
135+
@xfail_scylla_version_lt(reason='scylladb/scylladb#26992 - system.connection_metadata is not yet supported',
136+
oss_scylla_version="7.0", ent_scylla_version="2025.4.0")
137+
def test_connection_metadata_change_event(self):
138+
cluster = TestCluster()
139+
140+
# Establish control connection
141+
cluster.connect()
142+
143+
flag = Event()
144+
145+
connection_ids = ["anytext", "11510f50-f906-4844-8c74-49ddab9ac6a9"]
146+
host_ids = ["1a13fa42-c45b-410f-8ba5-58b42ada9c12", "aa13fa42-c45b-410f-8ba5-58b42ada9c12"]
147+
got_connection_ids = []
148+
got_host_ids = []
149+
150+
def on_event(event):
151+
nonlocal got_connection_ids
152+
nonlocal got_host_ids
153+
try:
154+
assert event.get("change_type") == "UPDATE_NODES"
155+
got_connection_ids = event.get("connection_ids")
156+
got_host_ids = event.get("host_ids")
157+
finally:
158+
flag.set()
159+
160+
cluster.control_connection._connection.register_watchers({"CONNECTION_METADATA_CHANGE": on_event})
161+
162+
try:
163+
payload = [
164+
{
165+
"connection_id": connection_ids[0], # Should be a UUID if API requires that
166+
"host_id": host_ids[0],
167+
"address": "localhost",
168+
"port": 9042,
169+
"tls_port": 0,
170+
"alternator_port": 0,
171+
"alternator_https_port": 0,
172+
"rack": "string",
173+
"datacenter": "string"
174+
},
175+
{
176+
"connection_id": connection_ids[1],
177+
"host_id": host_ids[1],
178+
"address": "localhost",
179+
"port": 9042,
180+
"tls_port": 0,
181+
"alternator_port": 0,
182+
"alternator_https_port": 0,
183+
"rack": "string",
184+
"datacenter": "string"
185+
}
186+
]
187+
response = requests.post(
188+
"http://" + cluster.contact_points[0] + ":10000/v2/connection-metadata",
189+
json=payload,
190+
headers={
191+
"Content-Type": "application/json",
192+
"Accept": "application/json",
193+
})
194+
assert response.status_code == 200
195+
assert flag.wait(20), "Schema change event was not received after registering watchers"
196+
assert got_connection_ids == connection_ids
197+
assert got_host_ids == host_ids
198+
finally:
199+
cluster.shutdown()

0 commit comments

Comments
 (0)