Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions apps/sotto/internal/riva/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@ type Stream struct {

recvDone chan struct{}

mu sync.Mutex
segments []string // committed transcript segments (final results and sealed interim chains)
lastInterim string
lastInterimAge int
lastInterimStability float32
recvErr error
closedSend bool
debugSinkJSON io.Writer
mu sync.Mutex
segments []string // committed transcript segments (final results and sealed interim chains)
lastInterim string
lastInterimAge int
lastInterimStability float32
lastInterimAudioProcessed float32
recvErr error
closedSend bool
debugSinkJSON io.Writer
}

// DialStream establishes a stream, sends config, and starts the receive loop.
Expand Down
60 changes: 55 additions & 5 deletions apps/sotto/internal/riva/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,54 @@ func TestRecordResponseCommitsStableSingleInterimOnDivergence(t *testing.T) {
require.Equal(t, []string{"first phrase", "second phrase"}, segments)
}

func TestRecordResponseCommitsOneShotInterimOnAudioAdvance(t *testing.T) {
s := &Stream{}

s.recordResponse(&asrpb.StreamingRecognizeResponse{
Results: []*asrpb.StreamingRecognitionResult{{
IsFinal: false,
AudioProcessed: 1.0,
Alternatives: []*asrpb.SpeechRecognitionAlternative{{Transcript: "first phrase has enough words"}},
}},
})

s.recordResponse(&asrpb.StreamingRecognizeResponse{
Results: []*asrpb.StreamingRecognitionResult{{
IsFinal: false,
AudioProcessed: 2.0,
Alternatives: []*asrpb.SpeechRecognitionAlternative{{Transcript: "second phrase continues now"}},
}},
})

require.Equal(t, []string{"first phrase has enough words"}, s.segments)
segments := collectSegments(s.segments, s.lastInterim)
require.Equal(t, []string{"first phrase has enough words", "second phrase continues now"}, segments)
}

func TestRecordResponseKeepsOneShotInterimWhenAudioAdvanceIsSmall(t *testing.T) {
s := &Stream{}

s.recordResponse(&asrpb.StreamingRecognizeResponse{
Results: []*asrpb.StreamingRecognitionResult{{
IsFinal: false,
AudioProcessed: 1.0,
Alternatives: []*asrpb.SpeechRecognitionAlternative{{Transcript: "first phrase has enough words"}},
}},
})

s.recordResponse(&asrpb.StreamingRecognizeResponse{
Results: []*asrpb.StreamingRecognitionResult{{
IsFinal: false,
AudioProcessed: 1.2,
Alternatives: []*asrpb.SpeechRecognitionAlternative{{Transcript: "second phrase continues now"}},
}},
})

require.Empty(t, s.segments)
segments := collectSegments(s.segments, s.lastInterim)
require.Equal(t, []string{"second phrase continues now"}, segments)
}

func TestRecordResponseCommitsInterimChainOnDivergence(t *testing.T) {
s := &Stream{}

Expand Down Expand Up @@ -244,11 +292,13 @@ func TestInterimHelpers(t *testing.T) {
})
}

require.False(t, shouldCommitInterimBoundary("", 5, 0.9))
require.False(t, shouldCommitInterimBoundary("first phrase", 1, 0.1))
require.True(t, shouldCommitInterimBoundary("first phrase", 2, 0.1))
require.True(t, shouldCommitInterimBoundary("first phrase", 1, 0.9))
require.True(t, shouldCommitInterimBoundary("done.", 1, 0.0))
require.False(t, shouldCommitInterimBoundary("", 5, 0.9, 1.0, 2.0))
require.False(t, shouldCommitInterimBoundary("first phrase", 1, 0.1, 1.0, 1.2))
require.True(t, shouldCommitInterimBoundary("first phrase", 2, 0.1, 1.0, 1.1))
require.True(t, shouldCommitInterimBoundary("first phrase", 1, 0.9, 1.0, 1.1))
require.True(t, shouldCommitInterimBoundary("done.", 1, 0.0, 1.0, 1.1))
require.True(t, shouldCommitInterimBoundary("first phrase has enough words", 1, 0.1, 1.0, 2.0))
require.False(t, shouldCommitInterimBoundary("too short", 1, 0.1, 1.0, 2.0))
}

func TestDialStreamEndToEndWithDebugSinkAndSpeechContexts(t *testing.T) {
Expand Down
12 changes: 11 additions & 1 deletion apps/sotto/internal/riva/stream_receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,33 @@ func (s *Stream) recordResponse(resp *asrpb.StreamingRecognizeResponse) {
s.lastInterim = ""
s.lastInterimAge = 0
s.lastInterimStability = 0
s.lastInterimAudioProcessed = 0
continue
}

currentAudioProcessed := result.GetAudioProcessed()
if s.lastInterim != "" {
if isInterimContinuation(s.lastInterim, transcript) {
s.lastInterim = transcript
s.lastInterimAge++
s.lastInterimStability = result.GetStability()
s.lastInterimAudioProcessed = currentAudioProcessed
continue
}
if shouldCommitInterimBoundary(s.lastInterim, s.lastInterimAge, s.lastInterimStability) {
if shouldCommitInterimBoundary(
s.lastInterim,
s.lastInterimAge,
s.lastInterimStability,
s.lastInterimAudioProcessed,
currentAudioProcessed,
) {
s.segments = appendSegment(s.segments, s.lastInterim)
}
}

s.lastInterim = transcript
s.lastInterimAge = 1
s.lastInterimStability = result.GetStability()
s.lastInterimAudioProcessed = currentAudioProcessed
}
}
32 changes: 28 additions & 4 deletions apps/sotto/internal/riva/transcript_segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package riva
import "strings"

const (
minInterimChainUpdates = 2
stableInterimBoundaryThreshold = 0.85
minInterimChainUpdates = 2
stableInterimBoundaryThreshold = 0.85
interimBoundaryAudioAdvanceSeconds = 0.75
minInterimWordsForAudioBoundary = 3
)

// collectSegments appends a valid trailing interim segment when needed.
Expand Down Expand Up @@ -80,7 +82,13 @@ func isInterimContinuation(previous string, current string) bool {

// shouldCommitInterimBoundary returns true when a divergent interim chain looks
// established enough to preserve as a committed segment.
func shouldCommitInterimBoundary(previous string, chainUpdates int, stability float32) bool {
func shouldCommitInterimBoundary(
previous string,
chainUpdates int,
stability float32,
previousAudioProcessed float32,
currentAudioProcessed float32,
) bool {
previous = cleanSegment(previous)
if previous == "" {
return false
Expand All @@ -91,7 +99,23 @@ func shouldCommitInterimBoundary(previous string, chainUpdates int, stability fl
if stability >= stableInterimBoundaryThreshold {
return true
}
return endsWithSentencePunctuation(previous)
if endsWithSentencePunctuation(previous) {
return true
}
return advancedAudioBoundary(previous, previousAudioProcessed, currentAudioProcessed)
}

func advancedAudioBoundary(previous string, previousAudioProcessed float32, currentAudioProcessed float32) bool {
if previousAudioProcessed <= 0 || currentAudioProcessed <= 0 {
return false
}
if currentAudioProcessed <= previousAudioProcessed {
return false
}
if currentAudioProcessed-previousAudioProcessed < interimBoundaryAudioAdvanceSeconds {
return false
}
return len(strings.Fields(previous)) >= minInterimWordsForAudioBoundary
}

func endsWithSentencePunctuation(text string) bool {
Expand Down
Loading