|
| 1 | +use std::collections::HashSet; |
| 2 | +use std::env; |
| 3 | +use std::sync::Arc; |
| 4 | + |
1 | 5 | use assert_matches::assert_matches;
|
2 | 6 | use futures::TryStreamExt;
|
3 |
| -use scylla::errors::PagerExecutionError; |
| 7 | +use scylla::client::session_builder::SessionBuilder; |
| 8 | +use scylla::errors::{ExecutionError, PagerExecutionError, RequestAttemptError}; |
| 9 | +use scylla::observability::history::{HistoryCollector, StructuredHistory}; |
| 10 | +use scylla::observability::tracing::TracingInfo; |
4 | 11 | use scylla::{
|
5 | 12 | batch::{Batch, BatchType},
|
6 | 13 | client::session::Session,
|
7 | 14 | query::Query,
|
8 | 15 | };
|
9 | 16 | use scylla_cql::frame::request::query::{PagingState, PagingStateResponse};
|
| 17 | +use scylla_proxy::{Condition, RequestReaction, RequestRule, ShardAwareness}; |
| 18 | +use uuid::Uuid; |
10 | 19 |
|
11 |
| -use crate::utils::{create_new_session_builder, setup_tracing, unique_keyspace_name, PerformDDL}; |
| 20 | +use crate::utils::{ |
| 21 | + create_new_session_builder, setup_tracing, test_with_3_node_cluster, unique_keyspace_name, |
| 22 | + PerformDDL, |
| 23 | +}; |
12 | 24 |
|
13 | 25 | const PAGE_SIZE: i32 = 100;
|
14 | 26 | const ROWS_PER_PARTITION: i32 = 1000;
|
@@ -295,3 +307,127 @@ async fn execute_iter_serialization_error() {
|
295 | 307 | Err(PagerExecutionError::SerializationError(_))
|
296 | 308 | )
|
297 | 309 | }
|
| 310 | + |
| 311 | +#[tokio::test] |
| 312 | +async fn should_expose_execution_info_on_exceptions() { |
| 313 | + setup_tracing(); |
| 314 | + let res = test_with_3_node_cluster( |
| 315 | + ShardAwareness::Unaware, |
| 316 | + |proxy_uris, translation_map, mut running_proxy| async move { |
| 317 | + let session: Session = SessionBuilder::new() |
| 318 | + .known_node(proxy_uris[0].as_str()) |
| 319 | + .address_translator(Arc::new(translation_map)) |
| 320 | + .build() |
| 321 | + .await |
| 322 | + .unwrap(); |
| 323 | + |
| 324 | + let forge_error_rule = RequestRule(Condition::True, RequestReaction::forge().invalid()); |
| 325 | + running_proxy.running_nodes[0] |
| 326 | + .change_request_rules(Some(vec![forge_error_rule.clone()])); |
| 327 | + running_proxy.running_nodes[1] |
| 328 | + .change_request_rules(Some(vec![forge_error_rule.clone()])); |
| 329 | + running_proxy.running_nodes[2] |
| 330 | + .change_request_rules(Some(vec![forge_error_rule.clone()])); |
| 331 | + |
| 332 | + let mut query = Query::from("select * from foo"); |
| 333 | + let history_listener = Arc::new(HistoryCollector::new()); |
| 334 | + query.set_history_listener(history_listener.clone()); |
| 335 | + let err = session |
| 336 | + .query_unpaged(query, &[]) |
| 337 | + .await |
| 338 | + .expect_err("expecting error"); |
| 339 | + let structured_history: StructuredHistory = history_listener.clone_structured_history(); |
| 340 | + |
| 341 | + assert_matches!( |
| 342 | + err, |
| 343 | + ExecutionError::LastAttemptError(RequestAttemptError::DbError(_, _)) |
| 344 | + ); |
| 345 | + assert_eq!(structured_history.requests.len(), 1); |
| 346 | + running_proxy |
| 347 | + }, |
| 348 | + ) |
| 349 | + .await; |
| 350 | + |
| 351 | + match res { |
| 352 | + Ok(()) => (), |
| 353 | + Err(err) => panic!("{}", err), |
| 354 | + } |
| 355 | +} |
| 356 | + |
| 357 | +#[tokio::test] |
| 358 | +async fn test_session_should_have_metadata() { |
| 359 | + setup_tracing(); |
| 360 | + let session = create_new_session_builder().build().await.unwrap(); |
| 361 | + let state = session.get_cluster_state(); |
| 362 | + let keys = ["SCYLLA_URI", "SCYLLA_URI2", "SCYLLA_URI3"]; |
| 363 | + let expected_addresses: HashSet<String> = keys |
| 364 | + .iter() |
| 365 | + .map(|key| env::var(key).unwrap_or_else(|_| panic!("{} not set", key))) |
| 366 | + .collect(); |
| 367 | + |
| 368 | + let got_addresses: HashSet<String> = state |
| 369 | + .get_nodes_info() |
| 370 | + .iter() |
| 371 | + .map(|node| node.address.to_string()) |
| 372 | + .collect(); |
| 373 | + |
| 374 | + assert_eq!( |
| 375 | + got_addresses, expected_addresses, |
| 376 | + "Cluster node addresses do not match environment variables" |
| 377 | + ); |
| 378 | +} |
| 379 | + |
| 380 | +#[tokio::test] |
| 381 | +async fn test_should_not_have_tracing_id_when_tracing_disabled() { |
| 382 | + setup_tracing(); |
| 383 | + let session = create_new_session_builder().build().await.unwrap(); |
| 384 | + let query: Query = Query::new("SELECT release_version FROM system.local"); |
| 385 | + |
| 386 | + let result = session.query_unpaged(query, &[]).await.unwrap(); |
| 387 | + let tracing_id: Option<Uuid> = result.tracing_id(); |
| 388 | + assert!(tracing_id.is_none()); |
| 389 | +} |
| 390 | + |
| 391 | +#[tokio::test] |
| 392 | +async fn test_should_fetch_trace_when_tracing_enabled() { |
| 393 | + let session = create_new_session_builder().build().await.unwrap(); |
| 394 | + |
| 395 | + let mut query = Query::from("SELECT release_version FROM system.local"); |
| 396 | + query.set_tracing(true); |
| 397 | + |
| 398 | + let result = session.query_unpaged(query, &[]).await.unwrap(); |
| 399 | + let tracing_id: Option<Uuid> = result.tracing_id(); |
| 400 | + assert!(tracing_id.is_some()); |
| 401 | + let tracing_info: TracingInfo = session |
| 402 | + .get_tracing_info(&tracing_id.unwrap()) |
| 403 | + .await |
| 404 | + .unwrap(); |
| 405 | + |
| 406 | + // Verify trace information is present and has expected format |
| 407 | + assert!(!tracing_info.events.is_empty()); |
| 408 | + |
| 409 | + // Check if the request type matches |
| 410 | + assert_eq!(tracing_info.request.unwrap(), "Execute CQL3 query"); |
| 411 | + |
| 412 | + // Verify duration is positive |
| 413 | + assert!(tracing_info.duration.unwrap() > 0); |
| 414 | + |
| 415 | + // Verify started_at timestamp is present |
| 416 | + assert!(tracing_info.started_at.unwrap().0 > 0); |
| 417 | + |
| 418 | + // Check parameters |
| 419 | + assert!(tracing_info |
| 420 | + .parameters |
| 421 | + .clone() |
| 422 | + .unwrap() |
| 423 | + .contains_key("consistency_level")); |
| 424 | + assert!(tracing_info.parameters.unwrap().contains_key("query")); |
| 425 | + |
| 426 | + // Check events |
| 427 | + for event in tracing_info.events { |
| 428 | + assert!(!event.activity.clone().unwrap().is_empty()); |
| 429 | + assert!(event.source.is_some()); |
| 430 | + assert!(event.source_elapsed.unwrap() >= 0); |
| 431 | + assert!(!event.activity.unwrap().is_empty()); |
| 432 | + } |
| 433 | +} |
0 commit comments