Skip to content
This repository was archived by the owner on Oct 6, 2025. It is now read-only.

Commit 3fa1028

Browse files
committed
feat: add requests monitoring
Signed-off-by: Dorin Geman <[email protected]>
1 parent 89218c5 commit 3fa1028

File tree

3 files changed

+140
-0
lines changed

3 files changed

+140
-0
lines changed

commands/requests.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package commands
2+
3+
import (
4+
"bufio"
5+
"fmt"
6+
"strings"
7+
8+
"github.com/docker/model-cli/commands/completion"
9+
"github.com/spf13/cobra"
10+
)
11+
12+
func newRequestsCmd() *cobra.Command {
13+
var model string
14+
var follow bool
15+
var includeExisting bool
16+
c := &cobra.Command{
17+
Use: "requests [OPTIONS]",
18+
Short: "Fetch requests+responses from Docker Model Runner",
19+
PreRunE: func(cmd *cobra.Command, args []string) error {
20+
// Make --include-existing only available when --follow is set.
21+
if includeExisting && !follow {
22+
return fmt.Errorf("--include-existing can only be used with --follow")
23+
}
24+
return nil
25+
},
26+
RunE: func(cmd *cobra.Command, args []string) error {
27+
if _, err := ensureStandaloneRunnerAvailable(cmd.Context(), cmd); err != nil {
28+
return fmt.Errorf("unable to initialize standalone model runner: %w", err)
29+
}
30+
if follow {
31+
responseBody, err := desktopClient.RequestsStream(model, includeExisting)
32+
if err != nil {
33+
errMsg := "Failed to get requests stream"
34+
if model != "" {
35+
errMsg = errMsg + " for" + model
36+
}
37+
err = handleClientError(err, errMsg)
38+
return handleNotRunningError(err)
39+
}
40+
scanner := bufio.NewScanner(responseBody)
41+
cmd.Println("Connected to request stream. Press Ctrl+C to stop.")
42+
var currentEvent string
43+
for scanner.Scan() {
44+
select {
45+
case <-cmd.Context().Done():
46+
return nil
47+
default:
48+
}
49+
line := scanner.Text()
50+
if strings.HasPrefix(line, "event: ") {
51+
currentEvent = strings.TrimPrefix(line, "event: ")
52+
} else if strings.HasPrefix(line, "data: ") &&
53+
(currentEvent == "new_request" || currentEvent == "existing_request") {
54+
data := strings.TrimPrefix(line, "data: ")
55+
cmd.Println(data)
56+
}
57+
}
58+
cmd.Println("Stream closed by server.")
59+
return nil
60+
}
61+
requests, err := desktopClient.RequestsList(model)
62+
if err != nil {
63+
errMsg := "Failed to get requests"
64+
if model != "" {
65+
errMsg = errMsg + " for" + model
66+
}
67+
err = handleClientError(err, errMsg)
68+
return handleNotRunningError(err)
69+
}
70+
cmd.Print(requests)
71+
return nil
72+
},
73+
ValidArgsFunction: completion.NoComplete,
74+
}
75+
c.Flags().BoolVarP(&follow, "follow", "f", false, "Follow requests stream")
76+
c.Flags().BoolVar(&includeExisting, "include-existing", false,
77+
"Include existing requests when starting to follow (only available with --follow)")
78+
c.Flags().StringVar(&model, "model", "", "Specify the model to filter requests")
79+
// Enable completion for the --model flag.
80+
_ = c.RegisterFlagCompletionFunc("model", completion.ModelNames(getDesktopClient, 1))
81+
return c
82+
}

commands/root.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ func NewRootCmd(cli *command.DockerCli) *cobra.Command {
110110
newPSCmd(),
111111
newDFCmd(),
112112
newUnloadCmd(),
113+
newRequestsCmd(),
113114
)
114115
return rootCmd
115116
}

desktop/desktop.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,63 @@ func (c *Client) ConfigureBackend(request scheduling.ConfigureRequest) error {
657657
return nil
658658
}
659659

660+
func (c *Client) RequestsStream(modelFilter string, includeExisting bool) (io.ReadCloser, error) {
661+
path := c.modelRunner.URL(inference.InferencePrefix + "/requests/stream")
662+
var queryParams []string
663+
if modelFilter != "" {
664+
queryParams = append(queryParams, "model="+modelFilter)
665+
}
666+
if includeExisting {
667+
queryParams = append(queryParams, "include_existing=true")
668+
}
669+
if len(queryParams) > 0 {
670+
path += "?" + strings.Join(queryParams, "&")
671+
}
672+
req, err := http.NewRequest(http.MethodGet, path, nil)
673+
if err != nil {
674+
return nil, fmt.Errorf("failed to create request: %w", err)
675+
}
676+
req.Header.Set("Accept", "text/event-stream")
677+
req.Header.Set("Cache-Control", "no-cache")
678+
req.Header.Set("User-Agent", "docker-model-cli/"+Version)
679+
resp, err := c.modelRunner.Client().Do(req)
680+
if err != nil {
681+
return nil, fmt.Errorf("failed to connect to stream: %w", err)
682+
}
683+
if resp.StatusCode != http.StatusOK {
684+
resp.Body.Close()
685+
return nil, fmt.Errorf("stream request failed with status: %d", resp.StatusCode)
686+
}
687+
return resp.Body, nil
688+
}
689+
690+
func (c *Client) RequestsList(modelFilter string) (string, error) {
691+
path := inference.InferencePrefix + "/requests"
692+
if modelFilter != "" {
693+
path += "?model=" + modelFilter
694+
}
695+
resp, err := c.doRequest(http.MethodGet, path, nil)
696+
if err != nil {
697+
return "", c.handleQueryError(err, path)
698+
}
699+
defer resp.Body.Close()
700+
701+
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNotFound {
702+
return "", fmt.Errorf("failed to list requests: %s", resp.Status)
703+
}
704+
705+
body, err := io.ReadAll(resp.Body)
706+
if err != nil {
707+
return "", fmt.Errorf("failed to read response body: %w", err)
708+
}
709+
710+
if resp.StatusCode == http.StatusNotFound {
711+
return "", fmt.Errorf("%s", strings.TrimSpace(string(body)))
712+
}
713+
714+
return string(body), nil
715+
}
716+
660717
// doRequest is a helper function that performs HTTP requests and handles 503 responses
661718
func (c *Client) doRequest(method, path string, body io.Reader) (*http.Response, error) {
662719
return c.doRequestWithAuth(method, path, body, "", "")

0 commit comments

Comments
 (0)