Skip to content

Commit aefc82c

Browse files
David-KreinerHarness
authored andcommitted
feat: [ML-1121]: Increase timeout for AI-devops flows and add streaming capabilities (#22)
* feat: [ML-1121]: rename params for nextgen ce * feat: [ML-1121]: Fix baseurl for CCM cost management * feat: [ML-1121]: Add CCM baseurl and secret for internal mode * feat: [ML-1121]: Fix formatting * feat: [ML-1121]: move connectors tool to own file * feat: [ML-1121]: Fix formatting and move connectors * feat: [ML-1121]: Add internal mode for AR * feat: [ML-1121]: Fix final outcome of streamed response * feat: [ML-1121]: Add poststream client and add notifcations for long-running processes * feat: [ML-1121]: Increase timeout for AI-devops flows
1 parent 6afbe15 commit aefc82c

File tree

8 files changed

+441
-164
lines changed

8 files changed

+441
-164
lines changed

client/client.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,92 @@ func (c *Client) PostRaw(
223223
return nil
224224
}
225225

226+
// PostStream is similar to Post but returns the raw http.Response for streaming
227+
func (c *Client) PostStream(
228+
ctx context.Context,
229+
path string,
230+
params map[string]string,
231+
body interface{},
232+
b ...backoff.BackOff,
233+
) (*http.Response, error) {
234+
bodyBytes, err := json.Marshal(body)
235+
if err != nil {
236+
return nil, fmt.Errorf("failed to serialize body: %w", err)
237+
}
238+
239+
return c.PostRawStream(ctx, path, params, bytes.NewBuffer(bodyBytes), nil, b...)
240+
}
241+
242+
// PostRawStream is similar to PostRaw but returns the raw http.Response for streaming
243+
func (c *Client) PostRawStream(
244+
ctx context.Context,
245+
path string,
246+
params map[string]string,
247+
body io.Reader,
248+
headers map[string]string,
249+
b ...backoff.BackOff,
250+
) (*http.Response, error) {
251+
var retryCount int
252+
var resp *http.Response
253+
254+
operation := func() error {
255+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, appendPath(c.BaseURL.String(), path), body)
256+
if err != nil {
257+
return backoff.Permanent(fmt.Errorf("unable to create HTTP request: %w", err))
258+
}
259+
260+
req.Header.Set("Content-Type", "application/json")
261+
// Add custom headers from the headers map
262+
for key, value := range headers {
263+
req.Header.Add(key, value)
264+
}
265+
addQueryParams(req, params)
266+
267+
resp, err = c.Do(req)
268+
269+
if err != nil || resp == nil {
270+
return fmt.Errorf("request failed: %w", err)
271+
}
272+
273+
if isRetryable(resp.StatusCode) {
274+
if resp.Body != nil {
275+
resp.Body.Close()
276+
}
277+
return fmt.Errorf("retryable status code: %d", resp.StatusCode)
278+
}
279+
280+
if statusErr := mapStatusCodeToError(resp.StatusCode); statusErr != nil {
281+
if resp.Body != nil {
282+
resp.Body.Close()
283+
}
284+
return backoff.Permanent(statusErr)
285+
}
286+
287+
return nil
288+
}
289+
290+
notify := func(err error, next time.Duration) {
291+
retryCount++
292+
log.Warn().
293+
Int("retry_count", retryCount).
294+
Dur("next_retry_in", next).
295+
Err(err).
296+
Msg("Retrying request due to error")
297+
}
298+
299+
if len(b) > 0 {
300+
if err := backoff.RetryNotify(operation, b[0], notify); err != nil {
301+
return nil, fmt.Errorf("request failed after %d retries: %w", retryCount, err)
302+
}
303+
} else {
304+
if err := operation(); err != nil {
305+
return nil, err
306+
}
307+
}
308+
309+
return resp, nil
310+
}
311+
226312
// Do is a wrapper of http.Client.Do that injects the auth header in the request.
227313
func (c *Client) Do(r *http.Request) (*http.Response, error) {
228314
slog.Debug("Request", "method", r.Method, "url", r.URL.String())

client/dto/genai.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,3 +67,9 @@ type ServiceChatResponse struct {
6767
Response string `json:"response,omitempty"`
6868
Error string `json:"error,omitempty"`
6969
}
70+
71+
type ProgressUpdate struct {
72+
Progress int `json:"progress"` // Current progress step
73+
Total int `json:"total"` // Total number of steps
74+
Message string `json:"message,omitempty"` // Progress message
75+
}

client/genai.go

Lines changed: 140 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
package client
22

33
import (
4+
"bufio"
45
"context"
6+
"encoding/json"
57
"fmt"
8+
"io"
9+
"log/slog"
10+
"net/http"
11+
"strings"
612

713
"github.com/harness/harness-mcp/client/dto"
814
)
@@ -15,28 +21,153 @@ type GenaiService struct {
1521
Client *Client
1622
}
1723

18-
func (g *GenaiService) SendAIDevOpsChat(ctx context.Context, scope dto.Scope, request *dto.ServiceChatParameters) (*dto.ServiceChatResponse, error) {
19-
path := aiDevopsChatPath
20-
params := make(map[string]string)
24+
// SendAIDevOpsChat sends a request to the AI DevOps service and returns the response.
25+
// If request.Stream is true and onProgress is provided, it will handle streaming responses.
26+
// For non-streaming requests or when onProgress is nil, it will use the standard request flow.
27+
func (g *GenaiService) SendAIDevOpsChat(ctx context.Context, scope dto.Scope, request *dto.ServiceChatParameters, onProgress ...func(progressUpdate dto.ProgressUpdate) error) (*dto.ServiceChatResponse, error) {
28+
if g == nil || g.Client == nil {
29+
return nil, fmt.Errorf("genai service or client is nil")
30+
}
31+
32+
if request == nil {
33+
return nil, fmt.Errorf("request is nil")
34+
}
35+
36+
// Validate context
37+
if ctx == nil {
38+
return nil, fmt.Errorf("context is nil")
39+
}
40+
41+
// Extract the progress callback if provided
42+
var progressCB func(progressUpdate dto.ProgressUpdate) error
43+
if len(onProgress) > 0 && onProgress[0] != nil {
44+
progressCB = onProgress[0]
45+
}
2146

47+
params := make(map[string]string)
2248
// Only add non-empty scope parameters
2349
if scope.AccountID != "" {
2450
params["accountIdentifier"] = scope.AccountID
2551
}
26-
2752
if scope.OrgID != "" {
2853
params["orgIdentifier"] = scope.OrgID
2954
}
30-
3155
if scope.ProjectID != "" {
3256
params["projectIdentifier"] = scope.ProjectID
3357
}
3458

35-
var response dto.ServiceChatResponse
36-
err := g.Client.Post(ctx, path, params, request, &response)
59+
// Handle non-streaming case with early return
60+
isStreaming := request.Stream && progressCB != nil
61+
if !isStreaming {
62+
var response dto.ServiceChatResponse
63+
err := g.Client.Post(ctx, aiDevopsChatPath, params, request, &response)
64+
if err != nil {
65+
return nil, fmt.Errorf("failed to send request to genai service: %w", err)
66+
}
67+
68+
return &response, nil
69+
}
70+
71+
// Execute the streaming request
72+
resp, err := g.Client.PostStream(ctx, aiDevopsChatPath, params, request)
73+
if err != nil {
74+
slog.Warn("Failed to execute streaming request", "error", err.Error())
75+
return nil, fmt.Errorf("failed to execute streaming request: %w", err)
76+
}
77+
defer resp.Body.Close()
78+
79+
// Check response status
80+
if resp.StatusCode != http.StatusOK {
81+
body, _ := io.ReadAll(resp.Body)
82+
return nil, fmt.Errorf("streaming request failed with status %d: %s", resp.StatusCode, string(body))
83+
}
84+
85+
// Initialize the response with conversation ID from request
86+
finalResponse := &dto.ServiceChatResponse{
87+
ConversationID: request.ConversationID,
88+
}
89+
90+
// Process the streaming response
91+
err = g.processStreamingResponse(resp.Body, finalResponse, progressCB)
3792
if err != nil {
38-
return nil, fmt.Errorf("failed to send request to genai service: %w", err)
93+
slog.Warn("Error processing streaming response", "error", err.Error())
94+
return finalResponse, fmt.Errorf("error processing streaming response: %w", err)
95+
}
96+
97+
return finalResponse, nil
98+
}
99+
100+
// processStreamingResponse handles Server-Sent Events (SSE) streaming responses
101+
// and accumulates complete events before forwarding them with appropriate event types
102+
func (g *GenaiService) processStreamingResponse(body io.ReadCloser, finalResponse *dto.ServiceChatResponse, onProgress func(dto.ProgressUpdate) error) error {
103+
scanner := bufio.NewScanner(body)
104+
var allContent, currentEvent strings.Builder
105+
inEvent := false
106+
var eventType string
107+
var eventData string
108+
109+
for scanner.Scan() {
110+
line := scanner.Text()
111+
allContent.WriteString(line + "\n")
112+
113+
switch {
114+
case strings.HasPrefix(line, "event: "):
115+
eventType = strings.TrimPrefix(line, "event: ")
116+
currentEvent.Reset()
117+
currentEvent.WriteString(line + "\n")
118+
inEvent = true
119+
120+
case strings.HasPrefix(line, "data: ") && inEvent:
121+
eventData = strings.TrimPrefix(line, "data: ")
122+
currentEvent.WriteString(line + "\n")
123+
124+
case line == "" && inEvent:
125+
// Empty line ends the event
126+
currentEvent.WriteString("\n")
127+
128+
if onProgress != nil {
129+
eventPayload := map[string]any{
130+
"type": eventType,
131+
"data": eventData,
132+
}
133+
134+
// Convert to JSON string
135+
jsonPayload, err := json.Marshal(eventPayload)
136+
if err != nil {
137+
slog.Warn("Error creating JSON payload", "error", err.Error())
138+
} else {
139+
progress := dto.ProgressUpdate{
140+
Message: string(jsonPayload),
141+
}
142+
143+
if err := onProgress(progress); err != nil {
144+
slog.Warn("Error forwarding event", "type", eventType, "error", err.Error())
145+
}
146+
}
147+
148+
if eventType == "error" {
149+
finalResponse.Error = eventData
150+
}
151+
}
152+
153+
// Reset for next event
154+
inEvent = false
155+
}
156+
}
157+
158+
// Add a header note to inform the Uber Agent that these events have already been shown to the user
159+
// TODO: move this to a prompt template for uber agent to ingest
160+
instructionNote := "NOTE TO AGENT: The SSE events below have already been streamed to the end user in real-time.\n" +
161+
"Do not repeat the full content of these events in your response.\n" +
162+
"Focus on summarizing key outcomes and providing additional context or next steps.\n" +
163+
"---\n\n"
164+
165+
finalResponse.Response = instructionNote + allContent.String()
166+
167+
if err := scanner.Err(); err != nil {
168+
slog.Warn("Error in scanner", "error", err.Error())
169+
return fmt.Errorf("error reading response stream: %w", err)
39170
}
40171

41-
return &response, nil
172+
return nil
42173
}

cmd/harness-mcp-server/config/config.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,18 @@ type Config struct {
1818
APIKey string
1919

2020
// Only used for internal mode
21-
BearerToken string
22-
PipelineSvcBaseURL string
23-
PipelineSvcSecret string
24-
NgManagerBaseURL string
25-
NgManagerSecret string
26-
ChatbotBaseURL string
27-
ChatbotSecret string
28-
GenaiBaseURL string
29-
GenaiSecret string
30-
McpSvcSecret string
21+
BearerToken string
22+
PipelineSvcBaseURL string
23+
PipelineSvcSecret string
24+
NgManagerBaseURL string
25+
NgManagerSecret string
26+
ChatbotBaseURL string
27+
ChatbotSecret string
28+
GenaiBaseURL string
29+
GenaiSecret string
30+
ArtifactRegistryBaseURL string
31+
ArtifactRegistrySecret string
32+
NextgenCEBaseURL string
33+
NextgenCESecret string
34+
McpSvcSecret string
3135
}

cmd/harness-mcp-server/main.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -132,16 +132,20 @@ var (
132132
Internal: true,
133133
AccountID: session.Principal.AccountID,
134134
// Internal mode specific fields
135-
BearerToken: viper.GetString("bearer_token"),
136-
PipelineSvcBaseURL: viper.GetString("pipeline_svc_base_url"),
137-
PipelineSvcSecret: viper.GetString("pipeline_svc_secret"),
138-
NgManagerBaseURL: viper.GetString("ng_manager_base_url"),
139-
NgManagerSecret: viper.GetString("ng_manager_secret"),
140-
ChatbotBaseURL: viper.GetString("chatbot_base_url"),
141-
ChatbotSecret: viper.GetString("chatbot_secret"),
142-
GenaiBaseURL: viper.GetString("genai_base_url"),
143-
GenaiSecret: viper.GetString("genai_secret"),
144-
McpSvcSecret: viper.GetString("mcp_svc_secret"),
135+
BearerToken: viper.GetString("bearer_token"),
136+
PipelineSvcBaseURL: viper.GetString("pipeline_svc_base_url"),
137+
PipelineSvcSecret: viper.GetString("pipeline_svc_secret"),
138+
NgManagerBaseURL: viper.GetString("ng_manager_base_url"),
139+
NgManagerSecret: viper.GetString("ng_manager_secret"),
140+
ChatbotBaseURL: viper.GetString("chatbot_base_url"),
141+
ChatbotSecret: viper.GetString("chatbot_secret"),
142+
GenaiBaseURL: viper.GetString("genai_base_url"),
143+
GenaiSecret: viper.GetString("genai_secret"),
144+
ArtifactRegistryBaseURL: viper.GetString("artifact_registry_base_url"),
145+
ArtifactRegistrySecret: viper.GetString("artifact_registry_secret"),
146+
NextgenCEBaseURL: viper.GetString("nextgen_ce_base_url"),
147+
NextgenCESecret: viper.GetString("nextgen_ce_secret"),
148+
McpSvcSecret: viper.GetString("mcp_svc_secret"),
145149
}
146150

147151
if err := runStdioServer(ctx, cfg); err != nil {
@@ -183,6 +187,10 @@ func init() {
183187
internalCmd.Flags().String("genai-base-url", "", "Base URL for genai service")
184188
internalCmd.Flags().String("genai-secret", "", "Secret for genai service")
185189
internalCmd.Flags().String("mcp-svc-secret", "", "Secret for MCP service")
190+
internalCmd.Flags().String("artifact-registry-base-url", "", "Base URL for artifact registry service")
191+
internalCmd.Flags().String("artifact-registry-secret", "", "Secret for artifact registry service")
192+
internalCmd.Flags().String("nextgen-ce-base-url", "", "Base URL for Nextgen CE service")
193+
internalCmd.Flags().String("nextgen-ce-secret", "", "Secret for Nextgen CE service")
186194

187195
// Bind global flags to viper
188196
_ = viper.BindPFlag("toolsets", rootCmd.PersistentFlags().Lookup("toolsets"))
@@ -207,6 +215,10 @@ func init() {
207215
_ = viper.BindPFlag("genai_base_url", internalCmd.Flags().Lookup("genai-base-url"))
208216
_ = viper.BindPFlag("genai_secret", internalCmd.Flags().Lookup("genai-secret"))
209217
_ = viper.BindPFlag("mcp_svc_secret", internalCmd.Flags().Lookup("mcp-svc-secret"))
218+
_ = viper.BindPFlag("artifact_registry_base_url", internalCmd.Flags().Lookup("artifact-registry-base-url"))
219+
_ = viper.BindPFlag("artifact_registry_secret", internalCmd.Flags().Lookup("artifact-registry-secret"))
220+
_ = viper.BindPFlag("nextgen_ce_base_url", internalCmd.Flags().Lookup("nextgen-ce-base-url"))
221+
_ = viper.BindPFlag("nextgen_ce_secret", internalCmd.Flags().Lookup("nextgen-ce-secret"))
210222

211223
// Add subcommands
212224
rootCmd.AddCommand(stdioCmd)

0 commit comments

Comments
 (0)