Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions .cursorrules
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Cursor Rules for Cadence Python Client

## Package Management
- **Always use `uv` for Python package management**
- Use `uv run` for running Python commands instead of `python` directly
- Use `uv sync` for installing dependencies instead of `pip install`
- Use `uv tool run` for running development tools (pytest, mypy, ruff, etc.)
- Only use `pip` or `python` directly when specifically required by the tool or documentation

## Examples
```bash
# ✅ Correct
uv run python scripts/generate_proto.py
uv run python -m pytest tests/
uv tool run mypy cadence/
uv tool run ruff check

# ❌ Avoid
python scripts/generate_proto.py
pip install -e ".[dev]"
```

## Virtual Environment
- The project uses `uv` for virtual environment management
- Always activate the virtual environment using `uv` commands
- Dependencies are managed through `pyproject.toml` and `uv.lock`

## Testing
- Run tests with `uv run python -m pytest`
- Use `uv run` for any Python script execution
- Development tools should be run with `uv tool run`

## Code Generation
- Use `uv run python scripts/generate_proto.py` for protobuf generation
- Use `uv run python scripts/dev.py` for development tasks

## Code Quality
- **ALWAYS run linter and type checker after making code changes**
- Run linter with auto-fix: `uv tool run ruff check --fix`
- Run type checking: `uv tool run mypy cadence/`
- Use `uv tool run ruff check --fix && uv tool run mypy cadence/` to run both together
- **Standard workflow**: Make changes → Run linter → Run type checker → Commit

## Development Workflow
1. Make code changes
2. Run `uv tool run ruff check --fix` (fixes formatting and linting issues)
3. Run `uv tool run mypy cadence/` (checks type safety)
4. Run `uv run python -m pytest` (run tests)
5. Commit changes
12 changes: 12 additions & 0 deletions cadence/_internal/visibility/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""Visibility and metrics collection components for Cadence client."""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MetricsEmitter and the prometheus stuff should be in a public package. Users need to specify it in the ClientOptions so it should be visibile. Maybe cadence/metrics

Additionally we should go with metrics as the package name. I don't think visibility accurately describes it.


from .metrics import MetricsEmitter, NoOpMetricsEmitter
from .prometheus import PrometheusMetrics, PrometheusConfig, CadenceMetrics

__all__ = [
"MetricsEmitter",
"NoOpMetricsEmitter",
"PrometheusMetrics",
"PrometheusConfig",
"CadenceMetrics",
]
60 changes: 60 additions & 0 deletions cadence/_internal/visibility/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""Core metrics collection interface and registry for Cadence client."""

import logging
from enum import Enum
from typing import Dict, Optional, Protocol

logger = logging.getLogger(__name__)


class MetricType(Enum):
"""Types of metrics that can be collected."""

COUNTER = "counter"
GAUGE = "gauge"
HISTOGRAM = "histogram"


class MetricsEmitter(Protocol):
"""Protocol for metrics collection backends."""

def counter(
self, key: str, n: int = 1, tags: Optional[Dict[str, str]] = None
) -> None:
"""Send a counter metric."""
...

def gauge(
self, key: str, value: float, tags: Optional[Dict[str, str]] = None
) -> None:
"""Send a gauge metric."""
...


def histogram(
self, key: str, value: float, tags: Optional[Dict[str, str]] = None
) -> None:
"""Send a histogram metric."""
...


class NoOpMetricsEmitter:
"""No-op metrics emitter that discards all metrics."""

def counter(
self, key: str, n: int = 1, tags: Optional[Dict[str, str]] = None
) -> None:
pass

def gauge(
self, key: str, value: float, tags: Optional[Dict[str, str]] = None
) -> None:
pass


def histogram(
self, key: str, value: float, tags: Optional[Dict[str, str]] = None
) -> None:
pass


198 changes: 198 additions & 0 deletions cadence/_internal/visibility/prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
"""Prometheus metrics integration for Cadence client."""

import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Optional

from prometheus_client import ( # type: ignore[import-not-found]
REGISTRY,
CollectorRegistry,
Counter,
Gauge,
Histogram,
generate_latest,
)

from cadence._internal.visibility.metrics import MetricsEmitter


logger = logging.getLogger(__name__)


@dataclass
class PrometheusConfig:
"""Configuration for Prometheus metrics."""

# Metric name prefix
metric_prefix: str = "cadence_"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we actually want to expose this as an option unless our other clients do. It makes standard dashboards impossible.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will make this into a hardcoded constant to match other clients


# Default labels to apply to all metrics
default_labels: Dict[str, str] = field(default_factory=dict)

# Custom registry (if None, uses default global registry)
registry: Optional[CollectorRegistry] = None


class PrometheusMetrics(MetricsEmitter):
"""Prometheus metrics collector implementation."""

def __init__(self, config: Optional[PrometheusConfig] = None):
self.config = config or PrometheusConfig()
self.registry = self.config.registry or REGISTRY

# Track created metrics to avoid duplicates
self._counters: Dict[str, Counter] = {}
self._gauges: Dict[str, Gauge] = {}
self._histograms: Dict[str, Histogram] = {}

def _get_metric_name(self, name: str) -> str:
"""Get the full metric name with prefix."""
return f"{self.config.metric_prefix}{name}"

def _merge_labels(self, labels: Optional[Dict[str, str]]) -> Dict[str, str]:
"""Merge provided labels with default labels."""
merged = self.config.default_labels.copy()
if labels:
merged.update(labels)
return merged

def _get_or_create_counter(
self, name: str, labels: Optional[Dict[str, str]]
) -> Counter:
"""Get or create a Counter metric."""
metric_name = self._get_metric_name(name)

if metric_name not in self._counters:
label_names = list(self._merge_labels(labels).keys()) if labels else []
self._counters[metric_name] = Counter(
metric_name,
f"Counter metric for {name}",
labelnames=label_names,
registry=self.registry,
)
logger.debug(f"Created counter metric: {metric_name}")

return self._counters[metric_name]

def _get_or_create_gauge(
self, name: str, labels: Optional[Dict[str, str]]
) -> Gauge:
"""Get or create a Gauge metric."""
metric_name = self._get_metric_name(name)

if metric_name not in self._gauges:
label_names = list(self._merge_labels(labels).keys()) if labels else []
self._gauges[metric_name] = Gauge(
metric_name,
f"Gauge metric for {name}",
labelnames=label_names,
registry=self.registry,
)
logger.debug(f"Created gauge metric: {metric_name}")

return self._gauges[metric_name]

def _get_or_create_histogram(
self, name: str, labels: Optional[Dict[str, str]]
) -> Histogram:
"""Get or create a Histogram metric."""
metric_name = self._get_metric_name(name)

if metric_name not in self._histograms:
label_names = list(self._merge_labels(labels).keys()) if labels else []
self._histograms[metric_name] = Histogram(
metric_name,
f"Histogram metric for {name}",
labelnames=label_names,
registry=self.registry,
)
logger.debug(f"Created histogram metric: {metric_name}")

return self._histograms[metric_name]


def counter(
self, key: str, n: int = 1, tags: Optional[Dict[str, str]] = None
) -> None:
"""Send a counter metric."""
try:
counter = self._get_or_create_counter(key, tags)
merged_tags = self._merge_labels(tags)

if merged_tags:
counter.labels(**merged_tags).inc(n)
else:
counter.inc(n)

except Exception as e:
logger.error(f"Failed to send counter {key}: {e}")

def gauge(
self, key: str, value: float, tags: Optional[Dict[str, str]] = None
) -> None:
"""Send a gauge metric."""
try:
gauge = self._get_or_create_gauge(key, tags)
merged_tags = self._merge_labels(tags)

if merged_tags:
gauge.labels(**merged_tags).set(value)
else:
gauge.set(value)

except Exception as e:
logger.error(f"Failed to send gauge {key}: {e}")


def histogram(
self, key: str, value: float, tags: Optional[Dict[str, str]] = None
) -> None:
"""Send a histogram metric."""
try:
histogram = self._get_or_create_histogram(key, tags)
merged_tags = self._merge_labels(tags)

if merged_tags:
histogram.labels(**merged_tags).observe(value)
else:
histogram.observe(value)

except Exception as e:
logger.error(f"Failed to send histogram {key}: {e}")

def get_metrics_text(self) -> str:
"""Get metrics in Prometheus text format."""
try:
metrics_bytes = generate_latest(self.registry)
return metrics_bytes.decode("utf-8") # type: ignore[no-any-return]
except Exception as e:
logger.error(f"Failed to generate metrics text: {e}")
return ""


# Default Cadence metrics names
class CadenceMetrics(Enum):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move out of prometheus.py

"""Standard Cadence client metrics."""

# Workflow metrics
WORKFLOW_STARTED_TOTAL = "workflow_started_total"
WORKFLOW_COMPLETED_TOTAL = "workflow_completed_total"
WORKFLOW_FAILED_TOTAL = "workflow_failed_total"
WORKFLOW_DURATION_SECONDS = "workflow_duration_seconds"

# Activity metrics
ACTIVITY_STARTED_TOTAL = "activity_started_total"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these metrics actual ones that exist? Do they match the Go/Java client?

ACTIVITY_COMPLETED_TOTAL = "activity_completed_total"
ACTIVITY_FAILED_TOTAL = "activity_failed_total"
ACTIVITY_DURATION_SECONDS = "activity_duration_seconds"

# Worker metrics
WORKER_TASK_POLLS_TOTAL = "worker_task_polls_total"
WORKER_TASK_POLL_ERRORS_TOTAL = "worker_task_poll_errors_total"
WORKER_ACTIVE_TASKS = "worker_active_tasks"

# Client metrics
CLIENT_REQUESTS_TOTAL = "client_requests_total"
CLIENT_REQUEST_DURATION_SECONDS = "client_request_duration_seconds"
CLIENT_REQUEST_ERRORS_TOTAL = "client_request_errors_total"
7 changes: 7 additions & 0 deletions cadence/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from grpc.aio import Channel, ClientInterceptor, secure_channel, insecure_channel
from cadence.api.v1.service_workflow_pb2_grpc import WorkflowAPIStub
from cadence.data_converter import DataConverter, DefaultDataConverter
from cadence._internal.visibility.metrics import MetricsEmitter, NoOpMetricsEmitter


class ClientOptions(TypedDict, total=False):
Expand All @@ -24,6 +25,7 @@ class ClientOptions(TypedDict, total=False):
channel_arguments: dict[str, Any]
credentials: ChannelCredentials | None
compression: Compression
metrics_emitter: MetricsEmitter
interceptors: list[ClientInterceptor]

_DEFAULT_OPTIONS: ClientOptions = {
Expand All @@ -34,6 +36,7 @@ class ClientOptions(TypedDict, total=False):
"channel_arguments": {},
"credentials": None,
"compression": Compression.NoCompression,
"metrics_emitter": NoOpMetricsEmitter(),
"interceptors": [],
}

Expand Down Expand Up @@ -69,6 +72,10 @@ def worker_stub(self) -> WorkerAPIStub:
def workflow_stub(self) -> WorkflowAPIStub:
return self._workflow_stub

@property
def metrics_emitter(self) -> MetricsEmitter:
return self._options["metrics_emitter"]

async def ready(self) -> None:
await self._channel.channel_ready()

Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies = [
"msgspec>=0.19.0",
"protobuf==5.29.1",
"typing-extensions>=4.0.0",
"prometheus-client>=0.21.0",
]

[project.optional-dependencies]
Expand Down
Loading