
Conversation

@mattsp1290
Member

Description

Adds a dead letter queue. Also went through and did a few renames where the symbol had the package name in its name.

Testing

Unit tests were added. Manually tested with a faulty HTTP client to ensure logs were sent to a DLQ and the DLQ could serialize/deserialize from Azure.

logs := make([]datadogV2.HTTPLogItem, 0, len(c.logsBuffer))
for _, currLog := range c.logsBuffer {
	logs = append(logs, newHTTPLogItem(currLog))
	_, _, err = c.logsSubmitter.SubmitLog(ctx, c.logsBuffer)
Contributor

Is SubmitLog used elsewhere? Why are 2 of the return values being discarded?

Member Author

Yes, this is the only place where it is called. The first object is the unmarshalled return object of the API and the second is the HTTP response. Neither ends up being needed by our code.
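For reference, a minimal sketch of how the call reads with the two unused values explicitly discarded, assuming the generated datadog-api-client-go LogsApi signature of (interface{}, *http.Response, error):

```go
// Only the error matters here; the decoded response body and the raw
// *http.Response are discarded with blank identifiers.
_, _, err := c.logsSubmitter.SubmitLog(ctx, logs)
if err != nil {
	// a failed batch is what ends up in the dead letter queue
	return fmt.Errorf("submitting logs: %w", err)
}
```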

@mattsp1290 mattsp1290 requested a review from parsons90 January 30, 2025 19:18
cit-pr-commenter bot commented Jan 30, 2025

Coverage Report

Control Plane Coverage:

Filename Stmts Miss Cover Missing
control_plane/tasks/deployer_task.py 121 9 92.56% 133-134, 183-186, 210, 226-227, 231-232
control_plane/tasks/diagnostic_settings_task.py 85 20 76.47% 140-141, 157, 193-208, 217-222
control_plane/tasks/resources_task.py 54 6 88.89% 91-96
control_plane/tasks/scaling_task.py 286 28 90.21% 189, 200, 222, 284-290, 297-298, 335-338, 367-384, 431, 497-498, 649-657
control_plane/tasks/client/log_forwarder_client.py 234 1 99.57% 215
control_plane/tasks/client/resource_client.py 102 4 96.08% 76, 88-90
TOTAL 2625 68 97.41%

Forwarder Coverage:

Filename Stmts Miss Cover Missing
cmd/forwarder/forwarder.go 288 138 52.08% 45-51, 53-55, 62-70, 140-142, 167-170, 176-178, 208-210, 229-230, 265-267, 278-389
internal/collections/iterator.go 35 6 82.86% 31-33, 59-61
internal/cursor/cursor.go 60 14 76.67% 94, 102-105, 38-40, 63-65, 67-69
internal/deadletterqueue/dead_letter_queue.go 68 28 58.82% 40, 49-51, 85-89, 94-96, 98-100, 105-118
internal/environment/variables.go 9 3 66.67% 27-29
internal/logs/logs.go 245 69 71.84% 72-102, 168-170, 178-180, 188-189, 200-202, 243, 253, 411-418, 132, 334-336, 344-346, 354-367, 378-380
internal/logs/mocks/mock_logs.go 24 3 87.50% 48-50
internal/metrics/metrics.go 23 8 65.22% 26-27, 31-33, 43-45
internal/storage/blobs.go 102 18 82.35% 68-74, 94-95, 119-121, 143-145, 148-150
internal/storage/client.go 5 0 100.00%
internal/storage/containers.go 29 8 72.41% 54-61
internal/storage/errors.go 3 3 0.00% 9-11
internal/storage/segments.go 23 0 100.00%
internal/storage/mocks/mock_client.go 61 0 100.00%
TOTAL 975 298 69.44%

}

// FromBytes creates a DeadLetterQueue object from the given bytes.
func FromBytes(logsClient *logs.Client, data []byte) (*DeadLetterQueue, error) {
Contributor

If it wasn't for the logsClient argument, DeadLetterQueue is actually really close to implementing the JSON Marshaler and Unmarshaler interfaces.
And so is Cursor.
Probably not worth doing anything about, but just something I'm noticing

Member Author

This is a good callout. Might be something worth following up on as a tech debt item
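For reference, the encoding/json interfaces in question are tiny; a hypothetical sketch of what implementing them on DeadLetterQueue could look like (the queue field name is assumed, and the logsClient dependency is exactly what keeps FromBytes as a free function today):

```go
// json.Marshaler is:   MarshalJSON() ([]byte, error)
// json.Unmarshaler is: UnmarshalJSON([]byte) error

// Hypothetical: serialize only the queued log items.
func (d *DeadLetterQueue) MarshalJSON() ([]byte, error) {
	return json.Marshal(d.queue)
}

// Hypothetical: restore the queued log items; the logs client would still
// need to be injected separately, which is why FromBytes exists instead.
func (d *DeadLetterQueue) UnmarshalJSON(data []byte) error {
	return json.Unmarshal(data, &d.queue)
}
```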

@mattsp1290 mattsp1290 requested a review from parsons90 January 30, 2025 21:32
@mattsp1290 mattsp1290 requested review from a team and saranyadevi and removed request for a team January 30, 2025 21:56
Collaborator

@ava-silver ava-silver left a comment

A couple small comments, but overall LGTM.

}

// Content converts the log content to a string.
func (l *Log) Content() string {
Collaborator

Would this be better as String() so Log implements the Stringer interface? Or is that more for debugging purposes? (Still new to Go idioms.)

Member Author

@mattsp1290 mattsp1290 Feb 6, 2025

Unfortunately it doesn't offer much, if any, improvement here. Stringer isn't automatically applied in the case where this is used.

Member Author

Stringer is most useful if you are passing something to anything in fmt
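For context, a sketch of what the rename would look like; fmt.Stringer is only consulted by fmt (and packages built on it), so call sites that use the value directly would see no change:

```go
// fmt.Stringer is simply: String() string
// Hypothetical rename of Content; fmt.Sprintf("%v", l) would then pick it up,
// but direct callers gain nothing over l.Content().
func (l *Log) String() string {
	return string(*l.content)
}
```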

	return string(*l.content)
}
// ValidateDatadogLog checks if the log is valid to send to Datadog and returns the log size when it is.
func ValidateDatadogLog(log datadogV2.HTTPLogItem, logger *log.Entry) (int64, bool) {
Collaborator

Is it possible to consolidate the usage of this function and Log.Validate()? They seem to be doing similar things; I wonder if we could just have one path for log validation rather than two.

Member Author

Somewhat. I have created a shared validation function, and these now take a Log or HTTPLogItem and pass the appropriate fields to that shared function.
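A rough sketch of that shape, with illustrative names only (the real shared function and size limit may differ):

```go
// validateLogContent is the single validation path both entry points use.
func validateLogContent(content string, logger *log.Entry) (int64, bool) {
	size := int64(len(content))
	if size > maxLogSizeBytes { // hypothetical per-log limit
		logger.Warn("log exceeds maximum size, dropping")
		return 0, false
	}
	return size, true
}

func (l *Log) Validate(logger *log.Entry) (int64, bool) {
	return validateLogContent(l.Content(), logger)
}

func ValidateDatadogLog(item datadogV2.HTTPLogItem, logger *log.Entry) (int64, bool) {
	return validateLogContent(item.Message, logger)
}
```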

@mattsp1290 mattsp1290 merged commit 715a7e0 into main Feb 7, 2025
13 checks passed
@mattsp1290 mattsp1290 deleted the matt.spurlin/AZINTS-2955/dead-letter-queue branch February 7, 2025 21:08
// Load loads the DeadLetterQueue from the storage client.
func Load(ctx context.Context, storageClient *storage.Client, logsClient *logs.Client) (*DeadLetterQueue, error) {
	span, ctx := tracer.StartSpanFromContext(ctx, "deadletterqueue.Load")
	defer span.Finish()

This isn't picking up the error; it generally needs to be of the form:

defer func() { span.Finish(tracer.WithError(err)) }()
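A minimal sketch of the full pattern, assuming dd-trace-go's tracer package and a named err return so the deferred closure sees the final error value:

```go
func Load(ctx context.Context, storageClient *storage.Client, logsClient *logs.Client) (queue *DeadLetterQueue, err error) {
	span, ctx := tracer.StartSpanFromContext(ctx, "deadletterqueue.Load")
	// err is a named return, so whatever value it holds when Load returns
	// is what the span gets tagged with.
	defer func() { span.Finish(tracer.WithError(err)) }()

	// ... load and deserialize the queue ...
	return queue, err
}
```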

// prune invalid logs
queue := make([]datadogV2.HTTPLogItem, 0)

Unless we expect the majority of logs to be invalid, we should preallocate this to len(d.client.FailedLogs).

Member Author

Is there any guidance on when to prefer preallocating the excess vs. erring on the side of a 0 allocation?
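For reference, a sketch of the suggested preallocation: the slice starts empty but with capacity for every failed log, so append never reallocates, and invalid logs just leave some capacity unused (logger here is illustrative):

```go
// prune invalid logs
queue := make([]datadogV2.HTTPLogItem, 0, len(d.client.FailedLogs))
for _, item := range d.client.FailedLogs {
	if _, ok := ValidateDatadogLog(item, logger); ok {
		queue = append(queue, item)
	}
}
```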


// ValidateDatadogLog checks if the log is valid to send to Datadog and returns the log size when it is.
func ValidateDatadogLog(log datadogV2.HTTPLogItem, logger *log.Entry) (int64, bool) {
	logBytes, err := log.MarshalJSON()

Taking a serialization pass just to throw the result away will be very inefficient. Could we either cache this result somewhere if we can use it later on, or deal with serialization errors later on when we try to serialize the logs to submit them to the API?

Member Author

This isn't serializing to catch serialization errors. The goal is to get the final byte size of the log, as the Datadog API has per-log and per-payload limits on byte sizes. The length of the result does get used in multiple places.
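One hedged way to address the double-serialization concern would be to marshal once and keep both the bytes and their length (sketch only, not what this PR does):

```go
// Marshal once: len(logBytes) enforces the per-log limit, and the same bytes
// could be reused later when assembling the request payload.
logBytes, err := item.MarshalJSON()
if err != nil {
	return 0, false
}
size := int64(len(logBytes))
if size > maxLogSizeBytes { // hypothetical per-log limit
	return 0, false
}
// cache logBytes alongside the item if the submit path can take raw JSON
```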

@thieman thieman Feb 20, 2025

It's still a ton of overhead. Assuming most of the log size comes from the Content, which is already a string and can be measured cheaply, I'd think you could bypass this by adding some buffer to the max batch size. You might also want to add error handling in the submit flow to split a batch up in case you're still over the limit (see the sketch below).

Also, what's the story with compression here? As I understand it, the batch size limit is before compression.
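A rough sketch of the split-and-retry fallback being suggested, assuming an oversized payload can be recognized from the HTTP status (all names here are hypothetical):

```go
// submitBatch halves the batch whenever intake rejects it as too large,
// until individual logs either send or fail for another reason.
func submitBatch(ctx context.Context, c *client, batch []datadogV2.HTTPLogItem) error {
	_, resp, err := c.logsSubmitter.SubmitLog(ctx, batch)
	if err == nil {
		return nil
	}
	if resp != nil && resp.StatusCode == http.StatusRequestEntityTooLarge && len(batch) > 1 {
		mid := len(batch) / 2
		if err := submitBatch(ctx, c, batch[:mid]); err != nil {
			return err
		}
		return submitBatch(ctx, c, batch[mid:])
	}
	return err
}
```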


Not sure how the Go API works, but if you're able to send Protobufs to the API, you can also Marshal them yourself and get the size that way without wasting effort. That's what we do from the resource crawlers: https://github.com/DataDog/dd-source/blob/main/domains/cloud_platform/libs/resource_sink/eventplatform.go#L246-L261
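For comparison, the marshal-once approach on the protobuf side looks roughly like this (illustrative only, since the public logs API turns out to be JSON-only per the replies below):

```go
// Marshaling yourself yields both the payload and its exact size,
// with no throwaway serialization pass.
payload, err := proto.Marshal(msg) // msg is any proto.Message
if err != nil {
	return err
}
if len(payload) > maxPayloadSizeBytes { // hypothetical limit
	// split or drop before sending
}
```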

Member Author

We do need to add gzip compression.

The individual log size limit still applies. The docs do specifically mention that the limit applies to the uncompressed data: https://github.com/DataDog/datadog-api-client-go/blob/master/api/datadogV2/api_logs.go#L529
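A minimal sketch of what adding gzip could look like with the generated client, assuming SubmitLogOptionalParameters and its ContentEncoding option work as in datadog-api-client-go (worth verifying the client compresses the body itself when this is set):

```go
// Request gzip content encoding; the per-log and per-payload limits
// still apply to the uncompressed JSON.
params := *datadogV2.NewSubmitLogOptionalParameters().
	WithContentEncoding(datadogV2.CONTENTENCODING_GZIP)
_, _, err := c.logsSubmitter.SubmitLog(ctx, logs, params)
```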

Member Author

The public logs submission API only supports JSON, sadly: https://docs.datadoghq.com/api/latest/logs/#send-logs

Member Author

@mattsp1290 mattsp1290 Feb 20, 2025

I don't know enough about logs intake to know how track types map to the public API. Does this apply to anything sent to https://http-intake.logs.datadoghq.com/api/v2/[TRACK-NAME]?


We should ask. There are existing entries in there for GCP and AWS; I'm guessing they're doing something similar, but I'm not sure.


Relatedly, I think there must be some way to send them with zstd compression; that'd be way better than gzip: DataDog/datadog-lambda-extension#558

Member Author

I have been able to confirm zstd and protobuf are on the table here. Added tasks to our backlog to correct this in the future.

Add protobuf support: https://datadoghq.atlassian.net/browse/AZINTS-3210
Add zstd compression: https://datadoghq.atlassian.net/browse/AZINTS-3209
