@@ -17,6 +17,8 @@ limitations under the License.
17
17
package epp
18
18
19
19
import (
20
+ "encoding/json"
21
+ "errors"
20
22
"fmt"
21
23
"strconv"
22
24
"strings"
@@ -26,9 +28,12 @@ import (
26
28
"github.com/google/go-cmp/cmp/cmpopts"
27
29
"github.com/onsi/ginkgo/v2"
28
30
"github.com/onsi/gomega"
31
+ corev1 "k8s.io/api/core/v1"
32
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
29
33
"k8s.io/apimachinery/pkg/types"
30
34
"k8s.io/utils/ptr"
31
- "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
35
+ client "sigs.k8s.io/controller-runtime/pkg/client"
36
+ v1alpha2 "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
32
37
testutils "sigs.k8s.io/gateway-api-inference-extension/test/utils"
33
38
)
34
39
@@ -51,38 +56,57 @@ var _ = ginkgo.Describe("InferencePool", func() {
51
56
// After each spec, clean up the InferenceModel test resource and then poll
// until a Get for it reports NotFound.
ginkgo.AfterEach(func() {
	ginkgo.By("Deleting the InferenceModel test resource.")
	cleanupInferModelResources()
	gomega.Eventually(func() error {
		err := cli.Get(ctx, types.NamespacedName{Namespace: infModel.Namespace, Name: infModel.Name}, infModel)
		switch {
		case err == nil:
			// The object can still be read, so the deletion has not completed.
			return errors.New("InferenceModel resource still exists")
		case k8serrors.IsNotFound(err):
			// NotFound is the only result that shows the resource is gone.
			return nil
		default:
			// Any other failure says nothing about whether the resource was
			// deleted. Return it so Eventually keeps polling instead of
			// treating the error as a successful cleanup.
			return err
		}
	}, existsTimeout, interval).Should(gomega.Succeed())
})
55
70
56
71
ginkgo .When ("The Inference Extension is running" , func () {
57
72
ginkgo .It ("Should route traffic to target model servers" , func () {
58
73
for _ , t := range []struct {
59
74
api string
60
- promptOrMessages string
75
+ promptOrMessages any
61
76
}{
62
77
{
63
78
api : "/completions" ,
64
79
promptOrMessages : "Write as if you were a critic: San Francisco" ,
65
80
},
66
81
{
67
- api : "/chat/completions" ,
68
- promptOrMessages : `[{"role": "user", "content": "Write as if you were a critic: San Francisco"}]` ,
82
+ api : "/chat/completions" ,
83
+ promptOrMessages : []map [string ]any {
84
+ {
85
+ "role" : "user" ,
86
+ "content" : "Write as if you were a critic: San Francisco" ,
87
+ },
88
+ },
69
89
},
70
90
{
71
91
api : "/chat/completions" ,
72
- promptOrMessages : `[{"role": "user", "content": "Write as if you were a critic: San Francisco"},` +
73
- `{"role": "assistant", "content": "Okay, let's see..."},` +
74
- `{"role": "user", "content": "Now summarize your thoughts."}]` ,
92
+ promptOrMessages : []map [string ]any {
93
+ {
94
+ "role" : "user" ,
95
+ "content" : "Write as if you were a critic: San Francisco" ,
96
+ },
97
+ {"role" : "assistant" , "content" : "Okay, let's see..." },
98
+ {"role" : "user" , "content" : "Now summarize your thoughts." },
99
+ },
75
100
},
76
101
} {
77
- ginkgo .By ("Verifying connectivity through the inference extension with " +
78
- t .api + " api and prompt/messages: " + t .promptOrMessages )
102
+ ginkgo .By (fmt .Sprintf ("Verifying connectivity through the inference extension with %s api and prompt/messages: %v" , t .api , t .promptOrMessages ))
79
103
80
104
// Ensure the expected responses include the inferencemodel target model names.
81
105
var expected []string
82
106
for _ , m := range infModel .Spec .TargetModels {
83
107
expected = append (expected , m .Name )
84
108
}
85
- curlCmd := getCurlCommand (envoyName , nsName , envoyPort , modelName , curlTimeout , t .api , t .promptOrMessages )
109
+ curlCmd := getCurlCommand (envoyName , nsName , envoyPort , modelName , curlTimeout , t .api , t .promptOrMessages , false )
86
110
87
111
actual := make (map [string ]int )
88
112
gomega .Eventually (func () error {
@@ -106,11 +130,103 @@ var _ = ginkgo.Describe("InferencePool", func() {
106
130
if ! cmp .Equal (got , expected , cmpopts .SortSlices (func (a , b string ) bool { return a < b })) {
107
131
return fmt .Errorf ("actual (%v) != expected (%v); resp=%q" , got , expected , resp )
108
132
}
109
-
110
133
return nil
111
134
}, readyTimeout , curlInterval ).Should (gomega .Succeed ())
112
135
}
113
136
})
137
+
138
+ ginkgo .It ("Should expose EPP metrics after generating traffic" , func () {
139
+ // Define the metrics we expect to see
140
+ expectedMetrics := []string {
141
+ "inference_model_request_total" ,
142
+ "inference_model_request_error_total" ,
143
+ "inference_model_request_duration_seconds" ,
144
+ // TODO: normalized_time_per_output_token_seconds is not actually recorded yet
145
+ // "normalized_time_per_output_token_seconds",
146
+ "inference_model_request_sizes" ,
147
+ "inference_model_response_sizes" ,
148
+ "inference_model_input_tokens" ,
149
+ "inference_model_output_tokens" ,
150
+ "inference_pool_average_kv_cache_utilization" ,
151
+ "inference_pool_average_queue_size" ,
152
+ "inference_pool_per_pod_queue_size" ,
153
+ "inference_model_running_requests" ,
154
+ "inference_pool_ready_pods" ,
155
+ "inference_extension_info" ,
156
+ }
157
+
158
+ // Generate traffic by sending requests through the inference extension
159
+ ginkgo .By ("Generating traffic through the inference extension" )
160
+ curlCmd := getCurlCommand (envoyName , nsName , envoyPort , modelName , curlTimeout , "/completions" , "Write as if you were a critic: San Francisco" , true )
161
+
162
+ // Run the curl command multiple times to generate some metrics data
163
+ for i := 0 ; i < 5 ; i ++ {
164
+ _ , err := testutils .ExecCommandInPod (ctx , cfg , scheme , kubeCli , nsName , "curl" , "curl" , curlCmd )
165
+ gomega .Expect (err ).NotTo (gomega .HaveOccurred ())
166
+ }
167
+
168
+ // modify the curl command to generate some error metrics
169
+ curlCmd [len (curlCmd )- 1 ] = "invalid input"
170
+ for i := 0 ; i < 5 ; i ++ {
171
+ _ , err := testutils .ExecCommandInPod (ctx , cfg , scheme , kubeCli , nsName , "curl" , "curl" , curlCmd )
172
+ gomega .Expect (err ).NotTo (gomega .HaveOccurred ())
173
+ }
174
+
175
+ // Now scrape metrics from the EPP endpoint via the curl pod
176
+ ginkgo .By ("Scraping metrics from the EPP endpoint" )
177
+
178
+ // Get Pod IP instead of Service
179
+ podList := & corev1.PodList {}
180
+ err := cli .List (ctx , podList , client .InNamespace (nsName ), client.MatchingLabels {"app" : inferExtName })
181
+ gomega .Expect (err ).NotTo (gomega .HaveOccurred ())
182
+ gomega .Expect (podList .Items ).NotTo (gomega .BeEmpty ())
183
+ podIP := podList .Items [0 ].Status .PodIP
184
+ gomega .Expect (podIP ).NotTo (gomega .BeEmpty ())
185
+
186
+ // Get the authorization token for reading metrics
187
+ token := ""
188
+ gomega .Eventually (func () error {
189
+ token , err = getMetricsReaderToken (cli )
190
+ if err != nil {
191
+ return err
192
+ }
193
+ if token == "" {
194
+ return errors .New ("token not found" )
195
+ }
196
+ return nil
197
+ }, existsTimeout , interval ).Should (gomega .Succeed ())
198
+
199
+ // Construct the metric scraping curl command using Pod IP
200
+ metricScrapeCmd := []string {
201
+ "curl" ,
202
+ "-i" ,
203
+ "--max-time" ,
204
+ strconv .Itoa ((int )(curlTimeout .Seconds ())),
205
+ "-H" ,
206
+ "Authorization: Bearer " + token ,
207
+ fmt .Sprintf ("http://%s:%d/metrics" , podIP , 9090 ),
208
+ }
209
+
210
+ ginkgo .By ("Verifying that all expected metrics are present." )
211
+ gomega .Eventually (func () error {
212
+ // Execute the metrics scrape command inside the curl pod
213
+ resp , err := testutils .ExecCommandInPod (ctx , cfg , scheme , kubeCli , nsName , "curl" , "curl" , metricScrapeCmd )
214
+ if err != nil {
215
+ return err
216
+ }
217
+ // Verify that we got a 200 OK responsecurl
218
+ if ! strings .Contains (resp , "200 OK" ) {
219
+ return fmt .Errorf ("did not get 200 OK: %s" , resp )
220
+ }
221
+ // Check if all expected metrics are present in the metrics output
222
+ for _ , metric := range expectedMetrics {
223
+ if ! strings .Contains (resp , metric ) {
224
+ return fmt .Errorf ("expected metric %s not found in metrics output" , metric )
225
+ }
226
+ }
227
+ return nil
228
+ }, readyTimeout , curlInterval ).Should (gomega .Succeed ())
229
+ })
114
230
})
115
231
})
116
232
@@ -130,16 +246,38 @@ func newInferenceModel(ns string) *v1alpha2.InferenceModel {
130
246
Obj ()
131
247
}
132
248
249
+ func getMetricsReaderToken (k8sClient client.Client ) (string , error ) {
250
+ secret := & corev1.Secret {}
251
+ err := k8sClient .Get (ctx , types.NamespacedName {Namespace : nsName , Name : metricsReaderSecretName }, secret )
252
+ if err != nil {
253
+ return "" , err
254
+ }
255
+ return string (secret .Data ["token" ]), nil
256
+ }
257
+
133
258
// getCurlCommand returns the command, as a slice of strings, for curl'ing
134
259
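// the test model server at the given name, namespace, port, and model name.
// The api value selects whether promptOrMessages is sent under the "prompt"
// ("/completions") or "messages" ("/chat/completions") key of the JSON request
// body; streaming adds "stream": true and "stream_options": {"include_usage": true}.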
135
- func getCurlCommand (name , ns , port , model string , timeout time.Duration , api string , promptOrMessages string ) []string {
136
- var body string
260
+ func getCurlCommand (name , ns , port , model string , timeout time.Duration , api string , promptOrMessages any , streaming bool ) []string {
261
+ body := map [string ]any {
262
+ "model" : model ,
263
+ "max_tokens" : 100 ,
264
+ "temperature" : 0 ,
265
+ }
266
+ body ["model" ] = model
137
267
switch api {
138
268
case "/completions" :
139
- body = fmt . Sprintf ( `{"model": "%s", " prompt": "%s", "max_tokens": 100, "temperature": 0}` , model , promptOrMessages )
269
+ body [ " prompt"] = promptOrMessages
140
270
case "/chat/completions" :
141
- body = fmt .Sprintf (`{"model": "%s", "messages": %s, "max_tokens": 100, "temperature": 0}` , model , promptOrMessages )
271
+ body ["messages" ] = promptOrMessages
272
+ }
273
+ if streaming {
274
+ body ["stream" ] = true
275
+ body ["stream_options" ] = map [string ]any {
276
+ "include_usage" : true ,
277
+ }
142
278
}
279
+ b , err := json .Marshal (body )
280
+ gomega .Expect (err ).NotTo (gomega .HaveOccurred ())
143
281
return []string {
144
282
"curl" ,
145
283
"-i" ,
@@ -149,6 +287,6 @@ func getCurlCommand(name, ns, port, model string, timeout time.Duration, api str
149
287
"-H" ,
150
288
"Content-Type: application/json" ,
151
289
"-d" ,
152
- body ,
290
+ string ( b ) ,
153
291
}
154
292
}
0 commit comments