Skip to content

Commit 7cf7559

Browse files
committed
fix tests
1 parent c08b81a commit 7cf7559

File tree

3 files changed

+42
-10
lines changed

3 files changed

+42
-10
lines changed

main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func main() {
133133

134134
sugar.Debugf("Sending logs to timeseries database %s in AWS Region %s", cfg.databaseName, cfg.awsRegion)
135135

136-
timeStreamAdapter := newTimeStreamAdapter(sugar, cfg, nil, nil)
136+
timeStreamAdapter := newTimeStreamAdapter(sugar, cfg, newTimestreamQueryPaginator, nil, nil)
137137
if err := serve(ctx, sugar, cfg.listenAddr, timeStreamAdapter); err != nil {
138138
sugar.Errorw("Failed to listen", "addr", cfg.listenAddr, "err", err)
139139
os.Exit(1)

timestream.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,20 @@ type TimestreamWriteApi interface {
5151
WriteRecords(ctx context.Context, params *timestreamwrite.WriteRecordsInput, optFns ...func(*timestreamwrite.Options)) (*timestreamwrite.WriteRecordsOutput, error)
5252
}
5353

54+
type NewQueryPaginator func(client TimestreamQueryApi, params *timestreamquery.QueryInput, optFns ...func(*timestreamquery.QueryPaginatorOptions)) PaginatorApi
55+
56+
type PaginatorApi interface {
57+
HasMorePages() bool
58+
NextPage(ctx context.Context, optFns ...func(*timestreamquery.Options)) (*timestreamquery.QueryOutput, error)
59+
}
60+
5461
type TimeStreamAdapter struct {
5562
databaseName string
5663
logger *zap.SugaredLogger
5764
tableName string
5865
TimestreamQueryApi
5966
TimestreamWriteApi
67+
NewQueryPaginator
6068
}
6169

6270
type queryTask struct {
@@ -69,7 +77,11 @@ type writeTask struct {
6977
dimensions []writetypes.Dimension
7078
}
7179

72-
func newTimeStreamAdapter(logger *zap.SugaredLogger, cfg *adapterCfg, writeSvc *timestreamwrite.Client, querySvc *timestreamquery.Client) TimeStreamAdapter {
80+
func newTimestreamQueryPaginator(client TimestreamQueryApi, params *timestreamquery.QueryInput, optFns ...func(*timestreamquery.QueryPaginatorOptions)) PaginatorApi {
81+
return timestreamquery.NewQueryPaginator(client, params, optFns...)
82+
}
83+
84+
func newTimeStreamAdapter(logger *zap.SugaredLogger, cfg *adapterCfg, newQueryPaginator NewQueryPaginator, writeSvc *timestreamwrite.Client, querySvc *timestreamquery.Client) TimeStreamAdapter {
7385
tr := &http.Transport{
7486
ResponseHeaderTimeout: 20 * time.Second,
7587
// Using DefaultTransport values for other parameters: https://golang.org/pkg/net/http/#RoundTripper
@@ -105,6 +117,7 @@ func newTimeStreamAdapter(logger *zap.SugaredLogger, cfg *adapterCfg, writeSvc *
105117
return TimeStreamAdapter{
106118
TimestreamQueryApi: querySvc,
107119
TimestreamWriteApi: writeSvc,
120+
NewQueryPaginator: newQueryPaginator,
108121
databaseName: cfg.databaseName,
109122
logger: logger,
110123
tableName: cfg.tableName,
@@ -229,7 +242,7 @@ func (t TimeStreamAdapter) runReadRequestQuery(ctx context.Context, q *prompb.Qu
229242
}
230243

231244
var timeSeries []*prompb.TimeSeries
232-
paginator := timestreamquery.NewQueryPaginator(t, &timestreamquery.QueryInput{
245+
paginator := t.NewQueryPaginator(t, &timestreamquery.QueryInput{
233246
QueryString: aws.String(task.query),
234247
})
235248

timestream_test.go

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ var (
3939
timeStreamAdapter = &TimeStreamAdapter{
4040
TimestreamQueryApi: &TimeStreamQueryMock{},
4141
TimestreamWriteApi: &TimeStreamWriterMock{},
42+
NewQueryPaginator: NewPaginatorMock,
4243
databaseName: "mockDatabase",
4344
logger: zap.NewNop().Sugar(),
4445
tableName: "mockTable",
@@ -176,6 +177,31 @@ type TimeStreamWriterMock struct{}
176177

177178
type TimeStreamQueryMock struct{}
178179

180+
type PaginatorMock struct {
181+
callCount int
182+
}
183+
184+
func NewPaginatorMock(client TimestreamQueryApi, params *timestreamquery.QueryInput, optFns ...func(*timestreamquery.QueryPaginatorOptions)) PaginatorApi {
185+
return &PaginatorMock{}
186+
}
187+
188+
func (p *PaginatorMock) HasMorePages() bool {
189+
p.callCount++
190+
// Return true for the first two calls, then false.
191+
return p.callCount <= 2
192+
}
193+
194+
func (p *PaginatorMock) NextPage(ctx context.Context, optFns ...func(*timestreamquery.Options)) (*timestreamquery.QueryOutput, error) {
195+
switch p.callCount {
196+
case 1:
197+
return queryOutput1, nil
198+
case 2:
199+
return queryOutput2, nil
200+
default:
201+
return nil, errors.New("no more pages")
202+
}
203+
}
204+
179205
func (t *TimeStreamWriterMock) WriteRecords(ctx context.Context, input *timestreamwrite.WriteRecordsInput, optFns ...func(*timestreamwrite.Options)) (*timestreamwrite.WriteRecordsOutput, error) {
180206
for _, i := range input.Records {
181207
if *i.MeasureName == "sample_name_error" {
@@ -195,13 +221,6 @@ func (t *TimeStreamQueryMock) Query(ctx context.Context, input *timestreamquery.
195221
return nil, nil
196222
}
197223

198-
func (t *TimeStreamQueryMock) QueryPages(ctx context.Context, input *timestreamquery.QueryInput, handler func(*timestreamquery.QueryOutput, bool) bool, optFns ...func(*timestreamquery.Options)) error {
199-
handler(queryOutput0, false)
200-
handler(queryOutput1, false)
201-
handler(queryOutput2, true)
202-
return nil
203-
}
204-
205224
func TestTimeSteamAdapter_readLabels(t *testing.T) {
206225
type args struct {
207226
labels []prompb.Label

0 commit comments

Comments
 (0)