Skip to content

Commit a141f00

Browse files
Vidushee GeetamVidushee Geetam
authored andcommitted
opensearch tasks
1 parent 37982bc commit a141f00

File tree

5 files changed

+477
-17
lines changed

5 files changed

+477
-17
lines changed

integrations/source_api_processors/open_search_api_processor.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,3 +96,85 @@ def delete_index(self, index):
9696
logger.error(f"OpenSearchApiProcessor.delete_index:: Exception occurred while deleting index: {index} from "
9797
f"host: {self.base_url} with error: {e}")
9898
raise e
99+
100+
# cluster health
101+
def get_cluster_health(self):
102+
try:
103+
result = self._make_request("GET", "_cluster/health")
104+
return result
105+
except Exception as e:
106+
logger.error(f"OpenSearchApiProcessor.get_cluster_health:: Exception occurred while fetching cluster health "
107+
f"for host: {self.base_url} with error: {e}")
108+
raise e
109+
110+
# cluster settings
111+
def get_cluster_settings(self):
112+
try:
113+
result = self._make_request("GET", "_cluster/settings")
114+
return result
115+
except Exception as e:
116+
logger.error(f"OpenSearchApiProcessor.get_cluster_settings:: Exception occurred while fetching cluster settings "
117+
f"for host: {self.base_url} with error: {e}")
118+
raise e
119+
120+
# cluster stats
121+
def get_cluster_stats(self):
122+
try:
123+
result = self._make_request("GET", "_cluster/stats")
124+
return result
125+
except Exception as e:
126+
logger.error(f"OpenSearchApiProcessor.get_cluster_stats:: Exception occurred while fetching cluster stats "
127+
f"for host: {self.base_url} with error: {e}")
128+
raise e
129+
130+
# list index and shard recoveries
131+
def get_index_and_shard_recoveries(self):
132+
try:
133+
result = self._make_request("GET", "_recovery")
134+
return result
135+
except Exception as e:
136+
logger.error(f"OpenSearchApiProcessor.get_index_and_shard_recoveries:: Exception occurred while fetching index and shard recoveries "
137+
f"for host: {self.base_url} with error: {e}")
138+
raise e
139+
140+
# list indices
141+
def get_indices(self):
142+
try:
143+
result = self._make_request("GET", "_cat/indices?format=json")
144+
response = {"api_response": result}
145+
return response
146+
except Exception as e:
147+
logger.error(f"OpenSearchApiProcessor.get_indices:: Exception occurred while fetching indices for host: "
148+
f"{self.base_url} with error: {e}")
149+
raise e
150+
151+
# list pending tasks
152+
def get_pending_tasks(self):
153+
try:
154+
result = self._make_request("GET", "_cluster/pending_tasks")
155+
return result
156+
except Exception as e:
157+
logger.error(f"OpenSearchApiProcessor.get_pending_tasks:: Exception occurred while fetching pending tasks for "
158+
f"host: {self.base_url} with error: {e}")
159+
raise e
160+
161+
# list shards
162+
def get_shards(self):
163+
try:
164+
result = self._make_request("GET", "_cat/shards?format=json")
165+
response = {"api_response": result}
166+
return response
167+
except Exception as e:
168+
logger.error(f"OpenSearchApiProcessor.get_shards:: Exception occurred while fetching shards for host: "
169+
f"{self.base_url} with error: {e}")
170+
raise e
171+
172+
# list tasks
173+
def get_tasks(self):
174+
try:
175+
result = self._make_request("GET", "_tasks")
176+
return result
177+
except Exception as e:
178+
logger.error(f"OpenSearchApiProcessor.get_tasks:: Exception occurred while fetching tasks for "
179+
f"host: {self.base_url} with error: {e}")
180+
raise e

integrations/source_manangers/open_search_source_manager.py

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,70 @@ def __init__(self):
8080
'category': 'Metrics',
8181
'form_fields': []
8282
},
83+
OpenSearch.TaskType.GET_CLUSTER_HEALTH: {
84+
'executor': self.execute_get_cluster_health,
85+
'model_types': [],
86+
'result_type': PlaybookTaskResultType.API_RESPONSE,
87+
'display_name': 'Get OpenSearch Cluster Health',
88+
'category': 'Metrics',
89+
'form_fields': []
90+
},
91+
OpenSearch.TaskType.GET_CLUSTER_SETTINGS: {
92+
'executor': self.execute_get_cluster_settings,
93+
'model_types': [],
94+
'result_type': PlaybookTaskResultType.API_RESPONSE,
95+
'display_name': 'Get OpenSearch Cluster Settings',
96+
'category': 'Metrics',
97+
'form_fields': []
98+
},
99+
OpenSearch.TaskType.GET_CLUSTER_STATS: {
100+
'executor': self.execute_get_cluster_stats,
101+
'model_types': [],
102+
'result_type': PlaybookTaskResultType.API_RESPONSE,
103+
'display_name': 'Get OpenSearch Cluster Stats',
104+
'category': 'Metrics',
105+
'form_fields': []
106+
},
107+
OpenSearch.TaskType.GET_INDEX_AND_SHARD_RECOVERIES: {
108+
'executor': self.execute_get_index_and_shard_recoveries,
109+
'model_types': [],
110+
'result_type': PlaybookTaskResultType.API_RESPONSE,
111+
'display_name': 'Get OpenSearch Index and Shard Recoveries',
112+
'category': 'Metrics',
113+
'form_fields': []
114+
},
115+
OpenSearch.TaskType.GET_INDICES: {
116+
'executor': self.execute_get_indices,
117+
'model_types': [],
118+
'result_type': PlaybookTaskResultType.API_RESPONSE,
119+
'display_name': 'Get OpenSearch Indices',
120+
'category': 'Metrics',
121+
'form_fields': []
122+
},
123+
OpenSearch.TaskType.GET_PENDING_TASKS: {
124+
'executor': self.execute_get_pending_tasks,
125+
'model_types': [],
126+
'result_type': PlaybookTaskResultType.API_RESPONSE,
127+
'display_name': 'Get OpenSearch Pending Tasks',
128+
'category': 'Metrics',
129+
'form_fields': []
130+
},
131+
OpenSearch.TaskType.GET_SHARDS: {
132+
'executor': self.execute_get_shards,
133+
'model_types': [],
134+
'result_type': PlaybookTaskResultType.API_RESPONSE,
135+
'display_name': 'Get OpenSearch Shards',
136+
'category': 'Metrics',
137+
'form_fields': []
138+
},
139+
OpenSearch.TaskType.GET_TASKS: {
140+
'executor': self.execute_get_tasks,
141+
'model_types': [],
142+
'result_type': PlaybookTaskResultType.API_RESPONSE,
143+
'display_name': 'Get OpenSearch Tasks',
144+
'category': 'Metrics',
145+
'form_fields': []
146+
}
83147
}
84148

85149
def get_connector_processor(self, os_connector, **kwargs):
@@ -223,3 +287,147 @@ def execute_get_index_stats(self, time_range: TimeRange, os_task: OpenSearch,
223287
except Exception as e:
224288
raise Exception(
225289
f"OpenSearchSourceManager.execute_get_index_stats:: Error while executing OpenSearch task: {str(e)}")
290+
291+
# cluster health task functions
292+
def execute_get_cluster_health(self, time_range: TimeRange, os_task: OpenSearch,
293+
os_connector: ConnectorProto):
294+
try:
295+
if not os_connector:
296+
raise ValueError("OpenSearchSourceManager.execute_get_cluster_health:: Task execution Failed:: "
297+
"No OpenSearch source found")
298+
299+
os_client = self.get_connector_processor(os_connector)
300+
result = os_client.get_cluster_health()
301+
result_struct = dict_to_proto(result, Struct)
302+
api_response = ApiResponseResult(response_body=result_struct)
303+
return PlaybookTaskResult(type=PlaybookTaskResultType.API_RESPONSE, api_response=api_response,
304+
source=self.source)
305+
except Exception as e:
306+
raise Exception(
307+
f"OpenSearchSourceManager.execute_get_cluster_health:: Error while executing OpenSearch task: {str(e)}")
308+
309+
# cluster settings task functions
310+
def execute_get_cluster_settings(self, time_range: TimeRange, os_task: OpenSearch,
311+
os_connector: ConnectorProto):
312+
try:
313+
if not os_connector:
314+
raise ValueError("OpenSearchSourceManager.execute_get_cluster_settings:: Task execution Failed:: "
315+
"No OpenSearch source found")
316+
317+
os_client = self.get_connector_processor(os_connector)
318+
result = os_client.get_cluster_settings()
319+
result_struct = dict_to_proto(result, Struct)
320+
api_response = ApiResponseResult(response_body=result_struct)
321+
return PlaybookTaskResult(type=PlaybookTaskResultType.API_RESPONSE, api_response=api_response,
322+
source=self.source)
323+
except Exception as e:
324+
raise Exception(
325+
f"OpenSearchSourceManager.execute_get_cluster_settings:: Error while executing OpenSearch task: {str(e)}")
326+
327+
# cluster stats task functions
328+
def execute_get_cluster_stats(self, time_range: TimeRange, os_task: OpenSearch,
329+
os_connector: ConnectorProto):
330+
try:
331+
if not os_connector:
332+
raise ValueError("OpenSearchSourceManager.execute_get_cluster_stats:: Task execution Failed:: "
333+
"No OpenSearch source found")
334+
335+
os_client = self.get_connector_processor(os_connector)
336+
result = os_client.get_cluster_stats()
337+
result_struct = dict_to_proto(result, Struct)
338+
api_response = ApiResponseResult(response_body=result_struct)
339+
return PlaybookTaskResult(type=PlaybookTaskResultType.API_RESPONSE, api_response=api_response,
340+
source=self.source)
341+
except Exception as e:
342+
raise Exception(
343+
f"OpenSearchSourceManager.execute_get_cluster_stats:: Error while executing OpenSearch task: {str(e)}")
344+
345+
# index and shard recoveries task functions
346+
def execute_get_index_and_shard_recoveries(self, time_range: TimeRange, os_task: OpenSearch,
347+
os_connector: ConnectorProto):
348+
try:
349+
if not os_connector:
350+
raise ValueError("OpenSearchSourceManager.execute_get_index_and_shard_recoveries:: Task execution Failed:: "
351+
"No OpenSearch source found")
352+
353+
os_client = self.get_connector_processor(os_connector)
354+
result = os_client.get_index_and_shard_recoveries()
355+
result_struct = dict_to_proto(result, Struct)
356+
api_response = ApiResponseResult(response_body=result_struct)
357+
return PlaybookTaskResult(type=PlaybookTaskResultType.API_RESPONSE, api_response=api_response,
358+
source=self.source)
359+
except Exception as e:
360+
raise Exception(
361+
f"OpenSearchSourceManager.execute_get_index_and_shard_recoveries:: Error while executing OpenSearch task: {str(e)}")
362+
363+
# indices task functions
364+
def execute_get_indices(self, time_range: TimeRange, os_task: OpenSearch,
365+
os_connector: ConnectorProto):
366+
try:
367+
if not os_connector:
368+
raise ValueError("OpenSearchSourceManager.execute_get_indices:: Task execution Failed:: "
369+
"No OpenSearch source found")
370+
371+
os_client = self.get_connector_processor(os_connector)
372+
result = os_client.get_indices()
373+
result_struct = dict_to_proto(result, Struct)
374+
api_response = ApiResponseResult(response_body=result_struct)
375+
return PlaybookTaskResult(type=PlaybookTaskResultType.API_RESPONSE, api_response=api_response,
376+
source=self.source)
377+
except Exception as e:
378+
raise Exception(
379+
f"OpenSearchSourceManager.execute_get_indices:: Error while executing OpenSearch task: {str(e)}")
380+
381+
# pending tasks task functions
382+
def execute_get_pending_tasks(self, time_range: TimeRange, os_task: OpenSearch,
383+
os_connector: ConnectorProto):
384+
try:
385+
if not os_connector:
386+
raise ValueError("OpenSearchSourceManager.execute_get_pending_tasks:: Task execution Failed:: "
387+
"No OpenSearch source found")
388+
389+
os_client = self.get_connector_processor(os_connector)
390+
result = os_client.get_pending_tasks()
391+
result_struct = dict_to_proto(result, Struct)
392+
api_response = ApiResponseResult(response_body=result_struct)
393+
return PlaybookTaskResult(type=PlaybookTaskResultType.API_RESPONSE, api_response=api_response,
394+
source=self.source)
395+
except Exception as e:
396+
raise Exception(
397+
f"OpenSearchSourceManager.execute_get_pending_tasks:: Error while executing OpenSearch task: {str(e)}")
398+
399+
# shards task functions
400+
def execute_get_shards(self, time_range: TimeRange, os_task: OpenSearch,
401+
os_connector: ConnectorProto):
402+
try:
403+
if not os_connector:
404+
raise ValueError("OpenSearchSourceManager.execute_get_shards:: Task execution Failed:: "
405+
"No OpenSearch source found")
406+
407+
os_client = self.get_connector_processor(os_connector)
408+
result = os_client.get_shards()
409+
result_struct = dict_to_proto(result, Struct)
410+
api_response = ApiResponseResult(response_body=result_struct)
411+
return PlaybookTaskResult(type=PlaybookTaskResultType.API_RESPONSE, api_response=api_response,
412+
source=self.source)
413+
except Exception as e:
414+
raise Exception(
415+
f"OpenSearchSourceManager.execute_get_shards:: Error while executing OpenSearch task: {str(e)}")
416+
417+
# tasks task functions
418+
def execute_get_tasks(self, time_range: TimeRange, os_task: OpenSearch,
419+
os_connector: ConnectorProto):
420+
try:
421+
if not os_connector:
422+
raise ValueError("OpenSearchSourceManager.execute_get_tasks:: Task execution Failed:: "
423+
"No OpenSearch source found")
424+
425+
os_client = self.get_connector_processor(os_connector)
426+
result = os_client.get_tasks()
427+
result_struct = dict_to_proto(result, Struct)
428+
api_response = ApiResponseResult(response_body=result_struct)
429+
return PlaybookTaskResult(type=PlaybookTaskResultType.API_RESPONSE, api_response=api_response,
430+
source=self.source)
431+
except Exception as e:
432+
raise Exception(
433+
f"OpenSearchSourceManager.execute_get_tasks:: Error while executing OpenSearch task: {str(e)}")

protos/playbooks/source_task_definitions/open_search_task.proto

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,50 @@ message OpenSearch {
1818
google.protobuf.StringValue index = 1;
1919
}
2020

21-
message GetNodeStats {}
21+
message GetNodeStats {
22+
}
23+
24+
message GetIndexStats {
25+
}
26+
27+
message GetClusterHealth {
28+
}
29+
30+
message GetClusterSettings {
31+
}
32+
33+
message GetClusterStats {
34+
}
2235

23-
message GetIndexStats {}
36+
message GetIndexAndShardRecoveries {
37+
}
38+
39+
message GetIndices {
40+
}
41+
42+
message GetPendingTasks {
43+
}
44+
45+
message GetShards {
46+
}
47+
48+
message GetTasks {
49+
}
2450

2551
enum TaskType {
2652
UNKNOWN = 0;
2753
QUERY_LOGS = 1;
2854
DELETE_INDEX = 2;
2955
GET_NODE_STATS = 3;
3056
GET_INDEX_STATS = 4;
57+
GET_CLUSTER_HEALTH = 5;
58+
GET_CLUSTER_SETTINGS = 6;
59+
GET_CLUSTER_STATS = 7;
60+
GET_INDEX_AND_SHARD_RECOVERIES = 8;
61+
GET_INDICES = 9;
62+
GET_PENDING_TASKS = 10;
63+
GET_SHARDS = 11;
64+
GET_TASKS = 12;
3165
}
3266

3367
TaskType type = 1;
@@ -36,5 +70,13 @@ message OpenSearch {
3670
DeleteIndex delete_index = 102;
3771
GetNodeStats get_node_stats = 103;
3872
GetIndexStats get_index_stats = 104;
73+
GetClusterHealth get_cluster_health = 105;
74+
GetClusterSettings get_cluster_settings = 106;
75+
GetClusterStats get_cluster_stats = 107;
76+
GetIndexAndShardRecoveries get_index_and_shard_recoveries = 108;
77+
GetIndices get_indices = 109;
78+
GetPendingTasks get_pending_tasks = 110;
79+
GetShards get_shards = 111;
80+
GetTasks get_tasks = 112;
3981
}
4082
}

0 commit comments

Comments
 (0)