Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 25 additions & 58 deletions src/room/participant/LocalParticipant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ import {
type ChatMessage,
type DataPublishOptions,
type SendTextOptions,
type StreamTextOptions,
type TextStreamInfo,
} from '../types';
import {
Expand Down Expand Up @@ -1511,11 +1512,10 @@ export default class LocalParticipant extends Participant {
return msg;
}

async sendText(text: string, options?: SendTextOptions): Promise<{ id: string }> {
async sendText(text: string, options?: SendTextOptions): Promise<TextStreamInfo> {
const streamId = crypto.randomUUID();
const textInBytes = new TextEncoder().encode(text);
const totalTextLength = textInBytes.byteLength;
const totalTextChunks = Math.ceil(totalTextLength / STREAM_CHUNK_SIZE);

const fileIds = options?.attachments?.map(() => crypto.randomUUID());

Expand All @@ -1527,67 +1527,29 @@ export default class LocalParticipant extends Participant {
options?.onProgress?.(totalProgress);
};

const header = new DataStream_Header({
const writer = await this.streamText({
streamId,
totalLength: numberToBigInt(totalTextLength),
mimeType: 'text/plain',
totalSize: totalTextLength,
destinationIdentities: options?.destinationIdentities,
topic: options?.topic,
timestamp: numberToBigInt(Date.now()),
contentHeader: {
case: 'textHeader',
value: new DataStream_TextHeader({
operationType: DataStream_OperationType.CREATE,
attachedStreamIds: fileIds,
}),
},
attachedStreamIds: fileIds,
});

const destinationIdentities = options?.destinationIdentities;

const packet = new DataPacket({
destinationIdentities,
value: {
case: 'streamHeader',
value: header,
},
});

await this.engine.sendDataPacket(packet, DataPacket_Kind.RELIABLE);
const textChunkSize = Math.floor(STREAM_CHUNK_SIZE / 4); // utf8 is at most 4 bytes long, so play it safe and take a quarter of the byte size to slice the string
const totalTextChunks = Math.ceil(totalTextLength / textChunkSize);

for (let i = 0; i < totalTextChunks; i++) {
const chunkData = textInBytes.slice(
i * STREAM_CHUNK_SIZE,
Math.min((i + 1) * STREAM_CHUNK_SIZE, totalTextLength),
const chunkData = text.slice(
i * textChunkSize,
Math.min((i + 1) * textChunkSize, totalTextLength),
);
await this.engine.waitForBufferStatusLow(DataPacket_Kind.RELIABLE);
const chunk = new DataStream_Chunk({
content: chunkData,
streamId,
chunkIndex: numberToBigInt(i),
});
const chunkPacket = new DataPacket({
destinationIdentities,
value: {
case: 'streamChunk',
value: chunk,
},
});
await writer.write(chunkData);

await this.engine.sendDataPacket(chunkPacket, DataPacket_Kind.RELIABLE);
handleProgress(Math.ceil((i + 1) / totalTextChunks), 0);
}

const trailer = new DataStream_Trailer({
streamId,
});
const trailerPacket = new DataPacket({
destinationIdentities,
value: {
case: 'streamTrailer',
value: trailer,
},
});
await this.engine.sendDataPacket(trailerPacket, DataPacket_Kind.RELIABLE);
await writer.close();

if (options?.attachments && fileIds) {
await Promise.all(
Expand All @@ -1602,34 +1564,39 @@ export default class LocalParticipant extends Participant {
),
);
}
return { id: streamId };
return writer.info;
}

/**
* @internal
* @experimental CAUTION, might get removed in a minor release
*/
async streamText(options?: {
topic?: string;
destinationIdentities?: Array<string>;
}): Promise<TextStreamWriter> {
const streamId = crypto.randomUUID();
async streamText(options?: StreamTextOptions): Promise<TextStreamWriter> {
const streamId = options?.streamId ?? crypto.randomUUID();

const info: TextStreamInfo = {
id: streamId,
mimeType: 'text/plain',
timestamp: Date.now(),
topic: options?.topic ?? '',
size: options?.totalSize,
};
const header = new DataStream_Header({
streamId,
mimeType: info.mimeType,
topic: info.topic,
timestamp: numberToBigInt(info.timestamp),
totalLength: numberToBigInt(options?.totalSize),
contentHeader: {
case: 'textHeader',
value: new DataStream_TextHeader({
operationType: DataStream_OperationType.CREATE,
version: options?.version,
attachedStreamIds: options?.attachedStreamIds,
replyToStreamId: options?.replyToStreamId,
operationType:
options?.type === 'update'
? DataStream_OperationType.UPDATE
: DataStream_OperationType.CREATE,
}),
},
});
Expand Down
11 changes: 11 additions & 0 deletions src/room/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ export interface SendTextOptions {
onProgress?: (progress: number) => void;
}

export interface StreamTextOptions {
topic?: string;
destinationIdentities?: Array<string>;
type?: 'create' | 'update';
streamId?: string;
version?: number;
attachedStreamIds?: Array<string>;
replyToStreamId?: string;
totalSize?: number;
}

export type DataPublishOptions = {
/**
* whether to send this as reliable or lossy.
Expand Down
Loading