Skip to content

Commit 74420b6

Browse files
authored
feat(logs): Add pipeline id to replicator logs (supabase#304)
1 parent fa05fdd commit 74420b6

File tree

3 files changed

+62
-17
lines changed

3 files changed

+62
-17
lines changed

etl-replicator/src/core.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ async fn init_store(
149149
/// Launches the pipeline, sets up signal handlers for SIGTERM and SIGINT,
150150
/// and ensures proper cleanup on shutdown. The pipeline will attempt to
151151
/// finish processing current batches before terminating.
152-
#[tracing::instrument(skip(pipeline), fields(pipeline_id = pipeline.id()))]
152+
#[tracing::instrument(skip(pipeline))]
153153
async fn start_pipeline<S, D>(mut pipeline: Pipeline<S, D>) -> anyhow::Result<()>
154154
where
155155
S: StateStore + SchemaStore + CleanupStore + Clone + Send + Sync + 'static,

etl-replicator/src/main.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::core::start_replicator_with_config;
99
use etl_config::Environment;
1010
use etl_config::shared::ReplicatorConfig;
1111
use etl_telemetry::metrics::init_metrics;
12-
use etl_telemetry::tracing::init_tracing_with_project;
12+
use etl_telemetry::tracing::init_tracing_with_top_level_fields;
1313
use std::sync::Arc;
1414
use thiserror::__private::AsDynError;
1515
use tracing::{error, info};
@@ -34,7 +34,11 @@ fn main() -> anyhow::Result<()> {
3434
.map(|s| s.project_ref.clone());
3535

3636
// Initialize tracing with project reference and pipeline id
37-
let _log_flusher = init_tracing_with_project(env!("CARGO_BIN_NAME"), project_ref.clone())?;
37+
let _log_flusher = init_tracing_with_top_level_fields(
38+
env!("CARGO_BIN_NAME"),
39+
project_ref.clone(),
40+
Some(replicator_config.pipeline.id),
41+
)?;
3842

3943
// Initialize Sentry before the async runtime starts
4044
let _sentry_guard = init_sentry()?;

etl-telemetry/src/tracing.rs

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ use tracing_subscriber::{EnvFilter, FmtSubscriber, Registry, fmt, layer::Subscri
1919

2020
/// JSON field name for project identification in logs.
2121
const PROJECT_KEY_IN_LOG: &str = "project";
22+
/// JSON field name for pipeline identification in logs.
23+
const PIPELINE_KEY_IN_LOG: &str = "pipeline_id";
2224

2325
/// Errors that can occur during tracing initialization.
2426
#[derive(Debug, Error)]
@@ -70,6 +72,8 @@ pub fn init_test_tracing() {
7072

7173
/// Global project reference storage
7274
static PROJECT_REF: OnceLock<String> = OnceLock::new();
75+
/// Global pipeline id storage.
76+
static PIPELINE_ID: OnceLock<u64> = OnceLock::new();
7377

7478
/// Sets the global project reference for all tracing events.
7579
///
@@ -86,6 +90,21 @@ pub fn get_global_project_ref() -> Option<&'static str> {
8690
PROJECT_REF.get().map(|s| s.as_str())
8791
}
8892

93+
/// Sets the global pipeline id for all tracing events.
94+
///
95+
/// The pipeline id will be injected into all structured log entries
96+
/// as a top-level field named "pipeline_id" for identification and filtering.
97+
pub fn set_global_pipeline_id(pipeline_id: u64) {
98+
let _ = PIPELINE_ID.set(pipeline_id);
99+
}
100+
101+
/// Returns the current global pipeline id.
102+
///
103+
/// Returns `None` if no pipeline id has been set.
104+
pub fn get_global_pipeline_id() -> Option<u64> {
105+
PIPELINE_ID.get().copied()
106+
}
107+
89108
/// Writer wrapper that injects project and pipeline id fields into JSON log entries.
90109
///
91110
/// Parses JSON log entries and adds a project field if one doesn't already exist,
@@ -105,31 +124,47 @@ impl<W> Write for ProjectInjectingWriter<W>
105124
where
106125
W: Write,
107126
{
108-
/// Writes log data, injecting project field into JSON entries.
127+
/// Writes log data, injecting project and pipeline fields into JSON entries.
109128
///
110129
/// Attempts to parse the buffer as JSON and inject each field (project, pipeline id) if:
111130
/// - The corresponding global value (project reference or pipeline id) is set
112131
/// - The content is a valid JSON object
113132
/// - The field does not already exist in the entry
114133
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
115-
// Only try to inject project field if we have one and the content looks like JSON
116-
if let Some(project_ref) = get_global_project_ref()
117-
&& let Ok(json_str) = std::str::from_utf8(buf)
118-
{
134+
// Only try to inject fields if the content looks like JSON
135+
if let Ok(json_str) = std::str::from_utf8(buf) {
119136
// Try to parse as JSON
120137
if let Ok(serde_json::Value::Object(mut map)) =
121138
serde_json::from_str::<serde_json::Value>(json_str)
122139
{
123-
// Only inject if "project" field doesn't already exist
124-
if !map.contains_key(PROJECT_KEY_IN_LOG) {
140+
let mut modified = false;
141+
142+
// Inject project if available and not present
143+
if let Some(project_ref) = get_global_project_ref()
144+
&& !map.contains_key(PROJECT_KEY_IN_LOG)
145+
{
125146
map.insert(
126147
PROJECT_KEY_IN_LOG.to_string(),
127148
serde_json::Value::String(project_ref.to_string()),
128149
);
150+
modified = true;
151+
}
129152

153+
// Inject pipeline_id if available and not present
154+
if let Some(pipeline_id) = get_global_pipeline_id()
155+
&& !map.contains_key(PIPELINE_KEY_IN_LOG)
156+
{
157+
map.insert(
158+
PIPELINE_KEY_IN_LOG.to_string(),
159+
serde_json::Value::Number(serde_json::Number::from(pipeline_id)),
160+
);
161+
modified = true;
162+
}
163+
164+
if modified {
130165
// Try to serialize back to JSON
131166
if let Ok(modified) = serde_json::to_string(&map) {
132-
// Add new line if it was there
167+
// Preserve trailing newline if present
133168
let output = if json_str.ends_with('\n') {
134169
format!("{modified}\n")
135170
} else {
@@ -161,22 +196,28 @@ where
161196
/// Sets up structured logging with environment-appropriate configuration.
162197
/// Production environments log to rotating files, development to console.
163198
pub fn init_tracing(app_name: &str) -> Result<LogFlusher, TracingError> {
164-
init_tracing_with_project(app_name, None)
199+
init_tracing_with_top_level_fields(app_name, None, None)
165200
}
166201

167-
/// Initializes tracing with optional project identification.
202+
/// Initializes tracing with optional top-level fields.
168203
///
169-
/// Like [`init_tracing`] but allows specifying a project reference that will
170-
/// be injected into all structured log entries for identification.
171-
pub fn init_tracing_with_project(
204+
/// Like [`init_tracing`] but accepts an optional project reference and pipeline id, which are
205+
/// stored globally and injected as top-level fields into each JSON log entry.
206+
pub fn init_tracing_with_top_level_fields(
172207
app_name: &str,
173208
project_ref: Option<String>,
209+
pipeline_id: Option<u64>,
174210
) -> Result<LogFlusher, TracingError> {
175-
// Set global project reference if provided
211+
// Set global project reference if provided.
176212
if let Some(ref project) = project_ref {
177213
set_global_project_ref(project.clone());
178214
}
179215

216+
// Set global pipeline id if provided.
217+
if let Some(pipeline_id) = pipeline_id {
218+
set_global_pipeline_id(pipeline_id);
219+
}
220+
180221
// Initialize the log tracer to capture logs from the `log` crate
181222
// and send them to the `tracing` subscriber. This captures logs
182223
// from libraries that use the `log` crate.

0 commit comments

Comments
 (0)