Commit b882f67

Merge pull request #65 from DrDroidLab/feature/multi-task-result-vpc
initialise support for multiple task results
2 parents 71f7d9c + a436759 commit b882f67

41 files changed: +767 -338 lines

agent/settings.py

Lines changed: 3 additions & 1 deletion

@@ -11,6 +11,7 @@
 """

 import os
+import uuid
 from pathlib import Path

 import yaml
@@ -29,7 +30,8 @@ def load_yaml(filepath, native_k8s_connector_mode=False):
     if native_k8s_connector_mode:
         if not loaded_connection:
             loaded_connection = {}
-        loaded_connection.update({'native_k8s_connector': {'type': 'KUBERNETES'}})
+        uuid_hex = uuid.uuid4().hex
+        loaded_connection.update({f'native_k8_connection_{uuid_hex}': {'type': 'KUBERNETES'}})
     return loaded_connection

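For context, a small sketch of the effect of the settings change above, assuming the intent is to avoid key collisions when more than one native Kubernetes connection is registered. The helper name below is hypothetical, not from the repository:

import uuid

# A fixed key lets a second load overwrite the first entry; a uuid suffix keeps
# every native k8s connection distinct.
def add_native_k8s_connection(loaded_connection=None):
    loaded_connection = loaded_connection or {}
    key = f'native_k8_connection_{uuid.uuid4().hex}'  # unique key per call
    loaded_connection[key] = {'type': 'KUBERNETES'}
    return loaded_connection

connections = add_native_k8s_connection()
connections = add_native_k8s_connection(connections)
assert len(connections) == 2  # two loads no longer collide on one key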

integrations/source_api_processors/grafana_api_processor.py

Lines changed: 28 additions & 4 deletions

@@ -22,7 +22,7 @@ def __init__(self, grafana_host, grafana_api_key, ssl_verify='true'):
     def test_connection(self):
         try:
             url = '{}/api/datasources'.format(self.__host)
-            response = requests.get(url, headers=self.headers, verify=self.__ssl_verify)
+            response = requests.get(url, headers=self.headers, verify=self.__ssl_verify, timeout=20)
             if response and response.status_code == 200:
                 return True
             else:
@@ -99,17 +99,41 @@ def fetch_promql_metric_timeseries(self, promql_datasource_uid, query, start, en
             logger.error(f"Exception occurred while getting promql metric timeseries with error: {e}")
             raise e

-    def panel_query_datasource_api(self, tr: TimeRange, queries):
+    def fetch_alert_rules(self):
+        try:
+            url = '{}/api/v1/provisioning/alert-rules'.format(self.__host)
+            response = requests.get(url, headers=self.headers, verify=self.__ssl_verify)
+            if response and response.status_code == 200:
+                return response.json()
+            else:
+                raise Exception(
+                    f"Failed to fetch alert rules. Status Code: {response.status_code}. Response Text: {response.text}")
+        except Exception as e:
+            logger.error(f"Exception occurred while fetching grafana alert rules with error: {e}")
+            raise e
+
+    def panel_query_datasource_api(self, tr: TimeRange, queries, interval_ms=300000):
         try:
             if not queries or len(queries) == 0:
                 raise ValueError("No queries provided.")
+
             url = f"{self.__host}/api/ds/query"
+
             from_tr = int(tr.time_geq * 1000)
             to_tr = int(tr.time_lt * 1000)
-            payload = {"queries": queries, "from": str(from_tr), "to": str(to_tr)}
+
+            for query in queries:
+                query["intervalMs"] = interval_ms  # 5 minutes default, in milliseconds
+
+            payload = {
+                "queries": queries,
+                "from": str(from_tr),
+                "to": str(to_tr)
+            }
+
             response = requests.post(url, headers=self.headers, json=payload)
             if response.status_code == 429:
-                logger.error("Grafana query API responded with 429 (rate limited). Headers: %s", response.headers)
+                logger.info("Grafana query API responded with 429 (rate limited). Headers: %s", response.headers)
                 return None
             elif response.status_code == 200:
                 return response.json()
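For reference, a minimal sketch of the request body panel_query_datasource_api now builds for Grafana's /api/ds/query endpoint, with the new intervalMs field injected into each query. The query contents below are illustrative placeholders, not values from the repository:

import time

# Illustrative only: shape of the payload POSTed to /api/ds/query, assuming one
# panel query and the default 5-minute interval added by this commit.
end_epoch_s = int(time.time())
start_epoch_s = end_epoch_s - 3600  # last hour

queries = [{
    "refId": "A",
    "expr": "up",                       # hypothetical PromQL expression
    "datasource": {"uid": "prom-uid"},  # hypothetical datasource uid
}]

interval_ms = 300000  # 5 minutes, in milliseconds
for q in queries:
    q["intervalMs"] = interval_ms       # same per-query injection the diff adds

payload = {
    "queries": queries,
    "from": str(start_epoch_s * 1000),  # epoch millis as strings, as in the diff
    "to": str(end_epoch_s * 1000),
}
print(payload)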

integrations/source_facade.py

Lines changed: 1 addition & 1 deletion

@@ -55,7 +55,7 @@ def get_source_manager(self, source: Source):
             raise ValueError(f'No executor found for source: {source}')
         return self._map.get(source)

-    def execute_task(self, time_range, global_variable_set, task: PlaybookTask) -> PlaybookTaskResult:
+    def execute_task(self, time_range, global_variable_set, task: PlaybookTask):
         source = task.source
         if source not in self._map:
             raise ValueError(f'No executor found for source: {source}')
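With the return annotation dropped here (and in the manager files below), execute_task can hand back either a single PlaybookTaskResult or a list of them. A minimal caller-side sketch of normalising that, assuming nothing beyond what the diff shows; the helper name is illustrative:

# Illustrative caller-side normalisation: always work with a list of results,
# whether the facade returned one PlaybookTaskResult or several.
def as_result_list(task_output):
    if isinstance(task_output, list):
        return task_output
    return [task_output]

# Usage with plain placeholders standing in for PlaybookTaskResult objects:
print(as_result_list("single_result"))           # ['single_result']
print(as_result_list(["result_a", "result_b"]))  # ['result_a', 'result_b']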

integrations/source_manager.py

Lines changed: 19 additions & 11 deletions

@@ -14,7 +14,7 @@
     PlaybookExecutionStatusType
 from protos.playbooks.playbook_pb2 import PlaybookTask
 from utils.proto_utils import proto_to_dict, dict_to_proto
-
+from integrations.utils.executor_utils import check_multiple_task_results
 logger = logging.getLogger(__name__)


@@ -126,7 +126,7 @@ def get_resolved_task(self, global_variable_set: Struct, input_task: PlaybookTas

         return resolved_task, resolved_source_task_proto, task_local_variable_map

-    def execute_task(self, time_range: TimeRange, global_variable_set, task: PlaybookTask) -> PlaybookTaskResult:
+    def execute_task(self, time_range: TimeRange, global_variable_set, task: PlaybookTask):
         try:
             source_connector_proto = None
             if task.task_connector_sources and len(task.task_connector_sources) > 0:
@@ -142,18 +142,26 @@ def execute_task(self, time_range: TimeRange, global_variable_set, task: Playboo
             try:
                 # Execute task
                 task_type = resolved_source_task.type
-                playbook_task_result: PlaybookTaskResult = self.task_type_callable_map[task_type]['executor'](
+                playbook_task_result = self.task_type_callable_map[task_type]['executor'](
                     time_range, resolved_source_task, source_connector_proto)
-                # Set task local variables in playbook_task_result to be stored in database
-                task_local_variable_map_proto = dict_to_proto(task_local_variable_map,
-                                                              Struct) if task_local_variable_map else Struct()
-                playbook_task_result.task_local_variable_set.CopyFrom(task_local_variable_map_proto)
-                playbook_task_result.status = PlaybookExecutionStatusType.FINISHED
-                # Apply result transformer
-                playbook_task_result = self.apply_task_result_transformer(resolved_task, playbook_task_result)
-                return playbook_task_result
+                if check_multiple_task_results(playbook_task_result):
+                    task_results = []
+                    for result in playbook_task_result:
+                        task_results.append(self.postprocess_task_result(result, resolved_task, task_local_variable_map))
+                    return task_results
+                return self.postprocess_task_result(playbook_task_result, resolved_task, task_local_variable_map)
             except Exception as e:
                 source_str = Source.Name(resolved_task.source).lower()
                 raise Exception(f"Error while executing task for source: {source_str} with error: {e}")
         except Exception as e:
             raise Exception(f"Error while executing task: {e}")
+
+    def postprocess_task_result(self, playbook_task_result: PlaybookTaskResult, resolved_task: PlaybookTask, task_local_variable_map: dict):
+        task_local_variable_map_proto = dict_to_proto(task_local_variable_map,
+                                                      Struct) if task_local_variable_map else Struct()
+        playbook_task_result.task_local_variable_set.CopyFrom(task_local_variable_map_proto)
+        playbook_task_result.status = PlaybookExecutionStatusType.FINISHED
+
+        # Apply result transformer
+        playbook_task_result = self.apply_task_result_transformer(resolved_task, playbook_task_result)
+        return playbook_task_result
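The diff above routes multi-result outputs through a per-result post-processing helper. check_multiple_task_results comes from integrations/utils/executor_utils.py, which is not shown in this excerpt; the sketch below only illustrates the assumed contract (a list-like output means multiple results) and is not the repository's implementation:

from typing import Any, Callable, List, Union

def check_multiple_task_results(result: Any) -> bool:
    # Assumed contract: a list or tuple of results signals a multi-result task.
    return isinstance(result, (list, tuple))

def dispatch(result: Any, postprocess: Callable[[Any], Any]) -> Union[Any, List[Any]]:
    # Mirrors execute_task: post-process each item of a multi-result, or the single result.
    if check_multiple_task_results(result):
        return [postprocess(r) for r in result]
    return postprocess(result)

# Usage: a single result stays single, a list stays a list.
print(dispatch("r1", str.upper))          # 'R1'
print(dispatch(["r1", "r2"], str.upper))  # ['R1', 'R2']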

integrations/source_manangers/api_source_manager.py

Lines changed: 1 addition & 1 deletion

@@ -80,7 +80,7 @@ def __init__(self):
         }

     def execute_http_request(self, time_range: TimeRange, api_task: Api,
-                             api_connector_proto: ConnectorProto) -> PlaybookTaskResult:
+                             api_connector_proto: ConnectorProto):
         try:
             http_request = api_task.http_request
             method = http_request.method

integrations/source_manangers/argocd_source_manager.py

Lines changed: 2 additions & 2 deletions

@@ -85,7 +85,7 @@ def get_connector_processor(self, argocd_connector, **kwargs):
         return ArgoCDAPIProcessor(**generated_credentials)

     def fetch_deployment_info(self, time_range: TimeRange, argocd_task: ArgoCD,
-                              argocd_connector: ConnectorProto) -> PlaybookTaskResult:
+                              argocd_connector: ConnectorProto):
         # Loop through the commits and get the diff for each one
         try:
             deployment_info = self.get_connector_processor(argocd_connector).get_deployment_info()
@@ -139,7 +139,7 @@ def fetch_deployment_info(self, time_range: TimeRange, argocd_task: ArgoCD,
             raise Exception(f"Error while executing ArgoCD fetch_deployment_info task: {e}")

     def rollback_application(self, time_range: TimeRange, argocd_task: ArgoCD,
-                             argocd_connector: ConnectorProto) -> PlaybookTaskResult:
+                             argocd_connector: ConnectorProto):
         try:
             app_name = argocd_task.rollback_application.app_name
             revision = argocd_task.rollback_application.revision

integrations/source_manangers/azure_source_manager.py

Lines changed: 1 addition & 1 deletion

@@ -53,7 +53,7 @@ def get_connector_processor(self, azure_connector, **kwargs):
         return AzureApiProcessor(**generated_credentials)

     def filter_log_events(self, time_range: TimeRange, azure_task: Azure,
-                          azure_connector: ConnectorProto) -> PlaybookTaskResult:
+                          azure_connector: ConnectorProto):
         try:
             tr_end_time = time_range.time_lt
             end_time = int(tr_end_time * 1000)

integrations/source_manangers/bash_source_manager.py

Lines changed: 1 addition & 1 deletion

@@ -53,7 +53,7 @@ def get_connector_processor(self, bash_connector, **kwargs):
         return BashProcessor(**generated_credentials)

     def execute_command(self, time_range: TimeRange, bash_task: Bash,
-                        remote_server_connector: ConnectorProto) -> PlaybookTaskResult:
+                        remote_server_connector: ConnectorProto):
         try:
             bash_command: Bash.Command = bash_task.command
             remote_server_str = bash_command.remote_server.value if bash_command.remote_server else None

integrations/source_manangers/big_query_source_manager.py

Lines changed: 1 addition & 1 deletion

@@ -42,7 +42,7 @@ def get_connector_processor(self, bq_connector, **kwargs):
         return BigQueryApiProcessor(**generated_credentials)

     def execute_query_table(self, time_range: TimeRange, bq_task: BigQuery,
-                            bq_connector: ConnectorProto) -> PlaybookTaskResult:
+                            bq_connector: ConnectorProto):
         try:
             if not bq_connector:
                 raise Exception("Task execution Failed:: No BigQuery source found")

integrations/source_manangers/clickhouse_source_manager.py

Lines changed: 1 addition & 1 deletion

@@ -55,7 +55,7 @@ def get_connector_processor(self, clickhouse_connector, **kwargs):
         return ClickhouseDBProcessor(**generated_credentials)

     def execute_sql_query(self, time_range: TimeRange, clickhouse_task: SqlDataFetch,
-                          clickhouse_connector: ConnectorProto) -> PlaybookTaskResult:
+                          clickhouse_connector: ConnectorProto):
         try:
             if not clickhouse_connector:
                 raise Exception("Task execution Failed:: No Clickhouse source found")
