Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions api/api_gomux.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ func (api *API) id(next http.Handler) http.Handler {
// @Param query query string true "Selector"
// @Param labels query string false "Comma-separated list of labels to return"
// @Param distinct query boolean false "Reduce results to one per distinct value"
// @Param limit query integer false "Maximum number of results to return" (0 for no limit)
// @Success 200 {array} etre.Entity "OK"
// @Failure 400,404 {object} etre.Error
// @Router /entities/:type [get]
Expand Down Expand Up @@ -574,6 +575,14 @@ func (api *API) getEntitiesHandler(w http.ResponseWriter, r *http.Request) {
api.readError(rc, w, ErrInvalidQuery.New("distinct requires only 1 return label but %d specified: %v", len(f.ReturnLabels), f.ReturnLabels))
return
}
if v, ok := qv["limit"]; ok {
limit, err := strconv.ParseInt(v[0], 10, 64)
if err != nil || limit < 0 {
api.readError(rc, w, ErrInvalidQuery.New("invalid limit: %s", v[0]))
return
}
f.Limit = limit
}

// Query data store (instrumented)
rc.inst.Start("db")
Expand Down
82 changes: 82 additions & 0 deletions api/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,88 @@ func TestQueryErrorsTimeout(t *testing.T) {
assert.Equal(t, expectMetrics, server.metricsrec.Called)
}

func TestQueryLimit(t *testing.T) {
// Test that GET /entities/:type?query=Q&limit=N caps the result set
var gotFilter etre.QueryFilter
store := mock.EntityStore{
StreamEntitiesFunc: func(ctx context.Context, entityType string, q query.Query, f etre.QueryFilter) <-chan entity.EntityResult {
gotFilter = f
return mock.DoStreamEntities(testEntitiesWithObjectIDs, nil)
},
}
server := setup(t, defaultConfig, store)
defer server.ts.Close()

etreurl := server.url + etre.API_ROOT + "/entities/" + entityType +
"?query=" + url.QueryEscape("a=b") + "&limit=2"

var gotEntities []etre.Entity
statusCode, err := test.MakeHTTPRequest("GET", etreurl, nil, &gotEntities)
require.NoError(t, err)

assert.Equal(t, http.StatusOK, statusCode)
assert.Equal(t, int64(2), gotFilter.Limit)
}

func TestQueryLimitZero(t *testing.T) {
// Test that limit=0 means no limit (backward compat with clients that don't set limit)
var gotFilter etre.QueryFilter
store := mock.EntityStore{
StreamEntitiesFunc: func(ctx context.Context, entityType string, q query.Query, f etre.QueryFilter) <-chan entity.EntityResult {
gotFilter = f
return mock.DoStreamEntities(testEntitiesWithObjectIDs, nil)
},
}
server := setup(t, defaultConfig, store)
defer server.ts.Close()

etreurl := server.url + etre.API_ROOT + "/entities/" + entityType +
"?query=" + url.QueryEscape("a=b") + "&limit=0"

var gotEntities []etre.Entity
statusCode, err := test.MakeHTTPRequest("GET", etreurl, nil, &gotEntities)
require.NoError(t, err)

assert.Equal(t, http.StatusOK, statusCode)
assert.Equal(t, int64(0), gotFilter.Limit)
}

func TestQueryErrorsInvalidLimitNegative(t *testing.T) {
// Test that a negative limit returns HTTP 400 with an invalid-query error
store := mock.EntityStore{}
server := setup(t, defaultConfig, store)
defer server.ts.Close()

etreurl := server.url + etre.API_ROOT + "/entities/" + entityType +
"?query=" + url.QueryEscape("a=b") + "&limit=-1"

var gotError etre.Error
statusCode, err := test.MakeHTTPRequest("GET", etreurl, nil, &gotError)
require.NoError(t, err)

assert.Equal(t, http.StatusBadRequest, statusCode)
assert.Equal(t, "invalid-query", gotError.Type)
assert.Contains(t, gotError.Message, "invalid limit")
}

func TestQueryErrorsInvalidLimitNonNumeric(t *testing.T) {
// Test that a non-numeric limit returns HTTP 400 with an invalid-query error
store := mock.EntityStore{}
server := setup(t, defaultConfig, store)
defer server.ts.Close()

etreurl := server.url + etre.API_ROOT + "/entities/" + entityType +
"?query=" + url.QueryEscape("a=b") + "&limit=abc"

var gotError etre.Error
statusCode, err := test.MakeHTTPRequest("GET", etreurl, nil, &gotError)
require.NoError(t, err)

assert.Equal(t, http.StatusBadRequest, statusCode)
assert.Equal(t, "invalid-query", gotError.Type)
assert.Contains(t, gotError.Message, "invalid limit")
}

func TestResponseCompression(t *testing.T) {
// Stand up the server
store := mock.EntityStore{
Expand Down
50 changes: 50 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,56 @@ func TestQueryUnhandledError(t *testing.T) {
assert.Nil(t, got)
}

func TestQueryLimitFilter(t *testing.T) {
// Test that QueryFilter.Limit is serialized as a query parameter
setup(t)

// Set global vars used by httptest.Server
respData = []etre.Entity{
{
"_id": "abc",
"hostname": "localhost",
},
}

ec := etre.NewEntityClient("node", ts.URL, httpClient)

ctx := testContext()
got, err := ec.Query(ctx, "x=y", etre.QueryFilter{Limit: 5})
require.NoError(t, err)

assert.Equal(t, "GET", gotMethod)
assert.Equal(t, etre.API_ROOT+"/entities/node", gotPath)
assert.Contains(t, gotQuery, "query=x=y")
assert.Contains(t, gotQuery, "limit=5")
assert.Equal(t, got, respData)
}

func TestQueryLimitFilterZeroNotSent(t *testing.T) {
// Test that QueryFilter.Limit=0 does not add a limit query parameter
setup(t)

// Set global vars used by httptest.Server
respData = []etre.Entity{
{
"_id": "abc",
"hostname": "localhost",
},
}

ec := etre.NewEntityClient("node", ts.URL, httpClient)

ctx := testContext()
got, err := ec.Query(ctx, "x=y", etre.QueryFilter{Limit: 0})
require.NoError(t, err)

assert.Equal(t, "GET", gotMethod)
assert.Equal(t, etre.API_ROOT+"/entities/node", gotPath)
assert.Equal(t, "query=x=y", gotQuery)
assert.NotContains(t, gotQuery, "limit")
assert.Equal(t, got, respData)
}

// //////////////////////////////////////////////////////////////////////////
// Get
// //////////////////////////////////////////////////////////////////////////
Expand Down
9 changes: 8 additions & 1 deletion entity/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ func (s store) StreamEntities(ctx context.Context, entityType string, q query.Qu
s.writeErrToChannel(ctx, ch, s.dbError(ctx, err, "db-query-distinct"))
return
}
// Distinct doesn't return a cursor, so we just have to loop and send the results
// Distinct doesn't return a cursor, so we just have to loop and send the results.
// MongoDB's distinct command doesn't support a limit option, so we apply it here.
if f.Limit > 0 && int64(len(values)) > f.Limit {
values = values[:f.Limit]
}
for _, v := range values {
s.writeEntityToChannel(ctx, ch, etre.Entity{f.ReturnLabels[0]: v})
}
Expand All @@ -148,6 +152,9 @@ func (s store) StreamEntities(ctx context.Context, entityType string, q query.Qu

// Run the query
opts := options.Find().SetProjection(p).SetBatchSize(int32(s.config.BatchSize))
if f.Limit > 0 {
opts.SetLimit(f.Limit)
}
cursor, err := c.Find(ctx, Filter(q), opts)
if err != nil {
s.writeErrToChannel(ctx, ch, s.dbError(ctx, err, "db-query"))
Expand Down
80 changes: 80 additions & 0 deletions entity/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,86 @@ func TestDeleteLabel(t *testing.T) {
assert.Equal(t, expectEvent, gotEvents)
}

func TestStreamEntitiesLimit(t *testing.T) {
// Test that Limit caps the number of entities returned. There are 3 test
// nodes, so limit=2 should return only 2.
store := setup(t, &mock.CDCStore{})
q, err := query.Translate("y") // all test nodes have label "y"
require.NoError(t, err)

f := etre.QueryFilter{
Limit: 2,
}
got, err := readStream(store.StreamEntities(context.Background(), entityType, q, f))
require.NoError(t, err)
assert.Len(t, got, 2)
}

func TestStreamEntitiesLimitZeroReturnsAll(t *testing.T) {
// Test that Limit=0 (the default) returns all entities.
store := setup(t, &mock.CDCStore{})
q, err := query.Translate("y") // all test nodes have label "y"
require.NoError(t, err)

f := etre.QueryFilter{
Limit: 0,
}
got, err := readStream(store.StreamEntities(context.Background(), entityType, q, f))
require.NoError(t, err)
assert.Len(t, got, 3) // all 3 test nodes
}

func TestStreamEntitiesLimitGreaterThanResults(t *testing.T) {
// Test that a limit larger than the result set returns all results.
store := setup(t, &mock.CDCStore{})
q, err := query.Translate("y") // all test nodes have label "y"
require.NoError(t, err)

f := etre.QueryFilter{
Limit: 100,
}
got, err := readStream(store.StreamEntities(context.Background(), entityType, q, f))
require.NoError(t, err)
assert.Len(t, got, 3) // all 3 test nodes
}

func TestStreamEntitiesLimitWithReturnLabels(t *testing.T) {
// Test that Limit works together with ReturnLabels.
store := setup(t, &mock.CDCStore{})
q, err := query.Translate("y") // all test nodes have label "y"
require.NoError(t, err)

f := etre.QueryFilter{
ReturnLabels: []string{"x"},
Limit: 1,
}
got, err := readStream(store.StreamEntities(context.Background(), entityType, q, f))
require.NoError(t, err)
require.Len(t, got, 1)
// Should only have the "x" label
_, hasX := got[0]["x"]
assert.True(t, hasX)
_, hasY := got[0]["y"]
assert.False(t, hasY)
}

func TestStreamEntitiesLimitDistinct(t *testing.T) {
// Test that Limit works with the distinct optimization. There are 2 distinct
// values of "y" (a, b), so limit=1 should return only 1.
store := setup(t, &mock.CDCStore{})
q, err := query.Translate("y") // all test nodes have label "y"
require.NoError(t, err)

f := etre.QueryFilter{
ReturnLabels: []string{"y"},
Distinct: true,
Limit: 1,
}
got, err := readStream(store.StreamEntities(context.Background(), entityType, q, f))
require.NoError(t, err)
assert.Len(t, got, 1)
}

func readStream(ch <-chan entity.EntityResult) ([]etre.Entity, error) {
entities := []etre.Entity{}
for r := range ch {
Expand Down
4 changes: 4 additions & 0 deletions entity_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"
)
Expand Down Expand Up @@ -164,6 +165,9 @@ func (c entityClient) Query(ctx context.Context, query string, filter QueryFilte
if filter.Distinct {
path += "&distinct"
}
if filter.Limit > 0 {
path += "&limit=" + strconv.FormatInt(filter.Limit, 10)
}

var entities []Entity
err := c.apiRetry(func() (bool, error) {
Expand Down
6 changes: 4 additions & 2 deletions es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ type Options struct {
Timeout string `arg:"env:ES_TIMEOUT" yaml:"timeout"`
Trace string `arg:"env:ES_TRACE" yaml:"trace"`
Update bool
Unique bool `arg:"-u"`
Version bool `arg:"-v"`
Limit int64 `arg:"--limit"`
Unique bool `arg:"-u"`
Version bool `arg:"-v"`
Watch bool
}

Expand Down Expand Up @@ -123,6 +124,7 @@ func Help() {
" --insert Insert one entity\n"+
" --json Print entities as JSON\n"+
" --labels Print label: before value\n"+
" --limit Limit the number of entities returned (default: 0, no limit)\n"+
" --old Print old values on --update\n"+
" --query-timeout Query timeout on server (default: %s)\n"+
" --retry Retry count on network or API error (default: %d)\n"+
Expand Down
6 changes: 6 additions & 0 deletions es/es.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,9 +422,15 @@ func Run(ctx app.Context) {
os.Exit(1)
}

if ctx.Options.Limit < 0 {
fmt.Fprintf(os.Stderr, "--limit must be a non-negative integer, got %d\n", ctx.Options.Limit)
os.Exit(1)
}

f := etre.QueryFilter{
ReturnLabels: ctx.ReturnLabels,
Distinct: ctx.Options.Unique,
Limit: ctx.Options.Limit,
}

// Create a context with the queryTimeout
Expand Down
3 changes: 3 additions & 0 deletions etre.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ type QueryFilter struct {
// Distinct returns unique entities if ReturnLabels contains a single value.
// Etre returns an error if enabled and ReturnLabels has more than one value.
Distinct bool

// Limit caps the number of entities returned. Zero means no limit.
Limit int64
}

// WriteResult represents the result of a write operation (insert, update delete).
Expand Down
Loading