Skip to content

Commit 7b06b6d

Browse files
feat(api): add events streaming
1 parent 2c7e88f commit 7b06b6d

File tree

4 files changed

+65
-33
lines changed

4 files changed

+65
-33
lines changed

.stats.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
configured_endpoints: 106
2-
openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/gitpod%2Fgitpod-2e9f8b8666b2fd4e346a3acbf81a2c82a6f3793e01bc146499708efaf0c250c5.yml
2+
openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/gitpod%2Fgitpod-922f204ec36b8a84ae8f96e73923e92cb2044a14c6497d173f4b7110a090ac30.yml

event.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/stainless-sdks/gitpod-go/internal/param"
1515
"github.com/stainless-sdks/gitpod-go/internal/requestconfig"
1616
"github.com/stainless-sdks/gitpod-go/option"
17+
"github.com/stainless-sdks/gitpod-go/packages/jsonl"
1718
)
1819

1920
// EventService contains methods and other services that help with interacting with
@@ -51,18 +52,22 @@ func (r *EventService) List(ctx context.Context, params EventListParams, opts ..
5152
}
5253

5354
// WatchEvents streams all requests events to the client
54-
func (r *EventService) Watch(ctx context.Context, params EventWatchParams, opts ...option.RequestOption) (res *EventWatchResponse, err error) {
55+
func (r *EventService) WatchStreaming(ctx context.Context, params EventWatchParams, opts ...option.RequestOption) (stream *jsonl.Stream[EventWatchResponse]) {
56+
var (
57+
raw *http.Response
58+
err error
59+
)
5560
if params.ConnectProtocolVersion.Present {
5661
opts = append(opts, option.WithHeader("Connect-Protocol-Version", fmt.Sprintf("%s", params.ConnectProtocolVersion)))
5762
}
5863
if params.ConnectTimeoutMs.Present {
5964
opts = append(opts, option.WithHeader("Connect-Timeout-Ms", fmt.Sprintf("%s", params.ConnectTimeoutMs)))
6065
}
6166
opts = append(r.Options[:], opts...)
62-
opts = append([]option.RequestOption{option.WithHeader("Accept", "application/connect+json")}, opts...)
67+
opts = append([]option.RequestOption{option.WithHeader("Accept", "application/jsonl")}, opts...)
6368
path := "gitpod.v1.EventService/WatchEvents"
64-
err = requestconfig.ExecuteNewRequest(ctx, http.MethodPost, path, params, &res, opts...)
65-
return
69+
err = requestconfig.ExecuteNewRequest(ctx, http.MethodPost, path, params, &raw, opts...)
70+
return jsonl.NewStream[EventWatchResponse](raw, err)
6671
}
6772

6873
type EventListResponse struct {

event_test.go

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -42,31 +42,3 @@ func TestEventListWithOptionalParams(t *testing.T) {
4242
t.Fatalf("err should be nil: %s", err.Error())
4343
}
4444
}
45-
46-
func TestEventWatchWithOptionalParams(t *testing.T) {
47-
baseURL := "http://localhost:4010"
48-
if envURL, ok := os.LookupEnv("TEST_API_BASE_URL"); ok {
49-
baseURL = envURL
50-
}
51-
if !testutil.CheckTestServer(t, baseURL) {
52-
return
53-
}
54-
client := gitpod.NewClient(
55-
option.WithBaseURL(baseURL),
56-
option.WithBearerToken("My Bearer Token"),
57-
)
58-
_, err := client.Events.Watch(context.TODO(), gitpod.EventWatchParams{
59-
Body: gitpod.EventWatchParamsBodyEnvironmentScopeProducesEventsForTheEnvironmentItselfAllTasksTaskExecutionsAndServicesAssociatedWithThatEnvironment{
60-
EnvironmentID: gitpod.F("environmentId"),
61-
},
62-
ConnectProtocolVersion: gitpod.F(gitpod.EventWatchParamsConnectProtocolVersion1),
63-
ConnectTimeoutMs: gitpod.F(0.000000),
64-
})
65-
if err != nil {
66-
var apierr *gitpod.Error
67-
if errors.As(err, &apierr) {
68-
t.Log(string(apierr.DumpRequest(true)))
69-
}
70-
t.Fatalf("err should be nil: %s", err.Error())
71-
}
72-
}

packages/jsonl/jsonl.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details.
2+
3+
package jsonl
4+
5+
import (
6+
"bufio"
7+
"encoding/json"
8+
"io"
9+
"net/http"
10+
)
11+
12+
type Stream[T any] struct {
13+
rc io.ReadCloser
14+
scn *bufio.Scanner
15+
cur T
16+
err error
17+
}
18+
19+
func NewStream[T any](res *http.Response, err error) *Stream[T] {
20+
if res == nil || res.Body == nil {
21+
return nil
22+
}
23+
24+
return &Stream[T]{
25+
rc: res.Body,
26+
scn: bufio.NewScanner(res.Body),
27+
err: err,
28+
}
29+
}
30+
31+
func (s *Stream[T]) Next() bool {
32+
if s.err != nil {
33+
return false
34+
}
35+
36+
if !s.scn.Scan() {
37+
return false
38+
}
39+
40+
line := s.scn.Bytes()
41+
s.err = json.Unmarshal(line, &s.cur)
42+
return s.err == nil
43+
}
44+
45+
func (s *Stream[T]) Current() T {
46+
return s.cur
47+
}
48+
49+
func (s *Stream[T]) Err() error {
50+
return s.err
51+
}
52+
53+
func (s *Stream[T]) Close() error {
54+
return s.rc.Close()
55+
}

0 commit comments

Comments
 (0)