Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/datadog-plugin-google-cloud-pubsub/src/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin {
msg.attributes['gcloud.project_id'] = projectId
msg.attributes['pubsub.topic'] = topic

// Record publish start time for delivery duration measurement on consumer
if (!msg.attributes['x-dd-publish-start-time']) {
msg.attributes['x-dd-publish-start-time'] = String(Date.now())
}
Comment on lines +37 to +39
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit

Suggested change
if (!msg.attributes['x-dd-publish-start-time']) {
msg.attributes['x-dd-publish-start-time'] = String(Date.now())
}
msg.attributes['x-dd-publish-start-time'] ??= String(Date.now())


if (this.config.dsmEnabled) {
const payloadSize = getHeadersSize(msg)
const dataStreamsContext = this.tracer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,66 @@ class GoogleCloudPubsubTransitHandlerPlugin extends TracingPlugin {
const producerParent = this.tracer.extract('text_map', attrs) || null
const effectiveParent = producerParent || undefined

// ToDo: create pubsub.delivery; create HTTP span directly as child of producer
// Compute pubsub scheduling duration (publish → HTTP receipt)
const publishStartTimeRawForHttp = attrs['x-dd-publish-start-time']
let schedulingMs = null
if (publishStartTimeRawForHttp) {
const t0 = Number.parseInt(publishStartTimeRawForHttp, 10)
if (Number.isFinite(t0) && t0 > 0) schedulingMs = Date.now() - t0
Comment on lines +127 to +128
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks rather complicated. What values can we expect in such attribute? I guess we could potentially simplify it a tad :)

The same applies below.

}

// Create pubsub.delivery span to represent infra delivery latency and parent the HTTP span
let parentForHttp = effectiveParent
const publishStartTimeRaw = attrs['x-dd-publish-start-time']
if (publishStartTimeRaw) {
const publishStartTime = Number.parseInt(publishStartTimeRaw, 10)
if (Number.isFinite(publishStartTime) && publishStartTime > 0) {
const messageId = (message && message.messageId) || req.headers['ce-id']
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const messageId = (message && message.messageId) || req.headers['ce-id']
const messageId = message?.messageId || req.headers['ce-id']

const deliveryTags = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const deliveryTags = {
const tags = {

I think it's fine to use the shorter name. We always want to deliver tags :)

component: 'google-cloud-pubsub',
'span.kind': 'consumer',
'span.type': 'pubsub',
'gcloud.project_id': projectId,
'pubsub.topic': topicName,
'pubsub.subscription': subscription,
'pubsub.message_id': messageId,
'pubsub.delivery_method': isCloudEvent ? 'eventarc' : 'push',
'pubsub.operation': 'delivery',
'pubsub.scheduling_duration_ms': schedulingMs
}
// Add CloudEvents/Eventarc tags for CloudEvent requests
if (isCloudEvent) {
if (attrs['ce-source'] || req.headers['ce-source']) {
deliveryTags['cloudevents.source'] = attrs['ce-source'] || req.headers['ce-source']
}
if (attrs['ce-type'] || req.headers['ce-type']) {
deliveryTags['cloudevents.type'] = attrs['ce-type'] || req.headers['ce-type']
}
if (req.headers['ce-id']) deliveryTags['cloudevents.id'] = req.headers['ce-id']
if (req.headers['ce-specversion']) deliveryTags['cloudevents.specversion'] = req.headers['ce-specversion']
if (req.headers['ce-time']) deliveryTags['cloudevents.time'] = req.headers['ce-time']
deliveryTags['eventarc.trigger'] = 'pubsub'
}
const deliverySpan = this.tracer.startSpan('pubsub.delivery', {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const deliverySpan = this.tracer.startSpan('pubsub.delivery', {
const span = this.tracer.startSpan('pubsub.delivery', {

I guess span would also be fine here?

childOf: effectiveParent,
service: this.tracer._service ? `${this.tracer._service}-pubsub-scheduling` : undefined,
resource: `${topicName} → ${subscription}`,
type: 'pubsub',
tags: deliveryTags,
startTime: publishStartTime
})
const deliveryEnd = Date.now()
try { deliverySpan.setTag('pubsub.delivery.duration_ms', deliveryEnd - publishStartTime) } catch {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
try { deliverySpan.setTag('pubsub.delivery.duration_ms', deliveryEnd - publishStartTime) } catch {}
deliverySpan.setTag('pubsub.delivery.duration_ms', deliveryEnd - publishStartTime)

It would be very bad if setTag() would fail.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would the duration on the span not be sufficient here?

deliverySpan.finish(deliveryEnd)
parentForHttp = deliverySpan
}
}

// Add parsed body for downstream middleware that expects it
req.body = json
// Create enhanced HTTP span as child of producer
const httpSpan = this.tracer.startSpan('http.request', {
childOf: effectiveParent,
childOf: parentForHttp,
tags: {
'http.method': req.method,
'http.url': `${req.headers['x-forwarded-proto'] || 'http'}://${req.headers.host}${req.url}`,
Expand Down
Loading