Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ vendor
.DS_Store
*.test
manifests/dev-config.yaml
pkg/llm-d-inference-sim/tests-tmp/
3 changes: 1 addition & 2 deletions pkg/common/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"sync/atomic"
"time"

zmq "github.com/pebbe/zmq4"
"github.com/vmihailenco/msgpack/v5"
Expand Down Expand Up @@ -58,7 +57,7 @@ func NewPublisher(endpoint string, retries uint) (*Publisher, error) {

// If not the last attempt, wait before retrying
if i < retries {
time.Sleep(1 * time.Second)
SleepSec(1)
}
}

Expand Down
10 changes: 4 additions & 6 deletions pkg/common/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"context"
"encoding/binary"

"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
zmq "github.com/pebbe/zmq4"
Expand Down Expand Up @@ -50,7 +48,7 @@ var _ = Describe("Publisher", func() {
//nolint
defer sub.Close()

time.Sleep(100 * time.Millisecond)
SleepMilliSec(100)

pub, err := NewPublisher(endpoint, retries)
Expect(err).NotTo(HaveOccurred())
Expand All @@ -60,7 +58,7 @@ var _ = Describe("Publisher", func() {

go func() {
// Make sure that sub.RecvMessageBytes is called before pub.PublishEvent
time.Sleep(time.Second)
SleepSec(1)
err := pub.PublishEvent(ctx, topic, data)
Expect(err).NotTo(HaveOccurred())
}()
Expand Down Expand Up @@ -107,12 +105,12 @@ var _ = Describe("Publisher", func() {
// This will trigger the retry mechanism
go func(sub *zmq.Socket, endpoint string) {
// Delay releasing the ephemeral addr
time.Sleep(1950 * time.Millisecond)
SleepMilliSec(1950)
err := sub.Close()
Expect(err).NotTo(HaveOccurred())

// Delay starting the server to simulate service recovery
time.Sleep(2 * time.Second)
SleepSec(2)

// Start subscriber as server
sub, err = zmq.NewSocket(zmq.SUB)
Expand Down
16 changes: 16 additions & 0 deletions pkg/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"regexp"
"strings"
"sync"
"time"

"github.com/google/uuid"
)
Expand Down Expand Up @@ -383,3 +384,18 @@ func init() {
// Tokenize splits text into tokens by collecting every non-overlapping
// match of the package-level regexp re, in order of appearance.
func Tokenize(text string) []string {
	tokens := re.FindAllString(text, -1)
	return tokens
}

// SleepSec blocks the calling goroutine for the given number of seconds.
// A zero or negative duration returns immediately (time.Sleep semantics).
func SleepSec(duration int) {
	d := time.Duration(duration) * time.Second
	time.Sleep(d)
}

// SleepMilliSec blocks the calling goroutine for the given number of
// milliseconds. A zero or negative duration returns immediately.
func SleepMilliSec(duration int) {
	d := time.Duration(duration) * time.Millisecond
	time.Sleep(d)
}

// SleepMicroSec blocks the calling goroutine for the given number of
// microseconds. A zero or negative duration returns immediately.
func SleepMicroSec(duration int) {
	d := time.Duration(duration) * time.Microsecond
	time.Sleep(d)
}
8 changes: 4 additions & 4 deletions pkg/kv-cache/kv_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ var _ = Describe("KV cache", Ordered, func() {

for _, test := range testCases {
It(test.name, func() {
time.Sleep(300 * time.Millisecond)
common.SleepMilliSec(300)

config := &common.Configuration{
Port: 1234,
Expand Down Expand Up @@ -231,7 +231,7 @@ var _ = Describe("KV cache", Ordered, func() {

go func() {
// Make sure that the subscriber listens before the events are published
time.Sleep(time.Second)
common.SleepSec(1)

for _, action := range test.actions {
var err error
Expand Down Expand Up @@ -336,7 +336,7 @@ var _ = Describe("KV cache", Ordered, func() {

go func() {
// Make sure that the subscriber listens before the events are published
time.Sleep(time.Second)
common.SleepSec(1)

req1 := testRequest{"req1", []uint64{1, 2}}
req2 := testRequest{"req2", []uint64{3, 4}}
Expand Down Expand Up @@ -443,7 +443,7 @@ var _ = Describe("KV cache", Ordered, func() {
continue
}

time.Sleep(time.Duration(common.RandomInt(1, 100)) * time.Microsecond)
common.SleepMicroSec(common.RandomInt(1, 100))

err = blockCache.finishRequest(reqID)
Expect(err).NotTo(HaveOccurred())
Expand Down
6 changes: 3 additions & 3 deletions pkg/llm-d-inference-sim/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ var _ = Describe("Simulator metrics", Ordered, func() {
defer wg.Done()
defer GinkgoRecover()

time.Sleep(300 * time.Millisecond)
common.SleepMilliSec(300)
metricsResp, err := client.Get(metricsUrl)
Expect(err).NotTo(HaveOccurred())
Expect(metricsResp.StatusCode).To(Equal(http.StatusOK))
Expand Down Expand Up @@ -187,13 +187,13 @@ var _ = Describe("Simulator metrics", Ordered, func() {
// sends three requests with a delay of 0.5 second between them
// request1 for lora1, request2 for lora2, and request 3 for lora1
go func() {
time.Sleep(500 * time.Millisecond)
common.SleepMilliSec(500)
defer GinkgoRecover()
_, err := openaiclient.Chat.Completions.New(ctx, paramsLora2)
Expect(err).NotTo(HaveOccurred())
}()
go func() {
time.Sleep(1 * time.Second)
common.SleepSec(1)
defer wg.Done()
defer GinkgoRecover()
_, err := openaiclient.Chat.Completions.New(ctx, paramsLora1)
Expand Down
4 changes: 2 additions & 2 deletions pkg/llm-d-inference-sim/simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,10 +677,10 @@ func (s *VllmSimulator) sendResponse(reqCtx *openaiserverapi.CompletionReqCtx, r
// calculate how long to wait before returning the response, time is based on number of tokens
nCachedPromptTokens := reqCtx.CompletionReq.GetNumberOfCachedPromptTokens()
ttft := s.getTimeToFirstToken(usageData.PromptTokens, nCachedPromptTokens, reqCtx.CompletionReq.IsDoRemotePrefill())
time.Sleep(time.Duration(ttft) * time.Millisecond)
common.SleepMilliSec(ttft)
for range usageData.CompletionTokens - 1 {
perTokenLatency := s.getInterTokenLatency()
time.Sleep(time.Duration(perTokenLatency) * time.Millisecond)
common.SleepMilliSec(perTokenLatency)
}

ctx.Response.Header.SetContentType("application/json")
Expand Down
4 changes: 2 additions & 2 deletions pkg/llm-d-inference-sim/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ func (s *VllmSimulator) sendTokenChunks(context *streamingContext, w *bufio.Writ
tc *openaiserverapi.ToolCall, finishReason string) {
// time to first token delay
ttft := s.getTimeToFirstToken(context.nPromptTokens, context.nCachedPromptTokens, context.doRemotePrefill)
time.Sleep(time.Duration(ttft) * time.Millisecond)
common.SleepMilliSec(ttft)

for i, token := range genTokens {
if i != 0 {
time.Sleep(time.Duration(s.getInterTokenLatency()) * time.Millisecond)
common.SleepMilliSec(s.getInterTokenLatency())
}
var toolChunkInsert *openaiserverapi.ToolCall
if tc != nil {
Expand Down