Skip to content

Commit f26a5c5

Browse files
nshkrdotcomnshkrdotcom
authored andcommitted
p12 for streaming done
1 parent 5d9ae0e commit f26a5c5

File tree

5 files changed

+385
-50
lines changed

5 files changed

+385
-50
lines changed

ADVANCED_FEATURES.md

Lines changed: 86 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -650,14 +650,14 @@ Existing pipelines remain fully compatible. To use advanced features:
650650

651651
## 🚀 6. Async Streaming
652652

653-
Enable real-time response streaming for all Claude-based steps, providing progressive output display and better resource management.
653+
Enable real-time message streaming for all Claude-based steps, displaying complete messages as they arrive from ClaudeCodeSDK for better user experience and resource management.
654654

655655
### Why Async Streaming?
656656

657-
- **Real-time feedback**: See Claude's responses character by character as they're generated
657+
- **Real-time feedback**: See Claude's complete messages as they arrive (message-by-message streaming)
658658
- **Memory efficiency**: Stream large outputs without loading everything into memory
659659
- **Early interruption**: Stop long-running operations if they go off track
660-
- **Better user experience**: Immediate visual feedback instead of waiting
660+
- **Better user experience**: Progressive display of assistant responses, tool uses, and results
661661

662662
### Basic Configuration
663663

@@ -677,56 +677,93 @@ Enable real-time response streaming for all Claude-based steps, providing progre
677677

678678
### Stream Handlers
679679

680-
#### Console Handler
681-
Real-time terminal output with formatting options:
680+
The implementation provides 6 specialized handlers for different streaming needs:
681+
682+
#### 1. Console Handler (`console`)
683+
Fancy formatted output with styled headers and statistics:
682684

683685
```yaml
684686
stream_handler: "console"
685-
stream_console_config:
686-
show_timestamps: true # Display message timestamps
687-
color_output: true # Colorize output by type
688-
show_progress: true # Progress indicators
689-
clear_on_update: false # Keep history visible
690-
show_tool_use: true # Show tool invocations
687+
stream_handler_opts:
688+
show_header: true # Display styled header
689+
show_stats: true # Show completion statistics
690+
show_tool_use: true # Display tool invocations
691+
show_timestamps: false # Add timestamps to messages
692+
use_colors: true # Colorized output
693+
```
694+
695+
Output example:
696+
```
697+
╭─────────────────────────────────────────╮
698+
│ Claude Streaming Response │
699+
╰─────────────────────────────────────────╯
700+
701+
Assistant message content here...
702+
703+
╭─── Stream Statistics ───╮
704+
│ Messages: 3 │
705+
│ Tokens: 0 │
706+
│ Duration: 3.5s │
707+
│ Avg/msg: 1.2s │
708+
╰─────────────────────────╯
691709
```
692710

693-
#### File Handler
694-
Stream to file with automatic rotation:
711+
#### 2. Simple Handler (`simple`)
712+
Clean line-by-line output with optional timestamps:
713+
714+
```yaml
715+
stream_handler: "simple"
716+
stream_handler_opts:
717+
show_timestamps: true # Prepend timestamps
718+
```
719+
720+
Output example:
721+
```
722+
[06:54:52] ASSISTANT: I'll perform these file operations...
723+
[06:54:53] TOOL USE: Write
724+
[06:54:53] TOOL RESULT: File created successfully...
725+
[06:54:56] ASSISTANT: ✅ Step 1 completed...
726+
727+
✓ Stream completed: 5 messages in 23356ms
728+
```
729+
730+
#### 3. Debug Handler (`debug`)
731+
Complete message debugging showing all message types:
732+
733+
```yaml
734+
stream_handler: "debug"
735+
```
736+
737+
Shows system messages, assistant responses, tool uses, tool results, and metadata.
738+
739+
#### 4. File Handler (`file`)
740+
Stream messages to file with rotation support:
695741

696742
```yaml
697743
stream_handler: "file"
698-
stream_file_path: "./outputs/stream.jsonl"
699-
stream_file_format: "jsonl" # or "json", "text"
700-
stream_file_rotation:
701-
enabled: true
702-
max_size_mb: 10
703-
max_files: 5
704-
compress_old: true
744+
stream_handler_opts:
745+
file_path: "./outputs/stream.log"
746+
append: true
747+
format: "jsonl" # json lines format
705748
```
706749

707-
#### Buffer Handler
708-
Collect in memory with statistics:
750+
#### 5. Buffer Handler (`buffer`)
751+
Collect messages in memory for later processing:
709752

710753
```yaml
711754
stream_handler: "buffer"
712-
stream_buffer_config:
713-
max_size: 1000 # Maximum messages
714-
circular: true # Overwrite old messages
715-
deduplication: true # Remove duplicate messages
716-
collect_stats: true # Track message statistics
755+
stream_handler_opts:
756+
max_size: 1000 # Maximum buffer size
717757
```
718758

719-
#### Callback Handler
720-
Custom processing with filtering:
759+
#### 6. Callback Handler (`callback`)
760+
Custom message processing with your own handler:
721761

722762
```yaml
723763
stream_handler: "callback"
724-
stream_callback_config:
725-
callback_module: "MyApp.StreamProcessor"
726-
callback_function: "handle_message"
727-
filter_types: ["text", "tool_use"] # Message types to process
728-
rate_limit: 10 # Max messages per second
729-
async_callback: true # Non-blocking callbacks
764+
stream_handler_opts:
765+
module: "MyApp.StreamProcessor"
766+
function: "handle_message"
730767
```
731768

732769
### Advanced Streaming Features
@@ -783,10 +820,10 @@ Error recovery with streaming continuity:
783820

784821
### Performance Benefits
785822

786-
1. **Time to First Token**: See output immediately (typically <1s)
787-
2. **Memory Usage**: Constant memory even for large outputs
788-
3. **Throughput**: Process responses as they arrive
789-
4. **Scalability**: Handle multiple streams concurrently
823+
1. **Time to First Message**: See output as soon as first message arrives
824+
2. **Memory Usage**: Constant memory through streaming instead of buffering
825+
3. **Progressive Display**: View assistant responses, tool uses, and results in real-time
826+
4. **Message Metrics**: Track message count, duration, and timing statistics
790827

791828
### Integration Examples
792829

@@ -873,11 +910,19 @@ test "streaming improves time to first output" do
873910
end
874911
```
875912

913+
### Implementation Notes
914+
915+
- **Message-based streaming**: ClaudeCodeSDK uses `--output-format stream-json` to stream complete messages
916+
- **Not character streaming**: Each message arrives as a complete unit, not character-by-character
917+
- **Message types**: Stream includes system init, assistant messages, tool uses, tool results, and completion status
918+
- **Escaped newlines**: The handlers properly convert `\n` to actual line breaks in message content
919+
876920
### Examples
877921

878922
See complete streaming examples:
879-
- `examples/claude_streaming_example.yaml` - Basic streaming patterns
880-
- `examples/claude_streaming_advanced.yaml` - Advanced features
923+
- `examples/clean_streaming_numbers.yaml` - Simple number output example
924+
- `examples/streaming_file_operations.yaml` - Multi-message file operations
925+
- `examples/STREAMING_GUIDE.md` - Complete streaming implementation guide
881926
- `test/integration/async_streaming_test.exs` - Comprehensive tests
882927

883928
---

CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,18 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [Unreleased]
9+
10+
### Added
11+
- **Async Streaming Support**: Real-time message streaming for all Claude-based steps
12+
- Message-by-message streaming displays complete messages as they arrive from ClaudeCodeSDK
13+
- 6 specialized stream handlers: console, simple, debug, file, buffer, and callback
14+
- Works with all Claude step types (claude, claude_smart, claude_session, etc.)
15+
- Progressive display of assistant responses, tool uses, and results
16+
- Memory-efficient processing without buffering entire responses
17+
- Full mock support for testing without API calls
18+
- Documentation: ASYNC_STREAMING_MIGRATION_GUIDE.md and examples/STREAMING_GUIDE.md
19+
820
## [0.0.1] - 2025-01-05
921

1022
**Maintainer**: NSHkr <ZeroTrust@NSHkr.com>

README.md

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -551,14 +551,14 @@ mix pipeline.run examples/claude_session_example.yaml
551551

552552
## 🚀 Async Streaming (NEW)
553553

554-
The pipeline now supports **real-time response streaming** for all Claude-based steps, enabling progressive output display and lower memory usage for large responses.
554+
The pipeline now supports **real-time message streaming** for all Claude-based steps, displaying complete messages as they arrive from ClaudeCodeSDK for better user experience.
555555

556556
### Why Use Async Streaming?
557557

558-
- **Real-time feedback**: See Claude's responses as they're generated
559-
- **Lower memory usage**: Stream large outputs without loading everything into memory
560-
- **Better UX**: Users see progress immediately instead of waiting for completion
561-
- **Early error detection**: Interrupt long-running operations if needed
558+
- **Real-time feedback**: See Claude's complete messages as they arrive (message-by-message)
559+
- **Lower memory usage**: Process messages without buffering entire responses
560+
- **Better UX**: Progressive display of assistant responses, tool uses, and results
561+
- **Visibility**: Watch Claude's tool usage and thinking process in real-time
562562

563563
### Basic Streaming Example
564564

@@ -617,16 +617,18 @@ The pipeline now supports **real-time response streaming** for all Claude-based
617617
### Streaming Examples
618618

619619
```bash
620-
# Basic streaming example
621-
mix pipeline.run examples/claude_streaming_example.yaml
620+
# Simple streaming example
621+
mix pipeline.run examples/clean_streaming_numbers.yaml
622622
623-
# Advanced streaming with callbacks and error handling
624-
mix pipeline.run examples/claude_streaming_advanced.yaml
623+
# Multi-message streaming with file operations
624+
mix pipeline.run examples/streaming_file_operations.yaml
625625
626626
# Run streaming tests
627627
mix test test/integration/async_streaming_test.exs
628628
```
629629

630+
📖 **Complete Streaming Guide**: See [examples/STREAMING_GUIDE.md](examples/STREAMING_GUIDE.md) for implementation details and [docs/ASYNC_STREAMING_MIGRATION_GUIDE.md](docs/ASYNC_STREAMING_MIGRATION_GUIDE.md) for adding streaming to existing pipelines.
631+
630632
### Works with All Claude Step Types
631633

632634
Async streaming is supported by all Claude-based steps:

docs/20250704_yaml_format_v2/10_quick_reference.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,14 @@ claude_options:
198198
append_system_prompt: "Be concise"
199199
cwd: "./workspace"
200200

201+
# Async Streaming
202+
async_streaming: true # Enable message streaming
203+
stream_handler: "console" # console|simple|debug|file|buffer|callback
204+
stream_buffer_size: 10 # Messages to buffer
205+
stream_handler_opts: # Handler-specific options
206+
show_timestamps: true
207+
show_tool_use: true
208+
201209
# Advanced
202210
session_id: "session_123"
203211
retry_config:
@@ -389,6 +397,18 @@ workflow:
389397
data: "{{data}}"
390398
```
391399

400+
### Async Streaming
401+
```yaml
402+
- name: "streaming_claude"
403+
type: "claude"
404+
claude_options:
405+
async_streaming: true
406+
stream_handler: "simple"
407+
stream_handler_opts:
408+
show_timestamps: true
409+
prompt: "Generate a report..."
410+
```
411+
392412
## Debugging
393413

394414
```bash

0 commit comments

Comments
 (0)