diff --git a/accumulator/batch.go b/accumulator/batch.go index 1a16de78..2d25247e 100644 --- a/accumulator/batch.go +++ b/accumulator/batch.go @@ -37,6 +37,8 @@ var ( // ErrInvalidEncoding is returned for any APMData that is encoded // with any encoding format ErrInvalidEncoding = errors.New("encoded data not supported") + // ErrNoData indicates that APMData.data is empty + ErrNoData = errors.New("no data") ) var ( @@ -170,7 +172,7 @@ func (b *Batch) OnAgentInit(reqID, contentEncoding string, raw []byte) error { // before adding any events then ErrBatchFull is returned. func (b *Batch) AddAgentData(apmData APMData) error { if len(apmData.Data) == 0 { - return nil + return ErrNoData } raw, err := GetUncompressedBytes(apmData.Data, apmData.ContentEncoding) if err != nil { diff --git a/accumulator/batch_test.go b/accumulator/batch_test.go index 244d7e66..b33ce43a 100644 --- a/accumulator/batch_test.go +++ b/accumulator/batch_test.go @@ -48,6 +48,10 @@ func TestAdd(t *testing.T) { assert.ErrorIs(t, ErrBatchFull, b.AddLambdaData([]byte(`{"log":{}}`))) }) + t.Run("empty AddAgentData", func(t *testing.T) { + b := NewBatch(1, time.Hour) + assert.ErrorIs(t, ErrNoData, b.AddAgentData(APMData{})) + }) } func TestReset(t *testing.T) { diff --git a/apmproxy/apmserver.go b/apmproxy/apmserver.go index 8a1ee983..ff030cfa 100644 --- a/apmproxy/apmserver.go +++ b/apmproxy/apmserver.go @@ -59,12 +59,19 @@ func (c *Client) ForwardApmData(ctx context.Context) error { c.logger.Debug("Invocation context cancelled, not processing any more agent data") return nil case data := <-c.AgentDataChannel: + if len(data.Data) == 0 { + c.logger.Debugf("Received something from '%s' without APMData", data.AgentInfo) + continue + } if err := c.forwardAgentData(ctx, data); err != nil { return err } - // Wait for metadata to be available, metadata will be available as soon as - // the first agent data is processed. - lambdaDataChan = c.LambdaDataChannel + if lambdaDataChan == nil { + // With the first successful request to c.forwardAgent Data() metadata should be + // available and processing data from c.LambdaDataChannel can start. + lambdaDataChan = c.LambdaDataChannel + c.logger.Debug("Assigned Lambda data channel") + } case data := <-lambdaDataChan: if err := c.forwardLambdaData(ctx, data); err != nil { return err diff --git a/apmproxy/receiver.go b/apmproxy/receiver.go index 40c34310..01d834c3 100644 --- a/apmproxy/receiver.go +++ b/apmproxy/receiver.go @@ -136,18 +136,20 @@ func (c *Client) handleIntakeV2Events() func(w http.ResponseWriter, r *http.Requ agentFlushed := r.URL.Query().Get("flushed") == "true" - agentData := accumulator.APMData{ - Data: rawBytes, - ContentEncoding: r.Header.Get("Content-Encoding"), - AgentInfo: r.UserAgent(), - } + if len(rawBytes) != 0 { + agentData := accumulator.APMData{ + Data: rawBytes, + ContentEncoding: r.Header.Get("Content-Encoding"), + AgentInfo: r.UserAgent(), + } - if len(agentData.Data) != 0 { select { case c.AgentDataChannel <- agentData: default: c.logger.Warnf("Channel full: dropping a subset of agent data") } + } else { + c.logger.Debugf("Received empy request from '%s'", r.UserAgent()) } if agentFlushed {