Skip to content

Commit ab0507d

Browse files
authored
Merge branch 'main' into integration-tests/clp-ffi-js
2 parents 9bfb82e + dcc105c commit ab0507d

File tree

49 files changed

+707
-350
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+707
-350
lines changed

components/api-server/src/client.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ pub struct QueryConfig {
2828
/// The search query as a KQL string.
2929
pub query_string: String,
3030

31-
/// The dataset to search within. If not provided, only `default` dataset will be searched.
31+
/// The datasets to search within. If not provided, only the `default` dataset will be searched.
3232
#[serde(default)]
33-
pub dataset: Option<String>,
33+
pub datasets: Option<Vec<String>>,
3434

3535
/// The maximum number of results to return. Set to `0` for no limit.
3636
#[serde(default)]
@@ -58,7 +58,7 @@ pub struct QueryConfig {
5858
impl From<QueryConfig> for SearchJobConfig {
5959
fn from(value: QueryConfig) -> Self {
6060
Self {
61-
dataset: value.dataset,
61+
datasets: value.datasets,
6262
query_string: value.query_string,
6363
max_num_results: value.max_num_results,
6464
begin_timestamp: value.time_range_begin_millisecs,
@@ -128,10 +128,10 @@ impl Client {
128128
/// * Forwards [`sqlx::query::Query::execute`]'s return values on failure.
129129
pub async fn submit_query(&self, query_config: QueryConfig) -> Result<u64, ClientError> {
130130
let mut search_job_config: SearchJobConfig = query_config.into();
131-
if search_job_config.dataset.is_none() {
132-
search_job_config.dataset = match self.config.package.storage_engine {
131+
if search_job_config.datasets.is_none() {
132+
search_job_config.datasets = match self.config.package.storage_engine {
133133
StorageEngine::Clp => None,
134-
StorageEngine::ClpS => Some("default".to_owned()),
134+
StorageEngine::ClpS => Some(vec!["default".to_owned()]),
135135
}
136136
}
137137
if search_job_config.max_num_results == 0 {

components/api-server/src/routes.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ async fn health() -> String {
8585
content= QueryConfig,
8686
example = json!({
8787
"query_string": "*",
88-
"dataset": "default",
88+
"datasets": ["default"],
8989
"time_range_begin_millisecs": 0,
9090
"time_range_end_millisecs": 17_356_896,
9191
"ignore_case": true,

components/clp-package-utils/clp_package_utils/controller.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -701,6 +701,7 @@ def _set_up_env_for_webui(self, container_clp_config: ClpConfig) -> EnvVarsDict:
701701
"ClpStorageEngine": self._clp_config.package.storage_engine,
702702
"ClpQueryEngine": self._clp_config.package.query_engine,
703703
"LogsInputType": self._clp_config.logs_input.type,
704+
"MaxDatasetsPerQuery": self._clp_config.query_scheduler.max_datasets_per_query,
704705
"MongoDbSearchResultsMetadataCollectionName": (
705706
self._clp_config.webui.results_metadata_collection_name
706707
),

components/clp-package-utils/clp_package_utils/scripts/native/search.py

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from clp_package_utils.scripts.native.utils import (
2828
run_function_in_process,
2929
submit_query_job,
30-
validate_dataset_exists,
30+
validate_datasets_exist,
3131
wait_for_query_job,
3232
)
3333

@@ -37,7 +37,7 @@
3737
def create_and_monitor_job_in_db(
3838
db_config: Database,
3939
results_cache: ResultsCache,
40-
dataset: str | None,
40+
datasets: list[str] | None,
4141
wildcard_query: str,
4242
begin_timestamp: int | None,
4343
end_timestamp: int | None,
@@ -48,7 +48,7 @@ def create_and_monitor_job_in_db(
4848
count_by_time_bucket_size: int | None,
4949
):
5050
search_config = SearchJobConfig(
51-
dataset=dataset,
51+
datasets=datasets,
5252
query_string=wildcard_query,
5353
begin_timestamp=begin_timestamp,
5454
end_timestamp=end_timestamp,
@@ -115,7 +115,7 @@ async def worker_connection_handler(reader: asyncio.StreamReader, writer: asynci
115115
async def do_search_without_aggregation(
116116
db_config: Database,
117117
results_cache: ResultsCache,
118-
dataset: str | None,
118+
datasets: list[str] | None,
119119
wildcard_query: str,
120120
begin_timestamp: int | None,
121121
end_timestamp: int | None,
@@ -144,7 +144,7 @@ async def do_search_without_aggregation(
144144
create_and_monitor_job_in_db,
145145
db_config,
146146
results_cache,
147-
dataset,
147+
datasets,
148148
wildcard_query,
149149
begin_timestamp,
150150
end_timestamp,
@@ -181,7 +181,7 @@ async def do_search_without_aggregation(
181181
async def do_search(
182182
db_config: Database,
183183
results_cache: ResultsCache,
184-
dataset: str | None,
184+
datasets: list[str] | None,
185185
wildcard_query: str,
186186
begin_timestamp: int | None,
187187
end_timestamp: int | None,
@@ -195,7 +195,7 @@ async def do_search(
195195
await do_search_without_aggregation(
196196
db_config,
197197
results_cache,
198-
dataset,
198+
datasets,
199199
wildcard_query,
200200
begin_timestamp,
201201
end_timestamp,
@@ -208,7 +208,7 @@ async def do_search(
208208
create_and_monitor_job_in_db,
209209
db_config,
210210
results_cache,
211-
dataset,
211+
datasets,
212212
wildcard_query,
213213
begin_timestamp,
214214
end_timestamp,
@@ -235,9 +235,9 @@ def main(argv):
235235
args_parser.add_argument("wildcard_query", help="Wildcard query.")
236236
args_parser.add_argument(
237237
"--dataset",
238-
type=str,
238+
action="append",
239239
default=None,
240-
help="The dataset that the archives belong to.",
240+
help="A dataset to search. Can be specified multiple times.",
241241
)
242242
args_parser.add_argument(
243243
"--begin-time",
@@ -297,10 +297,18 @@ def main(argv):
297297
return -1
298298

299299
database_config: Database = clp_config.database
300-
dataset = parsed_args.dataset
301-
if dataset is not None:
300+
datasets = parsed_args.dataset
301+
if datasets is not None:
302+
max_datasets_per_query = clp_config.query_scheduler.max_datasets_per_query
303+
if max_datasets_per_query is not None and len(datasets) > max_datasets_per_query:
304+
logger.error(
305+
"Number of datasets (%d) exceeds max_datasets_per_query=%s.",
306+
len(datasets),
307+
max_datasets_per_query,
308+
)
309+
return -1
302310
try:
303-
validate_dataset_exists(database_config, dataset)
311+
validate_datasets_exist(database_config, datasets)
304312
except Exception as e:
305313
logger.error(e)
306314
return -1
@@ -310,7 +318,7 @@ def main(argv):
310318
do_search(
311319
database_config,
312320
clp_config.results_cache,
313-
dataset,
321+
datasets,
314322
parsed_args.wildcard_query,
315323
parsed_args.begin_time,
316324
parsed_args.end_time,

components/clp-package-utils/clp_package_utils/scripts/native/utils.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,13 @@ def submit_query_job(
7272
return db_cursor.lastrowid
7373

7474

75-
def validate_dataset_exists(db_config: Database, dataset: str) -> None:
75+
def validate_datasets_exist(db_config: Database, datasets: list[str]) -> None:
7676
"""
77-
Validates that `dataset` exists in the metadata database.
77+
Validates that all datasets in `datasets` exist in the metadata database.
7878
7979
:param db_config:
80-
:param dataset:
81-
:raise: ValueError if the dataset doesn't exist.
80+
:param datasets:
81+
:raise: ValueError if any dataset doesn't exist.
8282
"""
8383
sql_adapter = SqlAdapter(db_config)
8484
clp_db_connection_params = db_config.get_clp_connection_params_and_type(True)
@@ -87,8 +87,11 @@ def validate_dataset_exists(db_config: Database, dataset: str) -> None:
8787
closing(sql_adapter.create_connection(True)) as db_conn,
8888
closing(db_conn.cursor(dictionary=True)) as db_cursor,
8989
):
90-
if dataset not in fetch_existing_datasets(db_cursor, table_prefix):
91-
raise ValueError(f"Dataset `{dataset}` doesn't exist.")
90+
existing_datasets = fetch_existing_datasets(db_cursor, table_prefix)
91+
missing = [ds for ds in datasets if ds not in existing_datasets]
92+
if len(missing) > 0:
93+
err_msg = f"Dataset(s) {missing} don't exist."
94+
raise ValueError(err_msg)
9295

9396

9497
def wait_for_query_job(sql_adapter: SqlAdapter, job_id: int) -> QueryJobStatus:

components/clp-package-utils/clp_package_utils/scripts/search.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ def main(argv):
5252
args_parser.add_argument("wildcard_query", help="Wildcard query.")
5353
args_parser.add_argument(
5454
"--dataset",
55-
type=str,
55+
action="append",
5656
default=None,
57-
help="The dataset that the archives belong to.",
57+
help="A dataset to search. Can be specified multiple times.",
5858
)
5959
args_parser.add_argument(
6060
"--begin-time",
@@ -113,16 +113,17 @@ def main(argv):
113113
)
114114
return -1
115115

116-
dataset = parsed_args.dataset
116+
datasets = parsed_args.dataset
117117
if StorageEngine.CLP_S == storage_engine:
118-
dataset = CLP_DEFAULT_DATASET_NAME if dataset is None else dataset
118+
datasets = [CLP_DEFAULT_DATASET_NAME] if datasets is None else datasets
119119
try:
120120
clp_db_connection_params = clp_config.database.get_clp_connection_params_and_type(True)
121-
validate_dataset_name(clp_db_connection_params["table_prefix"], dataset)
121+
for ds in datasets:
122+
validate_dataset_name(clp_db_connection_params["table_prefix"], ds)
122123
except Exception as e:
123124
logger.error(e)
124125
return -1
125-
elif dataset is not None:
126+
elif datasets is not None:
126127
logger.error(f"Dataset selection is not supported for storage engine: {storage_engine}.")
127128
return -1
128129

@@ -152,9 +153,10 @@ def main(argv):
152153
# fmt: on
153154
if parsed_args.verbose:
154155
search_cmd.append("--verbose")
155-
if dataset is not None:
156-
search_cmd.append("--dataset")
157-
search_cmd.append(dataset)
156+
if datasets is not None:
157+
for ds in datasets:
158+
search_cmd.append("--dataset")
159+
search_cmd.append(ds)
158160
if parsed_args.begin_time is not None:
159161
search_cmd.append("--begin-time")
160162
search_cmd.append(str(parsed_args.begin_time))

components/clp-py-utils/clp_py_utils/clp_config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,7 @@ class QueryScheduler(BaseModel):
433433
host: DomainStr = "localhost"
434434
port: Port = DEFAULT_PORT
435435
jobs_poll_delay: PositiveFloat = 0.1 # seconds
436+
max_datasets_per_query: PositiveInt | None = 10
436437
num_archives_to_search_per_sub_job: PositiveInt = 16
437438
logging_level: LoggingLevel = "INFO"
438439

components/clp-rust-utils/src/job_config/search.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ pub const QUERY_JOBS_TABLE_NAME: &str = "query_jobs";
1111
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
1212
#[serde(default)]
1313
pub struct SearchJobConfig {
14-
pub dataset: Option<String>,
14+
pub datasets: Option<Vec<String>>,
1515
pub query_string: String,
1616
pub max_num_results: u32,
1717
pub begin_timestamp: Option<i64>,

components/core/src/clp_s/CommandLineArguments.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,10 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
605605
po::value<uint64_t>(&m_max_num_results)->value_name("MAX")->
606606
default_value(m_max_num_results),
607607
"The maximum number of results to output"
608+
)(
609+
"dataset",
610+
po::value<std::string>(&m_dataset)->value_name("DATASET"),
611+
"The dataset name to include in each result document"
608612
);
609613

610614
po::options_description file_output_handler_options("File Output Handler Options");

components/core/src/clp_s/CommandLineArguments.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ class CommandLineArguments {
123123

124124
bool get_record_log_order() const { return false == m_disable_log_order; }
125125

126+
std::string const& get_dataset() const { return m_dataset; }
127+
126128
private:
127129
// Methods
128130
/**
@@ -227,6 +229,7 @@ class CommandLineArguments {
227229
std::optional<epochtime_t> m_search_end_ts;
228230
bool m_ignore_case{false};
229231
std::vector<std::string> m_projection_columns;
232+
std::string m_dataset;
230233

231234
// Search aggregation variables
232235
std::string m_reducer_host;

0 commit comments

Comments (0)