Skip to content
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions agent/graphagent/graph_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ func (ga *GraphAgent) createInitialState(ctx context.Context, invocation *agent.
}
// Add parent agent to state so agent nodes can access sub-agents.
initialState[graph.StateKeyParentAgent] = ga
// Set checkpoint namespace if not already set.
if ns, ok := initialState[graph.CfgKeyCheckpointNS]; !ok || ns == "" {
initialState[graph.CfgKeyCheckpointNS] = ga.name
}

return initialState
}
Expand Down
17 changes: 17 additions & 0 deletions docs/mkdocs/en/graph.md
Original file line number Diff line number Diff line change
Expand Up @@ -2422,6 +2422,7 @@ import (
"trpc.group/trpc-go/trpc-agent-go/agent/graphagent"
"trpc.group/trpc-go/trpc-agent-go/graph"
"trpc.group/trpc-go/trpc-agent-go/graph/checkpoint/sqlite"
"trpc.group/trpc-go/trpc-agent-go/graph/checkpoint/redis"
"trpc.group/trpc-go/trpc-agent-go/model"
)

Expand All @@ -2434,6 +2435,22 @@ graphAgent, _ := graphagent.New("workflow", g,

// Checkpoints are saved automatically during execution (by default every step)

// Resume from a checkpoint
eventCh, err := r.Run(ctx, userID, sessionID,
model.NewUserMessage("resume"),
agent.WithRuntimeState(map[string]any{
graph.CfgKeyCheckpointID: "ckpt-123",
}),
)

// Configure redis checkpoints
redisSaver, _ := redis.NewSaver(redis.WithRedisClientURL("redis://[username:password@]host:port[/database]"))

graphAgent, _ := graphagent.New("workflow", g,
graphagent.WithCheckpointSaver(redisSaver))

// Checkpoints are saved automatically during execution (by default every step)

// Resume from a checkpoint
eventCh, err := r.Run(ctx, userID, sessionID,
model.NewUserMessage("resume"),
Expand Down
19 changes: 18 additions & 1 deletion docs/mkdocs/zh/graph.md
Original file line number Diff line number Diff line change
Expand Up @@ -2358,7 +2358,7 @@ API 参考:

### 检查点与恢复

为了支持时间旅行与可靠恢复,可以为执行器或 GraphAgent 配置检查点保存器。下面演示使用 SQLite Saver 持久化检查点并从特定检查点恢复。
为了支持时间旅行与可靠恢复,可以为执行器或 GraphAgent 配置检查点保存器。下面演示使用 SQLite/Redis Saver 持久化检查点并从特定检查点恢复。

```go
import (
Expand All @@ -2369,6 +2369,7 @@ import (
"trpc.group/trpc-go/trpc-agent-go/agent/graphagent"
"trpc.group/trpc-go/trpc-agent-go/graph"
"trpc.group/trpc-go/trpc-agent-go/graph/checkpoint/sqlite"
"trpc.group/trpc-go/trpc-agent-go/graph/checkpoint/redis"
"trpc.group/trpc-go/trpc-agent-go/model"
)

Expand All @@ -2381,6 +2382,22 @@ graphAgent, _ := graphagent.New("workflow", g,

// 执行时自动保存检查点(默认每步保存)

// 从检查点恢复
eventCh, err := r.Run(ctx, userID, sessionID,
model.NewUserMessage("resume"),
agent.WithRuntimeState(map[string]any{
graph.CfgKeyCheckpointID: "ckpt-123",
}),
)

// 配置redis检查点
redisSaver, _ := redis.NewSaver(redis.WithRedisClientURL("redis://[username:password@]host:port[/database]"))

graphAgent, _ := graphagent.New("workflow", g,
graphagent.WithCheckpointSaver(redisSaver))

// 执行时自动保存检查点(默认每步保存)

// 从检查点恢复
eventCh, err := r.Run(ctx, userID, sessionID,
model.NewUserMessage("resume"),
Expand Down
7 changes: 7 additions & 0 deletions examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ go 1.23.0

replace trpc.group/trpc-go/trpc-agent-go => ../

replace trpc.group/trpc-go/trpc-agent-go/graph/checkpoint/redis => ../graph/checkpoint/redis

require (
github.com/google/uuid v1.6.0
github.com/mattn/go-sqlite3 v1.14.32
Expand All @@ -14,13 +16,16 @@ require (
go.uber.org/zap v1.27.0
trpc.group/trpc-go/trpc-a2a-go v0.2.5
trpc.group/trpc-go/trpc-agent-go v0.5.0
trpc.group/trpc-go/trpc-agent-go/graph/checkpoint/redis v0.0.0-00010101000000-000000000000
trpc.group/trpc-go/trpc-mcp-go v0.0.10
)

require (
github.com/bmatcuk/doublestar/v4 v4.9.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/getkin/kin-openapi v0.124.0 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand All @@ -43,6 +48,7 @@ require (
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
github.com/panjf2000/ants/v2 v2.10.0 // indirect
github.com/perimeterx/marshmallow v1.1.5 // indirect
github.com/redis/go-redis/v9 v9.17.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/tidwall/gjson v1.14.4 // indirect
Expand Down Expand Up @@ -72,4 +78,5 @@ require (
google.golang.org/grpc v1.67.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
trpc.group/trpc-go/trpc-agent-go/storage/redis v0.5.0 // indirect
)
16 changes: 16 additions & 0 deletions examples/go.sum
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
github.com/alicebob/miniredis/v2 v2.35.0 h1:QwLphYqCEAo1eu1TqPRN2jgVMPBweeQcR21jeqDCONI=
github.com/alicebob/miniredis/v2 v2.35.0/go.mod h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM=
github.com/bmatcuk/doublestar/v4 v4.9.1 h1:X8jg9rRZmJd4yRy7ZeNDRnM+T3ZfHv15JiBJ/avrEXE=
github.com/bmatcuk/doublestar/v4 v4.9.1/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 h1:NMZiJj8QnKe1LgsbDayM4UoHwbvwDRwnI3hwNaAHRnc=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0/go.mod h1:ZXNYxsqcloTdSy/rNShjYzMhyjf0LaoftYK0p+A3h40=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/getkin/kin-openapi v0.124.0 h1:VSFNMB9C9rTKBnQ/fpyDU8ytMTr4dWI9QovSKj9kz/M=
github.com/getkin/kin-openapi v0.124.0/go.mod h1:wb1aSZA/iWmorQP9KTAS/phLj/t17B5jT7+fS8ed9NM=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
Expand Down Expand Up @@ -69,6 +79,8 @@ github.com/perimeterx/marshmallow v1.1.5 h1:a2LALqQ1BlHM8PZblsDdidgv1mWi1DgC2UmX
github.com/perimeterx/marshmallow v1.1.5/go.mod h1:dsXbUu8CRzfYP5a87xpp0xq9S3u0Vchtcl8we9tYaXw=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.17.0 h1:K6E+ZlYN95KSMmZeEQPbU/c++wfmEvfFB17yEAq/VhM=
github.com/redis/go-redis/v9 v9.17.0/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
Expand Down Expand Up @@ -98,6 +110,8 @@ github.com/ugorji/go/codec v1.2.7 h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0
github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY=
github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4=
github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=
Expand Down Expand Up @@ -160,5 +174,7 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
trpc.group/trpc-go/trpc-a2a-go v0.2.5 h1:X3pAlWD128LaS9TtXsUDZoJWPVuPZDkZKUecKRxmWn4=
trpc.group/trpc-go/trpc-a2a-go v0.2.5/go.mod h1:Gtytau9Uoc3oPo/dpHvKit+tQn9Qlk5XFG1RiZTGqfk=
trpc.group/trpc-go/trpc-agent-go/storage/redis v0.5.0 h1:zuElT5t+ESMDvZzXI3rDrzg5FYOc4RPxwdWg+AQX8Ao=
trpc.group/trpc-go/trpc-agent-go/storage/redis v0.5.0/go.mod h1:aDGUkqlbGttFVGlZW2lMg5mAgurHezq5De/gAuLhV5E=
trpc.group/trpc-go/trpc-mcp-go v0.0.10 h1:kKPfevmikMojfOgtUBf5SJQ/v6aDugckodgyH1uDu2Q=
trpc.group/trpc-go/trpc-mcp-go v0.0.10/go.mod h1:OT6rLglkdaQ17D2T1Y87Y/ckItzdsEldDbw7dHAbGEA=
4 changes: 2 additions & 2 deletions examples/graph/checkpoint/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ go build .
### Command-Line Flags

- `-model`: LLM model to use (default: "deepseek-chat")
- `-storage`: Storage backend - "memory" or "sqlite" (default: "memory")
- `-storage`: Storage backend - "memory" or "sqlite" or "redis" (default: "memory")
- `-db`: SQLite database file path (default: "checkpoints.db", only used with -storage=sqlite)
- `-verbose`: Enable verbose execution output (default: false)

Expand Down Expand Up @@ -345,4 +345,4 @@ Set these environment variables before running:

1. **SQLite Schema**: The SQLite checkpoint saver may require schema updates if you encounter "no such column: seq" errors. The schema needs to be updated to match the latest checkpoint structure.

2. **Concurrent Access**: The in-memory checkpoint saver uses maps that may have concurrent access issues in high-throughput scenarios. Use proper synchronization if needed.
2. **Concurrent Access**: The in-memory checkpoint saver uses maps that may have concurrent access issues in high-throughput scenarios. Use proper synchronization if needed.
24 changes: 18 additions & 6 deletions examples/graph/checkpoint/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"trpc.group/trpc-go/trpc-agent-go/event"
"trpc.group/trpc-go/trpc-agent-go/graph"
checkpointinmemory "trpc.group/trpc-go/trpc-agent-go/graph/checkpoint/inmemory"
checkpointredis "trpc.group/trpc-go/trpc-agent-go/graph/checkpoint/redis"
checkpointsqlite "trpc.group/trpc-go/trpc-agent-go/graph/checkpoint/sqlite"
agentlog "trpc.group/trpc-go/trpc-agent-go/log"
"trpc.group/trpc-go/trpc-agent-go/model"
Expand Down Expand Up @@ -83,9 +84,11 @@ var (
modelName = flag.String("model", defaultModelName,
"Name of the model to use")
storage = flag.String("storage", "memory",
"Storage type: 'memory' or 'sqlite'")
"Storage type: 'memory' or 'sqlite' or 'redis'")
dbPath = flag.String("db", defaultDBPath,
"Path to SQLite database file (only used with -storage=sqlite)")
redisClientURL = flag.String("redis-url", "redis://localhost:6379",
"Redis client URL (only used with -storage=redis)")
verbose = flag.Bool("verbose", false,
"Enable verbose output")
)
Expand All @@ -106,10 +109,12 @@ func main() {

// Create and run the workflow.
workflow := &checkpointWorkflow{
modelName: *modelName,
storageType: *storage,
dbPath: *dbPath,
verbose: *verbose,
modelName: *modelName,
storageType: *storage,
dbPath: *dbPath,
verbose: *verbose,
redisClientURL: *redisClientURL,
currentNamespace: "checkpoint-demo",
}
if err := workflow.run(); err != nil {
log.Fatalf("Workflow failed: %v", err)
Expand All @@ -121,6 +126,7 @@ type checkpointWorkflow struct {
modelName string
storageType string
dbPath string
redisClientURL string
verbose bool
logger agentlog.Logger
runner runner.Runner
Expand Down Expand Up @@ -171,6 +177,12 @@ func (w *checkpointWorkflow) setup() error {
w.saver = saver
case "memory":
w.saver = checkpointinmemory.NewSaver()
case "redis":
saver, err := checkpointredis.NewSaver(checkpointredis.WithRedisClientURL(w.redisClientURL))
if err != nil {
return fmt.Errorf("failed to create Redis saver: %w", err)
}
w.saver = saver
default:
return fmt.Errorf("unsupported storage type: %s", w.storageType)
}
Expand Down Expand Up @@ -553,7 +565,6 @@ func (w *checkpointWorkflow) startInteractiveMode(ctx context.Context) error {
func (w *checkpointWorkflow) runWorkflow(ctx context.Context, lineageID string) error {
startTime := time.Now()
w.currentLineageID = lineageID
w.currentNamespace = "" // Use empty namespace to align with LangGraph's design

w.logger.Infof("Starting workflow execution: lineage_id=%s, namespace=%s", lineageID, w.currentNamespace)

Expand Down Expand Up @@ -897,6 +908,7 @@ func (w *checkpointWorkflow) listCheckpoints(ctx context.Context, lineageID stri

// Create config for the lineage.
config := graph.NewCheckpointConfig(lineageID)
config.Namespace = "checkpoint-demo"

// List checkpoints with a filter.
manager := w.graphAgent.Executor().CheckpointManager()
Expand Down
49 changes: 49 additions & 0 deletions graph/checkpoint/redis/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
module trpc.group/trpc-go/trpc-agent-go/graph/checkpoint/redis

go 1.21

replace (
trpc.group/trpc-go/trpc-agent-go => ../../../
trpc.group/trpc-go/trpc-agent-go/storage/redis => ../../../storage/redis
)

require (
github.com/alicebob/miniredis/v2 v2.35.0
github.com/google/uuid v1.6.0
github.com/redis/go-redis/v9 v9.17.0
github.com/stretchr/testify v1.10.0
trpc.group/trpc-go/trpc-a2a-go v0.2.5-0.20251023030722-7f02b57fd14a
trpc.group/trpc-go/trpc-agent-go v0.0.0-00010101000000-000000000000
trpc.group/trpc-go/trpc-agent-go/storage/redis v0.5.0
)

require (
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
go.opentelemetry.io/otel v1.29.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.29.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.29.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.29.0 // indirect
go.opentelemetry.io/otel/metric v1.29.0 // indirect
go.opentelemetry.io/otel/sdk v1.29.0 // indirect
go.opentelemetry.io/otel/trace v1.29.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/multierr v1.10.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/sys v0.30.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect
google.golang.org/grpc v1.65.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading
Loading