Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 90 additions & 3 deletions pkg/providers/anthropic/chat_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,102 @@ package anthropic

import (
"context"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"

"glide/pkg/api/schemas"
"glide/pkg/providers/clients"
)

type Client struct {
Copy link
Member

@roma-glushko roma-glushko Mar 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Golang uniquely, compared to other programming languages, allows struct/object methods can be separate across go files in the module. For example, in case of Anthropic client is spread across these three files:

Screenshot 2024-03-19 at 22 25 12

Specifically, the client is defined in https://github.com/EinStack/glide/blob/develop/pkg/providers/anthropic/client.go#L22

and it has already pulling a lot of useful values like API key (e.g. c.config.APIKey) and a bunch of other configurations. So these three files are really worth checking and reuse that existing client.

APIKey string
}

func (c *Client) SupportChatStream() bool {
return false
return true
}

func (c *Client) ChatStream(ctx context.Context, chatReq *schemas.ChatRequest) (clients.ChatStream, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Over the weekend, I was finalizing the ChatStream interface and added this Open() method where the request initialization is supposed to happen (here is an example from OpenAI). Would good to move most of this code to AnthropicChatStream.Open() so we can properly track the initial request latency.

apiURL := "https://api.anthropic.com/v1/complete"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this existing chatURL field work here? https://github.com/EinStack/glide/blob/develop/pkg/providers/anthropic/client.go

Seems like it's one and the same completion endpoint for both sync and streaming API in Anthropic just like in OpenAI

requestBody := map[string]interface{}{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have this interesting method to combines configs provided by user in Glide provider YAML config + incoming request data to map them into a real Anthropic request:
https://github.com/EinStack/glide/blob/develop/pkg/providers/anthropic/chat.go#L82-L88

Would be great to leverage it here.

"model": "claude-2",
"prompt": chatReq.Message, // Assuming chatReq.Message contains the user's message
"max_tokens_to_sample": 256,
"stream": true,
}
requestJSON, err := json.Marshal(requestBody)
if err != nil {
return nil, err
}

httpReq, err := http.NewRequestWithContext(ctx, "POST", apiURL, bytes.NewBuffer(requestJSON))
if err != nil {
return nil, err
}
httpReq.Header.Set("Content-Type", "application/json")
httpReq.Header.Set("anthropic-version", "2023-06-01")
httpReq.Header.Set("x-api-key", c.APIKey)

resp, err := http.DefaultClient.Do(httpReq)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}

chatStream := &AnthropicChatStream{
responseBody: resp.Body,
}

return chatStream, nil
}

type AnthropicChatStream struct {
responseBody io.ReadCloser
}

func (s *AnthropicChatStream) Receive() (string, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be called Recv() to fulfil the ChatStream interface:

Suggested change
func (s *AnthropicChatStream) Receive() (string, error) {
func (s *AnthropicChatStream) Recv() (string, error) {

It also should return (*schemas.ChatStreamChunk, error) but I think you will get to that:

https://github.com/EinStack/glide/blob/develop/pkg/providers/clients/stream.go#L9

decoder := json.NewDecoder(s.responseBody)
for {
var event map[string]interface{}
if err := decoder.Decode(&event); err != nil {
if err == io.EOF {
return "", nil // No more events, return nil error
}
return "", err
}

eventType, ok := event["type"].(string)
if !ok {
return "", fmt.Errorf("missing event type")
}

switch eventType {
case "completion":
completionData, ok := event["completion"].(string)
if !ok {
return "", fmt.Errorf("missing completion data")
}
return completionData, nil
case "error":
errorData, ok := event["error"].(map[string]interface{})
if !ok {
return "", fmt.Errorf("missing error data")
}
errorType, _ := errorData["type"].(string)
errorMessage, _ := errorData["message"].(string)
return "", fmt.Errorf("error from Anthropic API: %s - %s", errorType, errorMessage)
}
}
}

func (c *Client) ChatStream(_ context.Context, _ *schemas.ChatRequest) (clients.ChatStream, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There has to be ChatStream() method that would create an instance of AnthropicChatStream in this case.
For example, in OpenAI case, it looked this way:
https://github.com/EinStack/glide/blob/develop/pkg/providers/openai/chat_stream.go#L162-L177

Without that there is nothing that would use the AnthropicChatStream struct

return nil, clients.ErrChatStreamNotImplemented
func (s *AnthropicChatStream) Close() error {
return s.responseBody.Close()
}