Skip to content

Commit 89efe07

Browse files
authored
ref(transaction): Send transaction attachments to the objectstore (#5710)
1 parent a7d6b74 commit 89efe07

File tree

2 files changed

+49
-11
lines changed

2 files changed

+49
-11
lines changed

relay-server/src/processing/transactions/types/output.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@ use crate::processing::spans::Indexed;
1414
use crate::processing::transactions::types::{ExpandedTransaction, StandaloneProfile};
1515
use crate::processing::{Forward, ForwardContext};
1616
use crate::services::outcome::{DiscardReason, Outcome};
17-
#[cfg(feature = "processing")]
18-
use crate::services::store::StoreEnvelope;
1917

2018
/// Output of the transaction processor.
2119
#[derive(Debug)]
@@ -74,11 +72,25 @@ impl Forward for TransactionOutput {
7472
s: StoreHandle<'_>,
7573
ctx: ForwardContext<'_>,
7674
) -> Result<(), Rejected<()>> {
77-
let envelope: ManagedEnvelope = self.serialize_envelope(ctx)?.into();
75+
use crate::envelope::ItemType;
76+
77+
let envelope = self.serialize_envelope(ctx)?;
78+
let envelope = ManagedEnvelope::from(envelope).into_processed();
7879

79-
s.send_to_store(StoreEnvelope {
80-
envelope: envelope.into_processed(),
81-
});
80+
let has_attachments = envelope
81+
.envelope()
82+
.items()
83+
.any(|item| item.ty() == &ItemType::Attachment);
84+
let use_objectstore = || {
85+
let options = &ctx.global_config.options;
86+
crate::utils::sample(options.objectstore_attachments_sample_rate).is_keep()
87+
};
88+
89+
if has_attachments && use_objectstore() {
90+
s.send_to_objectstore(crate::services::store::StoreEnvelope { envelope });
91+
} else {
92+
s.send_to_store(crate::services::store::StoreEnvelope { envelope });
93+
}
8294

8395
Ok(())
8496
}

tests/integration/test_attachments.py

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -567,19 +567,31 @@ def test_view_hierarchy_processing(
567567
outcomes_consumer.assert_empty()
568568

569569

570+
@pytest.mark.parametrize("use_objectstore", [False, True])
570571
def test_event_with_attachment(
571572
mini_sentry,
572573
relay_with_processing,
573574
attachments_consumer,
574575
transactions_consumer,
576+
use_objectstore,
577+
objectstore,
575578
):
576579
project_id = 42
577580
event_id = "515539018c9b4260a6f999572f1661ee"
578581

579582
mini_sentry.add_full_project_config(project_id)
580-
relay = relay_with_processing()
583+
584+
if use_objectstore:
585+
mini_sentry.global_config["options"][
586+
"relay.objectstore-attachments.sample-rate"
587+
] = 1.0
588+
589+
relay = relay_with_processing(
590+
{"processing": {"upload": {"objectstore_url": "http://127.0.0.1:8888/"}}}
591+
)
581592
attachments_consumer = attachments_consumer()
582593
transactions_consumer = transactions_consumer()
594+
objectstore = objectstore("attachments", project_id)
583595

584596
# event attachments are always sent as chunks, and added to events
585597
envelope = Envelope(headers=[["event_id", event_id]])
@@ -593,8 +605,9 @@ def test_event_with_attachment(
593605

594606
relay.send_envelope(project_id, envelope)
595607

596-
chunk, _ = attachments_consumer.get_attachment_chunk()
597-
assert chunk == b"event attachment"
608+
if not use_objectstore:
609+
chunk, _ = attachments_consumer.get_attachment_chunk()
610+
assert chunk == b"event attachment"
598611

599612
_, event_message = attachments_consumer.get_event()
600613

@@ -606,10 +619,14 @@ def test_event_with_attachment(
606619
"content_type": "application/octet-stream",
607620
"attachment_type": "event.attachment",
608621
"size": len(b"event attachment"),
609-
"chunks": 1,
622+
**({"stored_id": mock.ANY} if use_objectstore else {"chunks": 1}),
610623
}
611624
]
612625

626+
if use_objectstore:
627+
stored_id = event_message["attachments"][0]["stored_id"]
628+
assert objectstore.get(stored_id).payload.read() == b"event attachment"
629+
613630
# transaction attachments are sent as individual attachments,
614631
# either using chunks by default, or contents inlined
615632
envelope = Envelope(headers=[["event_id", event_id]])
@@ -629,11 +646,20 @@ def test_event_with_attachment(
629646
"content_type": "application/octet-stream",
630647
"attachment_type": "event.attachment",
631648
"size": len(b"transaction attachment"),
632-
"data": b"transaction attachment",
649+
**(
650+
{"stored_id": mock.ANY}
651+
if use_objectstore
652+
else {"data": b"transaction attachment"}
653+
),
633654
}
634655

635656
attachment = attachments_consumer.get_individual_attachment()
636657
assert attachment["attachment"].pop("id")
658+
659+
if use_objectstore:
660+
stored_id = attachment["attachment"]["stored_id"]
661+
assert objectstore.get(stored_id).payload.read() == b"transaction attachment"
662+
637663
assert attachment == {
638664
"type": "attachment",
639665
"attachment": expected_attachment,

0 commit comments

Comments
 (0)