Skip to content

Commit d6a2324

Browse files
committed
Restore the cancellation tests
Improve log output, fix static mode Signed-off-by: Graham King <[email protected]>
1 parent 1a70309 commit d6a2324

File tree

6 files changed

+25
-22
lines changed

6 files changed

+25
-22
lines changed

examples/custom_backend/cancellation/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ async def main():
5050
return
5151

5252
loop = asyncio.get_running_loop()
53-
runtime = DistributedRuntime(loop, "etcd", False)
53+
runtime = DistributedRuntime(loop, "mem", True)
5454

5555
# Connect to middle server or direct server based on argument
5656
if use_middle_server:

examples/custom_backend/cancellation/middle_server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ async def generate(self, request, context):
5050
async def main():
5151
"""Start the middle server"""
5252
loop = asyncio.get_running_loop()
53-
runtime = DistributedRuntime(loop, "etcd", False)
53+
runtime = DistributedRuntime(loop, "mem", True)
5454

5555
# Create middle server handler
5656
handler = MiddleServer(runtime)

examples/custom_backend/cancellation/server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ async def generate(self, request, context):
3131
async def main():
3232
"""Start the demo server"""
3333
loop = asyncio.get_running_loop()
34-
runtime = DistributedRuntime(loop, "etcd", False)
34+
runtime = DistributedRuntime(loop, "mem", True)
3535

3636
# Create server component
3737
component = runtime.namespace("demo").component("server")

lib/bindings/python/tests/cancellation/test_example.py

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -82,25 +82,21 @@ def run_client(example_dir, use_middle=False):
8282
)
8383

8484
# Wait for client to complete
85-
stdout, _ = client_proc.communicate(timeout=1)
86-
87-
if client_proc.returncode != 0:
88-
pytest.fail(
89-
f"Client failed with return code {client_proc.returncode}. Output: {stdout}"
90-
)
85+
stdout, _ = client_proc.communicate(timeout=2)
86+
print(f"Client stdout: {stdout}")
9187

9288
return stdout
9389

9490

95-
def stop_process(process):
91+
def stop_process(name, process):
9692
"""Stop a running process and capture its output"""
9793
process.terminate()
9894
stdout, _ = process.communicate(timeout=1)
95+
print(f"{name}: {stdout}")
9996
return stdout
10097

10198

10299
@pytest.mark.asyncio
103-
@pytest.mark.skip(reason="Graham working on it")
104100
async def test_direct_connection_cancellation(example_dir, server_process):
105101
"""Test cancellation with direct client-server connection"""
106102
# Run the client (direct connection)
@@ -110,7 +106,7 @@ async def test_direct_connection_cancellation(example_dir, server_process):
110106
await asyncio.sleep(1)
111107

112108
# Capture server output
113-
server_output = stop_process(server_process)
109+
server_output = stop_process("server_process", server_process)
114110

115111
# Assert expected messages
116112
assert (
@@ -122,7 +118,6 @@ async def test_direct_connection_cancellation(example_dir, server_process):
122118

123119

124120
@pytest.mark.asyncio
125-
@pytest.mark.skip(reason="Graham working on it")
126121
async def test_middle_server_cancellation(
127122
example_dir, server_process, middle_server_process
128123
):
@@ -134,8 +129,8 @@ async def test_middle_server_cancellation(
134129
await asyncio.sleep(1)
135130

136131
# Capture output from all processes
137-
server_output = stop_process(server_process)
138-
middle_output = stop_process(middle_server_process)
132+
server_output = stop_process("server_process", server_process)
133+
middle_output = stop_process("middle_server_process", middle_server_process)
139134

140135
# Assert expected messages
141136
assert (

lib/bindings/python/tests/conftest.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ def start_nats_and_etcd_default_ports():
153153
print(f"Using ETCD on default client port {etcd_client_port}")
154154

155155
# Start services with default ports
156-
nats_server = subprocess.Popen(["nats-server", "-js"])
156+
nats_server = subprocess.Popen(["nats-server", "-js", "--trace"])
157157
etcd = subprocess.Popen(["etcd"])
158158

159159
return nats_server, etcd, nats_port, etcd_client_port, nats_data_dir, etcd_data_dir
@@ -181,6 +181,8 @@ def start_nats_and_etcd_random_ports():
181181
etcd = subprocess.Popen(
182182
[
183183
"etcd",
184+
"--logger",
185+
"zap",
184186
"--data-dir",
185187
str(etcd_data_dir),
186188
"--listen-client-urls",
@@ -221,7 +223,11 @@ def start_nats_and_etcd_random_ports():
221223
msg = log.get("msg", "")
222224

223225
# Look for the client port
224-
if "serving client traffic" in msg or "serving client" in msg:
226+
if (
227+
"serving client traffic" in msg
228+
or "serving client" in msg
229+
or "serving insecure client" in msg
230+
):
225231
address = log.get("address", "")
226232
match = re.search(r":(\d+)$", address)
227233
if match:
@@ -430,6 +436,6 @@ async def test_my_test(runtime):
430436
)
431437

432438
loop = asyncio.get_running_loop()
433-
runtime = DistributedRuntime(loop, "file", True)
439+
runtime = DistributedRuntime(loop, "etcd", False)
434440
yield runtime
435441
runtime.shutdown()

lib/runtime/src/distributed.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,19 @@ impl DistributedRuntime {
5353

5454
let runtime_clone = runtime.clone();
5555

56-
let (etcd_client, store) = match selected_kv_store {
57-
KeyValueStoreSelect::Etcd(etcd_config) => {
56+
let (etcd_client, store) = match (is_static, selected_kv_store) {
57+
(false, KeyValueStoreSelect::Etcd(etcd_config)) => {
5858
let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
5959
// The returned error doesn't show because of a dropped runtime error, so
6060
// log it first.
6161
tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd."))?;
6262
let store = KeyValueStoreManager::etcd(etcd_client.clone());
6363
(Some(etcd_client), store)
6464
}
65-
KeyValueStoreSelect::File(root) => (None, KeyValueStoreManager::file(root)),
66-
KeyValueStoreSelect::Memory => (None, KeyValueStoreManager::memory()),
65+
(false, KeyValueStoreSelect::File(root)) => (None, KeyValueStoreManager::file(root)),
66+
(true, _) | (false, KeyValueStoreSelect::Memory) => {
67+
(None, KeyValueStoreManager::memory())
68+
}
6769
};
6870

6971
let nats_client = Some(nats_config.clone().connect().await?);

0 commit comments

Comments
 (0)