Skip to content

Commit 5ba5ed3

Browse files
committed
Merge branch 'ffi-sfa-test' into clp-s-ffi-sfa-archive-reader
2 parents 1120b6d + c9fee67 commit 5ba5ed3

File tree

96 files changed

+2331
-605
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

96 files changed

+2331
-605
lines changed

components/api-server/src/client.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ pub struct QueryConfig {
2828
/// The search query as a KQL string.
2929
pub query_string: String,
3030

31-
/// The dataset to search within. If not provided, only `default` dataset will be searched.
31+
/// The datasets to search within. If not provided, only `default` dataset will be searched.
3232
#[serde(default)]
33-
pub dataset: Option<String>,
33+
pub datasets: Option<Vec<String>>,
3434

3535
/// The maximum number of results to return. Set to `0` for no limit.
3636
#[serde(default)]
@@ -58,7 +58,7 @@ pub struct QueryConfig {
5858
impl From<QueryConfig> for SearchJobConfig {
5959
fn from(value: QueryConfig) -> Self {
6060
Self {
61-
dataset: value.dataset,
61+
datasets: value.datasets,
6262
query_string: value.query_string,
6363
max_num_results: value.max_num_results,
6464
begin_timestamp: value.time_range_begin_millisecs,
@@ -128,10 +128,10 @@ impl Client {
128128
/// * Forwards [`sqlx::query::Query::execute`]'s return values on failure.
129129
pub async fn submit_query(&self, query_config: QueryConfig) -> Result<u64, ClientError> {
130130
let mut search_job_config: SearchJobConfig = query_config.into();
131-
if search_job_config.dataset.is_none() {
132-
search_job_config.dataset = match self.config.package.storage_engine {
131+
if search_job_config.datasets.is_none() {
132+
search_job_config.datasets = match self.config.package.storage_engine {
133133
StorageEngine::Clp => None,
134-
StorageEngine::ClpS => Some("default".to_owned()),
134+
StorageEngine::ClpS => Some(vec!["default".to_owned()]),
135135
}
136136
}
137137
if search_job_config.max_num_results == 0 {

components/api-server/src/routes.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ async fn health() -> String {
8585
content= QueryConfig,
8686
example = json!({
8787
"query_string": "*",
88-
"dataset": "default",
88+
"datasets": ["default"],
8989
"time_range_begin_millisecs": 0,
9090
"time_range_end_millisecs": 17_356_896,
9191
"ignore_case": true,

components/clp-mcp-server/clp_mcp_server/clp_connector.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ async def submit_query(
6060
job_config = msgpack.packb(
6161
{
6262
"begin_timestamp": begin_ts,
63-
"dataset": CLP_DEFAULT_DATASET_NAME,
63+
"datasets": [CLP_DEFAULT_DATASET_NAME],
6464
"end_timestamp": end_ts,
6565
"ignore_case": True,
6666
"max_num_results": SEARCH_MAX_NUM_RESULTS,

components/clp-package-utils/clp_package_utils/controller.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,9 @@ def _set_up_env_for_database(self) -> EnvVarsDict:
191191
env_vars |= {
192192
"CLP_DB_NAME": self._clp_config.database.names[ClpDbNameType.CLP],
193193
}
194+
if self._clp_config.compression_scheduler.type == OrchestrationType.SPIDER:
195+
env_vars["SPIDER_DB_NAME"] = self._clp_config.database.names[ClpDbNameType.SPIDER]
196+
194197
if BundledService.DATABASE not in self._clp_config.bundled:
195198
env_vars |= {
196199
"CLP_DB_PORT": str(self._clp_config.database.port),
@@ -199,9 +202,8 @@ def _set_up_env_for_database(self) -> EnvVarsDict:
199202
self._clp_config.database.host
200203
),
201204
}
202-
203-
if self._clp_config.compression_scheduler.type == OrchestrationType.SPIDER:
204-
env_vars["SPIDER_DB_NAME"] = self._clp_config.database.names[ClpDbNameType.SPIDER]
205+
else:
206+
env_vars["CLP_DB_HOST"] = _get_ip_from_hostname(self._clp_config.database.host)
205207

206208
# Credentials
207209
credentials = self._clp_config.database.credentials
@@ -275,6 +277,8 @@ def _set_up_env_for_queue(self) -> EnvVarsDict:
275277
"CLP_EXTRA_HOST_QUEUE_NAME": QUEUE_COMPONENT_NAME,
276278
"CLP_EXTRA_HOST_QUEUE_ADDR": _resolve_external_host(self._clp_config.queue.host),
277279
}
280+
else:
281+
env_vars["CLP_QUEUE_HOST"] = _get_ip_from_hostname(self._clp_config.queue.host)
278282

279283
# Credentials
280284
env_vars |= {
@@ -359,6 +363,8 @@ def _set_up_env_for_redis(self) -> EnvVarsDict:
359363
"CLP_EXTRA_HOST_REDIS_NAME": REDIS_COMPONENT_NAME,
360364
"CLP_EXTRA_HOST_REDIS_ADDR": _resolve_external_host(self._clp_config.redis.host),
361365
}
366+
else:
367+
env_vars["CLP_REDIS_HOST"] = _get_ip_from_hostname(self._clp_config.redis.host)
362368

363369
# Credentials
364370
env_vars |= {
@@ -463,6 +469,10 @@ def _set_up_env_for_results_cache(self) -> EnvVarsDict:
463469
self._clp_config.results_cache.host
464470
),
465471
}
472+
else:
473+
env_vars["CLP_RESULTS_CACHE_HOST"] = _get_ip_from_hostname(
474+
self._clp_config.results_cache.host
475+
)
466476

467477
return env_vars
468478

@@ -701,6 +711,7 @@ def _set_up_env_for_webui(self, container_clp_config: ClpConfig) -> EnvVarsDict:
701711
"ClpStorageEngine": self._clp_config.package.storage_engine,
702712
"ClpQueryEngine": self._clp_config.package.query_engine,
703713
"LogsInputType": self._clp_config.logs_input.type,
714+
"MaxDatasetsPerQuery": self._clp_config.query_scheduler.max_datasets_per_query,
704715
"MongoDbSearchResultsMetadataCollectionName": (
705716
self._clp_config.webui.results_metadata_collection_name
706717
),

components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
END_TS_ARG,
2929
FIND_COMMAND,
3030
)
31-
from clp_package_utils.scripts.native.utils import validate_dataset_exists
31+
from clp_package_utils.scripts.native.utils import validate_datasets_exist
3232

3333
logger: logging.Logger = logging.getLogger(__file__)
3434

@@ -200,7 +200,7 @@ def main(argv: list[str]) -> int:
200200
dataset = parsed_args.dataset
201201
if dataset is not None:
202202
try:
203-
validate_dataset_exists(database_config, dataset)
203+
validate_datasets_exist(database_config, [dataset])
204204
except Exception as e:
205205
logger.error(e)
206206
return -1

components/clp-package-utils/clp_package_utils/scripts/native/decompress.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
CLP_DB_PASS_ENV_VAR_NAME,
1313
CLP_DB_USER_ENV_VAR_NAME,
1414
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
15+
CLP_DEFAULT_DATASET_NAME,
1516
ClpConfig,
1617
ClpDbNameType,
1718
ClpDbUserType,
@@ -36,7 +37,7 @@
3637
from clp_package_utils.scripts.native.utils import (
3738
run_function_in_process,
3839
submit_query_job,
39-
validate_dataset_exists,
40+
validate_datasets_exist,
4041
wait_for_query_job,
4142
)
4243

@@ -144,11 +145,9 @@ def handle_extract_stream_cmd(
144145
)
145146
elif EXTRACT_JSON_CMD == command:
146147
dataset = parsed_args.dataset
147-
if dataset is None:
148-
logger.error(f"Dataset unspecified, but must be specified for command `{command}'.")
149-
return -1
148+
dataset = CLP_DEFAULT_DATASET_NAME if dataset is None else dataset
150149
try:
151-
validate_dataset_exists(clp_config.database, dataset)
150+
validate_datasets_exist(clp_config.database, [dataset])
152151
except Exception as e:
153152
logger.error(e)
154153
return -1

components/clp-package-utils/clp_package_utils/scripts/native/search.py

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from clp_package_utils.scripts.native.utils import (
2828
run_function_in_process,
2929
submit_query_job,
30-
validate_dataset_exists,
30+
validate_datasets_exist,
3131
wait_for_query_job,
3232
)
3333

@@ -37,7 +37,7 @@
3737
def create_and_monitor_job_in_db(
3838
db_config: Database,
3939
results_cache: ResultsCache,
40-
dataset: str | None,
40+
datasets: list[str] | None,
4141
wildcard_query: str,
4242
begin_timestamp: int | None,
4343
end_timestamp: int | None,
@@ -48,7 +48,7 @@ def create_and_monitor_job_in_db(
4848
count_by_time_bucket_size: int | None,
4949
):
5050
search_config = SearchJobConfig(
51-
dataset=dataset,
51+
datasets=datasets,
5252
query_string=wildcard_query,
5353
begin_timestamp=begin_timestamp,
5454
end_timestamp=end_timestamp,
@@ -115,7 +115,7 @@ async def worker_connection_handler(reader: asyncio.StreamReader, writer: asynci
115115
async def do_search_without_aggregation(
116116
db_config: Database,
117117
results_cache: ResultsCache,
118-
dataset: str | None,
118+
datasets: list[str] | None,
119119
wildcard_query: str,
120120
begin_timestamp: int | None,
121121
end_timestamp: int | None,
@@ -144,7 +144,7 @@ async def do_search_without_aggregation(
144144
create_and_monitor_job_in_db,
145145
db_config,
146146
results_cache,
147-
dataset,
147+
datasets,
148148
wildcard_query,
149149
begin_timestamp,
150150
end_timestamp,
@@ -181,7 +181,7 @@ async def do_search_without_aggregation(
181181
async def do_search(
182182
db_config: Database,
183183
results_cache: ResultsCache,
184-
dataset: str | None,
184+
datasets: list[str] | None,
185185
wildcard_query: str,
186186
begin_timestamp: int | None,
187187
end_timestamp: int | None,
@@ -195,7 +195,7 @@ async def do_search(
195195
await do_search_without_aggregation(
196196
db_config,
197197
results_cache,
198-
dataset,
198+
datasets,
199199
wildcard_query,
200200
begin_timestamp,
201201
end_timestamp,
@@ -208,7 +208,7 @@ async def do_search(
208208
create_and_monitor_job_in_db,
209209
db_config,
210210
results_cache,
211-
dataset,
211+
datasets,
212212
wildcard_query,
213213
begin_timestamp,
214214
end_timestamp,
@@ -235,9 +235,9 @@ def main(argv):
235235
args_parser.add_argument("wildcard_query", help="Wildcard query.")
236236
args_parser.add_argument(
237237
"--dataset",
238-
type=str,
238+
action="append",
239239
default=None,
240-
help="The dataset that the archives belong to.",
240+
help="A dataset to search. Can be specified multiple times.",
241241
)
242242
args_parser.add_argument(
243243
"--begin-time",
@@ -297,10 +297,18 @@ def main(argv):
297297
return -1
298298

299299
database_config: Database = clp_config.database
300-
dataset = parsed_args.dataset
301-
if dataset is not None:
300+
datasets = parsed_args.dataset
301+
if datasets is not None:
302+
max_datasets_per_query = clp_config.query_scheduler.max_datasets_per_query
303+
if max_datasets_per_query is not None and len(datasets) > max_datasets_per_query:
304+
logger.error(
305+
"Number of datasets (%d) exceeds max_datasets_per_query=%s.",
306+
len(datasets),
307+
max_datasets_per_query,
308+
)
309+
return -1
302310
try:
303-
validate_dataset_exists(database_config, dataset)
311+
validate_datasets_exist(database_config, datasets)
304312
except Exception as e:
305313
logger.error(e)
306314
return -1
@@ -310,7 +318,7 @@ def main(argv):
310318
do_search(
311319
database_config,
312320
clp_config.results_cache,
313-
dataset,
321+
datasets,
314322
parsed_args.wildcard_query,
315323
parsed_args.begin_time,
316324
parsed_args.end_time,

components/clp-package-utils/clp_package_utils/scripts/native/utils.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,13 @@ def submit_query_job(
7272
return db_cursor.lastrowid
7373

7474

75-
def validate_dataset_exists(db_config: Database, dataset: str) -> None:
75+
def validate_datasets_exist(db_config: Database, datasets: list[str]) -> None:
7676
"""
77-
Validates that `dataset` exists in the metadata database.
77+
Validates that all datasets in `datasets` exist in the metadata database.
7878
7979
:param db_config:
80-
:param dataset:
81-
:raise: ValueError if the dataset doesn't exist.
80+
:param datasets:
81+
:raise: ValueError if any dataset doesn't exist.
8282
"""
8383
sql_adapter = SqlAdapter(db_config)
8484
clp_db_connection_params = db_config.get_clp_connection_params_and_type(True)
@@ -87,8 +87,11 @@ def validate_dataset_exists(db_config: Database, dataset: str) -> None:
8787
closing(sql_adapter.create_connection(True)) as db_conn,
8888
closing(db_conn.cursor(dictionary=True)) as db_cursor,
8989
):
90-
if dataset not in fetch_existing_datasets(db_cursor, table_prefix):
91-
raise ValueError(f"Dataset `{dataset}` doesn't exist.")
90+
existing_datasets = fetch_existing_datasets(db_cursor, table_prefix)
91+
missing = [ds for ds in datasets if ds not in existing_datasets]
92+
if len(missing) > 0:
93+
err_msg = f"Dataset(s) {missing} don't exist."
94+
raise ValueError(err_msg)
9295

9396

9497
def wait_for_query_job(sql_adapter: SqlAdapter, job_id: int) -> QueryJobStatus:

components/clp-package-utils/clp_package_utils/scripts/search.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ def main(argv):
5252
args_parser.add_argument("wildcard_query", help="Wildcard query.")
5353
args_parser.add_argument(
5454
"--dataset",
55-
type=str,
55+
action="append",
5656
default=None,
57-
help="The dataset that the archives belong to.",
57+
help="A dataset to search. Can be specified multiple times.",
5858
)
5959
args_parser.add_argument(
6060
"--begin-time",
@@ -113,16 +113,17 @@ def main(argv):
113113
)
114114
return -1
115115

116-
dataset = parsed_args.dataset
116+
datasets = parsed_args.dataset
117117
if StorageEngine.CLP_S == storage_engine:
118-
dataset = CLP_DEFAULT_DATASET_NAME if dataset is None else dataset
118+
datasets = [CLP_DEFAULT_DATASET_NAME] if datasets is None else datasets
119119
try:
120120
clp_db_connection_params = clp_config.database.get_clp_connection_params_and_type(True)
121-
validate_dataset_name(clp_db_connection_params["table_prefix"], dataset)
121+
for ds in datasets:
122+
validate_dataset_name(clp_db_connection_params["table_prefix"], ds)
122123
except Exception as e:
123124
logger.error(e)
124125
return -1
125-
elif dataset is not None:
126+
elif datasets is not None:
126127
logger.error(f"Dataset selection is not supported for storage engine: {storage_engine}.")
127128
return -1
128129

@@ -152,9 +153,10 @@ def main(argv):
152153
# fmt: on
153154
if parsed_args.verbose:
154155
search_cmd.append("--verbose")
155-
if dataset is not None:
156-
search_cmd.append("--dataset")
157-
search_cmd.append(dataset)
156+
if datasets is not None:
157+
for ds in datasets:
158+
search_cmd.append("--dataset")
159+
search_cmd.append(ds)
158160
if parsed_args.begin_time is not None:
159161
search_cmd.append("--begin-time")
160162
search_cmd.append(str(parsed_args.begin_time))

components/clp-py-utils/clp_py_utils/clp_config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,7 @@ class QueryScheduler(BaseModel):
433433
host: DomainStr = "localhost"
434434
port: Port = DEFAULT_PORT
435435
jobs_poll_delay: PositiveFloat = 0.1 # seconds
436+
max_datasets_per_query: PositiveInt | None = 10
436437
num_archives_to_search_per_sub_job: PositiveInt = 16
437438
logging_level: LoggingLevel = "INFO"
438439

0 commit comments

Comments (0)