Skip to content

Commit 1573a9e

Browse files
committed
feat: go is horrible
1 parent be129a2 commit 1573a9e

File tree

4 files changed

+160
-134
lines changed

4 files changed

+160
-134
lines changed

templates/go/api.mustache

Lines changed: 27 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,17 @@ type config struct {
3333
headerParams map[string]string
3434
timeouts transport.RequestConfiguration
3535
36+
{{#isIngestionClient}}
37+
// -- ChunkedPush options
38+
waitForTasks bool
39+
batchSize int
40+
41+
// -- Iterable options
42+
maxRetries int
43+
timeout func(int) time.Duration
44+
aggregator func(any, error)
45+
{{/isIngestionClient}}
46+
3647
{{#isSearchClient}}
3748
// -- ChunkedBatch options
3849
waitForTasks bool
@@ -102,42 +113,6 @@ func WithConnectTimeout(timeout time.Duration) requestOption {
102113

103114
{{#isSearchClient}}
104115

105-
// --------- ChunkedBatch options ---------
106-
107-
type ChunkedBatchOption interface {
108-
RequestOption
109-
chunkedBatch()
110-
}
111-
112-
type chunkedBatchOption func(*config)
113-
114-
var (
115-
_ ChunkedBatchOption = (*chunkedBatchOption)(nil)
116-
_ ChunkedBatchOption = (*requestOption)(nil)
117-
)
118-
119-
func (c chunkedBatchOption) apply(conf *config) {
120-
c(conf)
121-
}
122-
123-
func (c chunkedBatchOption) chunkedBatch() {}
124-
125-
func (r requestOption) chunkedBatch() {}
126-
127-
// WithWaitForTasks whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable.
128-
func WithWaitForTasks(waitForTasks bool) chunkedBatchOption {
129-
return chunkedBatchOption(func(c *config) {
130-
c.waitForTasks = waitForTasks
131-
})
132-
}
133-
134-
// WithBatchSize the size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000.
135-
func WithBatchSize(batchSize int) chunkedBatchOption {
136-
return chunkedBatchOption(func(c *config) {
137-
c.batchSize = batchSize
138-
})
139-
}
140-
141116
// --------- PartialUpdateObjects options ---------
142117

143118
type PartialUpdateObjectsOption interface {
@@ -206,49 +181,6 @@ func WithScopes(scopes []ScopeType) replaceAllObjectsOption {
206181
})
207182
}
208183

209-
// --------- Iterable options ---------.
210-
211-
type IterableOption interface {
212-
RequestOption
213-
iterable()
214-
}
215-
216-
type iterableOption func(*config)
217-
218-
var (
219-
_ IterableOption = (*iterableOption)(nil)
220-
_ IterableOption = (*requestOption)(nil)
221-
)
222-
223-
func (i iterableOption) apply(c *config) {
224-
i(c)
225-
}
226-
227-
func (r requestOption) iterable() {}
228-
229-
func (i iterableOption) iterable() {}
230-
231-
// WithMaxRetries the maximum number of retry. Default to 50.
232-
func WithMaxRetries(maxRetries int) iterableOption {
233-
return iterableOption(func(c *config) {
234-
c.maxRetries = maxRetries
235-
})
236-
}
237-
238-
// WithTimeout he function to decide how long to wait between retries. Default to min(retryCount * 200, 5000)
239-
func WithTimeout(timeout func(int) time.Duration) iterableOption {
240-
return iterableOption(func(c *config) {
241-
c.timeout = timeout
242-
})
243-
}
244-
245-
// WithAggregator the function to aggregate the results of the iterable.
246-
func WithAggregator(aggregator func(any, error)) iterableOption {
247-
return iterableOption(func(c *config) {
248-
c.aggregator = aggregator
249-
})
250-
}
251-
252184
// --------- WaitForKey options ---------.
253185

254186
type WaitForApiKeyOption interface {
@@ -295,6 +227,18 @@ func toRequestOptions[T RequestOption](opts []T) []RequestOption {
295227
return requestOpts
296228
}
297229

230+
func toIngestionRequestOptions(opts []RequestOption) []ingestion.RequestOption {
231+
requestOpts := make([]ingestion.RequestOption, 0, len(opts))
232+
233+
for _, opt := range opts {
234+
if opt, ok := opt.(ingestion.RequestOption); ok {
235+
requestOpts = append(requestOpts, opt)
236+
}
237+
}
238+
239+
return requestOpts
240+
}
241+
298242
func toIterableOptions(opts []ChunkedBatchOption) []IterableOption {
299243
iterableOpts := make([]IterableOption, 0, len(opts))
300244
@@ -571,9 +515,13 @@ func (c *APIClient) {{nickname}}({{#hasParams}}r {{#structPrefix}}{{&classname}}
571515
{{/operations}}
572516

573517
{{#isSearchClient}}
518+
{{> helpers}}
519+
574520
{{> search_helpers}}
575521
{{/isSearchClient}}
576522

577523
{{#isIngestionClient}}
524+
{{> helpers}}
525+
578526
{{> ingestion_helpers}}
579527
{{/isIngestionClient}}

templates/go/helpers.mustache

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
// --------- ChunkedBatch options ---------
2+
3+
type ChunkedBatchOption interface {
4+
RequestOption
5+
chunkedBatch()
6+
}
7+
8+
type chunkedBatchOption func(*config)
9+
10+
var (
11+
_ ChunkedBatchOption = (*chunkedBatchOption)(nil)
12+
_ ChunkedBatchOption = (*requestOption)(nil)
13+
)
14+
15+
func (c chunkedBatchOption) apply(conf *config) {
16+
c(conf)
17+
}
18+
19+
func (c chunkedBatchOption) chunkedBatch() {}
20+
21+
func (r requestOption) chunkedBatch() {}
22+
23+
// WithWaitForTasks whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable.
24+
func WithWaitForTasks(waitForTasks bool) chunkedBatchOption {
25+
return chunkedBatchOption(func(c *config) {
26+
c.waitForTasks = waitForTasks
27+
})
28+
}
29+
30+
// WithBatchSize the size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000.
31+
func WithBatchSize(batchSize int) chunkedBatchOption {
32+
return chunkedBatchOption(func(c *config) {
33+
c.batchSize = batchSize
34+
})
35+
}
36+
37+
// --------- Iterable options ---------.
38+
39+
type IterableOption interface {
40+
RequestOption
41+
iterable()
42+
}
43+
44+
type iterableOption func(*config)
45+
46+
var (
47+
_ IterableOption = (*iterableOption)(nil)
48+
_ IterableOption = (*requestOption)(nil)
49+
)
50+
51+
func (i iterableOption) apply(c *config) {
52+
i(c)
53+
}
54+
55+
func (r requestOption) iterable() {}
56+
57+
func (i iterableOption) iterable() {}
58+
59+
// WithMaxRetries the maximum number of retry. Default to 50.
60+
func WithMaxRetries(maxRetries int) iterableOption {
61+
return iterableOption(func(c *config) {
62+
c.maxRetries = maxRetries
63+
})
64+
}
65+
66+
// WithTimeout he function to decide how long to wait between retries. Default to min(retryCount * 200, 5000)
67+
func WithTimeout(timeout func(int) time.Duration) iterableOption {
68+
return iterableOption(func(c *config) {
69+
c.timeout = timeout
70+
})
71+
}
72+
73+
// WithAggregator the function to aggregate the results of the iterable.
74+
func WithAggregator(aggregator func(any, error)) iterableOption {
75+
return iterableOption(func(c *config) {
76+
c.aggregator = aggregator
77+
})
78+
}
79+
80+
func CreateIterable[T any](execute func(*T, error) (*T, error), validate func(*T, error) (bool, error), opts ...IterableOption) (*T, error) {
81+
conf := config{
82+
headerParams: map[string]string{},
83+
maxRetries: -1,
84+
timeout: func(count int) time.Duration {
85+
return 0 * time.Millisecond
86+
},
87+
}
88+
89+
for _, opt := range opts {
90+
opt.apply(&conf)
91+
}
92+
93+
var executor func(*T, error) (*T, error)
94+
95+
retryCount := 0
96+
97+
executor = func(previousResponse *T, previousError error) (*T, error) {
98+
response, responseErr := execute(previousResponse, previousError)
99+
100+
retryCount++
101+
102+
if conf.aggregator != nil {
103+
conf.aggregator(response, responseErr)
104+
}
105+
106+
canStop, err := validate(response, responseErr)
107+
if canStop || err != nil {
108+
return response, err
109+
}
110+
111+
if conf.maxRetries >= 0 && retryCount >= conf.maxRetries {
112+
return nil, errs.NewWaitError(fmt.Sprintf("The maximum number of retries exceeded. (%d/%d)", retryCount, conf.maxRetries))
113+
}
114+
115+
time.Sleep(conf.timeout(retryCount))
116+
117+
return executor(response, responseErr)
118+
}
119+
120+
return executor(nil, nil)
121+
}

templates/go/ingestion_helpers.mustache

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ ChunkedPush Chunks the given `objects` list in subset of 1000 elements max in or
66
@param action Action - The action to perform on the objects.
77
@param referenceIndexName *string - This is required when targeting an index that does not have a push connector setup (e.g. a tmp index), but you wish to attach another index's transformation to it (e.g. the source index name).
88
@param opts ...ChunkedBatchOption - Optional parameters for the request.
9-
@return []ingestion.WatchResponse - List of push responses.
9+
@return []WatchResponse - List of push responses.
1010
@return error - Error if any.
1111
*/
12-
func (c *APIClient) ChunkedPush(indexName string, objects []map[string]any, action Action, referenceIndexName *string, opts ...ChunkedBatchOption) ([]ingestion.WatchResponse, error) {
12+
func (c *APIClient) ChunkedPush(indexName string, objects []map[string]any, action Action, referenceIndexName *string, opts ...RequestOption) ([]WatchResponse, error) {
1313
conf := config{
1414
headerParams: map[string]string{},
1515
waitForTasks: false,
@@ -21,13 +21,13 @@ func (c *APIClient) ChunkedPush(indexName string, objects []map[string]any, acti
2121
}
2222

2323
records := make([]map[string]any, 0, len(objects)%conf.batchSize)
24-
responses := make([]ingestion.WatchResponse, 0, len(objects)%conf.batchSize)
24+
responses := make([]WatchResponse, 0, len(objects)%conf.batchSize)
2525

2626
for i, obj := range objects {
2727
records = append(records, obj)
2828
2929
if len(records) == conf.batchSize || i == len(objects)-1 {
30-
pushRecords := make([]ingestion.PushTaskRecords, 0, len(records))
30+
pushRecords := make([]PushTaskRecords, 0, len(records))
3131
3232
rawRecords, err := json.Marshal(records)
3333
if err != nil {
@@ -36,13 +36,13 @@ func (c *APIClient) ChunkedPush(indexName string, objects []map[string]any, acti
3636

3737
err = json.Unmarshal(rawRecords, &pushRecords)
3838
if err != nil {
39-
return nil, reportError("unable to unmarshal the given `objects` to an `[]ingestion.PushTaskRecords` payload: %w", err)
39+
return nil, reportError("unable to unmarshal the given `objects` to an `[]PushTaskRecords` payload: %w", err)
4040
}
4141

4242
request := c.NewApiPushRequest(
4343
indexName,
44-
ingestion.NewEmptyPushTaskPayload().
45-
SetAction(ingestion.Action(action)).
44+
NewEmptyPushTaskPayload().
45+
SetAction(action).
4646
SetRecords(pushRecords),
4747
)
4848

@@ -63,14 +63,14 @@ func (c *APIClient) ChunkedPush(indexName string, objects []map[string]any, acti
6363
if conf.waitForTasks {
6464
for _, resp := range responses {
6565
_, err := CreateIterable( //nolint:wrapcheck
66-
func(*ingestion.Event, error) (*ingestion.Event, error) {
66+
func(*Event, error) (*Event, error) {
6767
if resp.EventID == nil {
6868
return nil, reportError("received unexpected response from the push endpoint, eventID must not be undefined")
6969
}
7070

7171
return c.GetEvent(c.NewApiGetEventRequest(resp.RunID, *resp.EventID))
7272
},
73-
func(response *ingestion.Event, err error) (bool, error) {
73+
func(response *Event, err error) (bool, error) {
7474
var apiErr *APIError
7575
if errors.As(err, &apiErr) {
7676
return apiErr.Status != 404, nil
@@ -87,4 +87,4 @@ func (c *APIClient) ChunkedPush(indexName string, objects []map[string]any, acti
8787
}
8888

8989
return responses, nil
90-
}
90+
}

0 commit comments

Comments
 (0)