Skip to content

Commit cf3f038

Browse files
Merge branch 'main' into oidc-fix-userinfo
2 parents df6ef56 + dc04eb3 commit cf3f038

File tree

22 files changed

+242
-249
lines changed

22 files changed

+242
-249
lines changed

src/about.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,10 @@ pub fn print_about(
112112
current_version,
113113
); // " " " "
114114

115-
if let Some(latest_release) = latest_release {
116-
if latest_release.version > current_version {
117-
print_latest_release(latest_release);
118-
}
115+
if let Some(latest_release) = latest_release
116+
&& latest_release.version > current_version
117+
{
118+
print_latest_release(latest_release);
119119
}
120120

121121
eprintln!(

src/catalog/manifest.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,12 @@ pub fn create_from_parquet_file(
112112
let columns = column_statistics(row_groups);
113113
manifest_file.columns = columns.into_values().collect();
114114
let mut sort_orders = sort_order(row_groups);
115-
if let Some(last_sort_order) = sort_orders.pop() {
116-
if sort_orders
115+
if let Some(last_sort_order) = sort_orders.pop()
116+
&& sort_orders
117117
.into_iter()
118118
.all(|sort_order| sort_order == last_sort_order)
119-
{
120-
manifest_file.sort_order_id = last_sort_order;
121-
}
119+
{
120+
manifest_file.sort_order_id = last_sort_order;
122121
}
123122

124123
Ok(manifest_file)

src/catalog/mod.rs

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -268,27 +268,27 @@ async fn create_manifest(
268268
..Manifest::default()
269269
};
270270
let mut first_event_at = PARSEABLE.get_stream(stream_name)?.get_first_event();
271-
if first_event_at.is_none() {
272-
if let Some(first_event) = manifest.files.first() {
273-
let time_partition = &meta.time_partition;
274-
let lower_bound = match time_partition {
275-
Some(time_partition) => {
276-
let (lower_bound, _) = get_file_bounds(first_event, time_partition.to_string());
277-
lower_bound
278-
}
279-
None => {
280-
let (lower_bound, _) =
281-
get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
282-
lower_bound
283-
}
284-
};
285-
first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339());
286-
match PARSEABLE.get_stream(stream_name) {
287-
Ok(stream) => stream.set_first_event_at(first_event_at.as_ref().unwrap()),
288-
Err(err) => error!(
289-
"Failed to update first_event_at in streaminfo for stream {stream_name:?}, error = {err:?}"
290-
),
271+
if first_event_at.is_none()
272+
&& let Some(first_event) = manifest.files.first()
273+
{
274+
let time_partition = &meta.time_partition;
275+
let lower_bound = match time_partition {
276+
Some(time_partition) => {
277+
let (lower_bound, _) = get_file_bounds(first_event, time_partition.to_string());
278+
lower_bound
291279
}
280+
None => {
281+
let (lower_bound, _) =
282+
get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
283+
lower_bound
284+
}
285+
};
286+
first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339());
287+
match PARSEABLE.get_stream(stream_name) {
288+
Ok(stream) => stream.set_first_event_at(first_event_at.as_ref().unwrap()),
289+
Err(err) => error!(
290+
"Failed to update first_event_at in streaminfo for stream {stream_name:?}, error = {err:?}"
291+
),
292292
}
293293
}
294294

@@ -366,8 +366,8 @@ pub async fn get_first_event(
366366
Mode::All | Mode::Ingest => {
367367
// get current snapshot
368368
let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();
369-
if stream_first_event.is_some() {
370-
first_event_at = stream_first_event.unwrap();
369+
if let Some(first_event) = stream_first_event {
370+
first_event_at = first_event;
371371
} else {
372372
let mut meta = storage.get_object_store_format(stream_name).await?;
373373
let meta_clone = meta.clone();

src/event/format/json.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -315,10 +315,8 @@ fn valid_type(
315315

316316
fn validate_int(value: &Value, static_schema_flag: bool) -> bool {
317317
// allow casting string to int for static schema
318-
if static_schema_flag {
319-
if let Value::String(s) = value {
320-
return s.trim().parse::<i64>().is_ok();
321-
}
318+
if static_schema_flag && let Value::String(s) = value {
319+
return s.trim().parse::<i64>().is_ok();
322320
}
323321
value.is_i64()
324322
}

src/handlers/http/audit.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,10 @@ pub async fn audit_log_middleware(
5151
log_builder = log_builder.with_stream(message.common_attributes.x_p_stream);
5252
} else if let Some(stream) = req.match_info().get("logstream") {
5353
log_builder = log_builder.with_stream(stream);
54-
} else if let Some(value) = req.headers().get(STREAM_NAME_HEADER_KEY) {
55-
if let Ok(stream) = value.to_str() {
56-
log_builder = log_builder.with_stream(stream);
57-
}
54+
} else if let Some(value) = req.headers().get(STREAM_NAME_HEADER_KEY)
55+
&& let Ok(stream) = value.to_str()
56+
{
57+
log_builder = log_builder.with_stream(stream);
5858
}
5959

6060
// Get username and authorization method

src/handlers/http/cluster/mod.rs

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1073,25 +1073,25 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
10731073
.run(move || async {
10741074
let result: Result<(), PostError> = async {
10751075
let cluster_metrics = fetch_cluster_metrics().await;
1076-
if let Ok(metrics) = cluster_metrics {
1077-
if !metrics.is_empty() {
1078-
info!("Cluster metrics fetched successfully from all ingestors");
1079-
if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
1080-
if matches!(
1081-
ingest_internal_stream(
1082-
INTERNAL_STREAM_NAME.to_string(),
1083-
bytes::Bytes::from(metrics_bytes),
1084-
)
1085-
.await,
1086-
Ok(())
1087-
) {
1088-
info!("Cluster metrics successfully ingested into internal stream");
1089-
} else {
1090-
error!("Failed to ingest cluster metrics into internal stream");
1091-
}
1076+
if let Ok(metrics) = cluster_metrics
1077+
&& !metrics.is_empty()
1078+
{
1079+
info!("Cluster metrics fetched successfully from all ingestors");
1080+
if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
1081+
if matches!(
1082+
ingest_internal_stream(
1083+
INTERNAL_STREAM_NAME.to_string(),
1084+
bytes::Bytes::from(metrics_bytes),
1085+
)
1086+
.await,
1087+
Ok(())
1088+
) {
1089+
info!("Cluster metrics successfully ingested into internal stream");
10921090
} else {
1093-
error!("Failed to serialize cluster metrics");
1091+
error!("Failed to ingest cluster metrics into internal stream");
10941092
}
1093+
} else {
1094+
error!("Failed to serialize cluster metrics");
10951095
}
10961096
}
10971097
Ok(())
@@ -1186,21 +1186,21 @@ async fn get_available_querier() -> Result<QuerierMetadata, QueryError> {
11861186
});
11871187

11881188
// Find the next available querier using round-robin strategy
1189-
if let Some(selected_domain) = select_next_querier(&mut map).await {
1190-
if let Some(status) = map.get_mut(&selected_domain) {
1191-
status.available = false;
1192-
status.last_used = Some(Instant::now());
1193-
return Ok(status.metadata.clone());
1194-
}
1189+
if let Some(selected_domain) = select_next_querier(&mut map).await
1190+
&& let Some(status) = map.get_mut(&selected_domain)
1191+
{
1192+
status.available = false;
1193+
status.last_used = Some(Instant::now());
1194+
return Ok(status.metadata.clone());
11951195
}
11961196

11971197
// If no querier is available, use least-recently-used strategy
1198-
if let Some(selected_domain) = select_least_recently_used_querier(&mut map) {
1199-
if let Some(status) = map.get_mut(&selected_domain) {
1200-
status.available = false;
1201-
status.last_used = Some(Instant::now());
1202-
return Ok(status.metadata.clone());
1203-
}
1198+
if let Some(selected_domain) = select_least_recently_used_querier(&mut map)
1199+
&& let Some(status) = map.get_mut(&selected_domain)
1200+
{
1201+
status.available = false;
1202+
status.last_used = Some(Instant::now());
1203+
return Ok(status.metadata.clone());
12041204
}
12051205

12061206
// If no querier is available, return an error

src/handlers/http/logstream.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamE
6868
)
6969
}
7070

71-
if let Some(hot_tier_manager) = HotTierManager::global() {
72-
if hot_tier_manager.check_stream_hot_tier_exists(&stream_name) {
73-
hot_tier_manager.delete_hot_tier(&stream_name).await?;
74-
}
71+
if let Some(hot_tier_manager) = HotTierManager::global()
72+
&& hot_tier_manager.check_stream_hot_tier_exists(&stream_name)
73+
{
74+
hot_tier_manager.delete_hot_tier(&stream_name).await?;
7575
}
7676

7777
// Delete from memory

src/handlers/http/modal/query/querier_logstream.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,10 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamE
7676
)
7777
}
7878

79-
if let Some(hot_tier_manager) = HotTierManager::global() {
80-
if hot_tier_manager.check_stream_hot_tier_exists(&stream_name) {
81-
hot_tier_manager.delete_hot_tier(&stream_name).await?;
82-
}
79+
if let Some(hot_tier_manager) = HotTierManager::global()
80+
&& hot_tier_manager.check_stream_hot_tier_exists(&stream_name)
81+
{
82+
hot_tier_manager.delete_hot_tier(&stream_name).await?;
8383
}
8484

8585
let ingestor_metadata: Vec<NodeMetadata> = cluster::get_node_info(NodeType::Ingestor)

src/handlers/http/modal/ssl_acceptor.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,17 @@ pub fn get_ssl_acceptor(
3838

3939
let mut certs = rustls_pemfile::certs(cert_file).collect::<Result<Vec<_>, _>>()?;
4040
// Load CA certificates from the directory
41-
if let Some(other_cert_dir) = other_certs {
42-
if other_cert_dir.is_dir() {
43-
for entry in fs::read_dir(other_cert_dir)? {
44-
let path = entry.unwrap().path();
41+
if let Some(other_cert_dir) = other_certs
42+
&& other_cert_dir.is_dir()
43+
{
44+
for entry in fs::read_dir(other_cert_dir)? {
45+
let path = entry.unwrap().path();
4546

46-
if path.is_file() {
47-
let other_cert_file = &mut BufReader::new(File::open(&path)?);
48-
let mut other_certs = rustls_pemfile::certs(other_cert_file)
49-
.collect::<Result<Vec<_>, _>>()?;
50-
certs.append(&mut other_certs);
51-
}
47+
if path.is_file() {
48+
let other_cert_file = &mut BufReader::new(File::open(&path)?);
49+
let mut other_certs = rustls_pemfile::certs(other_cert_file)
50+
.collect::<Result<Vec<_>, _>>()?;
51+
certs.append(&mut other_certs);
5252
}
5353
}
5454
}

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -174,34 +174,35 @@ pub fn get_custom_fields_from_header(req: &HttpRequest) -> HashMap<String, Strin
174174
}
175175

176176
let header_name = header_name.as_str();
177-
if header_name.starts_with("x-p-") && !IGNORE_HEADERS.contains(&header_name) {
178-
if let Ok(value) = header_value.to_str() {
179-
let key = header_name.trim_start_matches("x-p-");
180-
if !key.is_empty() {
181-
// Truncate value if it exceeds the maximum length
182-
let truncated_value = if value.len() > MAX_FIELD_VALUE_LENGTH {
183-
warn!(
184-
"Header value for '{}' exceeds maximum length, truncating",
185-
header_name
186-
);
187-
&value[..MAX_FIELD_VALUE_LENGTH]
188-
} else {
189-
value
190-
};
191-
p_custom_fields.insert(key.to_string(), truncated_value.to_string());
192-
} else {
177+
if header_name.starts_with("x-p-")
178+
&& !IGNORE_HEADERS.contains(&header_name)
179+
&& let Ok(value) = header_value.to_str()
180+
{
181+
let key = header_name.trim_start_matches("x-p-");
182+
if !key.is_empty() {
183+
// Truncate value if it exceeds the maximum length
184+
let truncated_value = if value.len() > MAX_FIELD_VALUE_LENGTH {
193185
warn!(
194-
"Ignoring header with empty key after prefix: {}",
186+
"Header value for '{}' exceeds maximum length, truncating",
195187
header_name
196188
);
197-
}
189+
&value[..MAX_FIELD_VALUE_LENGTH]
190+
} else {
191+
value
192+
};
193+
p_custom_fields.insert(key.to_string(), truncated_value.to_string());
194+
} else {
195+
warn!(
196+
"Ignoring header with empty key after prefix: {}",
197+
header_name
198+
);
198199
}
199200
}
200201

201-
if header_name == LOG_SOURCE_KEY {
202-
if let Ok(value) = header_value.to_str() {
203-
p_custom_fields.insert(FORMAT_KEY.to_string(), value.to_string());
204-
}
202+
if header_name == LOG_SOURCE_KEY
203+
&& let Ok(value) = header_value.to_str()
204+
{
205+
p_custom_fields.insert(FORMAT_KEY.to_string(), value.to_string());
205206
}
206207
}
207208

0 commit comments

Comments
 (0)