Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/room/Room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
DataPacket_Kind,
DataStream_Chunk,
DataStream_Header,
DataStream_OperationType,
DataStream_Trailer,
DisconnectReason,
JoinResponse,
Expand Down Expand Up @@ -1831,6 +1832,11 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
topic: streamHeader.topic,
timestamp: Number(streamHeader.timestamp),
attributes: streamHeader.attributes,
version: streamHeader.contentHeader.value.version,
type:
streamHeader.contentHeader.value.operationType === DataStream_OperationType.UPDATE
? 'update'
: 'create',
};

const stream = new ReadableStream<DataStream_Chunk>({
Expand Down
2 changes: 1 addition & 1 deletion src/room/StreamReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ export class TextStreamReader extends BaseStreamReader<TextStreamInfo> {
}
},

return(): IteratorResult<TextStreamChunk> {
async return(): Promise<IteratorResult<TextStreamChunk>> {
reader.releaseLock();
return { done: true, value: undefined };
},
Expand Down
42 changes: 40 additions & 2 deletions src/room/participant/LocalParticipant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1567,6 +1567,42 @@ export default class LocalParticipant extends Participant {
return writer.info;
}

/** @experimental CAUTION, might get removed or changed in a minor release */
async updateText(
streamId: string,
text: string,
options?: {
topic?: string;
destinationIdentities?: Array<string>;
},
): Promise<TextStreamInfo> {
const textInBytes = new TextEncoder().encode(text);
const totalTextLength = textInBytes.byteLength;

const writer = await this.streamText({
streamId,
totalSize: totalTextLength,
destinationIdentities: options?.destinationIdentities,
topic: options?.topic,
type: 'update',
});

const textChunkSize = Math.floor(STREAM_CHUNK_SIZE / 4); // utf8 is at most 4 bytes long, so play it safe and take a quarter of the byte size to slice the string
const totalTextChunks = Math.ceil(totalTextLength / textChunkSize);

for (let i = 0; i < totalTextChunks; i++) {
const chunkData = text.slice(
i * textChunkSize,
Math.min((i + 1) * textChunkSize, totalTextLength),
);
await this.engine.waitForBufferStatusLow(DataPacket_Kind.RELIABLE);
await writer.write(chunkData);
}

await writer.close();
return writer.info;
}

/**
* @internal
* @experimental CAUTION, might get removed in a minor release
Expand All @@ -1580,6 +1616,8 @@ export default class LocalParticipant extends Participant {
timestamp: Date.now(),
topic: options?.topic ?? '',
size: options?.totalSize,
version: options?.version,
type: options?.type ?? 'create',
};
const header = new DataStream_Header({
streamId,
Expand All @@ -1590,11 +1628,11 @@ export default class LocalParticipant extends Participant {
contentHeader: {
case: 'textHeader',
value: new DataStream_TextHeader({
version: options?.version,
version: info?.version,
attachedStreamIds: options?.attachedStreamIds,
replyToStreamId: options?.replyToStreamId,
operationType:
options?.type === 'update'
info.type === 'update'
? DataStream_OperationType.UPDATE
: DataStream_OperationType.CREATE,
}),
Expand Down
5 changes: 4 additions & 1 deletion src/room/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ export interface ByteStreamInfo extends BaseStreamInfo {
name: string;
}

export interface TextStreamInfo extends BaseStreamInfo {}
export interface TextStreamInfo extends BaseStreamInfo {
version?: number;
type: 'create' | 'update';
}

export type TextStreamChunk = {
index: number;
Expand Down
Loading