Skip to content

Commit 44b2d3b

Browse files
committed
fix(riva): preserve one-shot interim boundaries via audio progression
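Track the audio_processed value of each interim result and use it when deciding whether to seal a divergent one-shot interim segment: a one-shot interim of at least 3 words is now committed when audio_processed has advanced by at least 0.75 seconds since that interim was recorded. This preserves the early chunks of a long dictation without reintroducing stale carry-over from quick rewrites. Add regression coverage for both cases: a large audio advance commits the previous interim, and a small audio advance replaces it.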
1 parent 8307e65 commit 44b2d3b

File tree

4 files changed

+103
-18
lines changed

4 files changed

+103
-18
lines changed

apps/sotto/internal/riva/client.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,15 @@ type Stream struct {
4141

4242
recvDone chan struct{}
4343

44-
mu sync.Mutex
45-
segments []string // committed transcript segments (final results and sealed interim chains)
46-
lastInterim string
47-
lastInterimAge int
48-
lastInterimStability float32
49-
recvErr error
50-
closedSend bool
51-
debugSinkJSON io.Writer
44+
mu sync.Mutex
45+
segments []string // committed transcript segments (final results and sealed interim chains)
46+
lastInterim string
47+
lastInterimAge int
48+
lastInterimStability float32
49+
lastInterimAudioProcessed float32
50+
recvErr error
51+
closedSend bool
52+
debugSinkJSON io.Writer
5253
}
5354

5455
// DialStream establishes a stream, sends config, and starts the receive loop.

apps/sotto/internal/riva/client_test.go

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,54 @@ func TestRecordResponseCommitsStableSingleInterimOnDivergence(t *testing.T) {
106106
require.Equal(t, []string{"first phrase", "second phrase"}, segments)
107107
}
108108

109+
func TestRecordResponseCommitsOneShotInterimOnAudioAdvance(t *testing.T) {
110+
s := &Stream{}
111+
112+
s.recordResponse(&asrpb.StreamingRecognizeResponse{
113+
Results: []*asrpb.StreamingRecognitionResult{{
114+
IsFinal: false,
115+
AudioProcessed: 1.0,
116+
Alternatives: []*asrpb.SpeechRecognitionAlternative{{Transcript: "first phrase has enough words"}},
117+
}},
118+
})
119+
120+
s.recordResponse(&asrpb.StreamingRecognizeResponse{
121+
Results: []*asrpb.StreamingRecognitionResult{{
122+
IsFinal: false,
123+
AudioProcessed: 2.0,
124+
Alternatives: []*asrpb.SpeechRecognitionAlternative{{Transcript: "second phrase continues now"}},
125+
}},
126+
})
127+
128+
require.Equal(t, []string{"first phrase has enough words"}, s.segments)
129+
segments := collectSegments(s.segments, s.lastInterim)
130+
require.Equal(t, []string{"first phrase has enough words", "second phrase continues now"}, segments)
131+
}
132+
133+
func TestRecordResponseKeepsOneShotInterimWhenAudioAdvanceIsSmall(t *testing.T) {
134+
s := &Stream{}
135+
136+
s.recordResponse(&asrpb.StreamingRecognizeResponse{
137+
Results: []*asrpb.StreamingRecognitionResult{{
138+
IsFinal: false,
139+
AudioProcessed: 1.0,
140+
Alternatives: []*asrpb.SpeechRecognitionAlternative{{Transcript: "first phrase has enough words"}},
141+
}},
142+
})
143+
144+
s.recordResponse(&asrpb.StreamingRecognizeResponse{
145+
Results: []*asrpb.StreamingRecognitionResult{{
146+
IsFinal: false,
147+
AudioProcessed: 1.2,
148+
Alternatives: []*asrpb.SpeechRecognitionAlternative{{Transcript: "second phrase continues now"}},
149+
}},
150+
})
151+
152+
require.Empty(t, s.segments)
153+
segments := collectSegments(s.segments, s.lastInterim)
154+
require.Equal(t, []string{"second phrase continues now"}, segments)
155+
}
156+
109157
func TestRecordResponseCommitsInterimChainOnDivergence(t *testing.T) {
110158
s := &Stream{}
111159

@@ -244,11 +292,13 @@ func TestInterimHelpers(t *testing.T) {
244292
})
245293
}
246294

247-
require.False(t, shouldCommitInterimBoundary("", 5, 0.9))
248-
require.False(t, shouldCommitInterimBoundary("first phrase", 1, 0.1))
249-
require.True(t, shouldCommitInterimBoundary("first phrase", 2, 0.1))
250-
require.True(t, shouldCommitInterimBoundary("first phrase", 1, 0.9))
251-
require.True(t, shouldCommitInterimBoundary("done.", 1, 0.0))
295+
require.False(t, shouldCommitInterimBoundary("", 5, 0.9, 1.0, 2.0))
296+
require.False(t, shouldCommitInterimBoundary("first phrase", 1, 0.1, 1.0, 1.2))
297+
require.True(t, shouldCommitInterimBoundary("first phrase", 2, 0.1, 1.0, 1.1))
298+
require.True(t, shouldCommitInterimBoundary("first phrase", 1, 0.9, 1.0, 1.1))
299+
require.True(t, shouldCommitInterimBoundary("done.", 1, 0.0, 1.0, 1.1))
300+
require.True(t, shouldCommitInterimBoundary("first phrase has enough words", 1, 0.1, 1.0, 2.0))
301+
require.False(t, shouldCommitInterimBoundary("too short", 1, 0.1, 1.0, 2.0))
252302
}
253303

254304
func TestDialStreamEndToEndWithDebugSinkAndSpeechContexts(t *testing.T) {

apps/sotto/internal/riva/stream_receive.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,23 +55,33 @@ func (s *Stream) recordResponse(resp *asrpb.StreamingRecognizeResponse) {
5555
s.lastInterim = ""
5656
s.lastInterimAge = 0
5757
s.lastInterimStability = 0
58+
s.lastInterimAudioProcessed = 0
5859
continue
5960
}
6061

62+
currentAudioProcessed := result.GetAudioProcessed()
6163
if s.lastInterim != "" {
6264
if isInterimContinuation(s.lastInterim, transcript) {
6365
s.lastInterim = transcript
6466
s.lastInterimAge++
6567
s.lastInterimStability = result.GetStability()
68+
s.lastInterimAudioProcessed = currentAudioProcessed
6669
continue
6770
}
68-
if shouldCommitInterimBoundary(s.lastInterim, s.lastInterimAge, s.lastInterimStability) {
71+
if shouldCommitInterimBoundary(
72+
s.lastInterim,
73+
s.lastInterimAge,
74+
s.lastInterimStability,
75+
s.lastInterimAudioProcessed,
76+
currentAudioProcessed,
77+
) {
6978
s.segments = appendSegment(s.segments, s.lastInterim)
7079
}
7180
}
7281

7382
s.lastInterim = transcript
7483
s.lastInterimAge = 1
7584
s.lastInterimStability = result.GetStability()
85+
s.lastInterimAudioProcessed = currentAudioProcessed
7686
}
7787
}

apps/sotto/internal/riva/transcript_segments.go

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package riva
33
import "strings"
44

55
const (
6-
minInterimChainUpdates = 2
7-
stableInterimBoundaryThreshold = 0.85
6+
minInterimChainUpdates = 2
7+
stableInterimBoundaryThreshold = 0.85
8+
interimBoundaryAudioAdvanceSeconds = 0.75
9+
minInterimWordsForAudioBoundary = 3
810
)
911

1012
// collectSegments appends a valid trailing interim segment when needed.
@@ -80,7 +82,13 @@ func isInterimContinuation(previous string, current string) bool {
8082

8183
// shouldCommitInterimBoundary returns true when a divergent interim chain looks
8284
// established enough to preserve as a committed segment.
83-
func shouldCommitInterimBoundary(previous string, chainUpdates int, stability float32) bool {
85+
func shouldCommitInterimBoundary(
86+
previous string,
87+
chainUpdates int,
88+
stability float32,
89+
previousAudioProcessed float32,
90+
currentAudioProcessed float32,
91+
) bool {
8492
previous = cleanSegment(previous)
8593
if previous == "" {
8694
return false
@@ -91,7 +99,23 @@ func shouldCommitInterimBoundary(previous string, chainUpdates int, stability fl
9199
if stability >= stableInterimBoundaryThreshold {
92100
return true
93101
}
94-
return endsWithSentencePunctuation(previous)
102+
if endsWithSentencePunctuation(previous) {
103+
return true
104+
}
105+
return advancedAudioBoundary(previous, previousAudioProcessed, currentAudioProcessed)
106+
}
107+
108+
func advancedAudioBoundary(previous string, previousAudioProcessed float32, currentAudioProcessed float32) bool {
109+
if previousAudioProcessed <= 0 || currentAudioProcessed <= 0 {
110+
return false
111+
}
112+
if currentAudioProcessed <= previousAudioProcessed {
113+
return false
114+
}
115+
if currentAudioProcessed-previousAudioProcessed < interimBoundaryAudioAdvanceSeconds {
116+
return false
117+
}
118+
return len(strings.Fields(previous)) >= minInterimWordsForAudioBoundary
95119
}
96120

97121
func endsWithSentencePunctuation(text string) bool {

0 commit comments

Comments
 (0)