Skip to content

Commit e509073

Browse files
[azopenaiassistants] Adding in streaming support for Assistants V2. (Azure#23093)
Adding in streaming support for all the Assistants endpoints. Fixes Azure#22293
1 parent 844c85d commit e509073

20 files changed

+1189
-49
lines changed

sdk/ai/azopenaiassistants/autorest.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ directive:
212212
// POSTs
213213
$[threadsBase + "/messages/{messageId}"].post.parameters[2].schema["x-ms-client-name"] = "UpdateMessageBody";
214214
$[threadsBase + "/runs/{runId}"].post.parameters[2].schema["x-ms-client-name"] = "UpdateRunBody";
215+
$[threadsBase + "/runs"].post.parameters[1]["x-ms-client-name"] = "CreateRunBody";
215216
$[threadsBase + "/runs/{runId}/submit_tool_outputs"].post.parameters[2].schema["x-ms-client-name"] = "SubmitToolOutputsToRunBody";
216217
217218
//

sdk/ai/azopenaiassistants/client.go

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/ai/azopenaiassistants/client_custom_files.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ type GetFileContentOptions struct {
6767
//
6868
// - fileID - The ID of the file to retrieve.
6969
// - options - GetFileContentOptions contains the optional parameters for the Client.GetFileContent method.
70-
func (client *Client) GetFileContent(ctx context.Context, fileID string, options *GetFileContentOptions) (GetFileContentResponse, error) {
70+
func (client *Client) GetFileContent(ctx context.Context, fileID string, _ *GetFileContentOptions) (GetFileContentResponse, error) {
7171
var err error
7272

7373
req, err := func() (*policy.Request, error) {

sdk/ai/azopenaiassistants/client_custom_pagers.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414

1515
// NewListMessagesPager returns a pager for messages associated with a thread.
1616
func (c *Client) NewListMessagesPager(threadID string, options *ListMessagesOptions) *runtime.Pager[ListMessagesResponse] {
17-
nextPageFn := func(client *Client, ctx context.Context, opts *ListMessagesOptions) (ListMessagesResponse, error) {
17+
nextPageFn := func(ctx context.Context, opts *ListMessagesOptions) (ListMessagesResponse, error) {
1818
return c.internalListMessages(ctx, threadID, opts)
1919
}
2020
return newOpenAIPager(c, nextPageFn, options)
@@ -26,7 +26,7 @@ func (r ListMessagesResponse) hasMore() bool { return *r.HasMore }
2626

2727
// NewListAssistantsPager returns a pager for assistants.
2828
func (c *Client) NewListAssistantsPager(options *ListAssistantsOptions) *runtime.Pager[ListAssistantsResponse] {
29-
nextPageFn := func(client *Client, ctx context.Context, opts *ListAssistantsOptions) (ListAssistantsResponse, error) {
29+
nextPageFn := func(ctx context.Context, opts *ListAssistantsOptions) (ListAssistantsResponse, error) {
3030
return c.internalListAssistants(ctx, opts)
3131
}
3232
return newOpenAIPager(c, nextPageFn, options)
@@ -38,7 +38,7 @@ func (r ListAssistantsResponse) hasMore() bool { return *r.HasMore }
3838

3939
// NewListRunStepsPager returns a pager for a Run's steps.
4040
func (c *Client) NewListRunStepsPager(threadID string, runID string, options *ListRunStepsOptions) *runtime.Pager[ListRunStepsResponse] {
41-
nextPageFn := func(client *Client, ctx context.Context, opts *ListRunStepsOptions) (ListRunStepsResponse, error) {
41+
nextPageFn := func(ctx context.Context, opts *ListRunStepsOptions) (ListRunStepsResponse, error) {
4242
return c.internalListRunSteps(ctx, threadID, runID, opts)
4343
}
4444

@@ -51,7 +51,7 @@ func (r ListRunStepsResponse) hasMore() bool { return *r.HasMore }
5151

5252
// NewListRunsPager returns a pager for a Thread's runs.
5353
func (c *Client) NewListRunsPager(threadID string, options *ListRunsOptions) *runtime.Pager[ListRunsResponse] {
54-
nextPageFn := func(client *Client, ctx context.Context, opts *ListRunsOptions) (ListRunsResponse, error) {
54+
nextPageFn := func(ctx context.Context, opts *ListRunsOptions) (ListRunsResponse, error) {
5555
return c.internalListRuns(ctx, threadID, opts)
5656
}
5757
return newOpenAIPager(c, nextPageFn, options)
@@ -63,7 +63,7 @@ func (r ListRunsResponse) hasMore() bool { return *r.HasMore }
6363

6464
// NewListVectorStoresPager returns a pager for a VectorStores.
6565
func (c *Client) NewListVectorStoresPager(options *ListVectorStoresOptions) *runtime.Pager[ListVectorStoresResponse] {
66-
nextPageFn := func(client *Client, ctx context.Context, opts *ListVectorStoresOptions) (ListVectorStoresResponse, error) {
66+
nextPageFn := func(ctx context.Context, opts *ListVectorStoresOptions) (ListVectorStoresResponse, error) {
6767
return c.internalListVectorStores(ctx, opts)
6868
}
6969
return newOpenAIPager(c, nextPageFn, options)
@@ -75,7 +75,7 @@ func (r ListVectorStoresResponse) hasMore() bool { return *r.HasMore
7575

7676
// NewListVectorStoreFilesPager returns a pager for a vector store files.
7777
func (c *Client) NewListVectorStoreFilesPager(vectorStoreID string, options *ListVectorStoreFilesOptions) *runtime.Pager[ListVectorStoreFilesResponse] {
78-
nextPageFn := func(client *Client, ctx context.Context, opts *ListVectorStoreFilesOptions) (ListVectorStoreFilesResponse, error) {
78+
nextPageFn := func(ctx context.Context, opts *ListVectorStoreFilesOptions) (ListVectorStoreFilesResponse, error) {
7979
return c.internalListVectorStoreFiles(ctx, vectorStoreID, opts)
8080
}
8181
return newOpenAIPager(c, nextPageFn, options)
@@ -87,7 +87,7 @@ func (r ListVectorStoreFilesResponse) hasMore() bool { return *r.Has
8787

8888
// NewListVectorStoreFileBatchFilesPager returns a pager for vector store files in a batch.
8989
func (c *Client) NewListVectorStoreFileBatchFilesPager(vectorStoreID string, batchID string, options *ListVectorStoreFileBatchFilesOptions) *runtime.Pager[ListVectorStoreFileBatchFilesResponse] {
90-
nextPageFn := func(client *Client, ctx context.Context, opts *ListVectorStoreFileBatchFilesOptions) (ListVectorStoreFileBatchFilesResponse, error) {
90+
nextPageFn := func(ctx context.Context, opts *ListVectorStoreFileBatchFilesOptions) (ListVectorStoreFileBatchFilesResponse, error) {
9191
return c.internalListVectorStoreFileBatchFiles(ctx, vectorStoreID, batchID, opts)
9292
}
9393
return newOpenAIPager(c, nextPageFn, options)
@@ -112,7 +112,7 @@ func newOpenAIPager[ResponseT respType, OptionsT any, POptionsT interface {
112112
updateAfter(after *string)
113113
}](
114114
client *Client,
115-
nextPageFn func(client *Client, ctx context.Context, opts POptionsT) (ResponseT, error),
115+
nextPageFn func(ctx context.Context, opts POptionsT) (ResponseT, error),
116116
options POptionsT) *runtime.Pager[ResponseT] {
117117
var lastID *string
118118

@@ -137,7 +137,7 @@ func newOpenAIPager[ResponseT respType, OptionsT any, POptionsT interface {
137137

138138
first = false
139139

140-
resp, err := nextPageFn(client, ctx, newOptions)
140+
resp, err := nextPageFn(ctx, newOptions)
141141

142142
if err != nil {
143143
var zero ResponseT
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
//go:build go1.18
2+
// +build go1.18
3+
4+
// Copyright (c) Microsoft Corporation. All rights reserved.
5+
// Licensed under the MIT License. See License.txt in the project root for license information.
6+
7+
package azopenaiassistants
8+
9+
import (
10+
"context"
11+
"net/http"
12+
13+
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
14+
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
15+
)
16+
17+
// API Reference: https://platform.openai.com/docs/api-reference/assistants-streaming
18+
19+
// CreateThreadAndRunStreamResponse contains the response from [CreateThreadAndRunStream].
20+
type CreateThreadAndRunStreamResponse struct {
21+
// Stream can be used to stream response events.
22+
Stream *EventReader[StreamEvent]
23+
}
24+
25+
// CreateThreadAndRunStreamOptions contains the optional parameters for [CreateThreadAndRunStream].
26+
type CreateThreadAndRunStreamOptions struct {
27+
// for future expansion
28+
}
29+
30+
// CreateThreadAndRunStream is the equivalent of [CreateThreadAndRun], but it returns a stream of responses instead of a
31+
// single response.
32+
func (client *Client) CreateThreadAndRunStream(ctx context.Context, body CreateAndRunThreadBody, _ *CreateThreadAndRunStreamOptions) (CreateThreadAndRunStreamResponse, error) {
33+
// enable streaming.
34+
// https: //platform.openai.com/docs/api-reference/runs/createThreadAndRun#runs-createthreadandrun-stream
35+
body.stream = to.Ptr(true)
36+
37+
var err error
38+
39+
req, err := client.createThreadAndRunCreateRequest(ctx, body, nil)
40+
if err != nil {
41+
return CreateThreadAndRunStreamResponse{}, err
42+
}
43+
44+
runtime.SkipBodyDownload(req) // we'll handle this.
45+
46+
httpResp, err := client.internal.Pipeline().Do(req)
47+
if err != nil {
48+
return CreateThreadAndRunStreamResponse{}, err
49+
}
50+
if !runtime.HasStatusCode(httpResp, http.StatusOK) {
51+
err = runtime.NewResponseError(httpResp)
52+
return CreateThreadAndRunStreamResponse{}, err
53+
}
54+
55+
return CreateThreadAndRunStreamResponse{
56+
Stream: newEventReader(httpResp.Body, unmarshalStreamEvent),
57+
}, err
58+
}
59+
60+
// API Reference: https://platform.openai.com/docs/api-reference/runs/createRun
61+
62+
// CreateRunStreamResponse contains the response from [CreateRunStream].
63+
type CreateRunStreamResponse struct {
64+
// Stream can be used to stream response events.
65+
Stream *EventReader[StreamEvent]
66+
}
67+
68+
// CreateRunStreamOptions contains the optional parameters for [CreateRunStream].
69+
type CreateRunStreamOptions struct {
70+
// for future expansion
71+
}
72+
73+
// CreateRunStream is the equivalent of [CreateRun], but it returns a stream of responses instead of a
74+
// single response.
75+
func (client *Client) CreateRunStream(ctx context.Context, threadID string, body CreateRunBody, _ *CreateRunStreamOptions) (CreateRunStreamResponse, error) {
76+
var err error
77+
body.stream = to.Ptr(true)
78+
79+
req, err := client.createRunCreateRequest(ctx, threadID, body, nil)
80+
if err != nil {
81+
return CreateRunStreamResponse{}, err
82+
}
83+
84+
runtime.SkipBodyDownload(req) // we'll handle this.
85+
86+
httpResp, err := client.internal.Pipeline().Do(req)
87+
if err != nil {
88+
return CreateRunStreamResponse{}, err
89+
}
90+
if !runtime.HasStatusCode(httpResp, http.StatusOK) {
91+
err = runtime.NewResponseError(httpResp)
92+
return CreateRunStreamResponse{}, err
93+
}
94+
95+
return CreateRunStreamResponse{
96+
Stream: newEventReader[StreamEvent](httpResp.Body, unmarshalStreamEvent),
97+
}, err
98+
}
99+
100+
// SubmitToolOutputsToRunStreamResponse contains the response from [SubmitToolOutputsToRunStream].
101+
type SubmitToolOutputsToRunStreamResponse struct {
102+
// Stream can be used to stream response events.
103+
Stream *EventReader[StreamEvent]
104+
}
105+
106+
// SubmitToolOutputsToRunStreamOptions contains the optional parameters for [SubmitToolOutputsToRunStream].
107+
type SubmitToolOutputsToRunStreamOptions struct {
108+
}
109+
110+
// SubmitToolOutputsToRunStream is the equivalent of [SubmitToolOutputsToRun], but it returns a stream of responses instead of a
111+
// single response.
112+
func (client *Client) SubmitToolOutputsToRunStream(ctx context.Context, threadID string, runID string, body SubmitToolOutputsToRunBody, options *SubmitToolOutputsToRunOptions) (SubmitToolOutputsToRunStreamResponse, error) {
113+
var err error
114+
115+
body.stream = to.Ptr(true)
116+
117+
req, err := client.submitToolOutputsToRunCreateRequest(ctx, threadID, runID, body, options)
118+
if err != nil {
119+
return SubmitToolOutputsToRunStreamResponse{}, err
120+
}
121+
122+
runtime.SkipBodyDownload(req) // we'll handle this.
123+
124+
httpResp, err := client.internal.Pipeline().Do(req)
125+
if err != nil {
126+
return SubmitToolOutputsToRunStreamResponse{}, err
127+
}
128+
if !runtime.HasStatusCode(httpResp, http.StatusOK) {
129+
err = runtime.NewResponseError(httpResp)
130+
return SubmitToolOutputsToRunStreamResponse{}, err
131+
}
132+
133+
return SubmitToolOutputsToRunStreamResponse{
134+
Stream: newEventReader[StreamEvent](httpResp.Body, unmarshalStreamEvent),
135+
}, nil
136+
}

0 commit comments

Comments
 (0)