1
1
import { Tiktoken } from '@dqbd/tiktoken'
2
2
import { Response } from 'express'
3
+ import { isError } from '../../util/parser'
3
4
4
5
import { AZURE_RESOURCE , AZURE_API_KEY } from '../config'
5
6
import { validModels , inProduction } from '../../../config'
@@ -9,7 +10,14 @@ import { APIError } from '../../types'
9
10
import { AzureOpenAI } from 'openai'
10
11
// import { EventStream } from '@azure/openai'
11
12
import { Stream } from 'openai/streaming'
12
- import { ResponseStreamEvent } from 'openai/resources/responses/responses'
13
+ import {
14
+ FunctionTool ,
15
+ ResponseInput ,
16
+ ResponseInputItem ,
17
+ ResponseStreamEvent ,
18
+ } from 'openai/resources/responses/responses'
19
+
20
+ import { testTool } from './tools'
13
21
14
22
const endpoint = `https://${ AZURE_RESOURCE } .openai.azure.com/`
15
23
@@ -23,90 +31,139 @@ export const getAzureOpenAIClient = (deployment: string) =>
23
31
24
32
const client = getAzureOpenAIClient ( process . env . GPT_4O )
25
33
26
- /**
27
- * Mock stream for testing
28
- */
29
- // const getMockCompletionEvents: () => Promise<
30
- // EventStream<ResponseStreamEvent>
31
- // > = async () => {
32
- // const mockStream = new ReadableStream<ResponseStreamEvent>({
33
- // start(controller) {
34
- // for (let i = 0; i < 10; i += 1) {
35
- // controller.enqueue({
36
- // event: "response",
37
- // data: ""
38
- // })
39
- // }
40
- // controller.close()
41
- // },
42
- // }) as EventStream<ResponseStreamEvent>
43
-
44
- // return mockStream
45
- // }
46
-
47
- export const getResponsesEvents = async ( {
48
- model,
49
- input,
50
- stream,
51
- } : any ) : Promise <
52
- | Stream < ResponseStreamEvent >
53
- // EventStream<ChatCompletionChunk>
54
- | APIError
55
- | any
56
- > => {
57
- const deploymentId = validModels . find ( ( m ) => m . name === model ) ?. deployment
58
-
59
- if ( ! deploymentId ) throw new Error ( `Invalid model: ${ model } , not one of ${ validModels . map ( ( m ) => m . name ) . join ( ', ' ) } ` )
60
-
61
- // Mocking disabled because it's difficult to mock a event stream for responses API.
62
- // if (deploymentId === 'mock') return getMockCompletionEvents()
63
-
64
- try {
65
- const events = await client . responses . create ( {
66
- model : deploymentId ,
67
- instructions : 'Olet avulias apuri.' ,
68
- input,
69
- stream,
70
- tools : [ ] ,
71
- } )
34
+ export class ResponsesClient {
35
+ model : string
36
+ instructions : string
37
+ tools : FunctionTool [ ]
72
38
73
- return events
74
- } catch ( error : any ) {
75
- logger . error ( error )
39
+ constructor ( model : string , instructions ?: string ) {
40
+ const deploymentId = validModels . find ( ( m ) => m . name === model ) ?. deployment
76
41
77
- return { error } as any as APIError
42
+ if ( ! deploymentId )
43
+ throw new Error (
44
+ `Invalid model: ${ model } , not one of ${ validModels . map ( ( m ) => m . name ) . join ( ', ' ) } `
45
+ )
46
+
47
+ this . model = deploymentId
48
+ this . instructions = instructions || 'Olet avulias apuri.'
49
+ this . tools = [ testTool . definition ]
78
50
}
79
- }
80
51
81
- export const streamResponsesEvents = async ( events : Stream < ResponseStreamEvent > , encoding : Tiktoken , res : Response ) => {
82
- let tokenCount = 0
83
- const contents = [ ]
84
-
85
- for await ( const event of events ) {
86
- switch ( event . type ) {
87
- case 'response.output_text.delta' :
88
- if ( ! inProduction ) logger . info ( event . delta )
89
-
90
- await new Promise ( ( resolve ) => {
91
- if (
92
- ! res . write ( event . delta , ( err ) => {
93
- if ( err ) logger . error ( `${ event . delta } ${ err } ` )
94
- } )
95
- ) {
96
- logger . info ( `${ event . delta } res.write returned false, waiting for drain` )
97
- res . once ( 'drain' , resolve )
98
- } else {
99
- process . nextTick ( resolve )
100
- }
101
- } )
102
- contents . push ( event . delta )
103
- tokenCount += encoding . encode ( event . delta ) . length ?? 0
104
- break
52
+ async createResponse ( {
53
+ input,
54
+ } : {
55
+ input : ResponseInput
56
+ } ) : Promise < Stream < ResponseStreamEvent > | APIError > {
57
+ try {
58
+ return await client . responses . create ( {
59
+ model : this . model ,
60
+ instructions : this . instructions ,
61
+ input,
62
+ stream : true ,
63
+ tools : this . tools ,
64
+ } )
65
+ } catch ( error : any ) {
66
+ logger . error ( error )
67
+
68
+ return { error } as any as APIError
69
+ }
70
+ }
71
+
72
+ async handleResponse ( {
73
+ events,
74
+ prevMessages,
75
+ encoding,
76
+ res,
77
+ } : {
78
+ events : Stream < any >
79
+ prevMessages : ResponseInput
80
+ encoding : Tiktoken
81
+ res : Response
82
+ } ) {
83
+ let tokenCount = 0
84
+ const contents = [ ]
85
+
86
+ for await ( const event of events ) {
87
+ console . log ( 'event type:' , event . type )
88
+
89
+ switch ( event . type ) {
90
+ case 'response.output_text.delta' :
91
+ await this . writeDelta ( event . delta , res )
92
+
93
+ contents . push ( event . delta )
94
+ tokenCount += encoding . encode ( event . delta ) . length ?? 0
95
+ break
96
+
97
+ case 'response.function_call_arguments.done' :
98
+ // WORK IN PROGRESS
99
+
100
+ // const augRetrieval = await this.callToolFunction(
101
+ // event.arguments,
102
+ // event.call_id
103
+ // )
104
+ // const newEvents = await this.createResponse({
105
+ // input: [...prevMessages, augRetrieval],
106
+ // })
107
+
108
+ // if (isError(events)) {
109
+ // throw new Error(`Error creating response from function call`)
110
+ // }
111
+
112
+ // await this.handleResponse({
113
+ // events: newEvents as Stream<ResponseStreamEvent>,
114
+ // prevMessages: [...prevMessages, augRetrieval],
115
+ // encoding,
116
+ // res,
117
+ // })
118
+ break
119
+ }
105
120
}
121
+
122
+ return {
123
+ tokenCount,
124
+ response : contents . join ( '' ) ,
125
+ }
126
+ }
127
+
128
+ private async writeDelta ( text : string , res : Response ) {
129
+ // if (!inProduction) logger.info(text)
130
+
131
+ await new Promise ( ( resolve ) => {
132
+ if (
133
+ ! res . write ( text , ( err ) => {
134
+ if ( err ) logger . error ( `${ text } ${ err } ` )
135
+ } )
136
+ ) {
137
+ logger . info ( `${ text } res.write returned false, waiting for drain` )
138
+ res . once ( 'drain' , resolve )
139
+ } else {
140
+ process . nextTick ( resolve )
141
+ }
142
+ } )
106
143
}
107
144
108
- return {
109
- tokenCount,
110
- response : contents . join ( '' ) ,
145
+ private async callToolFunction (
146
+ args : string ,
147
+ callId : string
148
+ ) : Promise < ResponseInputItem [ ] > {
149
+ const { query } = JSON . parse ( args )
150
+ try {
151
+ const retrieval = await testTool . function ( query )
152
+
153
+ return [
154
+ {
155
+ role : 'user' ,
156
+ content : retrieval . query ,
157
+ } ,
158
+ {
159
+ type : 'function_call_output' ,
160
+ call_id : callId ,
161
+ output : retrieval . result ,
162
+ } ,
163
+ ]
164
+ } catch ( error ) {
165
+ logger . error ( 'Error calling tool function:' , error )
166
+ return null
167
+ }
111
168
}
112
169
}
0 commit comments