Skip to content

Commit 6a76ee6

Browse files
authored
Merge pull request #1407 from narengogi/chore/send-hook-results-in-streaming-chunks
send hook results when streaming response
2 parents a740033 + 69e18a9 commit 6a76ee6

File tree

4 files changed

+87
-20
lines changed

4 files changed

+87
-20
lines changed

src/handlers/handlerUtils.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1191,6 +1191,7 @@ export async function recursiveAfterRequestHookHandler(
11911191
responseJson: mappedResponseJson,
11921192
originalResponseJson,
11931193
} = await responseHandler(
1194+
c,
11941195
response,
11951196
isStreamingMode,
11961197
providerOption,
@@ -1200,7 +1201,8 @@ export async function recursiveAfterRequestHookHandler(
12001201
gatewayParams,
12011202
strictOpenAiCompliance,
12021203
c.req.url,
1203-
areSyncHooksAvailable
1204+
areSyncHooksAvailable,
1205+
hookSpanId
12041206
);
12051207

12061208
const arhResponse = await afterRequestHookHandler(

src/handlers/responseHandlers.ts

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { HookSpan } from '../middlewares/hooks';
1818
import { env } from 'hono/adapter';
1919
import { OpenAIModelResponseJSONToStreamGenerator } from '../providers/open-ai-base/createModelResponse';
2020
import { anthropicMessagesJsonToStreamGenerator } from '../providers/anthropic-base/utils/streamGenerator';
21+
import { endpointStrings } from '../providers/types';
2122

2223
/**
2324
* Handles various types of responses based on the specified parameters
@@ -35,6 +36,7 @@ import { anthropicMessagesJsonToStreamGenerator } from '../providers/anthropic-b
3536
* @returns {Promise<{response: Response, json?: any}>} - The mapped response.
3637
*/
3738
export async function responseHandler(
39+
c: Context,
3840
response: Response,
3941
streamingMode: boolean,
4042
providerOptions: Options,
@@ -44,7 +46,8 @@ export async function responseHandler(
4446
gatewayRequest: Params,
4547
strictOpenAiCompliance: boolean,
4648
gatewayRequestUrl: string,
47-
areSyncHooksAvailable: boolean
49+
areSyncHooksAvailable: boolean,
50+
hookSpanId: string
4851
): Promise<{
4952
response: Response;
5053
responseJson: Record<string, any> | null;
@@ -96,28 +99,31 @@ export async function responseHandler(
9699
responseTransformerFunction = undefined;
97100
}
98101

99-
if (
100-
streamingMode &&
101-
isSuccessStatusCode &&
102-
isCacheHit &&
103-
responseTransformerFunction
104-
) {
105-
const streamingResponse = await handleJSONToStreamResponse(
106-
response,
107-
provider,
108-
responseTransformerFunction
109-
);
110-
return { response: streamingResponse, responseJson: null };
111-
}
112102
if (streamingMode && isSuccessStatusCode) {
103+
const hooksManager = c.get('hooksManager');
104+
const span = hooksManager.getSpan(hookSpanId) as HookSpan;
105+
const hooksResult = span.getHooksResult();
106+
if (isCacheHit && responseTransformerFunction) {
107+
const streamingResponse = await handleJSONToStreamResponse(
108+
response,
109+
provider,
110+
responseTransformerFunction,
111+
strictOpenAiCompliance,
112+
responseTransformer as endpointStrings,
113+
hooksResult
114+
);
115+
return { response: streamingResponse, responseJson: null };
116+
}
113117
return {
114118
response: handleStreamingMode(
115119
response,
116120
provider,
117121
responseTransformerFunction,
118122
requestURL,
119123
strictOpenAiCompliance,
120-
gatewayRequest
124+
gatewayRequest,
125+
responseTransformer as endpointStrings,
126+
hooksResult
121127
),
122128
responseJson: null,
123129
};
@@ -264,7 +270,6 @@ export async function afterRequestHookHandler(
264270
...response,
265271
status: 246,
266272
statusText: 'Hooks failed',
267-
headers: response.headers,
268273
});
269274
}
270275
return response;

src/handlers/services/responseService.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ export class ResponseService {
8181
}> {
8282
const url = this.context.requestURL;
8383
return await responseHandler(
84+
this.context.honoContext,
8485
response,
8586
this.context.isStreaming,
8687
this.context.providerOption,
@@ -90,7 +91,8 @@ export class ResponseService {
9091
this.context.params,
9192
this.context.strictOpenAiCompliance,
9293
this.context.honoContext.req.url,
93-
this.hooksService.areSyncHooksAvailable
94+
this.hooksService.areSyncHooksAvailable,
95+
this.hooksService.hookSpan?.id as string
9496
);
9597
}
9698

src/handlers/streamHandler.ts

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ import {
88
PRECONDITION_CHECK_FAILED_STATUS_CODE,
99
GOOGLE_VERTEX_AI,
1010
} from '../globals';
11+
import { HookSpan } from '../middlewares/hooks';
1112
import { VertexLlamaChatCompleteStreamChunkTransform } from '../providers/google-vertex-ai/chatComplete';
1213
import { OpenAIChatCompleteResponse } from '../providers/openai/chatComplete';
1314
import { OpenAICompleteResponse } from '../providers/openai/complete';
15+
import { endpointStrings } from '../providers/types';
1416
import { Params } from '../types/requestBody';
1517
import { getStreamModeSplitPattern, type SplitPatternType } from '../utils';
1618

@@ -24,6 +26,15 @@ function readUInt32BE(buffer: Uint8Array, offset: number) {
2426
); // Ensure the result is an unsigned integer
2527
}
2628

29+
/**
 * Decides whether a hook-results chunk should be written to the stream.
 *
 * A chunk is sent only when strict OpenAI compliance is off and the
 * hooks result carries at least one before-request hook result.
 *
 * @param strictOpenAiCompliance - when true, no hook-results chunk is sent.
 * @param hooksResult - hook results collected for the current request.
 * @returns true when a hook-results chunk should be emitted.
 */
const shouldSendHookResultChunk = (
  strictOpenAiCompliance: boolean,
  hooksResult: HookSpan['hooksResult']
): boolean => {
  if (strictOpenAiCompliance) {
    return false;
  }
  // A missing result object or list counts as zero hook results.
  const beforeRequestHookCount =
    hooksResult?.beforeRequestHooksResult?.length ?? 0;
  return beforeRequestHookCount > 0;
};
37+
2738
function getPayloadFromAWSChunk(chunk: Uint8Array): string {
2839
const decoder = new TextDecoder();
2940
const chunkLength = readUInt32BE(chunk, 0);
@@ -292,7 +303,9 @@ export function handleStreamingMode(
292303
responseTransformer: Function | undefined,
293304
requestURL: string,
294305
strictOpenAiCompliance: boolean,
295-
gatewayRequest: Params
306+
gatewayRequest: Params,
307+
fn: endpointStrings,
308+
hooksResult: HookSpan['hooksResult']
296309
): Response {
297310
const splitPattern = getStreamModeSplitPattern(proxyProvider, requestURL);
298311
// If the provider doesn't supply completion id,
@@ -311,6 +324,12 @@ export function handleStreamingMode(
311324
if (proxyProvider === BEDROCK) {
312325
(async () => {
313326
try {
327+
if (shouldSendHookResultChunk(strictOpenAiCompliance, hooksResult)) {
328+
const hookResultChunk = constructHookResultChunk(hooksResult, fn);
329+
if (hookResultChunk) {
330+
await writer.write(encoder.encode(hookResultChunk));
331+
}
332+
}
314333
for await (const chunk of readAWSStream(
315334
reader,
316335
responseTransformer,
@@ -337,6 +356,12 @@ export function handleStreamingMode(
337356
} else {
338357
(async () => {
339358
try {
359+
if (shouldSendHookResultChunk(strictOpenAiCompliance, hooksResult)) {
360+
const hookResultChunk = constructHookResultChunk(hooksResult, fn);
361+
if (hookResultChunk) {
362+
await writer.write(encoder.encode(hookResultChunk));
363+
}
364+
}
340365
for await (const chunk of readStream(
341366
reader,
342367
splitPattern,
@@ -389,7 +414,10 @@ export function handleStreamingMode(
389414
export async function handleJSONToStreamResponse(
390415
response: Response,
391416
provider: string,
392-
responseTransformerFunction: Function
417+
responseTransformerFunction: Function,
418+
strictOpenAiCompliance: boolean,
419+
fn: endpointStrings,
420+
hooksResult: HookSpan['hooksResult']
393421
): Promise<Response> {
394422
const { readable, writable } = new TransformStream();
395423
const writer = writable.getWriter();
@@ -403,6 +431,12 @@ export async function handleJSONToStreamResponse(
403431
) {
404432
const generator = responseTransformerFunction(responseJSON, provider);
405433
(async () => {
434+
if (shouldSendHookResultChunk(strictOpenAiCompliance, hooksResult)) {
435+
const hookResultChunk = constructHookResultChunk(hooksResult, fn);
436+
if (hookResultChunk) {
437+
await writer.write(encoder.encode(hookResultChunk));
438+
}
439+
}
406440
while (true) {
407441
const chunk = generator.next();
408442
if (chunk.done) {
@@ -418,6 +452,12 @@ export async function handleJSONToStreamResponse(
418452
provider
419453
);
420454
(async () => {
455+
if (shouldSendHookResultChunk(strictOpenAiCompliance, hooksResult)) {
456+
const hookResultChunk = constructHookResultChunk(hooksResult, fn);
457+
if (hookResultChunk) {
458+
await writer.write(encoder.encode(hookResultChunk));
459+
}
460+
}
421461
for (const chunk of streamChunkArray) {
422462
await writer.write(encoder.encode(chunk));
423463
}
@@ -434,3 +474,21 @@ export async function handleJSONToStreamResponse(
434474
statusText: response.statusText,
435475
});
436476
}
477+
478+
/**
 * Builds the server-sent-event chunk that carries before-request hook
 * results to a streaming client.
 *
 * The JSON payload has the shape
 * `{ hook_results: { before_request_hooks: [...] } }`. For the `messages`
 * endpoint the chunk is prefixed with an `event: hook_results` line; every
 * other endpoint receives a bare `data:` line.
 *
 * @param hooksResult - hook results collected for the current request.
 * @param fn - endpoint identifier used to choose the chunk framing.
 * @returns the serialized chunk, or an empty string when there is no
 *   before-request hook result to serialize, so that callers' truthiness
 *   check on the chunk skips the write instead of this function throwing.
 */
const constructHookResultChunk = (
  hooksResult: HookSpan['hooksResult'],
  fn: endpointStrings
): string => {
  const beforeRequestHooks = hooksResult?.beforeRequestHooksResult;
  if (!beforeRequestHooks) {
    return '';
  }
  // Serialize once; only the SSE framing differs between endpoints.
  const payload = JSON.stringify({
    hook_results: {
      before_request_hooks: beforeRequestHooks,
    },
  });
  const eventLine = fn === 'messages' ? 'event: hook_results\n' : '';
  return `${eventLine}data: ${payload}\n\n`;
};

0 commit comments

Comments
 (0)