Skip to content
This repository was archived by the owner on Mar 29, 2026. It is now read-only.

Commit 7c7f91e

Browse files
authored
fix(riva): avoid stale interim carryover in segment assembly (#3)
1 parent 82c6554 commit 7c7f91e

File tree

4 files changed

+160
-10
lines changed

4 files changed

+160
-10
lines changed

apps/sotto/internal/riva/client.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,13 @@ type Stream struct {
4141

4242
recvDone chan struct{}
4343

44-
mu sync.Mutex
45-
segments []string // committed transcript segments (final and pause-committed interim)
46-
lastInterim string
47-
recvErr error
48-
closedSend bool
49-
debugSinkJSON io.Writer
44+
mu sync.Mutex
45+
segments []string // committed transcript segments (final and high-confidence boundary-committed interim)
46+
lastInterim string
47+
lastInterimStability float32
48+
recvErr error
49+
closedSend bool
50+
debugSinkJSON io.Writer
5051
}
5152

5253
// DialStream establishes a stream, sends config, and starts the receive loop.

apps/sotto/internal/riva/client_test.go

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func TestRecordResponseTracksInterimThenFinal(t *testing.T) {
5050
require.Equal(t, []string{"hello world"}, s.segments)
5151
}
5252

53-
func TestRecordResponseCommitsInterimAcrossPauseLikeReset(t *testing.T) {
53+
func TestRecordResponseReplacesDivergentInterimWithoutPrecommit(t *testing.T) {
5454
s := &Stream{}
5555

5656
s.recordResponse(&asrpb.StreamingRecognizeResponse{
@@ -67,10 +67,89 @@ func TestRecordResponseCommitsInterimAcrossPauseLikeReset(t *testing.T) {
6767
}},
6868
})
6969

70+
require.Empty(t, s.segments)
71+
segments := collectSegments(s.segments, s.lastInterim)
72+
require.Equal(t, []string{"second phrase"}, segments)
73+
}
74+
75+
func TestRecordResponseCommitsStableDivergentInterimForPartialRecovery(t *testing.T) {
76+
s := &Stream{}
77+
78+
s.recordResponse(&asrpb.StreamingRecognizeResponse{
79+
Results: []*asrpb.StreamingRecognitionResult{{
80+
IsFinal: false,
81+
Stability: 0.95,
82+
Alternatives: []*asrpb.SpeechRecognitionAlternative{{Transcript: "first phrase"}},
83+
}},
84+
})
85+
86+
s.recordResponse(&asrpb.StreamingRecognizeResponse{
87+
Results: []*asrpb.StreamingRecognitionResult{{
88+
IsFinal: false,
89+
Stability: 0.20,
90+
Alternatives: []*asrpb.SpeechRecognitionAlternative{{Transcript: "second phrase"}},
91+
}},
92+
})
93+
94+
require.Equal(t, []string{"first phrase"}, s.segments)
7095
segments := collectSegments(s.segments, s.lastInterim)
7196
require.Equal(t, []string{"first phrase", "second phrase"}, segments)
7297
}
7398

99+
func TestRecordResponseDoesNotPrependStaleInterimBeforeFinal(t *testing.T) {
100+
s := &Stream{}
101+
102+
s.recordResponse(&asrpb.StreamingRecognizeResponse{
103+
Results: []*asrpb.StreamingRecognitionResult{{
104+
IsFinal: false,
105+
Stability: 0.05,
106+
Alternatives: []*asrpb.SpeechRecognitionAlternative{{Transcript: "stale words"}},
107+
}},
108+
})
109+
110+
s.recordResponse(&asrpb.StreamingRecognizeResponse{
111+
Results: []*asrpb.StreamingRecognitionResult{{
112+
IsFinal: false,
113+
Stability: 0.30,
114+
Alternatives: []*asrpb.SpeechRecognitionAlternative{{Transcript: "hello world"}},
115+
}},
116+
})
117+
118+
s.recordResponse(&asrpb.StreamingRecognizeResponse{
119+
Results: []*asrpb.StreamingRecognitionResult{{
120+
IsFinal: true,
121+
Alternatives: []*asrpb.SpeechRecognitionAlternative{{Transcript: "hello world"}},
122+
}},
123+
})
124+
125+
segments := collectSegments(s.segments, s.lastInterim)
126+
require.Equal(t, []string{"hello world"}, segments)
127+
}
128+
129+
func TestRecordResponseTreatsSuffixCorrectionAsContinuation(t *testing.T) {
130+
s := &Stream{}
131+
132+
s.recordResponse(&asrpb.StreamingRecognizeResponse{
133+
Results: []*asrpb.StreamingRecognitionResult{{
134+
IsFinal: false,
135+
Stability: 0.95,
136+
Alternatives: []*asrpb.SpeechRecognitionAlternative{{Transcript: "replace reply replied on the review thread with details"}},
137+
}},
138+
})
139+
140+
s.recordResponse(&asrpb.StreamingRecognizeResponse{
141+
Results: []*asrpb.StreamingRecognitionResult{{
142+
IsFinal: false,
143+
Stability: 0.95,
144+
Alternatives: []*asrpb.SpeechRecognitionAlternative{{Transcript: "replied on the review thread with details"}},
145+
}},
146+
})
147+
148+
require.Empty(t, s.segments)
149+
segments := collectSegments(s.segments, s.lastInterim)
150+
require.Equal(t, []string{"replied on the review thread with details"}, segments)
151+
}
152+
74153
func TestAppendSegmentDedupAndPrefixMerge(t *testing.T) {
75154
segments := appendSegment(nil, "hello")
76155
require.Equal(t, []string{"hello"}, segments)
@@ -94,7 +173,13 @@ func TestCleanSegmentAndInterimContinuation(t *testing.T) {
94173

95174
require.True(t, isInterimContinuation("hello", "hello world"))
96175
require.True(t, isInterimContinuation("hello world", "hello"))
176+
require.True(t, isInterimContinuation("replace reply replied on thread", "replied on thread"))
97177
require.False(t, isInterimContinuation("first phrase", "second phrase"))
178+
179+
require.False(t, shouldCommitPriorInterimOnDivergence("first phrase", 0.2, "second phrase"))
180+
require.True(t, shouldCommitPriorInterimOnDivergence("first phrase", 0.9, "second phrase"))
181+
require.True(t, shouldCommitPriorInterimOnDivergence("Done.", 0.1, "new sentence"))
182+
require.False(t, shouldCommitPriorInterimOnDivergence("replace reply replied on thread", 0.95, "replied on thread"))
98183
}
99184

100185
func TestDialStreamEndToEndWithDebugSinkAndSpeechContexts(t *testing.T) {

apps/sotto/internal/riva/stream_receive.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,14 @@ func (s *Stream) recordResponse(resp *asrpb.StreamingRecognizeResponse) {
5353
if result.GetIsFinal() {
5454
s.segments = appendSegment(s.segments, transcript)
5555
s.lastInterim = ""
56+
s.lastInterimStability = 0
5657
continue
5758
}
5859

59-
if s.lastInterim != "" && !isInterimContinuation(s.lastInterim, transcript) {
60+
if shouldCommitPriorInterimOnDivergence(s.lastInterim, s.lastInterimStability, transcript) {
6061
s.segments = appendSegment(s.segments, s.lastInterim)
6162
}
6263
s.lastInterim = transcript
64+
s.lastInterimStability = result.GetStability()
6365
}
6466
}

apps/sotto/internal/riva/transcript_segments.go

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package riva
22

33
import "strings"
44

5+
const stableInterimBoundaryThreshold = 0.85
6+
57
// collectSegments appends a valid trailing interim segment when needed.
68
func collectSegments(committedSegments []string, lastInterim string) []string {
79
segments := append([]string(nil), committedSegments...)
@@ -48,18 +50,62 @@ func isInterimContinuation(previous string, current string) bool {
4850
if strings.HasPrefix(current, previous) || strings.HasPrefix(previous, current) {
4951
return true
5052
}
53+
if strings.HasSuffix(current, previous) || strings.HasSuffix(previous, current) {
54+
return true
55+
}
5156

5257
prevWords := strings.Fields(previous)
5358
currWords := strings.Fields(current)
54-
common := commonPrefixWords(prevWords, currWords)
5559
shorter := len(prevWords)
5660
if len(currWords) < shorter {
5761
shorter = len(currWords)
5862
}
5963
if shorter == 0 {
6064
return true
6165
}
62-
return common*2 >= shorter
66+
67+
commonPrefix := commonPrefixWords(prevWords, currWords)
68+
if commonPrefix*2 >= shorter {
69+
return true
70+
}
71+
72+
commonSuffix := commonSuffixWords(prevWords, currWords)
73+
if shorter >= 3 && commonSuffix*2 >= shorter {
74+
return true
75+
}
76+
77+
return false
78+
}
79+
80+
// shouldCommitPriorInterimOnDivergence decides whether to preserve prior interim
81+
// text when a new interim hypothesis diverges.
82+
func shouldCommitPriorInterimOnDivergence(previous string, previousStability float32, current string) bool {
83+
previous = cleanSegment(previous)
84+
current = cleanSegment(current)
85+
if previous == "" || current == "" {
86+
return false
87+
}
88+
if isInterimContinuation(previous, current) {
89+
return false
90+
}
91+
if previousStability >= stableInterimBoundaryThreshold {
92+
return true
93+
}
94+
return endsWithSentencePunctuation(previous)
95+
}
96+
97+
// endsWithSentencePunctuation reports whether transcript looks sentence-complete.
98+
func endsWithSentencePunctuation(transcript string) bool {
99+
transcript = strings.TrimSpace(transcript)
100+
if transcript == "" {
101+
return false
102+
}
103+
switch transcript[len(transcript)-1] {
104+
case '.', '!', '?':
105+
return true
106+
default:
107+
return false
108+
}
63109
}
64110

65111
// commonPrefixWords counts shared leading words across two slices.
@@ -78,6 +124,22 @@ func commonPrefixWords(left []string, right []string) int {
78124
return count
79125
}
80126

127+
// commonSuffixWords counts shared trailing words across two slices.
128+
func commonSuffixWords(left []string, right []string) int {
129+
li := len(left) - 1
130+
ri := len(right) - 1
131+
count := 0
132+
for li >= 0 && ri >= 0 {
133+
if left[li] != right[ri] {
134+
break
135+
}
136+
count++
137+
li--
138+
ri--
139+
}
140+
return count
141+
}
142+
81143
// cleanSegment normalizes transcript whitespace.
82144
func cleanSegment(raw string) string {
83145
raw = strings.TrimSpace(raw)

0 commit comments

Comments
 (0)