Skip to content

Commit cec9492

Browse files
Add Openai chat completion methods + support for streaming (#34)
* openAI * change path * add response cleaning * add streaming support * add whitespace * pretier * biome formatting * Fix type and improve example (#35) Co-authored-by: Daniel Khoo <daniel.khoo@messari.io> * replace messari format with openAI --------- Co-authored-by: Daniel Khoo <daniel.khoo@messari.io>
1 parent 4c889e7 commit cec9492

File tree

8 files changed

+758
-83
lines changed

8 files changed

+758
-83
lines changed

packages/api/src/client/base.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ import type {
9797
modifyWatchlistAssetsResponse,
9898
getTeamAllowanceResponse,
9999
getPermissionsResponse,
100+
createChatCompletionOpenAIResponse,
100101
} from "../types";
101102
import { LogLevel, type Logger, makeConsoleLogger, createFilteredLogger, noOpLogger } from "../logging";
102103
import type { PaginatedResult, RequestOptions, ClientEventMap, ClientEventType, ClientEventHandler } from "./types";
@@ -106,12 +107,23 @@ import type { PaginatedResult, RequestOptions, ClientEventMap, ClientEventType,
106107
*/
107108
export interface AIInterface {
108109
/**
109-
* Creates a chat completion using Messari's AI
110+
* Creates a chat completion using OpenAI's API
110111
* @param params Parameters for the chat completion request
111112
* @param options Optional request configuration
112113
* @returns A promise resolving to the chat completion response
113114
*/
114-
createChatCompletion(params: createChatCompletionParameters, options?: RequestOptions): Promise<createChatCompletionResponse>;
115+
createChatCompletion(params: Omit<createChatCompletionParameters, "stream">, options?: RequestOptions): Promise<createChatCompletionOpenAIResponse>;
116+
117+
/**
118+
* Creates a streaming chat completion using OpenAI's API
119+
* @param params Parameters for the chat completion request
120+
* @param options Optional request configuration
121+
* @returns A promise resolving to a readable stream of chat completion chunks
122+
*/
123+
createChatCompletionStream(
124+
params: Omit<createChatCompletionParameters, "stream">,
125+
options?: RequestOptions,
126+
): Promise<ReadableStream<createChatCompletionOpenAIResponse>>;
115127

116128
/**
117129
* Extracts entities from text content

packages/api/src/client/client.ts

Lines changed: 207 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import {
22
createChatCompletion,
33
extractEntities,
4+
createChatCompletionOpenAI,
45
getNewsFeed,
56
getNewsSources,
67
getNewsFeedAssets,
@@ -146,6 +147,8 @@ import type {
146147
getWatchlistResponse,
147148
updateWatchlistParameters,
148149
updateWatchlistResponse,
150+
createChatCompletionOpenAIResponse,
151+
createChatCompletionOpenAIParameters,
149152
} from "../types";
150153
import type { Agent } from "node:http";
151154
import { pick } from "../utils";
@@ -324,27 +327,197 @@ export class MessariClient extends MessariClientBase {
324327

325328
// Check if the response is JSON or text based on Content-Type header
326329
const contentType = response.headers.get("Content-Type");
327-
let responseData: { data: T };
328330

329331
if (contentType?.toLowerCase().includes("application/json")) {
330-
responseData = await response.json();
331-
} else {
332-
responseData = { data: await response.text() } as { data: T };
332+
const jsonResponse = await response.json();
333+
// If response has data field and no error, unwrap it, otherwise use the whole response
334+
const data = jsonResponse.data && !jsonResponse.error ? jsonResponse.data : jsonResponse;
335+
return data as T;
333336
}
334337

335-
this.logger(LogLevel.DEBUG, "request success", { responseData });
338+
const text = await response.text();
339+
return text as T;
340+
} catch (error) {
341+
this.logger(LogLevel.ERROR, "request failed", { error });
336342

337-
// Emit response event
338-
this.emit("response", {
339-
method,
340-
path,
341-
status: response.status,
342-
data: responseData,
343+
// Emit error event
344+
this.emit("error", {
345+
error: error as Error,
346+
request: {
347+
method,
348+
path,
349+
queryParams,
350+
},
351+
});
352+
353+
throw error;
354+
}
355+
}
356+
357+
private async requestStream<T>({ method, path, body, queryParams = {}, options = {} }: RequestParameters): Promise<ReadableStream<T>> {
358+
this.logger(LogLevel.DEBUG, "stream request start", {
359+
method,
360+
url: `${this.baseUrl}${path}`,
361+
queryParams,
362+
});
363+
364+
this.emit("request", {
365+
method,
366+
path,
367+
queryParams,
368+
});
369+
370+
const queryString = Object.entries(queryParams)
371+
.filter(([_, value]) => value !== undefined)
372+
.map(([key, value]) => {
373+
if (Array.isArray(value)) {
374+
return value.map((item) => `${encodeURIComponent(key)}=${encodeURIComponent(String(item))}`).join("&");
375+
}
376+
return `${encodeURIComponent(key)}=${encodeURIComponent(String(value))}`;
377+
})
378+
.join("&");
379+
380+
const url = `${this.baseUrl}${path}${queryString ? `?${queryString}` : ""}`;
381+
382+
const headers = {
383+
...this.defaultHeaders,
384+
...options.headers,
385+
"Accept": "text/event-stream",
386+
"Cache-Control": "no-cache",
387+
"Connection": "keep-alive",
388+
};
389+
390+
const timeoutMs = options.timeoutMs || this.timeoutMs;
391+
392+
try {
393+
const response = await RequestTimeoutError.rejectAfterTimeout(
394+
this.fetchFn(url, {
395+
method,
396+
headers,
397+
body: body ? JSON.stringify(body) : undefined,
398+
signal: options.signal,
399+
cache: options.cache,
400+
credentials: options.credentials,
401+
integrity: options.integrity,
402+
keepalive: options.keepalive,
403+
mode: options.mode,
404+
redirect: options.redirect,
405+
referrer: options.referrer,
406+
referrerPolicy: options.referrerPolicy,
407+
// @ts-ignore - Next.js specific options
408+
next: options.next,
409+
// Node.js specific option
410+
agent: this.agent,
411+
}),
412+
timeoutMs,
413+
);
414+
415+
if (!response.ok) {
416+
const errorData = await response.json();
417+
this.logger(LogLevel.ERROR, "request error", {
418+
status: response.status,
419+
statusText: response.statusText,
420+
error: errorData,
421+
});
422+
423+
const error = new Error(errorData.error || "An error occurred");
424+
425+
this.emit("error", {
426+
error,
427+
request: {
428+
method,
429+
path,
430+
queryParams,
431+
},
432+
});
433+
434+
throw error;
435+
}
436+
437+
// For streaming responses, return a transformed stream that parses the chunks
438+
if (!response.body) {
439+
throw new Error("No reader available for streaming response");
440+
}
441+
442+
let buffer = "";
443+
const decoder = new TextDecoder();
444+
445+
// Create a TransformStream that will parse the raw bytes into the expected type T
446+
const transformer = new TransformStream<Uint8Array, T>({
447+
transform: async (chunk, controller) => {
448+
try {
449+
// Decode the chunk and add to buffer
450+
const text = decoder.decode(chunk, { stream: true });
451+
buffer += text;
452+
453+
// Process any complete lines in the buffer
454+
const lines = buffer.split("\n");
455+
// Keep the last potentially incomplete line in the buffer
456+
buffer = lines.pop() || "";
457+
458+
for (const line of lines) {
459+
if (line.startsWith("data: ")) {
460+
const jsonData = line.slice(6).trim(); // Remove 'data: ' prefix
461+
462+
// Skip [DONE] marker
463+
if (jsonData === "[DONE]") {
464+
continue;
465+
}
466+
467+
if (jsonData) {
468+
try {
469+
const parsed = JSON.parse(jsonData);
470+
controller.enqueue(parsed as T);
471+
} catch (e) {
472+
this.logger(LogLevel.ERROR, "Error parsing JSON from stream", {
473+
error: e,
474+
data: jsonData,
475+
});
476+
}
477+
}
478+
} else if (line.trim() && !line.startsWith(":")) {
479+
// Try to parse non-empty lines that aren't comments
480+
try {
481+
const parsed = JSON.parse(line);
482+
controller.enqueue(parsed as T);
483+
} catch (e) {
484+
// Not JSON, might be part of a multi-line chunk
485+
if (line.trim()) {
486+
this.logger(LogLevel.DEBUG, "Non-JSON line in stream", { line });
487+
}
488+
}
489+
}
490+
}
491+
} catch (error) {
492+
this.logger(LogLevel.ERROR, "Error processing stream chunk", { error });
493+
controller.error(error);
494+
}
495+
},
496+
flush: (controller) => {
497+
// Process any remaining data in the buffer
498+
if (buffer.trim()) {
499+
if (buffer.startsWith("data: ")) {
500+
const jsonData = buffer.slice(6).trim();
501+
if (jsonData && jsonData !== "[DONE]") {
502+
try {
503+
const parsed = JSON.parse(jsonData);
504+
controller.enqueue(parsed as T);
505+
} catch (e) {
506+
this.logger(LogLevel.ERROR, "Error parsing final JSON from stream", {
507+
error: e,
508+
data: jsonData,
509+
});
510+
}
511+
}
512+
}
513+
}
514+
},
343515
});
344516

345-
return responseData.data;
517+
// Pipe the response body through our transformer
518+
return response.body.pipeThrough(transformer);
346519
} catch (error) {
347-
this.logger(LogLevel.ERROR, "request failed", { error });
520+
this.logger(LogLevel.ERROR, "stream request failed", { error });
348521

349522
// Emit error event
350523
this.emit("error", {
@@ -453,10 +626,16 @@ export class MessariClient extends MessariClientBase {
453626
data: responseData,
454627
});
455628

456-
return {
457-
data: responseData.data,
458-
metadata: responseData.metadata,
459-
};
629+
// If response has data field, return wrapped format, otherwise treat whole response as data
630+
return responseData.data !== undefined
631+
? {
632+
data: responseData.data,
633+
metadata: responseData.metadata,
634+
}
635+
: {
636+
data: responseData,
637+
metadata: {} as M,
638+
};
460639
} catch (error) {
461640
this.logger(LogLevel.ERROR, "request with metadata failed", { error });
462641

@@ -677,10 +856,17 @@ export class MessariClient extends MessariClientBase {
677856

678857
public readonly ai: AIInterface = {
679858
createChatCompletion: (params: createChatCompletionParameters, options?: RequestOptions) =>
680-
this.request<createChatCompletionResponse>({
681-
method: createChatCompletion.method,
682-
path: createChatCompletion.path(),
683-
body: pick(params, createChatCompletion.bodyParams),
859+
this.request<createChatCompletionOpenAIResponse>({
860+
method: createChatCompletionOpenAI.method,
861+
path: createChatCompletionOpenAI.path(),
862+
body: pick(params, createChatCompletionOpenAI.bodyParams) as createChatCompletionOpenAIParameters & { stream: false },
863+
options,
864+
}),
865+
createChatCompletionStream: (params: createChatCompletionParameters, options?: RequestOptions) =>
866+
this.requestStream<createChatCompletionOpenAIResponse>({
867+
method: createChatCompletionOpenAI.method,
868+
path: createChatCompletionOpenAI.path(),
869+
body: { ...pick(params, createChatCompletionOpenAI.bodyParams), stream: true },
684870
options,
685871
}),
686872
extractEntities: (params: extractEntitiesParameters, options?: RequestOptions) =>

packages/api/src/types/index.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,21 @@ export const extractEntities = {
4747
} as const;
4848

4949

50+
export type createChatCompletionOpenAIResponse = components['schemas']['ChatCompletionResponseOpenAI'];
51+
export type createChatCompletionOpenAIError = components['schemas']['APIError'];
52+
53+
export type createChatCompletionOpenAIParameters = components['schemas']['ChatCompletionRequest'];
54+
55+
56+
export const createChatCompletionOpenAI = {
57+
method: 'POST' as const,
58+
pathParams: [] as const,
59+
queryParams: [] as const,
60+
bodyParams: ['messages', 'verbosity', 'response_format', 'inline_citations', 'stream'] as const,
61+
path: () => '/ai/openai/chat/completions'
62+
} as const;
63+
64+
5065
export type getAssetsV2Response = components['schemas']['V2AssetListItem'][];
5166
export type getAssetsV2Error = components['schemas']['APIError'];
5267

0 commit comments

Comments
 (0)