diff --git a/src/TockState.tsx b/src/TockState.tsx index 8b60f81..61eb0f9 100644 --- a/src/TockState.tsx +++ b/src/TockState.tsx @@ -106,8 +106,28 @@ export const tockReducer: Reducer = ( }); // If the message is new, add it to the messages + // We can have multiple messages with the same responseId, but we only want to add the last one (chunks when streaming is disabled) if (!messageAlreadyExists) { - newMessages.push(...action.messages); + const messagesToAdd: Message[] = []; + const seenResponseIds = new Set(); + + for (let i = action.messages.length - 1; i >= 0; i--) { + const message = action.messages[i]; + const responseId = message.metadata?.RESPONSE_ID; + + if (responseId) { + if (!seenResponseIds.has(responseId)) { + seenResponseIds.add(responseId); + messagesToAdd.push(message); + } + } else { + messagesToAdd.push(message); + } + } + + messagesToAdd.reverse(); + + newMessages.push(...messagesToAdd); } return { diff --git a/src/network/TockEventSource.ts b/src/network/TockEventSource.ts index d0ac287..82d2ed1 100644 --- a/src/network/TockEventSource.ts +++ b/src/network/TockEventSource.ts @@ -40,15 +40,20 @@ async function getSseStatus(url: string) { export class TockEventSource { private initialized: boolean; + private currentUrl: string | null; private eventSource: EventSource | null; private retryDelay: number; private retryTimeoutId: number; + private retryOnPingTimeoutId?: number + private retryOnPingTimeoutMs: number; onResponse: (botResponse: BotConnectorResponse) => void; onStateChange: (state: number) => void; + - constructor() { + constructor({retryOnPingTimeoutMs}: {retryOnPingTimeoutMs: number}) { this.initialized = false; this.retryDelay = INITIAL_RETRY_DELAY; + this.retryOnPingTimeoutMs = retryOnPingTimeoutMs; } isInitialized(): boolean { @@ -65,53 +70,85 @@ export class TockEventSource { */ open(endpoint: string, userId: string): Promise { this.onStateChange(EventSource.CONNECTING); - const url = `${endpoint}/sse?userid=${userId}`; + this.currentUrl = `${endpoint}/sse?userid=${userId}`; return new Promise((resolve, reject): void => { - this.tryOpen(url, resolve, reject); + this.tryOpen( resolve, reject); }); } - private tryOpen(url: string, resolve: () => void, reject: () => void) { - this.eventSource = new EventSource(url); + private tryOpen( resolve?: () => void, reject?: () => void) { + if (!this.currentUrl) { + reject?.(); + return; + } + this.eventSource = new EventSource(this.currentUrl); this.eventSource.addEventListener('open', () => { this.onStateChange(EventSource.OPEN); this.initialized = true; this.retryDelay = INITIAL_RETRY_DELAY; - resolve(); + this.scheduleRetryWatchdog({reason: "open"}); + resolve?.(); }); this.eventSource.addEventListener('error', () => { this.eventSource?.close(); - this.retry(url, reject, resolve); + this.retry( reject, resolve); }); this.eventSource.addEventListener('message', (e) => { + this.scheduleRetryWatchdog({reason: "message"}); this.onResponse(JSON.parse(e.data)); }); + this.eventSource.addEventListener('ping', () => { + this.scheduleRetryWatchdog({reason: "ping"}); + }); } - private retry(url: string, reject: () => void, resolve: () => void) { + private retry(reject?: () => void, resolve?: () => void) { + if (!this.currentUrl) { + reject?.(); + return; + } const retryDelay = this.retryDelay; this.retryDelay = Math.min( MAX_RETRY_DELAY, retryDelay + RETRY_DELAY_INCREMENT, ); + + this.onStateChange(EventSource.CONNECTING); + this.retryTimeoutId = window.setTimeout(async () => { - switch (await getSseStatus(url)) { + switch (await getSseStatus(this.currentUrl as string)) { case SseStatus.UNSUPPORTED: - reject(); + reject?.(); this.close(); break; case SseStatus.SUPPORTED: - this.tryOpen(url, resolve, reject); + this.tryOpen( resolve, reject); break; case SseStatus.SERVER_UNAVAILABLE: - this.retry(url, reject, resolve); + this.retry( reject, resolve); break; } }, retryDelay); } + // Set a watchdog timeout to trigger a retry if the server is not responding + private scheduleRetryWatchdog({reason}: {reason: string}) { + window.clearTimeout(this.retryOnPingTimeoutId); + this.retryOnPingTimeoutId = window.setTimeout(() => { + this.triggerRetryWatchdog({reason}); + }, this.retryOnPingTimeoutMs); + } + + // Trigger a retry if the watchdog timeout is reached + public triggerRetryWatchdog({reason}: {reason: string}) { + console.log(`TockEventSource::triggerRetryWatchdog (timeout: ${this.retryOnPingTimeoutMs}ms, reason: ${reason})`); + this.close(); + this.retry(); + } + close() { window.clearTimeout(this.retryTimeoutId); + window.clearTimeout(this.retryOnPingTimeoutId); this.eventSource?.close(); this.eventSource = null; this.initialized = false; diff --git a/src/settings/TockSettings.tsx b/src/settings/TockSettings.tsx index 6de9e66..6b7303f 100644 --- a/src/settings/TockSettings.tsx +++ b/src/settings/TockSettings.tsx @@ -11,6 +11,7 @@ export interface LocalStorageSettings { export interface NetworkSettings { disableSse: boolean; + retryOnPingTimeoutMs: number; extraHeadersProvider?: () => Promise>; } @@ -33,6 +34,7 @@ export const defaultSettings: TockSettings = { }, network: { disableSse: false, + retryOnPingTimeoutMs: 15000, // 15 seconds }, renderers: { buttonRenderers: { diff --git a/src/useTock.ts b/src/useTock.ts index 8d65f88..2a346a4 100644 --- a/src/useTock.ts +++ b/src/useTock.ts @@ -163,6 +163,7 @@ export const useTock0: ( localStorageSettings.maxMessageCount; const localStoragePrefix = localStorageSettings.prefix; const disableSse = disableSseArg ?? networkSettings.disableSse; + const retryOnPingTimeoutMs = networkSettings.retryOnPingTimeoutMs; const { clearMessages }: UseLocalTools = useLocalTools(localStorageEnabled); const handledResponses = useRef>({}); const afterInit = useRef(() => {}); @@ -171,7 +172,7 @@ export const useTock0: ( afterInit.current = resolve; }), ); - const sseSource = useRef(new TockEventSource()); + const sseSource = useRef(new TockEventSource({retryOnPingTimeoutMs})); const startLoading: () => void = useCallback(() => { dispatch({ @@ -408,6 +409,7 @@ export const useTock0: ( if (localStorage) { window.localStorage.setItem('tockQuickReplyHistory', ''); } + sseSource.current.triggerRetryWatchdog({reason: "handleError"}); dispatch({ type: 'SET_ERROR', error: true,