67 changes: 42 additions & 25 deletions internal/utils/http_requests/http_warpper.go
@@ -75,6 +75,44 @@ func PatchAndParse[T any](client *http.Client, url string, options ...HttpOptions)
	return RequestAndParse[T](client, url, "PATCH", options...)
}

func readBodyStream(resp io.ReadCloser, callback func(data []byte) error) error {
	reader := bufio.NewReaderSize(resp, 4*1024) // init with 4KB buffer
Contributor (@Yeuoly):

Actually, the default maximum buffer size of bufio.Scanner is 64*1024, which is big enough. I'm not sure what your specific scenario is, but 4*1024 should not work.

Author:

@Yeuoly
64k is not enough, as many current LLMs support more than 64k of context. If MCP is used, the prompt words, the MCP response, and all the bytes of the JSON-formatted structure are added together, making it easy to exceed this limit.

reader := bufio.NewReaderSize(resp, 4*1024) // init with 4KB buffer

The 4K here is just an initial buffer to improve performance, and there is no maximum buffer limit. Therefore, the maximum byte size of a line ultimately depends on the context size supported by the LLM and the size of the response returned by the tool.
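An aside to make the disagreement concrete (a standalone sketch, not part of the PR): bufio.Scanner caps tokens at bufio.MaxScanTokenSize (64*1024) and fails with bufio.ErrTooLong beyond that, while bufio.Reader.ReadBytes treats the 4*1024 only as an initial size and grows as needed.

package main

import (
	"bufio"
	"fmt"
	"strings"
)

func main() {
	// A single "line" larger than bufio.MaxScanTokenSize (64 * 1024).
	line := strings.Repeat("a", 70*1024) + "\n"

	// bufio.Scanner: Scan fails once the token exceeds its max buffer.
	sc := bufio.NewScanner(strings.NewReader(line))
	for sc.Scan() {
	}
	fmt.Println(sc.Err()) // bufio.Scanner: token too long

	// bufio.Reader: the 4KB size is only the initial buffer, so the
	// whole line is returned.
	r := bufio.NewReaderSize(strings.NewReader(line), 4*1024)
	data, err := r.ReadBytes('\n')
	fmt.Println(len(data), err) // 71681 <nil>
}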

Contributor (@Yeuoly):

In fact, Dify uses stream mode for all LLM calls; the response is split into chunks, and each chunk should not be larger than 64k.

As for the unlimited buffer, I don't think it's a good design: it opens the door to a DoS attack. For example, I could create a fake OpenAI server that returns 1G of data in a single chunk, which would be terrible. The limit could be a configurable environment variable, but it should not be hardcoded. In any case, the buffer should not be too large; if you use Dify in personal scenarios, then sure, just make it configurable.
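If the cap were made configurable as suggested, a sketch might look like the following (the MAX_STREAM_LINE_SIZE variable name and the 10 MiB default are illustrative assumptions, not anything from the PR): bufio.Scanner's Buffer method accepts an explicit maximum, so an oversized line fails with bufio.ErrTooLong instead of exhausting memory.

package main

import (
	"bufio"
	"fmt"
	"os"
	"strconv"
	"strings"
)

// maxLineSize reads a hypothetical MAX_STREAM_LINE_SIZE environment
// variable, falling back to 10 MiB.
func maxLineSize() int {
	if v, err := strconv.Atoi(os.Getenv("MAX_STREAM_LINE_SIZE")); err == nil && v > 0 {
		return v
	}
	return 10 * 1024 * 1024
}

func main() {
	sc := bufio.NewScanner(strings.NewReader("data: {\"ok\":true}\n"))
	// Start with a small buffer but allow growth up to the configured
	// cap; a larger token makes sc.Err() return bufio.ErrTooLong.
	sc.Buffer(make([]byte, 0, 4*1024), maxLineSize())
	for sc.Scan() {
		fmt.Println(sc.Text()) // data: {"ok":true}
	}
}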

Author:

@Yeuoly
In my case, the LLM returns a chunk like the one below. When tool calling is used, every chunk includes the prompt_messages, and the prompt_messages is very big:

{
    "data": {
        "model": "deepseek-v3-250324",
        "prompt_messages": [
            {
                "role": "system",
                "content": "system promt(Text in Chinese with a length of 9197 characters.)",
                "name": ""
            },
            {
                "role": "user",
                "content": "user  query",
                "name": ""
            },
            {
                "role": "assistant",
                "content": "",
                "name": ""
            },
            {
                "role": "tool",
                "content": "Tool execution result: [{'type': 'text', 'text': 'too response in Chinese with a length of 29008 characters",
                "name": "hotel_info"
            }
        ],
        "system_fingerprint": "",
        "delta": {
            "index": 1,
            "message": {
                "role": "assistant",
                "content": "###",
                "name": "",
                "tool_calls": []
            },
            "usage": null,
            "finish_reason": null
        }
    },
    "error": ""
}

In Go, a Chinese character generally occupies 3 bytes in UTF-8, so although the text doesn't look very long, its byte count is roughly three times the character count and can easily surpass 64K.
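The arithmetic can be checked directly; in a Go string (UTF-8), len counts bytes, not characters (a standalone sketch, not from the PR):

package main

import (
	"fmt"
	"unicode/utf8"
)

func main() {
	s := "中文"
	fmt.Println(len(s))                    // 6: each CJK rune is 3 bytes in UTF-8
	fmt.Println(utf8.RuneCountInString(s)) // 2 runes
	// The 29008-character tool response above is therefore about
	// 29008 * 3 = 87024 bytes, already past the 64*1024 = 65536 cap.
	fmt.Println(29008*3, 64*1024) // 87024 65536
}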


	defer resp.Close()
	for {
		// read line by line
		data, err := reader.ReadBytes('\n')
		if err != nil {
			if err != io.EOF {
				log.Error("read body err:", err)
			}
			break
		}

		data = bytes.TrimSpace(data)
		if len(data) == 0 {
			continue
		}

		if bytes.HasPrefix(data, []byte("data:")) {
			// strip the SSE "data:" prefix
			data = data[5:]
		}

		if bytes.HasPrefix(data, []byte("event:")) {
			// TODO: handle event
			continue
		}

		// trim the space that may follow the "data:" prefix
		data = bytes.TrimSpace(data)
		if err := callback(data); err != nil {
			return err
		}
	}

	return nil
}
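As an illustration (not part of the diff, and assuming the io, strings, and testing imports), a minimal test-style sketch of how readBodyStream consumes an SSE-shaped body, stripping the data: prefix and skipping blank and event: lines before invoking the callback:

// hypothetical test, for illustration only
func TestReadBodyStreamSketch(t *testing.T) {
	body := io.NopCloser(strings.NewReader(
		"data: {\"x\":1}\n\nevent: ping\ndata: {\"x\":2}\n"))

	var got []string
	err := readBodyStream(body, func(data []byte) error {
		got = append(got, string(data))
		return nil
	})
	if err != nil || len(got) != 2 || got[0] != `{"x":1}` || got[1] != `{"x":2}` {
		t.Fatalf("unexpected result: %v %v", got, err)
	}
}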

func RequestAndParseStream[T any](client *http.Client, url string, method string, options ...HttpOptions) (*stream.Stream[T], error) {
	resp, err := Request(client, url, method, options...)
	if err != nil {

@@ -109,42 +147,21 @@ func RequestAndParseStream[T any](client *http.Client, url string, method string, options ...HttpOptions) (*stream.Stream[T], error) {
"module": "http_requests",
"function": "RequestAndParseStream",
}, func() {
scanner := bufio.NewScanner(resp.Body)
defer resp.Body.Close()

for scanner.Scan() {
data := scanner.Bytes()
if len(data) == 0 {
continue
}

if bytes.HasPrefix(data, []byte("data:")) {
// split
data = data[5:]
}

if bytes.HasPrefix(data, []byte("event:")) {
// TODO: handle event
continue
}

// trim space
data = bytes.TrimSpace(data)

readBodyStream(resp.Body, func(data []byte) error {
// unmarshal
t, err := parser.UnmarshalJsonBytes[T](data)
if err != nil {
if raiseErrorWhenStreamDataNotMatch {
ch.WriteError(err)
break
return err
} else {
log.Warn("stream data not match for %s, got %s", url, string(data))
}
continue
}

ch.Write(t)
}
return nil
})

ch.Close()
})
150 changes: 150 additions & 0 deletions internal/utils/http_requests/http_wrapper_reader_test.go
@@ -0,0 +1,150 @@
package http_requests

import (
	"io"
	"net/http"
	"strings"
	"testing"

	"github.com/stretchr/testify/assert"
)

// mockReader serves its chunks one Read call at a time, returning io.EOF
// together with the bytes of the final chunk.
type mockReader struct {
	chunks []string
	index  int
}

func (m *mockReader) Read(p []byte) (n int, err error) {
	if m.index >= len(m.chunks) {
		return 0, io.EOF
	}
	n = copy(p, m.chunks[m.index])
	m.index++
	if m.index == len(m.chunks) {
		return n, io.EOF
	}
	return n, nil
}
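One detail worth noting (an aside, not part of the test file): mockReader returns io.EOF together with the final chunk's bytes, which the io.Reader contract permits, and bufio returns the data read before the error along with io.EOF. A standalone sketch:

package main

import (
	"bufio"
	"fmt"
	"io"
)

// eofReader returns its whole payload and io.EOF in the same Read call,
// just like mockReader's final chunk.
type eofReader struct {
	s    string
	done bool
}

func (r *eofReader) Read(p []byte) (int, error) {
	if r.done {
		return 0, io.EOF
	}
	r.done = true
	return copy(p, r.s), io.EOF
}

func main() {
	br := bufio.NewReader(&eofReader{s: "tail without newline"})
	data, err := br.ReadBytes('\n')
	// ReadBytes returns the data read before the error, then io.EOF.
	fmt.Printf("%q %v\n", string(data), err) // "tail without newline" EOF
}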

func TestParseJsonBody(t *testing.T) {
	t.Run("multiple chunks with newlines", func(t *testing.T) {
		chunks := []string{
			`{"name": "John",`,
			"\n",
			`"age": 30}`,
			"\n",
		}
		reader := &mockReader{chunks: chunks}
		resp := &http.Response{Body: io.NopCloser(reader)}

		var result map[string]interface{}
		err := parseJsonBody(resp, &result)
		assert.Nil(t, err)

		assert.Equal(t, "John", result["name"])
		assert.Equal(t, 30, int(result["age"].(float64)))
	})

	t.Run("chunks without newlines", func(t *testing.T) {
		chunks := []string{
			`{"name": "Alice",`,
			`"age": 25}`,
		}
		reader := &mockReader{chunks: chunks}
		resp := &http.Response{Body: io.NopCloser(reader)}

		var result map[string]interface{}
		err := parseJsonBody(resp, &result)
		assert.Nil(t, err)
		assert.Equal(t, "Alice", result["name"])
		assert.Equal(t, 25, int(result["age"].(float64)))
	})

	t.Run("chunks with mixed newlines", func(t *testing.T) {
		chunks := []string{
			`{"name": "Bob",`,
			"\n",
			`"age": 35`,
			`,"city": "New York"}`,
		}
		reader := &mockReader{chunks: chunks}
		resp := &http.Response{Body: io.NopCloser(reader)}

		var result map[string]interface{}
		err := parseJsonBody(resp, &result)
		assert.Nil(t, err)
		assert.Equal(t, "Bob", result["name"])
		assert.Equal(t, 35, int(result["age"].(float64)))
		assert.Equal(t, "New York", result["city"])
	})

	t.Run("last chunk without newline", func(t *testing.T) {
		chunks := []string{
			`{"name": "Eve",`,
			"\n",
			`"age": 28}`,
		}
		reader := &mockReader{chunks: chunks}
		resp := &http.Response{Body: io.NopCloser(reader)}

		var result map[string]interface{}
		err := parseJsonBody(resp, &result)
		assert.Nil(t, err)
		assert.Equal(t, "Eve", result["name"])
		assert.Equal(t, 28, int(result["age"].(float64)))
	})

	t.Run("empty chunks", func(t *testing.T) {
		chunks := []string{
			"",
			"\n",
			"",
			`{"name": "Charlie"}`,
			"\n",
		}
		reader := &mockReader{chunks: chunks}
		resp := &http.Response{Body: io.NopCloser(reader)}

		var result map[string]interface{}
		err := parseJsonBody(resp, &result)
		assert.Nil(t, err)
		assert.Equal(t, "Charlie", result["name"])
	})

	t.Run("invalid JSON", func(t *testing.T) {
		chunks := []string{
			`{"name": "Invalid`,
			"\n",
			`"age": }`,
		}
		reader := &mockReader{chunks: chunks}
		resp := &http.Response{Body: io.NopCloser(reader)}

		var result map[string]interface{}
		err := parseJsonBody(resp, &result)
		assert.NotNil(t, err)
	})

	t.Run("large JSON split across multiple chunks", func(t *testing.T) {
		largeJSON := strings.Repeat(`{"key": "value"},`, 1000) // Create a large JSON array
		largeJSON = "[" + largeJSON[:len(largeJSON)-1] + "]"   // Remove last comma and wrap in array brackets

		chunkSize := 100
		chunks := make([]string, 0, len(largeJSON)/chunkSize+1)
		for i := 0; i < len(largeJSON); i += chunkSize {
			end := i + chunkSize
			if end > len(largeJSON) {
				end = len(largeJSON)
			}
			chunks = append(chunks, largeJSON[i:end])
		}

		reader := &mockReader{chunks: chunks}
		resp := &http.Response{Body: io.NopCloser(reader)}

		var result []map[string]string
		err := parseJsonBody(resp, &result)
		assert.Nil(t, err)
		assert.Equal(t, 1000, len(result))
	})
}
}