Skip to content

Commit e7afdba

Browse files
committed
fix issues in the schema
1 parent 50d8d4b commit e7afdba

File tree

3 files changed

+48
-67
lines changed

3 files changed

+48
-67
lines changed

schemas/otel_logs_and_spans.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ fields:
5252
data_type: Utf8
5353
nullable: true
5454
- name: severity___severity_number
55-
data_type: Utf8
55+
data_type: Int32
5656
nullable: true
5757
- name: body
5858
data_type: Utf8
@@ -133,16 +133,16 @@ fields:
133133
data_type: Int32
134134
nullable: true
135135
- name: attributes___code___file___path
136-
data_type: Int32
136+
data_type: Utf8
137137
nullable: true
138138
- name: attributes___code___function___name
139-
data_type: Int32
139+
data_type: Utf8
140140
nullable: true
141141
- name: attributes___code___line___number
142142
data_type: Int32
143143
nullable: true
144144
- name: attributes___code___stacktrace
145-
data_type: Int32
145+
data_type: Utf8
146146
nullable: true
147147
- name: attributes___log__record___original
148148
data_type: Utf8

src/pgwire_handlers.rs

Lines changed: 30 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
use async_trait::async_trait;
22
use datafusion::execution::context::SessionContext;
3-
use datafusion_postgres::{DfSessionService, auth::AuthManager};
4-
use datafusion_postgres::pgwire::api::auth::{StartupHandler, noop::NoopStartupHandler};
5-
use datafusion_postgres::pgwire::api::query::{ExtendedQueryHandler, SimpleQueryHandler};
6-
use datafusion_postgres::pgwire::api::results::{Response, DescribeStatementResponse, DescribePortalResponse};
7-
use datafusion_postgres::pgwire::api::{ClientInfo, PgWireServerHandlers, ErrorHandler};
3+
use datafusion_postgres::pgwire::api::auth::{noop::NoopStartupHandler, StartupHandler};
84
use datafusion_postgres::pgwire::api::portal::Portal;
5+
use datafusion_postgres::pgwire::api::query::{ExtendedQueryHandler, SimpleQueryHandler};
6+
use datafusion_postgres::pgwire::api::results::{DescribePortalResponse, DescribeStatementResponse, Response};
97
use datafusion_postgres::pgwire::api::stmt::StoredStatement;
108
use datafusion_postgres::pgwire::api::store::PortalStore;
119
use datafusion_postgres::pgwire::api::ClientPortalStore;
12-
use datafusion_postgres::pgwire::error::{PgWireResult, PgWireError};
10+
use datafusion_postgres::pgwire::api::{ClientInfo, ErrorHandler, PgWireServerHandlers};
11+
use datafusion_postgres::pgwire::error::{PgWireError, PgWireResult};
1312
use datafusion_postgres::pgwire::messages::PgWireBackendMessage;
13+
use datafusion_postgres::{auth::AuthManager, DfSessionService};
1414
use futures::Sink;
15-
use std::sync::Arc;
1615
use std::fmt::Debug;
17-
use tracing::{info, instrument, Instrument};
16+
use std::sync::Arc;
1817
use tracing::field::Empty;
18+
use tracing::{info, instrument, Instrument};
1919

2020
/// Custom handler factory that creates handlers which log UPDATE queries
2121
pub struct LoggingHandlerFactory {
@@ -25,10 +25,7 @@ pub struct LoggingHandlerFactory {
2525

2626
impl LoggingHandlerFactory {
2727
pub fn new(session_context: Arc<SessionContext>, auth_manager: Arc<AuthManager>) -> Self {
28-
Self {
29-
session_context,
30-
auth_manager,
31-
}
28+
Self { session_context, auth_manager }
3229
}
3330
}
3431

@@ -40,17 +37,11 @@ impl NoopStartupHandler for SimpleStartupHandler {}
4037

4138
impl PgWireServerHandlers for LoggingHandlerFactory {
4239
fn simple_query_handler(&self) -> Arc<impl SimpleQueryHandler> {
43-
Arc::new(LoggingSimpleQueryHandler::new(
44-
self.session_context.clone(),
45-
self.auth_manager.clone(),
46-
))
40+
Arc::new(LoggingSimpleQueryHandler::new(self.session_context.clone(), self.auth_manager.clone()))
4741
}
4842

4943
fn extended_query_handler(&self) -> Arc<impl ExtendedQueryHandler> {
50-
Arc::new(LoggingExtendedQueryHandler::new(
51-
self.session_context.clone(),
52-
self.auth_manager.clone(),
53-
))
44+
Arc::new(LoggingExtendedQueryHandler::new(self.session_context.clone(), self.auth_manager.clone()))
5445
}
5546

5647
fn startup_handler(&self) -> Arc<impl StartupHandler> {
@@ -100,18 +91,14 @@ impl SimpleQueryHandler for LoggingSimpleQueryHandler {
10091
db.operation = Empty,
10192
)
10293
)]
103-
async fn do_query<'a, C>(
104-
&self,
105-
client: &mut C,
106-
query: &str,
107-
) -> PgWireResult<Vec<Response<'a>>>
94+
async fn do_query<'a, C>(&self, client: &mut C, query: &str) -> PgWireResult<Vec<Response<'a>>>
10895
where
10996
C: ClientInfo + ClientPortalStore + Sink<PgWireBackendMessage> + Unpin + Send + Sync,
11097
C::Error: Debug,
11198
PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>,
11299
{
113100
let span = tracing::Span::current();
114-
101+
115102
// Determine query type and operation
116103
let query_lower = query.trim().to_lowercase();
117104
let (query_type, operation) = if query_lower.starts_with("select") || query_lower.contains(" select ") {
@@ -131,25 +118,23 @@ impl SimpleQueryHandler for LoggingSimpleQueryHandler {
131118
} else {
132119
("OTHER", "UNKNOWN")
133120
};
134-
121+
135122
span.record("query.type", query_type);
136123
span.record("query.operation", operation);
137124
span.record("db.operation", operation);
138-
125+
139126
// Truncate sensitive data from DML queries
140127
let sanitized_query = match operation {
141128
"INSERT" => query_lower.find(" values").map(|i| format!("{} VALUES ...", &query[..i])).unwrap_or_else(|| query.to_string()),
142129
"UPDATE" => query_lower.find(" set").map(|i| format!("{} SET ...", &query[..i])).unwrap_or_else(|| query.to_string()),
143130
_ => query.to_string(),
144131
};
145132
span.record("query.text", &sanitized_query.as_str());
146-
133+
147134
// Delegate to inner handler with the span context
148135
// Use the current span as parent to ensure proper context propagation
149136
let execute_span = tracing::trace_span!(parent: &span, "datafusion.execute");
150-
<DfSessionService as SimpleQueryHandler>::do_query(&self.inner, client, query)
151-
.instrument(execute_span)
152-
.await
137+
<DfSessionService as SimpleQueryHandler>::do_query(&self.inner, client, query).instrument(execute_span).await
153138
}
154139
}
155140

@@ -175,11 +160,7 @@ impl ExtendedQueryHandler for LoggingExtendedQueryHandler {
175160
self.inner.query_parser()
176161
}
177162

178-
async fn do_describe_statement<C>(
179-
&self,
180-
client: &mut C,
181-
statement: &StoredStatement<Self::Statement>,
182-
) -> PgWireResult<DescribeStatementResponse>
163+
async fn do_describe_statement<C>(&self, client: &mut C, statement: &StoredStatement<Self::Statement>) -> PgWireResult<DescribeStatementResponse>
183164
where
184165
C: ClientInfo + ClientPortalStore + Sink<PgWireBackendMessage> + Unpin + Send + Sync,
185166
C::PortalStore: PortalStore<Statement = Self::Statement>,
@@ -189,11 +170,7 @@ impl ExtendedQueryHandler for LoggingExtendedQueryHandler {
189170
self.inner.do_describe_statement(client, statement).await
190171
}
191172

192-
async fn do_describe_portal<C>(
193-
&self,
194-
client: &mut C,
195-
portal: &Portal<Self::Statement>,
196-
) -> PgWireResult<DescribePortalResponse>
173+
async fn do_describe_portal<C>(&self, client: &mut C, portal: &Portal<Self::Statement>) -> PgWireResult<DescribePortalResponse>
197174
where
198175
C: ClientInfo + ClientPortalStore + Sink<PgWireBackendMessage> + Unpin + Send + Sync,
199176
C::PortalStore: PortalStore<Statement = Self::Statement>,
@@ -216,23 +193,18 @@ impl ExtendedQueryHandler for LoggingExtendedQueryHandler {
216193
db.operation = Empty,
217194
)
218195
)]
219-
async fn do_query<'a, C>(
220-
&self,
221-
client: &mut C,
222-
portal: &Portal<Self::Statement>,
223-
max_rows: usize,
224-
) -> PgWireResult<Response<'a>>
196+
async fn do_query<'a, C>(&self, client: &mut C, portal: &Portal<Self::Statement>, max_rows: usize) -> PgWireResult<Response<'a>>
225197
where
226198
C: ClientInfo + ClientPortalStore + Sink<PgWireBackendMessage> + Unpin + Send + Sync,
227199
C::PortalStore: PortalStore<Statement = Self::Statement>,
228200
C::Error: Debug,
229201
PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>,
230202
{
231203
let span = tracing::Span::current();
232-
204+
233205
// Get query text and determine type
234206
let query = &portal.statement.statement.0;
235-
207+
236208
let query_lower = query.trim().to_lowercase();
237209
let (query_type, operation) = if query_lower.starts_with("select") || query_lower.contains(" select ") {
238210
("SELECT", "SELECT")
@@ -251,19 +223,19 @@ impl ExtendedQueryHandler for LoggingExtendedQueryHandler {
251223
} else {
252224
("OTHER", "UNKNOWN")
253225
};
254-
226+
255227
span.record("query.type", query_type);
256228
span.record("query.operation", operation);
257229
span.record("db.operation", operation);
258-
230+
259231
// Truncate sensitive data from DML queries
260232
let sanitized_query = match operation {
261233
"INSERT" => query_lower.find(" values").map(|i| format!("{} VALUES ...", &query[..i])).unwrap_or_else(|| query.to_string()),
262234
"UPDATE" => query_lower.find(" set").map(|i| format!("{} SET ...", &query[..i])).unwrap_or_else(|| query.to_string()),
263235
_ => query.to_string(),
264236
};
265237
span.record("query.text", &sanitized_query.as_str());
266-
238+
267239
// Delegate to inner handler with the span context
268240
// Use the current span as parent to ensure proper context propagation
269241
let execute_span = tracing::trace_span!(parent: &span, "datafusion.execute");
@@ -275,14 +247,13 @@ impl ExtendedQueryHandler for LoggingExtendedQueryHandler {
275247

276248
/// Start the server with custom handlers that log UPDATE queries
277249
pub async fn serve_with_logging(
278-
session_context: Arc<SessionContext>,
279-
options: &datafusion_postgres::ServerOptions,
280-
auth_manager: Arc<AuthManager>,
250+
session_context: Arc<SessionContext>, options: &datafusion_postgres::ServerOptions, auth_manager: Arc<AuthManager>,
281251
) -> Result<(), Box<dyn std::error::Error>> {
282252
let handlers = Arc::new(LoggingHandlerFactory::new(session_context, auth_manager));
283-
253+
284254
// Use datafusion-postgres's serve_with_handlers
285255
datafusion_postgres::serve_with_handlers(handlers, options).await?;
286-
256+
287257
Ok(())
288-
}
258+
}
259+

src/telemetry.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,22 @@ pub fn init_telemetry() -> anyhow::Result<()> {
6464
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
6565

6666
// Initialize tracing subscriber with telemetry and formatting layers
67+
let is_json = env::var("LOG_FORMAT").unwrap_or_default() == "json";
68+
6769
let subscriber = Registry::default()
6870
.with(env_filter)
69-
.with(telemetry_layer)
70-
.with(tracing_subscriber::fmt::layer().json().with_target(true).with_thread_ids(true).with_thread_names(true));
71-
72-
subscriber.try_init().map_err(|e| anyhow::anyhow!("Failed to set tracing subscriber: {}", e))?;
71+
.with(telemetry_layer);
72+
73+
if is_json {
74+
subscriber
75+
.with(tracing_subscriber::fmt::layer().json().with_target(true).with_thread_ids(true).with_thread_names(true))
76+
.try_init()
77+
} else {
78+
subscriber
79+
.with(tracing_subscriber::fmt::layer().with_target(true).with_thread_ids(true).with_thread_names(true))
80+
.try_init()
81+
}
82+
.map_err(|e| anyhow::anyhow!("Failed to set tracing subscriber: {}", e))?;
7383

7484
info!("OpenTelemetry initialized successfully with service name: {}", service_name);
7585

0 commit comments

Comments
 (0)