Skip to content

Commit b6db6ef

Browse files
authored
Merge pull request #736 from netenglabs/pacer-pt2
Support pacing commands/logins
2 parents 3af8b51 + 6d6fe1e commit b6db6ef

File tree

12 files changed

+130
-57
lines changed

12 files changed

+130
-57
lines changed

suzieq/poller/controller/source/base_source.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def __init__(self, input_data, validate: bool = True) -> None:
4949
'jump_host_key_file',
5050
'ignore_known_hosts',
5151
'slow_host',
52+
'per_cmd_auth',
5253
]
5354

5455
self._load(input_data)
@@ -204,6 +205,8 @@ def set_device(self, inventory: Dict[str, Dict]):
204205
transport = None
205206
ignore_known_hosts = False
206207
slow_host = False
208+
per_cmd_auth = True
209+
retries_on_auth_fail = 0
207210
port = None
208211
devtype = None
209212

@@ -223,6 +226,8 @@ def set_device(self, inventory: Dict[str, Dict]):
223226
transport = transport.value
224227
ignore_known_hosts = self._device.get('ignore-known-hosts', False)
225228
slow_host = self._device.get('slow-host', False)
229+
per_cmd_auth = self._device.get('per-cmd-auth', True)
230+
retries_on_auth_fail = self._device.get('retries-on-auth-fail', 0)
226231
port = self._device.get('port')
227232
devtype = self._device.get('devtype')
228233

@@ -237,13 +242,19 @@ def set_device(self, inventory: Dict[str, Dict]):
237242
'port': node.get('port') or port or 22,
238243
'devtype': node.get('devtype') or devtype,
239244
'slow_host': node.get('slow_host', '') or slow_host,
245+
'per_cmd_auth': ((node.get('per_cmd_auth', '') != '')
246+
or per_cmd_auth),
247+
'retries_on_auth_fail': ((node.get('retries_on_auth_fail',
248+
-1) != -1) or
249+
retries_on_auth_fail)
240250
})
241251

242252
def _validate_device(self):
243253
if self._device:
244254
dev_fields = ['name', 'jump-host', 'jump-host-key-file',
245255
'ignore-known-hosts', 'transport', 'port',
246-
'slow-host', 'devtype']
256+
'slow-host', 'per_cmd_auth', 'retries_on_auth_fail',
257+
'devtype']
247258
inv_fields = [x for x in self._device if x not in dev_fields]
248259
if inv_fields:
249260
raise InventorySourceError(

suzieq/poller/controller/utils/inventory_models.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,14 @@ class DeviceModel(BaseModel):
3030
ignore_known_hosts: Optional[bool] = Field(
3131
alias='ignore-known-hosts', default=False)
3232
slow_host: Optional[bool] = Field(alias='slow-host', default=False)
33+
per_cmd_auth: Optional[bool] = Field(alias='per-cmd-auth', default=True)
34+
retries_on_auth_fail: Optional[int] = Field(alias='retries-on-auth-fail',
35+
default=1)
3336
transport: Optional[PollerTransport]
3437
port: Optional[str]
3538
devtype: Optional[str]
3639

40+
# pylint: disable=no-self-argument
3741
@validator('jump_host')
3842
def jump_host_validator(cls, value):
3943
'''Validate jump host format'''

suzieq/poller/worker/inventory/inventory.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ def __init__(self, add_task_fn: Callable, **kwargs) -> None:
3333
self.add_task_fn = add_task_fn
3434
self._max_outstanding_cmd = 0
3535
self._cmd_semaphore = None
36+
self._cmd_pacer_mutex = None
3637

3738
self.connect_timeout = kwargs.pop('connect_timeout', 15)
3839
self.ssh_config_file = kwargs.pop('ssh_config_file', None)
@@ -75,6 +76,7 @@ async def build_inventory(self) -> Dict[str, Node]:
7576

7677
if self._max_outstanding_cmd:
7778
self._cmd_semaphore = asyncio.Semaphore(self._max_outstanding_cmd)
79+
self._cmd_pacer_mutex = asyncio.Lock()
7880

7981
# Initialize the nodes in the inventory
8082
self._nodes = await self._init_nodes(inventory_list)
@@ -123,19 +125,30 @@ async def _init_nodes(self, inventory_list:
123125

124126
for host in inventory_list:
125127
new_node = Node()
126-
init_tasks += [new_node.initialize(
127-
**host,
128-
cmd_sem=self._cmd_semaphore,
129-
connect_timeout=self.connect_timeout,
130-
ssh_config_file=self.ssh_config_file
131-
)]
128+
if self._max_outstanding_cmd > 0:
129+
init_tasks += [new_node.initialize(
130+
**host,
131+
cmd_sem=self._cmd_semaphore,
132+
cmd_mutex=self._cmd_pacer_mutex,
133+
cmd_pacer_sleep=float(1/self._max_outstanding_cmd),
134+
connect_timeout=self.connect_timeout,
135+
ssh_config_file=self.ssh_config_file
136+
)]
137+
else:
138+
init_tasks += [new_node.initialize(
139+
**host,
140+
cmd_sem=self._cmd_semaphore,
141+
cmd_mutex=self._cmd_pacer_mutex,
142+
connect_timeout=self.connect_timeout,
143+
ssh_config_file=self.ssh_config_file
144+
)]
132145

133146
for n in asyncio.as_completed(init_tasks):
134147
try:
135148
newnode = await n
136149
except Exception as e: # pylint: disable=broad-except
137150
logger.error(
138-
f'Encountered error {e} in initializing node')
151+
f'Encountered error {str(e)} in initializing node')
139152
continue
140153
if newnode.devtype == "unsupported":
141154
logger.error(

suzieq/poller/worker/nodes/node.py

Lines changed: 43 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,14 @@ async def initialize(self, **kwargs) -> TNode:
109109
self._current_exception = None
110110
self.api_key = None
111111
self._stdin = self._stdout = self._long_proc = None
112-
self._retry = True # set to False if authentication fails
112+
self._max_retries_on_auth_fail = (kwargs.get('retries_on_auth_fail')
113+
or 0) + 1
114+
self._retry = self._max_retries_on_auth_fail
113115
self._discovery_lock = asyncio.Lock()
114116
self._cmd_sem = kwargs.get('cmd_sem', None)
117+
self._cmd_mutex = kwargs.get('cmd_mutex', None)
118+
self._cmd_pacer_sleep = kwargs.get('cmd_pacer_sleep', None)
119+
self.per_cmd_auth = kwargs.get('per_cmd_auth', True)
115120

116121
self.address = kwargs["address"]
117122
self.hostname = kwargs["address"] # default till we get hostname
@@ -242,31 +247,33 @@ def is_connected(self):
242247
return self._conn is not None
243248

244249
@asynccontextmanager
245-
async def limit_pipeline(self):
250+
async def cmd_pacer(self, use_sem: bool = True):
246251
'''Context Manager to implement throttling of commands.
247252
248253
In many networks, backend authentication servers such as TACACS which
249254
handle authentication of logins and even command execution, cannot
250255
large volumes of authentication requests. Thanks to our use of
251256
asyncio, we can easily send hundreds of connection requests to such
252257
servers, which effectively turns into authentication failures. To
253-
handle this, we add a user-specified maximum of simultaneous
254-
commands/logins at any given time. This code implements that locking
255-
context.
258+
handle this, we add a user-specified maximum rate of cmds/sec
259+
that the authentication server can handle, and we pace it out. This code
260+
implements that pacer.
261+
262+
Some networks communicate with a backend authentication server only
263+
on login while others contact it for authorization of a command as
264+
well. It's to handle this difference that we pass use_sem. Users set
265+
the per_cmd_auth to True if authorization is used. The caller of this
266+
function sets the use_sem appropriately depending on when the context
267+
is invoked.
268+
269+
Args:
270+
use_sem(bool): True if you want to use the pacer
256271
'''
257-
if self._cmd_sem:
258-
self.logger.debug(
259-
f'{self.transport}://{self.hostname}:{self.port}: Get lock')
260-
await self._cmd_sem.acquire()
261-
self.logger.debug(
262-
f'{self.transport}://{self.hostname}:{self.port}: Got lock')
263-
try:
272+
if self._cmd_sem and use_sem:
273+
async with self._cmd_sem:
274+
async with self._cmd_mutex:
275+
await asyncio.sleep(self._cmd_pacer_sleep)
264276
yield
265-
finally:
266-
self.logger.debug(
267-
f'{self.transport}://{self.hostname}:{self.port}: '
268-
'Free lock')
269-
self._cmd_sem.release()
270277
else:
271278
yield
272279

@@ -643,7 +650,7 @@ async def _init_ssh(self, init_dev_data=True, use_lock=True) -> None:
643650
self.ssh_ready.release()
644651
return
645652

646-
async with self.limit_pipeline():
653+
async with self.cmd_pacer():
647654
try:
648655
if self._tunnel:
649656
self._conn = await self._tunnel.connect_ssh(
@@ -659,6 +666,8 @@ async def _init_ssh(self, init_dev_data=True, use_lock=True) -> None:
659666
self.logger.info(
660667
f"Connected to {self.address}:{self.port} at "
661668
f"{time.time()}")
669+
# Reset authentication fail attempt on success
670+
self._retry = self._max_retries_on_auth_fail
662671
except Exception as e: # pylint: disable=broad-except
663672
if isinstance(e, asyncssh.HostKeyNotVerifiable):
664673
self.logger.error(
@@ -672,7 +681,7 @@ async def _init_ssh(self, init_dev_data=True, use_lock=True) -> None:
672681
f'Authentication failed to {self.address}. '
673682
'Not retrying to avoid locking out user. Please '
674683
'restart poller with proper authentication')
675-
self._retry = False
684+
self._retry -= 1
676685
else:
677686
self.logger.error('Unable to connect to '
678687
f'{self.address}:{self.port}, {e}')
@@ -790,7 +799,7 @@ async def _ssh_gather(self, service_callback: Callable,
790799
cb_token.node_token = self.bootupTimestamp
791800

792801
timeout = timeout or self.cmd_timeout
793-
async with self.limit_pipeline():
802+
async with self.cmd_pacer(self.per_cmd_auth):
794803
for cmd in cmd_list:
795804
try:
796805
output = await asyncio.wait_for(self._conn.run(cmd),
@@ -1162,7 +1171,7 @@ async def _rest_gather(self, service_callback, cmd_list, cb_token,
11621171
output = []
11631172
status = 200 # status OK
11641173

1165-
async with self.limit_pipeline():
1174+
async with self.cmd_pacer(self.per_cmd_auth):
11661175
try:
11671176
async with aiohttp.ClientSession(
11681177
auth=auth, conn_timeout=self.connect_timeout,
@@ -1309,7 +1318,7 @@ async def _init_rest(self):
13091318
url = "https://{0}:{1}/nclu/v1/rpc".format(self.address, self.port)
13101319
headers = {"Content-Type": "application/json"}
13111320

1312-
async with self.limit_pipeline():
1321+
async with self.cmd_pacer(self.per_cmd_auth):
13131322
try:
13141323
async with aiohttp.ClientSession(
13151324
auth=auth, timeout=self.cmd_timeout,
@@ -1334,7 +1343,7 @@ async def _rest_gather(self, service_callback, cmd_list, cb_token,
13341343
url = "https://{0}:{1}/nclu/v1/rpc".format(self.address, self.port)
13351344
headers = {"Content-Type": "application/json"}
13361345

1337-
async with self.limit_pipeline():
1346+
async with self.cmd_pacer(self.per_cmd_auth):
13381347
try:
13391348
async with aiohttp.ClientSession(
13401349
auth=auth,
@@ -1524,7 +1533,7 @@ async def _init_ssh(self, init_dev_data=True,
15241533
if self.is_connected and not self._stdin:
15251534
self.logger.info(
15261535
f'Trying to create Persistent SSH for {self.hostname}')
1527-
async with self.limit_pipeline():
1536+
async with self.cmd_pacer(self.per_cmd_auth):
15281537
try:
15291538
self._stdin, self._stdout, self._stderr = \
15301539
await self._conn.open_session(term_type='xterm')
@@ -1537,11 +1546,15 @@ async def _init_ssh(self, init_dev_data=True,
15371546
await self._close_connection()
15381547
self._conn = None
15391548
self._stdin = None
1540-
self._retry = False # No retry if escalation fails
1549+
self._retry -= 1
15411550
if use_lock:
15421551
self.ssh_ready.release()
15431552
return
1553+
# Reset number of retries on successful auth
1554+
self._retry = self._max_retries_on_auth_fail
15441555
except Exception as e:
1556+
if isinstance(e, asyncssh.misc.PermissionDenied):
1557+
self._retry -= 1
15451558
self.current_exception = e
15461559
self.logger.error('Unable to create persistent SSH session'
15471560
f' for {self.hostname} due to {str(e)}')
@@ -1657,7 +1670,7 @@ async def _ssh_gather(self, service_callback, cmd_list, cb_token, oformat,
16571670
return
16581671

16591672
timeout = timeout or self.cmd_timeout
1660-
async with self.limit_pipeline():
1673+
async with self.cmd_pacer(self.per_cmd_auth):
16611674
for cmd in cmd_list:
16621675
try:
16631676
if self.slow_host:
@@ -1885,7 +1898,7 @@ async def _fetch_init_dev_data(self):
18851898
try:
18861899
res = []
18871900
# temporary hack to detect device info using ssh
1888-
async with self.limit_pipeline():
1901+
async with self.cmd_pacer():
18891902
async with asyncssh.connect(
18901903
self.address, port=22, username=self.username,
18911904
password=self.password, known_hosts=None) as conn:
@@ -1922,7 +1935,7 @@ async def get_api_key(self):
19221935
url = f"https://{self.address}:{self.port}/api/?type=keygen&user=" \
19231936
f"{self.username}&password={self.password}"
19241937

1925-
async with self.limit_pipeline():
1938+
async with self.cmd_pacer(self.per_cmd_auth):
19261939
async with self._session.get(url, timeout=self.connect_timeout) \
19271940
as response:
19281941
status, xml = response.status, await response.text()
@@ -1974,7 +1987,7 @@ def _extract_nos_version(self, data: str) -> None:
19741987
async def _init_rest(self):
19751988
# In case of PANOS, getting here means REST is up
19761989
if not self._session:
1977-
async with self.limit_pipeline():
1990+
async with self.cmd_pacer(self.per_cmd_auth):
19781991
try:
19791992
self._session = aiohttp.ClientSession(
19801993
conn_timeout=self.connect_timeout,
@@ -2010,7 +2023,7 @@ async def _rest_gather(self, service_callback, cmd_list, cb_token,
20102023
await service_callback(result, cb_token)
20112024
return
20122025

2013-
async with self.limit_pipeline():
2026+
async with self.cmd_pacer(self.per_cmd_auth):
20142027
try:
20152028
for cmd in cmd_list:
20162029
url_cmd = f"{url}?type=op&cmd={cmd}&key={self.api_key}"

tests/integration/sqcmds/common-samples/deprecated.yml

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -68,23 +68,23 @@ tests:
6868
marks: network show deprecated
6969
output: "WARNING: 'network show' is deprecated. Use 'namespace show' instead.\n\
7070
\ namespace deviceCnt ... hasMlag lastUpdate\n0 \
71-
\ dual-bgp 14 ... True 2021-12-10 18:37:11.921000+00:00\n1 \
72-
\ dual-evpn 14 ... True 2021-12-10 18:34:58.729000+00:00\n2 \
73-
\ eos 14 ... True 2021-12-10 18:33:51.163000+00:00\n3 \
74-
\ junos 12 ... False 2022-02-05 14:03:21.736000+00:00\n4 mixed\
75-
\ 8 ... False 2021-12-10 18:33:53.280000+00:00\n5 nxos \
76-
\ 14 ... True 2021-12-10 18:33:51.216000+00:00\n6 ospf-ibgp \
77-
\ 14 ... True 2022-01-10 16:01:05.167000+00:00\n7 ospf-single \
78-
\ 14 ... False 2021-12-10 18:36:06.022000+00:00\n8 panos \
79-
\ 14 ... True 2021-12-14 16:21:20.095000+00:00\n9 vmx 5\
80-
\ ... False 2021-12-10 18:33:50.623000+00:00\n\n[10 rows x 9 columns]\n"
71+
\ dual-bgp 14 ... True 2022-05-15 04:08:24.070000+00:00\n1 \
72+
\ dual-evpn 14 ... True 2022-05-15 04:06:10.720000+00:00\n2 \
73+
\ eos 14 ... True 2022-05-15 04:05:01.207000+00:00\n3 \
74+
\ junos 12 ... False 2022-05-15 04:05:01.227000+00:00\n4 mixed\
75+
\ 8 ... False 2022-05-15 04:05:00.852000+00:00\n5 nxos \
76+
\ 14 ... True 2022-05-15 13:25:17.544000+00:00\n6 ospf-ibgp \
77+
\ 14 ... True 2022-05-15 04:05:00.802000+00:00\n7 ospf-single \
78+
\ 14 ... False 2022-05-15 04:07:17.124000+00:00\n8 panos \
79+
\ 14 ... True 2022-05-15 04:05:01.957000+00:00\n9 vmx 5\
80+
\ ... False 2022-05-15 04:04:59.544000+00:00\n\n[10 rows x 9 columns]\n"
8181
- command: network summarize
8282
data-directory: tests/data/parquet/
8383
format: text
8484
marks: network summarize deprecated
8585
output: "WARNING: 'network summarize' is deprecated. Use 'namespace summarize' instead.\n\
8686
\ summary\nnamespacesCnt 10\nservicePerNsStat\
87-
\ [13, 18, 17.5]\nnsWithMlagCnt 6\nnsWithBgpCnt \
87+
\ [13, 18, 17.0]\nnsWithMlagCnt 6\nnsWithBgpCnt \
8888
\ 8\nnsWithOspfCnt 7\nnsWithVxlanCnt 6\n\
8989
nsWithErrsvcCnt 0\n"
9090
- command: network top --what=deviceCnt
@@ -93,11 +93,11 @@ tests:
9393
marks: network top deprecated
9494
output: "WARNING: 'network top' is deprecated. Use 'namespace top' instead.\n \
9595
\ namespace deviceCnt ... hasMlag lastUpdate\n0 \
96-
\ panos 14 ... True 2021-12-14 16:21:20.095000+00:00\n1 ospf-single\
97-
\ 14 ... False 2021-12-10 18:36:06.022000+00:00\n2 ospf-ibgp \
98-
\ 14 ... True 2022-01-10 16:01:05.167000+00:00\n3 nxos \
99-
\ 14 ... True 2021-12-10 18:33:51.216000+00:00\n4 eos \
100-
\ 14 ... True 2021-12-10 18:33:51.163000+00:00\n\n[5 rows x 9 columns]\n"
96+
\ panos 14 ... True 2022-05-15 04:05:01.957000+00:00\n1 ospf-single\
97+
\ 14 ... False 2022-05-15 04:07:17.124000+00:00\n2 ospf-ibgp \
98+
\ 14 ... True 2022-05-15 04:05:00.802000+00:00\n3 nxos \
99+
\ 14 ... True 2022-05-15 13:25:17.544000+00:00\n4 eos \
100+
\ 14 ... True 2022-05-15 04:05:01.207000+00:00\n\n[5 rows x 9 columns]\n"
101101
- command: network unique
102102
data-directory: tests/data/parquet/
103103
format: text

tests/integration/test_rest.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -397,8 +397,6 @@ def get_args_to_match(sqobj: SqObject, verbs: List[str]) -> List[str]:
397397
def get_supported_verbs(sqobj: SqObject) -> List[str]:
398398
if sqobj.table == 'routes':
399399
supported_verbs = [e.value for e in RouteVerbs]
400-
elif sqobj.table == 'network':
401-
supported_verbs = [e.value for e in NetworkVerbs]
402400
else:
403401
if sqobj._valid_assert_args:
404402
# the assertion is supported for this class
@@ -408,6 +406,9 @@ def get_supported_verbs(sqobj: SqObject) -> List[str]:
408406
if sqobj.table == 'tables':
409407
supported_verbs.append('describe')
410408

409+
if sqobj.table == 'network':
410+
supported_verbs += [e.value for e in NetworkVerbs]
411+
411412
return supported_verbs
412413

413414

@@ -685,6 +686,9 @@ def test_routes_sqobj_consistency():
685686
assert_missing_args(top_args, query_params, 'top', table)
686687
query_params = query_params.difference(top_args)
687688

689+
if table == 'network' and 'find' not in route.verbs:
690+
# do not check deprecated commands. They are already checked
691+
continue
688692
args_to_match = get_args_to_match(sqobj, route.verbs)
689693
args_to_match = {a for a in args_to_match
690694
if a not in common_args.union(top_args)}

0 commit comments

Comments
 (0)