Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ import { MessageDeliveryReporter } from './messageDelivery';
import { NotificationManager } from './notifications';
import { ReminderManager } from './reminders';
import { StateStore } from './store';
import type { MessageComposer } from './messageComposer';
import { type MessageComposer, PendingMessagesManager } from './messageComposer';
import type { AbstractOfflineDB } from './offline-support';

function isString(x: unknown): x is string {
Expand Down Expand Up @@ -297,6 +297,7 @@ export class StreamChat {
};
threads: ThreadManager;
polls: PollManager;
pendingMessages: PendingMessagesManager;
offlineDb?: AbstractOfflineDB;
notifications: NotificationManager;
reminders: ReminderManager;
Expand Down Expand Up @@ -526,6 +527,7 @@ export class StreamChat {
this.recoverStateOnReconnect = this.options.recoverStateOnReconnect;
this.threads = new ThreadManager({ client: this });
this.polls = new PollManager({ client: this });
this.pendingMessages = new PendingMessagesManager({ client: this });
this.reminders = new ReminderManager({ client: this });
this.messageDeliveryReporter = new MessageDeliveryReporter({ client: this });
}
Expand Down Expand Up @@ -1014,6 +1016,7 @@ export class StreamChat {
this.state = new ClientState({ client: this });
// reset thread manager
this.threads.resetState();
this.pendingMessages.clear();

// Since we wipe all user data already, we should reset token manager as well
closePromise
Expand Down
22 changes: 22 additions & 0 deletions src/messageComposer/attachmentManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,28 @@ export class AttachmentManager {
);
}

/**
* Resolves when no attachment is still in `pending` or `uploading` (each in-flight upload has
* reached `finished`, `failed`, or `blocked`). Resolves immediately if there is nothing to wait for.
*/
waitForPendingAttachments = (): Promise<void> => {
const hasPendingOrUploading = () =>
this.pendingUploadsCount > 0 || this.uploadsInProgressCount > 0;

if (!hasPendingOrUploading()) {
return Promise.resolve();
}

return new Promise((resolve) => {
const unsubscribe = this.state.subscribe(() => {
if (!hasPendingOrUploading()) {
unsubscribe();
resolve();
}
});
});
};

initState = ({ message }: { message?: DraftMessage | LocalMessage } = {}) => {
this.state.next(initState({ message }));
};
Expand Down
83 changes: 83 additions & 0 deletions src/messageComposer/cloneMessageComposer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import { MessageComposer } from './messageComposer';
import type { LocationComposerState } from './LocationComposer';

/**
* Deep-clone plain JSON-like data; keep function references (e.g. upload hooks) shared.
* `structuredClone` cannot copy {@link MessageComposerConfig} because it contains functions.
*/
const deepClonePreservingFunctions = <T>(value: T): T => {
if (typeof value === 'function') return value;
if (value === null || typeof value !== 'object') return value;
if (value instanceof Date) return new Date(value.getTime()) as T;
if (Array.isArray(value)) return value.map(deepClonePreservingFunctions) as T;
if (Object.getPrototypeOf(value) !== Object.prototype) return value;

const out = {} as Record<string, unknown>;
for (const key of Object.keys(value as object)) {
out[key] = deepClonePreservingFunctions((value as Record<string, unknown>)[key]);
}
return out as T;
};

/**
* Creates a new {@link MessageComposer} with the same client and composition context as
* `source`, copying all composer and sub-manager state. The clone receives a fresh
* {@link MessageComposer.id} so it does not collide with the live composer.
*/
export const cloneMessageComposerFrom = (source: MessageComposer): MessageComposer => {
const target = new MessageComposer({
client: source.client,
compositionContext: source.compositionContext,
});

target.configState.next(
deepClonePreservingFunctions(source.configState.getLatestValue()),
);
target.editingAuditState.next(
structuredClone(source.editingAuditState.getLatestValue()),
);

const messageComposerState = structuredClone(source.state.getLatestValue());
const newId = MessageComposer.generateId();
messageComposerState.id = newId;
target.state.next(messageComposerState);

target.textComposer.state.next(
structuredClone(source.textComposer.state.getLatestValue()),
);
target.attachmentManager.state.next(
structuredClone(source.attachmentManager.state.getLatestValue()),
);

const { previews } = source.linkPreviewsManager.state.getLatestValue();
target.linkPreviewsManager.state.next({
previews: new Map(
[...previews].map(([url, preview]) => [url, structuredClone(preview)]),
),
});

const locationState = structuredClone(
source.locationComposer.state.getLatestValue(),
) as LocationComposerState;
if (
locationState.location &&
typeof locationState.location === 'object' &&
'message_id' in locationState.location &&
locationState.location.message_id === source.id
) {
target.locationComposer.state.next({
location: { ...locationState.location, message_id: newId },
});
} else {
target.locationComposer.state.next(locationState);
}

target.pollComposer.state.next(
structuredClone(source.pollComposer.state.getLatestValue()),
);
target.customDataManager.state.next(
structuredClone(source.customDataManager.state.getLatestValue()),
);

return target;
};
1 change: 1 addition & 0 deletions src/messageComposer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export * from './linkPreviewsManager';
export * from './LocationComposer';
export * from './messageComposer';
export * from './middleware';
export * from './pendingMessagesManager';
export * from './pollComposer';
export * from './textComposer';
export * from './types';
Expand Down
37 changes: 37 additions & 0 deletions src/messageComposer/pendingMessagesManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import type { StreamChat } from '../client';
import { cloneMessageComposerFrom } from './cloneMessageComposer';
import type { MessageComposer } from './messageComposer';

export type PendingMessagesManagerOptions = {
client: StreamChat;
};

/**
* Holds {@link MessageComposer} snapshots keyed by pending (e.g. optimistic) message id.
*/
export class PendingMessagesManager {
private readonly composers = new Map<string, MessageComposer>();
readonly client: StreamChat;

constructor({ client }: PendingMessagesManagerOptions) {
this.client = client;
}

/**
* Stores a clone of `composer` under {@link MessageComposer.id}.
* The clone has identical composition state and a new composer id.
*/
addPendingMessage(composer: MessageComposer): MessageComposer {
const clone = cloneMessageComposerFrom(composer);
this.composers.set(composer.id, clone);
return clone;
}

removePendingMessage(messageId: string): void {
this.composers.delete(messageId);
}

clear(): void {
this.composers.clear();
}
}
67 changes: 67 additions & 0 deletions test/unit/MessageComposer/attachmentManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,73 @@ describe('AttachmentManager', () => {
});
});

describe('waitForPendingAttachments', () => {
it('resolves immediately when there are no pending or uploading attachments', async () => {
const {
messageComposer: { attachmentManager },
} = setup();

await expect(
attachmentManager.waitForPendingAttachments(),
).resolves.toBeUndefined();
});

it('resolves when in-flight uploads finish', async () => {
const {
messageComposer: { attachmentManager },
mockChannel,
} = setup();

let resolveUpload!: (value: { file: string; thumb_url?: string }) => void;
const uploadPromise = new Promise<{ file: string; thumb_url?: string }>(
(resolve) => {
resolveUpload = resolve;
},
);
mockChannel.sendImage.mockImplementation(() => uploadPromise);

const file = new File([''], 'test.jpg', { type: 'image/jpeg' });
const local = await attachmentManager.fileToLocalUploadAttachment(file);
void attachmentManager.uploadAttachment(local);

await vi.waitFor(() => {
expect(attachmentManager.uploadsInProgressCount).toBe(1);
});

const settled = attachmentManager.waitForPendingAttachments();
resolveUpload({ file: 'done-url' });
await expect(settled).resolves.toBeUndefined();
expect(attachmentManager.successfulUploadsCount).toBe(1);
});

it('resolves when in-flight uploads fail', async () => {
const {
messageComposer: { attachmentManager },
mockChannel,
} = setup();

let rejectUpload!: (err: Error) => void;
const uploadPromise = new Promise<never>((_, reject) => {
rejectUpload = reject;
});
mockChannel.sendImage.mockImplementation(() => uploadPromise);

const file = new File([''], 'test.jpg', { type: 'image/jpeg' });
const local = await attachmentManager.fileToLocalUploadAttachment(file);
const uploadWork = attachmentManager.uploadAttachment(local);

await vi.waitFor(() => {
expect(attachmentManager.uploadsInProgressCount).toBe(1);
});

const settled = attachmentManager.waitForPendingAttachments();
rejectUpload(new Error('upload failed'));
await expect(settled).resolves.toBeUndefined();
await uploadWork;
expect(attachmentManager.failedUploadsCount).toBe(1);
});
});

describe('uploadAttachment', () => {
it('should upload files successfully', async () => {
const {
Expand Down
Loading
Loading