
[#2373] Use memory streams for getBodyStream #5816


Open
wants to merge 3 commits into master

Conversation


@Kezino Kezino commented Jul 5, 2025

No description provided.


codecov bot commented Jul 5, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 86.07%. Comparing base (b921af6) to head (72065ce).

Additional details and impacted files
@@           Coverage Diff           @@
##           master    #5816   +/-   ##
=======================================
  Coverage   86.07%   86.07%           
=======================================
  Files         108      108           
  Lines       16833    16833           
  Branches     2985     2986    +1     
=======================================
  Hits        14489    14489           
  Misses       2344     2344           

☔ View full report in Codecov by Sentry.

@Kezino Kezino force-pushed the feature/issue-2373 branch from 4eb5ce5 to 72065ce Compare July 5, 2025 08:32
@Kezino Kezino changed the title [issue #2373] Use memory streams for getBodyStream [#2373] Use memory streams for getBodyStream Jul 5, 2025
@NathanFreeman
Member

Thank you for your contribution. I reviewed issue #2373 and found that its core requirement is to avoid the potential memory overflow caused by large messages by streaming them, rather than fully caching them in memory. The current PR, however, only implements access to messages in memory via a memory stream.

PSR-7 mentions: "For situations where a string would be an appropriate message implementation, built-in streams such as php://memory and php://temp may be used."

If the goal is to implement this standard from PSR-7, then I think this PR is perfectly acceptable.
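The php://temp stream that PSR-7 recommends can be illustrated with plain PHP, no Swoole required: it holds small bodies in memory and transparently spills to a temporary file once the maxmemory threshold is exceeded, which is why it is a reasonable backing for string-sized message bodies. A minimal sketch:

```php
<?php
// php://temp keeps data in memory up to maxmemory bytes (2 MB by default),
// then transparently spills to a temporary file on disk.
$stream = fopen('php://temp/maxmemory:1048576', 'r+');
fwrite($stream, '{"message":"hello"}');
rewind($stream);                       // seek back to the start before reading
$body = stream_get_contents($stream);  // '{"message":"hello"}'
fclose($stream);
```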

@NathanFreeman
Member

I also saw your comment in #2373, where you mentioned that "Swoole HTTP client buffering the entire response before delivering it has unfortunately become a bit of a deal breaker for my use case."

Your PR modifies the code for the Swoole HTTP server, and this part of the code would not affect the Swoole HTTP client.

@Kezino
Author

Kezino commented Jul 5, 2025

I also saw your comment in #2373, where you mentioned that "Swoole HTTP client buffering the entire response before delivering it has unfortunately become a bit of a deal breaker for my use case."

Your PR modifies the code for the Swoole HTTP server, and this part of the code would not affect the Swoole HTTP client.

@NathanFreeman , thank you for the tip! I will try to figure out how to fix the client instead of the server.

@NathanFreeman
Member

NathanFreeman commented Jul 5, 2025

If you obtain OpenAI's streaming response via Swoole\Coroutine\Http\Client, you can refer to the following implementation:

$client = new Swoole\Coroutine\Http\Client('api.openai.com', 443, true);  
$client->set([  
    'write_func' => function ($client, $chunk) {  
        // Process streaming data chunks in real-time   
    }
]);  
$client->get('/v1/chat/completions');  // Example API endpoint  

Key Features:

  1. write_func Callback Mechanism – Similar to cURL's CURLOPT_WRITEFUNCTION, enabling chunk-by-chunk processing of streaming responses.
  2. Version Requirement – Requires Swoole 5.1.0 or later.
  3. Typical Use Case – Suitable for handling OpenAI's Server-Sent Events (SSE) streaming responses.

Important Notes:

  • After setting write_func, the getContent() method will no longer be available to retrieve the response, and $client->body will also remain empty.
  • The connection can be actively terminated via $client->close().

If you are using a different HTTP client implementation, please provide the relevant code snippet, and I will assist in analyzing and optimizing it.

@Kezino
Author

Kezino commented Jul 5, 2025

@NathanFreeman , thank you for being so helpful!

I am using the standard OpenAI library (https://github.com/openai-php/client) with the Hyperf Guzzle client adapter (https://github.com/hyperf/guzzle). It is now obvious that https://github.com/hyperf/guzzle/blob/master/src/CoroutineHandler.php needs to be modified in order to allow handling streamed responses.

@NathanFreeman
Member

It works fine for me.

<?php
require './vendor/autoload.php';

use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use GuzzleHttp\Client;
use Nyholm\Psr7\Response;
use Swoole\Http\Server;

$http = new Server('0.0.0.0', 80);
$http->set([
    'worker_num'       => 2,
    'enable_coroutine' => true,
    'hook_flags'       => SWOOLE_HOOK_ALL
]);

$http->on('request', function ($request, $response) {
    $response->header('Content-Type', 'text/event-stream;charset=utf-8');

    $client = OpenAI::factory()
        ->withApiKey('your key')
        ->withBaseUri('api.deepseek.com')
        ->withStreamHandler(
            function (RequestInterface $request) use ($response): ResponseInterface {
                $headers = [];
                foreach ($request->getHeaders() as $key => $values) {
                    $headers[$key] = $values[0];
                }

                $buffer = '';
                $result = '';

                $client       = new Client();
                $httpResponse = $client->post(
                    'https://api.deepseek.com/chat/completions',
                    [
                        'body'    => $request->getBody()->getContents(),
                        'headers' => $headers,
                        'stream'  => true
                    ]
                );

                $body = $httpResponse->getBody();
                while (!$body->eof()) {
                    $chunk  = $body->read(1024);
                    $buffer .= $chunk;
                    $result .= $chunk;

                    while (true) {
                        $position = mb_strpos($buffer, "\n\n");
                        if ($position === false) {
                            break;
                        }

                        $message = mb_substr($buffer, 0, $position);
                        if ($message !== 'data: [DONE]') {
                            $message = json_decode(str_replace('data: ', '', $message), true);
                            // Guard against malformed events and missing keys
                            $content = $message['choices'][0]['delta']['content'] ?? '';
                            if ($content !== '') {
                                $response->write($content);
                            }
                        }

                        $buffer = mb_substr($buffer, $position + 2);
                    }
                }

                return (new Response(200, [], $result));
            })->make();

    $client->chat()->createStreamed([
        'model'    => 'deepseek-chat',
        'messages' => [['content' => 'what is the openai, how can i use it, show me some example', 'role' => 'user']]
    ]);

    $response->end();
});

$http->start();
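The inner while loop above, which splits the buffered response on blank lines and strips the data: prefix, can be factored into a standalone function and exercised without any network or Swoole dependency. The helper name parseSseBuffer below is hypothetical, not part of any library:

```php
<?php
// Consume complete SSE events ("data: ..." blocks separated by "\n\n")
// from $buffer and return the decoded payloads. $buffer keeps any
// trailing partial event so the next chunk can complete it.
function parseSseBuffer(string &$buffer): array
{
    $events = [];
    while (($position = strpos($buffer, "\n\n")) !== false) {
        $message = substr($buffer, 0, $position);
        $buffer  = substr($buffer, $position + 2);
        if ($message === '' || $message === 'data: [DONE]') {
            continue;
        }
        $decoded = json_decode(substr($message, strlen('data: ')), true);
        if (is_array($decoded)) {
            $events[] = $decoded;
        }
    }
    return $events;
}

// Usage: feed chunks as they arrive; a partial event stays buffered.
$buffer  = '';
$buffer .= "data: {\"choices\":[{\"delta\":{\"content\":\"Hi\"}}]}\n\ndata: [DO";
$events  = parseSseBuffer($buffer);   // one complete event decoded
$buffer .= "NE]\n\n";
parseSseBuffer($buffer);              // the [DONE] sentinel is skipped
```

Keeping the parser pure makes it easy to unit-test the chunk boundaries (an event split across two reads) separately from the transport.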

@NathanFreeman
Member

<?php
require './vendor/autoload.php';

use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use Swoole\Coroutine\Http\Client;
use Nyholm\Psr7\Response;
use Swoole\Http\Server;

$http = new Server('0.0.0.0', 80);
$http->set([
    'worker_num'       => 2,
    'enable_coroutine' => true,
    'hook_flags'       => SWOOLE_HOOK_ALL
]);

$http->on('request', function ($request, $response) {
    $response->header('Content-Type', 'text/event-stream;charset=utf-8');

    $client = OpenAI::factory()
        ->withApiKey('your key')
        ->withBaseUri('api.deepseek.com')
        ->withStreamHandler(function (RequestInterface $request) use ($response): ResponseInterface {
            $uri     = $request->getUri();
            $headers = [];
            foreach ($request->getHeaders() as $key => $values) {
                $headers[$key] = $values[0];
            }

            $buffer = '';
            $result = '';

            // The third constructor argument is a bool ssl flag, not the scheme string
            $client = new Client($uri->getHost(), 443, $uri->getScheme() === 'https');
            $client->setHeaders($headers);
            $client->set([
                'write_func' => function ($client, $chunk) use ($response, &$buffer, &$result) {
                    $buffer .= $chunk;
                    $result .= $chunk;
                    while (true) {
                        $position = mb_strpos($buffer, "\n\n");
                        if ($position === false) {
                            break;
                        }

                        $message = mb_substr($buffer, 0, $position);
                        if ($message !== 'data: [DONE]') {
                            $message = json_decode(str_replace('data: ', '', $message), true);
                            // Guard against malformed events and missing keys
                            $content = $message['choices'][0]['delta']['content'] ?? '';
                            if ($content !== '') {
                                $response->write($content);
                            }
                        }

                        $buffer = mb_substr($buffer, $position + 2);
                    }
                }
            ]);

            $client->post($uri->getPath(), $request->getBody()->getContents());
            return (new Response($client->getStatusCode(), $client->getHeaders(), $result));
        })->make();

    $client->chat()->createStreamed([
        'model'    => 'deepseek-chat',
        'messages' => [['content' => 'what is the openai, how can i use it, show me some example', 'role' => 'user']]
    ]);

    $response->end();
});

$http->start();

@Kezino
Author

Kezino commented Jul 6, 2025

@NathanFreeman , thank you so much!

You are indeed very helpful, truly appreciate it! 🙏
I am working on a handler for Guzzle that would return a PSR response with a stream as the body.
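That plan can be sketched in plain PHP: instead of concatenating chunks into a string, the handler would copy each incoming chunk into a php://temp stream and hand that stream to the PSR-7 response as its body. Everything below is a hypothetical illustration, not Hyperf or Guzzle API:

```php
<?php
// Hypothetical sketch: collect chunks into php://temp, then expose the
// stream itself as the response body instead of a fully buffered string.
$body = fopen('php://temp', 'r+');

// In a real handler this callback would be wired to Swoole's 'write_func'.
$onChunk = function (string $chunk) use ($body): void {
    fwrite($body, $chunk);
};

$onChunk("data: hello\n\n");
$onChunk("data: world\n\n");

rewind($body);                           // make the stream readable from the start
$contents = stream_get_contents($body);  // a PSR-7 Stream would wrap $body instead
```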
