This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
Monibuca is a high-performance streaming server framework written in Go. It's designed to be a modular, scalable platform for real-time audio/video streaming with support for multiple protocols including RTMP, RTSP, HLS, WebRTC, GB28181, and more.
Basic Run (with SQLite):

```sh
cd example/default
go run -tags sqlite main.go
```

Build Tags:
- `sqlite` - Enable SQLite database support
- `sqliteCGO` - Enable SQLite with CGO
- `mysql` - Enable MySQL database support
- `postgres` - Enable PostgreSQL database support
- `duckdb` - Enable DuckDB database support
- `disable_rm` - Disable memory pool
- `fasthttp` - Use fasthttp instead of net/http
- `taskpanic` - Enable panics for testing
Protocol Buffer Generation:

```sh
# Generate all proto files
sh scripts/protoc.sh
# Generate proto files for a specific plugin
sh scripts/protoc.sh plugin_name
```

Release Building:

```sh
# Uses the goreleaser configuration
goreleaser build
```

Testing:

```sh
go test ./...
```

Server (server.go): Main server instance that manages plugins, streams, and configurations. Implements the central event loop and lifecycle management.
Plugin System (plugin.go): Modular architecture where functionality is provided through plugins. Each plugin implements the IPlugin interface and can provide:
- Protocol handlers (RTMP, RTSP, etc.)
- Media transformers
- Pull/Push proxies
- Recording capabilities
- Custom HTTP endpoints
Configuration System (pkg/config/): Hierarchical configuration system with priority order: dynamic modifications > environment variables > config files > default YAML > global config > defaults.
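The priority order can be illustrated with a small merge sketch. Everything here (the `resolve` helper, the layer maps) is hypothetical; Monibuca's actual resolution logic lives in pkg/config/:

```go
package main

import "fmt"

// resolve merges configuration layers passed from highest to lowest priority,
// mirroring the order above: dynamic > environment > config files > defaults.
func resolve(layers ...map[string]string) map[string]string {
	merged := map[string]string{}
	// Apply from lowest to highest priority so later layers overwrite earlier ones.
	for i := len(layers) - 1; i >= 0; i-- {
		for k, v := range layers[i] {
			merged[k] = v
		}
	}
	return merged
}

func main() {
	defaults := map[string]string{"port": "8080", "loglevel": "info"}
	configFile := map[string]string{"loglevel": "debug"}
	env := map[string]string{"port": "9090"}
	dynamic := map[string]string{}

	cfg := resolve(dynamic, env, configFile, defaults)
	fmt.Println(cfg["port"], cfg["loglevel"]) // 9090 debug
}
```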
Task System (pkg/task/): Advanced asynchronous task management system with multiple layers:
- Task: Basic unit of work with lifecycle management (Start/Run/Dispose)
- Job: Container that manages multiple child tasks and provides event loops
- Work: Special type of Job that acts as a persistent queue manager (keepalive=true)
- Channel: Event-driven task for handling continuous data streams
```
Work (Queue Manager)
└── Job (Container with Event Loop)
    └── Task (Basic Work Unit)
        ├── Start()   - Initialization phase
        ├── Run()     - Main execution phase
        └── Dispose() - Cleanup phase
```
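The three phases can be sketched with a plain interface and a driver loop. This is a simplified illustration, not the real pkg/task implementation (which also handles retries, event loops, and child tasks):

```go
package main

import "fmt"

// Lifecycle mirrors the three phases named above.
type Lifecycle interface {
	Start() error // acquire resources, validate inputs
	Run() error   // main execution
	Dispose()     // cleanup, always invoked after Run
}

type printTask struct{ name string }

func (t *printTask) Start() error { fmt.Println(t.name, "start"); return nil }
func (t *printTask) Run() error   { fmt.Println(t.name, "run"); return nil }
func (t *printTask) Dispose()     { fmt.Println(t.name, "dispose") }

// execute drives a task through its phases; Dispose runs even if Run fails.
func execute(t Lifecycle) error {
	if err := t.Start(); err != nil {
		return err
	}
	defer t.Dispose()
	return t.Run()
}

func main() {
	_ = execute(&printTask{name: "demo"})
}
```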
The Task system supports sophisticated queue-based processing patterns:
- Work as Queue Manager: Work instances stay alive indefinitely and manage queues of tasks
- Task Queuing: Use `workInstance.AddTask(task, logger)` to queue tasks
- Automatic Lifecycle: Tasks are automatically started, executed, and disposed
- Error Handling: Built-in retry mechanisms and error propagation
Example Pattern (from the S3 plugin):

```go
type UploadQueueTask struct {
	task.Work // Persistent queue manager
}

type FileUploadTask struct {
	task.Task // Individual work item
	// ... task-specific fields
}

// Initialize the queue manager (typically in init())
var uploadQueueTask UploadQueueTask
m7s.Servers.AddTask(&uploadQueueTask)

// Queue individual tasks
uploadQueueTask.AddTask(&FileUploadTask{...}, logger)
```

Tasks can coordinate across different plugins through:
- Global Instance Pattern: Plugins expose global instances for cross-plugin access
- Event-based Triggers: One plugin triggers tasks in another plugin
- Shared Queue Managers: Multiple plugins can use the same Work instance
Example (MP4 → S3 Integration):

```go
// In the MP4 plugin: trigger an S3 upload after recording completes
s3plugin.TriggerUpload(filePath, deleteAfter)

// The S3 plugin receives the trigger and queues an upload task
func TriggerUpload(filePath string, deleteAfter bool) {
	if s3PluginInstance != nil {
		s3PluginInstance.QueueUpload(filePath, objectKey, deleteAfter)
	}
}
```

Publisher: Handles incoming media streams and manages track information
Subscriber: Handles outgoing media streams to clients
Puller: Pulls streams from external sources
Pusher: Pushes streams to external destinations
Transformer: Processes/transcodes media streams
Recorder: Records streams to storage
- Publisher receives media data and creates tracks
- Tracks handle audio/video data with specific codecs
- Subscribers attach to publishers to receive media
- Transformers can process streams between publishers and subscribers
- Plugins provide protocol-specific implementations
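The publisher-to-subscriber fan-out can be sketched with channels. This is a conceptual illustration only; the real `Publisher`/`Subscriber` types in Monibuca additionally manage tracks, codecs, and backpressure:

```go
package main

import "fmt"

// Publisher fans incoming frames out to every attached subscriber.
type Publisher struct {
	subscribers []chan []byte
}

// Subscribe attaches a new subscriber and returns its receive channel.
func (p *Publisher) Subscribe() <-chan []byte {
	ch := make(chan []byte, 8)
	p.subscribers = append(p.subscribers, ch)
	return ch
}

// Publish delivers one frame to all subscribers, dropping the frame for
// a subscriber whose buffer is full rather than blocking the publisher.
func (p *Publisher) Publish(frame []byte) {
	for _, ch := range p.subscribers {
		select {
		case ch <- frame:
		default: // subscriber too slow: drop instead of blocking
		}
	}
}

func main() {
	var pub Publisher
	sub := pub.Subscribe()
	pub.Publish([]byte("keyframe"))
	fmt.Println(string(<-sub)) // keyframe
}
```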
Monibuca implements a sophisticated post-recording processing pipeline:
- Recording Completion: MP4 recorder finishes writing stream data
- Trailer Writing: Asynchronous task moves MOOV box to file beginning for web compatibility
- File Optimization: Temporary file operations ensure atomic updates
- External Storage Integration: Automatic upload to S3-compatible services
- Cleanup: Optional local file deletion after successful upload
This workflow uses queue-based task processing to avoid blocking the main recording pipeline.
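The non-blocking handoff can be approximated with a buffered channel and a worker goroutine. The `uploadQueue` type and its methods are illustrative stand-ins, not Monibuca's actual API:

```go
package main

import "fmt"

// uploadQueue decouples recording from upload: the recorder enqueues and
// returns immediately while a worker goroutine drains the queue.
type uploadQueue struct {
	jobs     chan string
	done     chan struct{}
	uploaded []string
}

func newUploadQueue() *uploadQueue {
	q := &uploadQueue{jobs: make(chan string, 16), done: make(chan struct{})}
	go func() {
		defer close(q.done)
		for path := range q.jobs {
			q.uploaded = append(q.uploaded, path) // stand-in for the real S3 upload
		}
	}()
	return q
}

// Enqueue hands a file off without blocking the recording pipeline.
func (q *uploadQueue) Enqueue(path string) { q.jobs <- path }

// Close drains remaining jobs and waits for the worker to finish.
func (q *uploadQueue) Close() { close(q.jobs); <-q.done }

func main() {
	q := newUploadQueue()
	q.Enqueue("/tmp/record-001.mp4")
	q.Close()
	fmt.Println("uploaded", len(q.uploaded), "file(s)")
}
```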
- Implement the `IPlugin` interface
- Define plugin metadata using `PluginMeta`
- Register with `InstallPlugin[YourPluginType](meta)`
- Optionally implement protocol-specific interfaces:
  - `ITCPPlugin` for TCP servers
  - `IUDPPlugin` for UDP servers
  - `IQUICPlugin` for QUIC servers
  - `IRegisterHandler` for HTTP endpoints
- Init: Configuration parsing and initialization
- Start: Network listeners and task registration
- Run: Active operation
- Dispose: Cleanup and shutdown
```go
// Expose a global instance for cross-plugin access
var s3PluginInstance *S3Plugin

func (p *S3Plugin) Start() error {
	s3PluginInstance = p // Set the global instance
	// ... rest of start logic
}

// Provide public API functions
func TriggerUpload(filePath string, deleteAfter bool) {
	if s3PluginInstance != nil {
		s3PluginInstance.QueueUpload(filePath, objectKey, deleteAfter)
	}
}
```

```go
// In one plugin: trigger an event after completion
if t.filePath != "" {
	t.Info("MP4 file processing completed, triggering S3 upload")
	s3plugin.TriggerUpload(t.filePath, false)
}
```

Multiple plugins can share Work instances for coordinated processing.
```go
type MyTask struct {
	task.Task
	// ... custom fields
}

func (t *MyTask) Start() error {
	// Initialize resources, validate inputs
	return nil
}

func (t *MyTask) Run() error {
	// Main work execution
	// Return task.ErrTaskComplete for successful completion
	return nil
}
```

```go
type MyQueueManager struct {
	task.Work
}

var myQueue MyQueueManager

func init() {
	m7s.Servers.AddTask(&myQueue)
}

// Queue tasks from anywhere
myQueue.AddTask(&MyTask{...}, logger)
```

- Tasks automatically support retry mechanisms
- Use `task.SetRetry(maxRetry, interval)` for custom retry behavior
- Return `task.ErrTaskComplete` for successful completion
- Return other errors to trigger retry or failure handling
- HTTP/TCP/UDP/QUIC listeners
- Database connections (SQLite, MySQL, PostgreSQL, DuckDB)
- Authentication settings
- Admin interface settings
- Global stream alias mappings
Each plugin can define its own configuration structure that gets merged with global settings.
Supports multiple database backends:
- SQLite: Default lightweight option
- MySQL: Production deployments
- PostgreSQL: Production deployments
- DuckDB: Analytics use cases
Automatic migration is handled for core models including users, proxies, and stream aliases.
- RTMP: Real-time messaging protocol
- RTSP: Real-time streaming protocol
- HLS: HTTP live streaming
- WebRTC: Web real-time communication
- GB28181: Chinese surveillance standard
- FLV: Flash video format
- MP4: MPEG-4 format with post-processing capabilities
- SRT: Secure reliable transport
- S3: File upload integration with AWS S3/MinIO compatibility
- JWT-based authentication for admin interface
- Stream-level authentication with URL signing
- Role-based access control (admin/user)
- Webhook support for external auth integration
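URL signing for stream-level auth is commonly built on an HMAC over the stream path and an expiry timestamp. The sketch below illustrates that general pattern; Monibuca's actual signing scheme and parameter names may differ:

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// sign computes an HMAC-SHA256 signature over the stream path and expiry.
func sign(secret, streamPath string, expires int64) string {
	mac := hmac.New(sha256.New, []byte(secret))
	fmt.Fprintf(mac, "%s:%d", streamPath, expires)
	return hex.EncodeToString(mac.Sum(nil))
}

// verify recomputes the signature and compares in constant time.
func verify(secret, streamPath string, expires int64, sig string) bool {
	return hmac.Equal([]byte(sign(secret, streamPath, expires)), []byte(sig))
}

func main() {
	sig := sign("server-secret", "live/test", 1700000000)
	fmt.Println(verify("server-secret", "live/test", 1700000000, sig))  // true
	fmt.Println(verify("server-secret", "live/other", 1700000000, sig)) // false
}
```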
- Follow existing patterns and naming conventions
- Use the task system for async operations
- Implement proper error handling and logging
- Use the configuration system for all settings
- Unit tests should be placed alongside source files
- Integration tests can use the example configurations
- Use the mock.py script for protocol testing
- Always use Work instances for queue management
- Implement proper Start/Run lifecycle in tasks
- Use global instance pattern for cross-plugin communication
- Handle errors gracefully with appropriate retry strategies
- Memory pool is enabled by default (disable with the `disable_rm` build tag)
- Zero-copy design for media data where possible
- Lock-free data structures for high concurrency
- Efficient buffer management with ring buffers
- Queue-based processing prevents blocking main threads
- Performance monitoring and profiling
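The ring-buffer pattern mentioned above can be sketched as a fixed-capacity buffer that overwrites the oldest entry when full, a common way to hold recent media frames. This minimal version is illustrative; Monibuca's ring buffer also supports concurrent readers:

```go
package main

import "fmt"

// ring is a minimal fixed-capacity ring buffer.
type ring struct {
	buf   [][]byte
	head  int // index of the next write
	count int
}

func newRing(capacity int) *ring { return &ring{buf: make([][]byte, capacity)} }

// Write stores a frame, overwriting the oldest once the buffer is full.
func (r *ring) Write(frame []byte) {
	r.buf[r.head] = frame
	r.head = (r.head + 1) % len(r.buf)
	if r.count < len(r.buf) {
		r.count++
	}
}

// Oldest returns the least recently written frame still in the buffer.
func (r *ring) Oldest() []byte {
	if r.count < len(r.buf) {
		return r.buf[0]
	}
	return r.buf[r.head]
}

func main() {
	r := newRing(3)
	for _, f := range []string{"f1", "f2", "f3", "f4"} {
		r.Write([]byte(f))
	}
	fmt.Println(string(r.Oldest())) // f2
}
```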
- Real-time metrics via Prometheus endpoint (`/api/metrics`)
- pprof integration for memory/CPU profiling
- Structured logging with zerolog
- Configurable log levels
- Log rotation support
- Fatal crash logging
- Tasks automatically include detailed logging with task IDs and types
- Use the `task.Debug`/`Info`/`Warn`/`Error` methods for consistent logging
- Task state and progress can be monitored through descriptions
- Event loop status and queue lengths are logged automatically
- Web-based admin UI served from `admin.zip`
- RESTful API for all operations
- Real-time stream monitoring
- Configuration management
- User management (when auth enabled)
- Default HTTP port: 8080
- Default gRPC port: 50051
- Check plugin-specific port configurations
- Ensure proper build tags for database support
- Check DSN configuration strings
- Verify database file permissions
- Plugins are auto-discovered from imports
- Check plugin enable/disable status
- Verify configuration merging
- Ensure Work instances are added to server during initialization
- Check task queue status if tasks aren't executing
- Verify proper error handling in task implementation
- Monitor task retry counts and failure reasons in logs