Skip to content

Commit ee3da59

Browse files
authored
Merge branch 'develop' into moxa-support
2 parents ccdc949 + 908bb68 commit ee3da59

File tree

11 files changed

+298
-9
lines changed

11 files changed

+298
-9
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ bin/.netmiko.cfg
2626

2727
# Virtual Environment files
2828
.venv/
29+
.venv_old/
2930
bin/
3031
lib/
3132
pyvenv.cfg

PLATFORMS.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
- Calix B6
2626
- Casa Systems CMTS
2727
- Centec Networks
28+
- Check Point GAiA
2829
- Cisco AireOS (Wireless LAN Controllers)
2930
- Cisco ASA
3031
- Cisco S200
@@ -120,6 +121,7 @@
120121
- MRV LX
121122
- Nokia/Alcatel SR-OS
122123
- Nokia SR Linux
124+
- Perle IOLAN Console Server
123125
- Optilink EOLT 9702 (telnet only)
124126
- QuantaMesh
125127
- Rad ETX

netmiko/base_connection.py

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ def __init__(
193193
serial_settings: Optional[Dict[str, Any]] = None,
194194
fast_cli: bool = True,
195195
_legacy_mode: bool = False,
196-
session_log: Optional[SessionLog] = None,
196+
session_log: Optional[Union[str, io.BufferedIOBase, SessionLog]] = None,
197197
session_log_record_writes: bool = False,
198198
session_log_file_mode: str = "write",
199199
allow_auto_change: bool = False,
@@ -2011,6 +2011,29 @@ def check_enable_mode(self, check_string: str = "") -> bool:
20112011
output = self.read_until_prompt(read_entire_line=True)
20122012
return check_string in output
20132013

2014+
def enable_secret_handler(
2015+
self,
2016+
pattern: str,
2017+
output: str,
2018+
re_flags: int = re.IGNORECASE,
2019+
) -> str:
2020+
"""
2021+
Some platforms require special handling when entering 'enable' mode.
2022+
2023+
Send the "secret" in response to password pattern
2024+
2025+
:param pattern: pattern to search for indicating device is waiting for password
2026+
2027+
:param output: Accumulated output from 'enable()' method.
2028+
2029+
:param re_flags: Regular expression flags used in conjunction with pattern
2030+
2031+
"""
2032+
if re.search(pattern, output, flags=re_flags):
2033+
self.write_channel(self.normalize_cmd(self.secret))
2034+
new_output = self.read_until_prompt()
2035+
return new_output
2036+
20142037
def enable(
20152038
self,
20162039
cmd: str = "",
@@ -2054,10 +2077,9 @@ def enable(
20542077
pattern=pattern, re_flags=re_flags, read_entire_line=True
20552078
)
20562079

2057-
# Send the "secret" in response to password pattern
2058-
if re.search(pattern, output, flags=re_flags):
2059-
self.write_channel(self.normalize_cmd(self.secret))
2060-
output += self.read_until_prompt()
2080+
output += self.enable_secret_handler(
2081+
pattern=pattern, output=output, re_flags=re_flags
2082+
)
20612083

20622084
# Search for terminating pattern if defined
20632085
if enable_pattern and not re.search(enable_pattern, output):

netmiko/checkpoint/checkpoint_gaia_ssh.py

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
import time
2+
import re
3+
from typing import Optional, Any
4+
15
from netmiko.no_config import NoConfig
26
from netmiko.base_connection import BaseConnection
37

@@ -8,16 +12,87 @@ class CheckPointGaiaSSH(NoConfig, BaseConnection):
812
firewalls.
913
"""
1014

15+
prompt_pattern = r"[>#]"
16+
17+
def __init__(self, *args: Any, **kwargs: Any) -> None:
18+
# Kept running into issues with command_echo and duplicate echoes of commands.
19+
self.fast_cli = False
20+
fast_cli = kwargs.get("fast_cli") or False
21+
kwargs["fast_cli"] = fast_cli
22+
return super().__init__(*args, **kwargs)
23+
1124
def session_preparation(self) -> None:
1225
"""
1326
Prepare the session after the connection has been established.
1427
1528
Set the base prompt for interaction ('>').
1629
"""
17-
self._test_channel_read(pattern=r"[>#]")
30+
self._test_channel_read(pattern=self.prompt_pattern)
1831
self.set_base_prompt()
1932
self.disable_paging(command="set clienv rows 0")
2033

34+
# Clear read buffer
35+
time.sleep(0.3 * self.global_delay_factor)
36+
self.clear_buffer()
37+
38+
def check_enable_mode(self, check_string: str = "#") -> bool:
39+
"""Check if in enable mode. Return boolean."""
40+
return super().check_enable_mode(check_string=check_string)
41+
42+
def enable_secret_handler(
43+
self,
44+
pattern: str,
45+
output: str,
46+
re_flags: int = re.IGNORECASE,
47+
) -> str:
48+
"""
49+
Check Point Gaia requires very particular timing for this 'expert'
50+
password handling to work.
51+
52+
Send the "secret" in response to password pattern
53+
"""
54+
if re.search(pattern, output, flags=re_flags):
55+
self.write_channel(self.secret)
56+
time.sleep(0.3 * self.global_delay_factor)
57+
self.write_channel(self.RETURN)
58+
time.sleep(0.3 * self.global_delay_factor)
59+
new_output = self.read_until_pattern(pattern=self.prompt_pattern)
60+
return new_output
61+
62+
def enable(
63+
self,
64+
cmd: str = "expert",
65+
pattern: str = r"expert password",
66+
enable_pattern: Optional[str] = r"\#",
67+
check_state: bool = True,
68+
re_flags: int = re.IGNORECASE,
69+
) -> str:
70+
"""
71+
Enter expert mode.
72+
73+
Check Point Gaia is very finicky on the timing of sending this 'expert' password.
74+
"""
75+
output = super().enable(
76+
cmd=cmd,
77+
pattern=pattern,
78+
enable_pattern=enable_pattern,
79+
check_state=check_state,
80+
re_flags=re_flags,
81+
)
82+
self.set_base_prompt()
83+
return output
84+
85+
def exit_enable_mode(self, exit_command: str = "exit") -> str:
86+
"""Exit expert mode."""
87+
output = ""
88+
if self.check_enable_mode():
89+
self.write_channel(self.normalize_cmd(exit_command))
90+
output += self.read_until_pattern(pattern=r">")
91+
self.set_base_prompt()
92+
if self.check_enable_mode():
93+
raise ValueError("Failed to exit enable mode.")
94+
return output
95+
2196
def save_config(
2297
self, cmd: str = "", confirm: bool = False, confirm_response: str = ""
2398
) -> str:

netmiko/mikrotik/mikrotik_ssh.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,8 +222,8 @@ def remote_space_available(self, search_pattern: str = "") -> int:
222222
sys_res = self.ssh_ctl_chan._send_command_timing_str(remote_cmd).splitlines()
223223
for res in sys_res:
224224
if "free-memory" in res:
225-
spaceMib = res.strip().replace("free-memory: ", "").replace("MiB", "")
226-
return int(float(spaceMib) * 1048576)
225+
space_str = res.strip().replace("free-memory: ", "")
226+
return self._format_to_bytes(space_str)
227227
raise ValueError("Unexpected output from remote_space_available")
228228

229229
def remote_file_size(

netmiko/nokia/__init__.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,12 @@
55
)
66

77
from netmiko.nokia.nokia_srl import NokiaSrlSSH
8+
from netmiko.nokia.nokia_isam import NokiaIsamSSH
89

9-
__all__ = ["NokiaSrosSSH", "NokiaSrosFileTransfer", "NokiaSrosTelnet", "NokiaSrlSSH"]
10+
__all__ = [
11+
"NokiaSrosSSH",
12+
"NokiaSrosFileTransfer",
13+
"NokiaSrosTelnet",
14+
"NokiaSrlSSH",
15+
"NokiaIsamSSH",
16+
]

netmiko/nokia/nokia_isam.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import time
2+
import re
3+
from typing import Any
4+
5+
from netmiko.base_connection import BaseConnection
6+
from netmiko.no_enable import NoEnable
7+
8+
9+
class NokiaIsamSSH(BaseConnection, NoEnable):
10+
11+
def session_preparation(self) -> None:
12+
self._test_channel_read()
13+
self.set_base_prompt()
14+
commands = [
15+
"environment inhibit-alarms",
16+
"environment screen-length 0",
17+
]
18+
for command in commands:
19+
self.disable_paging(command=command, cmd_verify=True, pattern=r"#")
20+
time.sleep(0.3 * self.global_delay_factor)
21+
self.clear_buffer()
22+
23+
def set_base_prompt(self, *args: Any, **kwargs: Any) -> str:
24+
"""Remove the > when navigating into the different config level."""
25+
cur_base_prompt = super().set_base_prompt(*args, **kwargs)
26+
match = re.search(r"\*?(.*?)(>.*)*#", cur_base_prompt)
27+
if match:
28+
# strip off >... from base_prompt; strip off leading *
29+
self.base_prompt: str = match.group(1)
30+
31+
return self.base_prompt
32+
33+
def cleanup(self, command: str = "logout") -> None:
34+
"""Gracefully exit the SSH session."""
35+
try:
36+
if self.check_config_mode():
37+
self.exit_config_mode()
38+
except Exception:
39+
pass
40+
# Always try to send final command
41+
if self.session_log:
42+
self.session_log.fin = True
43+
self.write_channel(command + self.RETURN)
44+
45+
def check_config_mode(
46+
self,
47+
check_string: str = ">configure",
48+
pattern: str = "#",
49+
force_regex: bool = False,
50+
) -> bool:
51+
"""Use equivalent enable method."""
52+
return super().check_config_mode(
53+
check_string=check_string, pattern=pattern, force_regex=force_regex
54+
)
55+
56+
def config_mode(
57+
self, config_command: str = "configure", pattern: str = "", re_flags: int = 0
58+
) -> str:
59+
return super().config_mode(
60+
config_command=config_command, pattern=pattern, re_flags=re_flags
61+
)
62+
63+
def exit_config_mode(self, exit_config: str = "exit", pattern: str = "") -> str:
64+
return super().exit_config_mode(exit_config=exit_config)
65+
66+
def save_config(
67+
self, cmd: str = "admin save", confirm: bool = False, confirm_response: str = ""
68+
) -> str:
69+
return self._send_command_str(
70+
command_string=cmd,
71+
strip_prompt=False,
72+
strip_command=False,
73+
)

netmiko/perle/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from netmiko.perle.perle_ssh import PerleIolanSSH
2+
3+
__all__ = ("PerleIolanSSH",)

netmiko/perle/perle_ssh.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import re
2+
from typing import Any, Dict, List, Optional, Union
3+
4+
from netmiko.cisco_base_connection import CiscoBaseConnection
5+
from netmiko.no_config import NoConfig
6+
from netmiko.utilities import structured_data_converter
7+
8+
9+
class PerleIolanSSH(NoConfig, CiscoBaseConnection):
10+
def __init__(self, *args: Any, **kwargs: Any) -> None:
11+
super().__init__(*args, **kwargs)
12+
self.default_enter = kwargs.get("default_enter", "\r")
13+
14+
def session_preparation(self) -> None:
15+
self._test_channel_read()
16+
self.set_base_prompt(alt_prompt_terminator="$")
17+
18+
def enable(
19+
self,
20+
cmd: str = "admin",
21+
pattern: str = "ssword:",
22+
enable_pattern: Optional[str] = None,
23+
check_state: bool = True,
24+
re_flags: int = re.IGNORECASE,
25+
) -> str:
26+
return super().enable(cmd, pattern, enable_pattern, check_state, re_flags)
27+
28+
def exit_enable_mode(self, *args: Any, **kwargs: Any) -> str:
29+
# Perle does not have this concept
30+
return ""
31+
32+
def save_config(
33+
self, cmd: str = "save", confirm: bool = True, confirm_response: str = "y"
34+
) -> str:
35+
return super().save_config(cmd, confirm, confirm_response)
36+
37+
def send_command_timing(
38+
self,
39+
command_string: str,
40+
*args: Any,
41+
**kwargs: Any,
42+
) -> Union[str, List[Any], Dict[str, Any]]:
43+
real_command_string = command_string
44+
real_strip_command = kwargs.get("strip_command", True)
45+
real_strip_prompt = kwargs.get("strip_prompt", True)
46+
command = command_string
47+
more = r"< Hit any key >"
48+
kwargs["strip_prompt"] = False
49+
kwargs["strip_command"] = False
50+
51+
output = str(super().send_command_timing(command_string, *args, **kwargs))
52+
53+
command_string = " "
54+
kwargs["normalize"] = False
55+
kwargs["strip_command"] = True
56+
while more in output:
57+
output = re.sub(r"\n" + more, "", output)
58+
output += str(
59+
super().send_command_timing(
60+
command_string,
61+
*args,
62+
**kwargs,
63+
)
64+
)
65+
66+
output = self._sanitize_output(
67+
output,
68+
strip_command=real_strip_command,
69+
command_string=command,
70+
strip_prompt=real_strip_prompt,
71+
)
72+
return_data = structured_data_converter(
73+
command=real_command_string,
74+
raw_data=output,
75+
platform=self.device_type,
76+
use_textfsm=kwargs.get("use_textfsm", False),
77+
use_ttp=kwargs.get("use_ttp", False),
78+
use_genie=kwargs.get("use_genie", False),
79+
textfsm_template=kwargs.get("textfsm_template", None),
80+
ttp_template=kwargs.get("ttp_template", None),
81+
raise_parsing_error=kwargs.get("raise_parsing_error", False),
82+
)
83+
84+
return return_data
85+
86+
def strip_prompt(self, a_string: str) -> str:
87+
# Delete repeated prompts
88+
old_string = a_string
89+
while (a_string := super().strip_prompt(a_string)) != old_string:
90+
old_string = a_string
91+
return a_string
92+
93+
def cleanup(self, command: str = "logout") -> None:
94+
return super().cleanup(command)

0 commit comments

Comments
 (0)