4 changes: 3 additions & 1 deletion accumulator/batch.go
@@ -37,6 +37,8 @@ var (
// ErrInvalidEncoding is returned for any APMData that is encoded
// with any encoding format
ErrInvalidEncoding = errors.New("encoded data not supported")
+ // ErrNoData indicates that APMData.Data is empty
+ ErrNoData = errors.New("no data")
Contributor

I am not sure about this: no data is not an error for the batch, and I don't like returning no-data errors. Now that we already handle the no-data case in ForwardApmData, can we remove this?

Member Author

What do you suggest returning for the len(apmData.Data) == 0 case in AddAgentData()?
A check in ForwardApmData() does not prevent other users of AddAgentData() from running into this issue.

Contributor

@lahsivjar, Oct 25, 2024

My point is that, for the batch, having empty data is not an error. The batch can simply not add data in this case. If we want to protect against edge cases, we should do it at the source (like we do in ForwardApmData).

Contributor

Also, if we want to be more specific in handling the behaviour, we could introduce another method in the batch to check the metadata status, something like bool HasMetadata() and incorporate it where we require such a check.
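
A rough sketch of what such a check could look like, assuming a simplified stand-in for the batch. The `batch` type, its fields, and `addLambda` are hypothetical; only the name `HasMetadata` comes from the suggestion above:

```go
package main

import "fmt"

// batch is a hypothetical stand-in for accumulator.Batch.
type batch struct {
	metadata []byte   // metadata line taken from the first agent payload
	lambda   [][]byte // buffered Lambda events
}

// HasMetadata reports whether the batch has already stored agent metadata.
func (b *batch) HasMetadata() bool {
	return len(b.metadata) > 0
}

// addLambda buffers an event only once metadata is present and reports
// whether the event was accepted.
func (b *batch) addLambda(event []byte) bool {
	if !b.HasMetadata() {
		return false
	}
	b.lambda = append(b.lambda, event)
	return true
}

func main() {
	b := &batch{}
	fmt.Println(b.addLambda([]byte(`{"log":{}}`))) // false: no metadata yet
	b.metadata = []byte(`{"metadata":{}}`)
	fmt.Println(b.addLambda([]byte(`{"log":{}}`))) // true
}
```

The check lives next to the state it guards, so callers that need metadata can ask for it explicitly instead of inferring it from an error.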

Member Author

I still consider the len(apmData.Data) == 0 case in AddAgentData() dangerous, even if the only caller atm ™️ performs a check. What makes this package and its API safe to use if something else calls AddAgentData()?
Even if no data is considered a non-error case, what should it be considered instead, and how should it be treated? With the current behavior it seems to run into issues.

> we could introduce another method in the batch to check the metadata status, something like bool HasMetadata() and incorporate it where we require such a check.

If AddAgentData() implements such a check, what should be returned in a case where

  • there is no metadata but len(apmData.Data) != 0? Is data loss the expected result in this case?
  • there is metadata but len(apmData.Data) == 0?

Contributor

> What makes this package and its API safe to use, if something else calls AddAgentData()?

Agent data here refers to data from the APM agents and intake-v2 is the only supported protocol. What other cases do you have in mind here?

> there is no metadata but len(apmData.Data) != 0. Is data loss the expected result in this case?

If data from the agent is not nil then it MUST have the metadata as the first line.

> there is metadata but len(apmData.Data) == 0

apmData.Data includes the metadata, so this is also not possible. The body could contain only metadata, and that is okay.
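
The invariant described here (a non-empty intake-v2 body starts with a metadata line) can be sketched as a small check. This is only an illustration: `firstLineIsMetadata` is a hypothetical helper, and it assumes the payload is already uncompressed NDJSON whose first line is a JSON object with a `metadata` key:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// firstLineIsMetadata reports whether the first NDJSON line of an
// uncompressed payload is a JSON object containing a "metadata" key.
func firstLineIsMetadata(payload []byte) bool {
	// Take everything up to the first newline (or the whole payload).
	line, _, _ := bytes.Cut(payload, []byte("\n"))
	var obj map[string]json.RawMessage
	if err := json.Unmarshal(line, &obj); err != nil {
		return false // empty or malformed first line
	}
	_, ok := obj["metadata"]
	return ok
}

func main() {
	full := []byte("{\"metadata\":{}}\n{\"transaction\":{}}\n")
	onlyMeta := []byte(`{"metadata":{}}`)
	fmt.Println(firstLineIsMetadata(full))     // metadata followed by events
	fmt.Println(firstLineIsMetadata(onlyMeta)) // metadata-only body is acceptable
	fmt.Println(firstLineIsMetadata(nil))      // empty body has no metadata
}
```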

Member

@lahsivjar So essentially in your view, ErrNoData and associated logic is redundant as "no data" can take place during normal operation, and if we want to check for this case, we should do it before calling AddAgentData. Is that correct?

Contributor

Basically, yes! I don't have a strong opinion here so feel free to ignore it.

)

var (
@@ -170,7 +172,7 @@ func (b *Batch) OnAgentInit(reqID, contentEncoding string, raw []byte) error {
// before adding any events then ErrBatchFull is returned.
func (b *Batch) AddAgentData(apmData APMData) error {
if len(apmData.Data) == 0 {
- return nil
+ return ErrNoData
}
raw, err := GetUncompressedBytes(apmData.Data, apmData.ContentEncoding)
if err != nil {
4 changes: 4 additions & 0 deletions accumulator/batch_test.go
@@ -48,6 +48,10 @@ func TestAdd(t *testing.T) {

assert.ErrorIs(t, ErrBatchFull, b.AddLambdaData([]byte(`{"log":{}}`)))
})
+ t.Run("empty AddAgentData", func(t *testing.T) {
+ b := NewBatch(1, time.Hour)
+ assert.ErrorIs(t, ErrNoData, b.AddAgentData(APMData{}))
+ })
}

func TestReset(t *testing.T) {
14 changes: 11 additions & 3 deletions apmproxy/apmserver.go
@@ -28,6 +28,7 @@ import (
"math"
"math/rand"
"net/http"
"sync"
"time"

"github.com/elastic/apm-aws-lambda/accumulator"
@@ -52,19 +53,26 @@ func (c *Client) ForwardApmData(ctx context.Context) error {
c.logger.Warn("Failed to start APM data forwarder due to client unhealthy")
return nil
}
+ var once sync.Once
var lambdaDataChan chan []byte
for {
select {
case <-ctx.Done():
c.logger.Debug("Invocation context cancelled, not processing any more agent data")
return nil
case data := <-c.AgentDataChannel:
+ if len(data.Data) == 0 {
+ c.logger.Debugf("Received something from '%s' without APMData", data.AgentInfo)
+ continue
+ }
if err := c.forwardAgentData(ctx, data); err != nil {
return err
}
- // Wait for metadata to be available, metadata will be available as soon as
- // the first agent data is processed.
- lambdaDataChan = c.LambdaDataChannel
+ once.Do(func() {
+ // With the first successful request to c.forwardAgentData() metadata should be
+ // available and processing data from c.LambdaDataChannel can start.
+ lambdaDataChan = c.LambdaDataChannel
+ })
case data := <-lambdaDataChan:
if err := c.forwardLambdaData(ctx, data); err != nil {
return err
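
The gating in ForwardApmData relies on a Go property: a receive from a nil channel blocks forever, so a `select` case on a nil channel is never chosen until the variable is assigned. The following is a minimal standalone sketch of that pattern, not the proxy's code; `drain` and its parameters are hypothetical names:

```go
package main

import "fmt"

// drain handles agent messages and only starts reading lambda messages
// after the first agent message has been processed.
func drain(agent, lambda <-chan string, total int) []string {
	var order []string
	var gated <-chan string // nil: its select case can never be chosen
	for len(order) < total {
		select {
		case m := <-agent:
			order = append(order, "agent:"+m)
			gated = lambda // enable the lambda case from now on
		case m := <-gated:
			order = append(order, "lambda:"+m)
		}
	}
	return order
}

func main() {
	agent := make(chan string, 1)
	lambda := make(chan string, 1)
	lambda <- "log" // queued first, but held back by the nil channel
	agent <- "metadata"
	fmt.Println(drain(agent, lambda, 2)) // prints [agent:metadata lambda:log]
}
```

Re-assigning the same channel on every agent message is harmless, which is why this sketch does not use `sync.Once`.
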
14 changes: 8 additions & 6 deletions apmproxy/receiver.go
@@ -136,18 +136,20 @@ func (c *Client) handleIntakeV2Events() func(w http.ResponseWriter, r *http.Requ

agentFlushed := r.URL.Query().Get("flushed") == "true"

- agentData := accumulator.APMData{
- Data: rawBytes,
- ContentEncoding: r.Header.Get("Content-Encoding"),
- AgentInfo: r.UserAgent(),
- }
+ if len(rawBytes) != 0 {
+ agentData := accumulator.APMData{
+ Data: rawBytes,
+ ContentEncoding: r.Header.Get("Content-Encoding"),
+ AgentInfo: r.UserAgent(),
+ }

- if len(agentData.Data) != 0 {
select {
case c.AgentDataChannel <- agentData:
default:
c.logger.Warnf("Channel full: dropping a subset of agent data")
}
+ } else {
+ c.logger.Debugf("Received empty request from '%s'", r.UserAgent())
}

if agentFlushed {
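
The receiver hands data to the forwarder with a non-blocking send: a `select` with a `default` branch drops the payload instead of stalling the HTTP handler when the channel buffer is full. A minimal standalone sketch of that idiom follows; `trySend` is a hypothetical helper, not part of the apmproxy package:

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the value was queued.
func trySend(ch chan<- []byte, data []byte) bool {
	select {
	case ch <- data:
		return true
	default:
		// Buffer full and no receiver ready: drop instead of blocking.
		return false
	}
}

func main() {
	ch := make(chan []byte, 1)
	fmt.Println(trySend(ch, []byte("a"))) // true: buffer has room
	fmt.Println(trySend(ch, []byte("b"))) // false: buffer is full, payload dropped
}
```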