Skip to content

Commit 8e2cd13

Browse files
committed
move nexus tests out of overall metric tests b/c nexus doesn't work with time skipping
1 parent 623ea6e commit 8e2cd13

File tree

3 files changed

+262
-170
lines changed

3 files changed

+262
-170
lines changed

tests/helpers/metrics.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from collections.abc import Mapping
2+
3+
4+
class PromMetricMatcher:
5+
def __init__(self, prom_lines: list[str]) -> None:
6+
self._prom_lines = prom_lines
7+
8+
# Intentionally naive metric checker
9+
def matches_metric_line(
10+
self, line: str, name: str, at_least_labels: Mapping[str, str], value: int
11+
) -> bool:
12+
# Must have metric name
13+
if not line.startswith(name + "{"):
14+
return False
15+
# Must have labels (don't escape for this test)
16+
for k, v in at_least_labels.items():
17+
if f'{k}="{v}"' not in line:
18+
return False
19+
return line.endswith(f" {value}")
20+
21+
def assert_metric_exists(
22+
self, name: str, at_least_labels: Mapping[str, str], value: int
23+
) -> None:
24+
assert any(
25+
self.matches_metric_line(line, name, at_least_labels, value)
26+
for line in self._prom_lines
27+
)
28+
29+
def assert_description_exists(self, name: str, description: str) -> None:
30+
assert f"# HELP {name} {description}" in self._prom_lines

tests/nexus/test_workflow_caller.py

Lines changed: 210 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import concurrent.futures
45
import uuid
56
from dataclasses import dataclass
67
from enum import IntEnum
78
from typing import Any, Awaitable, Callable, Union
9+
from urllib.request import urlopen
810

911
import nexusrpc
1012
import nexusrpc.handler
@@ -37,9 +39,18 @@
3739
from temporalio.converter import PayloadConverter
3840
from temporalio.exceptions import ApplicationError, CancelledError, NexusOperationError
3941
from temporalio.nexus import WorkflowRunOperationContext, workflow_run_operation
42+
from temporalio.runtime import (
43+
BUFFERED_METRIC_KIND_COUNTER,
44+
MetricBuffer,
45+
PrometheusConfig,
46+
Runtime,
47+
TelemetryConfig,
48+
)
4049
from temporalio.service import RPCError, RPCStatusCode
4150
from temporalio.testing import WorkflowEnvironment
4251
from temporalio.worker import Worker
52+
from tests.helpers import find_free_port, new_worker
53+
from tests.helpers.metrics import PromMetricMatcher
4354
from tests.helpers.nexus import create_nexus_endpoint, make_nexus_endpoint_name
4455

4556
# TODO(nexus-prerelease): test availability of Temporal client etc in async context set by worker
@@ -239,7 +250,7 @@ def __init__(
239250
request_cancel: bool,
240251
task_queue: str,
241252
) -> None:
242-
self.nexus_client: workflow.NexusClient[ServiceInterface] = (
253+
self.nexus_client: workflow.NexusClient[ServiceInterface | ServiceImpl] = (
243254
workflow.create_nexus_client(
244255
service={
245256
CallerReference.IMPL_WITH_INTERFACE: ServiceImpl,
@@ -890,7 +901,7 @@ async def run(
890901
f"Invalid combination of caller_reference ({caller_reference}) and name_override ({name_override})"
891902
)
892903

893-
nexus_client = workflow.create_nexus_client(
904+
nexus_client: workflow.NexusClient[Any] = workflow.create_nexus_client(
894905
service=service_cls,
895906
endpoint=make_nexus_endpoint_name(task_queue),
896907
)
@@ -1409,3 +1420,200 @@ async def test_workflow_run_operation_overloads(
14091420
if op != "no_param"
14101421
else OverloadTestValue(value=0)
14111422
)
1423+
1424+
1425+
@nexusrpc.handler.service_handler
1426+
class CustomMetricsService:
1427+
@nexusrpc.handler.sync_operation
1428+
async def custom_metric_op(
1429+
self, ctx: nexusrpc.handler.StartOperationContext, input: None
1430+
) -> None:
1431+
counter = nexus.metric_meter().create_counter(
1432+
"my-operation-counter", "my-operation-description", "my-operation-unit"
1433+
)
1434+
counter.add(12)
1435+
counter.add(30, {"my-operation-extra-attr": 12.34})
1436+
1437+
@nexusrpc.handler.sync_operation
1438+
def custom_metric_op_executor(
1439+
self, ctx: nexusrpc.handler.StartOperationContext, input: None
1440+
) -> None:
1441+
counter = nexus.metric_meter().create_counter(
1442+
"my-executor-operation-counter",
1443+
"my-executor-operation-description",
1444+
"my-executor-operation-unit",
1445+
)
1446+
counter.add(12)
1447+
counter.add(30, {"my-executor-operation-extra-attr": 12.34})
1448+
1449+
1450+
@workflow.defn
1451+
class CustomMetricsWorkflow:
1452+
@workflow.run
1453+
async def run(self, task_queue: str) -> None:
1454+
nexus_client = workflow.create_nexus_client(
1455+
service=CustomMetricsService, endpoint=make_nexus_endpoint_name(task_queue)
1456+
)
1457+
1458+
await nexus_client.execute_operation(
1459+
CustomMetricsService.custom_metric_op, None
1460+
)
1461+
await nexus_client.execute_operation(
1462+
CustomMetricsService.custom_metric_op_executor, None
1463+
)
1464+
1465+
1466+
async def test_workflow_caller_custom_metrics(client: Client, env: WorkflowEnvironment):
1467+
if env.supports_time_skipping:
1468+
pytest.skip("Nexus tests don't work with time-skipping server")
1469+
1470+
# Run worker with default runtime which is noop meter just to confirm it
1471+
# doesn't fail
1472+
task_queue = str(uuid.uuid4())
1473+
await create_nexus_endpoint(task_queue, client)
1474+
1475+
# Create new runtime with Prom server
1476+
prom_addr = f"127.0.0.1:{find_free_port()}"
1477+
runtime = Runtime(
1478+
telemetry=TelemetryConfig(
1479+
metrics=PrometheusConfig(bind_address=prom_addr), metric_prefix="foo_"
1480+
)
1481+
)
1482+
1483+
# New client with the runtime
1484+
client = await Client.connect(
1485+
client.service_client.config.target_host,
1486+
namespace=client.namespace,
1487+
runtime=runtime,
1488+
)
1489+
1490+
async with new_worker(
1491+
client,
1492+
CustomMetricsWorkflow,
1493+
task_queue=task_queue,
1494+
nexus_service_handlers=[CustomMetricsService()],
1495+
nexus_task_executor=concurrent.futures.ThreadPoolExecutor(),
1496+
) as worker:
1497+
# Run workflow
1498+
await client.execute_workflow(
1499+
CustomMetricsWorkflow.run,
1500+
worker.task_queue,
1501+
id=f"wf-{uuid.uuid4()}",
1502+
task_queue=worker.task_queue,
1503+
)
1504+
1505+
# Get Prom dump
1506+
with urlopen(url=f"http://{prom_addr}/metrics") as f:
1507+
prom_str: str = f.read().decode("utf-8")
1508+
prom_lines = prom_str.splitlines()
1509+
1510+
prom_matcher = PromMetricMatcher(prom_lines)
1511+
1512+
prom_matcher.assert_description_exists(
1513+
"my_operation_counter", "my-operation-description"
1514+
)
1515+
prom_matcher.assert_metric_exists("my_operation_counter", {}, 12)
1516+
prom_matcher.assert_metric_exists(
1517+
"my_operation_counter",
1518+
{
1519+
"my_operation_extra_attr": "12.34",
1520+
# Also confirm some nexus operation labels
1521+
"nexus_service": CustomMetricsService.__name__,
1522+
"nexus_operation": CustomMetricsService.custom_metric_op.__name__,
1523+
"task_queue": worker.task_queue,
1524+
},
1525+
30,
1526+
)
1527+
prom_matcher.assert_description_exists(
1528+
"my_executor_operation_counter", "my-executor-operation-description"
1529+
)
1530+
prom_matcher.assert_metric_exists("my_executor_operation_counter", {}, 12)
1531+
prom_matcher.assert_metric_exists(
1532+
"my_executor_operation_counter",
1533+
{
1534+
"my_executor_operation_extra_attr": "12.34",
1535+
# Also confirm some nexus operation labels
1536+
"nexus_service": CustomMetricsService.__name__,
1537+
"nexus_operation": CustomMetricsService.custom_metric_op_executor.__name__,
1538+
"task_queue": worker.task_queue,
1539+
},
1540+
30,
1541+
)
1542+
1543+
1544+
async def test_workflow_caller_buffered_metrics(
1545+
client: Client, env: WorkflowEnvironment
1546+
):
1547+
if env.supports_time_skipping:
1548+
pytest.skip("Nexus tests don't work with time-skipping server")
1549+
1550+
# Create runtime with metric buffer
1551+
buffer = MetricBuffer(10000)
1552+
runtime = Runtime(
1553+
telemetry=TelemetryConfig(metrics=buffer, metric_prefix="some_prefix_")
1554+
)
1555+
1556+
# Confirm no updates yet
1557+
assert not buffer.retrieve_updates()
1558+
1559+
# Create a new client on the runtime and execute the custom metric workflow
1560+
client = await Client.connect(
1561+
client.service_client.config.target_host,
1562+
namespace=client.namespace,
1563+
runtime=runtime,
1564+
)
1565+
task_queue = str(uuid.uuid4())
1566+
await create_nexus_endpoint(task_queue, client)
1567+
async with new_worker(
1568+
client,
1569+
CustomMetricsWorkflow,
1570+
task_queue=task_queue,
1571+
nexus_service_handlers=[CustomMetricsService()],
1572+
nexus_task_executor=concurrent.futures.ThreadPoolExecutor(),
1573+
) as worker:
1574+
await client.execute_workflow(
1575+
CustomMetricsWorkflow.run,
1576+
worker.task_queue,
1577+
id=f"wf-{uuid.uuid4()}",
1578+
task_queue=worker.task_queue,
1579+
)
1580+
1581+
# Drain updates and confirm updates exist as expected
1582+
updates = buffer.retrieve_updates()
1583+
# Check for Nexus metrics
1584+
assert any(
1585+
update.metric.name == "my-operation-counter"
1586+
and update.metric.kind == BUFFERED_METRIC_KIND_COUNTER
1587+
and update.metric.description == "my-operation-description"
1588+
and update.attributes["nexus_service"] == CustomMetricsService.__name__
1589+
and update.attributes["nexus_operation"]
1590+
== CustomMetricsService.custom_metric_op.__name__
1591+
and update.attributes["task_queue"] == worker.task_queue
1592+
and "my-operation-extra-attr" not in update.attributes
1593+
and update.value == 12
1594+
for update in updates
1595+
)
1596+
assert any(
1597+
update.metric.name == "my-operation-counter"
1598+
and update.attributes.get("my-operation-extra-attr") == 12.34
1599+
and update.value == 30
1600+
for update in updates
1601+
)
1602+
assert any(
1603+
update.metric.name == "my-executor-operation-counter"
1604+
and update.metric.description == "my-executor-operation-description"
1605+
and update.metric.kind == BUFFERED_METRIC_KIND_COUNTER
1606+
and update.attributes["nexus_service"] == CustomMetricsService.__name__
1607+
and update.attributes["nexus_operation"]
1608+
== CustomMetricsService.custom_metric_op_executor.__name__
1609+
and update.attributes["task_queue"] == worker.task_queue
1610+
and "my-executor-operation-extra-attr" not in update.attributes
1611+
and update.value == 12
1612+
for update in updates
1613+
)
1614+
assert any(
1615+
update.metric.name == "my-executor-operation-counter"
1616+
and update.attributes.get("my-executor-operation-extra-attr") == 12.34
1617+
and update.value == 30
1618+
for update in updates
1619+
)

0 commit comments

Comments
 (0)