diff --git a/apps/sotto/internal/riva/client.go b/apps/sotto/internal/riva/client.go index 6274366..53ac5cf 100644 --- a/apps/sotto/internal/riva/client.go +++ b/apps/sotto/internal/riva/client.go @@ -41,14 +41,15 @@ type Stream struct { recvDone chan struct{} - mu sync.Mutex - segments []string // committed transcript segments (final results and sealed interim chains) - lastInterim string - lastInterimAge int - lastInterimStability float32 - recvErr error - closedSend bool - debugSinkJSON io.Writer + mu sync.Mutex + segments []string // committed transcript segments (final results and sealed interim chains) + lastInterim string + lastInterimAge int + lastInterimStability float32 + lastInterimAudioProcessed float32 + recvErr error + closedSend bool + debugSinkJSON io.Writer } // DialStream establishes a stream, sends config, and starts the receive loop. diff --git a/apps/sotto/internal/riva/client_test.go b/apps/sotto/internal/riva/client_test.go index cd0e9a4..b8936d4 100644 --- a/apps/sotto/internal/riva/client_test.go +++ b/apps/sotto/internal/riva/client_test.go @@ -106,6 +106,54 @@ func TestRecordResponseCommitsStableSingleInterimOnDivergence(t *testing.T) { require.Equal(t, []string{"first phrase", "second phrase"}, segments) } +func TestRecordResponseCommitsOneShotInterimOnAudioAdvance(t *testing.T) { + s := &Stream{} + + s.recordResponse(&asrpb.StreamingRecognizeResponse{ + Results: []*asrpb.StreamingRecognitionResult{{ + IsFinal: false, + AudioProcessed: 1.0, + Alternatives: []*asrpb.SpeechRecognitionAlternative{{Transcript: "first phrase has enough words"}}, + }}, + }) + + s.recordResponse(&asrpb.StreamingRecognizeResponse{ + Results: []*asrpb.StreamingRecognitionResult{{ + IsFinal: false, + AudioProcessed: 2.0, + Alternatives: []*asrpb.SpeechRecognitionAlternative{{Transcript: "second phrase continues now"}}, + }}, + }) + + require.Equal(t, []string{"first phrase has enough words"}, s.segments) + segments := collectSegments(s.segments, s.lastInterim) + require.Equal(t, []string{"first phrase has enough words", "second phrase continues now"}, segments) +} + +func TestRecordResponseKeepsOneShotInterimWhenAudioAdvanceIsSmall(t *testing.T) { + s := &Stream{} + + s.recordResponse(&asrpb.StreamingRecognizeResponse{ + Results: []*asrpb.StreamingRecognitionResult{{ + IsFinal: false, + AudioProcessed: 1.0, + Alternatives: []*asrpb.SpeechRecognitionAlternative{{Transcript: "first phrase has enough words"}}, + }}, + }) + + s.recordResponse(&asrpb.StreamingRecognizeResponse{ + Results: []*asrpb.StreamingRecognitionResult{{ + IsFinal: false, + AudioProcessed: 1.2, + Alternatives: []*asrpb.SpeechRecognitionAlternative{{Transcript: "second phrase continues now"}}, + }}, + }) + + require.Empty(t, s.segments) + segments := collectSegments(s.segments, s.lastInterim) + require.Equal(t, []string{"second phrase continues now"}, segments) +} + func TestRecordResponseCommitsInterimChainOnDivergence(t *testing.T) { s := &Stream{} @@ -244,11 +292,13 @@ func TestInterimHelpers(t *testing.T) { }) } - require.False(t, shouldCommitInterimBoundary("", 5, 0.9)) - require.False(t, shouldCommitInterimBoundary("first phrase", 1, 0.1)) - require.True(t, shouldCommitInterimBoundary("first phrase", 2, 0.1)) - require.True(t, shouldCommitInterimBoundary("first phrase", 1, 0.9)) - require.True(t, shouldCommitInterimBoundary("done.", 1, 0.0)) + require.False(t, shouldCommitInterimBoundary("", 5, 0.9, 1.0, 2.0)) + require.False(t, shouldCommitInterimBoundary("first phrase", 1, 0.1, 1.0, 1.2)) + require.True(t, shouldCommitInterimBoundary("first phrase", 2, 0.1, 1.0, 1.1)) + require.True(t, shouldCommitInterimBoundary("first phrase", 1, 0.9, 1.0, 1.1)) + require.True(t, shouldCommitInterimBoundary("done.", 1, 0.0, 1.0, 1.1)) + require.True(t, shouldCommitInterimBoundary("first phrase has enough words", 1, 0.1, 1.0, 2.0)) + require.False(t, shouldCommitInterimBoundary("too short", 1, 0.1, 1.0, 2.0)) } func TestDialStreamEndToEndWithDebugSinkAndSpeechContexts(t *testing.T) { diff --git a/apps/sotto/internal/riva/stream_receive.go b/apps/sotto/internal/riva/stream_receive.go index e7801fe..0abc5de 100644 --- a/apps/sotto/internal/riva/stream_receive.go +++ b/apps/sotto/internal/riva/stream_receive.go @@ -55,17 +55,26 @@ func (s *Stream) recordResponse(resp *asrpb.StreamingRecognizeResponse) { s.lastInterim = "" s.lastInterimAge = 0 s.lastInterimStability = 0 + s.lastInterimAudioProcessed = 0 continue } + currentAudioProcessed := result.GetAudioProcessed() if s.lastInterim != "" { if isInterimContinuation(s.lastInterim, transcript) { s.lastInterim = transcript s.lastInterimAge++ s.lastInterimStability = result.GetStability() + s.lastInterimAudioProcessed = currentAudioProcessed continue } - if shouldCommitInterimBoundary(s.lastInterim, s.lastInterimAge, s.lastInterimStability) { + if shouldCommitInterimBoundary( + s.lastInterim, + s.lastInterimAge, + s.lastInterimStability, + s.lastInterimAudioProcessed, + currentAudioProcessed, + ) { s.segments = appendSegment(s.segments, s.lastInterim) } } @@ -73,5 +82,6 @@ func (s *Stream) recordResponse(resp *asrpb.StreamingRecognizeResponse) { s.lastInterim = transcript s.lastInterimAge = 1 s.lastInterimStability = result.GetStability() + s.lastInterimAudioProcessed = currentAudioProcessed } } diff --git a/apps/sotto/internal/riva/transcript_segments.go b/apps/sotto/internal/riva/transcript_segments.go index 17f0d48..f4a1960 100644 --- a/apps/sotto/internal/riva/transcript_segments.go +++ b/apps/sotto/internal/riva/transcript_segments.go @@ -3,8 +3,10 @@ package riva import "strings" const ( - minInterimChainUpdates = 2 - stableInterimBoundaryThreshold = 0.85 + minInterimChainUpdates = 2 + stableInterimBoundaryThreshold = 0.85 + interimBoundaryAudioAdvanceSeconds = 0.75 + minInterimWordsForAudioBoundary = 3 ) // collectSegments appends a valid trailing interim segment when needed. @@ -80,7 +82,13 @@ func isInterimContinuation(previous string, current string) bool { // shouldCommitInterimBoundary returns true when a divergent interim chain looks // established enough to preserve as a committed segment. -func shouldCommitInterimBoundary(previous string, chainUpdates int, stability float32) bool { +func shouldCommitInterimBoundary( + previous string, + chainUpdates int, + stability float32, + previousAudioProcessed float32, + currentAudioProcessed float32, +) bool { previous = cleanSegment(previous) if previous == "" { return false @@ -91,7 +99,23 @@ func shouldCommitInterimBoundary(previous string, chainUpdates int, stability fl if stability >= stableInterimBoundaryThreshold { return true } - return endsWithSentencePunctuation(previous) + if endsWithSentencePunctuation(previous) { + return true + } + return advancedAudioBoundary(previous, previousAudioProcessed, currentAudioProcessed) +} + +func advancedAudioBoundary(previous string, previousAudioProcessed float32, currentAudioProcessed float32) bool { + if previousAudioProcessed <= 0 || currentAudioProcessed <= 0 { + return false + } + if currentAudioProcessed <= previousAudioProcessed { + return false + } + if currentAudioProcessed-previousAudioProcessed < interimBoundaryAudioAdvanceSeconds { + return false + } + return len(strings.Fields(previous)) >= minInterimWordsForAudioBoundary } func endsWithSentencePunctuation(text string) bool {