@@ -170,7 +170,7 @@ func (t *multiWriter) Write(p []byte) (n int, err error) {
170170 return n , nil
171171}
172172
173- func startTrickleSubscribe (ctx context.Context , url * url.URL , params aiRequestParams , onFistSegment func ()) {
173+ func startTrickleSubscribe (ctx context.Context , url * url.URL , params aiRequestParams , sess * AISession , onFistSegment func ()) {
174174 // subscribe to the outputs and send them into LPMS
175175 subscriber := trickle .NewTrickleSubscriber (url .String ())
176176 r , w , err := os .Pipe ()
@@ -193,6 +193,7 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa
193193 go func () {
194194 var err error
195195 firstSegment := true
196+ var segmentsReceived int64
196197
197198 defer w .Close ()
198199 defer wMediaMTX .Close ()
@@ -243,6 +244,23 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa
243244 firstSegment = false
244245 onFistSegment ()
245246 }
247+ segmentsReceived += 1
248+ if segmentsReceived == 3 && monitor .Enabled {
249+ // We assume that after receiving 3 segments, the runner started successfully
250+ // and we should be able to start the playback
251+ monitor .SendQueueEventAsync ("stream_trace" , map [string ]interface {}{
252+ "type" : "gateway_receive_few_processed_segments" ,
253+ "timestamp" : time .Now ().UnixMilli (),
254+ "stream_id" : params .liveParams .streamID ,
255+ "pipeline_id" : params .liveParams .pipelineID ,
256+ "request_id" : params .liveParams .requestID ,
257+ "orchestrator_info" : map [string ]interface {}{
258+ "address" : sess .Address (),
259+ "url" : sess .Transcoder (),
260+ },
261+ })
262+
263+ }
246264 clog .V (8 ).Infof (ctx , "trickle subscribe read data completed seq=%d bytes=%s" , seq , humanize .Bytes (uint64 (n )))
247265 }
248266 }()
0 commit comments