Skip to content

Commit d3a671e

Browse files
committed
Poller: More minor fixes and debug messages
We were continuing to attempt to poll when we had authentication failures and other such errors. When we retrieved netbox inventory, it was hard to know how many entries we got and how many we ignored because of an error. Finally, when we raised NotImplementedError, we didn't trap it in an exception handling clause causing an entire worker to die and we didn't print additional info about what was not implemented and on what device IP. This patch addresses all this. Signed-off-by: Dinesh Dutt <[email protected]>
1 parent 86b40b6 commit d3a671e

File tree

3 files changed

+63
-28
lines changed

3 files changed

+63
-28
lines changed

suzieq/poller/controller/source/netbox.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,9 @@ async def get_inventory_list(self) -> List:
173173
except Exception as e:
174174
raise InventorySourceError(f'{self.name}: error while '
175175
f'getting devices: {e}')
176+
177+
logger.info(
178+
f'Netbox: Retrieved inventory list of {len(devices)} devices')
176179
return devices
177180

178181
async def _get_devices(self, url: str) -> Tuple[List, str]:
@@ -224,6 +227,7 @@ def get_field_value(entry: dict, fields_str: str):
224227
return cur_field
225228

226229
inventory = {}
230+
ignored_device_count = 0
227231

228232
for device in inventory_list:
229233
ipv4 = get_field_value(device, 'primary_ip4.address')
@@ -245,6 +249,7 @@ def get_field_value(entry: dict, fields_str: str):
245249
logger.warning(
246250
f"Skipping {namespace}.{hostname}: doesn't have a "
247251
"management IP")
252+
ignored_device_count += 1
248253
continue
249254

250255
address = address.split("/")[0]
@@ -255,6 +260,10 @@ def get_field_value(entry: dict, fields_str: str):
255260
'hostname': hostname,
256261
}
257262

263+
logger.info(
264+
f'Netbox: Acting on inventory of {len(inventory)} devices, '
265+
f'ignoring {ignored_device_count} devices')
266+
258267
return inventory
259268

260269
async def _execute(self):

suzieq/poller/worker/inventory/inventory.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,12 @@ async def _init_nodes(self, inventory_list:
125125
)]
126126

127127
for n in asyncio.as_completed(init_tasks):
128-
newnode = await n
128+
try:
129+
newnode = await n
130+
except Exception as e: # pylint: disable=broad-except
131+
logger.error(
132+
f'Encountered error {e} in initializing node')
133+
continue
129134
if newnode.devtype == "unsupported":
130135
logger.error(
131136
f'Unsupported device type for '

suzieq/poller/worker/nodes/node.py

Lines changed: 48 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -184,11 +184,11 @@ async def initialize(self, **kwargs) -> TNode:
184184

185185
# Now we know the dev type, fetch the data we need about the
186186
# device to get cracking
187-
if not self.devtype:
187+
if not self.devtype and self._retry:
188188
self.backoff = min(600, self.backoff * 2) + \
189189
(random.randint(0, 1000) / 1000)
190190
self.init_again_at = time.time() + self.backoff
191-
elif self.devtype != 'unsupported':
191+
elif self.devtype not in ['unsupported', None] and self._retry:
192192
# OK, we know the devtype, now initialize the info we need
193193
# to start proper operation
194194

@@ -271,6 +271,9 @@ def _create_error(self, cmd) -> dict:
271271
status = HTTPStatus.REQUEST_TIMEOUT
272272
elif isinstance(self.current_exception, asyncssh.misc.ProtocolError):
273273
status = HTTPStatus.FORBIDDEN
274+
elif isinstance(self.current_exception,
275+
asyncssh.misc.PermissionDenied):
276+
status = HTTPStatus.FORBIDDEN
274277
elif hasattr(self.current_exception, 'code'):
275278
status = self.current_exception.code
276279
else:
@@ -631,14 +634,16 @@ async def _init_ssh(self, init_dev_data=True, use_lock=True) -> None:
631634
f'{self.address}:{self.port}, {e}')
632635
self.current_exception = e
633636
await self._close_connection()
637+
self._conn = None
634638
finally:
635639
if use_lock:
636640
self.ssh_ready.release()
637641

638642
@abstractmethod
639643
async def _init_rest(self):
640644
'''Check that connectivity exists and works'''
641-
raise NotImplementedError
645+
raise NotImplementedError(
646+
f'{self.address}: REST transport is not supported')
642647

643648
# pylint: disable=unused-argument
644649
async def _local_gather(self, service_callback: Callable,
@@ -789,7 +794,7 @@ async def _exec_service(self, service_callback, svc_defn: dict,
789794
if not svc_defn:
790795
return result
791796

792-
if not self.devtype:
797+
if not self.devtype and self._retry:
793798
if self.init_again_at < time.time():
794799
await self._detect_node_type()
795800

@@ -882,7 +887,8 @@ async def _fetch_init_dev_data(self):
882887
This is where the list of commands specific to the device for
883888
extracting the said info is specified.
884889
"""
885-
raise NotImplementedError
890+
raise NotImplementedError(
891+
f'{self.address}: initing base Node class')
886892

887893
@abstractmethod
888894
async def _parse_init_dev_data(self, output: List,
@@ -897,7 +903,8 @@ async def _parse_init_dev_data(self, output: List,
897903
output: The list of outputs, one per command specified
898904
cb_token: The callback token we passed to the data fetcher function
899905
"""
900-
raise NotImplementedError
906+
raise NotImplementedError(
907+
f'{self.address}: parsing init base Node class')
901908

902909
@abstractmethod
903910
def _extract_nos_version(self, data: str) -> str:
@@ -910,7 +917,8 @@ def _extract_nos_version(self, data: str) -> str:
910917
Returns:
911918
The version as a string
912919
"""
913-
raise NotImplementedError
920+
raise NotImplementedError(
921+
f'{self.address}: extracting NOS in init base Node class')
914922

915923
@abstractmethod
916924
async def _rest_gather(self, service_callback: Callable,
@@ -932,7 +940,8 @@ async def _rest_gather(self, service_callback: Callable,
932940
only_one: Run till the first command in the list succeeds and
933941
then return
934942
"""
935-
raise NotImplementedError
943+
raise NotImplementedError(
944+
f'{self.address}: REST transport is not supported')
936945

937946
def post_commands(self, service_callback: Callable, svc_defn: Dict,
938947
cb_token: RsltToken):
@@ -1148,7 +1157,7 @@ async def _parse_init_dev_data(self, output, cb_token) -> None:
11481157
self.logger.error(f'nodeinit: Error getting version for '
11491158
f'{self.address}: {self.port}')
11501159

1151-
if output[1]["status"] == 0 or output[1]["status"] == 200:
1160+
if (len(output) > 1) and (output[1]["status"] in [0, 200]):
11521161
if self.transport == 'ssh':
11531162
try:
11541163
data = json.loads(output[1]["data"])
@@ -1192,12 +1201,12 @@ async def _parse_init_dev_data(self, output, _) -> None:
11921201
upsecs = output[0]["data"].split()[0]
11931202
self.bootupTimestamp = int(int(time.time()*1000)
11941203
- float(upsecs)*1000)
1195-
if output[1]["status"] == 0:
1204+
if (len(output) > 1) and (output[1]["status"] == 0):
11961205
data = output[1].get("data", '')
11971206
hostname = data.splitlines()[0].strip()
11981207
self.hostname = hostname
11991208

1200-
if output[2]["status"] == 0:
1209+
if (len(output) > 2) and (output[2]["status"] == 0):
12011210
data = output[2].get("data", '')
12021211
self._extract_nos_version(data)
12031212

@@ -1291,11 +1300,13 @@ class IosXRNode(Node):
12911300
'''IOSXR Node specific implementation'''
12921301

12931302
async def _init_rest(self):
1294-
raise NotImplementedError
1303+
raise NotImplementedError(
1304+
f'{self.address}: REST transport is not supported')
12951305

12961306
async def _rest_gather(self, service_callback, cmd_list, cb_token,
12971307
oformat='json', timeout=None):
1298-
raise NotImplementedError
1308+
raise NotImplementedError(
1309+
f'{self.address}: REST transport is not supported')
12991310

13001311
async def _fetch_init_dev_data(self):
13011312
"""Fill in the boot time of the node by executing certain cmds"""
@@ -1363,7 +1374,7 @@ async def _parse_init_dev_data(self, output, cb_token) -> None:
13631374

13641375
self._extract_nos_version(data)
13651376

1366-
if output[1]["status"] == 0:
1377+
if (len(output) > 1) and (output[1]["status"] == 0):
13671378
data = output[1]['data']
13681379
hostname = re.search(r'hostname (\S+)', data.strip())
13691380
if hostname:
@@ -1385,11 +1396,13 @@ class IosXENode(Node):
13851396
'''IOS-XE Node-sepcific telemetry gather implementation'''
13861397

13871398
async def _init_rest(self):
1388-
raise NotImplementedError
1399+
raise NotImplementedError(
1400+
f'{self.address}: REST transport is not supported')
13891401

13901402
async def _rest_gather(self, service_callback, cmd_list, cb_token,
13911403
oformat='json', timeout=None):
1392-
raise NotImplementedError
1404+
raise NotImplementedError(
1405+
f'{self.address}: REST transport is not supported')
13931406

13941407
async def _fetch_init_dev_data(self):
13951408
"""Fill in the boot time of the node by executing certain cmds"""
@@ -1610,22 +1623,26 @@ class IOSNode(IosXENode):
16101623
'''Classic IOS Node-specific implementation'''
16111624

16121625
async def _init_rest(self):
1613-
raise NotImplementedError
1626+
raise NotImplementedError(
1627+
f'{self.address}: REST transport is not supported')
16141628

16151629
async def _rest_gather(self, service_callback, cmd_list, cb_token,
16161630
oformat='json', timeout=None):
1617-
raise NotImplementedError
1631+
raise NotImplementedError(
1632+
f'{self.address}: REST transport is not supported')
16181633

16191634

16201635
class JunosNode(Node):
16211636
'''Juniper's Junos node-specific implementation'''
16221637

16231638
async def _init_rest(self):
1624-
raise NotImplementedError
1639+
raise NotImplementedError(
1640+
f'{self.address}: REST transport is not supported')
16251641

16261642
async def _rest_gather(self, service_callback, cmd_list, cb_token,
16271643
oformat='json', timeout=None):
1628-
raise NotImplementedError
1644+
raise NotImplementedError(
1645+
f'{self.address}: REST transport is not supported')
16291646

16301647
async def _fetch_init_dev_data(self):
16311648
"""Fill in the boot time of the node by running requisite cmd"""
@@ -1653,7 +1670,7 @@ async def _parse_init_dev_data(self, output, cb_token) -> None:
16531670
self.bootupTimestamp = (get_timestamp_from_junos_time(
16541671
timestr, output[0]['timestamp']/1000)/1000)
16551672

1656-
if output[1]["status"] == 0:
1673+
if (len(output) > 1) and (output[1]["status"] == 0):
16571674
data = output[1]["data"]
16581675
hmatch = re.search(r'\nHostname:\s+(\S+)\n', data)
16591676
if hmatch:
@@ -1676,12 +1693,14 @@ class NxosNode(Node):
16761693
'''Cisco's NXOS Node-specific implementation'''
16771694

16781695
async def _init_rest(self):
1679-
raise NotImplementedError
1696+
raise NotImplementedError(
1697+
f'{self.address}: REST transport is not supported')
16801698

16811699
async def _rest_gather(self, service_callback, cmd_list, cb_token,
16821700
oformat="json", timeout=None):
16831701
'''Gather data for service via device REST API'''
1684-
raise NotImplementedError
1702+
raise NotImplementedError(
1703+
f'{self.address}: REST transport is not supported')
16851704

16861705
async def _fetch_init_dev_data(self):
16871706
"""Fill in the boot time of the node by running requisite cmd"""
@@ -1731,12 +1750,14 @@ class SonicNode(Node):
17311750
'''SONiC Node-specific implementtaion'''
17321751

17331752
async def _init_rest(self):
1734-
raise NotImplementedError
1753+
raise NotImplementedError(
1754+
f'{self.address}: REST transport is not supported')
17351755

17361756
async def _rest_gather(self, service_callback, cmd_list, cb_token,
17371757
oformat="json", timeout=None):
17381758
'''Gather data for service via device REST API'''
1739-
raise NotImplementedError
1759+
raise NotImplementedError(
1760+
f'{self.address}: REST transport is not supported')
17401761

17411762
async def _fetch_init_dev_data(self):
17421763
"""Fill in the boot time of the node by running requisite cmd"""
@@ -1751,9 +1772,9 @@ async def _parse_init_dev_data(self, output, cb_token) -> None:
17511772
upsecs = output[0]["data"].split()[0]
17521773
self.bootupTimestamp = int(int(time.time()*1000)
17531774
- float(upsecs)*1000)
1754-
if output[1]["status"] == 0:
1775+
if (len(output) > 1) and (output[1]["status"] == 0):
17551776
self.hostname = output[1]["data"].strip()
1756-
if output[2]["status"] == 0:
1777+
if (len(output) > 2) and (output[2]["status"] == 0):
17571778
self._extract_nos_version(output[1]["data"])
17581779

17591780
def _extract_nos_version(self, data: str) -> None:

0 commit comments

Comments
 (0)