Skip to content

Commit aceb39f

Browse files
committed
Merge branch 'main' into package-image
2 parents 1f13954 + d443941 commit aceb39f

File tree

32 files changed

+667
-35
lines changed

32 files changed

+667
-35
lines changed

.github/workflows/clp-s-generated-code-checks.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ jobs:
2424
with:
2525
submodules: "recursive"
2626

27+
- name: "Set up Java"
28+
uses: "actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00"
29+
with:
30+
distribution: "temurin"
31+
java-version: "11"
32+
2733
- name: "Install task"
2834
shell: "bash"
2935
run: "npm install -g @go-task/cli"
@@ -32,6 +38,10 @@ jobs:
3238
name: "Install coreutils (for md5sum)"
3339
run: "brew install coreutils"
3440

41+
- name: "Generate parsers"
42+
shell: "bash"
43+
run: "task clp-s-generate-parsers"
44+
3545
- name: "Check if the generated parsers are the latest"
3646
shell: "bash"
3747
run:

components/clp-package-utils/clp_package_utils/scripts/dataset_manager.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
from clp_py_utils.clp_config import (
99
ARCHIVE_MANAGER_ACTION_NAME,
10+
CLP_DB_PASS_ENV_VAR_NAME,
11+
CLP_DB_USER_ENV_VAR_NAME,
1012
StorageEngine,
1113
StorageType,
1214
)
@@ -141,8 +143,12 @@ def main(argv: List[str]) -> int:
141143
if aws_mount:
142144
necessary_mounts.append(mounts.aws_config_dir)
143145

146+
extra_env_vars = {
147+
CLP_DB_USER_ENV_VAR_NAME: clp_config.database.username,
148+
CLP_DB_PASS_ENV_VAR_NAME: clp_config.database.password,
149+
}
144150
container_start_cmd = generate_container_start_cmd(
145-
container_name, necessary_mounts, clp_config.execution_container
151+
container_name, necessary_mounts, clp_config.execution_container, extra_env_vars
146152
)
147153

148154
if len(aws_env_vars) != 0:

components/clp-package-utils/clp_package_utils/scripts/native/compress.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ def handle_job_update(db, db_cursor, job_id, no_progress_reporting):
9191
# One or more tasks in the job has failed
9292
logger.error(f"Compression failed. {job_row['status_msg']}")
9393
break # Done
94+
if CompressionJobStatus.KILLED == job_status:
95+
# The job is killed
96+
logger.error(f"Compression killed. {job_row['status_msg']}")
97+
break # Done
9498

9599
if CompressionJobStatus.RUNNING == job_status:
96100
if not no_progress_reporting:

components/clp-package-utils/clp_package_utils/scripts/native/dataset_manager.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ def main(argv: List[str]) -> int:
222222
try:
223223
clp_config = load_config_file(config_file_path, default_config_file_path, clp_home)
224224
clp_config.validate_logs_dir()
225+
clp_config.database.load_credentials_from_env()
225226
except:
226227
logger.exception("Failed to load config.")
227228
return -1

components/clp-package-utils/clp_package_utils/scripts/native/utils.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ def wait_for_query_job(sql_adapter: SQL_Adapter, job_id: int) -> QueryJobStatus:
112112
QueryJobStatus.SUCCEEDED,
113113
QueryJobStatus.FAILED,
114114
QueryJobStatus.CANCELLED,
115+
QueryJobStatus.KILLED,
115116
):
116117
return new_status
117118

components/clp-package-utils/clp_package_utils/scripts/start_clp.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
get_files_table_name,
4040
)
4141
from clp_py_utils.s3_utils import generate_container_auth_options
42-
from job_orchestration.scheduler.constants import QueueName
42+
from job_orchestration.scheduler.constants import SchedulerType
4343
from pydantic import BaseModel
4444

4545
from clp_package_utils.general import (
@@ -649,7 +649,7 @@ def start_compression_worker(
649649
mounts: CLPDockerMounts,
650650
):
651651
celery_method = "job_orchestration.executor.compress"
652-
celery_route = f"{QueueName.COMPRESSION}"
652+
celery_route = SchedulerType.COMPRESSION
653653
compression_worker_mounts = [mounts.archives_output_dir]
654654
generic_start_worker(
655655
COMPRESSION_WORKER_COMPONENT_NAME,
@@ -674,7 +674,7 @@ def start_query_worker(
674674
mounts: CLPDockerMounts,
675675
):
676676
celery_method = "job_orchestration.executor.query"
677-
celery_route = f"{QueueName.QUERY}"
677+
celery_route = SchedulerType.QUERY
678678

679679
query_worker_mounts = [mounts.stream_output_dir]
680680
if StorageType.FS == clp_config.archive_output.storage.type:

components/core/src/clp/ffi/ir_stream/Deserializer.hpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#ifndef CLP_FFI_IR_STREAM_DESERIALIZER_HPP
22
#define CLP_FFI_IR_STREAM_DESERIALIZER_HPP
33

4+
#include <cstddef>
45
#include <cstdint>
56
#include <memory>
67
#include <string>
@@ -196,6 +197,7 @@ class Deserializer {
196197
IrUnitHandlerType m_ir_unit_handler;
197198
bool m_is_complete{false};
198199
[[no_unique_address]] QueryHandlerType m_query_handler;
200+
size_t m_next_log_event_idx{0};
199201
};
200202

201203
/**
@@ -304,6 +306,9 @@ auto Deserializer<IrUnitHandler, QueryHandlerType>::deserialize_next_ir_unit(
304306
m_utc_offset
305307
))};
306308

309+
auto const log_event_idx{m_next_log_event_idx};
310+
m_next_log_event_idx += 1;
311+
307312
if constexpr (search::IsNonEmptyQueryHandler<QueryHandlerType>::value) {
308313
if (search::AstEvaluationResult::True
309314
!= YSTDLIB_ERROR_HANDLING_TRYX(
@@ -314,7 +319,9 @@ auto Deserializer<IrUnitHandler, QueryHandlerType>::deserialize_next_ir_unit(
314319
}
315320
}
316321

317-
if (auto const err{m_ir_unit_handler.handle_log_event(std::move(log_event))};
322+
if (auto const err{
323+
m_ir_unit_handler.handle_log_event(std::move(log_event), log_event_idx)
324+
};
318325
IRErrorCode::IRErrorCode_Success != err)
319326
{
320327
return ir_error_code_to_errc(err);

components/core/src/clp/ffi/ir_stream/IrUnitHandlerReq.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#define CLP_FFI_IR_STREAM_IRUNITHANDLERREQ_HPP
33

44
#include <concepts>
5+
#include <cstddef>
56
#include <memory>
67
#include <utility>
78

@@ -23,6 +24,7 @@ template <typename IrUnitHandlerType>
2324
concept IrUnitHandlerInterfaceReq = requires(
2425
IrUnitHandlerType handler,
2526
KeyValuePairLogEvent&& log_event,
27+
size_t log_event_idx,
2628
bool is_auto_generated,
2729
UtcOffset utc_offset_old,
2830
UtcOffset utc_offset_new,
@@ -32,10 +34,11 @@ concept IrUnitHandlerInterfaceReq = requires(
3234
/**
3335
* Handles a log event IR unit.
3436
* @param log_event The deserialized result from IR deserializer.
37+
* @param log_event_idx The log event index of `log_event` in the stream.
3538
* @return IRErrorCode::Success on success, user-defined error code on failures.
3639
*/
3740
{
38-
handler.handle_log_event(std::forward<KeyValuePairLogEvent &&>(log_event))
41+
handler.handle_log_event(std::forward<KeyValuePairLogEvent &&>(log_event), log_event_idx)
3942
} -> std::same_as<IRErrorCode>;
4043

4144
/**

components/core/src/clp/ffi/ir_stream/search/test/test_deserializer_integration.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <cstddef>
12
#include <cstdint>
23
#include <memory>
34
#include <sstream>
@@ -40,7 +41,9 @@ using JsonPair = std::pair<nlohmann::json, nlohmann::json>;
4041
*/
4142
class IrUnitHandler {
4243
public:
43-
[[nodiscard]] auto handle_log_event(KeyValuePairLogEvent&& log_event) -> IRErrorCode {
44+
[[nodiscard]] auto
45+
handle_log_event(KeyValuePairLogEvent&& log_event, [[maybe_unused]] size_t log_event_idx)
46+
-> IRErrorCode {
4447
m_deserialized_log_events.emplace_back(std::move(log_event));
4548
return IRErrorCode::IRErrorCode_Success;
4649
}

components/core/src/clp_s/JsonParser.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "JsonParser.hpp"
22

3+
#include <cstddef>
34
#include <cstdint>
45
#include <iostream>
56
#include <memory>
@@ -45,7 +46,9 @@ namespace clp_s {
4546
*/
4647
class IrUnitHandler {
4748
public:
48-
[[nodiscard]] auto handle_log_event(KeyValuePairLogEvent&& log_event) -> IRErrorCode {
49+
[[nodiscard]] auto
50+
handle_log_event(KeyValuePairLogEvent&& log_event, [[maybe_unused]] size_t log_event_idx)
51+
-> IRErrorCode {
4952
m_deserialized_log_event.emplace(std::move(log_event));
5053
return IRErrorCode::IRErrorCode_Success;
5154
}

0 commit comments

Comments
 (0)