Skip to content

Commit 64902f6

Browse files
authored
Add more options to streamText method (#1398)
1 parent 4be40ba commit 64902f6

File tree

2 files changed

+36
-58
lines changed

2 files changed

+36
-58
lines changed

src/room/participant/LocalParticipant.ts

Lines changed: 25 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ import {
7272
type ChatMessage,
7373
type DataPublishOptions,
7474
type SendTextOptions,
75+
type StreamTextOptions,
7576
type TextStreamInfo,
7677
} from '../types';
7778
import {
@@ -1511,11 +1512,10 @@ export default class LocalParticipant extends Participant {
15111512
return msg;
15121513
}
15131514

1514-
async sendText(text: string, options?: SendTextOptions): Promise<{ id: string }> {
1515+
async sendText(text: string, options?: SendTextOptions): Promise<TextStreamInfo> {
15151516
const streamId = crypto.randomUUID();
15161517
const textInBytes = new TextEncoder().encode(text);
15171518
const totalTextLength = textInBytes.byteLength;
1518-
const totalTextChunks = Math.ceil(totalTextLength / STREAM_CHUNK_SIZE);
15191519

15201520
const fileIds = options?.attachments?.map(() => crypto.randomUUID());
15211521

@@ -1527,67 +1527,29 @@ export default class LocalParticipant extends Participant {
15271527
options?.onProgress?.(totalProgress);
15281528
};
15291529

1530-
const header = new DataStream_Header({
1530+
const writer = await this.streamText({
15311531
streamId,
1532-
totalLength: numberToBigInt(totalTextLength),
1533-
mimeType: 'text/plain',
1532+
totalSize: totalTextLength,
1533+
destinationIdentities: options?.destinationIdentities,
15341534
topic: options?.topic,
1535-
timestamp: numberToBigInt(Date.now()),
1536-
contentHeader: {
1537-
case: 'textHeader',
1538-
value: new DataStream_TextHeader({
1539-
operationType: DataStream_OperationType.CREATE,
1540-
attachedStreamIds: fileIds,
1541-
}),
1542-
},
1535+
attachedStreamIds: fileIds,
15431536
});
15441537

1545-
const destinationIdentities = options?.destinationIdentities;
1546-
1547-
const packet = new DataPacket({
1548-
destinationIdentities,
1549-
value: {
1550-
case: 'streamHeader',
1551-
value: header,
1552-
},
1553-
});
1554-
1555-
await this.engine.sendDataPacket(packet, DataPacket_Kind.RELIABLE);
1538+
const textChunkSize = Math.floor(STREAM_CHUNK_SIZE / 4); // utf8 is at most 4 bytes long, so play it safe and take a quarter of the byte size to slice the string
1539+
const totalTextChunks = Math.ceil(totalTextLength / textChunkSize);
15561540

15571541
for (let i = 0; i < totalTextChunks; i++) {
1558-
const chunkData = textInBytes.slice(
1559-
i * STREAM_CHUNK_SIZE,
1560-
Math.min((i + 1) * STREAM_CHUNK_SIZE, totalTextLength),
1542+
const chunkData = text.slice(
1543+
i * textChunkSize,
1544+
Math.min((i + 1) * textChunkSize, totalTextLength),
15611545
);
15621546
await this.engine.waitForBufferStatusLow(DataPacket_Kind.RELIABLE);
1563-
const chunk = new DataStream_Chunk({
1564-
content: chunkData,
1565-
streamId,
1566-
chunkIndex: numberToBigInt(i),
1567-
});
1568-
const chunkPacket = new DataPacket({
1569-
destinationIdentities,
1570-
value: {
1571-
case: 'streamChunk',
1572-
value: chunk,
1573-
},
1574-
});
1547+
await writer.write(chunkData);
15751548

1576-
await this.engine.sendDataPacket(chunkPacket, DataPacket_Kind.RELIABLE);
15771549
handleProgress(Math.ceil((i + 1) / totalTextChunks), 0);
15781550
}
15791551

1580-
const trailer = new DataStream_Trailer({
1581-
streamId,
1582-
});
1583-
const trailerPacket = new DataPacket({
1584-
destinationIdentities,
1585-
value: {
1586-
case: 'streamTrailer',
1587-
value: trailer,
1588-
},
1589-
});
1590-
await this.engine.sendDataPacket(trailerPacket, DataPacket_Kind.RELIABLE);
1552+
await writer.close();
15911553

15921554
if (options?.attachments && fileIds) {
15931555
await Promise.all(
@@ -1602,34 +1564,39 @@ export default class LocalParticipant extends Participant {
16021564
),
16031565
);
16041566
}
1605-
return { id: streamId };
1567+
return writer.info;
16061568
}
16071569

16081570
/**
16091571
* @internal
16101572
* @experimental CAUTION, might get removed in a minor release
16111573
*/
1612-
async streamText(options?: {
1613-
topic?: string;
1614-
destinationIdentities?: Array<string>;
1615-
}): Promise<TextStreamWriter> {
1616-
const streamId = crypto.randomUUID();
1574+
async streamText(options?: StreamTextOptions): Promise<TextStreamWriter> {
1575+
const streamId = options?.streamId ?? crypto.randomUUID();
16171576

16181577
const info: TextStreamInfo = {
16191578
id: streamId,
16201579
mimeType: 'text/plain',
16211580
timestamp: Date.now(),
16221581
topic: options?.topic ?? '',
1582+
size: options?.totalSize,
16231583
};
16241584
const header = new DataStream_Header({
16251585
streamId,
16261586
mimeType: info.mimeType,
16271587
topic: info.topic,
16281588
timestamp: numberToBigInt(info.timestamp),
1589+
totalLength: numberToBigInt(options?.totalSize),
16291590
contentHeader: {
16301591
case: 'textHeader',
16311592
value: new DataStream_TextHeader({
1632-
operationType: DataStream_OperationType.CREATE,
1593+
version: options?.version,
1594+
attachedStreamIds: options?.attachedStreamIds,
1595+
replyToStreamId: options?.replyToStreamId,
1596+
operationType:
1597+
options?.type === 'update'
1598+
? DataStream_OperationType.UPDATE
1599+
: DataStream_OperationType.CREATE,
16331600
}),
16341601
},
16351602
});

src/room/types.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,17 @@ export interface SendTextOptions {
2222
onProgress?: (progress: number) => void;
2323
}
2424

25+
export interface StreamTextOptions {
26+
topic?: string;
27+
destinationIdentities?: Array<string>;
28+
type?: 'create' | 'update';
29+
streamId?: string;
30+
version?: number;
31+
attachedStreamIds?: Array<string>;
32+
replyToStreamId?: string;
33+
totalSize?: number;
34+
}
35+
2536
export type DataPublishOptions = {
2637
/**
2738
* whether to send this as reliable or lossy.

0 commit comments

Comments
 (0)