1
- import json
2
- from types import SimpleNamespace
3
-
4
- from pysnmp .entity .engine import SnmpEngine
5
1
from pysnmp .entity .rfc3413 .oneliner import cmdgen
6
- from pysnmp .hlapi import nextCmd , CommunityData , UdpTransportTarget , ContextData
7
- from pysnmp .smi .rfc1902 import ObjectType , ObjectIdentity
2
+ from snmp_cmds import Session , SNMPTimeout
8
3
9
- PATH_TO_LIST = "./OIDS.json"
10
4
11
5
class SnmpUtils :
12
6
"""
13
- SNMP Utils Class for walk/bulk/set more easily in python.
7
+ SNMP Utils Class for walk/bulk/get/ set/table more easily in python.
14
8
@author: HakkaOfDev
15
- @version: 1.0.0
9
+ @version: 1.0.1
16
10
"""
17
11
18
- def __init__ (self , host , port = 161 , community = "public" ):
12
+ def __init__ (self , host : str , port : int = 161 , read_community : str = "public" , write_community : str = "private" ,
13
+ timeout : int = 1 ):
14
+ """
15
+ Default constructor
16
+ :param host: IP of device
17
+ :param port: PORT of SNMP Agent
18
+ :param read_community: Community for read oids
19
+ :param write_community: Community for write (set)
20
+ :param timeout: Timeout for a request
21
+ """
19
22
self .host = host
20
23
self .port = port
21
- self .community = community
22
- self .defineOIDsList ()
23
-
24
- def defineOIDsList (self ):
25
- with open (PATH_TO_LIST , ) as file :
26
- self .oids = json .loads (file .read ().replace ('\n ' , '' ), object_hook = lambda d : SimpleNamespace (** d ))
27
-
28
- def get (self , oid ):
29
- return list (self .walk (oid , 1 ).values ())[- 1 ]
24
+ self .read_community = read_community
25
+ self .write_community = write_community
26
+ self .timeout = timeout
27
+ self .session = Session (ipaddress = host , port = port , read_community = read_community ,
28
+ write_community = write_community ,
29
+ timeout = timeout )
30
30
31
- def getByID (self , oid , id ):
32
- return list (self .walk (oid + "." + str (id ), 1 ).values ())[- 1 ]
33
-
34
- def isConnected (self ):
31
+ def is_online (self ):
32
+ """
33
+ Check if a device is online
34
+ :return: boolean
35
+ """
35
36
try :
36
- self .find ( " 1.3.6.1" )
37
+ self .get ( ' 1.3.6.1.2.1.1.5' )
37
38
return True
38
- except :
39
+ except SNMPTimeout as exception :
40
+ print (exception .message )
39
41
return False
40
42
41
- def bulk (self , * oids_list ):
43
+ def set (self , oid : str , value_type : str , value : str ):
44
+ """
45
+ Set value of oid
46
+ :param oid:
47
+ :param value_type: can be one of i/u/t/a/o/s/x/d/b
48
+ :param value:
49
+ """
50
+ self .session .set (oid = oid , value_type = value_type , value = value )
51
+
52
+ def get (self , oid : str ):
53
+ """
54
+ Get a specific value from an oid
55
+ :param oid:
56
+ :return: str: value
57
+ """
58
+ return self .session .walk (oid = oid )[0 ][- 1 ]
59
+
60
+ def get_by_id (self , oid : str , ID : int = 0 ):
61
+ """
62
+ Get a specific value from an oid with a specific id
63
+ :param oid:
64
+ :param ID:
65
+ :return: str: value
66
+ """
67
+ return self .get (oid = f"{ oid } .{ ID } " )
68
+
69
+ def get_table (self , oid : str , sort_key : str = None ):
70
+ """
71
+ Get a SNMP table
72
+ :param oid:
73
+ :param sort_key:
74
+ :return: list of dicts
75
+ """
76
+ return self .session .get_table (oid = oid , sortkey = sort_key )
77
+
78
+ def walk (self , oid : str ):
79
+ """
80
+ Walk command for SNMP
81
+ :param oid:
82
+ :return: dict
83
+ """
84
+ return dict (self .session .walk (oid = oid ))
85
+
86
+ def bulk (self , * OIDS_list : list ):
87
+ """
88
+ Bulk command for SNMP
89
+ :param OIDS_list: list of oids
90
+ :return: dict
91
+ """
42
92
errorIndication , errorStatus , errorIndex , snmpDataTable = cmdgen .CommandGenerator ().bulkCmd (
43
- cmdgen .CommunityData (self .community ),
93
+ cmdgen .CommunityData (self .read_community ),
44
94
cmdgen .UdpTransportTarget ((self .host , self .port )),
45
95
0 , 25 ,
46
- * oids_list ,
96
+ * OIDS_list ,
97
+ lexicographicMode = False
47
98
)
48
99
100
+ results = {}
49
101
if not errorIndication and not errorStatus :
50
- results = {}
51
102
for snmpDataTableRow in snmpDataTable :
52
103
for name , val in snmpDataTableRow :
53
104
results [str (name )] = str (snmpDataTableRow [0 ]).split (" = " )[1 ]
54
-
55
- return results
56
- return None
57
-
58
- def set (self , oid , value ):
59
- errorIndication , errorStatus , errorIndex , snmpData = cmdgen .CommandGenerator ().setCmd (
60
- cmdgen .CommunityData (self .community ),
61
- cmdgen .UdpTransportTarget ((self .host , self .port )),
62
- (ObjectType (ObjectIdentity (oid )), value )
63
- )
64
- if errorIndication :
65
- print (errorIndication )
66
- elif errorStatus :
67
- print ('%s at %s' % (
68
- errorStatus .prettyPrint (),
69
- errorIndex and snmpData [int (errorIndex ) - 1 ][0 ] or '?'
70
- ))
71
-
72
- def walk (self , oid , n = 0 , dotPrefix = False ):
73
- if dotPrefix :
74
- oid = "." + oid
75
-
76
- results = {}
77
- i = 0
78
- for (errorIndication , errorStatus , errorIndex , snmpData ) in nextCmd (SnmpEngine (),
79
- CommunityData (self .community ),
80
- UdpTransportTarget ((self .host , self .port )),
81
- ContextData (),
82
- ObjectType (ObjectIdentity (oid ))):
83
- if not errorIndication and not errorStatus :
84
- for data in snmpData :
85
- if n == 0 :
86
- results [str (data [0 ].__str__ ).split ("payload [" )[1 ][:- 4 ]] = \
87
- str (data [1 ].__str__ ).split ("payload [" )[1 ][:- 3 ]
88
- elif n != i :
89
- results [str (data [0 ].__str__ ).split ("payload [" )[1 ][:- 4 ]] = \
90
- str (data [1 ].__str__ ).split ("payload [" )[1 ][:- 3 ]
91
- i += 1
92
- else :
93
- return results
94
- return None
105
+ return results
0 commit comments