Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions machinery/src/capture/gortsplib.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,40 @@ func (g *Golibrtsp) Connect(ctx context.Context, ctxOtel context.Context) (err e
}
}

// Look for audio stream.
// find the LPCM media and format
audioFormaLPCM, audioMediLPCM := FindLPCM(desc, false)
g.AudioLPCMMedia = audioMediLPCM
g.AudioLPCMForma = audioFormaLPCM
if audioMediLPCM == nil {
log.Log.Debug("capture.golibrtsp.Connect(LPCM): " + "audio media not found")
} else {
_, err = g.Client.Setup(desc.BaseURL, audioMediLPCM, 0, 0)
if err != nil {
log.Log.Error("capture.golibrtsp.Connect(LPCM): " + err.Error())
} else {
audiortpDec, err := audioFormaLPCM.CreateDecoder()
if err != nil {
log.Log.Error("capture.golibrtsp.Connect(LPCM): " + err.Error())
} else {
g.AudioLPCMDecoder = audiortpDec
streamIndex := len(g.Streams)
g.Streams = append(g.Streams, packets.Stream{
Index: streamIndex,
Name: "LPCM",
IsVideo: false,
IsAudio: true,
IsBackChannel: false,
SampleRate: audioFormaLPCM.SampleRate,
Channels: audioFormaLPCM.ChannelCount,
BitDepth: audioFormaLPCM.BitDepth,
})

g.AudioLPCMIndex = int8(len(g.Streams)) - 1
}
}
}

// Look for audio stream.
// find the G711 media and format
audioForma, audioMedi := FindPCMU(desc, false)
Expand Down Expand Up @@ -367,6 +401,8 @@ func (g *Golibrtsp) Connect(ctx context.Context, ctxOtel context.Context) (err e
IsVideo: false,
IsAudio: true,
IsBackChannel: false,
SampleRate: defaultPCMUSampleRate,
Channels: defaultPCMUChannels,
})

// Set the index for the audio
Expand Down Expand Up @@ -509,6 +545,8 @@ func (g *Golibrtsp) ConnectBackChannel(ctx context.Context, ctxRunAgent context.
IsVideo: false,
IsAudio: true,
IsBackChannel: true,
SampleRate: defaultPCMUSampleRate,
Channels: defaultPCMUChannels,
})
// Set the index for the audio
g.AudioG711IndexBackChannel = int8(len(g.Streams)) - 1
Expand All @@ -521,6 +559,39 @@ func (g *Golibrtsp) ConnectBackChannel(ctx context.Context, ctxRunAgent context.
func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets.Queue, configuration *models.Configuration, communication *models.Communication) (err error) {
log.Log.Debug("capture.golibrtsp.Start(): started")

// called when an LPCM audio RTP packet arrives
if g.AudioLPCMMedia != nil && g.AudioLPCMForma != nil {
g.Client.OnPacketRTP(g.AudioLPCMMedia, g.AudioLPCMForma, func(rtppkt *rtp.Packet) {
pts, ok := g.Client.PacketPTS(g.AudioLPCMMedia, rtppkt)
pts2, ok := g.Client.PacketPTS2(g.AudioLPCMMedia, rtppkt)
if !ok {
log.Log.Debug("capture.golibrtsp.Start(): " + "unable to get PTS")
return
}
Comment on lines +565 to +570

op, err := g.AudioLPCMDecoder.Decode(rtppkt)
if err != nil {
log.Log.Error("capture.golibrtsp.Start(): " + err.Error())
return
}

pkt := packets.Packet{
IsKeyFrame: false,
Packet: rtppkt,
Data: op,
Time: pts2,
TimeLegacy: pts,
CompositionTime: pts2,
CurrentTime: time.Now().UnixMilli(),
Idx: g.AudioLPCMIndex,
IsVideo: false,
IsAudio: true,
Codec: "LPCM",
}
queue.WritePacket(pkt)
})
}

// called when a MULAW audio RTP packet arrives
if g.AudioG711Media != nil && g.AudioG711Forma != nil {
g.Client.OnPacketRTP(g.AudioG711Media, g.AudioG711Forma, func(rtppkt *rtp.Packet) {
Expand Down Expand Up @@ -1259,6 +1330,21 @@ func FindPCMU(desc *description.Session, isBackChannel bool) (*format.G711, *des
return nil, nil
}

// FindLPCM walks the medias of the given session description and returns the
// first LPCM format, together with the media that carries it, whose
// back-channel flag equals isBackChannel and whose SampleRate, ChannelCount
// and BitDepth are all greater than zero. LPCM formats with any non-positive
// parameter are passed over. When nothing matches, it returns (nil, nil).
func FindLPCM(desc *description.Session, isBackChannel bool) (*format.LPCM, *description.Media) {
	for _, m := range desc.Medias {
		// Only consider medias on the requested side (regular vs. back channel).
		if m.IsBackChannel != isBackChannel {
			continue
		}
		for _, f := range m.Formats {
			candidate, isLPCM := f.(*format.LPCM)
			if !isLPCM {
				continue
			}
			// Every audio parameter must be positive for the format to be accepted.
			usable := candidate.SampleRate > 0 && candidate.ChannelCount > 0 && candidate.BitDepth > 0
			if usable {
				return candidate, m
			}
		}
	}
	return nil, nil
}

func FindOPUS(desc *description.Session, isBackChannel bool) (*format.Opus, *description.Media) {
for _, media := range desc.Medias {
if media.IsBackChannel == isBackChannel {
Expand Down
128 changes: 78 additions & 50 deletions machinery/src/capture/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,31 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
// We only expect one audio and one video codec.
// If there are multiple audio or video streams, we will use the first one.
audioCodec := ""
audioBitDepth := 0
videoCodec := ""
configuredAudioSampleRate := config.Capture.IPCamera.SampleRate
configuredAudioChannels := config.Capture.IPCamera.Channels
audioStreams, _ := rtspClient.GetAudioStreams()
videoStreams, _ := rtspClient.GetVideoStreams()
if len(audioStreams) > 0 {
audioCodec = audioStreams[0].Name
config.Capture.IPCamera.SampleRate = audioStreams[0].SampleRate
config.Capture.IPCamera.Channels = audioStreams[0].Channels
resolvedSampleRate := audioStreams[0].SampleRate
resolvedChannels := audioStreams[0].Channels

if audioCodec == "LPCM" {
if configuredAudioSampleRate > 0 && configuredAudioSampleRate != resolvedSampleRate {
log.Log.Warning("capture.main.HandleRecordStream(): LPCM sample rate mismatch between configuration and RTSP stream; using configured value " + strconv.Itoa(configuredAudioSampleRate) + " instead of detected value " + strconv.Itoa(resolvedSampleRate))
resolvedSampleRate = configuredAudioSampleRate
}
if configuredAudioChannels > 0 && configuredAudioChannels != resolvedChannels {
log.Log.Warning("capture.main.HandleRecordStream(): LPCM channel count mismatch between configuration and RTSP stream; using configured value " + strconv.Itoa(configuredAudioChannels) + " instead of detected value " + strconv.Itoa(resolvedChannels))
resolvedChannels = configuredAudioChannels
}
}

config.Capture.IPCamera.SampleRate = resolvedSampleRate
config.Capture.IPCamera.Channels = resolvedChannels
audioBitDepth = audioStreams[0].BitDepth
}
if len(videoStreams) > 0 {
videoCodec = videoStreams[0].Name
Expand All @@ -105,13 +123,15 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
//var cws *cacheWriterSeeker
var mp4Video *video.MP4
var videoTrack uint32
var audioTrack uint32
var audioWriter *recordingAudioWriter
var name string

// Do not do anything!
log.Log.Info("capture.main.HandleRecordStream(continuous): start recording")

start := false
rolloverRequested := false
rolloverMaxLogged := false

// If continuous record the full length
postRecording = maxRecordingPeriod
Expand All @@ -136,9 +156,18 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
nextPkt, cursorError = recordingCursor.ReadPacket()

now := time.Now().UnixMilli()
hardMaxReached := now-startRecording > maxRecordingPeriod-500
postRecordingElapsed := startRecording+postRecording-now <= 0
if start && (postRecordingElapsed || hardMaxReached) {
rolloverRequested = true
if hardMaxReached && !rolloverMaxLogged {
log.Log.Info("capture.main.HandleRecordStream(continuous): max recording period reached, waiting for next keyframe to roll over without dropping frames")
rolloverMaxLogged = true
}
}

if start && // If already recording and current frame is a keyframe and we should stop recording
nextPkt.IsKeyFrame && (startRecording+postRecording-now <= 0 || now-startRecording > maxRecordingPeriod-500) {
rolloverRequested && nextPkt.IsKeyFrame {

pts := convertPTS(pkt.TimeLegacy)
if pkt.IsVideo {
Expand All @@ -147,17 +176,17 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error())
}
} else if pkt.IsAudio {
// Write the last packet
if pkt.Codec == "AAC" {
if err := mp4Video.AddSampleToTrack(audioTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error())
}
} else if pkt.Codec == "PCM_MULAW" {
// TODO: transcode to AAC, some work to do..
log.Log.Debug("capture.main.HandleRecordStream(continuous): no AAC audio codec detected, skipping audio track.")
if err := audioWriter.WritePacket(pkt); err != nil {
log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error())
}
}

if err := audioWriter.Flush(); err != nil {
log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error())
}
audioWriter.Close()
audioWriter = nil

// Close mp4
if len(mp4Video.SPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.SPSNALUs) > 0 {
mp4Video.SPSNALUs = configuration.Config.Capture.IPCamera.SPSNALUs
Expand All @@ -177,6 +206,8 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat

// Cleanup muxer
start = false
rolloverRequested = false
rolloverMaxLogged = false

// Update the name of the recording with the duration.
// We will update the name of the recording with the duration in milliseconds.
Expand Down Expand Up @@ -305,27 +336,16 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
} else if videoCodec == "H265" {
videoTrack = mp4Video.AddVideoTrack("H265")
}
if audioCodec == "AAC" {
audioTrack = mp4Video.AddAudioTrack("AAC")
} else if audioCodec == "PCM_MULAW" {
log.Log.Debug("capture.main.HandleRecordStream(continuous): no AAC audio codec detected, skipping audio track.")
}
audioWriter = newRecordingAudioWriter(mp4Video, audioCodec, config.Capture.IPCamera.SampleRate, config.Capture.IPCamera.Channels, audioBitDepth, "capture.main.HandleRecordStream(continuous)")

pts := convertPTS(pkt.TimeLegacy)
if pkt.IsVideo {
if err := mp4Video.AddSampleToTrack(videoTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error())
}
} else if pkt.IsAudio {
if pkt.Codec == "AAC" {
if err := mp4Video.AddSampleToTrack(audioTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error())
}
} else if pkt.Codec == "PCM_MULAW" {
// TODO: transcode to AAC, some work to do..
// We might need to use ffmpeg to transcode the audio to AAC.
// For now we will skip the audio track.
log.Log.Debug("capture.main.HandleRecordStream(continuous): no AAC audio codec detected, skipping audio track.")
if err := audioWriter.WritePacket(pkt); err != nil {
log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error())
}
}
recordingStatus = "started"
Expand All @@ -339,13 +359,8 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error())
}
} else if pkt.IsAudio {
if pkt.Codec == "AAC" {
if err := mp4Video.AddSampleToTrack(audioTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error())
}
} else if pkt.Codec == "PCM_MULAW" {
// TODO: transcode to AAC, some work to do..
log.Log.Debug("capture.main.HandleRecordStream(continuous): no AAC audio codec detected, skipping audio track.")
if err := audioWriter.WritePacket(pkt); err != nil {
log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error())
}
}
}
Expand All @@ -355,6 +370,10 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
// We might have interrupted the recording while restarting the agent.
// If this happens we need to check to properly close the recording.
if cursorError != nil {
if audioWriter != nil {
audioWriter.Close()
audioWriter = nil
}
if recordingStatus == "started" {

log.Log.Info("capture.main.HandleRecordStream(continuous): Recording finished: file save: " + name)
Expand Down Expand Up @@ -434,7 +453,6 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
var displayTime int64 = 0 // display time in milliseconds

var videoTrack uint32
var audioTrack uint32

for motion := range communication.HandleMotion {

Expand All @@ -448,6 +466,8 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
motionTimestamp := now

start := false
rolloverRequested := false
rolloverMaxLogged := false

if cursorError == nil {
pkt, cursorError = recordingCursor.ReadPacket()
Expand Down Expand Up @@ -520,6 +540,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
}
// Create the MP4 only once the first keyframe arrives.
var mp4Video *video.MP4
var audioWriter *recordingAudioWriter

for cursorError == nil {

Expand All @@ -538,7 +559,17 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
default:
}

if start && (motionTimestamp+postRecording-now < 0 || now-startRecording > maxRecordingPeriod-500) && nextPkt.IsKeyFrame {
hardMaxReached := now-startRecording > maxRecordingPeriod-500
postRecordingElapsed := motionTimestamp+postRecording-now < 0
if start && (postRecordingElapsed || hardMaxReached) {
rolloverRequested = true
if hardMaxReached && !rolloverMaxLogged {
log.Log.Info("capture.main.HandleRecordStream(motiondetection): max recording period reached, waiting for next keyframe to close without dropping frames")
rolloverMaxLogged = true
}
}

if start && rolloverRequested && nextPkt.IsKeyFrame {
log.Log.Info("capture.main.HandleRecordStream(motiondetection): timestamp+postRecording-now < 0 - " + strconv.FormatInt(motionTimestamp+postRecording-now, 10) + " < 0")
log.Log.Info("capture.main.HandleRecordStream(motiondetection): now-startRecording > maxRecordingPeriod-500 - " + strconv.FormatInt(now-startRecording, 10) + " > " + strconv.FormatInt(maxRecordingPeriod-500, 10))
log.Log.Info("capture.main.HandleRecordStream(motiondetection): closing recording (timestamp: " + strconv.FormatInt(motionTimestamp, 10) + ", postRecording: " + strconv.FormatInt(postRecording, 10) + ", now: " + strconv.FormatInt(now, 10) + ", startRecording: " + strconv.FormatInt(startRecording, 10) + ", maxRecordingPeriod: " + strconv.FormatInt(maxRecordingPeriod, 10))
Expand All @@ -563,11 +594,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
} else if videoCodec == "H265" {
videoTrack = mp4Video.AddVideoTrack("H265")
}
if audioCodec == "AAC" {
audioTrack = mp4Video.AddAudioTrack("AAC")
} else if audioCodec == "PCM_MULAW" {
log.Log.Debug("capture.main.HandleRecordStream(continuous): no AAC audio codec detected, skipping audio track.")
}
audioWriter = newRecordingAudioWriter(mp4Video, audioCodec, config.Capture.IPCamera.SampleRate, config.Capture.IPCamera.Channels, audioBitDepth, "capture.main.HandleRecordStream(motiondetection)")
start = true
}
if start {
Expand All @@ -581,17 +608,10 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
}
} else if pkt.IsAudio {
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): add audio sample")
if pkt.Codec == "AAC" {
if mp4Video != nil {
if err := mp4Video.AddSampleToTrack(audioTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
}
if mp4Video != nil {
if err := audioWriter.WritePacket(pkt); err != nil {
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
}
} else if pkt.Codec == "PCM_MULAW" {
// TODO: transcode to AAC, some work to do..
// We might need to use ffmpeg to transcode the audio to AAC.
// For now we will skip the audio track.
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): no AAC audio codec detected, skipping audio track.")
}
}
}
Expand All @@ -604,10 +624,18 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
lastRecordingTime = pkt.CurrentTime

if mp4Video == nil {
if audioWriter != nil {
audioWriter.Close()
}
log.Log.Warning("capture.main.HandleRecordStream(motiondetection): recording closed without keyframe; no MP4 created")
continue
}

if err := audioWriter.Flush(); err != nil {
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
}
audioWriter.Close()

// This will close the recording and write the last packet.
if len(mp4Video.SPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.SPSNALUs) > 0 {
mp4Video.SPSNALUs = configuration.Config.Capture.IPCamera.SPSNALUs
Expand Down
Loading
Loading