-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
132 lines (100 loc) · 3.81 KB
/
main.py
File metadata and controls
132 lines (100 loc) · 3.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#!/usr/bin/env python3
import os
import argparse
import sys
from src.agent_api import fetch_list
from src.execute_api import execute_api, get_file
from src.database import insert_agent_list, sql_query_agents
def main() -> None:
"""
Main entry point for the CLI tool.
"""
try:
setup_cli()
except Exception as e:
print(f"An error occurred: {e}")
def setup_cli() -> None:
parser = argparse.ArgumentParser(description="Elastic Agent API CLI Tool")
subparsers = parser.add_subparsers(dest="subcommand")
list_parser = subparsers.add_parser("list", help="List Agents")
list_parser.add_argument("-os", "--os_type", type=str, default=None)
list_parser.add_argument("-ov", "--os_version", type=str, default=None)
list_parser.add_argument("-oa", "--os_arch", type=str, default=None)
list_parser.add_argument("-oh", "--hostname", type=str, default=None)
list_parser.set_defaults(func=list_agents)
run_parser = subparsers.add_parser("run", help="Run command")
run_parser.add_argument(
"-a",
"--agent",
required=True,
type=str,
help="Agent to run command on",
)
run_parser.add_argument(
"-c", "--command", required=True, type=str, help="Command to run"
)
run_parser.set_defaults(func=run_command)
actions_parser = subparsers.add_parser("actions", help="Download file")
actions_parser.add_argument("-x", "--count", type=int, default=None)
actions_parser.set_defaults(func=print_actions)
sync_parser = subparsers.add_parser("sync", help="Sync agent list")
sync_parser.set_defaults(func=sync_agents)
playbook_parser = subparsers.add_parser("playbook", help="Load YAML file")
playbook_parser.add_argument("filename", type=str, help="YAML file to load")
playbook_parser.set_defaults(func=load_playbook)
def no_subcommand(args) -> None:
print_custom_help()
parser.set_defaults(func=no_subcommand)
args = parser.parse_args()
args.func(args)
def list_agents(args) -> None:
"""List agents based on the provided filter."""
os_type = args.os_type
os_version = args.os_version
os_arch = args.os_arch
hostname = args.hostname
os_filtered_agents: list = sql_query_agents(os_type, os_version, os_arch, hostname)
for agent in os_filtered_agents:
print(f"{agent[0]}")
def sync_agents(args=None) -> None:
"""Syncs the agent list."""
list_endpoint: str = f"{os.getenv('LIST', '')}"
local_metadata_list = fetch_list(list_endpoint)
insert_agent_list(local_metadata_list)
def run_command(args) -> None:
# Implement the logic to run a command on the specified agent
agent_ids: str = args.agent
endpoint = f"{os.getenv('RUN', '')}"
#for agent_id in agent_ids_list:
execute_api(endpoint, agent_ids, args.command)
def print_actions(args) -> None:
endpoint = f"{os.getenv('ACTIONS', '')}"
base = f"{os.getenv('BASE', '')}"
count: int = args.count
get_file(base, endpoint, count)
def load_playbook(args):
# Implement the logic to load a YAML file
pass
def print_custom_help() -> None:
help_text = r"""
Usage: response [COMMAND] [OPTIONS]
response - cli program
│ List, run, sync, or load a playbook on Elastic Agents
│
├── list - List agents
│ ├── windows - List only Windows devices
│ ├── linux - List only Linux devices
│ └── macos - List only macOS devices
├── run - Run command
│ ├── options - Show command options
│ └── agent - Agent to run command on
│ └── <agent_id> required
├── sync - Sync agent list
├── playbook - Load YAML file
│ └── <filename> required
└── help - Show help
"""
print(help_text)
sys.exit(0)
if __name__ == "__main__":
main()