Skip to content

Commit efee43b

Browse files
committed
feat: support nats streaming
1 parent 5ec9141 commit efee43b

File tree

3 files changed

+112
-13
lines changed

3 files changed

+112
-13
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ dependencies = [
1717
"httpx>=0.24.0,<1.0.0", # For HTTP client
1818
"structlog>=25.4.0,<26.0.0", # For structured logging
1919
"python-dotenv>=1.0.0,<2.0.0", # For environment variables
20+
"nats-py>=2.6.0,<3.0.0", # For NATS streaming
2021
# Note: Langflow dependencies will be added via path or package reference
2122
]
2223

src/node/api.py

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import os
99
import sys
1010
import time
11+
import uuid
1112
from typing import Any
1213

1314
from fastapi import FastAPI, HTTPException
@@ -43,6 +44,29 @@
4344

4445
app = FastAPI(title="Langflow Executor Node", version="0.1.0")
4546

47+
# Initialize NATS client (lazy connection)
48+
_nats_client = None
49+
50+
51+
async def get_nats_client():
52+
"""Get or create NATS client instance."""
53+
global _nats_client
54+
if _nats_client is None:
55+
logger.info("[NATS] Creating new NATS client instance...")
56+
from node.nats import NATSClient
57+
nats_url = os.getenv("NATS_URL", "nats://localhost:4222")
58+
logger.info(f"[NATS] Connecting to NATS at {nats_url}")
59+
_nats_client = NATSClient(nats_url=nats_url)
60+
try:
61+
await _nats_client.connect()
62+
logger.info("[NATS] ✅ Successfully connected to NATS")
63+
except Exception as e:
64+
logger.warning(f"[NATS] ❌ Failed to connect to NATS (non-critical): {e}", exc_info=True)
65+
_nats_client = None
66+
else:
67+
logger.debug("[NATS] Using existing NATS client instance")
68+
return _nats_client
69+
4670

4771
class ComponentState(BaseModel):
4872
"""Component state for execution."""
@@ -55,6 +79,7 @@ class ComponentState(BaseModel):
5579
config: dict[str, Any] | None = None
5680
display_name: str | None = None
5781
component_id: str | None = None
82+
stream_topic: str | None = None # NATS stream topic for publishing results
5883

5984

6085
class ExecutionRequest(BaseModel):
@@ -64,6 +89,7 @@ class ExecutionRequest(BaseModel):
6489
method_name: str
6590
is_async: bool = False
6691
timeout: int = 30
92+
message_id: str | None = None # Unique message ID from backend for tracking published messages
6793

6894

6995
class ExecutionResponse(BaseModel):
@@ -74,6 +100,7 @@ class ExecutionResponse(BaseModel):
74100
result_type: str
75101
execution_time: float
76102
error: str | None = None
103+
message_id: str | None = None # Unique ID for the published NATS message
77104

78105

79106
async def load_component_class(
@@ -439,12 +466,16 @@ async def execute_component(request: ExecutionRequest) -> ExecutionResponse:
439466

440467
try:
441468
# Log what we received
442-
logger.info(
469+
stream_topic_value = request.component_state.stream_topic
470+
log_msg = (
443471
f"Received execution request: "
444472
f"class={request.component_state.component_class}, "
445473
f"module={request.component_state.component_module}, "
446-
f"code_length={len(request.component_state.component_code or '') if request.component_state.component_code else 0}"
474+
f"code_length={len(request.component_state.component_code or '') if request.component_state.component_code else 0}, "
475+
f"stream_topic={stream_topic_value}"
447476
)
477+
logger.info(log_msg)
478+
print(f"[EXECUTOR] {log_msg}") # Also print to ensure visibility
448479

449480
# Load component class dynamically
450481
component_class = await load_component_class(
@@ -517,11 +548,52 @@ async def execute_component(request: ExecutionRequest) -> ExecutionResponse:
517548
f"in {execution_time:.3f}s, result type: {type(result).__name__}"
518549
)
519550

551+
# Use message_id from request (generated by backend) or generate one if not provided
552+
message_id = request.message_id or str(uuid.uuid4())
553+
554+
# Publish result to NATS stream if topic is provided
555+
if request.component_state.stream_topic:
556+
topic = request.component_state.stream_topic
557+
logger.info(f"[NATS] Attempting to publish to topic: {topic} with message_id: {message_id}")
558+
print(f"[NATS] Attempting to publish to topic: {topic} with message_id: {message_id}")
559+
try:
560+
nats_client = await get_nats_client()
561+
if nats_client:
562+
logger.info(f"[NATS] NATS client obtained, preparing publish data...")
563+
print(f"[NATS] NATS client obtained, preparing publish data...")
564+
# Publish result to NATS with message ID from backend
565+
publish_data = {
566+
"message_id": message_id, # Use message_id from backend request
567+
"component_id": request.component_state.component_id,
568+
"component_class": request.component_state.component_class,
569+
"result": serialized_result,
570+
"result_type": type(result).__name__,
571+
"execution_time": execution_time,
572+
}
573+
logger.info(f"[NATS] Publishing to topic: {topic}, message_id: {message_id}, data keys: {list(publish_data.keys())}")
574+
print(f"[NATS] Publishing to topic: {topic}, message_id: {message_id}, data keys: {list(publish_data.keys())}")
575+
# Use the topic directly (already in format: droq.local.public.userid.workflowid.component.out)
576+
await nats_client.publish(topic, publish_data)
577+
logger.info(f"[NATS] ✅ Successfully published result to NATS topic: {topic} with message_id: {message_id}")
578+
print(f"[NATS] ✅ Successfully published result to NATS topic: {topic} with message_id: {message_id}")
579+
else:
580+
logger.warning(f"[NATS] NATS client is None, cannot publish")
581+
print(f"[NATS] ⚠️ NATS client is None, cannot publish")
582+
except Exception as e:
583+
# Non-critical: log but don't fail execution
584+
logger.warning(f"[NATS] ❌ Failed to publish to NATS (non-critical): {e}", exc_info=True)
585+
print(f"[NATS] ❌ Failed to publish to NATS (non-critical): {e}")
586+
else:
587+
msg = f"[NATS] ⚠️ No stream_topic provided in request, skipping NATS publish. Component: {request.component_state.component_class}, ID: {request.component_state.component_id}"
588+
logger.info(msg)
589+
print(msg)
590+
520591
return ExecutionResponse(
521592
result=serialized_result,
522593
success=True,
523594
result_type=type(result).__name__,
524595
execution_time=execution_time,
596+
message_id=message_id, # Return message ID (from request or generated) so backend can match it
525597
)
526598

527599
except asyncio.TimeoutError:

src/node/nats.py

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,20 +54,39 @@ async def _ensure_stream(self) -> None:
5454
"""Ensure the JetStream exists, create if it doesn't."""
5555
try:
5656
# Try to get stream info
57-
await self.js.stream_info(self.stream_name)
57+
stream_info = await self.js.stream_info(self.stream_name)
5858
logger.info(f"Stream '{self.stream_name}' already exists")
59-
except Exception:
59+
logger.info(f"Stream subjects: {stream_info.config.subjects}")
60+
61+
# Check if 'droq.local.public.>' is in subjects, if not, update stream
62+
required_subject = "droq.local.public.>"
63+
if required_subject not in stream_info.config.subjects:
64+
logger.warning(f"Stream '{self.stream_name}' missing required subject '{required_subject}', updating...")
65+
subjects = list(stream_info.config.subjects) + [required_subject]
66+
await self.js.update_stream(
67+
StreamConfig(
68+
name=self.stream_name,
69+
subjects=subjects,
70+
retention=stream_info.config.retention,
71+
storage=stream_info.config.storage,
72+
)
73+
)
74+
logger.info(f"Stream '{self.stream_name}' updated with subject '{required_subject}'")
75+
except Exception as e:
6076
# Stream doesn't exist, create it
61-
logger.info(f"Creating stream '{self.stream_name}'")
77+
logger.info(f"Creating stream '{self.stream_name}' (error: {e})")
6278
await self.js.add_stream(
6379
StreamConfig(
6480
name=self.stream_name,
65-
subjects=[f"{self.stream_name}.>"],
81+
subjects=[
82+
f"{self.stream_name}.>", # Backward compatibility
83+
"droq.local.public.>", # Full topic path format
84+
],
6685
retention=RetentionPolicy.WORK_QUEUE,
6786
storage=StorageType.FILE,
6887
)
6988
)
70-
logger.info(f"Stream '{self.stream_name}' created")
89+
logger.info(f"Stream '{self.stream_name}' created with subjects: ['{self.stream_name}.>', 'droq.local.public.>']")
7190

7291
async def publish(
7392
self,
@@ -79,27 +98,34 @@ async def publish(
7998
Publish a message to a NATS subject.
8099
81100
Args:
82-
subject: NATS subject to publish to
101+
subject: NATS subject to publish to (can be full topic path or relative)
83102
data: Data to publish (will be JSON encoded)
84103
headers: Optional headers to include
85104
"""
86105
if not self.js:
87106
raise RuntimeError("Not connected to NATS. Call connect() first.")
88107

89108
try:
90-
# Full subject with stream prefix
91-
full_subject = f"{self.stream_name}.{subject}"
109+
# If subject starts with "droq.", use it as full topic path
110+
# Otherwise, prefix with stream name for backward compatibility
111+
if subject.startswith("droq."):
112+
full_subject = subject
113+
else:
114+
full_subject = f"{self.stream_name}.{subject}"
92115

93116
# Encode data as JSON
94117
payload = json.dumps(data).encode()
118+
payload_size = len(payload)
119+
120+
logger.info(f"[NATS] Publishing to subject: {full_subject}, payload size: {payload_size} bytes")
95121

96122
# Publish with headers if provided
97123
if headers:
98-
await self.js.publish(full_subject, payload, headers=headers)
124+
ack = await self.js.publish(full_subject, payload, headers=headers)
99125
else:
100-
await self.js.publish(full_subject, payload)
126+
ack = await self.js.publish(full_subject, payload)
101127

102-
logger.debug(f"Published message to {full_subject}")
128+
logger.info(f"[NATS] ✅ Published message to {full_subject} (seq: {ack.seq if hasattr(ack, 'seq') else 'N/A'})")
103129
except Exception as e:
104130
logger.error(f"Failed to publish message: {e}")
105131
raise

0 commit comments

Comments
 (0)