Skip to content

Commit 09c248c

Browse files
authored
chore: default to use structured logging (#3056)
Signed-off-by: Derek Wang <[email protected]>
1 parent c656eb8 commit 09c248c

File tree

11 files changed

+75
-37
lines changed

11 files changed

+75
-37
lines changed

rust/Cargo.lock

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/numaflow/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ tokio.workspace = true
1919
tokio-util.workspace = true
2020
tracing.workspace = true
2121
rustls.workspace = true
22-
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
22+
tracing-subscriber = { version = "0.3.20", features = ["env-filter", "fmt", "json"] }
2323
clap = "4.5.40"
2424

2525
[build-dependencies]

rust/numaflow/src/setup_tracing.rs

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
use tracing::Level;
12
use tracing_subscriber::layer::SubscriberExt;
23
use tracing_subscriber::util::SubscriberInitExt;
4+
use tracing_subscriber::{Layer, filter::EnvFilter, fmt};
35

46
use std::backtrace::{Backtrace, BacktraceStatus};
57
use std::panic::PanicHookInfo;
@@ -55,14 +57,32 @@ pub fn register() {
5557
// Set up the tracing subscriber. RUST_LOG can be used to set the log level.
5658
// The default log level is `info`. The `axum::rejection=trace` enables showing
5759
// rejections from built-in extractors at `TRACE` level.
60+
let debug_mode = std::env::var("NUMAFLOW_DEBUG").map_or(false, |v| v.to_lowercase() == "true");
61+
let default_log_level = if debug_mode {
62+
"debug,h2::codec=info" // "h2::codec" is too noisy
63+
} else {
64+
"info"
65+
};
66+
67+
let filter = EnvFilter::builder()
68+
.with_default_directive(default_log_level.parse().unwrap_or(Level::INFO.into()))
69+
.from_env_lossy(); // Read RUST_LOG environment variable
70+
71+
let layer = if debug_mode {
72+
// Text format
73+
fmt::layer().boxed()
74+
} else {
75+
// JSON format, flattened
76+
fmt::layer()
77+
.with_ansi(false)
78+
.json()
79+
.flatten_event(true)
80+
.boxed()
81+
};
82+
5883
tracing_subscriber::registry()
59-
.with(
60-
tracing_subscriber::EnvFilter::try_from_default_env()
61-
// TODO: add a better default based on entry point invocation
62-
// e.g., serving/monovertex might need a different default
63-
.unwrap_or_else(|_| "info".into()),
64-
)
65-
.with(tracing_subscriber::fmt::layer().with_ansi(false))
84+
.with(filter)
85+
.with(layer)
6686
.init();
6787

6888
std::panic::set_hook(Box::new(report_panic));

test/api-e2e/api_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ func (s *APISuite) TestAPIsForMetricsAndWatermarkAndPodsForPipeline() {
239239
VertexPodLogContains("p1", LogMapVertexStartedRustRuntime, PodLogCheckOptionWithContainer("numa")).
240240
VertexPodLogContains("output", LogSinkVertexStartedRustRuntime).
241241
DaemonPodLogContains(pipelineName, LogDaemonStarted).
242-
VertexPodLogContains("output", `"value":.*EventTime - \d+`)
242+
VertexPodLogContains("output", `\\"value\\":.*EventTime - \d+`)
243243

244244
defer w.UXServerPodPortForward(8146, 8443).TerminateAllPodPortForwards()
245245

test/e2e/functional_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (s *FunctionalSuite) TestCreateSimplePipeline() {
4949
VertexPodLogContains("p1", LogMapVertexStartedRustRuntime, PodLogCheckOptionWithContainer("numa")).
5050
VertexPodLogContains("output", LogSinkVertexStartedRustRuntime).
5151
DaemonPodLogContains(pipelineName, LogDaemonStarted).
52-
VertexPodLogContains("output", `"value":.*EventTime - \d+`)
52+
VertexPodLogContains("output", `\\"value\\":.*EventTime - \d+`)
5353

5454
defer w.VertexPodPortForward("input", 8001, dfv1.VertexMetricsPort).
5555
VertexPodPortForward("p1", 8002, dfv1.VertexMetricsPort).
@@ -332,9 +332,9 @@ func (s *FunctionalSuite) TestExponentialBackoffRetryStrategyForPipeline() {
332332
// wait for all the pods to come up
333333
w.Expect().VertexPodsRunning().DaemonPodsRunning()
334334

335-
firstRetryLog := fmt.Sprintf("retry_attempt=%d", 1)
336-
secondRetryLog := fmt.Sprintf("retry_attempt=%d", 2)
337-
thirdRetryLog := fmt.Sprintf("retry_attempt=%d", 3)
335+
firstRetryLog := fmt.Sprintf(`"retry_attempt":"%d"`, 1)
336+
secondRetryLog := fmt.Sprintf(`"retry_attempt":"%d"`, 2)
337+
thirdRetryLog := fmt.Sprintf(`"retry_attempt":"%d"`, 3)
338338
dropLog := "Retries exhausted, dropping messages."
339339
w.Expect().VertexPodLogContains(vertexName, firstRetryLog, PodLogCheckOptionWithContainer("numa"))
340340
w.Expect().VertexPodLogContains(vertexName, secondRetryLog, PodLogCheckOptionWithContainer("numa"))

test/e2e/testdata/simple-pipeline-with-retry-strategy.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ spec:
1818
factor: 2
1919
cap: 3s
2020
jitter: 0
21-
onFailure: 'drop'
21+
onFailure: "drop"
2222
udsink:
2323
container:
2424
image: quay.io/numaio/numaflow-go/sink-failure:stable

test/map-e2e/map_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ limitations under the License.
1919
package sdks_e2e
2020

2121
import (
22+
"testing"
23+
2224
"github.com/stretchr/testify/assert"
2325
"github.com/stretchr/testify/suite"
24-
"testing"
2526

2627
dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
2728
daemonclient "github.com/numaproj/numaflow/pkg/daemon/client"
@@ -143,7 +144,7 @@ func (s *MapSuite) TestPipelineRateLimitWithRedisStore() {
143144
_ = client.Close()
144145
}()
145146

146-
w.Expect().VertexPodLogContains("map-udf", "processed=50", PodLogCheckOptionWithContainer("numa"), PodLogCheckOptionWithCount(20))
147+
w.Expect().VertexPodLogContains("map-udf", "processed\":\"50", PodLogCheckOptionWithContainer("numa"), PodLogCheckOptionWithCount(20))
147148
}
148149

149150
func TestMapSuite(t *testing.T) {

test/map-e2e/testdata/rate-limit-redis.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ spec:
66
limits:
77
readBatchSize: 50
88
rateLimit:
9-
max: 100 # 100 TPS maximum
10-
min: 10 # 10 TPS minimum during ramp up
11-
rampUpDuration: 10s # 10 seconds to ramp up from min to max
9+
max: 100 # 100 TPS maximum
10+
min: 10 # 10 TPS minimum during ramp up
11+
rampUpDuration: 10s # 10 seconds to ramp up from min to max
1212
store:
1313
redisStore:
1414
mode: single

test/monovertex-e2e/monovertex_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,9 @@ func (s *MonoVertexSuite) TestExponentialBackoffRetryStrategy() {
7171
w := s.Given().MonoVertex("@testdata/mono-vertex-exponential-retry-strategy.yaml").When().CreateMonoVertexAndWait()
7272
defer w.DeleteMonoVertexAndWait()
7373
w.Expect().MonoVertexPodsRunning()
74-
firstRetryLog := fmt.Sprintf("retry_attempt=%d", 1)
75-
secondRetryLog := fmt.Sprintf("retry_attempt=%d", 2)
76-
thirdLog := fmt.Sprintf("retry_attempt=%d", 3)
74+
firstRetryLog := fmt.Sprintf(`"retry_attempt":"%d"`, 1)
75+
secondRetryLog := fmt.Sprintf(`"retry_attempt":"%d"`, 2)
76+
thirdLog := fmt.Sprintf(`"retry_attempt":"%d"`, 3)
7777
dropLog := "Retries exhausted, dropping messages."
7878
w.Expect().MonoVertexPodLogContains(firstRetryLog, PodLogCheckOptionWithContainer("numa"))
7979
w.Expect().MonoVertexPodLogContains(secondRetryLog, PodLogCheckOptionWithContainer("numa"))
@@ -102,7 +102,7 @@ func (s *MonoVertexSuite) TestMonoVertexRateLimitWithRedisStore() {
102102
_ = client.Close()
103103
}()
104104

105-
w.Expect().MonoVertexPodLogContains("processed=50", PodLogCheckOptionWithContainer("numa"), PodLogCheckOptionWithCount(20))
105+
w.Expect().MonoVertexPodLogContains("\"processed\":\"50", PodLogCheckOptionWithContainer("numa"), PodLogCheckOptionWithCount(20))
106106
}
107107

108108
func TestMonoVertexSuite(t *testing.T) {

test/monovertex-e2e/testdata/mono-vertex-exponential-retry-strategy.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
apiVersion: numaflow.numaproj.io/v1alpha1
2-
kind: Pipeline
2+
kind: MonoVertex
33
metadata:
44
name: retry-backoff-mvtx
55
spec:
@@ -15,8 +15,8 @@ spec:
1515
factor: 2
1616
cap: 3s
1717
jitter: 0
18-
onFailure: 'drop'
18+
onFailure: "drop"
1919
udsink:
2020
container:
2121
image: quay.io/numaio/numaflow-go/sink-retry-e2e:stable
22-
imagePullPolicy: IfNotPresent
22+
imagePullPolicy: IfNotPresent

0 commit comments

Comments
 (0)