A distributed CPU/GPU task scheduler for large-scale batch jobs across thousands of machines.
- Quick Start: docs/quickstart.md
- API Documentation: docs/api.md
- Design Doc: docs/plans/2025-12-14-distributed-scheduler-design.md
- MVP Implementation: docs/plans/2025-12-14-mvp-implementation.md
- Zero dependencies: No Redis, Kafka, or other middleware required
- High performance: Sub-millisecond scheduling latency
- Load balancing: Automatic task distribution based on worker load
- Resource matching: Tag-based worker filtering (GPU, CPU, CUDA versions, etc.)
- Simple deployment: Single binary for scheduler and worker
| Metric | Value | Description |
|---|---|---|
| Scheduling Latency | < 1ms | Time to assign task to worker |
| Throughput | 1000+ req/s | Scheduling requests per second |
| Worker Scale | 500+ machines | Tested worker pool size |
| Heartbeat Overhead | 33KB/s | Network bandwidth for 500 workers |
| Memory Usage | < 3MB | Scheduler memory footprint for 500 workers |
| Timeout Detection | 10s/20s | Suspicious/Offline thresholds |
| Test Coverage | 84-100% | Unit and integration test coverage |
| Component | Status | Description |
|---|---|---|
| Core Scheduler | ✅ Production Ready | Single scheduler with in-memory state |
| Worker Agent | ✅ Production Ready | Heartbeat sender with graceful shutdown |
| Resource Filtering | ✅ Production Ready | Tag-based worker matching |
| Load Balancing | ✅ Production Ready | Load ratio-based selection |
| HTTP API | ✅ Production Ready | 3 endpoints with error handling |
| Integration Tests | ✅ Passing | 31 tests, 100% pass rate |
| High Availability | 🚧 Planned | Standby scheduler with failover |
| Monitoring | 🚧 Planned | Metrics and alerting |
| Tag Indexing | 🚧 Planned | Performance optimization |
```
Client → Scheduler → Worker Pool (500+ machines)
              ↑
              └─ Heartbeat (every 3s)
```
See the Design Document for details.
```mermaid
flowchart LR
    Client[Client\nAPI Caller] --> API[Scheduler API]
    API --> State[State Manager\nin-memory]
    API --> Algo[Scheduling Algorithm]
    Algo --> W1[Worker A]
    Algo --> W2[Worker B]
    Algo --> W3[Worker N...]
    W1 -.->|heartbeat| API
    W2 -.->|heartbeat| API
    W3 -.->|heartbeat| API
```
```mermaid
sequenceDiagram
    participant Client
    participant Scheduler
    participant State
    participant Worker
    Client->>Scheduler: POST /schedule (task_id, required_tags)
    Scheduler->>State: Filter by tags & availability
    Scheduler->>Scheduler: Sort by load ratio
    Scheduler-->>Client: worker_id + address
    loop every 3s
        Worker->>Scheduler: POST /heartbeat (load, tags)
        Scheduler->>State: Update worker state
    end
```
```bash
go get github.com/chicogong/dtask-scheduler@v1.0.0
```

For a full local/production guide, see docs/quickstart.md.
- Go 1.21+
- Network connectivity between scheduler and workers
```bash
go build -o bin/scheduler ./cmd/scheduler
go build -o bin/worker ./cmd/worker
```

```bash
./bin/scheduler --port=8080
```

```bash
# GPU worker
./bin/worker --id=worker-001 --addr=localhost:9001 --tags=gpu,cuda-12.0 --max-tasks=30 --scheduler=http://localhost:8080

# CPU worker
./bin/worker --id=worker-002 --addr=localhost:9002 --tags=cpu,avx2 --max-tasks=30 --scheduler=http://localhost:8080
```

```bash
curl -X POST http://localhost:8080/api/v1/schedule \
  -H "Content-Type: application/json" \
  -d '{"task_id":"task-001","required_tags":["gpu"]}'
```

Response:

```json
{
  "worker_id": "worker-001",
  "address": "localhost:9001"
}
```

```bash
curl http://localhost:8080/api/v1/workers
```

You can embed the scheduler or worker in your Go application:
Embedding the Scheduler:
```go
package main

import (
	"context"
	"log"
	"net/http"
	"time"

	"github.com/chicogong/dtask-scheduler/pkg/scheduler"
)

func main() {
	// Create state manager and handler
	state := scheduler.NewStateManager()
	handler := scheduler.NewHandler(state)

	// Set up HTTP routes
	mux := http.NewServeMux()
	mux.HandleFunc("/api/v1/heartbeat", handler.HandleHeartbeat)
	mux.HandleFunc("/api/v1/schedule", handler.HandleSchedule)
	mux.HandleFunc("/api/v1/workers", handler.HandleListWorkers)

	// Start the background timeout checker
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				state.CheckTimeouts()
			case <-ctx.Done():
				return
			}
		}
	}()

	// Start the server
	log.Fatal(http.ListenAndServe(":8080", mux))
}
```

Embedding a Worker:
```go
package main

import (
	"context"

	"github.com/chicogong/dtask-scheduler/pkg/worker"
)

func main() {
	// Create and start the heartbeat sender
	sender := worker.NewHeartbeatSender(
		"worker-001",                 // worker ID
		"localhost:9001",             // worker address
		[]string{"gpu", "cuda-12.0"}, // resource tags
		30,                           // max concurrent tasks
		"http://localhost:8080",      // scheduler base URL
	)
	ctx := context.Background()
	go sender.Start(ctx)

	// Update the reported task count as tasks start and finish
	sender.UpdateTaskCount(15)

	// ... run your workload; keep the process alive while tasks execute
}
```

Using the Client Library:
```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/chicogong/dtask-scheduler/pkg/client"
	"github.com/chicogong/dtask-scheduler/pkg/types"
)

func main() {
	// Create a client pointed at the scheduler
	c := client.NewClient("http://localhost:8080")

	// Schedule a task that requires a GPU worker
	resp, err := c.Schedule(context.Background(), &types.ScheduleRequest{
		TaskID:       "task-001",
		RequiredTags: []string{"gpu"},
	})
	if err != nil {
		log.Fatal(err)
	}

	// Use the returned worker info
	fmt.Println("Scheduled to:", resp.WorkerID, resp.Address)

	// List all registered workers
	workers, err := c.ListWorkers(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("Known workers:", len(workers))
}
```

Using Types:
```go
import "github.com/chicogong/dtask-scheduler/pkg/types"

req := &types.ScheduleRequest{
	TaskID:       "task-001",
	RequiredTags: []string{"gpu"},
}
```

Scheduler flags:

| Flag | Default | Description |
|---|---|---|
| `--port` | `8080` | Listening port |
Worker flags:

| Flag | Default | Description |
|---|---|---|
| `--id` | `worker-001` | Worker ID |
| `--addr` | `localhost:9000` | Worker address (returned in scheduling result) |
| `--tags` | `cpu` | Resource tags, comma-separated |
| `--max-tasks` | `30` | Maximum concurrent tasks |
| `--scheduler` | `http://localhost:8080` | Scheduler base URL |
Base URL: http://localhost:8080/api/v1
- `POST /heartbeat`: Worker heartbeat
- `POST /schedule`: Schedule a task
- `GET /workers`: List workers
See docs/api.md for details.
- Filter by tags: Only workers with ALL required tags are considered
- Filter by availability: Offline workers or workers at max capacity are excluded
- Sort by load ratio: `load_ratio = current_tasks / max_tasks`
- Select lowest: The worker with the lowest load ratio is selected
- Optimistic allocation: Task count incremented immediately (corrected by next heartbeat)
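Put together, the steps above can be sketched roughly as follows. This is an illustrative sketch: the `Worker` struct, `hasAllTags`, and `pick` are assumed names for this example, not the project's internal API.

```go
package main

import (
	"fmt"
	"sort"
)

// Worker is an illustrative view of a worker's scheduling state.
type Worker struct {
	ID           string
	Tags         map[string]bool
	CurrentTasks int
	MaxTasks     int
	Online       bool
}

// hasAllTags reports whether the worker carries every required tag.
func hasAllTags(w *Worker, required []string) bool {
	for _, t := range required {
		if !w.Tags[t] {
			return false
		}
	}
	return true
}

// pick implements the four steps: filter by tags, filter by availability,
// sort by load ratio, and select the least-loaded worker. The task count
// is incremented optimistically; the next heartbeat corrects it.
func pick(workers []*Worker, required []string) *Worker {
	var candidates []*Worker
	for _, w := range workers {
		if w.Online && w.CurrentTasks < w.MaxTasks && hasAllTags(w, required) {
			candidates = append(candidates, w)
		}
	}
	if len(candidates) == 0 {
		return nil
	}
	sort.Slice(candidates, func(i, j int) bool {
		ri := float64(candidates[i].CurrentTasks) / float64(candidates[i].MaxTasks)
		rj := float64(candidates[j].CurrentTasks) / float64(candidates[j].MaxTasks)
		return ri < rj
	})
	best := candidates[0]
	best.CurrentTasks++ // optimistic allocation
	return best
}

func main() {
	workers := []*Worker{
		{ID: "worker-001", Tags: map[string]bool{"gpu": true}, CurrentTasks: 20, MaxTasks: 30, Online: true},
		{ID: "worker-002", Tags: map[string]bool{"gpu": true}, CurrentTasks: 5, MaxTasks: 30, Online: true},
		{ID: "worker-003", Tags: map[string]bool{"cpu": true}, CurrentTasks: 0, MaxTasks: 30, Online: true},
	}
	w := pick(workers, []string{"gpu"})
	fmt.Println(w.ID) // worker-002: lowest load ratio among gpu workers
}
```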
```
dtask-scheduler/
├── cmd/
│   ├── scheduler/          # Scheduler entry
│   └── worker/             # Worker entry
├── internal/
│   ├── scheduler/          # Scheduler core logic
│   │   ├── algorithm.go    # Scheduling algorithm
│   │   ├── handlers.go     # HTTP handlers
│   │   └── state.go        # State manager
│   └── worker/             # Worker core logic
│       └── heartbeat.go    # Heartbeat sender
├── pkg/
│   └── types/              # Shared types
├── tests/                  # Integration tests
└── docs/                   # Documentation
```
```bash
# Unit tests
go test ./...

# Integration tests
go test ./tests -v
```

- Audio processing: Large-scale transcoding, denoising, and feature extraction
- Video processing: Transcoding, editing, AI enhancement
- AI inference: Dispatch model inference to GPU clusters
- Data processing: Batch data cleaning and transformation
- Scientific computing: Distributed computation scheduling
- Language: Go 1.21+
- Dependencies: Standard library only (net/http, encoding/json, sync, etc.)
- Protocol: HTTP/REST (heartbeat and scheduling API)
- Concurrency: goroutines + context + sync.RWMutex
- Testing: Standard testing + table-driven tests
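The goroutines + context + `sync.RWMutex` combination typically follows a pattern like the one below for guarding in-memory worker state: heartbeats take the write lock, reads (scheduling, listing) take the read lock. This is a sketch of the pattern, not the project's actual `StateManager`.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// workerState is an illustrative in-memory record guarded by an RWMutex.
type workerState struct {
	mu       sync.RWMutex
	lastSeen map[string]time.Time
}

func newWorkerState() *workerState {
	return &workerState{lastSeen: make(map[string]time.Time)}
}

// Touch records a heartbeat (writer: exclusive lock).
func (s *workerState) Touch(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.lastSeen[id] = time.Now()
}

// Count reports how many workers are known (reader: shared lock).
func (s *workerState) Count() int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return len(s.lastSeen)
}

func main() {
	s := newWorkerState()
	var wg sync.WaitGroup
	// Simulate 10 workers heartbeating concurrently.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.Touch(fmt.Sprintf("worker-%03d", i))
		}(i)
	}
	wg.Wait()
	fmt.Println(s.Count()) // 10
}
```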
- MVP: Single scheduler + heartbeat + basic scheduling
- High availability: Standby scheduler with failover
- Monitoring: Metrics and alerting
- Tag indexing: Faster resource filtering
- Queue: Wait queue for resource shortage
- Task priority: Preempting low-priority tasks
- Resource reservation: CPU/memory/GPU memory reservations
Issues and pull requests are welcome!
MIT License - see LICENSE for details