Skip to content

Commit adfff7f

Browse files
imotovguilload
andauthored
Replaces indexing service with codegen version (#3675)
Replaces hand-crafted indexing service with auto generated version. See #3108 Co-authored-by: Adrien Guillo <[email protected]>
1 parent a83fbdf commit adfff7f

File tree

32 files changed

+685
-422
lines changed

32 files changed

+685
-422
lines changed

quickwit/Cargo.lock

Lines changed: 11 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-actors/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ serde_json = { workspace = true }
2424
sync_wrapper = { workspace = true }
2525

2626
quickwit-common = { workspace = true }
27-
quickwit-proto = { workspace = true }
2827

2928
[features]
3029
testsuite = []

quickwit/quickwit-actors/src/lib.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ use std::fmt;
3131
use std::num::NonZeroU64;
3232

3333
use once_cell::sync::Lazy;
34-
use quickwit_proto::{ServiceError, ServiceErrorCode};
3534
use tokio::time::Duration;
3635
mod actor;
3736
mod actor_context;
@@ -129,13 +128,3 @@ pub enum AskError<E: fmt::Debug> {
129128
#[error("The handler returned an error: `{0:?}`.")]
130129
ErrorReply(#[from] E),
131130
}
132-
133-
impl<E: fmt::Debug + ServiceError> ServiceError for AskError<E> {
134-
fn status_code(&self) -> ServiceErrorCode {
135-
match self {
136-
AskError::MessageNotDelivered => ServiceErrorCode::Internal,
137-
AskError::ProcessMessageError => ServiceErrorCode::Internal,
138-
AskError::ErrorReply(err) => err.status_code(),
139-
}
140-
}
141-
}

quickwit/quickwit-control-plane/src/indexing_plan.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -313,8 +313,8 @@ pub(crate) fn build_indexing_plan(
313313
#[cfg(test)]
314314
mod tests {
315315
use std::collections::HashMap;
316-
use std::net::SocketAddr;
317316
use std::num::NonZeroUsize;
317+
use std::time::Duration;
318318

319319
use itertools::Itertools;
320320
use proptest::prelude::*;
@@ -324,11 +324,12 @@ mod tests {
324324
FileSourceParams, KafkaSourceParams, SourceConfig, SourceInputFormat, SourceParams,
325325
CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID,
326326
};
327-
use quickwit_indexing::indexing_client::create_indexing_service_client;
328327
use quickwit_proto::indexing::IndexingTask;
329-
use quickwit_proto::IndexUid;
328+
use quickwit_proto::{IndexUid, IndexingServiceClient};
330329
use rand::seq::SliceRandom;
331330
use serde_json::json;
331+
use tonic::transport::Endpoint;
332+
use tower::timeout::Timeout;
332333

333334
use super::{build_physical_indexing_plan, IndexSourceId};
334335
use crate::indexing_plan::build_indexing_plan;
@@ -351,8 +352,11 @@ mod tests {
351352
) -> Vec<(String, IndexerNodeInfo)> {
352353
let mut members = Vec::new();
353354
for idx in 0..num_members {
354-
let addr: SocketAddr = ([127, 0, 0, 1], 10).into();
355-
let client = create_indexing_service_client(addr).await.unwrap();
355+
let channel = Timeout::new(
356+
Endpoint::from_static("http://127.0.0.1:10").connect_lazy(),
357+
Duration::from_secs(1),
358+
);
359+
let client = IndexingServiceClient::from_channel(channel);
356360
members.push((
357361
(1 + idx).to_string(),
358362
IndexerNodeInfo {

quickwit/quickwit-control-plane/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,8 @@ use quickwit_actors::{AskError, Mailbox, Universe};
3131
use quickwit_common::pubsub::EventSubscriber;
3232
use quickwit_common::tower::Pool;
3333
use quickwit_config::SourceParams;
34-
use quickwit_indexing::indexing_client::IndexingServiceClient;
3534
use quickwit_metastore::{Metastore, MetastoreEvent};
36-
use quickwit_proto::indexing::IndexingTask;
35+
use quickwit_proto::indexing::{IndexingServiceClient, IndexingTask};
3736
use scheduler::IndexingScheduler;
3837
use tracing::error;
3938

quickwit/quickwit-control-plane/src/scheduler.rs

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler};
3030
use quickwit_config::SourceConfig;
3131
use quickwit_metastore::Metastore;
3232
use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingTask};
33+
use quickwit_proto::IndexingService;
3334
use serde::Serialize;
3435
use tracing::{debug, error, info, warn};
3536

@@ -268,20 +269,26 @@ impl IndexingScheduler {
268269
) {
269270
debug!("Apply physical indexing plan: {:?}", new_physical_plan);
270271
for (node_id, indexing_tasks) in new_physical_plan.indexing_tasks_per_node() {
271-
let indexer = indexers
272-
.iter_mut()
273-
.find(|indexer| &indexer.0 == node_id)
274-
.expect("This should never happen as the plan was built from these indexers.");
275-
if let Err(error) = indexer
276-
.1
277-
.client
278-
.apply_indexing_plan(ApplyIndexingPlanRequest {
279-
indexing_tasks: indexing_tasks.clone(),
280-
})
281-
.await
282-
{
283-
error!(indexer_node_id=%indexer.0, err=?error, "Error occurred when appling indexing plan to indexer.");
284-
}
272+
// We don't want to block on a slow indexer so we apply this change asynchronously
273+
tokio::spawn({
274+
let indexer = indexers
275+
.iter()
276+
.find(|indexer| &indexer.0 == node_id)
277+
.expect("This should never happen as the plan was built from these indexers.")
278+
.clone();
279+
let indexing_tasks = indexing_tasks.clone();
280+
async move {
281+
if let Err(error) = indexer
282+
.1
283+
.client
284+
.clone()
285+
.apply_indexing_plan(ApplyIndexingPlanRequest { indexing_tasks })
286+
.await
287+
{
288+
error!(indexer_node_id=%indexer.0, err=?error, "Error occurred when applying indexing plan to indexer.");
289+
}
290+
}
291+
});
285292
}
286293
self.state.num_applied_physical_indexing_plan += 1;
287294
self.state.last_applied_plan_timestamp = Some(Instant::now());
@@ -522,10 +529,10 @@ mod tests {
522529
use quickwit_common::tower::{Change, Pool};
523530
use quickwit_config::service::QuickwitService;
524531
use quickwit_config::{KafkaSourceParams, SourceConfig, SourceInputFormat, SourceParams};
525-
use quickwit_indexing::indexing_client::IndexingServiceClient;
526532
use quickwit_indexing::IndexingService;
527533
use quickwit_metastore::{IndexMetadata, MockMetastore};
528534
use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingTask};
535+
use quickwit_proto::IndexingServiceClient;
529536
use serde_json::json;
530537

531538
use super::{IndexingScheduler, CONTROL_PLAN_LOOP_INTERVAL};
@@ -576,10 +583,9 @@ mod tests {
576583
if node.enabled_services().contains(&QuickwitService::Indexer) =>
577584
{
578585
let node_id = node.node_id().to_string();
579-
let grpc_addr = node.grpc_advertise_addr();
580586
let indexing_tasks = node.indexing_tasks().to_vec();
581587
let client_mailbox = indexing_clients.get(&node_id).unwrap().clone();
582-
let client = IndexingServiceClient::from_service(client_mailbox, grpc_addr);
588+
let client = IndexingServiceClient::from_mailbox(client_mailbox);
583589
Some(Change::Insert(
584590
node_id,
585591
IndexerNodeInfo {

quickwit/quickwit-indexing/failpoints/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,10 @@ use quickwit_common::split_file;
4747
use quickwit_common::temp_dir::TempDirectory;
4848
use quickwit_indexing::actors::MergeExecutor;
4949
use quickwit_indexing::merge_policy::MergeOperation;
50-
use quickwit_indexing::models::{IndexingPipelineId, MergeScratch};
50+
use quickwit_indexing::models::MergeScratch;
5151
use quickwit_indexing::{get_tantivy_directory_from_split_bundle, TestSandbox};
5252
use quickwit_metastore::{ListSplitsQuery, Split, SplitMetadata, SplitState};
53+
use quickwit_proto::indexing_api::IndexingPipelineId;
5354
use quickwit_proto::IndexUid;
5455
use serde_json::Value as JsonValue;
5556
use tantivy::{Directory, Inventory};

quickwit/quickwit-indexing/src/actors/indexer.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use quickwit_config::IndexingSettings;
3939
use quickwit_doc_mapper::DocMapper;
4040
use quickwit_metastore::checkpoint::{IndexCheckpointDelta, SourceCheckpointDelta};
4141
use quickwit_metastore::Metastore;
42+
use quickwit_proto::indexing_api::IndexingPipelineId;
4243
use quickwit_query::get_quickwit_fastfield_normalizer_manager;
4344
use serde::Serialize;
4445
use tantivy::schema::Schema;
@@ -52,8 +53,8 @@ use ulid::Ulid;
5253

5354
use crate::actors::IndexSerializer;
5455
use crate::models::{
55-
CommitTrigger, EmptySplit, IndexedSplitBatchBuilder, IndexedSplitBuilder, IndexingPipelineId,
56-
NewPublishLock, ProcessedDoc, ProcessedDocBatch, PublishLock,
56+
CommitTrigger, EmptySplit, IndexedSplitBatchBuilder, IndexedSplitBuilder, NewPublishLock,
57+
ProcessedDoc, ProcessedDocBatch, PublishLock,
5758
};
5859

5960
// Random partition id used to gather partitions exceeding the maximum number of partitions.

quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use quickwit_common::KillSwitch;
3131
use quickwit_config::{IndexingSettings, SourceConfig};
3232
use quickwit_doc_mapper::DocMapper;
3333
use quickwit_metastore::{Metastore, MetastoreError};
34+
use quickwit_proto::indexing_api::IndexingPipelineId;
3435
use quickwit_storage::Storage;
3536
use tokio::join;
3637
use tokio::sync::Semaphore;
@@ -44,7 +45,7 @@ use crate::actors::sequencer::Sequencer;
4445
use crate::actors::uploader::UploaderType;
4546
use crate::actors::{Indexer, Packager, Publisher, Uploader};
4647
use crate::merge_policy::MergePolicy;
47-
use crate::models::{IndexingPipelineId, IndexingStatistics, Observe};
48+
use crate::models::{IndexingStatistics, Observe};
4849
use crate::source::{quickwit_supported_sources, SourceActor, SourceExecutionContext};
4950
use crate::split_store::IndexingSplitStore;
5051
use crate::SplitsUpdateMailbox;

quickwit/quickwit-indexing/src/actors/indexing_service.rs

Lines changed: 29 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
// along with this program. If not, see <http://www.gnu.org/licenses/>.
1919

2020
use std::collections::{HashMap, HashSet};
21+
use std::fmt::Debug;
2122
use std::path::PathBuf;
2223
use std::sync::Arc;
2324

@@ -36,66 +37,26 @@ use quickwit_config::{
3637
build_doc_mapper, IndexConfig, IndexerConfig, SourceConfig, INGEST_API_SOURCE_ID,
3738
};
3839
use quickwit_ingest::{DropQueueRequest, IngestApiService, ListQueuesRequest, QUEUES_DIR_NAME};
39-
use quickwit_metastore::{IndexMetadata, Metastore, MetastoreError};
40-
use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingTask};
41-
use quickwit_proto::{IndexUid, ServiceError, ServiceErrorCode};
42-
use quickwit_storage::{StorageError, StorageResolver, StorageResolverError};
40+
use quickwit_metastore::{IndexMetadata, Metastore};
41+
use quickwit_proto::indexing::{ApplyIndexingPlanRequest, ApplyIndexingPlanResponse, IndexingTask};
42+
use quickwit_proto::indexing_api::{IndexingPipelineId, IndexingServiceError};
43+
use quickwit_proto::IndexUid;
44+
use quickwit_storage::StorageResolver;
4345
use serde::{Deserialize, Serialize};
44-
use thiserror::Error;
4546
use tokio::sync::Semaphore;
4647
use tracing::{debug, error, info, warn};
4748

4849
use super::merge_pipeline::{MergePipeline, MergePipelineParams};
4950
use super::MergePlanner;
5051
use crate::models::{
51-
DetachIndexingPipeline, DetachMergePipeline, IndexingPipelineId, Observe, ObservePipeline,
52-
SpawnPipeline,
52+
DetachIndexingPipeline, DetachMergePipeline, Observe, ObservePipeline, SpawnPipeline,
5353
};
5454
use crate::split_store::{LocalSplitStore, SplitStoreQuota};
5555
use crate::{IndexingPipeline, IndexingPipelineParams, IndexingSplitStore, IndexingStatistics};
5656

5757
/// Name of the indexing directory, usually located at `<data_dir_path>/indexing`.
5858
pub const INDEXING_DIR_NAME: &str = "indexing";
5959

60-
#[derive(Error, Debug)]
61-
pub enum IndexingServiceError {
62-
#[error("Indexing pipeline `{index_id}` for source `{source_id}` does not exist.")]
63-
MissingPipeline { index_id: String, source_id: String },
64-
#[error(
65-
"Pipeline #{pipeline_ord} for index `{index_id}` and source `{source_id}` already exists."
66-
)]
67-
PipelineAlreadyExists {
68-
index_id: String,
69-
source_id: String,
70-
pipeline_ord: usize,
71-
},
72-
#[error("Failed to resolve the storage `{0}`.")]
73-
StorageResolverError(#[from] StorageResolverError),
74-
#[error("Storage error `{0}`.")]
75-
StorageError(#[from] StorageError),
76-
#[error("Metastore error `{0}`.")]
77-
MetastoreError(#[from] MetastoreError),
78-
#[error("Invalid params `{0}`.")]
79-
InvalidParams(anyhow::Error),
80-
#[error("Spanw pipelines errors `{pipeline_ids:?}`.")]
81-
SpawnPipelinesError {
82-
pipeline_ids: Vec<IndexingPipelineId>,
83-
},
84-
}
85-
86-
impl ServiceError for IndexingServiceError {
87-
fn status_code(&self) -> ServiceErrorCode {
88-
match self {
89-
Self::MissingPipeline { .. } => ServiceErrorCode::NotFound,
90-
Self::PipelineAlreadyExists { .. } => ServiceErrorCode::BadRequest,
91-
Self::StorageResolverError(_) | Self::StorageError(_) => ServiceErrorCode::Internal,
92-
Self::MetastoreError(_) => ServiceErrorCode::Internal,
93-
Self::InvalidParams(_) => ServiceErrorCode::BadRequest,
94-
Self::SpawnPipelinesError { .. } => ServiceErrorCode::Internal,
95-
}
96-
}
97-
}
98-
9960
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
10061
pub struct IndexingServiceCounters {
10162
pub num_running_pipelines: usize,
@@ -142,6 +103,17 @@ pub struct IndexingService {
142103
cooperative_indexing_permits: Option<Arc<Semaphore>>,
143104
}
144105

106+
impl Debug for IndexingService {
107+
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108+
formatter
109+
.debug_struct("IndexingService")
110+
.field("cluster_id", &self.cluster.cluster_id())
111+
.field("self_node_id", &self.node_id)
112+
.field("indexing_root_directory", &self.indexing_root_directory)
113+
.finish()
114+
}
115+
}
116+
145117
impl IndexingService {
146118
#[allow(clippy::too_many_arguments)]
147119
pub async fn new(
@@ -271,11 +243,12 @@ impl IndexingService {
271243
.join(&pipeline_id.source_id)
272244
.join(&pipeline_id.pipeline_ord.to_string())
273245
.tempdir_in(&self.indexing_root_directory)
274-
.map_err(|error| IndexingServiceError::StorageError(error.into()))?;
246+
.map_err(IndexingServiceError::Io)?;
275247
let storage = self
276248
.storage_resolver
277249
.resolve(&index_config.index_uri)
278-
.await?;
250+
.await
251+
.map_err(|err| IndexingServiceError::StorageResolverError(err.to_string()))?;
279252
let merge_policy =
280253
crate::merge_policy::merge_policy_from_settings(&index_config.indexing_settings);
281254
let split_store = IndexingSplitStore::new(storage.clone(), self.local_split_store.clone());
@@ -336,7 +309,11 @@ impl IndexingService {
336309
index_id: &str,
337310
) -> Result<IndexMetadata, IndexingServiceError> {
338311
let _protect_guard = ctx.protect_zone();
339-
let index_metadata = self.metastore.index_metadata(index_id).await?;
312+
let index_metadata = self
313+
.metastore
314+
.index_metadata(index_id)
315+
.await
316+
.map_err(|err| IndexingServiceError::MetastoreError(err.to_string()))?;
340317
Ok(index_metadata)
341318
}
342319

@@ -440,7 +417,7 @@ impl IndexingService {
440417
&mut self,
441418
ctx: &ActorContext<Self>,
442419
physical_indexing_plan_request: ApplyIndexingPlanRequest,
443-
) -> Result<(), IndexingServiceError> {
420+
) -> Result<ApplyIndexingPlanResponse, IndexingServiceError> {
444421
let mut updated_pipeline_ids: HashSet<IndexingPipelineId> = HashSet::new();
445422
let mut pipeline_ordinals: HashMap<&IndexingTask, usize> = HashMap::new();
446423
for indexing_task in physical_indexing_plan_request.indexing_tasks.iter() {
@@ -484,7 +461,7 @@ impl IndexingService {
484461
});
485462
}
486463

487-
Ok(())
464+
Ok(ApplyIndexingPlanResponse {})
488465
}
489466

490467
/// Spawns the pipelines with supplied ids and returns a list of failed pipelines.
@@ -757,7 +734,7 @@ impl Handler<Observe> for IndexingService {
757734

758735
#[async_trait]
759736
impl Handler<ApplyIndexingPlanRequest> for IndexingService {
760-
type Reply = Result<(), IndexingServiceError>;
737+
type Reply = Result<ApplyIndexingPlanResponse, IndexingServiceError>;
761738

762739
async fn handle(
763740
&mut self,

0 commit comments

Comments
 (0)