Skip to content

Commit 1946c12

Browse files
guilload, fmassot, fulmicoton
authored
Use an enum rather than JSON value for storing source params (#1058)
* Use an enum rather than json value for storing source params
* Fix backward compatibility tests and use absolute path in FileSourceParams.
* Renamed SourceType -> SourceParams
* Added a SourceParams::validate function

Co-authored-by: François Massot <[email protected]>
Co-authored-by: Paul Masurel <[email protected]>
1 parent d6bbdf3 commit 1946c12

34 files changed

+412
-267
lines changed

quickwit-cli/src/index.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@ use itertools::Itertools;
3131
use quickwit_actors::{ActorHandle, ObservationType};
3232
use quickwit_common::uri::Uri;
3333
use quickwit_common::{run_checklist, GREEN_COLOR};
34-
use quickwit_config::{IndexConfig, IndexerConfig, SourceConfig};
34+
use quickwit_config::{IndexConfig, IndexerConfig, SourceConfig, SourceParams};
3535
use quickwit_core::{create_index, delete_index, garbage_collect_index, reset_index};
3636
use quickwit_doc_mapper::tag_pruning::match_tag_field_name;
3737
use quickwit_indexing::actors::{IndexingPipeline, IndexingServer};
3838
use quickwit_indexing::models::IndexingStatistics;
39-
use quickwit_indexing::source::{FileSourceParams, INGEST_SOURCE_ID};
39+
use quickwit_indexing::source::INGEST_SOURCE_ID;
4040
use quickwit_metastore::{quickwit_metastore_uri_resolver, IndexMetadata, Split, SplitState};
4141
use quickwit_proto::{SearchRequest, SearchResponse};
4242
use quickwit_search::{single_node_search, SearchResponseRest};
@@ -625,16 +625,14 @@ pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> {
625625

626626
let config = load_quickwit_config(args.config_uri, args.data_dir).await?;
627627

628-
let file_source_params = if let Some(filepath) = args.input_path_opt.as_ref() {
629-
FileSourceParams::for_file(filepath.clone())
628+
let source_params = if let Some(filepath) = args.input_path_opt.as_ref() {
629+
SourceParams::file(filepath)
630630
} else {
631-
FileSourceParams::stdin()
631+
SourceParams::stdin()
632632
};
633-
let params = serde_json::to_value(file_source_params)?;
634633
let source = SourceConfig {
635634
source_id: INGEST_SOURCE_ID.to_string(),
636-
source_type: "file".to_string(),
637-
params,
635+
source_params,
638636
};
639637
run_index_checklist(&config.metastore_uri, &args.index_id, Some(&source)).await?;
640638
let metastore_uri_resolver = quickwit_metastore_uri_resolver();

quickwit-cli/src/source.rs

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ use anyhow::{bail, Context};
2121
use clap::ArgMatches;
2222
use itertools::Itertools;
2323
use quickwit_common::uri::Uri;
24-
use quickwit_config::SourceConfig;
24+
use quickwit_config::{SourceConfig, SourceParams};
2525
use quickwit_indexing::check_source_connectivity;
2626
use quickwit_metastore::checkpoint::SourceCheckpoint;
2727
use quickwit_metastore::{quickwit_metastore_uri_resolver, IndexMetadata};
2828
use quickwit_storage::load_file;
29-
use serde_json::Value;
29+
use serde_json::{Map, Value};
3030
use tabled::{Table, Tabled};
3131

3232
use crate::{load_quickwit_config, make_table};
@@ -184,12 +184,17 @@ async fn add_source_cli(args: AddSourceArgs) -> anyhow::Result<()> {
184184
.resolve(&config.metastore_uri)
185185
.await?;
186186
let params = sniff_params(&args.params).await?;
187+
let mut source_params_json: Map<String, Value> = Map::new();
188+
source_params_json.insert("source_type".to_string(), Value::String(args.source_type));
189+
source_params_json.insert("params".to_string(), Value::Object(params));
190+
let source_params: SourceParams = serde_json::from_value(Value::Object(source_params_json))?;
187191
let source = SourceConfig {
188192
source_id: args.source_id.clone(),
189-
source_type: args.source_type,
190-
params,
193+
source_params,
191194
};
195+
source.validate()?;
192196
check_source_connectivity(&source).await?;
197+
193198
metastore.add_source(&args.index_id, source).await?;
194199
println!(
195200
"Source `{}` successfully created for index `{}`.",
@@ -244,12 +249,12 @@ where
244249
.with_context(|| format!("Source `{}` does not exist.", source_id))?;
245250

246251
let source_rows = vec![SourceRow {
247-
source_id: source.source_id,
248-
source_type: source.source_type,
252+
source_id: source.source_id.clone(),
253+
source_type: source.source_type().to_string(),
249254
}];
250255
let source_table = make_table("Source", source_rows);
251256

252-
let params_rows = flatten_json(source.params)
257+
let params_rows = flatten_json(source.params())
253258
.into_iter()
254259
.map(|(key, value)| ParamsRow { key, value })
255260
.sorted_by(|left, right| left.key.cmp(&right.key));
@@ -279,8 +284,8 @@ where I: IntoIterator<Item = SourceConfig> {
279284
let rows = sources
280285
.into_iter()
281286
.map(|source| SourceRow {
287+
source_type: source.source_type().to_string(),
282288
source_id: source.source_id,
283-
source_type: source.source_type,
284289
})
285290
.sorted_by(|left, right| left.source_id.cmp(&right.source_id));
286291
make_table("Sources", rows)
@@ -346,14 +351,14 @@ fn flatten_json(value: Value) -> Vec<(String, Value)> {
346351

347352
/// Tries to read a JSON object from a string, assuming the string is an inline JSON object or a
348353
/// path to a file holding a JSON object.
349-
async fn sniff_params(params: &str) -> anyhow::Result<Value> {
350-
if let Ok(object @ Value::Object(_)) = serde_json::from_str(params) {
351-
return Ok(object);
354+
async fn sniff_params(params: &str) -> anyhow::Result<Map<String, Value>> {
355+
if let Ok(Value::Object(values)) = serde_json::from_str(params) {
356+
return Ok(values);
352357
}
353358
let params_uri = Uri::try_new(params)?;
354359
let params_bytes = load_file(&params_uri).await?;
355-
if let Ok(object @ Value::Object(_)) = serde_json::from_slice(params_bytes.as_slice()) {
356-
return Ok(object);
360+
if let Ok(Value::Object(values)) = serde_json::from_slice(params_bytes.as_slice()) {
361+
return Ok(values);
357362
}
358363
bail!("Failed to parse JSON object from `{}`.", params)
359364
}
@@ -402,10 +407,10 @@ mod tests {
402407
sniff_params("0").await.unwrap_err();
403408
sniff_params("[]").await.unwrap_err();
404409

405-
assert!(matches!(
406-
sniff_params(r#"{"foo": 0}"#).await.unwrap(),
407-
Value::Object(map) if map.contains_key("foo")
408-
));
410+
assert!(sniff_params(r#"{"foo": 0}"#)
411+
.await
412+
.unwrap()
413+
.contains_key("foo"));
409414

410415
let storage = quickwit_storage_uri_resolver()
411416
.resolve("ram:///tmp")
@@ -416,10 +421,10 @@ mod tests {
416421
.await
417422
.unwrap();
418423

419-
assert!(matches!(
420-
sniff_params("ram:///tmp/params.json").await.unwrap(),
421-
Value::Object(map) if map.contains_key("bar")
422-
));
424+
assert!(sniff_params("ram:///tmp/params.json")
425+
.await
426+
.unwrap()
427+
.contains_key("bar"));
423428
}
424429

425430
#[test]
@@ -520,8 +525,7 @@ mod tests {
520525
.collect();
521526
let sources = vec![SourceConfig {
522527
source_id: "foo-source".to_string(),
523-
source_type: "file".to_string(),
524-
params: json!({"filepath": "path/to/file"}),
528+
source_params: SourceParams::file("path/to/file"),
525529
}];
526530
let expected_source = vec![SourceRow {
527531
source_id: "foo-source".to_string(),
@@ -584,13 +588,11 @@ mod tests {
584588
let sources = [
585589
SourceConfig {
586590
source_id: "foo-source".to_string(),
587-
source_type: "file".to_string(),
588-
params: json!({}),
591+
source_params: SourceParams::stdin(),
589592
},
590593
SourceConfig {
591594
source_id: "bar-source".to_string(),
592-
source_type: "file".to_string(),
593-
params: json!({}),
595+
source_params: SourceParams::stdin(),
594596
},
595597
];
596598
let expected_sources = [

quickwit-cli/tests/cli.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ fn test_cmd_ingest_simple() -> Result<()> {
214214
.stdout(predicate::str::contains("Indexed"))
215215
.stdout(predicate::str::contains("documents in"))
216216
.stdout(predicate::str::contains("Now, you can query the index"));
217+
println!("piped input");
217218
Ok(())
218219
}
219220

quickwit-cli/update_on_config.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
This does not directly reflect the overall memory usage of `quickwit index ingest`, but doubling this value should give a fair approximation.

quickwit-config/resources/tests/index_config/hdfs-logs.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,10 @@
6666
"source_id": "hdfs-logs-kafka-source",
6767
"source_type": "kafka",
6868
"params": {
69-
"bootstrap.servers": "host:9092",
70-
"topic": "cloudera-cluster-logs"
69+
"topic": "cloudera-cluster-logs",
70+
"client_params": {
71+
"bootstrap.servers": "host:9092"
72+
}
7173
}
7274
},
7375
{

quickwit-config/resources/tests/index_config/hdfs-logs.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ default_search_fields = [ "severity_text", "body" ]
3636
[[sources]]
3737
source_id = "hdfs-logs-kafka-source"
3838
source_type = "kafka"
39-
params = { "bootstrap.servers" = "host:9092", "topic" = "cloudera-cluster-logs" }
39+
params = { "topic" = "cloudera-cluster-logs", "client_params" = { "bootstrap.servers" = "host:9092" } }
4040

4141
[[sources]]
4242
source_id = "hdfs-logs-kinesis-source"

quickwit-config/resources/tests/index_config/hdfs-logs.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,9 @@ sources:
4848
- source_id: hdfs-logs-kafka-source
4949
source_type: kafka
5050
params:
51-
bootstrap.servers: host:9092
5251
topic: cloudera-cluster-logs
52+
client_params:
53+
bootstrap.servers: host:9092
5354

5455
- source_id: hdfs-logs-kinesis-source
5556
source_type: kinesis

quickwit-config/src/index_config.rs

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,15 @@ use quickwit_doc_mapper::{
3232
};
3333
use serde::{Deserialize, Serialize};
3434

35+
use crate::source_config::SourceConfig;
36+
3537
// Note(fmassot): `DocMapping` is a struct only used for
3638
// serialization/deserialization of `DocMapper` parameters.
3739
// This is partly a duplicate of the `DocMapper` and can
3840
// be viewed as a temporary hack for 0.2 release before
3941
// refactoring.
4042
#[derive(Clone, Debug, Serialize, Deserialize)]
43+
#[serde(deny_unknown_fields)]
4144
pub struct DocMapping {
4245
pub field_mappings: Vec<FieldMappingEntry>,
4346
#[serde(default)]
@@ -47,6 +50,7 @@ pub struct DocMapping {
4750
}
4851

4952
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
53+
#[serde(deny_unknown_fields)]
5054
pub struct IndexingResources {
5155
#[serde(default = "IndexingResources::default_num_threads")]
5256
pub num_threads: usize,
@@ -81,6 +85,7 @@ impl Default for IndexingResources {
8185
}
8286

8387
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
88+
#[serde(deny_unknown_fields)]
8489
pub struct MergePolicy {
8590
#[serde(default = "MergePolicy::default_demux_factor")]
8691
pub demux_factor: usize,
@@ -119,6 +124,7 @@ fn is_false(val: &bool) -> bool {
119124
}
120125

121126
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
127+
#[serde(deny_unknown_fields)]
122128
pub struct IndexingSettings {
123129
#[serde(default, skip_serializing_if = "is_false")]
124130
pub demux_enabled: bool,
@@ -195,19 +201,14 @@ impl Default for IndexingSettings {
195201
}
196202

197203
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
204+
#[serde(deny_unknown_fields)]
198205
pub struct SearchSettings {
199206
#[serde(default)]
200207
pub default_search_fields: Vec<String>,
201208
}
202209

203-
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
204-
pub struct SourceConfig {
205-
pub source_id: String,
206-
pub source_type: String,
207-
pub params: serde_json::Value,
208-
}
209-
210210
#[derive(Clone, Debug, Serialize, Deserialize)]
211+
#[serde(deny_unknown_fields)]
211212
pub struct IndexConfig {
212213
pub version: usize,
213214
pub index_id: String,
@@ -275,6 +276,10 @@ impl IndexConfig {
275276
bail!("Index config contains duplicate sources.")
276277
}
277278

279+
for source in self.sources.iter() {
280+
source.validate()?;
281+
}
282+
278283
// Validation is made by building the doc mapper.
279284
// Note: this needs a deep refactoring to separate the doc mapping configuration,
280285
// and doc mapper implementations.
@@ -319,9 +324,9 @@ pub fn build_doc_mapper(
319324

320325
#[cfg(test)]
321326
mod tests {
322-
use serde_json::json;
323327

324328
use super::*;
329+
use crate::SourceParams;
325330

326331
fn get_resource_path(resource_filename: &str) -> String {
327332
format!(
@@ -415,12 +420,12 @@ mod tests {
415420
{
416421
let source = &index_config.sources[0];
417422
assert_eq!(source.source_id, "hdfs-logs-kafka-source");
418-
assert_eq!(source.source_type, "kafka");
423+
assert!(matches!(source.source_params, SourceParams::Kafka(_)));
419424
}
420425
{
421426
let source = &index_config.sources[1];
422427
assert_eq!(source.source_id, "hdfs-logs-kinesis-source");
423-
assert_eq!(source.source_type, "kinesis");
428+
assert!(matches!(source.source_params, SourceParams::Kinesis(_)));
424429
}
425430
Ok(())
426431
}
@@ -537,13 +542,11 @@ mod tests {
537542
invalid_index_config.sources = vec![
538543
SourceConfig {
539544
source_id: "void_1".to_string(),
540-
source_type: "void".to_string(),
541-
params: json!(null),
545+
source_params: SourceParams::void(),
542546
},
543547
SourceConfig {
544548
source_id: "void_1".to_string(),
545-
source_type: "void".to_string(),
546-
params: json!(null),
549+
source_params: SourceParams::void(),
547550
},
548551
];
549552
assert!(invalid_index_config.validate().is_err());
@@ -553,6 +556,20 @@ mod tests {
553556
.to_string()
554557
.contains("Index config contains duplicate sources."));
555558
}
559+
{
560+
// Add source file params with no filepath.
561+
let mut invalid_index_config = index_config.clone();
562+
invalid_index_config.sources = vec![SourceConfig {
563+
source_id: "file_params_1".to_string(),
564+
source_params: SourceParams::stdin(),
565+
}];
566+
assert!(invalid_index_config.validate().is_err());
567+
assert!(invalid_index_config
568+
.validate()
569+
.unwrap_err()
570+
.to_string()
571+
.contains("must contain a `filepath`"));
572+
}
556573
{
557574
// Add a demux field not declared in the mapping.
558575
let mut invalid_index_config = index_config;

quickwit-config/src/lib.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,17 @@
1919

2020
mod config;
2121
mod index_config;
22+
mod source_config;
2223

2324
pub use config::{
2425
get_searcher_config_instance, IndexerConfig, QuickwitConfig, SearcherConfig,
2526
SEARCHER_CONFIG_INSTANCE,
2627
};
2728
pub use index_config::{
2829
build_doc_mapper, DocMapping, IndexConfig, IndexingResources, IndexingSettings, MergePolicy,
29-
SearchSettings, SourceConfig,
30+
SearchSettings,
31+
};
32+
pub use source_config::{
33+
FileSourceParams, KafkaSourceParams, SourceConfig, SourceParams, VecSourceParams,
34+
VoidSourceParams,
3035
};

0 commit comments

Comments
 (0)