Skip to content

Commit cdea4c9

Browse files
authored
Merge pull request #23 from beyond5959/dev
feat(codex): capture reasoning deltas in embedded runtime
2 parents 9311e1c + 19c43fb commit cdea4c9

File tree

4 files changed

+176
-8
lines changed

4 files changed

+176
-8
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/beyond5959/ngent
33
go 1.24
44

55
require (
6-
github.com/beyond5959/acp-adapter v0.3.1-0.20260313104211-08e49ba1c367
6+
github.com/beyond5959/acp-adapter v0.3.1
77
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
88
modernc.org/sqlite v1.18.2
99
)

go.sum

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
github.com/beyond5959/acp-adapter v0.2.1-0.20260313090730-8c07c32144c3 h1:PUh2AuLRbTG3lgGRmrAIOBp9Juely/C4Ydp0GjheZVQ=
2-
github.com/beyond5959/acp-adapter v0.2.1-0.20260313090730-8c07c32144c3/go.mod h1:cr4I+9+la75oUge+Pr97KlcdQrkwIG6VwWHbc9ALxSE=
3-
github.com/beyond5959/acp-adapter v0.3.1-0.20260313104211-08e49ba1c367 h1:LOT0AfexFf14hiZlForYTkDbXBAWIu14nqXFP2Lya2Q=
4-
github.com/beyond5959/acp-adapter v0.3.1-0.20260313104211-08e49ba1c367/go.mod h1:cr4I+9+la75oUge+Pr97KlcdQrkwIG6VwWHbc9ALxSE=
1+
github.com/beyond5959/acp-adapter v0.3.1 h1:zxhs2cHWK9DT9sDUjAF8WonxPavIl0QK5rxh0NyxkYc=
2+
github.com/beyond5959/acp-adapter v0.3.1/go.mod h1:cr4I+9+la75oUge+Pr97KlcdQrkwIG6VwWHbc9ALxSE=
53
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
64
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
75
github.com/google/go-cmp v0.5.3 h1:x95R7cp+rSeeqAMI2knLtQ0DKlaBhv2NrtrOvafPHRo=

internal/agents/codex/embedded.go

Lines changed: 110 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ const (
4242
stableSessionResolveRetries = 5
4343
stableSessionResolveDelay = 150 * time.Millisecond
4444
initialSlashCommandsWait = 250 * time.Millisecond
45+
postPromptDrainTimeout = 250 * time.Millisecond
4546
)
4647

4748
// Config configures one embedded codex runtime provider instance.
@@ -467,14 +468,51 @@ func (c *Client) streamOnce(
467468
promptDone <- promptResult{response: resp, err: reqErr}
468469
}()
469470

471+
var (
472+
finalStopReason agents.StopReason
473+
promptFinished bool
474+
drainTimer *time.Timer
475+
drainCh <-chan time.Time
476+
)
477+
stopDrainTimer := func() {
478+
if drainTimer == nil {
479+
return
480+
}
481+
if !drainTimer.Stop() {
482+
select {
483+
case <-drainTimer.C:
484+
default:
485+
}
486+
}
487+
drainCh = nil
488+
}
489+
resetDrainTimer := func() {
490+
if drainTimer == nil {
491+
drainTimer = time.NewTimer(postPromptDrainTimeout)
492+
drainCh = drainTimer.C
493+
return
494+
}
495+
if !drainTimer.Stop() {
496+
select {
497+
case <-drainTimer.C:
498+
default:
499+
}
500+
}
501+
drainTimer.Reset(postPromptDrainTimeout)
502+
drainCh = drainTimer.C
503+
}
504+
defer stopDrainTimer()
505+
470506
for {
471507
select {
472508
case <-ctx.Done():
473509
stopCancelWatcher()
510+
stopDrainTimer()
474511
return agents.StopReasonCancelled, nil
475512
case result := <-promptDone:
476-
stopCancelWatcher()
477513
if result.err != nil {
514+
stopCancelWatcher()
515+
stopDrainTimer()
478516
if errors.Is(result.err, context.Canceled) || errors.Is(result.err, context.DeadlineExceeded) || ctx.Err() != nil {
479517
return agents.StopReasonCancelled, nil
480518
}
@@ -483,15 +521,24 @@ func (c *Client) streamOnce(
483521

484522
stopReason, parseErr := parsePromptStopReason(result.response.Result)
485523
if parseErr != nil {
524+
stopCancelWatcher()
525+
stopDrainTimer()
486526
return agents.StopReasonEndTurn, parseErr
487527
}
488528
if stopReason == "cancelled" {
489-
return agents.StopReasonCancelled, nil
529+
finalStopReason = agents.StopReasonCancelled
530+
} else {
531+
finalStopReason = agents.StopReasonEndTurn
490532
}
491-
return agents.StopReasonEndTurn, nil
533+
promptFinished = true
534+
resetDrainTimer()
492535
case msg, ok := <-updates:
493536
if !ok {
494537
stopCancelWatcher()
538+
stopDrainTimer()
539+
if promptFinished {
540+
return finalStopReason, nil
541+
}
495542
if ctx.Err() != nil {
496543
return agents.StopReasonCancelled, nil
497544
}
@@ -500,8 +547,23 @@ func (c *Client) streamOnce(
500547

501548
if err := c.handleUpdate(ctx, runtime, msg, onDelta); err != nil {
502549
stopCancelWatcher()
550+
stopDrainTimer()
503551
return agents.StopReasonEndTurn, err
504552
}
553+
if promptFinished {
554+
if acpSessionUpdateIsTerminal(msg.Params) {
555+
stopCancelWatcher()
556+
stopDrainTimer()
557+
return finalStopReason, nil
558+
}
559+
resetDrainTimer()
560+
}
561+
case <-drainCh:
562+
stopCancelWatcher()
563+
stopDrainTimer()
564+
if promptFinished {
565+
return finalStopReason, nil
566+
}
505567
}
506568
}
507569
}
@@ -548,6 +610,7 @@ func (c *Client) handleUpdate(
548610
observability.LogACPMessage(c.Name(), "inbound", msg)
549611

550612
if msg.Method == methodSessionUpdate {
613+
updateType := acpSessionUpdateTopLevelType(msg.Params)
551614
update, err := agents.ParseACPUpdate(msg.Params)
552615
if err != nil {
553616
return fmt.Errorf("codex: %w", err)
@@ -562,6 +625,15 @@ func (c *Client) handleUpdate(
562625
return err
563626
}
564627
return nil
628+
case agents.ACPUpdateTypeThoughtMessageChunk:
629+
if updateType != "" && updateType != "reasoning" {
630+
return nil
631+
}
632+
if err := agents.NotifyReasoningDelta(ctx, update.Delta); err != nil {
633+
c.sendSessionCancel(runtime, c.currentSessionID())
634+
return err
635+
}
636+
return nil
565637
case agents.ACPUpdateTypePlan:
566638
handler, ok := agents.PlanHandlerFromContext(ctx)
567639
if !ok {
@@ -592,6 +664,41 @@ func (c *Client) handleUpdate(
592664
return nil
593665
}
594666

667+
func acpSessionUpdateTopLevelType(raw json.RawMessage) string {
668+
if len(raw) == 0 {
669+
return ""
670+
}
671+
var payload struct {
672+
Type string `json:"type"`
673+
}
674+
if err := json.Unmarshal(raw, &payload); err != nil {
675+
return ""
676+
}
677+
return strings.TrimSpace(payload.Type)
678+
}
679+
680+
func acpSessionUpdateIsTerminal(raw json.RawMessage) bool {
681+
if len(raw) == 0 {
682+
return false
683+
}
684+
var payload struct {
685+
Type string `json:"type"`
686+
Status string `json:"status"`
687+
}
688+
if err := json.Unmarshal(raw, &payload); err != nil {
689+
return false
690+
}
691+
if strings.TrimSpace(payload.Type) != "status" {
692+
return false
693+
}
694+
switch strings.TrimSpace(payload.Status) {
695+
case "turn_completed", "turn_cancelled":
696+
return true
697+
default:
698+
return false
699+
}
700+
}
701+
595702
func (c *Client) handlePermissionRequest(
596703
ctx context.Context,
597704
runtime *codexacp.EmbeddedRuntime,

internal/agents/codex/embedded_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,69 @@ func TestStreamReplaysCachedSlashCommandsAfterConfigOptionsInit(t *testing.T) {
7979
}
8080
}
8181

82+
func TestStreamCapturesReasoningSummaryDeltas(t *testing.T) {
83+
client := newFakeCodexClient(t)
84+
defer func() {
85+
_ = client.Close()
86+
}()
87+
88+
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
89+
defer cancel()
90+
91+
var reasoning strings.Builder
92+
ctx = agents.WithReasoningHandler(ctx, func(_ context.Context, delta string) error {
93+
reasoning.WriteString(delta)
94+
return nil
95+
})
96+
97+
var answer strings.Builder
98+
stopReason, err := client.Stream(ctx, "reasoning summary probe", func(delta string) error {
99+
answer.WriteString(delta)
100+
return nil
101+
})
102+
if err != nil {
103+
t.Fatalf("Stream(): %v", err)
104+
}
105+
if stopReason != agents.StopReasonEndTurn {
106+
t.Fatalf("StopReason = %q, want %q", stopReason, agents.StopReasonEndTurn)
107+
}
108+
109+
if got := answer.String(); !strings.Contains(got, "working") {
110+
t.Fatalf("answer = %q, want it to include %q", got, "working")
111+
}
112+
if got, want := reasoning.String(), "Inspect repository state.\n\nConfirm reasoning plumbing."; got != want {
113+
t.Fatalf("reasoning = %q, want %q", got, want)
114+
}
115+
}
116+
117+
func TestStreamCapturesReasoningTextDeltas(t *testing.T) {
118+
client := newFakeCodexClient(t)
119+
defer func() {
120+
_ = client.Close()
121+
}()
122+
123+
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
124+
defer cancel()
125+
126+
var reasoning strings.Builder
127+
ctx = agents.WithReasoningHandler(ctx, func(_ context.Context, delta string) error {
128+
reasoning.WriteString(delta)
129+
return nil
130+
})
131+
132+
stopReason, err := client.Stream(ctx, "reasoning raw probe", func(string) error { return nil })
133+
if err != nil {
134+
t.Fatalf("Stream(): %v", err)
135+
}
136+
if stopReason != agents.StopReasonEndTurn {
137+
t.Fatalf("StopReason = %q, want %q", stopReason, agents.StopReasonEndTurn)
138+
}
139+
140+
if got, want := reasoning.String(), "Raw reasoning step 1. Raw reasoning step 2."; got != want {
141+
t.Fatalf("reasoning = %q, want %q", got, want)
142+
}
143+
}
144+
82145
func TestSlashCommandsAfterConfigOptionsInit(t *testing.T) {
83146
client := newFakeCodexClient(t)
84147
defer func() {

0 commit comments

Comments
 (0)