From ec4c276233ace7bea9a6330f23018fce760f6391 Mon Sep 17 00:00:00 2001 From: Qifan Deng Date: Tue, 9 Sep 2025 23:36:49 +1000 Subject: [PATCH] Wrap time sleep Signed-off-by: Qifan Deng --- .gitignore | 1 + pkg/common/publisher.go | 3 +-- pkg/common/publisher_test.go | 10 ++++------ pkg/common/utils.go | 16 ++++++++++++++++ pkg/kv-cache/kv_cache_test.go | 8 ++++---- pkg/llm-d-inference-sim/metrics_test.go | 6 +++--- pkg/llm-d-inference-sim/simulator.go | 4 ++-- pkg/llm-d-inference-sim/streaming.go | 4 ++-- 8 files changed, 33 insertions(+), 19 deletions(-) diff --git a/.gitignore b/.gitignore index 950b0cb4..c325bcac 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ vendor .DS_Store *.test manifests/dev-config.yaml +pkg/llm-d-inference-sim/tests-tmp/ diff --git a/pkg/common/publisher.go b/pkg/common/publisher.go index 883c05a2..a80658b6 100644 --- a/pkg/common/publisher.go +++ b/pkg/common/publisher.go @@ -23,7 +23,6 @@ import ( "errors" "fmt" "sync/atomic" - "time" zmq "github.com/pebbe/zmq4" "github.com/vmihailenco/msgpack/v5" @@ -58,7 +57,7 @@ func NewPublisher(endpoint string, retries uint) (*Publisher, error) { // If not the last attempt, wait before retrying if i < retries { - time.Sleep(1 * time.Second) + SleepSec(1) } } diff --git a/pkg/common/publisher_test.go b/pkg/common/publisher_test.go index b2c96603..6fb5e81a 100644 --- a/pkg/common/publisher_test.go +++ b/pkg/common/publisher_test.go @@ -20,8 +20,6 @@ import ( "context" "encoding/binary" - "time" - . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" zmq "github.com/pebbe/zmq4" @@ -50,7 +48,7 @@ var _ = Describe("Publisher", func() { //nolint defer sub.Close() - time.Sleep(100 * time.Millisecond) + SleepMilliSec(100) pub, err := NewPublisher(endpoint, retries) Expect(err).NotTo(HaveOccurred()) @@ -60,7 +58,7 @@ var _ = Describe("Publisher", func() { go func() { // Make sure that sub.RecvMessageBytes is called before pub.PublishEvent - time.Sleep(time.Second) + SleepSec(1) err := pub.PublishEvent(ctx, topic, data) Expect(err).NotTo(HaveOccurred()) }() @@ -107,12 +105,12 @@ var _ = Describe("Publisher", func() { // This will trigger the retry mechanism go func(sub *zmq.Socket, endpoint string) { // Delay releasing the ephemeral addr - time.Sleep(1950 * time.Millisecond) + SleepMilliSec(1950) err := sub.Close() Expect(err).NotTo(HaveOccurred()) // Delay starting the server to simulate service recovery - time.Sleep(2 * time.Second) + SleepSec(2) // Start subscriber as server sub, err = zmq.NewSocket(zmq.SUB) diff --git a/pkg/common/utils.go b/pkg/common/utils.go index fa853e26..696b90cd 100644 --- a/pkg/common/utils.go +++ b/pkg/common/utils.go @@ -23,6 +23,7 @@ import ( "regexp" "strings" "sync" + "time" "github.com/google/uuid" ) @@ -383,3 +384,18 @@ func init() { func Tokenize(text string) []string { return re.FindAllString(text, -1) } + +// SleepSec sleeps for the given number of seconds. +func SleepSec(duration int) { + time.Sleep(time.Duration(duration) * time.Second) +} + +// SleepMilliSec sleeps for the given number of milliseconds. +func SleepMilliSec(duration int) { + time.Sleep(time.Duration(duration) * time.Millisecond) +} + +// SleepMicroSec sleeps for the given number of microseconds. +func SleepMicroSec(duration int) { + time.Sleep(time.Duration(duration) * time.Microsecond) +} diff --git a/pkg/kv-cache/kv_cache_test.go b/pkg/kv-cache/kv_cache_test.go index f7e09e00..9e62ae99 100644 ---
a/pkg/kv-cache/kv_cache_test.go +++ b/pkg/kv-cache/kv_cache_test.go @@ -197,7 +197,7 @@ var _ = Describe("KV cache", Ordered, func() { for _, test := range testCases { It(test.name, func() { - time.Sleep(300 * time.Millisecond) + common.SleepMilliSec(300) config := &common.Configuration{ Port: 1234, @@ -231,7 +231,7 @@ var _ = Describe("KV cache", Ordered, func() { go func() { // Make sure that the subscriber listens before the events are published - time.Sleep(time.Second) + common.SleepSec(1) for _, action := range test.actions { var err error @@ -336,7 +336,7 @@ var _ = Describe("KV cache", Ordered, func() { go func() { // Make sure that the subscriber listens before the events are published - time.Sleep(time.Second) + common.SleepSec(1) req1 := testRequest{"req1", []uint64{1, 2}} req2 := testRequest{"req2", []uint64{3, 4}} @@ -443,7 +443,7 @@ var _ = Describe("KV cache", Ordered, func() { continue } - time.Sleep(time.Duration(common.RandomInt(1, 100)) * time.Microsecond) + common.SleepMicroSec(common.RandomInt(1, 100)) err = blockCache.finishRequest(reqID) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/llm-d-inference-sim/metrics_test.go b/pkg/llm-d-inference-sim/metrics_test.go index bc94c460..88a5212e 100644 --- a/pkg/llm-d-inference-sim/metrics_test.go +++ b/pkg/llm-d-inference-sim/metrics_test.go @@ -99,7 +99,7 @@ var _ = Describe("Simulator metrics", Ordered, func() { defer wg.Done() defer GinkgoRecover() - time.Sleep(300 * time.Millisecond) + common.SleepMilliSec(300) metricsResp, err := client.Get(metricsUrl) Expect(err).NotTo(HaveOccurred()) Expect(metricsResp.StatusCode).To(Equal(http.StatusOK)) @@ -187,13 +187,13 @@ var _ = Describe("Simulator metrics", Ordered, func() { // sends three requests with a delay of 0.5 second between them // request1 for lora1, request2 for lora2, and request 3 for lora1 go func() { - time.Sleep(500 * time.Millisecond) + common.SleepMilliSec(500) defer GinkgoRecover() _, err := openaiclient.Chat.Completions.New(ctx, 
paramsLora2) Expect(err).NotTo(HaveOccurred()) }() go func() { - time.Sleep(1 * time.Second) + common.SleepSec(1) defer wg.Done() defer GinkgoRecover() _, err := openaiclient.Chat.Completions.New(ctx, paramsLora1) diff --git a/pkg/llm-d-inference-sim/simulator.go b/pkg/llm-d-inference-sim/simulator.go index b91057db..b8ee5de2 100644 --- a/pkg/llm-d-inference-sim/simulator.go +++ b/pkg/llm-d-inference-sim/simulator.go @@ -677,10 +677,10 @@ func (s *VllmSimulator) sendResponse(reqCtx *openaiserverapi.CompletionReqCtx, r // calculate how long to wait before returning the response, time is based on number of tokens nCachedPromptTokens := reqCtx.CompletionReq.GetNumberOfCachedPromptTokens() ttft := s.getTimeToFirstToken(usageData.PromptTokens, nCachedPromptTokens, reqCtx.CompletionReq.IsDoRemotePrefill()) - time.Sleep(time.Duration(ttft) * time.Millisecond) + common.SleepMilliSec(ttft) for range usageData.CompletionTokens - 1 { perTokenLatency := s.getInterTokenLatency() - time.Sleep(time.Duration(perTokenLatency) * time.Millisecond) + common.SleepMilliSec(perTokenLatency) } ctx.Response.Header.SetContentType("application/json") diff --git a/pkg/llm-d-inference-sim/streaming.go b/pkg/llm-d-inference-sim/streaming.go index ea9b6676..02d789c5 100644 --- a/pkg/llm-d-inference-sim/streaming.go +++ b/pkg/llm-d-inference-sim/streaming.go @@ -101,11 +101,11 @@ func (s *VllmSimulator) sendTokenChunks(context *streamingContext, w *bufio.Writ tc *openaiserverapi.ToolCall, finishReason string) { // time to first token delay ttft := s.getTimeToFirstToken(context.nPromptTokens, context.nCachedPromptTokens, context.doRemotePrefill) - time.Sleep(time.Duration(ttft) * time.Millisecond) + common.SleepMilliSec(ttft) for i, token := range genTokens { if i != 0 { - time.Sleep(time.Duration(s.getInterTokenLatency()) * time.Millisecond) + common.SleepMilliSec(s.getInterTokenLatency()) } var toolChunkInsert *openaiserverapi.ToolCall if tc != nil {