|
19 | 19 |
|
20 | 20 | use quickwit_proto::{LeafSearchStreamRequest, LeafSearchStreamResponse}; |
21 | 21 | use tokio::sync::mpsc::error::SendError; |
| 22 | +use tracing::warn; |
22 | 23 |
|
23 | 24 | use super::RetryPolicy; |
24 | 25 |
|
|
56 | 57 | }); |
57 | 58 | Some(request) |
58 | 59 | } |
59 | | - Err(_) => Some(request), |
| 60 | + Err(SendError(_)) => { |
| 61 | + // The receiver channel was dropped. |
| 62 | + // There is no need to retry. |
| 63 | + warn!( |
| 64 | + "Receiver channel closed during stream search request. The client probably \ |
| 65 | + closed the connection?" |
| 66 | + ); |
| 67 | + None |
| 68 | + } |
60 | 69 | } |
61 | 70 | } |
62 | 71 | } |
63 | 72 |
|
64 | 73 | #[cfg(test)] |
65 | 74 | mod tests { |
66 | | - use quickwit_proto::{ |
67 | | - LeafSearchRequest, LeafSearchResponse, SearchRequest, SplitIdAndFooterOffsets, |
68 | | - SplitSearchError, |
69 | | - }; |
| 75 | + use quickwit_proto::{LeafSearchStreamRequest, LeafSearchStreamResponse}; |
| 76 | + use tokio::sync::mpsc::error::SendError; |
70 | 77 |
|
71 | | - use crate::retry::search::LeafSearchRetryPolicy; |
| 78 | + use crate::retry::search_stream::LeafSearchStreamRetryPolicy; |
72 | 79 | use crate::retry::RetryPolicy; |
73 | | - use crate::SearchError; |
74 | | - |
75 | | - fn mock_leaf_search_request() -> LeafSearchRequest { |
76 | | - LeafSearchRequest { |
77 | | - search_request: Some(SearchRequest { |
78 | | - index_id: "test-idx".to_string(), |
79 | | - query: "test".to_string(), |
80 | | - search_fields: vec!["body".to_string()], |
81 | | - start_timestamp: None, |
82 | | - end_timestamp: None, |
83 | | - max_hits: 10, |
84 | | - start_offset: 0, |
85 | | - ..Default::default() |
86 | | - }), |
87 | | - doc_mapper: "doc_mapper".to_string(), |
88 | | - index_uri: "uri".to_string(), |
89 | | - split_offsets: vec![ |
90 | | - SplitIdAndFooterOffsets { |
91 | | - split_id: "split_1".to_string(), |
92 | | - split_footer_end: 100, |
93 | | - split_footer_start: 0, |
94 | | - }, |
95 | | - SplitIdAndFooterOffsets { |
96 | | - split_id: "split_2".to_string(), |
97 | | - split_footer_end: 100, |
98 | | - split_footer_start: 0, |
99 | | - }, |
100 | | - ], |
101 | | - } |
102 | | - } |
103 | 80 |
|
104 | | - #[test] |
105 | | - fn test_should_retry_on_error() -> anyhow::Result<()> { |
106 | | - let retry_policy = LeafSearchRetryPolicy {}; |
107 | | - let request = mock_leaf_search_request(); |
108 | | - let result = Result::<LeafSearchResponse, SearchError>::Err(SearchError::InternalError( |
109 | | - "test".to_string(), |
110 | | - )); |
111 | | - let retry_req_opt = retry_policy.retry_request(request, result.as_ref()); |
112 | | - assert!(retry_req_opt.is_some()); |
113 | | - Ok(()) |
114 | | - } |
115 | | - |
116 | | - #[test] |
117 | | - fn test_should_not_retry_if_result_is_ok_and_no_failing_splits() -> anyhow::Result<()> { |
118 | | - let retry_policy = LeafSearchRetryPolicy {}; |
119 | | - let request = mock_leaf_search_request(); |
120 | | - let leaf_response = LeafSearchResponse { |
121 | | - num_hits: 0, |
122 | | - partial_hits: vec![], |
123 | | - failed_splits: vec![], |
124 | | - num_attempted_splits: 1, |
125 | | - }; |
126 | | - let result = Result::<LeafSearchResponse, SearchError>::Ok(leaf_response); |
127 | | - let retry_req_opt = retry_policy.retry_request(request, result.as_ref()); |
| 81 | + #[tokio::test] |
| 82 | + async fn test_retry_policy_search_stream_should_not_retry_on_send_error() { |
| 83 | + let retry_policy = LeafSearchStreamRetryPolicy {}; |
| 84 | + let leaf_search_stream_req = LeafSearchStreamRequest::default(); |
| 85 | + let leaf_search_stream_res = LeafSearchStreamResponse::default(); |
| 86 | + let leaf_search_stream_send_error = Err(SendError(Ok(leaf_search_stream_res))); |
| 87 | + let retry_req_opt = retry_policy.retry_request( |
| 88 | + leaf_search_stream_req, |
| 89 | + leaf_search_stream_send_error.as_ref(), |
| 90 | + ); |
128 | 91 | assert!(retry_req_opt.is_none()); |
129 | | - Ok(()) |
130 | | - } |
131 | | - |
132 | | - #[test] |
133 | | - fn test_should_retry_on_failed_splits() -> anyhow::Result<()> { |
134 | | - let retry_policy = LeafSearchRetryPolicy {}; |
135 | | - let request = mock_leaf_search_request(); |
136 | | - let mut expected_retry_request = request.clone(); |
137 | | - expected_retry_request.split_offsets.remove(0); |
138 | | - let split_error = SplitSearchError { |
139 | | - error: "error".to_string(), |
140 | | - split_id: "split_2".to_string(), |
141 | | - retryable_error: true, |
142 | | - }; |
143 | | - let leaf_response = LeafSearchResponse { |
144 | | - num_hits: 0, |
145 | | - partial_hits: vec![], |
146 | | - failed_splits: vec![split_error], |
147 | | - num_attempted_splits: 1, |
148 | | - }; |
149 | | - let result = Result::<LeafSearchResponse, SearchError>::Ok(leaf_response); |
150 | | - let retry_request_opt = retry_policy.retry_request(request, result.as_ref()); |
151 | | - assert_eq!(retry_request_opt, Some(expected_retry_request)); |
152 | | - Ok(()) |
153 | 92 | } |
154 | 93 | } |
0 commit comments