Skip to content

Commit 0ca92fe

Browse files
authored
Avoid race conditions when handling data (#570)
* apmproxy: do not continue without APMData Signed-off-by: Florian Lehner <[email protected]> * apmproxy: set channel only once Signed-off-by: Florian Lehner <[email protected]> * accumulator: add test for call to AddAgentData without data Signed-off-by: Florian Lehner <[email protected]> * fixup: fix linter Signed-off-by: Florian Lehner <[email protected]> * fixup: add AgentInfo to debug message Signed-off-by: Florian Lehner <[email protected]> * fixup: use capital letter in debug message Signed-off-by: Florian Lehner <[email protected]> * receiver: avoid allocation if rawBytes is empty Signed-off-by: Florian Lehner <[email protected]> * fixup: check len() first before calling function Signed-off-by: Florian Lehner <[email protected]> * fixup: update comment Signed-off-by: Florian Lehner <[email protected]> * fixup: replace sync.Once with nil check Signed-off-by: Florian Lehner <[email protected]> --------- Signed-off-by: Florian Lehner <[email protected]>
1 parent 99aafb7 commit 0ca92fe

File tree

4 files changed

+25
-10
lines changed

4 files changed

+25
-10
lines changed

accumulator/batch.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ var (
3737
// ErrInvalidEncoding is returned for any APMData that is encoded
3838
// with any encoding format
3939
ErrInvalidEncoding = errors.New("encoded data not supported")
40+
// ErrNoData indicates that APMData.data is empty
41+
ErrNoData = errors.New("no data")
4042
)
4143

4244
var (
@@ -174,7 +176,7 @@ func (b *Batch) OnAgentInit(reqID, contentEncoding string, raw []byte) error {
174176
// before adding any events then ErrBatchFull is returned.
175177
func (b *Batch) AddAgentData(apmData APMData) error {
176178
if len(apmData.Data) == 0 {
177-
return nil
179+
return ErrNoData
178180
}
179181
raw, err := GetUncompressedBytes(apmData.Data, apmData.ContentEncoding)
180182
if err != nil {

accumulator/batch_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ func TestAdd(t *testing.T) {
4848

4949
assert.ErrorIs(t, ErrBatchFull, b.AddLambdaData([]byte(`{"log":{}}`)))
5050
})
51+
t.Run("empty AddAgentData", func(t *testing.T) {
52+
b := NewBatch(1, time.Hour)
53+
assert.ErrorIs(t, ErrNoData, b.AddAgentData(APMData{}))
54+
})
5155
}
5256

5357
func TestReset(t *testing.T) {

apmproxy/apmserver.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,19 @@ func (c *Client) ForwardApmData(ctx context.Context) error {
5959
c.logger.Debug("Invocation context canceled, not processing any more agent data")
6060
return nil
6161
case data := <-c.AgentDataChannel:
62+
if len(data.Data) == 0 {
63+
c.logger.Debugf("Received something from '%s' without APMData", data.AgentInfo)
64+
continue
65+
}
6266
if err := c.forwardAgentData(ctx, data); err != nil {
6367
return err
6468
}
65-
// Wait for metadata to be available, metadata will be available as soon as
66-
// the first agent data is processed.
67-
lambdaDataChan = c.LambdaDataChannel
69+
if lambdaDataChan == nil {
70+
// With the first successful request to c.forwardAgent Data() metadata should be
71+
// available and processing data from c.LambdaDataChannel can start.
72+
lambdaDataChan = c.LambdaDataChannel
73+
c.logger.Debug("Assigned Lambda data channel")
74+
}
6875
case data := <-lambdaDataChan:
6976
if err := c.forwardLambdaData(ctx, data); err != nil {
7077
return err

apmproxy/receiver.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -136,18 +136,20 @@ func (c *Client) handleIntakeV2Events() func(w http.ResponseWriter, r *http.Requ
136136

137137
agentFlushed := r.URL.Query().Get("flushed") == "true"
138138

139-
agentData := accumulator.APMData{
140-
Data: rawBytes,
141-
ContentEncoding: r.Header.Get("Content-Encoding"),
142-
AgentInfo: r.UserAgent(),
143-
}
139+
if len(rawBytes) != 0 {
140+
agentData := accumulator.APMData{
141+
Data: rawBytes,
142+
ContentEncoding: r.Header.Get("Content-Encoding"),
143+
AgentInfo: r.UserAgent(),
144+
}
144145

145-
if len(agentData.Data) != 0 {
146146
select {
147147
case c.AgentDataChannel <- agentData:
148148
default:
149149
c.logger.Warnf("Channel full: dropping a subset of agent data")
150150
}
151+
} else {
152+
c.logger.Debugf("Received empy request from '%s'", r.UserAgent())
151153
}
152154

153155
if agentFlushed {

0 commit comments

Comments
 (0)