Skip to content

Commit 715a7e0

Browse files
authored
Merge pull request #271 from DataDog/matt.spurlin/AZINTS-2955/dead-letter-queue
[azints-2955][forwarder] dead letter queue
2 parents d3a1318 + 573290f commit 715a7e0

File tree

8 files changed

+504
-92
lines changed

8 files changed

+504
-92
lines changed

forwarder/cmd/forwarder/forwarder.go

Lines changed: 61 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
// project
2525
"github.com/DataDog/azure-log-forwarding-orchestration/forwarder/internal/cursor"
26+
"github.com/DataDog/azure-log-forwarding-orchestration/forwarder/internal/deadletterqueue"
2627
"github.com/DataDog/azure-log-forwarding-orchestration/forwarder/internal/environment"
2728
"github.com/DataDog/azure-log-forwarding-orchestration/forwarder/internal/logs"
2829
"github.com/DataDog/azure-log-forwarding-orchestration/forwarder/internal/metrics"
@@ -40,15 +41,15 @@ type resourceBytes struct {
4041
}
4142

4243
func getLogs(ctx context.Context, storageClient *storage.Client, cursors *cursor.Cursors, blob storage.Blob, logsChannel chan<- *logs.Log) (err error) {
43-
currentOffset := cursors.GetCursor(blob.Container.Name, blob.Name)
44-
if currentOffset == blob.ContentLength {
44+
cursorOffset := cursors.Get(blob.Container.Name, blob.Name)
45+
if cursorOffset == blob.ContentLength {
4546
// Cursor is at the end of the blob, no need to process
4647
return nil
4748
}
48-
if currentOffset > blob.ContentLength {
49+
if cursorOffset > blob.ContentLength {
4950
return fmt.Errorf("cursor is ahead of blob length for %s", blob.Name)
5051
}
51-
content, err := storageClient.DownloadSegment(ctx, blob, currentOffset, blob.ContentLength)
52+
content, err := storageClient.DownloadSegment(ctx, blob, cursorOffset, blob.ContentLength)
5253
if err != nil {
5354
return fmt.Errorf("download range for %s: %w", blob.Name, err)
5455
}
@@ -58,18 +59,18 @@ func getLogs(ctx context.Context, storageClient *storage.Client, cursors *cursor
5859
// linux newlines are 1 byte, but windows newlines are 2
5960
// if adding another byte per line equals the content length, we have processed a file written by a windows machine.
6061
// we know we have hit the end and can safely set our cursor to the end of the file.
61-
if processedBytes+processedLogs+currentOffset == blob.ContentLength {
62-
processedBytes = blob.ContentLength - currentOffset
62+
if processedBytes+processedLogs+cursorOffset == blob.ContentLength {
63+
processedBytes = blob.ContentLength - cursorOffset
6364
}
6465

65-
if processedBytes+currentOffset > blob.ContentLength {
66+
if processedBytes+cursorOffset > blob.ContentLength {
6667
// we have processed more bytes than expected
6768
// unsafe to save cursor
6869
return errors.Join(err, fmt.Errorf("processed more bytes than expected for %s", blob.Name))
6970
}
7071

71-
// we have processed and submitted logs up to currentOffset+processedBytes whether the error is nil or not
72-
cursors.SetCursor(blob.Container.Name, blob.Name, currentOffset+processedBytes)
72+
// we have processed and submitted logs up to cursorOffset+processedBytes whether the error is nil or not
73+
cursors.Set(blob.Container.Name, blob.Name, cursorOffset+processedBytes)
7374

7475
return err
7576
}
@@ -80,7 +81,7 @@ func parseLogs(reader io.ReadCloser, containerName string, logsChannel chan<- *l
8081

8182
var currLog *logs.Log
8283
var err error
83-
for currLog, err = range logs.ParseLogs(reader, containerName) {
84+
for currLog, err = range logs.Parse(reader, containerName) {
8485
if err != nil {
8586
break
8687
}
@@ -152,7 +153,7 @@ func writeMetrics(ctx context.Context, storageClient *storage.Client, resourceVo
152153
return logCount, nil
153154
}
154155

155-
func run(ctx context.Context, storageClient *storage.Client, logsClients []*logs.Client, logger *log.Entry, now customtime.Now) (err error) {
156+
func fetchAndProcessLogs(ctx context.Context, storageClient *storage.Client, logsClients []*logs.Client, logger *log.Entry, now customtime.Now) (err error) {
156157
start := now()
157158

158159
span, ctx := tracer.StartSpanFromContext(ctx, "forwarder.Run")
@@ -171,7 +172,7 @@ func run(ctx context.Context, storageClient *storage.Client, logsClients []*logs
171172
}()
172173

173174
// Download cursors
174-
cursors, err := cursor.LoadCursors(ctx, storageClient, logger)
175+
cursors, err := cursor.Load(ctx, storageClient, logger)
175176
if err != nil {
176177
return err
177178
}
@@ -248,7 +249,7 @@ func run(ctx context.Context, storageClient *storage.Client, logsClients []*logs
248249
err = errors.Join(err, logBytesEg.Wait())
249250

250251
// Save cursors
251-
cursorErr := cursors.SaveCursors(ctx, storageClient)
252+
cursorErr := cursors.Save(ctx, storageClient)
252253
err = errors.Join(err, cursorErr)
253254

254255
// Write forwarder metrics
@@ -259,6 +260,45 @@ func run(ctx context.Context, storageClient *storage.Client, logsClients []*logs
259260
return err
260261
}
261262

263+
func processDeadLetterQueue(ctx context.Context, logger *log.Entry, storageClient *storage.Client, logsClient *logs.Client, flushedLogsClients []*logs.Client) error {
264+
dlq, err := deadletterqueue.Load(ctx, storageClient, logsClient)
265+
if err != nil {
266+
return err
267+
}
268+
269+
dlq.Process(ctx, logger)
270+
271+
for _, client := range flushedLogsClients {
272+
dlq.Add(client.FailedLogs)
273+
}
274+
275+
return dlq.Save(ctx, storageClient, logger)
276+
}
277+
278+
func run(ctx context.Context, logger *log.Entry, goroutineCount int, datadogClient *datadog.APIClient, azBlobClient *azblob.Client) error {
279+
start := time.Now()
280+
logger.Info(fmt.Sprintf("Start time: %v", start.String()))
281+
282+
storageClient := storage.NewClient(azBlobClient)
283+
284+
// Initialize log submission client
285+
logsApiClient := datadogV2.NewLogsApi(datadogClient)
286+
287+
var logsClients []*logs.Client
288+
for range goroutineCount {
289+
logsClients = append(logsClients, logs.NewClient(logsApiClient))
290+
}
291+
292+
processErr := fetchAndProcessLogs(ctx, storageClient, logsClients, logger, time.Now)
293+
294+
dlqErr := processDeadLetterQueue(ctx, logger, storageClient, logs.NewClient(logsApiClient), logsClients)
295+
296+
logger.Info(fmt.Sprintf("Run time: %v", time.Since(start).String()))
297+
logger.Info(fmt.Sprintf("Final time: %v", (time.Now()).String()))
298+
299+
return errors.Join(processErr, dlqErr)
300+
}
301+
262302
func main() {
263303
apmEnabled := environment.APMEnabled()
264304

@@ -292,7 +332,6 @@ func main() {
292332
"site": ddSite,
293333
})
294334

295-
start := time.Now()
296335
log.SetFormatter(&log.JSONFormatter{})
297336
logger := log.WithFields(log.Fields{"service": serviceName})
298337

@@ -315,21 +354,24 @@ func main() {
315354
defer profiler.Stop()
316355
}
317356

318-
logger.Info(fmt.Sprintf("Start time: %v", start.String()))
319-
320357
forceProfile := environment.Get(environment.DD_FORCE_PROFILE)
321358
if forceProfile != "" {
322359
// Sleep for 5 seconds to allow profiler to start
323360
time.Sleep(5 * time.Second)
324361
}
325362

363+
// Initialize Datadog API client
364+
datadogConfig := datadog.NewConfiguration()
365+
datadogConfig.RetryConfiguration.HTTPRetryTimeout = 90 * time.Second
366+
datadogClient := datadog.NewAPIClient(datadogConfig)
367+
326368
goroutineString := environment.Get(environment.NUM_GOROUTINES)
327369
if goroutineString == "" {
328370
goroutineString = "10"
329371
}
330-
goroutineAmount, err := strconv.ParseInt(goroutineString, 10, 64)
372+
goroutineCount, err := strconv.ParseInt(goroutineString, 10, 64)
331373
if err != nil {
332-
logger.Fatalf(fmt.Errorf("error parsing MAX_GOROUTINES: %w", err).Error())
374+
logger.Fatalf(fmt.Errorf("error parsing %s: %w", environment.NUM_GOROUTINES, err).Error())
333375
}
334376

335377
// Initialize storage client
@@ -339,25 +381,9 @@ func main() {
339381
logger.Fatalf(fmt.Errorf("error creating azure blob client: %w", err).Error())
340382
return
341383
}
342-
storageClient := storage.NewClient(azBlobClient)
343-
344-
// Initialize log submission client
345-
datadogConfig := datadog.NewConfiguration()
346-
datadogConfig.RetryConfiguration.HTTPRetryTimeout = 90 * time.Second
347-
apiClient := datadog.NewAPIClient(datadogConfig)
348-
logsApiClient := datadogV2.NewLogsApi(apiClient)
349-
350-
var logsClients []*logs.Client
351-
for range goroutineAmount {
352-
logsClients = append(logsClients, logs.NewClient(logsApiClient))
353-
}
354-
355-
runErr := run(ctx, storageClient, logsClients, logger, time.Now)
356384

357-
err = errors.Join(runErr, err)
385+
err = run(ctx, logger, int(goroutineCount), datadogClient, azBlobClient)
358386

359-
logger.Info(fmt.Sprintf("Run time: %v", time.Since(start).String()))
360-
logger.Info(fmt.Sprintf("Final time: %v", (time.Now()).String()))
361387
if err != nil {
362388
logger.Fatalf(fmt.Errorf("error while running: %w", err).Error())
363389
}

forwarder/cmd/forwarder/forwarder_test.go

Lines changed: 68 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package main
22

33
import (
44
// stdlib
5-
5+
"bytes"
66
"context"
77
"fmt"
88
"io"
@@ -15,6 +15,7 @@ import (
1515
// 3p
1616
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
1717
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
18+
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
1819
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
1920
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
2021
"github.com/Azure/go-autorest/autorest/to"
@@ -30,6 +31,7 @@ import (
3031
// project
3132
"github.com/DataDog/azure-log-forwarding-orchestration/forwarder/internal/collections"
3233
"github.com/DataDog/azure-log-forwarding-orchestration/forwarder/internal/cursor"
34+
"github.com/DataDog/azure-log-forwarding-orchestration/forwarder/internal/deadletterqueue"
3335
"github.com/DataDog/azure-log-forwarding-orchestration/forwarder/internal/logs"
3436
datadogmocks "github.com/DataDog/azure-log-forwarding-orchestration/forwarder/internal/logs/mocks"
3537
"github.com/DataDog/azure-log-forwarding-orchestration/forwarder/internal/metrics"
@@ -137,7 +139,7 @@ func mockedRun(t *testing.T, containers []*service.ContainerItem, blobs []*conta
137139

138140
ctx := context.Background()
139141

140-
err := run(ctx, client, []*logs.Client{logClient}, nullLogger(), time.Now)
142+
err := fetchAndProcessLogs(ctx, client, []*logs.Client{logClient}, nullLogger(), time.Now)
141143
return submittedLogs, err
142144
}
143145

@@ -470,7 +472,7 @@ func TestCursors(t *testing.T) {
470472
var currentLogData []byte
471473
now := time.Now()
472474

473-
lastCursor := cursor.NewCursors(nil)
475+
lastCursor := cursor.New(nil)
474476

475477
for i := 0; i < n; i++ {
476478
// REPEATED GIVEN
@@ -507,7 +509,7 @@ func TestCursors(t *testing.T) {
507509
// THEN
508510
assert.NoError(t, err)
509511

510-
assert.Equal(t, int64(len(currentLogData)), lastCursor.GetCursor(containerName, blobName))
512+
assert.Equal(t, int64(len(currentLogData)), lastCursor.Get(containerName, blobName))
511513

512514
for _, logItem := range submittedLogs {
513515
assert.Equal(t, "azure", *logItem.Ddsource)
@@ -537,7 +539,7 @@ func TestCursors(t *testing.T) {
537539
var currentLogData []byte
538540
now := time.Now()
539541

540-
lastCursor := cursor.NewCursors(nil)
542+
lastCursor := cursor.New(nil)
541543

542544
for i := 0; i < n; i++ {
543545
// REPEATED GIVEN
@@ -574,7 +576,7 @@ func TestCursors(t *testing.T) {
574576
// THEN
575577
assert.NoError(t, err)
576578

577-
assert.Equal(t, int64(len(currentLogData)), lastCursor.GetCursor(containerName, blobName))
579+
assert.Equal(t, int64(len(currentLogData)), lastCursor.Get(containerName, blobName))
578580

579581
for _, logItem := range submittedLogs {
580582
assert.Equal(t, "azure", *logItem.Ddsource)
@@ -584,11 +586,70 @@ func TestCursors(t *testing.T) {
584586
})
585587
}
586588

589+
type FaultyRoundTripper struct {
590+
}
591+
592+
func (f *FaultyRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
593+
return nil, fmt.Errorf("faulty")
594+
}
595+
596+
func TestProcessDLQ(t *testing.T) {
597+
t.Parallel()
598+
599+
t.Run("processes the dead letter queue", func(t *testing.T) {
600+
t.Parallel()
601+
// GIVEN
602+
ctrl := gomock.NewController(t)
603+
dlq, err := deadletterqueue.FromBytes(nil, []byte("[]"))
604+
require.NoError(t, err)
605+
currentTime := time.Now().UTC()
606+
formattedTime := currentTime.Format(time.RFC3339)
607+
logItem := datadogV2.HTTPLogItem{
608+
Message: fmt.Sprintf("{\"time\":\"%s\"}", formattedTime),
609+
AdditionalProperties: map[string]string{"time": formattedTime},
610+
}
611+
queue := []datadogV2.HTTPLogItem{logItem}
612+
dlq.Add(queue)
613+
data, err := dlq.JSONBytes()
614+
require.NoError(t, err)
615+
reader := io.NopCloser(bytes.NewReader(data))
616+
response := azblob.DownloadStreamResponse{
617+
DownloadResponse: blob.DownloadResponse{
618+
Body: reader,
619+
},
620+
}
621+
mockClient := storagemocks.NewMockAzureBlobClient(ctrl)
622+
mockClient.EXPECT().DownloadStream(gomock.Any(), storage.ForwarderContainer, deadletterqueue.BlobName, nil).Return(response, nil)
623+
createContainerResponse := azblob.CreateContainerResponse{}
624+
mockClient.EXPECT().CreateContainer(gomock.Any(), storage.ForwarderContainer, nil).Return(createContainerResponse, nil)
625+
uploadResponse := azblob.UploadBufferResponse{}
626+
mockClient.EXPECT().UploadBuffer(gomock.Any(), storage.ForwarderContainer, deadletterqueue.BlobName, gomock.Any(), gomock.Any()).Return(uploadResponse, nil)
627+
628+
storageClient := storage.NewClient(mockClient)
629+
630+
datadogClient := datadogmocks.NewMockDatadogLogsSubmitter(ctrl)
631+
datadogClient.EXPECT().SubmitLog(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil, nil)
632+
logsClient := logs.NewClient(datadogClient)
633+
634+
processedDatadogClient := datadogmocks.NewMockDatadogLogsSubmitter(ctrl)
635+
processedLogsClient := logs.NewClient(processedDatadogClient)
636+
processedClients := []*logs.Client{processedLogsClient}
637+
638+
ctx := context.Background()
639+
640+
// WHEN
641+
err = processDeadLetterQueue(ctx, nullLogger(), storageClient, logsClient, processedClients)
642+
643+
// THEN
644+
assert.NoError(t, err)
645+
})
646+
}
647+
587648
// TestRunMain exists for performance testing purposes.
588649
func TestRunMain(t *testing.T) {
589650
t.Parallel()
590651

591-
t.Run("run main", func(t *testing.T) {
652+
t.Run("fetchAndProcessLogs main", func(t *testing.T) {
592653
t.Parallel()
593654
if os.Getenv("CI") != "" {
594655
t.Skip("Skipping testing in CI environment")

0 commit comments

Comments
 (0)