Skip to content

Commit db3b4f7

Browse files
Merge feat/migrate-to-msgtrace-sdk into init
Complete migration from built-in OpenTelemetry to msgtrace-sdk. Key changes: - Integrated msgtrace-sdk v1.1.0 as core telemetry dependency - Added new attributes to msgtrace-sdk for msgflux compatibility - Removed all redundant telemetry abstractions - Clean, direct integration with msgtrace-sdk - Proper async/sync instrumentation All tests passing. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
2 parents a75b097 + 7eafa3f commit db3b4f7

File tree

13 files changed

+361
-885
lines changed

13 files changed

+361
-885
lines changed

CLAUDE.md

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,16 @@ The project is being split into specialized libraries:
2525
- **Integration**: Already listed in pyproject.toml dependencies
2626
- **Note**: Not to be confused with msgtrace - this is msgspec-ext
2727

28-
#### 2. msgtrace (In Development)
29-
- **Purpose**: Isolated observability service
28+
#### 2. msgtrace-sdk (Integrated ✓)
29+
- **Purpose**: OpenTelemetry-based tracing SDK for AI applications
30+
- **Status**: Integrated as core dependency (v1.0.0)
3031
- **Components**:
31-
- Backend for trace collection
32-
- Frontend dashboard
33-
- SDK (msgtrace-sdk) with similar interface to msgflux's current implementation
34-
- **Status**: Currently being developed separately
35-
- **Migration Plan**: Will replace built-in OpenTelemetry observability in msgflux
32+
- Spans API for creating traces (flows, modules, operations)
33+
- MsgTraceAttributes for GenAI semantic conventions
34+
- Zero-overhead when disabled
35+
- Thread-safe with async support
36+
- **Integration**: msgflux now uses msgtrace-sdk for all telemetry
37+
- **Note**: msgflux keeps specialized decorators for tools and agents on top of msgtrace-sdk base
3638

3739
#### 3. txml (Rust Implementation)
3840
- **Purpose**: XML parsing
@@ -45,13 +47,16 @@ The project is being split into specialized libraries:
4547
- **Language**: Rust
4648
- **Status**: New Rust-based implementation
4749

48-
### Current Built-in Features (To Be Migrated)
50+
### Telemetry Architecture (Updated ✓)
4951

50-
The msgflux codebase still includes:
51-
- OpenTelemetry-based observability (src/msgflux/telemetry/)
52-
- Will be replaced by msgtrace integration
53-
- Python XML parsers (src/msgflux/dsl/typed_parsers/)
54-
- Will be replaced by txml references
52+
msgflux now uses msgtrace-sdk for telemetry:
53+
- **Core**: msgtrace-sdk provides Spans API and MsgTraceAttributes
54+
- **Extensions**: msgflux adds specialized decorators for tools and agents
55+
- **Configuration**: Environment variables mapped from MSGFLUX_* to MSGTRACE_*
56+
- **Features Preserved**:
57+
- Tool execution tracking (local/remote, protocols like MCP)
58+
- Agent telemetry (name, ID, responses)
59+
- Detailed argument and response capture
5560

5661
## Project Structure
5762

pyproject.toml

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,17 @@ readme = "README.md"
1111
authors = [
1212
{ name = "Vilson Rodrigues", email = "vilson@msgflux.com" }
1313
]
14-
requires-python = ">=3.9"
14+
requires-python = ">=3.10"
1515
classifiers = [
1616
"Development Status :: 4 - Beta",
1717
"Intended Audience :: Developers",
18-
"Intended Audience :: Information Technology",
18+
"Intended Audience :: Information Technology",
1919
"Intended Audience :: Science/Research",
2020
"License :: OSI Approved :: MIT License",
2121
"Operating System :: MacOS :: MacOS X",
2222
"Operating System :: POSIX :: Linux",
2323
"Operating System :: Microsoft :: Windows",
2424
"Programming Language :: Python",
25-
"Programming Language :: Python :: 3.9",
2625
"Programming Language :: Python :: 3.10",
2726
"Programming Language :: Python :: 3.11",
2827
"Programming Language :: Python :: 3.12",
@@ -33,9 +32,7 @@ classifiers = [
3332
dependencies = [
3433
"jinja2>=3.1.6",
3534
"msgspec-ext>=0.1.0",
36-
"opentelemetry-api>=1.35.0",
37-
"opentelemetry-exporter-otlp>=1.35.0",
38-
"opentelemetry-sdk>=1.35.0",
35+
"msgtrace-sdk>=1.1.0",
3936
"tenacity>=8.2.3",
4037
"typing-extensions>=4.14.1",
4138
"uvloop>=0.21.0 ; sys_platform != 'win32'",

src/msgflux/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from .message import Message
1111
from .models.gateway import ModelGateway
1212
from .models import Model
13-
from .telemetry.span import instrument
13+
from .telemetry import Spans
1414
from .utils.chat import ChatBlock, ChatML
1515
from .utils.console import cprint
1616
from .utils.inspect import get_fn_name
@@ -39,7 +39,7 @@
3939
"dotdict",
4040
"get_fn_name",
4141
"inline",
42-
"instrument",
42+
"Spans",
4343
"load",
4444
"msgspec_dumps",
4545
"response_cache",

src/msgflux/envs.py

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def set_envs(**kwargs: Any):
2525

2626
class EnvironmentVariables(BaseSettings):
2727
model_config = SettingsConfigDict(
28-
env_file=".msgflux_env",
28+
env_file=".env",
2929
env_prefix="msgflux_",
3030
)
3131

@@ -58,27 +58,16 @@ class EnvironmentVariables(BaseSettings):
5858
# If set to True, msgflux will trace function calls. Useful for debugging
5959
trace_function: bool = False
6060

61-
# if set, msgflux will track executions in nn modules using OTel
62-
telemetry_requires_trace: bool = False
63-
64-
# OTLP endpoint
65-
telemetry_otlp_endpoint: str = "http://localhost:8000/api/v1/traces/export"
66-
67-
# Span exporter type
68-
telemetry_span_exporter_type: Literal["console", "otlp"] = "console"
69-
70-
# Capture state dict
71-
telemetry_capture_state_dict: bool = False
72-
73-
# Capture platform details
74-
telemetry_capture_platform: bool = False
75-
61+
# Telemetry configuration (msgflux-specific)
7662
# Capture tool call responses
7763
telemetry_capture_tool_call_responses: bool = True
7864

7965
# Capture agent state, system prompt and tool schemas
8066
telemetry_capture_agent_prepare_model_execution: bool = False
8167

68+
# Capture state dict in module execution
69+
telemetry_capture_state_dict: bool = False
70+
8271
# State checkpoint, if True, if a module output is in message, skip process
8372
# if False, reprocess
8473
state_checkpoint: bool = False
@@ -106,3 +95,30 @@ class EnvironmentVariables(BaseSettings):
10695

10796

10897
envs = EnvironmentVariables()
98+
99+
100+
def configure_msgtrace_env(
101+
enabled: bool = True,
102+
otlp_endpoint: str = "http://localhost:8000/api/v1/traces/export",
103+
exporter: Literal["console", "otlp"] = "otlp",
104+
service_name: str = "msgflux",
105+
capture_platform: bool = True,
106+
):
107+
"""Configure msgtrace-sdk environment variables.
108+
109+
This is a convenience function for users to quickly configure
110+
msgtrace-sdk. Users can also configure msgtrace-sdk directly
111+
using MSGTRACE_* environment variables.
112+
113+
Args:
114+
enabled: Enable telemetry tracking
115+
otlp_endpoint: OTLP endpoint URL
116+
exporter: Exporter type ("otlp" or "console")
117+
service_name: Service name for telemetry
118+
capture_platform: Capture platform details (CPU, OS, Python version)
119+
"""
120+
os.environ["MSGTRACE_TELEMETRY_ENABLED"] = "true" if enabled else "false"
121+
os.environ["MSGTRACE_OTLP_ENDPOINT"] = otlp_endpoint
122+
os.environ["MSGTRACE_EXPORTER"] = exporter
123+
os.environ["MSGTRACE_SERVICE_NAME"] = service_name
124+
os.environ["MSGTRACE_CAPTURE_PLATFORM"] = "true" if capture_platform else "false"

src/msgflux/nn/functional.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from msgflux.dotdict import dotdict
99
from msgflux.logger import logger
1010
from msgflux.nn.modules.module import get_callable_name
11-
from msgflux.telemetry.span import instrument
11+
from msgflux.telemetry import Spans
1212

1313
__all__ = [
1414
"abackground_task",
@@ -27,7 +27,7 @@
2727
]
2828

2929

30-
@instrument("map_gather")
30+
@Spans.instrument()
3131
def map_gather(
3232
to_send: Callable,
3333
*,
@@ -107,7 +107,7 @@ def multiply(x, y=2): return x * y
107107
return tuple(responses)
108108

109109

110-
@instrument("scatter_gather")
110+
@Spans.instrument()
111111
def scatter_gather(
112112
to_send: List[Callable],
113113
args_list: Optional[List[Tuple[Any, ...]]] = None,
@@ -198,7 +198,7 @@ def farewell(person_name): return f"Goodbye, {person_name}"
198198
return tuple(responses)
199199

200200

201-
@instrument("msg_scatter_gather")
201+
@Spans.instrument()
202202
def msg_scatter_gather(
203203
to_send: List[Callable],
204204
messages: List[dotdict],
@@ -250,7 +250,7 @@ def msg_scatter_gather(
250250
return tuple(messages)
251251

252252

253-
@instrument("bcast_gather")
253+
@Spans.instrument()
254254
def bcast_gather(
255255
to_send: List[Callable], *args, timeout: Optional[float] = None, **kwargs
256256
) -> Tuple[Any, ...]:
@@ -307,7 +307,7 @@ def fail(x): raise ValueError("Intentional error")
307307
return tuple(responses)
308308

309309

310-
@instrument("msg_bcast_gather")
310+
@Spans.instrument()
311311
def msg_bcast_gather(
312312
to_send: List[Callable],
313313
message: dotdict,
@@ -350,7 +350,7 @@ def msg_bcast_gather(
350350
return message
351351

352352

353-
@instrument("wait_for")
353+
@Spans.instrument()
354354
def wait_for(
355355
to_send: Callable, *args, timeout: Optional[float] = None, **kwargs
356356
) -> Any:
@@ -394,7 +394,7 @@ async def f1(x):
394394
return None
395395

396396

397-
@instrument("wait_for_event")
397+
@Spans.instrument()
398398
def wait_for_event(event: asyncio.Event) -> None:
399399
"""Waits synchronously for an asyncio.Event to be set.
400400
@@ -417,7 +417,7 @@ def wait_for_event(event: asyncio.Event) -> None:
417417
logger.error(str(e))
418418

419419

420-
@instrument("background_task")
420+
@Spans.instrument()
421421
def background_task(to_send: Callable, *args, **kwargs) -> None:
422422
"""Executes a task in the background asynchronously without blocking,
423423
using the AsyncExecutorPool. This function is "fire-and-forget".
@@ -468,7 +468,7 @@ def log_future(future: Future) -> None:
468468
future.add_done_callback(log_future)
469469

470470

471-
@instrument("abackground_task")
471+
@Spans.ainstrument()
472472
async def abackground_task(to_send: Callable, *args, **kwargs) -> None:
473473
"""Executes an async task in the background without blocking.
474474
This is a truly async "fire-and-forget" function.
@@ -518,7 +518,7 @@ async def run_task():
518518
asyncio.create_task(run_task())
519519

520520

521-
@instrument("await_for_event")
521+
@Spans.ainstrument()
522522
async def await_for_event(event: asyncio.Event) -> None:
523523
"""Waits asynchronously for an asyncio.Event to be set.
524524
@@ -549,7 +549,7 @@ async def setter():
549549
await event.wait()
550550

551551

552-
@instrument("amap_gather")
552+
@Spans.ainstrument()
553553
async def amap_gather(
554554
to_send: Callable,
555555
*,
@@ -613,7 +613,7 @@ async def amap_gather(
613613
return tuple(results)
614614

615615

616-
@instrument("ascatter_gather")
616+
@Spans.ainstrument()
617617
async def ascatter_gather(
618618
to_send: List[Callable],
619619
args_list: Optional[List[Tuple[Any, ...]]] = None,
@@ -676,7 +676,7 @@ async def ascatter_gather(
676676
return tuple(results)
677677

678678

679-
@instrument("amsg_bcast_gather")
679+
@Spans.ainstrument()
680680
async def amsg_bcast_gather(
681681
to_send: List[Callable],
682682
message: dotdict,

src/msgflux/nn/modules/module.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@
3737
from msgflux.models.model import Model
3838
from msgflux.models.response import ModelResponse, ModelStreamResponse
3939
from msgflux.nn.parameter import Parameter
40-
from msgflux.telemetry.span import spans
40+
from msgflux.telemetry import Spans
41+
from msgtrace.sdk import MsgTraceAttributes
4142
from msgflux.utils.convert import convert_camel_snake_to_title
4243
from msgflux.utils.encode import encode_data_to_base64
4344
from msgflux.utils.hooks import RemovableHandle
@@ -401,7 +402,6 @@ def __init__(self, *args, **kwargs) -> None:
401402
super().__setattr__("_load_state_dict_pre_hooks", OrderedDict())
402403
super().__setattr__("_load_state_dict_post_hooks", OrderedDict())
403404
super().__setattr__("_modules", {})
404-
super().__setattr__("_spans", spans)
405405

406406
if self.call_super_init:
407407
super().__init__(*args, **kwargs)
@@ -1382,8 +1382,10 @@ def _execute_with_span(self, module_name_title: str, module_type: str, *args, **
13821382
Returns:
13831383
Module output from forward method
13841384
"""
1385-
with self._spans.init_module(module_name_title, module_type) as span:
1385+
with Spans.init_module(module_name_title, module_type) as span:
13861386
try:
1387+
MsgTraceAttributes.set_module_name(module_name_title)
1388+
MsgTraceAttributes.set_module_type(module_type)
13871389
result = self.forward(*args, **kwargs)
13881390
span.set_status(Status(StatusCode.OK))
13891391
return result
@@ -1406,10 +1408,12 @@ def _call(self, *args, **kwargs):
14061408
current_span = trace.get_current_span()
14071409
# If there is no active span or it is not recording, this is the root module
14081410
if current_span is None or not current_span.is_recording():
1409-
with self._spans.init_flow(
1411+
with Spans.init_flow(
14101412
module_name_title, module_type, encoded_state_dict
14111413
) as span:
14121414
try:
1415+
MsgTraceAttributes.set_module_name(module_name_title)
1416+
MsgTraceAttributes.set_module_type(module_type)
14131417
module_output = self.forward(*args, **kwargs)
14141418
span.set_status(Status(StatusCode.OK))
14151419
return module_output
@@ -1458,8 +1462,10 @@ async def _aexecute_with_span(self, module_name_title: str, module_type: str, *a
14581462
Returns:
14591463
Module output from aforward method
14601464
"""
1461-
async with self._spans.ainit_module(module_name_title, module_type) as span:
1465+
async with Spans.ainit_module(module_name_title, module_type) as span:
14621466
try:
1467+
MsgTraceAttributes.set_module_name(module_name_title)
1468+
MsgTraceAttributes.set_module_type(module_type)
14631469
result = await self.aforward(*args, **kwargs)
14641470
span.set_status(Status(StatusCode.OK))
14651471
return result
@@ -1482,10 +1488,12 @@ async def _acall(self, *args, **kwargs):
14821488
current_span = trace.get_current_span()
14831489
# If there is no active span or it is not recording, this is the root module
14841490
if current_span is None or not current_span.is_recording():
1485-
async with self._spans.ainit_flow(
1491+
async with Spans.ainit_flow(
14861492
module_name_title, module_type, encoded_state_dict
14871493
) as span:
14881494
try:
1495+
MsgTraceAttributes.set_module_name(module_name_title)
1496+
MsgTraceAttributes.set_module_type(module_type)
14891497
module_output = await self.aforward(*args, **kwargs)
14901498
span.set_status(Status(StatusCode.OK))
14911499
return module_output

0 commit comments

Comments
 (0)