Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@langtrase/typescript-sdk",
"version": "6.3.0",
"version": "6.3.1",
"description": "A typescript SDK for Langtrace",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand All @@ -22,7 +22,7 @@
"license": "Apache-2.0",
"dependencies": {
"@aws-sdk/client-bedrock-runtime": "^3.670.0",
"@langtrase/trace-attributes": "7.5.0",
"@langtrase/trace-attributes": "7.5.1",
"@opentelemetry/api": "^1.7.0",
"@opentelemetry/instrumentation": "^0.49.1",
"@opentelemetry/sdk-trace-base": "^1.22.0",
Expand All @@ -49,7 +49,7 @@
"@typescript-eslint/eslint-plugin": "^5.62.0",
"ai": "^3.2.29",
"chromadb": "^1.9.4",
"cohere-ai": "^7.9.3",
"cohere-ai": "^7.15.0",
"dotenv": "^16.4.5",
"eslint": "^8.34.0",
"eslint-config-standard-with-typescript": "^23.0.0",
Expand Down
49 changes: 47 additions & 2 deletions src/examples/cohere/basic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ import * as cohere from 'cohere-ai'
import { Tool } from 'cohere-ai/api'
import fs from 'fs'

init({ disable_instrumentations: { all_except: ['cohere'] } })
init({ disable_instrumentations: { all_except: ['cohere'] }, api_key: '<LANGTRACE_API_KEY>' })

const c = new cohere.CohereClient({ token: '<COHERE_API_KEY>' })
const c2 = new cohere.CohereClientV2({ token: '<COHERE_API_KEY>' })

const c = new cohere.CohereClient()
export const basicChat = async (): Promise<void> => {
const prediction = await c.chat({
chatHistory: [
Expand Down Expand Up @@ -77,6 +79,37 @@ Unless the user asks for a different style of answer, you should answer in full
)
}

// Single-turn, non-streaming chat example against the Cohere v2 client.
export const basicChatV2 = async (): Promise<void> => {
  const response = await c2.chat({
    model: 'command-r-plus-08-2024',
    messages: [{ role: 'user', content: 'hello world!' }]
  })
  console.log(response)
}

// Streaming chat example against the Cohere v2 client; prints each
// incremental message delta as it arrives.
export const basicChatStreamV2 = async (): Promise<void> => {
  const stream = await c2.chatStream({
    model: 'command-r-plus-08-2024',
    messages: [{ role: 'user', content: 'hello world!' }]
  })
  for await (const chatEvent of stream) {
    // Only 'content-delta' events carry incremental assistant text.
    if (chatEvent.type !== 'content-delta') continue
    console.log(chatEvent.delta?.message)
  }
}

export const basicStream = async (): Promise<void> => {
const stream = await c.chatStream({
model: 'command',
Expand All @@ -103,6 +136,18 @@ export const basicEmbed = async (): Promise<void> => {
console.info('Received embed', embed)
}

// Embed two short texts with the Cohere v2 client, requesting both
// float and int8 encodings.
export const basicEmbedV2 = async (): Promise<void> => {
  const embed = await c2.embed({
    texts: ['hello', 'goodbye'],
    model: 'embed-english-v3.0',
    inputType: 'classification',
    embeddingTypes: ['float', 'int8']
  })
  // Log once, matching the sibling basicEmbed example. (A leftover bare
  // console.info(embed) previously duplicated this output.)
  console.info('Received embed', embed)
}

export const basicEmbedJobsCreate = async (): Promise<void> => {
const s = await c.datasets.list()
if (s.datasets === undefined) {
Expand Down
4 changes: 2 additions & 2 deletions src/examples/entrypoint.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
import { basic } from '@langtrace-examples/awsbedrock/converse-anthropic'
import { basicEmbedV2 } from '@langtrace-examples/cohere/basic'

void basic()
void basicEmbedV2()
56 changes: 54 additions & 2 deletions src/instrumentation/cohere/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import { diag } from '@opentelemetry/api'
import { InstrumentationBase, InstrumentationModuleDefinition, InstrumentationNodeModuleDefinition, isWrapped } from '@opentelemetry/instrumentation'
// eslint-disable-next-line no-restricted-imports
import { version, name } from '../../../package.json'
import { ChatFn, ChatStreamFn, EmbedFn, EmbedJobsCreateFn, RerankFn } from '@langtrace-instrumentation/cohere/types'
import { chatPatch, chatStreamPatch, embedJobsCreatePatch, embedPatch, rerankPatch } from '@langtrace-instrumentation/cohere/patch'
import { ChatFn, ChatStreamFn, ChatV2Fn, ChatV2StreamFn, EmbedFn, EmbedJobsCreateFn, RerankFn } from '@langtrace-instrumentation/cohere/types'
import { chatPatch, chatPatchV2, chatStreamPatch, chatStreamPatchV2, embedJobsCreatePatch, embedPatch, rerankPatch } from '@langtrace-instrumentation/cohere/patch'

class CohereInstrumentation extends InstrumentationBase<any> {
constructor () {
Expand Down Expand Up @@ -51,6 +51,8 @@ class CohereInstrumentation extends InstrumentationBase<any> {
}

private _patch (cohere: any, moduleVersion?: string): void {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const that = this
if (isWrapped(cohere.CohereClient.prototype)) {
this._unpatch(cohere)
}
Expand All @@ -59,6 +61,30 @@ class CohereInstrumentation extends InstrumentationBase<any> {
cohere.CohereClient.prototype.chat.name,
(original: ChatFn) => chatPatch(original, this.tracer, this.instrumentationVersion, name, moduleVersion)
)
this._wrap(
cohere,
'CohereClientV2',
(OriginalClient: typeof cohere.CohereClientV2) => {
return function (this: any, ...args: any[]) {
const instance: any = new OriginalClient(...args)
const originalChat: ChatV2Fn = instance.chat
instance.chat = chatPatchV2(originalChat, that.tracer, that.instrumentationVersion, name, moduleVersion)
return instance
}
}
)
this._wrap(
cohere,
'CohereClientV2',
(OriginalClient: typeof cohere.CohereClientV2) => {
return function (this: any, ...args: any[]) {
const instance: any = new OriginalClient(...args)
const originalChat: ChatV2StreamFn = instance.chatStream
instance.chatStream = chatStreamPatchV2(originalChat, that.tracer, that.instrumentationVersion, name, moduleVersion)
return instance
}
}
)
this._wrap(cohere.CohereClient.prototype,
cohere.CohereClient.prototype.chatStream.name,
(original: ChatStreamFn) => chatStreamPatch(original, this.tracer, this.instrumentationVersion, name, moduleVersion))
Expand All @@ -67,10 +93,36 @@ class CohereInstrumentation extends InstrumentationBase<any> {
cohere.CohereClient.prototype.embed.name,
(original: EmbedFn) => embedPatch(original, this.tracer, this.instrumentationVersion, name, moduleVersion))

this._wrap(
cohere,
'CohereClientV2',
(OriginalClient: typeof cohere.CohereClientV2) => {
return function (this: any, ...args: any[]) {
const instance: any = new OriginalClient(...args)
const originalChat: EmbedFn = instance.embed
instance.embed = embedPatch(originalChat, that.tracer, that.instrumentationVersion, name, moduleVersion)
return instance
}
}
)

this._wrap(cohere.CohereClient.prototype,
cohere.CohereClient.prototype.rerank.name,
(original: RerankFn) => rerankPatch(original, this.tracer, this.instrumentationVersion, name, moduleVersion))

this._wrap(
cohere,
'CohereClientV2',
(OriginalClient: typeof cohere.CohereClientV2) => {
return function (this: any, ...args: any[]) {
const instance: any = new OriginalClient(...args)
const originalChat: RerankFn = instance.rerank
instance.rerank = rerankPatch(originalChat, that.tracer, that.instrumentationVersion, name, moduleVersion)
return instance
}
}
)

this._wrap(cohere.CohereClient.prototype.embedJobs,
'create',
(original: EmbedJobsCreateFn) => embedJobsCreatePatch(original, this.tracer, this.instrumentationVersion, name, moduleVersion))
Expand Down
164 changes: 162 additions & 2 deletions src/instrumentation/cohere/patch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@ import {
trace
} from '@opentelemetry/api'
import {
ICohereClient, IChatRequest, IRequestOptions, ChatFn, INonStreamedChatResponse, ChatStreamFn, IEmbedRequest, IEmbedResponse,
ICohereClient, IChatRequest, IV2ChatRequest, IRequestOptions, ChatFn, INonStreamedChatResponse, ChatStreamFn, IEmbedRequest, IEmbedResponse,
EmbedFn,
RerankFn,
IRerankResponse,
IRerankRequest,
EmbedJobsCreateFn,
ICreateEmbedJobRequest,
ICreateEmbedJobResponse
ICreateEmbedJobResponse,
IV2ChatResponse,
ChatV2Fn,
ChatV2StreamFn
} from '@langtrace-instrumentation/cohere/types'
import { APIS, LLMSpanAttributes, Event } from '@langtrase/trace-attributes'
import { addSpanEvent, createStreamProxy } from '@langtrace-utils/misc'
Expand Down Expand Up @@ -108,6 +111,83 @@ export const chatPatch = (original: ChatFn, tracer: Tracer, langtraceVersion: st
}
}

/**
 * Wraps `CohereClientV2.chat` (non-streaming) in a Langtrace LLM span.
 *
 * Records request attributes, prompt and completion events, and token
 * usage on the span; on failure the exception is recorded, the span is
 * marked errored, and the error is re-thrown as a LangtraceSdkError.
 * The span is always ended before returning or throwing.
 */
export const chatPatchV2 = (
  original: ChatV2Fn,
  tracer: Tracer,
  langtraceVersion: string,
  sdkName: string,
  moduleVersion?: string
) => {
  return async function (
    this: ICohereClient,
    request: IV2ChatRequest,
    requestOptions?: IRequestOptions
  ): Promise<IV2ChatResponse> {
    // Caller-supplied extra attributes propagated through the active context.
    const customAttributes = context.active().getValue(LANGTRACE_ADDITIONAL_SPAN_ATTRIBUTES_KEY) ?? {}
    const attributes: LLMSpanAttributes = {
      'langtrace.sdk.name': sdkName,
      'langtrace.service.name': this._options.clientName ?? 'cohere',
      'langtrace.service.type': 'llm',
      'gen_ai.operation.name': 'chat',
      'langtrace.version': langtraceVersion,
      'langtrace.service.version': moduleVersion,
      'url.full': 'https://api.cohere.ai',
      'url.path': APIS.cohere.CHATV2.ENDPOINT,
      'gen_ai.request.model': request.model ?? 'command-r',
      'http.max.retries': requestOptions?.maxRetries,
      'gen_ai.request.temperature': request.temperature,
      'gen_ai.request.frequency_penalty': request.frequencyPenalty,
      'gen_ai.request.presence_penalty': request.presencePenalty,
      'gen_ai.request.top_p': request.p,
      'gen_ai.request.top_k': request.k,
      'gen_ai.request.seed': request.seed?.toString(),
      'gen_ai.request.max_tokens': request.maxTokens,
      'gen_ai.request.documents': request.documents !== undefined ? JSON.stringify(request.documents) : undefined,
      'gen_ai.request.tools': request.tools !== undefined ? JSON.stringify(request.tools) : undefined,
      'http.timeout': requestOptions?.timeoutInSeconds !== undefined ? requestOptions.timeoutInSeconds / 1000 : undefined,
      ...customAttributes
    }

    // Use the v2 API metadata for the default span name. This previously
    // used APIS.cohere.CHAT.METHOD, which labelled v2 chats with the v1
    // span name even though the endpoint attribute was CHATV2.
    const spanName = customAttributes['langtrace.span.name' as keyof typeof customAttributes] ?? APIS.cohere.CHATV2.METHOD
    const span = tracer.startSpan(spanName, { attributes, kind: SpanKind.CLIENT }, context.active())

    try {
      return await context.with(trace.setSpan(context.active(), span), async () => {
        const prompts = [...request.messages]

        const response = await original.apply(this, [request, requestOptions])

        // Guard token accounting: the API may omit usage/billedUnits, and
        // undefined + undefined would otherwise produce NaN for the total
        // (or throw when usage itself is missing).
        const inputTokens = response.usage?.billedUnits?.inputTokens ?? 0
        const outputTokens = response.usage?.billedUnits?.outputTokens ?? 0
        const responseAttributes: Partial<LLMSpanAttributes> = {
          'gen_ai.usage.input_tokens': inputTokens,
          'gen_ai.usage.output_tokens': outputTokens,
          'gen_ai.usage.total_tokens': inputTokens + outputTokens,
          'gen_ai.response_id': response.id
        }

        addSpanEvent(span, Event.GEN_AI_PROMPT, { 'gen_ai.prompt': JSON.stringify(prompts) })

        // v2 responses carry the assistant text in message.content[]; an
        // empty content array yields an empty completion string.
        const assistantMessage = {
          role: response.message.role,
          content: response.message.content[0]?.text ?? ''
        }

        addSpanEvent(span, Event.GEN_AI_COMPLETION, { 'gen_ai.completion': JSON.stringify([assistantMessage]) })

        span.setAttributes({ ...attributes, ...responseAttributes })
        span.setStatus({ code: SpanStatusCode.OK })

        return response
      })
    } catch (error: any) {
      span.recordException(error as Exception)
      span.setStatus({ code: SpanStatusCode.ERROR })
      throw new LangtraceSdkError(error.message as string, error.stack as string)
    } finally {
      span.end()
    }
  }
}

export const chatStreamPatch = (original: ChatStreamFn, tracer: Tracer, langtraceVersion: string, sdkName: string, moduleVersion?: string) => {
return async function (this: ICohereClient, request: IChatRequest, requestOptions?: IRequestOptions): Promise<any> {
const customAttributes = context.active().getValue(LANGTRACE_ADDITIONAL_SPAN_ATTRIBUTES_KEY) ?? {}
Expand Down Expand Up @@ -155,6 +235,43 @@ export const chatStreamPatch = (original: ChatStreamFn, tracer: Tracer, langtrac
}
}

export const chatStreamPatchV2 = (original: ChatV2StreamFn, tracer: Tracer, langtraceVersion: string, sdkName: string, moduleVersion?: string) => {
return async function (this: ICohereClient, request: IV2ChatRequest, requestOptions?: IRequestOptions): Promise<any> {
const customAttributes = context.active().getValue(LANGTRACE_ADDITIONAL_SPAN_ATTRIBUTES_KEY) ?? {}
const attributes: LLMSpanAttributes = {
'langtrace.sdk.name': sdkName,
'langtrace.service.name': this._options.clientName ?? 'cohere',
'langtrace.service.type': 'llm',
'gen_ai.operation.name': 'chat',
'langtrace.version': langtraceVersion,
'langtrace.service.version': moduleVersion,
'url.full': 'https://api.cohere.ai',
'url.path': '/v2/chat',
'gen_ai.request.stream': true,
'gen_ai.request.model': request.model ?? 'command-r',
'http.max.retries': requestOptions?.maxRetries,
'gen_ai.request.temperature': request.temperature,
'gen_ai.request.frequency_penalty': request.frequencyPenalty,
'gen_ai.request.presence_penalty': request.presencePenalty,
'gen_ai.request.top_p': request.p,
'gen_ai.request.top_k': request.k,
'gen_ai.request.max_tokens': request.maxTokens,
'gen_ai.request.seed': request.seed?.toString(),
'gen_ai.request.documents': request.documents !== undefined ? JSON.stringify(request.documents) : undefined,
'gen_ai.request.tools': request.tools !== undefined ? JSON.stringify(request.tools) : undefined,
'http.timeout': requestOptions?.timeoutInSeconds !== undefined ? requestOptions.timeoutInSeconds / 1000 : undefined,
...customAttributes
}
const spanName = customAttributes['langtrace.span.name' as keyof typeof customAttributes] ?? 'cohere.chatStream'
const span = tracer.startSpan(spanName, { kind: SpanKind.CLIENT, attributes }, context.active())
return await context.with(trace.setSpan(context.active(), span), async () => {
addSpanEvent(span, Event.GEN_AI_PROMPT, { 'gen_ai.prompt': JSON.stringify(request.messages) })
const response = await original.apply(this, [request, requestOptions])
return createStreamProxy(response, handleStreamV2(response, attributes, span))
})
}
}

export const embedPatch = (original: EmbedFn, tracer: Tracer, langtraceVersion: string, sdkName: string, moduleVersion?: string) => {
return async function (this: ICohereClient, request: IEmbedRequest, requestOptions?: IRequestOptions): Promise<IEmbedResponse> {
const customAttributes = context.active().getValue(LANGTRACE_ADDITIONAL_SPAN_ATTRIBUTES_KEY) ?? {}
Expand Down Expand Up @@ -320,3 +437,46 @@ async function * handleStream (stream: any, attributes: LLMSpanAttributes, span:
span.end()
}
}

/**
 * Drains a v2 chat stream, yielding every chunk to the consumer unchanged
 * while accumulating the assistant text and token usage onto the span.
 * The span is always ended when the stream finishes or throws; errors are
 * recorded and re-thrown as LangtraceSdkError.
 */
async function * handleStreamV2 (stream: any, attributes: LLMSpanAttributes, span: Span): AsyncGenerator {
  let accumulatedText = ''

  try {
    addSpanEvent(span, Event.STREAM_START)

    for await (const chunk of stream) {
      if (chunk.type === 'message-start') {
        // The opening event carries the response id.
        attributes['gen_ai.response_id'] = chunk.id
      } else if (chunk.type === 'content-delta') {
        // Guard missing text: the previous `as string` cast appended the
        // literal string 'undefined' when a delta carried no text.
        accumulatedText += chunk.delta?.message?.content?.text ?? ''
      } else if (chunk.type === 'message-end') {
        addSpanEvent(span, Event.STREAM_END)

        const response = {
          role: 'assistant',
          content: accumulatedText
        }

        addSpanEvent(span, Event.GEN_AI_COMPLETION, { 'gen_ai.completion': JSON.stringify([response]) })

        // Token usage is only present on the final event, and may be
        // partially populated.
        if (chunk.delta?.usage?.billedUnits !== undefined) {
          const { inputTokens, outputTokens } = chunk.delta.usage.billedUnits
          attributes['gen_ai.usage.input_tokens'] = inputTokens
          attributes['gen_ai.usage.output_tokens'] = outputTokens
          attributes['gen_ai.usage.total_tokens'] = Number(inputTokens ?? 0) + Number(outputTokens ?? 0)
        }
      }

      // Pass every chunk through untouched so consumers see the raw stream.
      yield chunk
    }

    span.setAttributes(attributes)
    span.setStatus({ code: SpanStatusCode.OK })
  } catch (error: any) {
    span.recordException(error as Exception)
    span.setStatus({ code: SpanStatusCode.ERROR })
    throw new LangtraceSdkError(error.message as string, error.stack as string)
  } finally {
    span.end()
  }
}
Loading
Loading