Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion src/TockState.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,28 @@ export const tockReducer: Reducer<TockState, TockAction> = (
});

// If the message is new, add it to the messages
// We can have multiple messages with the same responseId, but we only want to add the last one (chunks when streaming is disabled)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only if TOCK_STREAM_RESPONSE is enabled on this response!

if (!messageAlreadyExists) {
newMessages.push(...action.messages);
const messagesToAdd: Message[] = [];
const seenResponseIds = new Set<string>();

for (let i = action.messages.length - 1; i >= 0; i--) {
const message = action.messages[i];
const responseId = message.metadata?.RESPONSE_ID;

if (responseId) {
if (!seenResponseIds.has(responseId)) {
seenResponseIds.add(responseId);
messagesToAdd.push(message);
}
} else {
messagesToAdd.push(message);
}
}

messagesToAdd.reverse();

newMessages.push(...messagesToAdd);
}

return {
Expand Down
61 changes: 49 additions & 12 deletions src/network/TockEventSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,20 @@ async function getSseStatus(url: string) {

export class TockEventSource {
private initialized: boolean;
private currentUrl: string | null;
private eventSource: EventSource | null;
private retryDelay: number;
private retryTimeoutId: number;
private retryOnPingTimeoutId?: number
private retryOnPingTimeoutMs: number;
onResponse: (botResponse: BotConnectorResponse) => void;
onStateChange: (state: number) => void;


constructor() {
constructor({retryOnPingTimeoutMs}: {retryOnPingTimeoutMs: number}) {
this.initialized = false;
this.retryDelay = INITIAL_RETRY_DELAY;
this.retryOnPingTimeoutMs = retryOnPingTimeoutMs;
}

isInitialized(): boolean {
Expand All @@ -65,53 +70,85 @@ export class TockEventSource {
*/
open(endpoint: string, userId: string): Promise<void> {
this.onStateChange(EventSource.CONNECTING);
const url = `${endpoint}/sse?userid=${userId}`;
this.currentUrl = `${endpoint}/sse?userid=${userId}`;
return new Promise<void>((resolve, reject): void => {
this.tryOpen(url, resolve, reject);
this.tryOpen( resolve, reject);
});
}

private tryOpen(url: string, resolve: () => void, reject: () => void) {
this.eventSource = new EventSource(url);
private tryOpen( resolve?: () => void, reject?: () => void) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you run lint on updated files?

if (!this.currentUrl) {
reject?.();
return;
}
this.eventSource = new EventSource(this.currentUrl);
this.eventSource.addEventListener('open', () => {
this.onStateChange(EventSource.OPEN);
this.initialized = true;
this.retryDelay = INITIAL_RETRY_DELAY;
resolve();
this.scheduleRetryWatchdog({reason: "open"});
resolve?.();
});
this.eventSource.addEventListener('error', () => {
this.eventSource?.close();
this.retry(url, reject, resolve);
this.retry( reject, resolve);
});
this.eventSource.addEventListener('message', (e) => {
this.scheduleRetryWatchdog({reason: "message"});
this.onResponse(JSON.parse(e.data));
});
this.eventSource.addEventListener('ping', () => {
this.scheduleRetryWatchdog({reason: "ping"});
});
}

private retry(url: string, reject: () => void, resolve: () => void) {
private retry(reject?: () => void, resolve?: () => void) {
if (!this.currentUrl) {
reject?.();
return;
}
const retryDelay = this.retryDelay;
this.retryDelay = Math.min(
MAX_RETRY_DELAY,
retryDelay + RETRY_DELAY_INCREMENT,
);

this.onStateChange(EventSource.CONNECTING);

this.retryTimeoutId = window.setTimeout(async () => {
switch (await getSseStatus(url)) {
switch (await getSseStatus(this.currentUrl as string)) {
case SseStatus.UNSUPPORTED:
reject();
reject?.();
this.close();
break;
case SseStatus.SUPPORTED:
this.tryOpen(url, resolve, reject);
this.tryOpen( resolve, reject);
break;
case SseStatus.SERVER_UNAVAILABLE:
this.retry(url, reject, resolve);
this.retry( reject, resolve);
break;
}
}, retryDelay);
}

// Set a watchdog timeout to trigger a retry if the server is not responding
private scheduleRetryWatchdog({reason}: {reason: string}) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it useful to have named parameters for a private method?

window.clearTimeout(this.retryOnPingTimeoutId);
this.retryOnPingTimeoutId = window.setTimeout(() => {
this.triggerRetryWatchdog({reason});
}, this.retryOnPingTimeoutMs);
}

// Trigger a retry if the watchdog timeout is reached
public triggerRetryWatchdog({reason}: {reason: string}) {
console.log(`TockEventSource::triggerRetryWatchdog (timeout: ${this.retryOnPingTimeoutMs}ms, reason: ${reason})`);
this.close();
this.retry();
}

close() {
window.clearTimeout(this.retryTimeoutId);
window.clearTimeout(this.retryOnPingTimeoutId);
this.eventSource?.close();
this.eventSource = null;
this.initialized = false;
Expand Down
2 changes: 2 additions & 0 deletions src/settings/TockSettings.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export interface LocalStorageSettings {

export interface NetworkSettings {
disableSse: boolean;
retryOnPingTimeoutMs: number;
extraHeadersProvider?: () => Promise<Record<string, string>>;
}

Expand All @@ -33,6 +34,7 @@ export const defaultSettings: TockSettings = {
},
network: {
disableSse: false,
retryOnPingTimeoutMs: 15000, // 15 seconds
},
renderers: {
buttonRenderers: {
Expand Down
4 changes: 3 additions & 1 deletion src/useTock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ export const useTock0: (
localStorageSettings.maxMessageCount;
const localStoragePrefix = localStorageSettings.prefix;
const disableSse = disableSseArg ?? networkSettings.disableSse;
const retryOnPingTimeoutMs = networkSettings.retryOnPingTimeoutMs;
const { clearMessages }: UseLocalTools = useLocalTools(localStorageEnabled);
const handledResponses = useRef<Record<string, number>>({});
const afterInit = useRef(() => {});
Expand All @@ -171,7 +172,7 @@ export const useTock0: (
afterInit.current = resolve;
}),
);
const sseSource = useRef(new TockEventSource());
const sseSource = useRef(new TockEventSource({retryOnPingTimeoutMs}));

const startLoading: () => void = useCallback(() => {
dispatch({
Expand Down Expand Up @@ -408,6 +409,7 @@ export const useTock0: (
if (localStorage) {
window.localStorage.setItem('tockQuickReplyHistory', '');
}
sseSource.current.triggerRetryWatchdog({reason: "handleError"});
dispatch({
type: 'SET_ERROR',
error: true,
Expand Down