Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions Dockerfile.batch
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Build stage: compiles the batch binary using the Go 1.24 image.
FROM quay.io/projectquay/golang:1.24 AS builder
# Target platform of the compiled binary; consumed by GOOS/GOARCH below.
ARG TARGETOS
ARG TARGETARCH
# Build metadata stamped into the binary through -ldflags in the build step.
ARG COMMIT_SHA=unknown
ARG BUILD_REF

WORKDIR /workspace
# Copy the Go Modules manifests
COPY go.mod go.mod
COPY go.sum go.sum
# Download dependencies before copying the sources so that this layer stays cached
# and source-only changes do not force a re-download.
RUN go mod download

# Copy only the Go source directories that go into this build
COPY cmd/batch/ cmd/batch/
COPY pkg/batch/ pkg/batch/
COPY pkg/metrics/ pkg/metrics/
COPY pkg/common pkg/common

# Build
# GOARCH intentionally has no default value, so that the binary is built for the platform of the host
# where the command was called. For example, if we call make image-build in a local env on Apple
# Silicon (M1), the docker BUILDPLATFORM arg will be linux/arm64, whereas on an x86 Mac it will be
# linux/amd64. Leaving it empty ensures the container and the binary shipped in it share one platform.
# GOOS falls back to linux when TARGETOS is not provided.
ENV CGO_ENABLED=0
ENV GOOS=${TARGETOS:-linux}
ENV GOARCH=${TARGETARCH}
# NOTE(review): the -X flags target pkg/batch/version.CommitSHA and .BuildRef; confirm that package
# declares both variables, since it is not visible from this file.
RUN go build -a -o bin/batch \
-ldflags="-X github.com/llm-d/llm-d-inference-scheduler/pkg/batch/version.CommitSHA=${COMMIT_SHA} -X github.com/llm-d/llm-d-inference-scheduler/pkg/batch/version.BuildRef=${BUILD_REF}" \
cmd/batch/main.go

# Runtime stage: only the compiled binary is copied from the builder onto the UBI 9 micro image.
FROM registry.access.redhat.com/ubi9/ubi-micro:latest
WORKDIR /
COPY --from=builder /workspace/bin/batch /app/batch
# Run as a fixed non-root UID:GID.
USER 65532:65532

ENTRYPOINT ["/app/batch"]
21 changes: 17 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ SIDECAR_TAG ?= dev
export SIDECAR_TAG
SIDECAR_IMAGE_TAG_BASE ?= $(IMAGE_REGISTRY)/$(SIDECAR_IMAGE_NAME)
export SIDECAR_IMAGE ?= $(SIDECAR_IMAGE_TAG_BASE):$(SIDECAR_TAG)
# Batch processor image: $(IMAGE_REGISTRY)/batch:$(BATCH_TAG) unless overridden; the tag defaults to "dev".
BATCH_TAG ?= dev
export BATCH_TAG
BATCH_IMAGE_TAG_BASE ?= $(IMAGE_REGISTRY)/batch
export BATCH_IMAGE ?= $(BATCH_IMAGE_TAG_BASE):$(BATCH_TAG)

NAMESPACE ?= hc4ai-operator
VLLM_SIMULATOR_TAG ?= v0.6.1
export VLLM_SIMULATOR_TAG
Expand Down Expand Up @@ -94,16 +99,24 @@ CGO_LDFLAGS := $(PYTHON_LDFLAGS) $(PYTHON_LIBS) '-L$(shell pwd)/lib' -ltokenizer
# Internal variables for generic targets.
# Pattern rules select the per-component value through the stem, e.g. image-build-% uses $($*_IMAGE).
epp_IMAGE = $(EPP_IMAGE)
sidecar_IMAGE = $(SIDECAR_IMAGE)
batch_IMAGE = $(BATCH_IMAGE)
epp_NAME = epp
sidecar_NAME = $(SIDECAR_NAME)
batch_NAME = batch
epp_LDFLAGS = -ldflags="$(LDFLAGS)"
sidecar_LDFLAGS =
batch_LDFLAGS = -ldflags="$(LDFLAGS)"
epp_CGO_CFLAGS = "${CGO_CFLAGS}"
sidecar_CGO_CFLAGS =
batch_CGO_CFLAGS = "${CGO_CFLAGS}"
epp_CGO_LDFLAGS = "${CGO_LDFLAGS}"
sidecar_CGO_LDFLAGS =
batch_CGO_LDFLAGS = "${CGO_LDFLAGS}"
epp_TEST_FILES = go list ./... | grep -v /test/ | grep -v ./pkg/sidecar/
sidecar_TEST_FILES = go list ./pkg/sidecar/...
# List the batch packages themselves (same form as the sidecar entry); the previous value
# filtered ./pkg/batch/ out, so the batch test list contained everything except batch.
batch_TEST_FILES = go list ./pkg/batch/...



.PHONY: help
help: ## Print help
Expand Down Expand Up @@ -142,7 +155,7 @@ format: ## Format Go source files
test: test-unit test-e2e ## Run unit tests and e2e tests

.PHONY: test-unit
test-unit: test-unit-epp test-unit-sidecar
test-unit: test-unit-epp test-unit-sidecar test-unit-batch

.PHONY: test-unit-%
test-unit-%: download-tokenizer install-dependencies ## Run unit tests
Expand Down Expand Up @@ -173,7 +186,7 @@ lint: check-golangci-lint check-typos ## Run lint
##@ Build

.PHONY: build
build: build-epp build-sidecar ## Build the project
build: build-epp build-sidecar build-batch ## Build the project

.PHONY: build-%
build-%: check-go install-dependencies download-tokenizer ## Build the project
Expand All @@ -183,7 +196,7 @@ build-%: check-go install-dependencies download-tokenizer ## Build the project
##@ Container Build/Push

.PHONY: image-build
image-build: image-build-epp image-build-sidecar ## Build Docker image
image-build: image-build-epp image-build-sidecar image-build-batch ## Build Docker image

.PHONY: image-build-%
image-build-%: check-container-tool ## Build Docker image ## Build Docker image using $(CONTAINER_RUNTIME)
Expand All @@ -197,7 +210,7 @@ image-build-%: check-container-tool ## Build Docker image ## Build Docker image
-t $($*_IMAGE) -f Dockerfile.$* .

.PHONY: image-push
image-push: image-push-epp image-push-sidecar ## Push container images to registry
image-push: image-push-epp image-push-sidecar image-push-batch ## Push container images to registry

.PHONY: image-push-%
image-push-%: check-container-tool ## Push container image to registry
Expand Down
16 changes: 16 additions & 0 deletions cmd/batch/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package main

import (
"os"

ctrl "sigs.k8s.io/controller-runtime"

"github.com/llm-d/llm-d-inference-scheduler/cmd/batch/runner"
)

// main runs the batch processor until a termination signal is received and
// exits with status 1 when the runner reports an error.
func main() {
	r := runner.NewRunner()
	err := r.Run(ctrl.SetupSignalHandler())
	if err == nil {
		return
	}
	os.Exit(1)
}
99 changes: 99 additions & 0 deletions cmd/batch/runner/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package runner

import (
	"context"
	"flag"
	"fmt"
	"net/http"

	"github.com/llm-d/llm-d-inference-scheduler/pkg/batch"
	"github.com/llm-d/llm-d-inference-scheduler/pkg/batch/redis"
	uberzap "go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

// Runner starts the batch processor. It has no fields; obtain one with
// NewRunner and start it with Run.
type Runner struct{}

var (
	// setupLog is the logger used while the runner starts up.
	setupLog = ctrl.Log.WithName("setup")

	// Command-line flags; they are parsed in Run.
	// redisAddr defaults to the address that was previously hard-coded in Run.
	logVerbosity = flag.Int("v", logging.DEFAULT, "number for the log level verbosity")
	concurrency  = flag.Int("concurrency", 8, "number of concurrent workers")
	endpoint     = flag.String("endpoint", "", "inference endpoint")
	redisAddr    = flag.String("redis-addr", "localhost:16379", "address of the Redis server used by the Redis message flow")
)

// NewRunner returns a Runner that can be started with Run.
func NewRunner() *Runner {
	return &Runner{}
}

// Run parses the command-line flags, installs the logger, starts -concurrency
// batch.Worker goroutines wired to the Redis flow's channels, and then blocks
// until ctx is cancelled.
//
// It returns a non-nil error only when flag validation fails; after ctx is
// cancelled it returns nil without waiting for the workers.
func (r *Runner) Run(ctx context.Context) error {
	opts := zap.Options{
		Development: true,
	}
	// The zap flags must be registered before flag.Parse so they are recognised.
	opts.BindFlags(flag.CommandLine)
	flag.Parse()
	initLogging(&opts)

	// TODO: tracing setup (common.InitTracing behind a tracing flag) is not wired in yet.
	// TODO: log the build info (version.CommitSHA, version.BuildRef) once the version package is imported.

	// Validate flags
	if err := validateFlags(); err != nil {
		setupLog.Error(err, "Failed to validate flags")
		return err
	}

	// Print all flag values, including the ones left at their defaults.
	flags := make(map[string]any)
	flag.VisitAll(func(f *flag.Flag) {
		flags[f.Name] = f.Value
	})
	setupLog.Info("Flags processed", "flags", flags)

	httpClient := &http.Client{
		// TODO: configure
	}
	var policy batch.RequestPolicy = batch.NewRandomRobinPolicy()
	var impl batch.Flow = redis.NewRedisMQFlow(*redisAddr)

	// Every worker receives from the single channel produced by the policy.
	requestChannel := policy.MergeRequestChannels(impl.RequestChannels()).Channel
	for w := 1; w <= *concurrency; w++ {
		go batch.Worker(ctx, *endpoint, httpClient, requestChannel, impl.RetryChannel(), impl.ResultChannel())
	}

	impl.Start(ctx)
	<-ctx.Done()
	return nil
}

// initLogging builds the zap-backed logger from opts and installs it as the
// controller-runtime logger. The -v flag decides the log level unless
// -zap-log-level was passed explicitly on the command line.
//
// TODO(review): the original note here was cut off ("is this dup of");
// presumably it asks whether this duplicates an existing helper — confirm.
func initLogging(opts *zap.Options) {
	zapLevelSet := false
	flag.Visit(func(f *flag.Flag) {
		if f.Name == "zap-log-level" {
			zapLevelSet = true
		}
	})

	if !zapLevelSet {
		// The zap level is the negated verbosity, see
		// https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/log/zap#Options.Level
		negated := -*logVerbosity
		opts.Level = uberzap.NewAtomicLevelAt(zapcore.Level(int8(negated)))
	}

	ctrl.SetLogger(zap.New(
		zap.UseFlagOptions(opts),
		zap.RawZapOpts(uberzap.AddCaller()),
	))
}

func validateFlags() error {

return nil
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/onsi/gomega v1.38.2
github.com/openai/openai-go v1.12.0
github.com/prometheus/client_golang v1.23.2
github.com/alicebob/miniredis/v2 v2.35.0
github.com/stretchr/testify v1.11.1
golang.org/x/sync v0.18.0
google.golang.org/grpc v1.76.0
Expand Down Expand Up @@ -93,6 +94,7 @@ require (
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 // indirect
go.opentelemetry.io/otel v1.38.0 // indirect
Expand Down
7 changes: 7 additions & 0 deletions pkg/batch/GUIDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Batch Processor - User Guide

The batch processor supports workflows whose requests are latency-tolerant, i.e., whose SLOs are measured in minutes or hours rather than seconds.

The batch processor pulls requests from a message queue (or from several MQs, according to a policy), sends them to the Inference Gateway (IGW), and retries them if necessary (e.g., when a request was shed).

![Batch Processor - Redis architecture](/docs/images/batch_processor_redis_architecture.png "BP - Redis")
61 changes: 61 additions & 0 deletions pkg/batch/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Batch Processor

## Overview
The batch processor (BP) provides asynchronous workflows for variable SLO-based inference requests.


## Architecture

An underlying implementation should provide persistent messaging that adheres to the interface defined in [api.go](api.go).

A pluggable request policy is used to merge multiple request channels into a single request channel on which the batch worker is listening.

An example for such a policy is a [Random Robin Policy](random_robin_policy.go).

Each [Batch Processor worker](worker.go) is responsible for pulling requests from the merged request channel, submitting them to the IGW, and applying retry logic if needed.



### Requests

Request messages should have the following format:
```json
{
"id" : "unique identifier for result mapping",
"deadline" : "deadline in Unix seconds",
"payload" : {regular inference payload}
}
```

Example:
```json
{
"id" : "19933123533434",
"deadline" : "1764045130",
"payload": {"model":"food-review","prompt":"hi", "max_tokens":10,"temperature":0}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shimib This only supports /v1/completions. Is that on purpose?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't, endpoint will be configurable. Will be in the user guide

}
```

### Results

Messages on the results channel will have the following structure:

```json
{
"id" : "id mapped to the request",
"payload" : {/*inference payload*/} ,
// or
"error" : "error's reason"
}
```


## Implementations

### Redis

An example implementation based on Redis is provided which behaves as follows:

- Redis Lists as the request queues.
- Redis Sorted Set as the retry exponential backoff implementation.
- Redis List as the result queue.
41 changes: 41 additions & 0 deletions pkg/batch/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package batch

import "context"

// Flow is the contract a messaging backend implements: it exposes the request
// channels the workers read from and the channels that receive their retries
// and results.
type Flow interface {
// Start starts processing requests.
Start(ctx context.Context)

// RequestChannels returns the channels for requests. The implementation is
// responsible for populating these channels.
RequestChannels() []RequestChannel
// RetryChannel returns the channel that accepts messages to be retried,
// together with their backoff delay.
RetryChannel() chan RetryMessage
// ResultChannel returns the channel for storing the results.
ResultChannel() chan ResultMessage
}

// RequestPolicy decides how several request channels are combined into a
// single request channel.
type RequestPolicy interface {
// MergeRequestChannels merges the given channels into one RequestChannel.
MergeRequestChannels(channels []RequestChannel) RequestChannel
}

// RequestMessage is a single inference request together with its JSON field mapping.
type RequestMessage struct {
// Id is the unique identifier used to map a result back to its request.
Id string `json:"id"`
// RetryCount is omitted from the JSON when it is zero.
// NOTE(review): presumably the number of retries already attempted — confirm against the worker.
RetryCount int `json:"retry_count,omitempty"`
// DeadlineUnixSec is the request deadline in Unix seconds, carried as a string.
DeadlineUnixSec string `json:"deadline"`
// Payload is the regular inference payload.
Payload map[string]any `json:"payload"`
}

// RequestChannel pairs a channel of request messages with free-form metadata.
type RequestChannel struct {
// Channel carries the request messages.
Channel chan RequestMessage
// Metadata holds arbitrary key/value data attached to the channel.
// NOTE(review): its keys and consumers are not visible here — confirm with the policy implementations.
Metadata map[string]any
}

// RetryMessage is a request to be retried, together with its backoff delay.
type RetryMessage struct {
RequestMessage
// BackoffDurationSeconds is the backoff delay in seconds.
BackoffDurationSeconds float64
}

// ResultMessage is a message placed on the result channel, together with its JSON field mapping.
type ResultMessage struct {
// Id is the identifier mapped to the originating request.
Id string `json:"id"`
// Payload is the inference payload.
// NOTE(review): the package README also describes an "error" alternative,
// but this struct has no such field — confirm how errors are reported.
Payload map[string]any `json:"payload"`
}
Loading
Loading