Skip to content
This repository was archived by the owner on Oct 25, 2024. It is now read-only.

Commit ee2ec0a

Browse files
committed
Add support to subscribe data stream.
1 parent 8ef192a commit ee2ec0a

File tree

4 files changed

+129
-22
lines changed

4 files changed

+129
-22
lines changed

src/samples/conference/public/scripts/quic.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,21 @@ let quicChannel = null;
88
let bidirectionalStream = null;
99
let writeTask;
1010
const conference=new Owt.Conference.ConferenceClient();
11+
conference.addEventListener('streamadded', async (event) => {
12+
console.log(event.stream);
13+
if (event.stream.source.data) {
14+
const subscription = await conference.subscribe(event.stream);
15+
const reader = subscription.stream.readable.getReader();
16+
while (true) {
17+
const {value, done} = await reader.read();
18+
if (done) {
19+
console.log('Subscription ends.');
20+
break;
21+
}
22+
console.log('Received data: '+value);
23+
}
24+
}
25+
});
1126

1227
function updateConferenceStatus(message) {
1328
document.getElementById('conference-status').innerHTML +=

src/sdk/base/stream.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ function isAllowedValue(obj, allowedValues) {
2525
* "mic", "screen-cast", "file", "mixed" or undefined.
2626
* @param {?string} videoSourceInfo Video source info. Accepted values are:
2727
* "camera", "screen-cast", "file", "mixed" or undefined.
28+
* @param {boolean} dataSourceInfo Indicates whether it is data. Accepted values
29+
* are boolean.
2830
*/
2931
export class StreamSourceInfo {
3032
// eslint-disable-next-line require-jsdoc
31-
constructor(audioSourceInfo, videoSourceInfo) {
33+
constructor(audioSourceInfo, videoSourceInfo, dataSourceInfo) {
3234
if (!isAllowedValue(audioSourceInfo, [undefined, 'mic', 'screen-cast',
3335
'file', 'mixed'])) {
3436
throw new TypeError('Incorrect value for audioSourceInfo');
@@ -39,6 +41,7 @@ export class StreamSourceInfo {
3941
}
4042
this.audio = audioSourceInfo;
4143
this.video = videoSourceInfo;
44+
this.data = dataSourceInfo;
4245
}
4346
}
4447
/**

src/sdk/conference/client.js

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -281,9 +281,11 @@ export const ConferenceClient = function(config, signalingImpl) {
281281
if (streamInfo.media.video) {
282282
videoSourceInfo = streamInfo.media.video.source;
283283
}
284+
const dataSourceInfo = streamInfo.data;
284285
const stream = new StreamModule.RemoteStream(streamInfo.id,
285-
streamInfo.info.owner, undefined, new StreamModule.StreamSourceInfo(
286-
audioSourceInfo, videoSourceInfo), streamInfo.info.attributes);
286+
streamInfo.info.owner, undefined, new StreamModule.StreamSourceInfo(
287+
audioSourceInfo, videoSourceInfo, dataSourceInfo), streamInfo.info
288+
.attributes);
287289
stream.settings = StreamUtilsModule.convertToPublicationSettings(
288290
streamInfo.media);
289291
stream.extraCapabilities = StreamUtilsModule
@@ -434,12 +436,14 @@ export const ConferenceClient = function(config, signalingImpl) {
434436
if (!(stream instanceof StreamModule.RemoteStream)) {
435437
return Promise.reject(new ConferenceError('Invalid stream.'));
436438
}
437-
if(!stream.source.audio&&!stream.source.video){ // QUIC stream.
438-
const dataChannel=createDataChannel();
439-
dataChannel.addEventListener('id',messageEvent=>{
440-
incomingDataChannels.set(messageEvent.message, dataChannel);
441-
});
442-
return dataChannel.subscribe(stream);
439+
if (stream.source.data) {
440+
if (stream.source.audio || stream.source.video) {
441+
return Promise.reject(new TypeError(
442+
'Invalid source info. A remote stream is either a data stream or a media stream.'
443+
));
444+
}
445+
return quicTransportChannel.subscribe(stream);
446+
// TODO
443447
}
444448
const channel = createPeerConnectionChannel(options.transport);
445449
return channel.subscribe(stream, options);
@@ -484,7 +488,7 @@ export const ConferenceClient = function(config, signalingImpl) {
484488
*/
485489
this.createQuicConnection = function() {
486490
const quicConnection = new QuicConnection(
487-
'quic-transport://jianjunz-nuc-ubuntu.sh.intel.com:7700/echo',
491+
'quic-transport://example.com', me.id,
488492
createSignalingForChannel());
489493
quicTransportChannel=quicConnection;
490494
return quicConnection;

src/sdk/conference/quicconnection.js

Lines changed: 97 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
EventDispatcher,
1313
MessageEvent,
1414
} from '../base/event.js';
15+
import {Subscription} from './subscription.js';
1516

1617
/**
1718
* @class QuicConnection
@@ -20,12 +21,14 @@ import {
2021
* @private
2122
*/
2223
export class QuicConnection extends EventDispatcher {
23-
constructor(url, signaling) {
24+
constructor(url, token, signaling) {
2425
super();
2526
this._signaling = signaling;
2627
this._ended = false;
2728
this._quicStreams = new Map(); // Key is publication or subscription ID.
2829
this._quicTransport = new QuicTransport(url);
30+
this._subscribePromises = new Map(); // Key is subscription ID.
31+
this._init(token);
2932
}
3033

3134
/**
@@ -54,23 +57,81 @@ export class QuicConnection extends EventDispatcher {
5457
}
5558
}
5659

60+
async _init(token) {
61+
this._authenticate(token);
62+
const receiveStreamReader =
63+
this._quicTransport.receiveStreams().getReader();
64+
Logger.info('Reader: '+receiveStreamReader);
65+
while (true) {
66+
const {
67+
value: receiveStream,
68+
done: readingReceiveStreamsDone
69+
} = await receiveStreamReader.read();
70+
Logger.info('New stream received');
71+
if (readingReceiveStreamsDone) {
72+
break;
73+
}
74+
const chunkReader = receiveStream.readable.getReader();
75+
const {
76+
value: uuid,
77+
done: readingChunksDone
78+
} = await chunkReader.read();
79+
if (readingChunksDone) {
80+
Logger.error('Stream closed unexpectedly.');
81+
return;
82+
}
83+
if (uuid.length != 16) {
84+
Logger.error('Unexpected length for UUID.');
85+
return;
86+
}
87+
chunkReader.releaseLock();
88+
const subscriptionId = this._uint8ArrayToUuid(uuid);
89+
this._quicStreams.set(subscriptionId, receiveStream);
90+
if (this._subscribePromises.has(subscriptionId)) {
91+
const subscription =
92+
this._createSubscription(subscriptionId, receiveStream);
93+
this._subscribePromises.get(subscriptionId).resolve(subscription);
94+
}
95+
}
96+
}
97+
98+
_createSubscription(id, receiveStream) {
99+
// TODO: Incomplete subscription.
100+
const subscription = new Subscription(id, () => {
101+
receiveStream.abortReading();
102+
});
103+
subscription.stream = receiveStream;
104+
return subscription;
105+
}
106+
107+
async _authenticate(token) {
108+
await this._quicTransport.ready;
109+
const quicStream = await this._quicTransport.createSendStream();
110+
const writer = quicStream.writable.getWriter();
111+
await writer.ready;
112+
writer.write(new Uint8Array(16));
113+
const encoder = new TextEncoder();
114+
writer.write(encoder.encode(token));
115+
}
116+
57117
async createSendStream(sessionId) {
58118
Logger.info('Create stream.');
59119
await this._quicTransport.ready;
60120
// TODO: Creating quicStream and initializing publication concurrently.
61121
const quicStream = await this._quicTransport.createSendStream();
62-
const publicationId = await this._initializePublication();
122+
const publicationId = await this._initiatePublication();
63123
const writer= quicStream.writable.getWriter();
64124
await writer.ready;
65-
writer.write(this.uuidToUint8Array(publicationId));
125+
writer.write(this._uuidToUint8Array(publicationId));
126+
writer.write(this._uuidToUint8Array(publicationId));
66127
this._quicStreams.set(publicationId, quicStream);
67128
}
68129

69130
hasContentSessionId(id) {
70131
return this._quicStreams.has(id);
71132
}
72133

73-
uuidToUint8Array(uuidString) {
134+
_uuidToUint8Array(uuidString) {
74135
if (uuidString.length != 32) {
75136
throw new TypeError('Incorrect UUID.');
76137
}
@@ -82,6 +143,15 @@ export class QuicConnection extends EventDispatcher {
82143
return uuidArray;
83144
}
84145

146+
_uint8ArrayToUuid(uuidBytes) {
147+
let s = '';
148+
for (let hex of uuidBytes) {
149+
const str = hex.toString(16);
150+
s += str.padStart(2, '0');
151+
}
152+
return s;
153+
}
154+
85155
/**
86156
* @function createStream
87157
* @desc Create a bidirectional stream.
@@ -106,23 +176,38 @@ export class QuicConnection extends EventDispatcher {
106176
}
107177

108178
subscribe(stream) {
109-
this._outgoing=false;
110-
this._streamSubscribed = stream;
111-
if (!this._quicTransport) {
112-
this.createStream();
113-
}
179+
const p = new Promise((resolve, reject) => {
180+
this._signaling
181+
.sendSignalingMessage(
182+
'subscribe',
183+
{media: null, data: {from: stream.id}, transport: {type: 'quic'}})
184+
.then((data) => {
185+
if (this._quicStreams.has(data.id)) {
186+
// QUIC stream created before signaling returns.
187+
const subscription = this._createSubscription(
188+
data.id, this._quicStreams.get(data.id));
189+
resolve(subscription);
190+
} else {
191+
// QUIC stream is not created yet, resolve promise after getting
192+
// QUIC stream.
193+
this._subscribePromises.set(
194+
data.id, {resolve: resolve, reject: reject});
195+
}
196+
});
197+
});
198+
return p;
114199
}
115200

116-
_readAndPrint(){
117-
this._quicStreams[0].waitForReadable(5).then(()=>{
201+
_readAndPrint() {
202+
this._quicStreams[0].waitForReadable(5).then(() => {
118203
let data = new Uint8Array(this._quicStreams[0].readBufferedAmount);
119204
this._quicStreams[0].readInto(data);
120205
Logger.info('Read data: ' + data);
121206
this._readAndPrint();
122207
});
123208
}
124209

125-
async _initializePublication() {
210+
async _initiatePublication() {
126211
const data = await this._signaling.sendSignalingMessage('publish', {
127212
media: null,
128213
data: true,

0 commit comments

Comments
 (0)