|
1 | | -use std::{ |
2 | | - collections::HashMap, |
3 | | - hash::{Hash, Hasher}, |
4 | | - ops::RangeInclusive, |
5 | | - time::Duration, |
6 | | -}; |
| 1 | +use std::{collections::HashMap, ops::RangeInclusive, time::Duration}; |
7 | 2 |
|
8 | | -use ahash::AHasher; |
| 3 | +use ahash::RandomState; |
9 | 4 | use alloy::primitives::{BlockHash, BlockNumber}; |
10 | 5 | use arrow::{datatypes::Schema, error::ArrowError}; |
11 | 6 | use arrow_flight::{ |
@@ -109,9 +104,15 @@ impl Client for FlightClient { |
109 | 104 | request_metadata: Option<RequestMetadata>, |
110 | 105 | ) -> BoxStream<'static, Result<ResponseBatch, Self::Error>> { |
111 | 106 | let query = query.to_string(); |
| 107 | + |
| 108 | + // Generates a hash from the SQL query for log correlation. |
| 109 | + // The hash allows connecting related logs without including the full SQL query in every log message. |
| 110 | + // Constant seeds ensure consistent hashes for the same query. |
| 111 | + let hasher = RandomState::with_seeds(0, 0, 0, 0); |
| 112 | + |
112 | 113 | let logger = logger |
113 | 114 | .component("AmpFlightClient") |
114 | | - .new(slog::o!("query_id" => query_id(&query))); |
| 115 | + .new(slog::o!("query_hash" => hasher.hash_one(&query))); |
115 | 116 |
|
116 | 117 | let mut raw_client = self.raw_client(); |
117 | 118 | let mut prev_block_ranges: Vec<BlockRange> = Vec::new(); |
@@ -306,16 +307,6 @@ impl From<ResumeStreamingQuery> for BlockRange { |
306 | 307 | } |
307 | 308 | } |
308 | 309 |
|
309 | | -/// Generates an ID from a SQL query for log correlation. |
310 | | -/// |
311 | | -/// The ID allows connecting related logs without including the full SQL |
312 | | -/// query in every log message. |
313 | | -fn query_id(query: &str) -> u32 { |
314 | | - let mut hasher = AHasher::default(); |
315 | | - query.hash(&mut hasher); |
316 | | - hasher.finish() as u32 |
317 | | -} |
318 | | - |
319 | 310 | /// Serializes the information required to resume a streaming SQL query to JSON. |
320 | 311 | fn serialize_resume_streaming_query(resume_streaming_query: Vec<ResumeStreamingQuery>) -> String { |
321 | 312 | #[derive(Serialize)] |
|
0 commit comments