Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit 4154f26

Browse files
authored
Enable data flow from QUIC agent to audio agent. (#1102)
* Enable data flow from QUIC agent to audio agent. * Determine a track's kind by its ID. * Set track kind for data tracks. * Timestamps for audio frames are set in QUIC agent.
1 parent b7ef3c0 commit 4154f26

File tree

5 files changed

+53
-18
lines changed

5 files changed

+53
-18
lines changed

source/agent/addons/quic/QuicFactory.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
#include "QuicFactory.h"
88
#include "owt/quic/web_transport_factory.h"
9+
#include "owt/quic/logging.h"
910
#include <mutex>
1011

1112
DEFINE_LOGGER(QuicFactory, "QuicFactory");

source/agent/addons/quic/QuicTransportStream.cc

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,14 @@ QuicTransportStream::QuicTransportStream(owt::quic::WebTransportStreamInterface*
3636
, m_hasSink(false)
3737
, m_buffer(nullptr)
3838
, m_bufferSize(0)
39-
, m_isMedia(false)
39+
, m_trackKind("unknown")
40+
, m_frameFormat(owt_base::FRAME_FORMAT_UNKNOWN)
4041
, m_readingFrameSize(false)
4142
, m_frameSizeOffset(0)
4243
, m_frameSizeArray(new uint8_t[frameHeaderSize])
4344
, m_currentFrameSize(0)
4445
, m_receivedFrameOffset(0)
46+
, m_audioTimeStamp(0)
4547
{
4648
}
4749

@@ -123,7 +125,7 @@ NAN_MODULE_INIT(QuicTransportStream::init)
123125
Nan::SetPrototypeMethod(tpl, "write", write);
124126
Nan::SetPrototypeMethod(tpl, "readTrackId", readTrackId);
125127
Nan::SetPrototypeMethod(tpl, "addDestination", addDestination);
126-
Nan::SetAccessor(instanceTpl, Nan::New("isMedia").ToLocalChecked(), isMediaGetter, isMediaSetter);
128+
Nan::SetAccessor(instanceTpl, Nan::New("trackKind").ToLocalChecked(), trackKindGetter, trackKindSetter);
127129
Nan::SetAccessor(instanceTpl, Nan::New("ondata").ToLocalChecked(), onDataGetter, onDataSetter);
128130

129131
s_constructor.Reset(Nan::GetFunction(tpl).ToLocalChecked());
@@ -212,15 +214,16 @@ void QuicTransportStream::CheckReadableData()
212214
}
213215
}
214216

215-
NAN_GETTER(QuicTransportStream::isMediaGetter){
217+
NAN_GETTER(QuicTransportStream::trackKindGetter){
216218
QuicTransportStream* obj = Nan::ObjectWrap::Unwrap<QuicTransportStream>(info.Holder());
217-
info.GetReturnValue().Set(Nan::New(obj->m_isMedia));
219+
info.GetReturnValue().Set(Nan::New(obj->m_trackKind).ToLocalChecked());
218220
}
219221

220-
NAN_SETTER(QuicTransportStream::isMediaSetter)
222+
NAN_SETTER(QuicTransportStream::trackKindSetter)
221223
{
222224
QuicTransportStream* obj = Nan::ObjectWrap::Unwrap<QuicTransportStream>(info.Holder());
223-
obj->m_isMedia = value->ToBoolean(Nan::GetCurrentContext()).ToLocalChecked()->Value();
225+
Nan::Utf8String trackKind(Nan::To<v8::String>(value).ToLocalChecked());
226+
obj->m_trackKind = std::string(*trackKind);
224227
}
225228

226229
NAN_GETTER(QuicTransportStream::onDataGetter)
@@ -365,10 +368,14 @@ void QuicTransportStream::SignalOnData()
365368
return;
366369
}
367370

371+
if (m_trackKind == "unknown") {
372+
return;
373+
}
374+
368375
while (m_stream->ReadableBytes() > 0) {
369376
auto readableBytes = m_stream->ReadableBytes();
370377
// Future check if it's an audio stream or video stream. Audio is not supported at this time.
371-
if (m_isMedia) {
378+
if (m_trackKind == "audio" || m_trackKind == "video") {
372379
// A new frame.
373380
if (m_currentFrameSize == 0 && m_receivedFrameOffset == 0 && !m_readingFrameSize) {
374381
m_readingFrameSize = true;
@@ -403,16 +410,29 @@ void QuicTransportStream::SignalOnData()
403410
// Complete frame.
404411
if (m_receivedFrameOffset == m_currentFrameSize) {
405412
owt_base::Frame frame;
406-
frame.format = owt_base::FRAME_FORMAT_I420;
413+
if (m_trackKind == "audio") {
414+
frame.format = owt_base::FRAME_FORMAT_OPUS;
415+
frame.timeStamp = m_audioTimeStamp;
416+
// TODO: Fill a correct timestamp and check overflow.
417+
m_audioTimeStamp += 10 * 1000;
418+
// TODO: Get format from signaling message.
419+
frame.additionalInfo.audio.isRtpPacket = false;
420+
frame.additionalInfo.audio.sampleRate = 48000;
421+
frame.additionalInfo.audio.channels = 2;
422+
} else if (m_trackKind == "video") {
423+
frame.format = owt_base::FRAME_FORMAT_H264;
424+
// Transport layer doesn't know a frame's type. Video agent is able to parse the type of a frame from bistream. However, video agent doesn't feed the frame to decoder when a key frame is requested.
425+
frame.additionalInfo.video.isKeyFrame = "key";
426+
} else {
427+
ELOG_ERROR("Unexpected track kind: %s.", m_trackKind);
428+
}
407429
frame.length = m_currentFrameSize;
408430
frame.payload = m_buffer;
409-
// Transport layer doesn't know a frame's type. Video agent is able to parse the type of a frame from bistream. However, video agent doesn't feed the frame to decoder when a key frame is requested.
410-
frame.additionalInfo.video.isKeyFrame = "key";
411431
deliverFrame(frame);
412432
m_currentFrameSize = 0;
413433
m_receivedFrameOffset = 0;
414434
}
415-
} else {
435+
} else if (m_trackKind == "data") {
416436
if (readableBytes > m_bufferSize) {
417437
ReallocateBuffer(readableBytes);
418438
}

source/agent/addons/quic/QuicTransportStream.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ class QuicTransportStream : public owt_base::FrameSource, public owt_base::Frame
4040
// TODO: Make this as an async method when it's supported.
4141
static NAN_METHOD(readTrackId);
4242

43-
static NAN_GETTER(isMediaGetter);
44-
static NAN_SETTER(isMediaSetter);
43+
static NAN_GETTER(trackKindGetter);
44+
static NAN_SETTER(trackKindSetter);
4545
static NAN_GETTER(onDataGetter);
4646
static NAN_SETTER(onDataSetter);
4747

@@ -98,15 +98,18 @@ class QuicTransportStream : public owt_base::FrameSource, public owt_base::Frame
9898
uint8_t* m_buffer;
9999
size_t m_bufferSize;
100100

101-
// Indicates whether this is a media stream. If this is not a media stream, it can only be piped to another QUIC agent.
102-
bool m_isMedia;
101+
// Indicates the kind of this stream, could be 'audio', 'video', 'data'. If this is not a data track, it can only be piped to another QUIC agent.
102+
std::string m_trackKind;
103+
owt_base::FrameFormat m_frameFormat;
103104
Nan::Persistent<v8::Value> m_onDataCallback;
104105

105106
size_t m_readingFrameSize;
106107
size_t m_frameSizeOffset;
107108
uint8_t* m_frameSizeArray;
108109
size_t m_currentFrameSize;
109110
size_t m_receivedFrameOffset;
111+
// TODO: Using wall clock timestamps seems not working. Using an increasing sequence instead. Fix it later.
112+
uint32_t m_audioTimeStamp;
110113

111114
uv_async_t m_asyncOnContentSessionId;
112115
uv_async_t m_asyncOnTrackId;

source/agent/addons/quic/quic_sdk_url

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
https://api.github.com/repos/open-webrtc-toolkit/owt-sdk-quic/actions/artifacts/84639537/zip
1+
https://api.github.com/repos/open-webrtc-toolkit/owt-sdk-quic/actions/artifacts/97822442/zip

source/agent/quic/index.js

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ const addon = require('./build/Release/quic');
2424
const cipher = require('../cipher');
2525
const path = require('path');
2626
const {InternalConnectionRouter} = require('./internalConnectionRouter');
27+
const audioTrackId = '00000000000000000000000000000001';
28+
const videoTrackId = '00000000000000000000000000000002';
2729

2830
log.info('QUIC transport node.')
2931

@@ -105,7 +107,6 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
105107
log.debug(
106108
'A stream with session ID ' + stream.contentSessionId +
107109
' is added.');
108-
// Set isMedia=true if the session is for media.
109110
if (outgoingStreamPipelines.has(stream.contentSessionId)) {
110111
const pipeline = outgoingStreamPipelines.get(stream.contentSessionId);
111112
pipeline.quicStream(stream);
@@ -119,9 +120,9 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
119120
const options = publicationOptions.get(stream.contentSessionId);
120121
// Only publications for media have tracks.
121122
if (options.tracks && options.tracks.length) {
122-
stream.isMedia = true;
123123
stream.readTrackId();
124124
} else {
125+
stream.trackKind = 'data';
125126
frameSourceMap.get(stream.contentSessionId).addInputStream(stream);
126127
}
127128
} else {
@@ -133,6 +134,16 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
133134
}
134135
});
135136
quicTransportServer.on('trackid', (stream) => {
137+
// TODO: Defined track ID and get track kind based on ID. Currently it
138+
// is hard coded.
139+
if (stream.trackId === audioTrackId) {
140+
stream.trackKind = 'audio';
141+
} else if (stream.trackId === videoTrackId) {
142+
stream.trackKind = 'video';
143+
} else {
144+
log.warn('Unexpected track ID: ' + stream.trackId);
145+
return;
146+
}
136147
if (frameSourceMap.has(stream.contentSessionId)) {
137148
frameSourceMap.get(stream.contentSessionId).addInputStream(stream);
138149
}

0 commit comments

Comments
 (0)