This directory contains components used by both master and worker nodes in the distributed system.
Shared components provide:
- Common data models: Ensure API compatibility between master and workers
- Shared libraries: Reusable Go packages for authentication, storage, etc.
- Utilities and scripts: Tools that can be run from any node
- ML models: Energy efficiency scoring models
- Documentation: Project-wide documentation
```
shared/
├── pkg/                     # Shared Go packages
│   ├── api/                 # HTTP API handlers and definitions
│   ├── models/              # Data models (Job, Node, Result, etc.)
│   ├── auth/                # Authentication middleware
│   ├── agent/               # Agent logic (hardware detection, execution)
│   ├── store/               # Storage interfaces (SQLite, in-memory)
│   ├── tls/                 # TLS utilities for secure connections
│   ├── metrics/             # Prometheus metrics utilities
│   └── logging/             # Structured logging
├── scripts/                 # Utility scripts
│   ├── run_tests.py         # Test runner
│   ├── analyze_results.py   # Results analyzer
│   ├── run_benchmarks.sh    # Benchmark automation
│   └── retrain_models.py    # ML model retraining
├── advisor/                 # Energy efficiency advisor
│   └── quality/             # ML models for efficiency scoring
├── models/                  # Trained ML model files
│   └── x86_64_linux/        # Platform-specific models
└── docs/                    # Project documentation
    ├── architecture.md
    ├── distributed_architecture_v1.md
    ├── DEPLOYMENT_MODES.md
    └── ...
```
These packages are imported by both `cmd/master` and `cmd/agent`:
### pkg/api

HTTP API request/response handlers used by the master.

Example:

```go
import "github.com/psantana5/ffmpeg-rtmp/pkg/api"

router := api.NewRouter(store, jobQueue)
```

### pkg/models

Core data structures for the distributed system.
Key models:

- `Node`: represents a registered worker node
- `Job`: a transcoding job and its parameters
- `Result`: job execution results
- `HardwareCapabilities`: CPU/GPU/RAM specs

Example:

```go
import "github.com/psantana5/ffmpeg-rtmp/pkg/models"

node := &models.Node{
    ID:         uuid.New().String(),
    Address:    "worker-01",
    Type:       "desktop",
    CPUThreads: 8,
    HasGPU:     false,
}
```

### pkg/auth

Authentication middleware for API endpoints.
Features:

- API key validation
- Bearer token support
- Environment variable configuration

Example:

```go
import "github.com/psantana5/ffmpeg-rtmp/pkg/auth"

// On the master
router.Use(auth.RequireAPIKey(apiKey))

// On the agent
client := &http.Client{}
req.Header.Set("Authorization", "Bearer "+apiKey)
```

### pkg/agent

Agent-specific logic used by workers.
Functions:

- `DetectHardware()`: auto-detects CPU, GPU, and RAM
- `ExecuteJob()`: runs FFmpeg transcoding
- `AnalyzeResults()`: calculates efficiency scores

Example:

```go
import "github.com/psantana5/ffmpeg-rtmp/pkg/agent"

caps, err := agent.DetectHardware()
result, err := agent.ExecuteJob(job, caps)
```

### pkg/store

Storage abstractions for the master node.
Implementations:

- `MemoryStore`: in-memory storage (development)
- `SQLiteStore`: SQLite persistence (production)

Example:

```go
import "github.com/psantana5/ffmpeg-rtmp/pkg/store"

store, err := store.NewSQLiteStore("master.db")
store.SaveJob(job)
```

### pkg/tls

TLS utilities for secure connections.
Functions:

- `GenerateSelfSignedCert()`: auto-generates TLS certificates
- `LoadTLSConfig()`: loads certificate files
- `NewTLSClient()`: creates a TLS-enabled HTTP client

Example:

```go
import tlsutil "github.com/psantana5/ffmpeg-rtmp/pkg/tls"

// Generate a certificate on the master
cert, key, err := tlsutil.GenerateSelfSignedCert()

// Load the certificate on the agent
tlsConfig, err := tlsutil.LoadTLSConfig(certFile, keyFile, caFile)
```

### pkg/metrics

Prometheus metrics registration and collection.
Example:

```go
import "github.com/psantana5/ffmpeg-rtmp/pkg/metrics"

metrics.JobsTotal.Inc()
metrics.JobDuration.Observe(duration.Seconds())
```

### pkg/logging

Structured logging utilities.
Example:

```go
import "github.com/psantana5/ffmpeg-rtmp/pkg/logging"

log := logging.New("master")
log.Info("Server started", "port", 8080)
log.Error("Connection failed", "error", err)
```

### scripts/run_tests.py

Runs transcoding tests with various configurations.
Usage:

```bash
# Single test
python3 scripts/run_tests.py single --name test1 --bitrate 2000k --duration 60

# Batch tests
python3 scripts/run_tests.py batch --file batch_stress_matrix.json
```

Use cases:

- Run from the master node to test the overall system
- Run from a worker node for local testing
- Run from a developer machine for feature testing
### scripts/analyze_results.py

Analyzes test results and generates efficiency rankings.

Usage:

```bash
python3 scripts/analyze_results.py
```

Output:

- CSV export of results
- Efficiency rankings
- Recommendations for optimal settings
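The "efficiency rankings" output can be illustrated with a small sketch. The result records and the `efficiency` field below are hypothetical stand-ins for whatever analyze_results.py actually computes, not its real data format.

```python
# Hypothetical result records; the real script would load these from JSON files.
results = [
    {"name": "x264-2000k", "efficiency": 1.8},
    {"name": "x265-2000k", "efficiency": 2.4},
    {"name": "x264-4000k", "efficiency": 1.1},
]

# Rank runs from most to least efficient.
ranked = sorted(results, key=lambda r: r["efficiency"], reverse=True)
for rank, entry in enumerate(ranked, start=1):
    print(f"{rank}. {entry['name']} (efficiency={entry['efficiency']})")
```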
### scripts/run_benchmarks.sh

Automated benchmark suite.

Usage:

```bash
bash scripts/run_benchmarks.sh
```

Runs:

- Baseline tests
- Multi-bitrate tests
- Codec comparisons
- Resolution tests
### scripts/retrain_models.py

Retrains ML models from test results.

Usage:

```bash
python3 scripts/retrain_models.py --results-dir ./test_results --models-dir ./models
```

### advisor/

ML-based energy efficiency scoring.
Components:
- Feature extraction from results
- Random Forest models for scoring
- Recommendation engine
Used by:
- Workers: Score job results locally
- Master: Aggregate scores for visualization
- Scripts: Analyze and compare results
Models:

- `models/x86_64_linux/`: pre-trained models for x86_64 Linux
- Training data: historical test results
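The scoring pipeline described above can be sketched in miniature. This is an illustrative toy only: the real advisor loads trained Random Forest models, and the result fields used here (`fps`, `watts`, `vmaf`) are assumptions rather than the project's actual schema.

```python
def extract_features(result: dict) -> dict:
    """Pull the raw signals a scoring model would consume (hypothetical keys)."""
    return {
        "fps": result["fps"],
        "watts": result["watts"],
        "vmaf": result.get("vmaf", 0.0),
    }


def score_efficiency(result: dict) -> float:
    """Toy stand-in for the advisor's model: quality-weighted throughput per watt."""
    f = extract_features(result)
    if f["watts"] <= 0:
        return 0.0
    return f["fps"] * (f["vmaf"] / 100.0) / f["watts"]


print(score_efficiency({"fps": 60.0, "watts": 30.0, "vmaf": 90.0}))
```

In the real pipeline the hand-written formula is replaced by a model loaded from `models/x86_64_linux/`.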
### docs/

All project documentation in one place.
Key documents:
- `architecture.md`: system architecture overview
- `distributed_architecture_v1.md`: distributed mode details
- `DEPLOYMENT_MODES.md`: production vs. development deployment
- `PRODUCTION_FEATURES.md`: production-ready features
- `getting-started.md`: setup walkthrough
- `troubleshooting.md`: common issues

Why shared?
- Documentation applies to both master and workers
- Developers need full context regardless of component
- Centralized documentation is easier to maintain
Example usage from the master:

```go
// cmd/master/main.go
import (
    "github.com/psantana5/ffmpeg-rtmp/pkg/api"
    "github.com/psantana5/ffmpeg-rtmp/pkg/auth"
    "github.com/psantana5/ffmpeg-rtmp/pkg/models"
    "github.com/psantana5/ffmpeg-rtmp/pkg/store"
)

func main() {
    store := store.NewMemoryStore()
    router := api.NewRouter(store)
    router.Use(auth.RequireAPIKey(apiKey))
    // ...
}
```

Example usage from the agent:

```go
// cmd/agent/main.go
import (
    "github.com/psantana5/ffmpeg-rtmp/pkg/agent"
    "github.com/psantana5/ffmpeg-rtmp/pkg/models"
    tlsutil "github.com/psantana5/ffmpeg-rtmp/pkg/tls"
)

func main() {
    caps, _ := agent.DetectHardware()
    node := &models.Node{ /* ... */ }
    tlsConfig, _ := tlsutil.LoadTLSConfig(cert, key, ca)
    // ...
}
```

Example usage from a Python script:

```python
# scripts/analyze_results.py
from shared.advisor.quality.efficiency import score_efficiency

result = load_result("test_results/test1.json")
score = score_efficiency(result)
```

Important: Master and worker binaries must use compatible versions of the shared packages.
Best practices:
- Tag releases: Use semantic versioning for releases
- Test compatibility: Run integration tests after updating shared code
- Document breaking changes: Clearly mark API changes in CHANGELOG
- Gradual rollout: Update master first, then workers
Breaking change example:

```go
// Old API (v1.0)
type Job struct {
    Scenario string
}

// New API (v2.0) - BREAKING
type Job struct {
    Scenario   string
    Parameters map[string]interface{} // NEW REQUIRED FIELD
}
```

Solution: add the field in a backward-compatible way:

```go
// Better (v1.1) - non-breaking
type Job struct {
    Scenario   string
    Parameters map[string]interface{} `json:"parameters,omitempty"` // optional
}
```

To test the shared components:

```bash
# Test shared Go packages
cd shared/pkg
go test ./...

# Test shared Python scripts
cd ../scripts
python3 -m pytest

# Test advisor models
cd ../advisor
python3 -m pytest
```

Related documentation:

- FOLDER_ORGANIZATION.md - Overall project structure
- ../master/README.md - Master node components
- ../worker/README.md - Worker node components
- docs/DEPLOYMENT_MODES.md - Deployment guide
When modifying shared components:
- Consider both master and worker: Changes affect both
- Maintain backward compatibility: Avoid breaking changes when possible
- Update tests: Test both master and worker usage
- Document changes: Update this README and relevant docs
- Version appropriately: Bump version if API changes
For issues with shared components:
- Check if issue is master-specific or worker-specific first
- Include which component (master/worker) is using the shared code
- Provide version info:

  ```bash
  git log -1 --oneline shared/pkg
  ```