From bc39172f33ce841f3b59bdf0d586054413e8b7a9 Mon Sep 17 00:00:00 2001 From: zbl94 Date: Fri, 26 Jun 2026 00:33:38 +0000 Subject: [PATCH 1/2] Add durable conversation resumability to the Antigravity Interactions harness Introduce a small durable key-value store and use it to persist each conversation's interaction-chain cursor (the last interaction id), so a conversation can resume after a process restart instead of starting a new chain. - internal/storage: a minimal Store interface (Get/Put/Delete + ErrNotFound) with documented semantics (atomicity, read-after-write, not-found-vs-error, durability) and a single-writer concurrency model. Includes a filesystem implementation (FileStore) that writes atomically via temp-file + rename. - harness: add AntigravityInteractionsConfig.StateStore; Start loads any persisted cursor and Run persists it after each successful turn, so a fresh Execution for the same conversation continues the existing interaction chain. - harness: document the single-writer-per-conversation expectation on the Harness interface (the controller guarantees it), which is what makes the last-write-wins store correct. Also includes related harness improvements: - Retry HTTP 429 (rate limit) with exponential backoff + jitter, honoring Retry-After; only 429 is retried since it is rejected before any interaction is created. - Terminology cleanup: the within-Run FC/FR loop is the "interaction loop" (continuation turns chained via previous_interaction_id), distinct from an AX-level resume. 
--- internal/harness/antigravityinteractions.go | 240 +++++++++++++++--- .../harness/antigravityinteractions_tools.go | 13 +- internal/harness/harness.go | 7 + internal/storage/filestore.go | 127 +++++++++ internal/storage/filestore_test.go | 107 ++++++++ internal/storage/storage.go | 76 ++++++ 6 files changed, 532 insertions(+), 38 deletions(-) create mode 100644 internal/storage/filestore.go create mode 100644 internal/storage/filestore_test.go create mode 100644 internal/storage/storage.go diff --git a/internal/harness/antigravityinteractions.go b/internal/harness/antigravityinteractions.go index 746dc4fa..fa7014be 100644 --- a/internal/harness/antigravityinteractions.go +++ b/internal/harness/antigravityinteractions.go @@ -30,29 +30,35 @@ package harness // no executor is configured, no third-party tools are advertised. // // Neither kind of tool call is surfaced to the caller: Run drives the whole -// interaction to completion (initial turn -> resume -> resume -> ... -> final -// answer) and only forwards the agent's text output via Handler.OnMessage. +// interaction loop to completion (initial turn -> continuation turn -> ... -> +// final answer) and only forwards the agent's text output via Handler.OnMessage. +// Each turn after the first is chained to the previous one via the Interactions +// API's previous_interaction_id; this is an implementation detail of the loop, +// not an AX resume. // -// Queue carries human input only -- the initial prompt and, in the future, -// "steering" messages injected mid-run. It never carries tool results (the -// harness produces those itself). Queued input is drained at each interaction -// gap (every resume point), which is the only place the harness can influence an -// otherwise atomic interaction. +// Queue carries human input only -- the initial prompt and "steering" messages +// injected mid-run. It never carries tool results (the harness produces those +// itself). 
Queued input is drained at each interaction gap, which is the only +// place the harness can influence an otherwise atomic interaction. import ( "bufio" "bytes" "context" "encoding/json" + "errors" "fmt" "io" + "math/rand" "net/http" "os" "sort" + "strconv" "strings" "sync" "time" + "github.com/google/ax/internal/storage" "github.com/google/ax/proto" "github.com/google/uuid" "golang.org/x/oauth2" @@ -91,7 +97,7 @@ type AntigravityInteractionsConfig struct { Agent string // SystemInstruction, if set, is sent as the interaction's system_instruction // (a free-form system prompt prepended to the agent's own instructions). It - // is sent on every turn so it persists across resumes. + // is sent on every turn of the interaction loop so it persists across them. SystemInstruction string // MaxTurns caps the number of interaction turns the harness will drive within // a single Run before giving up. Defaults to 100. @@ -115,6 +121,15 @@ type AntigravityInteractionsConfig struct { // HTTPClient overrides the HTTP client. If nil, a default client with a long // timeout is used. HTTPClient *http.Client + + // StateStore durably persists each conversation's resume cursor (the last + // interaction id) so a conversation can resume after a process restart or on + // a different replica. If nil, that cursor is kept in memory only (on the + // Execution) and is lost when the Execution is discarded or the process + // exits -- a later Start for the same conversation id then begins a new + // interaction chain (empty previous_interaction_id) instead of continuing the + // existing one. + StateStore storage.Store } func (c *AntigravityInteractionsConfig) withDefaults() { @@ -162,13 +177,54 @@ func NewAntigravityInteractionsHarness(cfg AntigravityInteractionsConfig) *Antig return &AntigravityInteractionsHarness{cfg: cfg, httpClient: hc} } -// Start implements Harness.Start. 
+// resumeCursor is the small per-conversation state persisted to StateStore so a +// conversation can resume across restarts/replicas. It records the tail of the +// server-side interaction chain (PrevInteractionID). The cursor is only ever +// persisted after a turn completes successfully, so a persisted cursor always +// has a non-empty PrevInteractionID; "the conversation has started" is therefore +// derived as PrevInteractionID != "" rather than stored separately. +type resumeCursor struct { + PrevInteractionID string `json:"prev_interaction_id"` +} + +// stateStoreKey is the StateStore key for a conversation's resume cursor. +func stateStoreKey(conversationID string) string { + return "antigravity-interactions/cursor/" + conversationID +} + +// Start implements Harness.Start. If a StateStore is configured, it loads any +// previously persisted resume cursor for conversationID so the returned +// Execution resumes the existing interaction chain instead of starting a new +// one. func (h *AntigravityInteractionsHarness) Start(ctx context.Context, conversationID string) (Execution, error) { - return &antigravityInteractionsExecution{ + e := &antigravityInteractionsExecution{ harness: h, conversationID: conversationID, id: uuid.NewString(), - }, nil + } + if h.cfg.StateStore == nil { + return e, nil + } + + value, err := h.cfg.StateStore.Get(ctx, stateStoreKey(conversationID)) + switch { + case errors.Is(err, storage.ErrNotFound): + // No prior state: a brand-new conversation. + case err != nil: + // A real storage failure must not be silently treated as "new", which + // would lose an existing conversation's history. 
+ return nil, fmt.Errorf("loading resume cursor for %q: %w", conversationID, err) + default: + var cur resumeCursor + if err := json.Unmarshal(value, &cur); err != nil { + return nil, fmt.Errorf("decoding resume cursor for %q: %w", conversationID, err) + } + // A persisted cursor is only written after a successful turn, so a + // non-empty interaction id means the conversation has already started; + // the first-turn check in Run derives that from prevInteractionID. + e.prevInteractionID = cur.PrevInteractionID + } + return e, nil } // antigravityInteractionsExecution implements Execution. It is long-lived @@ -184,10 +240,10 @@ type antigravityInteractionsExecution struct { queued []*proto.Message closed bool - // started is false until the initial turn has been sent. - started bool - // prevInteractionID chains resume turns (the interaction chain this Execution - // owns). + // prevInteractionID chains the turns of the interaction loop (the interaction + // chain this Execution owns), and is the value persisted for cross-Execution + // resume. It is empty until the first turn completes successfully; an empty + // value therefore means "no turn has succeeded yet" (the first turn). prevInteractionID string } @@ -227,13 +283,36 @@ func (e *antigravityInteractionsExecution) drainQueue() []any { return messagesToInputSteps(msgs) } -func (e *antigravityInteractionsExecution) setPrevID(id string) { +// setPrevID records the latest interaction id (in memory) and, if a StateStore +// is configured, durably persists the resume cursor so the conversation can +// resume after a restart. A persistence failure is returned to the caller so +// the run can decide how to handle it rather than silently losing durability. 
+func (e *antigravityInteractionsExecution) setPrevID(ctx context.Context, id string) error { if id == "" { - return + return nil } e.mu.Lock() e.prevInteractionID = id e.mu.Unlock() + + if e.harness.cfg.StateStore == nil { + return nil + } + return e.persistCursor(ctx, resumeCursor{PrevInteractionID: id}) +} + +// persistCursor writes the resume cursor to the StateStore. The store is +// last-write-wins; correctness relies on the controller guaranteeing a single +// Execution (writer) per conversation (see the Harness interface). +func (e *antigravityInteractionsExecution) persistCursor(ctx context.Context, cur resumeCursor) error { + blob, err := json.Marshal(cur) + if err != nil { + return fmt.Errorf("encoding resume cursor: %w", err) + } + if err := e.harness.cfg.StateStore.Put(ctx, stateStoreKey(e.conversationID), blob); err != nil { + return fmt.Errorf("persisting resume cursor: %w", err) + } + return nil } // Run implements Execution.Run. It drives the interaction to completion, @@ -250,7 +329,6 @@ func (e *antigravityInteractionsExecution) Run(ctx context.Context, handler Hand e.mu.Unlock() return fmt.Errorf("execution session already closed") } - started := e.started prevID := e.prevInteractionID e.mu.Unlock() @@ -260,16 +338,15 @@ func (e *antigravityInteractionsExecution) Run(ctx context.Context, handler Hand } // Initial input for this Run: drain whatever is queued (the prompt on the - // first Run, or steering input on later Runs). + // first Run, or steering input on later Runs). An empty prevID means no turn + // has completed yet, so this is the conversation's first turn. (A first turn + // that failed leaves prevID empty, so a retried Run is correctly treated as + // the first turn again.) 
input := e.drainQueue() - if !started { - if len(input) == 0 { + if len(input) == 0 { + if prevID == "" { return fmt.Errorf("no input messages queued for the initial turn") } - e.mu.Lock() - e.started = true - e.mu.Unlock() - } else if len(input) == 0 { return fmt.Errorf("Run called with no queued input and no work pending") } @@ -278,7 +355,9 @@ func (e *antigravityInteractionsExecution) Run(ctx context.Context, handler Hand return fmt.Errorf("interaction turn failed: %w", err) } prevID = res.interactionID - e.setPrevID(prevID) + if err := e.setPrevID(ctx, prevID); err != nil { + return err + } for turn := 0; turn < e.harness.cfg.MaxTurns; turn++ { e.harness.debugTurn(e.conversationID, turn+1, len(res.toolCalls)) @@ -313,10 +392,12 @@ func (e *antigravityInteractionsExecution) Run(ctx context.Context, handler Hand res, err = e.harness.postTurn(ctx, token, e.harness.newRequest(next, prevID)) if err != nil { - return fmt.Errorf("resume turn failed: %w", err) + return fmt.Errorf("continuation turn failed: %w", err) } prevID = res.interactionID - e.setPrevID(prevID) + if err := e.setPrevID(ctx, prevID); err != nil { + return err + } } // Hit the turn cap while still driving tools. @@ -367,7 +448,7 @@ type userInputStep struct { Content []textPart `json:"content"` } -// toolResultStep is one tool result returned on a resume turn: a flat Step with +// toolResultStep is one tool result returned on a continuation turn: a flat Step with // "type":"function_result", the matching call_id, the tool name, and the result // object under "result". type toolResultStep struct { @@ -550,7 +631,7 @@ func newTokenSource(ctx context.Context) (oauth2.TokenSource, error) { // newRequest builds an interactionRequest common to every turn. The environment // is always the client-side ("local") environment -- this harness exists to // execute the agent's built-in env tools locally. Tools are re-declared on every -// turn so they stay known to the agent across resumes. 
+// turn so they stay known to the agent across the turns of the interaction loop. func (h *AntigravityInteractionsHarness) newRequest(input []any, previousID string) interactionRequest { var tools []FunctionTool if h.cfg.ThirdPartyExecutor != nil { @@ -569,8 +650,93 @@ func (h *AntigravityInteractionsHarness) newRequest(input []any, previousID stri } } -// postTurn POSTs the request and streams the SSE response for one turn. +// retryableHTTPError marks a transient HTTP response that should be retried. +// retryAfter is the server-suggested delay (parsed from the Retry-After +// header), or 0 if none was provided. +type retryableHTTPError struct { + status int + retryAfter time.Duration + body string +} + +func (e *retryableHTTPError) Error() string { + return fmt.Sprintf("HTTP %d: %s", e.status, e.body) +} + +// Retry tuning for transient rate-limit (429) responses. The stateful +// interaction quota is per-minute, so the backoff must be able to span ~a +// minute; with these values the cumulative wait reaches ~1-2 minutes before +// giving up. +const ( + rateLimitMaxRetries = 6 + rateLimitBaseDelay = 2 * time.Second + rateLimitMaxDelay = 32 * time.Second +) + +// postTurn POSTs one turn, retrying on HTTP 429 (rate limit / quota) with +// exponential backoff and jitter, honoring a Retry-After header when present. +// Creating an interaction is not idempotent, so only 429 -- which is rejected +// before any interaction is created -- is retried here. 
func (h *AntigravityInteractionsHarness) postTurn(ctx context.Context, token string, reqBody interactionRequest) (*turnResult, error) { + delay := rateLimitBaseDelay + for attempt := 0; ; attempt++ { + res, err := h.postTurnOnce(ctx, token, reqBody) + if err == nil { + return res, nil + } + var re *retryableHTTPError + if !errors.As(err, &re) || attempt >= rateLimitMaxRetries { + return nil, err + } + + wait := re.retryAfter + if wait <= 0 { + wait = backoffWithJitter(delay) + } + fmt.Fprintf(os.Stderr, "[harness] rate limited (HTTP %d); retrying in %s (attempt %d/%d)\n", + re.status, wait.Round(time.Millisecond), attempt+1, rateLimitMaxRetries) + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(wait): + } + if delay *= 2; delay > rateLimitMaxDelay { + delay = rateLimitMaxDelay + } + } +} + +// backoffWithJitter returns a randomized delay in [delay/2, delay] to avoid +// synchronized retries. +func backoffWithJitter(delay time.Duration) time.Duration { + half := delay / 2 + return half + time.Duration(rand.Int63n(int64(half)+1)) +} + +// parseRetryAfter parses a Retry-After header value, which may be either an +// integer number of seconds or an HTTP date. Returns 0 if absent or unparseable. +func parseRetryAfter(v string) time.Duration { + v = strings.TrimSpace(v) + if v == "" { + return 0 + } + if secs, err := strconv.Atoi(v); err == nil { + if secs <= 0 { + return 0 + } + return time.Duration(secs) * time.Second + } + if t, err := http.ParseTime(v); err == nil { + if d := time.Until(t); d > 0 { + return d + } + } + return 0 +} + +// postTurnOnce POSTs the request and streams the SSE response for one turn. 
+func (h *AntigravityInteractionsHarness) postTurnOnce(ctx context.Context, token string, reqBody interactionRequest) (*turnResult, error) { body, err := json.Marshal(reqBody) if err != nil { return nil, err @@ -593,7 +759,17 @@ func (h *AntigravityInteractionsHarness) postTurn(ctx context.Context, token str if _, err := b.ReadFrom(resp.Body); err != nil { return nil, fmt.Errorf("HTTP %d (failed to read error body: %v)", resp.StatusCode, err) } - return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, b.String()) + msg := b.String() + // 429 means the request was rejected by quota before any interaction was + // created, so it is safe to retry (unlike a partial 5xx). Mark it. + if resp.StatusCode == http.StatusTooManyRequests { + return nil, &retryableHTTPError{ + status: resp.StatusCode, + retryAfter: parseRetryAfter(resp.Header.Get("Retry-After")), + body: msg, + } + } + return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, msg) } return h.parseStreamedTurn(resp.Body) } diff --git a/internal/harness/antigravityinteractions_tools.go b/internal/harness/antigravityinteractions_tools.go index e565b71e..d45ecc00 100644 --- a/internal/harness/antigravityinteractions_tools.go +++ b/internal/harness/antigravityinteractions_tools.go @@ -29,13 +29,14 @@ import ( // send back as a function_result step. The built-in environment tools enumerated // in the switch below (file reads, command execution, and the file-mutation // family) are executed internally against the local filesystem/shell; every -// other tool is dispatched to the configured ThirdPartyExecutor. All execution -// is internal to the harness -- no tool call is surfaced to the caller. +// other tool is dispatched to the configured ThirdPartyExecutor, or, if none is +// configured, returned as an error result. All execution is internal to the +// harness -- no tool call is surfaced to the caller. // // Argument names match the agent's tool schema (PascalCase). 
Mutation tools -// (move/delete_dir/file_change family) require no success payload -- on success -// they return an empty result; on failure they return {"error": <message>}, -// which marks the step as failed. +// (move, delete_dir, create_file, edit_file, multi_edit_file, delete_file) +// require no success payload -- on success they return an empty result; on +// failure they return {"error": <message>}, which marks the step as failed. func (h *AntigravityInteractionsHarness) executeTool(ctx context.Context, call capturedToolCall) any { switch call.name { case "view_file": @@ -70,7 +71,7 @@ func (h *AntigravityInteractionsHarness) executeTool(ctx context.Context, call c // These run against the actual local filesystem/shell because they ARE the // client-side environment. Argument names follow the agent's tool schema // (view_file -> AbsolutePath, run_command -> CommandLine, list_dir -> -// DirectoryPath), and result field names follow what the proxy maps back into +// DirectoryPath), and result field names follow what the server maps back into // the step's own output (content, Output/ExitCode, results). // --------------------------------------------------------------------------- diff --git a/internal/harness/harness.go b/internal/harness/harness.go index 587365be..6a9a3c38 100644 --- a/internal/harness/harness.go +++ b/internal/harness/harness.go @@ -32,6 +32,13 @@ type Handler interface { } // Harness represents a service capable of starting execution sessions. +// +// Single-writer expectation: the controller must ensure that at most one +// Execution exists per conversation id at a time. Harness implementations rely +// on this invariant -- for example, a harness that durably persists +// per-conversation state may use a last-write-wins store without +// compare-and-swap, which is correct only because there is a single writer per +// conversation. type Harness interface { // Start initializes a new Execution session for a conversation.
Start(ctx context.Context, conversationID string) (Execution, error) diff --git a/internal/storage/filestore.go b/internal/storage/filestore.go new file mode 100644 index 00000000..0f8a4e7d --- /dev/null +++ b/internal/storage/filestore.go @@ -0,0 +1,127 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "errors" + "fmt" + "os" + "path/filepath" +) + +// FileStore is a minimal filesystem-backed Store. Each key maps to one file +// under a root directory whose contents are the value. +// +// Scope and limitations: +// +// - Durability across restarts: yes (files persist). +// - Sharing across replicas: only if the root directory is itself shared and +// consistent across them (e.g. a network filesystem). On purely local disk, +// FileStore is single-node and is intended as the default for local and +// single-replica use; use a managed backend for multi-replica deployments. +// - Atomicity: writes go to a temp file and are atomically renamed into place, +// so a reader never observes a torn value. +// +// FileStore assumes a single writer per key (see the package doc); it does not +// take OS file locks or provide compare-and-swap. +type FileStore struct { + root string +} + +var _ Store = (*FileStore)(nil) + +// NewFileStore creates a FileStore rooted at dir, creating the directory (and +// parents) if needed. 
+func NewFileStore(dir string) (*FileStore, error) { + if dir == "" { + return nil, errors.New("storage: FileStore dir must not be empty") + } + if err := os.MkdirAll(dir, 0o755); err != nil { + return nil, fmt.Errorf("storage: creating root %q: %w", dir, err) + } + return &FileStore{root: dir}, nil +} + +// path maps a key to its file path. The key is hashed so arbitrary key strings +// (slashes, etc.) map to a safe, fixed-length filename. +func (s *FileStore) path(key string) string { + sum := sha256.Sum256([]byte(key)) + return filepath.Join(s.root, hex.EncodeToString(sum[:])+".val") +} + +// Get implements Store.Get. +func (s *FileStore) Get(ctx context.Context, key string) ([]byte, error) { + if key == "" { + return nil, errors.New("storage: empty key") + } + value, err := os.ReadFile(s.path(key)) + if errors.Is(err, os.ErrNotExist) { + return nil, ErrNotFound + } + if err != nil { + return nil, fmt.Errorf("storage: reading key: %w", err) + } + return value, nil +} + +// Put implements Store.Put (last-write-wins). +func (s *FileStore) Put(ctx context.Context, key string, value []byte) error { + if key == "" { + return errors.New("storage: empty key") + } + return s.atomicWrite(s.path(key), value) +} + +// Delete implements Store.Delete (idempotent). +func (s *FileStore) Delete(ctx context.Context, key string) error { + if key == "" { + return errors.New("storage: empty key") + } + if err := os.Remove(s.path(key)); err != nil && !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("storage: deleting key: %w", err) + } + return nil +} + +// atomicWrite writes value to a temp file and renames it into place so a reader +// never observes a partial write. It fsyncs the file before rename for +// durability. 
+func (s *FileStore) atomicWrite(path string, value []byte) error { + tmp, err := os.CreateTemp(s.root, ".tmp-*") + if err != nil { + return fmt.Errorf("storage: creating temp file: %w", err) + } + tmpName := tmp.Name() + defer os.Remove(tmpName) // no-op if the rename succeeded + + if _, err := tmp.Write(value); err != nil { + tmp.Close() + return fmt.Errorf("storage: writing temp file: %w", err) + } + if err := tmp.Sync(); err != nil { + tmp.Close() + return fmt.Errorf("storage: syncing temp file: %w", err) + } + if err := tmp.Close(); err != nil { + return fmt.Errorf("storage: closing temp file: %w", err) + } + if err := os.Rename(tmpName, path); err != nil { + return fmt.Errorf("storage: renaming temp file into place: %w", err) + } + return nil +} diff --git a/internal/storage/filestore_test.go b/internal/storage/filestore_test.go new file mode 100644 index 00000000..1b09f63b --- /dev/null +++ b/internal/storage/filestore_test.go @@ -0,0 +1,107 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package storage + +import ( + "context" + "errors" + "testing" +) + +func newTestStore(t *testing.T) *FileStore { + t.Helper() + s, err := NewFileStore(t.TempDir()) + if err != nil { + t.Fatalf("NewFileStore: %v", err) + } + return s +} + +func TestGetNotFound(t *testing.T) { + s := newTestStore(t) + if _, err := s.Get(context.Background(), "missing"); !errors.Is(err, ErrNotFound) { + t.Fatalf("Get missing = %v, want ErrNotFound", err) + } +} + +func TestPutThenGet(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + + if err := s.Put(ctx, "k", []byte("hello")); err != nil { + t.Fatalf("Put: %v", err) + } + got, err := s.Get(ctx, "k") + if err != nil { + t.Fatalf("Get: %v", err) + } + if string(got) != "hello" { + t.Fatalf("Get value = %q, want %q", got, "hello") + } +} + +func TestPutOverwrites(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + + if err := s.Put(ctx, "k", []byte("v1")); err != nil { + t.Fatalf("Put v1: %v", err) + } + if err := s.Put(ctx, "k", []byte("v2")); err != nil { + t.Fatalf("Put v2: %v", err) + } + got, _ := s.Get(ctx, "k") + if string(got) != "v2" { + t.Fatalf("Get value = %q, want %q", got, "v2") + } +} + +func TestDeleteIdempotent(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + if err := s.Delete(ctx, "missing"); err != nil { + t.Fatalf("Delete missing: %v", err) + } + if err := s.Put(ctx, "k", []byte("x")); err != nil { + t.Fatalf("Put: %v", err) + } + if err := s.Delete(ctx, "k"); err != nil { + t.Fatalf("Delete existing: %v", err) + } + if _, err := s.Get(ctx, "k"); !errors.Is(err, ErrNotFound) { + t.Fatalf("Get after delete = %v, want ErrNotFound", err) + } +} + +// TestPersistsAcrossReopen verifies a value survives reconstructing the store +// over the same directory (the restart scenario). 
+func TestPersistsAcrossReopen(t *testing.T) { + dir := t.TempDir() + ctx := context.Background() + + s1, _ := NewFileStore(dir) + if err := s1.Put(ctx, "k", []byte("durable")); err != nil { + t.Fatalf("Put: %v", err) + } + + s2, _ := NewFileStore(dir) // simulate a process restart + got, err := s2.Get(ctx, "k") + if err != nil { + t.Fatalf("Get after reopen: %v", err) + } + if string(got) != "durable" { + t.Fatalf("value after reopen = %q, want %q", got, "durable") + } +} diff --git a/internal/storage/storage.go b/internal/storage/storage.go new file mode 100644 index 00000000..232fb66f --- /dev/null +++ b/internal/storage/storage.go @@ -0,0 +1,76 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package storage defines a small durable key-value store used by components +// (such as harnesses) that must persist state that outlives the process. +// +// # Why this exists +// +// A harness that must survive process restarts cannot keep its durable state +// (for example, a conversation's resume cursor) in memory: the state would be +// lost on restart. This package is the seam through which such state is +// persisted. The interface is intentionally a minimal key-value store so it can +// be backed by anything from the local filesystem (the default, see FileStore) +// to a managed service (e.g. Firestore or GCS) without changing callers. +// +// # Concurrency model +// +// The Store does NOT provide compare-and-swap. 
Callers are expected to ensure a +// single writer per key (for the harness, the controller guarantees at most one +// Execution per conversation, so there is only ever one writer for a given +// conversation's key). Under that assumption a last-write-wins Put is correct. +// +// # Semantics every implementation MUST honor +// +// - Atomicity: a Put is all-or-nothing. A reader never observes a torn/partial +// value; it sees either the previous value or the new one. +// - Read-after-write consistency: once Put returns success, a subsequent Get +// observes that value (or a later one), never an older one. +// - Not-found is distinct from failure: Get on a missing key returns +// ErrNotFound, which MUST NOT be conflated with a backend error +// (unavailable, permission denied, ...). Callers rely on ErrNotFound to mean +// "no state yet" and on other errors to mean "could not determine state" +// (which must not be treated as "no state"). +// - Durability: when Put returns success, the value is durably persisted +// (survives process restart). +// +// Keys are opaque non-empty strings; values are opaque byte slices (callers +// choose the encoding). +package storage + +import ( + "context" + "errors" +) + +// ErrNotFound is returned by Get when the key has no stored value. It is +// distinct from a backend failure: it means "no state yet", not "lookup failed". +var ErrNotFound = errors.New("storage: key not found") + +// Store is a durable key-value store. See the package doc for the semantics +// every implementation must satisfy. Implementations must be safe for concurrent +// use by multiple goroutines. +type Store interface { + // Get returns the value stored under key. It returns ErrNotFound if the key + // has no value; any other error indicates the lookup could not be completed + // (and MUST NOT be interpreted as "absent"). + Get(ctx context.Context, key string) ([]byte, error) + + // Put stores value under key (last-write-wins). 
+ Put(ctx context.Context, key string, value []byte) error + + // Delete removes key. Deleting a missing key is not an error (it is + // idempotent). + Delete(ctx context.Context, key string) error +} From b24e89509800b8265338d7be743a78c66fb6b2e0 Mon Sep 17 00:00:00 2001 From: zbl94 Date: Tue, 30 Jun 2026 21:05:12 +0000 Subject: [PATCH 2/2] Scope resume-cursor persistence into the Antigravity Interactions harness Address PR review feedback: instead of a repo-level internal/storage key-value abstraction, keep resume-cursor persistence local to the Antigravity Interactions harness. - Remove the internal/storage package (Store interface + FileStore). - Add a harness-local, file-based cursorStore (cursorstore.go) with load/save of the per-conversation resumeCursor. resumeCursor stays a struct so it can grow (e.g. partial function-call results for mid-tool-loop recovery) later. - Replace Config.StateStore with Config.StateDir. StateDir is now required: NewAntigravityInteractionsHarness returns an error if it is empty, and the constructor now returns (*Harness, error). - Add tests with a fake Interactions API (an http.RoundTripper) that records each request and returns a canned SSE stream, covering the resume-across-restart CUJ (a fresh harness over the same StateDir sends the persisted previous_interaction_id), same-harness resume, the required-StateDir error, and a cursorStore load/save round-trip. 
--- internal/harness/antigravityinteractions.go | 111 ++++----- .../harness/antigravityinteractions_test.go | 222 ++++++++++++++++++ internal/harness/cursorstore.go | 119 ++++++++++ internal/storage/filestore.go | 127 ---------- internal/storage/filestore_test.go | 107 --------- internal/storage/storage.go | 76 ------ 6 files changed, 387 insertions(+), 375 deletions(-) create mode 100644 internal/harness/antigravityinteractions_test.go create mode 100644 internal/harness/cursorstore.go delete mode 100644 internal/storage/filestore.go delete mode 100644 internal/storage/filestore_test.go delete mode 100644 internal/storage/storage.go diff --git a/internal/harness/antigravityinteractions.go b/internal/harness/antigravityinteractions.go index 4de25af5..b6a542b7 100644 --- a/internal/harness/antigravityinteractions.go +++ b/internal/harness/antigravityinteractions.go @@ -58,7 +58,6 @@ import ( "sync" "time" - "github.com/google/ax/internal/storage" "github.com/google/ax/proto" "github.com/google/uuid" "golang.org/x/oauth2" @@ -122,14 +121,13 @@ type AntigravityInteractionsConfig struct { // timeout is used. HTTPClient *http.Client - // StateStore durably persists each conversation's resume cursor (the last - // interaction id) so a conversation can resume after a process restart or on - // a different replica. If nil, that cursor is kept in memory only (on the - // Execution) and is lost when the Execution is discarded or the process - // exits -- a later Start for the same conversation id then begins a new - // interaction chain (empty previous_interaction_id) instead of continuing the - // existing one. - StateStore storage.Store + // StateDir is the directory where each conversation's resume cursor is + // persisted, so a conversation can resume after a restart. It is required: + // NewAntigravityInteractionsHarness returns an error if it is empty. 
+ // Correctness relies on a single writer per conversation (the controller + // guarantees at most one Execution per conversation), so writes are + // last-write-wins. + StateDir string } func (c *AntigravityInteractionsConfig) withDefaults() { @@ -158,6 +156,11 @@ type AntigravityInteractionsHarness struct { cfg AntigravityInteractionsConfig httpClient *http.Client + // cursors persists each conversation's resume cursor to disk so a conversation + // can resume after a restart. It is always non-nil (the constructor requires a + // usable state directory). + cursors *cursorStore + // tsOnce guards lazy initialization of ts, the resolved OAuth2 token source. // It is resolved on first use (rather than in the constructor) so credential // errors surface to the caller of Run instead of at construction time. @@ -167,35 +170,28 @@ type AntigravityInteractionsHarness struct { } // NewAntigravityInteractionsHarness creates a harness from the given config, -// filling in defaults for unset fields. -func NewAntigravityInteractionsHarness(cfg AntigravityInteractionsConfig) *AntigravityInteractionsHarness { +// filling in defaults for unset fields. It returns an error if cfg.StateDir is +// empty or the cursor store cannot be created: resume-cursor persistence is +// required, so a usable state directory must be provided. +func NewAntigravityInteractionsHarness(cfg AntigravityInteractionsConfig) (*AntigravityInteractionsHarness, error) { cfg.withDefaults() hc := cfg.HTTPClient if hc == nil { hc = &http.Client{Timeout: 10 * time.Minute} } - return &AntigravityInteractionsHarness{cfg: cfg, httpClient: hc} -} - -// resumeCursor is the small per-conversation state persisted to StateStore so a -// conversation can resume across restarts/replicas. It records the tail of the -// server-side interaction chain (PrevInteractionID). 
The cursor is only ever -// persisted after a turn completes successfully, so a persisted cursor always -// has a non-empty PrevInteractionID; "the conversation has started" is therefore -// derived as PrevInteractionID != "" rather than stored separately. -type resumeCursor struct { - PrevInteractionID string `json:"prev_interaction_id"` -} - -// stateStoreKey is the StateStore key for a conversation's resume cursor. -func stateStoreKey(conversationID string) string { - return "antigravity-interactions/cursor/" + conversationID + if cfg.StateDir == "" { + return nil, errors.New("AntigravityInteractionsConfig.StateDir must be set") + } + cursors, err := newCursorStore(cfg.StateDir) + if err != nil { + return nil, fmt.Errorf("creating cursor store: %w", err) + } + return &AntigravityInteractionsHarness{cfg: cfg, httpClient: hc, cursors: cursors}, nil } -// Start implements Harness.Start. If a StateStore is configured, it loads any -// previously persisted resume cursor for conversationID so the returned -// Execution resumes the existing interaction chain instead of starting a new -// one. +// Start implements Harness.Start. It loads any previously persisted resume +// cursor for conversationID so the returned Execution resumes the existing +// interaction chain instead of starting a new one. func (h *AntigravityInteractionsHarness) Start(ctx context.Context, conversationID string, harnessConfig []byte) (Execution, error) { e := &antigravityInteractionsExecution{ harness: h, @@ -203,23 +199,14 @@ func (h *AntigravityInteractionsHarness) Start(ctx context.Context, conversation id: uuid.NewString(), harnessConfig: harnessConfig, } - if h.cfg.StateStore == nil { - return e, nil - } - value, err := h.cfg.StateStore.Get(ctx, stateStoreKey(conversationID)) - switch { - case errors.Is(err, storage.ErrNotFound): - // No prior state: a brand-new conversation. 
- case err != nil: - // A real storage failure must not be silently treated as "new", which - // would lose an existing conversation's history. + cur, found, err := h.cursors.load(conversationID) + if err != nil { + // A real load failure must not be silently treated as "new", which would + // lose an existing conversation's history. return nil, fmt.Errorf("loading resume cursor for %q: %w", conversationID, err) - default: - var cur resumeCursor - if err := json.Unmarshal(value, &cur); err != nil { - return nil, fmt.Errorf("decoding resume cursor for %q: %w", conversationID, err) - } + } + if found { // A persisted cursor is only written after a successful turn, so a // non-empty interaction id means the conversation has already started; // the first-turn check in Run derives that from prevInteractionID. @@ -285,10 +272,10 @@ func (e *antigravityInteractionsExecution) drainQueue() []any { return messagesToInputSteps(msgs) } -// setPrevID records the latest interaction id (in memory) and, if a StateStore -// is configured, durably persists the resume cursor so the conversation can -// resume after a restart. A persistence failure is returned to the caller so -// the run can decide how to handle it rather than silently losing durability. +// setPrevID records the latest interaction id (in memory) and durably persists +// the resume cursor so the conversation can resume after a restart. A persistence +// failure is returned to the caller so the run can decide how to handle it rather +// than silently losing durability. func (e *antigravityInteractionsExecution) setPrevID(ctx context.Context, id string) error { if id == "" { return nil @@ -297,21 +284,7 @@ func (e *antigravityInteractionsExecution) setPrevID(ctx context.Context, id str e.prevInteractionID = id e.mu.Unlock() - if e.harness.cfg.StateStore == nil { - return nil - } - return e.persistCursor(ctx, resumeCursor{PrevInteractionID: id}) -} - -// persistCursor writes the resume cursor to the StateStore. 
The store is -// last-write-wins; correctness relies on the controller guaranteeing a single -// Execution (writer) per conversation (see the Harness interface). -func (e *antigravityInteractionsExecution) persistCursor(ctx context.Context, cur resumeCursor) error { - blob, err := json.Marshal(cur) - if err != nil { - return fmt.Errorf("encoding resume cursor: %w", err) - } - if err := e.harness.cfg.StateStore.Put(ctx, stateStoreKey(e.conversationID), blob); err != nil { + if err := e.harness.cursors.save(e.conversationID, resumeCursor{PrevInteractionID: id}); err != nil { return fmt.Errorf("persisting resume cursor: %w", err) } return nil @@ -600,6 +573,14 @@ func (h *AntigravityInteractionsHarness) interactionsURL() string { interactionsEndpoint, interactionsAPIVersion, cloudProject(), cloudLocation()) } +// Endpoint returns the Interactions API base endpoint this harness targets +// (e.g. the prod or autopush host). Exposed so callers/tools can log which +// backend is in use, since the endpoint is a compile-time constant. +func Endpoint() string { return interactionsEndpoint } + +// APIVersion returns the Interactions API version this harness targets. +func APIVersion() string { return interactionsAPIVersion } + // token returns a bearer access token from the harness's OAuth2 token source. // The source is resolved once (lazily) and auto-refreshes thereafter. func (h *AntigravityInteractionsHarness) token(ctx context.Context) (string, error) { diff --git a/internal/harness/antigravityinteractions_test.go b/internal/harness/antigravityinteractions_test.go new file mode 100644 index 00000000..efb95b8e --- /dev/null +++ b/internal/harness/antigravityinteractions_test.go @@ -0,0 +1,222 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package harness + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "sync" + "testing" + + "golang.org/x/oauth2" +) + +// fakeInteractions is a fake Interactions API: an http.RoundTripper that records +// the decoded request body of every POST and replies with a canned SSE stream. +// It lets the harness's real Start/Run/cursorStore code run end to end while the +// network is faked, so we can assert exactly what previous_interaction_id the +// harness sends on each turn. +type fakeInteractions struct { + mu sync.Mutex + // requests holds the decoded body of each interaction request, in order. + requests []interactionRequest + // interactionIDs are returned (in order) as the completed interaction id for + // successive turns; the i-th request gets interactionIDs[i]. + interactionIDs []string +} + +func (f *fakeInteractions) RoundTrip(req *http.Request) (*http.Response, error) { + body, err := io.ReadAll(req.Body) + if err != nil { + return nil, err + } + var decoded interactionRequest + if err := json.Unmarshal(body, &decoded); err != nil { + return nil, fmt.Errorf("fake server: decoding request: %w", err) + } + + f.mu.Lock() + idx := len(f.requests) + f.requests = append(f.requests, decoded) + id := fmt.Sprintf("INT-%d", idx+1) + if idx < len(f.interactionIDs) { + id = f.interactionIDs[idx] + } + f.mu.Unlock() + + // A minimal completed turn: no tool calls, so Run finishes in one turn. 
+ sse := "" + + "event: interaction.created\n" + + `data: {"interaction":{"id":"` + id + `","status":"in_progress"},"event_type":"interaction.created"}` + "\n\n" + + "event: interaction.completed\n" + + `data: {"interaction":{"id":"` + id + `","status":"completed"},"event_type":"interaction.completed"}` + "\n\n" + + "event: done\n" + + "data: [DONE]\n\n" + + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: io.NopCloser(strings.NewReader(sse)), + Request: req, + }, nil +} + +func (f *fakeInteractions) recorded() []interactionRequest { + f.mu.Lock() + defer f.mu.Unlock() + return append([]interactionRequest(nil), f.requests...) +} + +// newTestHarness builds a harness wired to the fake server, a static token (no +// ADC), and the given state dir. It also sets the project env so the request URL +// and X-Goog-User-Project header are well-formed. +func newTestHarness(t *testing.T, fake *fakeInteractions, stateDir string) *AntigravityInteractionsHarness { + t.Helper() + t.Setenv(envCloudProject, "test-project") + h, err := NewAntigravityInteractionsHarness(AntigravityInteractionsConfig{ + Agent: "test-agent", + StateDir: stateDir, + HTTPClient: &http.Client{Transport: fake}, + TokenSource: oauth2.StaticTokenSource(&oauth2.Token{AccessToken: "fake-token"}), + }) + if err != nil { + t.Fatalf("NewAntigravityInteractionsHarness: %v", err) + } + return h +} + +// runOneTurn starts an Execution for conversationID, queues a single user +// message, and runs it to completion. 
+func runOneTurn(t *testing.T, h *AntigravityInteractionsHarness, conversationID, prompt string) { + t.Helper() + ctx := context.Background() + exec, err := h.Start(ctx, conversationID, nil) + if err != nil { + t.Fatalf("Start(%q): %v", conversationID, err) + } + if err := exec.Queue(ctx, userText(prompt)); err != nil { + t.Fatalf("Queue: %v", err) + } + if err := exec.Run(ctx, &mockHandler{}); err != nil { + t.Fatalf("Run: %v", err) + } + if err := exec.Close(ctx); err != nil { + t.Fatalf("Close: %v", err) + } +} + +// TestResumeAcrossRestart is the core CUJ: a first Execution starts a new +// interaction chain (empty previous_interaction_id) and persists the returned +// interaction id; then a brand-new harness over the SAME state dir (a simulated +// restart / snapshot restore) loads that cursor and sends it as +// previous_interaction_id on the next request. +func TestResumeAcrossRestart(t *testing.T) { + fake := &fakeInteractions{interactionIDs: []string{"INT-1", "INT-2"}} + stateDir := t.TempDir() + + // First Execution: starts the chain. + h1 := newTestHarness(t, fake, stateDir) + runOneTurn(t, h1, "conv-1", "hello") + + // Simulated restart: a brand-new harness over the same state dir, so any + // resumed state must come from disk, not h1's memory. + h2 := newTestHarness(t, fake, stateDir) + runOneTurn(t, h2, "conv-1", "again") + + reqs := fake.recorded() + if len(reqs) != 2 { + t.Fatalf("expected 2 requests, got %d", len(reqs)) + } + if reqs[0].PreviousInteractionID != "" { + t.Errorf("turn 1: previous_interaction_id = %q, want empty (new chain)", reqs[0].PreviousInteractionID) + } + if got, want := reqs[1].PreviousInteractionID, "INT-1"; got != want { + t.Errorf("turn 2 (after restart): previous_interaction_id = %q, want %q (resumed from persisted cursor)", got, want) + } +} + +// TestNewRequiresStateDir verifies that the constructor rejects an empty +// StateDir: resume-cursor persistence is required. 
+func TestNewRequiresStateDir(t *testing.T) { + t.Setenv(envCloudProject, "test-project") + _, err := NewAntigravityInteractionsHarness(AntigravityInteractionsConfig{ + Agent: "test-agent", + StateDir: "", // missing + TokenSource: oauth2.StaticTokenSource(&oauth2.Token{AccessToken: "fake-token"}), + }) + if err == nil { + t.Fatal("NewAntigravityInteractionsHarness with empty StateDir: got nil error, want error") + } +} + +// TestSameHarnessSecondTurnResumes checks that even without a "restart", a second +// Execution on the same harness/conversation continues the chain via the cursor. +func TestSameHarnessSecondTurnResumes(t *testing.T) { + fake := &fakeInteractions{interactionIDs: []string{"INT-1", "INT-2"}} + h := newTestHarness(t, fake, t.TempDir()) + + runOneTurn(t, h, "conv-1", "hello") + runOneTurn(t, h, "conv-1", "again") + + reqs := fake.recorded() + if len(reqs) != 2 { + t.Fatalf("expected 2 requests, got %d", len(reqs)) + } + if got, want := reqs[1].PreviousInteractionID, "INT-1"; got != want { + t.Errorf("turn 2: previous_interaction_id = %q, want %q", got, want) + } +} + +// TestCursorStoreLoadSave is a focused unit test of the harness-local cursor +// store round-trip. +func TestCursorStoreLoadSave(t *testing.T) { + cs, err := newCursorStore(t.TempDir()) + if err != nil { + t.Fatalf("newCursorStore: %v", err) + } + + // Missing key: found is false, no error. + if _, found, err := cs.load("missing"); err != nil || found { + t.Fatalf("load(missing) = found=%v err=%v, want found=false err=nil", found, err) + } + + // Round-trip. + if err := cs.save("conv-1", resumeCursor{PrevInteractionID: "INT-7"}); err != nil { + t.Fatalf("save: %v", err) + } + cur, found, err := cs.load("conv-1") + if err != nil || !found { + t.Fatalf("load(conv-1) = found=%v err=%v, want found=true err=nil", found, err) + } + if cur.PrevInteractionID != "INT-7" { + t.Errorf("loaded PrevInteractionID = %q, want %q", cur.PrevInteractionID, "INT-7") + } + + // Last-write-wins overwrite. 
+ if err := cs.save("conv-1", resumeCursor{PrevInteractionID: "INT-8"}); err != nil { + t.Fatalf("save (overwrite): %v", err) + } + cur, _, err = cs.load("conv-1") + if err != nil { + t.Fatalf("load after overwrite: %v", err) + } + if cur.PrevInteractionID != "INT-8" { + t.Errorf("after overwrite PrevInteractionID = %q, want %q", cur.PrevInteractionID, "INT-8") + } +} diff --git a/internal/harness/cursorstore.go b/internal/harness/cursorstore.go new file mode 100644 index 00000000..59731139 --- /dev/null +++ b/internal/harness/cursorstore.go @@ -0,0 +1,119 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package harness + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" +) + +// resumeCursor is the small per-conversation state persisted so a conversation +// can resume across restarts/replicas. It records the tail of the server-side +// interaction chain (PrevInteractionID). +// +// It is a struct (rather than a bare string) so it can grow to hold richer +// resume state later, e.g. partial function-call results for mid-tool-loop +// recovery, without changing the on-disk shape's identity. +type resumeCursor struct { + PrevInteractionID string `json:"prev_interaction_id"` +} + +// cursorStore is a minimal filesystem-backed store for resume cursors, local to +// the Antigravity Interactions harness. 
Each conversation maps to one file whose +// contents are the JSON-encoded cursor. +// +// It assumes a single writer per conversation (the controller guarantees at most +// one Execution per conversation), so writes are last-write-wins with no +// compare-and-swap. Writes are atomic (temp file + rename) so a reader never +// observes a torn value. +type cursorStore struct { + dir string +} + +// newCursorStore creates a cursorStore rooted at dir, creating it if needed. +func newCursorStore(dir string) (*cursorStore, error) { + if dir == "" { + return nil, errors.New("cursorStore dir must not be empty") + } + if err := os.MkdirAll(dir, 0o755); err != nil { + return nil, fmt.Errorf("creating cursor dir %q: %w", dir, err) + } + return &cursorStore{dir: dir}, nil +} + +// path maps a conversation id to its cursor file. The id is hashed so arbitrary +// id strings map to a safe, fixed-length filename. +func (s *cursorStore) path(conversationID string) string { + sum := sha256.Sum256([]byte(conversationID)) + return filepath.Join(s.dir, hex.EncodeToString(sum[:])+".json") +} + +// load returns the stored cursor for conversationID. found is false if no cursor +// has been persisted yet; a non-nil error means the lookup itself failed (which +// callers must not treat as "no cursor"). +func (s *cursorStore) load(conversationID string) (cur resumeCursor, found bool, err error) { + blob, err := os.ReadFile(s.path(conversationID)) + if errors.Is(err, os.ErrNotExist) { + return resumeCursor{}, false, nil + } + if err != nil { + return resumeCursor{}, false, fmt.Errorf("reading cursor: %w", err) + } + if err := json.Unmarshal(blob, &cur); err != nil { + return resumeCursor{}, false, fmt.Errorf("decoding cursor: %w", err) + } + return cur, true, nil +} + +// save durably writes the cursor for conversationID (last-write-wins). 
+func (s *cursorStore) save(conversationID string, cur resumeCursor) error { + blob, err := json.Marshal(cur) + if err != nil { + return fmt.Errorf("encoding cursor: %w", err) + } + return s.atomicWrite(s.path(conversationID), blob) +} + +// atomicWrite writes value to a temp file, fsyncs it, and renames it into place +// so a reader never observes a partial write. +func (s *cursorStore) atomicWrite(path string, value []byte) error { + tmp, err := os.CreateTemp(s.dir, ".tmp-*") + if err != nil { + return fmt.Errorf("creating temp file: %w", err) + } + tmpName := tmp.Name() + defer os.Remove(tmpName) // no-op if the rename succeeded + + if _, err := tmp.Write(value); err != nil { + tmp.Close() + return fmt.Errorf("writing temp file: %w", err) + } + if err := tmp.Sync(); err != nil { + tmp.Close() + return fmt.Errorf("syncing temp file: %w", err) + } + if err := tmp.Close(); err != nil { + return fmt.Errorf("closing temp file: %w", err) + } + if err := os.Rename(tmpName, path); err != nil { + return fmt.Errorf("renaming temp file into place: %w", err) + } + return nil +} diff --git a/internal/storage/filestore.go b/internal/storage/filestore.go deleted file mode 100644 index 0f8a4e7d..00000000 --- a/internal/storage/filestore.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright 2026 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package storage - -import ( - "context" - "crypto/sha256" - "encoding/hex" - "errors" - "fmt" - "os" - "path/filepath" -) - -// FileStore is a minimal filesystem-backed Store. Each key maps to one file -// under a root directory whose contents are the value. -// -// Scope and limitations: -// -// - Durability across restarts: yes (files persist). -// - Sharing across replicas: only if the root directory is itself shared and -// consistent across them (e.g. a network filesystem). On purely local disk, -// FileStore is single-node and is intended as the default for local and -// single-replica use; use a managed backend for multi-replica deployments. -// - Atomicity: writes go to a temp file and are atomically renamed into place, -// so a reader never observes a torn value. -// -// FileStore assumes a single writer per key (see the package doc); it does not -// take OS file locks or provide compare-and-swap. -type FileStore struct { - root string -} - -var _ Store = (*FileStore)(nil) - -// NewFileStore creates a FileStore rooted at dir, creating the directory (and -// parents) if needed. -func NewFileStore(dir string) (*FileStore, error) { - if dir == "" { - return nil, errors.New("storage: FileStore dir must not be empty") - } - if err := os.MkdirAll(dir, 0o755); err != nil { - return nil, fmt.Errorf("storage: creating root %q: %w", dir, err) - } - return &FileStore{root: dir}, nil -} - -// path maps a key to its file path. The key is hashed so arbitrary key strings -// (slashes, etc.) map to a safe, fixed-length filename. -func (s *FileStore) path(key string) string { - sum := sha256.Sum256([]byte(key)) - return filepath.Join(s.root, hex.EncodeToString(sum[:])+".val") -} - -// Get implements Store.Get. 
-func (s *FileStore) Get(ctx context.Context, key string) ([]byte, error) { - if key == "" { - return nil, errors.New("storage: empty key") - } - value, err := os.ReadFile(s.path(key)) - if errors.Is(err, os.ErrNotExist) { - return nil, ErrNotFound - } - if err != nil { - return nil, fmt.Errorf("storage: reading key: %w", err) - } - return value, nil -} - -// Put implements Store.Put (last-write-wins). -func (s *FileStore) Put(ctx context.Context, key string, value []byte) error { - if key == "" { - return errors.New("storage: empty key") - } - return s.atomicWrite(s.path(key), value) -} - -// Delete implements Store.Delete (idempotent). -func (s *FileStore) Delete(ctx context.Context, key string) error { - if key == "" { - return errors.New("storage: empty key") - } - if err := os.Remove(s.path(key)); err != nil && !errors.Is(err, os.ErrNotExist) { - return fmt.Errorf("storage: deleting key: %w", err) - } - return nil -} - -// atomicWrite writes value to a temp file and renames it into place so a reader -// never observes a partial write. It fsyncs the file before rename for -// durability. 
-func (s *FileStore) atomicWrite(path string, value []byte) error { - tmp, err := os.CreateTemp(s.root, ".tmp-*") - if err != nil { - return fmt.Errorf("storage: creating temp file: %w", err) - } - tmpName := tmp.Name() - defer os.Remove(tmpName) // no-op if the rename succeeded - - if _, err := tmp.Write(value); err != nil { - tmp.Close() - return fmt.Errorf("storage: writing temp file: %w", err) - } - if err := tmp.Sync(); err != nil { - tmp.Close() - return fmt.Errorf("storage: syncing temp file: %w", err) - } - if err := tmp.Close(); err != nil { - return fmt.Errorf("storage: closing temp file: %w", err) - } - if err := os.Rename(tmpName, path); err != nil { - return fmt.Errorf("storage: renaming temp file into place: %w", err) - } - return nil -} diff --git a/internal/storage/filestore_test.go b/internal/storage/filestore_test.go deleted file mode 100644 index 1b09f63b..00000000 --- a/internal/storage/filestore_test.go +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright 2026 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package storage - -import ( - "context" - "errors" - "testing" -) - -func newTestStore(t *testing.T) *FileStore { - t.Helper() - s, err := NewFileStore(t.TempDir()) - if err != nil { - t.Fatalf("NewFileStore: %v", err) - } - return s -} - -func TestGetNotFound(t *testing.T) { - s := newTestStore(t) - if _, err := s.Get(context.Background(), "missing"); !errors.Is(err, ErrNotFound) { - t.Fatalf("Get missing = %v, want ErrNotFound", err) - } -} - -func TestPutThenGet(t *testing.T) { - s := newTestStore(t) - ctx := context.Background() - - if err := s.Put(ctx, "k", []byte("hello")); err != nil { - t.Fatalf("Put: %v", err) - } - got, err := s.Get(ctx, "k") - if err != nil { - t.Fatalf("Get: %v", err) - } - if string(got) != "hello" { - t.Fatalf("Get value = %q, want %q", got, "hello") - } -} - -func TestPutOverwrites(t *testing.T) { - s := newTestStore(t) - ctx := context.Background() - - if err := s.Put(ctx, "k", []byte("v1")); err != nil { - t.Fatalf("Put v1: %v", err) - } - if err := s.Put(ctx, "k", []byte("v2")); err != nil { - t.Fatalf("Put v2: %v", err) - } - got, _ := s.Get(ctx, "k") - if string(got) != "v2" { - t.Fatalf("Get value = %q, want %q", got, "v2") - } -} - -func TestDeleteIdempotent(t *testing.T) { - s := newTestStore(t) - ctx := context.Background() - if err := s.Delete(ctx, "missing"); err != nil { - t.Fatalf("Delete missing: %v", err) - } - if err := s.Put(ctx, "k", []byte("x")); err != nil { - t.Fatalf("Put: %v", err) - } - if err := s.Delete(ctx, "k"); err != nil { - t.Fatalf("Delete existing: %v", err) - } - if _, err := s.Get(ctx, "k"); !errors.Is(err, ErrNotFound) { - t.Fatalf("Get after delete = %v, want ErrNotFound", err) - } -} - -// TestPersistsAcrossReopen verifies a value survives reconstructing the store -// over the same directory (the restart scenario). 
-func TestPersistsAcrossReopen(t *testing.T) { - dir := t.TempDir() - ctx := context.Background() - - s1, _ := NewFileStore(dir) - if err := s1.Put(ctx, "k", []byte("durable")); err != nil { - t.Fatalf("Put: %v", err) - } - - s2, _ := NewFileStore(dir) // simulate a process restart - got, err := s2.Get(ctx, "k") - if err != nil { - t.Fatalf("Get after reopen: %v", err) - } - if string(got) != "durable" { - t.Fatalf("value after reopen = %q, want %q", got, "durable") - } -} diff --git a/internal/storage/storage.go b/internal/storage/storage.go deleted file mode 100644 index 232fb66f..00000000 --- a/internal/storage/storage.go +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright 2026 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package storage defines a small durable key-value store used by components -// (such as harnesses) that must persist state that outlives the process. -// -// # Why this exists -// -// A harness that must survive process restarts cannot keep its durable state -// (for example, a conversation's resume cursor) in memory: the state would be -// lost on restart. This package is the seam through which such state is -// persisted. The interface is intentionally a minimal key-value store so it can -// be backed by anything from the local filesystem (the default, see FileStore) -// to a managed service (e.g. Firestore or GCS) without changing callers. -// -// # Concurrency model -// -// The Store does NOT provide compare-and-swap. 
Callers are expected to ensure a -// single writer per key (for the harness, the controller guarantees at most one -// Execution per conversation, so there is only ever one writer for a given -// conversation's key). Under that assumption a last-write-wins Put is correct. -// -// # Semantics every implementation MUST honor -// -// - Atomicity: a Put is all-or-nothing. A reader never observes a torn/partial -// value; it sees either the previous value or the new one. -// - Read-after-write consistency: once Put returns success, a subsequent Get -// observes that value (or a later one), never an older one. -// - Not-found is distinct from failure: Get on a missing key returns -// ErrNotFound, which MUST NOT be conflated with a backend error -// (unavailable, permission denied, ...). Callers rely on ErrNotFound to mean -// "no state yet" and on other errors to mean "could not determine state" -// (which must not be treated as "no state"). -// - Durability: when Put returns success, the value is durably persisted -// (survives process restart). -// -// Keys are opaque non-empty strings; values are opaque byte slices (callers -// choose the encoding). -package storage - -import ( - "context" - "errors" -) - -// ErrNotFound is returned by Get when the key has no stored value. It is -// distinct from a backend failure: it means "no state yet", not "lookup failed". -var ErrNotFound = errors.New("storage: key not found") - -// Store is a durable key-value store. See the package doc for the semantics -// every implementation must satisfy. Implementations must be safe for concurrent -// use by multiple goroutines. -type Store interface { - // Get returns the value stored under key. It returns ErrNotFound if the key - // has no value; any other error indicates the lookup could not be completed - // (and MUST NOT be interpreted as "absent"). - Get(ctx context.Context, key string) ([]byte, error) - - // Put stores value under key (last-write-wins). 
- Put(ctx context.Context, key string, value []byte) error - - // Delete removes key. Deleting a missing key is not an error (it is - // idempotent). - Delete(ctx context.Context, key string) error -}