Skip to content

Commit ed79af3

Browse files
authored
Merge pull request #29 from akkuman/patch-1
feat: streaming writes to io.Writer
2 parents 0ef331a + b00fba0 commit ed79af3

File tree

1 file changed

+37
-56
lines changed

1 file changed

+37
-56
lines changed

internal/communicate/communicate.go

Lines changed: 37 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -112,34 +112,18 @@ func NewCommunicate(text string, opt *communicateOption.CommunicateOption) (*Com
112112

113113
// WriteStreamTo write audio stream to io.WriteCloser
114114
func (c *Communicate) WriteStreamTo(rc io.Writer) error {
115-
116-
output := make(chan map[string]interface{})
117-
defer close(output)
118-
119115
ctx, cancel := context.WithCancel(context.Background())
120116
defer cancel()
121117

122-
err := c.stream(ctx, output)
118+
output, err := c.stream(ctx)
123119
if err != nil {
124120
return err
125121
}
126-
audioBinaryData := make([][][]byte, c.audioDataIndex)
127122

128123
for payload := range output {
129-
if _, ok := payload["end"]; ok {
130-
if len(audioBinaryData) == c.audioDataIndex {
131-
break
132-
}
133-
}
134124
if t, ok := payload["type"]; ok && t == "audio" {
135125
data := payload["data"].(audioData)
136-
audioBinaryData[data.Index] = append(audioBinaryData[data.Index], data.Data)
137-
}
138-
}
139-
140-
for _, dataSlice := range audioBinaryData {
141-
for _, data := range dataSlice {
142-
rc.Write(data)
126+
rc.Write(data.Data)
143127
}
144128
}
145129
return nil
@@ -188,7 +172,8 @@ func (c *Communicate) sendSSML(conn *websocket.Conn, currentTime string, text []
188172
),
189173
))
190174
}
191-
func (c *Communicate) stream(ctx context.Context, output chan map[string]interface{}) error {
175+
func (c *Communicate) stream(ctx context.Context) (chan map[string]interface{}, error) {
176+
output := make(chan map[string]interface{})
192177
texts := splitTextByByteLength(
193178
escape(removeIncompatibleCharacters(c.text)),
194179
calculateMaxMessageSize(c.opt.Pitch, c.opt.Voice, c.opt.Rate, c.opt.Volume),
@@ -197,46 +182,42 @@ func (c *Communicate) stream(ctx context.Context, output chan map[string]interfa
197182
c.finalUtterance = make(map[int]int)
198183
c.prevIdx = -1
199184
c.shiftTime = -1
200-
201-
var wg sync.WaitGroup
202-
203-
for idx, text := range texts {
204-
wsURL := businessConsts.EdgeWssEndpoint +
205-
"&Sec-MS-GEC=" + GenerateSecMsGecToken() +
206-
"&Sec-MS-GEC-Version=" + GenerateSecMsGecVersion() +
207-
"&ConnectionId=" + generateConnectID()
208-
dialer := websocket.Dialer{}
209-
setupWebSocketProxy(&dialer, c)
210-
211-
conn, _, err := dialer.Dial(wsURL, communicateHeader)
212-
if err != nil {
213-
return err
214-
}
215-
216-
wg.Add(1)
217-
go func(ctx context.Context, conn *websocket.Conn, idx int) {
218-
defer wg.Done()
219-
defer conn.Close()
220-
221-
currentTime := currentTimeInMST()
222-
err = c.sendConfig(conn, currentTime)
223-
if err != nil {
224-
log.Println("sendConfig error:", err)
225-
return
226-
}
227-
if err = c.sendSSML(conn, currentTime, text); err != nil {
228-
log.Println("sendSSML error:", err)
229-
return
230-
}
231-
c.connStreamExchange(ctx, conn, output, idx)
232-
}(ctx, conn, idx)
233-
}
234-
185+
235186
go func() {
236-
wg.Wait()
187+
defer close(output)
188+
for idx, text := range texts {
189+
func() {
190+
wsURL := businessConsts.EdgeWssEndpoint +
191+
"&Sec-MS-GEC=" + GenerateSecMsGecToken() +
192+
"&Sec-MS-GEC-Version=" + GenerateSecMsGecVersion() +
193+
"&ConnectionId=" + generateConnectID()
194+
dialer := websocket.Dialer{}
195+
setupWebSocketProxy(&dialer, c)
196+
197+
conn, _, err := dialer.Dial(wsURL, communicateHeader)
198+
if err != nil {
199+
output <- map[string]interface{}{
200+
"error": webSocketError{Message: err.Error()},
201+
}
202+
return
203+
}
204+
defer conn.Close()
205+
currentTime := currentTimeInMST()
206+
err = c.sendConfig(conn, currentTime)
207+
if err != nil {
208+
log.Println("sendConfig error:", err)
209+
return
210+
}
211+
if err = c.sendSSML(conn, currentTime, text); err != nil {
212+
log.Println("sendSSML error:", err)
213+
return
214+
}
215+
c.connStreamExchange(ctx, conn, output, idx)
216+
}()
217+
}
237218
}()
238219

239-
return nil
220+
return output, nil
240221
}
241222

242223
func (c *Communicate) connStreamExchange(ctx context.Context, conn *websocket.Conn, output chan map[string]interface{}, idx int) {

0 commit comments

Comments
 (0)