-
Notifications
You must be signed in to change notification settings - Fork 101
Add durable conversation resumability to the Antigravity Interactions harness #180
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,25 +30,30 @@ package harness | |
| // no executor is configured, no third-party tools are advertised. | ||
| // | ||
| // Neither kind of tool call is surfaced to the caller: Run drives the whole | ||
| // interaction to completion (initial turn -> resume -> resume -> ... -> final | ||
| // answer) and only forwards the agent's text output via Handler.OnMessage. | ||
| // interaction loop to completion (initial turn -> continuation turn -> ... -> | ||
| // final answer) and only forwards the agent's text output via Handler.OnMessage. | ||
| // Each turn after the first is chained to the previous one via the Interactions | ||
| // API's previous_interaction_id; this is an implementation detail of the loop, | ||
| // not an AX resume. | ||
| // | ||
| // Queue carries human input only -- the initial prompt and, in the future, | ||
| // "steering" messages injected mid-run. It never carries tool results (the | ||
| // harness produces those itself). Queued input is drained at each interaction | ||
| // gap (every resume point), which is the only place the harness can influence an | ||
| // otherwise atomic interaction. | ||
| // Queue carries human input only -- the initial prompt and "steering" messages | ||
| // injected mid-run. It never carries tool results (the harness produces those | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shall we keep the original comment? iiuc " and "steering" messages injected mid-run." is not supported yet |
||
| // itself). Queued input is drained at each interaction gap, which is the only | ||
| // place the harness can influence an otherwise atomic interaction. | ||
|
|
||
| import ( | ||
| "bufio" | ||
| "bytes" | ||
| "context" | ||
| "encoding/json" | ||
| "errors" | ||
| "fmt" | ||
| "io" | ||
| "math/rand" | ||
| "net/http" | ||
| "os" | ||
| "sort" | ||
| "strconv" | ||
| "strings" | ||
| "sync" | ||
| "time" | ||
|
|
@@ -91,7 +96,7 @@ type AntigravityInteractionsConfig struct { | |
| Agent string | ||
| // SystemInstruction, if set, is sent as the interaction's system_instruction | ||
| // (a free-form system prompt prepended to the agent's own instructions). It | ||
| // is sent on every turn so it persists across resumes. | ||
| // is sent on every turn of the interaction loop so it persists across them. | ||
| SystemInstruction string | ||
| // MaxTurns caps the number of interaction turns the harness will drive within | ||
| // a single Run before giving up. Defaults to 100. | ||
|
|
@@ -115,6 +120,14 @@ type AntigravityInteractionsConfig struct { | |
| // HTTPClient overrides the HTTP client. If nil, a default client with a long | ||
| // timeout is used. | ||
| HTTPClient *http.Client | ||
|
|
||
| // StateDir is the directory where each conversation's resume cursor is | ||
| // persisted, so a conversation can resume after a restart. It is required: | ||
| // NewAntigravityInteractionsHarness returns an error if it is empty. | ||
| // Correctness relies on a single writer per conversation (the controller | ||
| // guarantees at most one Execution per conversation), so writes are | ||
| // last-write-wins. | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the controller doesn't enforces this yet. can we phrase this as an expectation/requirement of the caller instead of "the controller guarantees" |
||
| StateDir string | ||
| } | ||
|
|
||
| func (c *AntigravityInteractionsConfig) withDefaults() { | ||
|
|
@@ -143,6 +156,11 @@ type AntigravityInteractionsHarness struct { | |
| cfg AntigravityInteractionsConfig | ||
| httpClient *http.Client | ||
|
|
||
| // cursors persists each conversation's resume cursor to disk so a conversation | ||
| // can resume after a restart. It is always non-nil (the constructor requires a | ||
| // usable state directory). | ||
| cursors *cursorStore | ||
|
|
||
| // tsOnce guards lazy initialization of ts, the resolved OAuth2 token source. | ||
| // It is resolved on first use (rather than in the constructor) so credential | ||
| // errors surface to the caller of Run instead of at construction time. | ||
|
|
@@ -152,24 +170,49 @@ type AntigravityInteractionsHarness struct { | |
| } | ||
|
|
||
| // NewAntigravityInteractionsHarness creates a harness from the given config, | ||
| // filling in defaults for unset fields. | ||
| func NewAntigravityInteractionsHarness(cfg AntigravityInteractionsConfig) *AntigravityInteractionsHarness { | ||
| // filling in defaults for unset fields. It returns an error if cfg.StateDir is | ||
| // empty or the cursor store cannot be created: resume-cursor persistence is | ||
| // required, so a usable state directory must be provided. | ||
| func NewAntigravityInteractionsHarness(cfg AntigravityInteractionsConfig) (*AntigravityInteractionsHarness, error) { | ||
| cfg.withDefaults() | ||
| hc := cfg.HTTPClient | ||
| if hc == nil { | ||
| hc = &http.Client{Timeout: 10 * time.Minute} | ||
| } | ||
| return &AntigravityInteractionsHarness{cfg: cfg, httpClient: hc} | ||
| if cfg.StateDir == "" { | ||
| return nil, errors.New("AntigravityInteractionsConfig.StateDir must be set") | ||
| } | ||
| cursors, err := newCursorStore(cfg.StateDir) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("creating cursor store: %w", err) | ||
| } | ||
| return &AntigravityInteractionsHarness{cfg: cfg, httpClient: hc, cursors: cursors}, nil | ||
| } | ||
|
|
||
| // Start implements Harness.Start. | ||
| // Start implements Harness.Start. It loads any previously persisted resume | ||
| // cursor for conversationID so the returned Execution resumes the existing | ||
| // interaction chain instead of starting a new one. | ||
| func (h *AntigravityInteractionsHarness) Start(ctx context.Context, conversationID string, harnessConfig []byte) (Execution, error) { | ||
| return &antigravityInteractionsExecution{ | ||
| e := &antigravityInteractionsExecution{ | ||
| harness: h, | ||
| conversationID: conversationID, | ||
| id: uuid.NewString(), | ||
| harnessConfig: harnessConfig, | ||
| }, nil | ||
| } | ||
|
|
||
| cur, found, err := h.cursors.load(conversationID) | ||
| if err != nil { | ||
| // A real load failure must not be silently treated as "new", which would | ||
| // lose an existing conversation's history. | ||
| return nil, fmt.Errorf("loading resume cursor for %q: %w", conversationID, err) | ||
| } | ||
| if found { | ||
| // A persisted cursor is only written after a successful turn, so a | ||
| // non-empty interaction id means the conversation has already started; | ||
| // the first-turn check in Run derives that from prevInteractionID. | ||
| e.prevInteractionID = cur.PrevInteractionID | ||
| } | ||
| return e, nil | ||
| } | ||
|
|
||
| // antigravityInteractionsExecution implements Execution. It is long-lived | ||
|
|
@@ -186,10 +229,10 @@ type antigravityInteractionsExecution struct { | |
| queued []*proto.Message | ||
| closed bool | ||
|
|
||
| // started is false until the initial turn has been sent. | ||
| started bool | ||
| // prevInteractionID chains resume turns (the interaction chain this Execution | ||
| // owns). | ||
| // prevInteractionID chains the turns of the interaction loop (the interaction | ||
| // chain this Execution owns), and is the value persisted for cross-Execution | ||
| // resume. It is empty until the first turn completes successfully; an empty | ||
| // value therefore means "no turn has succeeded yet" (the first turn). | ||
| prevInteractionID string | ||
| } | ||
|
|
||
|
|
@@ -229,13 +272,22 @@ func (e *antigravityInteractionsExecution) drainQueue() []any { | |
| return messagesToInputSteps(msgs) | ||
| } | ||
|
|
||
| func (e *antigravityInteractionsExecution) setPrevID(id string) { | ||
| // setPrevID records the latest interaction id (in memory) and durably persists | ||
| // the resume cursor so the conversation can resume after a restart. A persistence | ||
| // failure is returned to the caller so the run can decide how to handle it rather | ||
| // than silently losing durability. | ||
| func (e *antigravityInteractionsExecution) setPrevID(ctx context.Context, id string) error { | ||
| if id == "" { | ||
| return | ||
| return nil | ||
| } | ||
| e.mu.Lock() | ||
| e.prevInteractionID = id | ||
| e.mu.Unlock() | ||
|
|
||
| if err := e.harness.cursors.save(e.conversationID, resumeCursor{PrevInteractionID: id}); err != nil { | ||
| return fmt.Errorf("persisting resume cursor: %w", err) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // Run implements Execution.Run. It drives the interaction to completion, | ||
|
|
@@ -252,7 +304,6 @@ func (e *antigravityInteractionsExecution) Run(ctx context.Context, handler Hand | |
| e.mu.Unlock() | ||
| return fmt.Errorf("execution session already closed") | ||
| } | ||
| started := e.started | ||
| prevID := e.prevInteractionID | ||
| e.mu.Unlock() | ||
|
|
||
|
|
@@ -262,16 +313,15 @@ func (e *antigravityInteractionsExecution) Run(ctx context.Context, handler Hand | |
| } | ||
|
|
||
| // Initial input for this Run: drain whatever is queued (the prompt on the | ||
| // first Run, or steering input on later Runs). | ||
| // first Run, or steering input on later Runs). An empty prevID means no turn | ||
| // has completed yet, so this is the conversation's first turn. (A first turn | ||
| // that failed leaves prevID empty, so a retried Run is correctly treated as | ||
| // the first turn again.) | ||
| input := e.drainQueue() | ||
| if !started { | ||
| if len(input) == 0 { | ||
| if len(input) == 0 { | ||
| if prevID == "" { | ||
| return fmt.Errorf("no input messages queued for the initial turn") | ||
| } | ||
| e.mu.Lock() | ||
| e.started = true | ||
| e.mu.Unlock() | ||
| } else if len(input) == 0 { | ||
| return fmt.Errorf("Run called with no queued input and no work pending") | ||
| } | ||
|
|
||
|
|
@@ -280,7 +330,9 @@ func (e *antigravityInteractionsExecution) Run(ctx context.Context, handler Hand | |
| return fmt.Errorf("interaction turn failed: %w", err) | ||
| } | ||
| prevID = res.interactionID | ||
| e.setPrevID(prevID) | ||
| if err := e.setPrevID(ctx, prevID); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| for turn := 0; turn < e.harness.cfg.MaxTurns; turn++ { | ||
| e.harness.debugTurn(e.conversationID, turn+1, len(res.toolCalls)) | ||
|
|
@@ -315,10 +367,12 @@ func (e *antigravityInteractionsExecution) Run(ctx context.Context, handler Hand | |
|
|
||
| res, err = e.harness.postTurn(ctx, token, e.harness.newRequest(next, prevID)) | ||
| if err != nil { | ||
| return fmt.Errorf("resume turn failed: %w", err) | ||
| return fmt.Errorf("continuation turn failed: %w", err) | ||
| } | ||
| prevID = res.interactionID | ||
| e.setPrevID(prevID) | ||
| if err := e.setPrevID(ctx, prevID); err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| // Hit the turn cap while still driving tools. | ||
|
|
@@ -369,7 +423,7 @@ type userInputStep struct { | |
| Content []textPart `json:"content"` | ||
| } | ||
|
|
||
| // toolResultStep is one tool result returned on a resume turn: a flat Step with | ||
| // toolResultStep is one tool result returned on a continuation turn: a flat Step with | ||
| // "type":"function_result", the matching call_id, the tool name, and the result | ||
| // object under "result". | ||
| type toolResultStep struct { | ||
|
|
@@ -519,6 +573,14 @@ func (h *AntigravityInteractionsHarness) interactionsURL() string { | |
| interactionsEndpoint, interactionsAPIVersion, cloudProject(), cloudLocation()) | ||
| } | ||
|
|
||
| // Endpoint returns the Interactions API base endpoint this harness targets | ||
| // (e.g. the prod or autopush host). Exposed so callers/tools can log which | ||
| // backend is in use, since the endpoint is a compile-time constant. | ||
| func Endpoint() string { return interactionsEndpoint } | ||
|
|
||
| // APIVersion returns the Interactions API version this harness targets. | ||
| func APIVersion() string { return interactionsAPIVersion } | ||
|
|
||
| // token returns a bearer access token from the harness's OAuth2 token source. | ||
| // The source is resolved once (lazily) and auto-refreshes thereafter. | ||
| func (h *AntigravityInteractionsHarness) token(ctx context.Context) (string, error) { | ||
|
|
@@ -552,7 +614,7 @@ func newTokenSource(ctx context.Context) (oauth2.TokenSource, error) { | |
| // newRequest builds an interactionRequest common to every turn. The environment | ||
| // is always the client-side ("local") environment -- this harness exists to | ||
| // execute the agent's built-in env tools locally. Tools are re-declared on every | ||
| // turn so they stay known to the agent across resumes. | ||
| // turn so they stay known to the agent across the turns of the interaction loop. | ||
| func (h *AntigravityInteractionsHarness) newRequest(input []any, previousID string) interactionRequest { | ||
| var tools []FunctionTool | ||
| if h.cfg.ThirdPartyExecutor != nil { | ||
|
|
@@ -571,8 +633,93 @@ func (h *AntigravityInteractionsHarness) newRequest(input []any, previousID stri | |
| } | ||
| } | ||
|
|
||
| // postTurn POSTs the request and streams the SSE response for one turn. | ||
| // retryableHTTPError marks a transient HTTP response that should be retried. | ||
| // retryAfter is the server-suggested delay (parsed from the Retry-After | ||
| // header), or 0 if none was provided. | ||
| type retryableHTTPError struct { | ||
| status int | ||
| retryAfter time.Duration | ||
| body string | ||
| } | ||
|
|
||
| func (e *retryableHTTPError) Error() string { | ||
| return fmt.Sprintf("HTTP %d: %s", e.status, e.body) | ||
| } | ||
|
|
||
| // Retry tuning for transient rate-limit (429) responses. The stateful | ||
| // interaction quota is per-minute, so the backoff must be able to span ~a | ||
| // minute; with these values the cumulative wait reaches ~1-2 minutes before | ||
| // giving up. | ||
| const ( | ||
| rateLimitMaxRetries = 6 | ||
| rateLimitBaseDelay = 2 * time.Second | ||
| rateLimitMaxDelay = 32 * time.Second | ||
| ) | ||
|
|
||
| // postTurn POSTs one turn, retrying on HTTP 429 (rate limit / quota) with | ||
| // exponential backoff and jitter, honoring a Retry-After header when present. | ||
| // Creating an interaction is not idempotent, so only 429 -- which is rejected | ||
| // before any interaction is created -- is retried here. | ||
| func (h *AntigravityInteractionsHarness) postTurn(ctx context.Context, token string, reqBody interactionRequest) (*turnResult, error) { | ||
| delay := rateLimitBaseDelay | ||
| for attempt := 0; ; attempt++ { | ||
| res, err := h.postTurnOnce(ctx, token, reqBody) | ||
| if err == nil { | ||
| return res, nil | ||
| } | ||
| var re *retryableHTTPError | ||
| if !errors.As(err, &re) || attempt >= rateLimitMaxRetries { | ||
| return nil, err | ||
| } | ||
|
|
||
| wait := re.retryAfter | ||
| if wait <= 0 { | ||
| wait = backoffWithJitter(delay) | ||
| } | ||
| fmt.Fprintf(os.Stderr, "[harness] rate limited (HTTP %d); retrying in %s (attempt %d/%d)\n", | ||
| re.status, wait.Round(time.Millisecond), attempt+1, rateLimitMaxRetries) | ||
|
|
||
| select { | ||
| case <-ctx.Done(): | ||
| return nil, ctx.Err() | ||
| case <-time.After(wait): | ||
| } | ||
| if delay *= 2; delay > rateLimitMaxDelay { | ||
| delay = rateLimitMaxDelay | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // backoffWithJitter returns a randomized delay in [delay/2, delay] to avoid | ||
| // synchronized retries. | ||
| func backoffWithJitter(delay time.Duration) time.Duration { | ||
| half := delay / 2 | ||
| return half + time.Duration(rand.Int63n(int64(half)+1)) | ||
| } | ||
|
|
||
| // parseRetryAfter parses a Retry-After header value, which may be either an | ||
| // integer number of seconds or an HTTP date. Returns 0 if absent or unparseable. | ||
| func parseRetryAfter(v string) time.Duration { | ||
| v = strings.TrimSpace(v) | ||
| if v == "" { | ||
| return 0 | ||
| } | ||
| if secs, err := strconv.Atoi(v); err == nil { | ||
| if secs <= 0 { | ||
| return 0 | ||
| } | ||
| return time.Duration(secs) * time.Second | ||
| } | ||
| if t, err := http.ParseTime(v); err == nil { | ||
| if d := time.Until(t); d > 0 { | ||
| return d | ||
| } | ||
| } | ||
| return 0 | ||
| } | ||
|
|
||
| // postTurnOnce POSTs the request and streams the SSE response for one turn. | ||
| func (h *AntigravityInteractionsHarness) postTurnOnce(ctx context.Context, token string, reqBody interactionRequest) (*turnResult, error) { | ||
| body, err := json.Marshal(reqBody) | ||
| if err != nil { | ||
| return nil, err | ||
|
|
@@ -595,7 +742,17 @@ func (h *AntigravityInteractionsHarness) postTurn(ctx context.Context, token str | |
| if _, err := b.ReadFrom(resp.Body); err != nil { | ||
| return nil, fmt.Errorf("HTTP %d (failed to read error body: %v)", resp.StatusCode, err) | ||
| } | ||
| return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, b.String()) | ||
| msg := b.String() | ||
| // 429 means the request was rejected by quota before any interaction was | ||
| // created, so it is safe to retry (unlike a partial 5xx). Mark it. | ||
| if resp.StatusCode == http.StatusTooManyRequests { | ||
| return nil, &retryableHTTPError{ | ||
| status: resp.StatusCode, | ||
| retryAfter: parseRetryAfter(resp.Header.Get("Retry-After")), | ||
| body: msg, | ||
| } | ||
| } | ||
| return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, msg) | ||
| } | ||
| return h.parseStreamedTurn(resp.Body) | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.