Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 55 additions & 13 deletions src/messageDelivery/MessageDeliveryReporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@ import { Channel } from '../channel';
import type { ThreadUserReadState } from '../thread';
import { Thread } from '../thread';
import type {
ErrorFromResponse,
EventAPIResponse,
LocalMessage,
MarkDeliveredOptions,
MarkReadOptions,
} from '../types';
import { type APIErrorResponse } from '../types';
import { throttle } from '../utils';
import { isAPIError, isErrorRetryable } from '../errors';

const MAX_DELIVERED_MESSAGE_COUNT_IN_PAYLOAD = 100 as const;
const MARK_AS_DELIVERED_BUFFER_TIMEOUT = 1000 as const;
const MARK_AS_READ_THROTTLE_TIMEOUT = 1000 as const;
const RETRY_COUNT_LIMIT_FOR_TIMEOUT_INCREASE = 3 as const;

const isChannel = (item: Channel | Thread): item is Channel => item instanceof Channel;
const isThread = (item: Channel | Thread): item is Thread => item instanceof Thread;
Expand Down Expand Up @@ -40,25 +44,46 @@ export class MessageDeliveryReporter {
protected markDeliveredRequestPromise: Promise<EventAPIResponse | void> | null = null;
protected markDeliveredTimeout: ReturnType<typeof setTimeout> | null = null;

protected requestTimeoutMs: number = MARK_AS_DELIVERED_BUFFER_TIMEOUT;
// increased up to RETRY_COUNT_LIMIT_FOR_TIMEOUT_INCREASE
protected requestRetryCount: number = 0;

constructor({ client }: MessageDeliveryReporterOptions) {
this.client = client;
}

private get markDeliveredRequestInFlight() {
return this.markDeliveredRequestPromise !== null;
}

private get hasTimer() {
return this.markDeliveredTimeout !== null;
}

private get hasDeliveryCandidates() {
return this.deliveryReportCandidates.size > 0;
}

private get canExecuteRequest() {
return !this.markDeliveredRequestInFlight && this.hasDeliveryCandidates;
}

private static hasPermissionToReportDeliveryFor(collection: Channel | Thread) {
if (isChannel(collection)) return !!collection.getConfig()?.delivery_events;
if (isThread(collection)) return !!collection.channel.getConfig()?.delivery_events;
}

private increaseBackOff() {
if (this.requestRetryCount >= RETRY_COUNT_LIMIT_FOR_TIMEOUT_INCREASE) return;
this.requestRetryCount = this.requestRetryCount + 1;
this.requestTimeoutMs = this.requestTimeoutMs * 2;
}

private resetBackOff() {
this.requestTimeoutMs = MARK_AS_DELIVERED_BUFFER_TIMEOUT;
this.requestRetryCount = 0;
}

/**
* Build latest_delivered_messages payload from an arbitrary buffer (deliveryReportCandidates / nextDeliveryReportCandidates)
*/
Expand Down Expand Up @@ -186,15 +211,17 @@ export class MessageDeliveryReporter {
* @param options
*/
public announceDelivery = (options?: AnnounceDeliveryOptions) => {
if (this.markDeliveredRequestInFlight || !this.hasDeliveryCandidates) return;
if (!this.canExecuteRequest) return;

const { latest_delivered_messages, sendBuffer } =
this.confirmationsFromDeliveryReportCandidates();
if (!latest_delivered_messages.length) return;

const payload = { ...options, latest_delivered_messages };

const postFlightReconcile = () => {
const postFlightReconcile = ({
preventSchedulingRetry,
}: { preventSchedulingRetry?: boolean } = {}) => {
this.markDeliveredRequestPromise = null;

// promote anything that arrived during request
Expand All @@ -203,32 +230,47 @@ export class MessageDeliveryReporter {
}
this.nextDeliveryReportCandidates = new Map();

if (preventSchedulingRetry) return;
// checks internally whether there are candidates to announce
this.announceDeliveryBuffered(options);
};

const handleError = () => {
// repopulate relevant candidates for the next report
for (const [k, v] of Object.entries(sendBuffer)) {
if (!this.deliveryReportCandidates.has(k)) {
this.deliveryReportCandidates.set(k, v);
}
}
const handleSuccess = () => {
this.resetBackOff();
postFlightReconcile();
};

const handleError = (error: ErrorFromResponse<APIErrorResponse> | Error) => {
// re-populate relevant candidates for the next report
// but make sure to keep the items that failed to be reported the first next time
const newDeliveryReportCandidates = new Map(sendBuffer);
for (const [k, v] of this.deliveryReportCandidates.entries()) {
newDeliveryReportCandidates.set(k, v);
}
this.deliveryReportCandidates = newDeliveryReportCandidates;

if (
(isAPIError(error) && isErrorRetryable(error)) ||
(error as ErrorFromResponse<APIErrorResponse>).status >= 500
) {
this.increaseBackOff();
postFlightReconcile();
} else {
postFlightReconcile({ preventSchedulingRetry: true });
}
};

this.markDeliveredRequestPromise = this.client
.markChannelsDelivered(payload)
.then(postFlightReconcile, handleError);
.then(handleSuccess, handleError);
};

public announceDeliveryBuffered = (options?: AnnounceDeliveryOptions) => {
if (this.hasTimer || this.markDeliveredRequestInFlight || !this.hasDeliveryCandidates)
return;
if (this.hasTimer || !this.canExecuteRequest) return;
this.markDeliveredTimeout = setTimeout(() => {
this.markDeliveredTimeout = null;
this.announceDelivery(options);
}, MARK_AS_DELIVERED_BUFFER_TIMEOUT);
}, this.requestTimeoutMs);
};

/**
Expand Down
196 changes: 194 additions & 2 deletions test/unit/messageDelivery/MessageDeliveryReporter.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { getClientWithUser } from '../test-utils/getClient';
import type { Channel, Event, EventAPIResponse, StreamChat } from '../../../src';
import {
type APIErrorResponse,
Channel,
ErrorFromResponse,
Event,
EventAPIResponse,
StreamChat,
} from '../../../src';
import type { AxiosResponse } from 'axios';

const channelType = 'messaging';
const channelId = 'channelId';
Expand Down Expand Up @@ -129,7 +137,7 @@ describe('MessageDeliveryReporter', () => {
expect(markChannelsDeliveredSpy).not.toHaveBeenCalled();
});

it('does nothing when read events are disabled in channel config', async () => {
it('does nothing when delievry events are disabled in channel config', async () => {
client.configs[channel.cid] = {
created_at: '',
delivery_events: false,
Expand Down Expand Up @@ -302,6 +310,190 @@ describe('MessageDeliveryReporter', () => {
expect(markChannelsDeliveredSpy).not.toHaveBeenCalled();
});

const receiveMessages = (count: number, startId = 0) => {
// last_read < last message
const channels = Array.from({ length: count }, (_, i) => {
const channel = client.channel(channelType, (i + startId).toString());
channel.initialized = true;
channel.state.latestMessages = [mkMsg('m1', '2025-01-01T10:00:00Z')];
(channel.state as any).read['me'] = { last_read: new Date('2025-01-01T09:00:00Z') };
return channel;
});
channels.forEach((ch) => {
client.configs[ch.cid] = {
created_at: '',
delivery_events: true,
read_events: false,
reminders: false,
updated_at: '',
};
});
client.syncDeliveredCandidates(channels);
return channels;
};

const retryableError = new ErrorFromResponse<APIErrorResponse>('X', {
code: -1,
response: {} as AxiosResponse,
status: 400,
});

const notRetryableError = new ErrorFromResponse<APIErrorResponse>('X', {
code: 2,
response: {} as AxiosResponse,
status: 400,
});

it('re-queues failed markChannelsDelivered request payloads', async () => {
const markChannelsDeliveredSpy = vi.spyOn(client, 'markChannelsDelivered');

markChannelsDeliveredSpy.mockRejectedValue(retryableError);
const channels1 = receiveMessages(110);
// @ts-expect-error accessing protected property deliveryReportCandidates
expect(client.messageDeliveryReporter.deliveryReportCandidates.size).toBe(110);
// @ts-expect-error accessing protected property nextDeliveryReportCandidates
expect(client.messageDeliveryReporter.nextDeliveryReportCandidates.size).toBe(0);

// =======================================================//
// trigger mark delivered request that will fail
vi.advanceTimersByTime(1000);
await Promise.resolve();
expect(markChannelsDeliveredSpy).toHaveBeenCalledTimes(1);
// all the candidates have been returned back to deliveryReportCandidates
// @ts-expect-error accessing protected property deliveryReportCandidates
expect(client.messageDeliveryReporter.deliveryReportCandidates.size).toBe(110);
// @ts-expect-error accessing protected property nextDeliveryReportCandidates
expect(client.messageDeliveryReporter.nextDeliveryReportCandidates.size).toBe(0);

// =======================================================//
// retry - start mark delivered request that will again fail
vi.advanceTimersByTime(2000);
// receive new channels during the request
const channels2 = receiveMessages(110, channels1.length);

// the first 100 retried channels are in a sendBuffer - local scope
// @ts-expect-error accessing protected property deliveryReportCandidates
expect(client.messageDeliveryReporter.deliveryReportCandidates.size).toBe(10);
expect(
// @ts-expect-error accessing protected property deliveryReportCandidates
Array.from(client.messageDeliveryReporter.deliveryReportCandidates.keys()),
).toEqual(channels1.slice(100).map((channel) => channel.cid));

// newly arrived channels2 present in nextDeliveryReportCandidates
// @ts-expect-error accessing protected property nextDeliveryReportCandidates
expect(client.messageDeliveryReporter.nextDeliveryReportCandidates.size).toBe(110);
expect(
// @ts-expect-error accessing protected property deliveryReportCandidates
Array.from(client.messageDeliveryReporter.nextDeliveryReportCandidates.keys()),
).toEqual(channels2.slice(0).map((channel) => channel.cid));

// finish mark delivered request
await Promise.resolve();
expect(markChannelsDeliveredSpy).toHaveBeenCalledTimes(2);
// all the candidates together now
// @ts-expect-error accessing protected property deliveryReportCandidates
expect(client.messageDeliveryReporter.deliveryReportCandidates.size).toBe(220);
expect(
// @ts-expect-error accessing protected property deliveryReportCandidates
Array.from(client.messageDeliveryReporter.deliveryReportCandidates.keys()),
).toEqual([
...channels1.slice(0).map((channel) => channel.cid),
...channels2.slice(0).map((channel) => channel.cid),
]);

// @ts-expect-error accessing protected property nextDeliveryReportCandidates
expect(client.messageDeliveryReporter.nextDeliveryReportCandidates.size).toBe(0);

// =======================================================//
// retry - start mark delivered request that will again fail
vi.advanceTimersByTime(4000);

// the first 100 retried channels are in a sendBuffer - local scope
// @ts-expect-error accessing protected property deliveryReportCandidates
expect(client.messageDeliveryReporter.deliveryReportCandidates.size).toBe(120);
expect(
// @ts-expect-error accessing protected property deliveryReportCandidates
Array.from(client.messageDeliveryReporter.deliveryReportCandidates.keys()),
).toEqual([
...channels1.slice(100).map((channel) => channel.cid),
...channels2.slice(0).map((channel) => channel.cid),
]);

// newly arrived channels2 present in nextDeliveryReportCandidates
// @ts-expect-error accessing protected property nextDeliveryReportCandidates
expect(client.messageDeliveryReporter.nextDeliveryReportCandidates.size).toBe(0);

// finish mark delivered request
await Promise.resolve();
// all the candidates together now
// @ts-expect-error accessing protected property deliveryReportCandidates
expect(client.messageDeliveryReporter.deliveryReportCandidates.size).toBe(220);
expect(
// @ts-expect-error accessing protected property deliveryReportCandidates
Array.from(client.messageDeliveryReporter.deliveryReportCandidates.keys()),
).toEqual([
...channels1.slice(0).map((channel) => channel.cid),
...channels2.slice(0).map((channel) => channel.cid),
]);

// @ts-expect-error accessing protected property nextDeliveryReportCandidates
expect(client.messageDeliveryReporter.nextDeliveryReportCandidates.size).toBe(0);

vi.advanceTimersByTime(8000);
// finish mark delivered request
await Promise.resolve();
expect(markChannelsDeliveredSpy).toHaveBeenCalledTimes(4);

// success resets the interval
markChannelsDeliveredSpy.mockResolvedValueOnce({ ok: true } as any);
// the timeout does not increase anymore from the fourth failed retry
vi.advanceTimersByTime(8000);
// finish mark delivered request
await Promise.resolve();
expect(markChannelsDeliveredSpy).toHaveBeenCalledTimes(5);

// after the previous success we are back to the base timeout
vi.advanceTimersByTime(1000);
// finish mark delivered request
await Promise.resolve();
expect(markChannelsDeliveredSpy).toHaveBeenCalledTimes(6);

// @ts-expect-error accessing protected property deliveryReportCandidates
expect(client.messageDeliveryReporter.deliveryReportCandidates.size).toBe(120);
expect(
// @ts-expect-error accessing protected property deliveryReportCandidates
Array.from(client.messageDeliveryReporter.deliveryReportCandidates.keys()),
).toEqual([
...channels1.slice(100).map((channel) => channel.cid),
...channels2.slice(0).map((channel) => channel.cid),
]);

// @ts-expect-error accessing protected property nextDeliveryReportCandidates
expect(client.messageDeliveryReporter.nextDeliveryReportCandidates.size).toBe(0);
});

it('non retryable error does not schedule retry', async () => {
const markChannelsDeliveredSpy = vi.spyOn(client, 'markChannelsDelivered');

markChannelsDeliveredSpy.mockRejectedValue(notRetryableError);
const channels1 = receiveMessages(110);
// @ts-expect-error accessing protected property deliveryReportCandidates
expect(client.messageDeliveryReporter.deliveryReportCandidates.size).toBe(110);
// @ts-expect-error accessing protected property nextDeliveryReportCandidates
expect(client.messageDeliveryReporter.nextDeliveryReportCandidates.size).toBe(0);

// =======================================================//
// trigger mark delivered request that will fail
vi.advanceTimersByTime(1000);
await Promise.resolve();
expect(markChannelsDeliveredSpy).toHaveBeenCalledTimes(1);

// will not retry
vi.advanceTimersByTime(2000);
await Promise.resolve();
expect(markChannelsDeliveredSpy).toHaveBeenCalledTimes(1);
});

it('does not remove the pending delivery candidate after failed markRead request', async () => {
const markChannelsDeliveredSpy = vi.spyOn(client, 'markChannelsDelivered');
vi.spyOn(channel, 'markAsReadRequest').mockRejectedValue({} as any);
Expand Down