324 lines
9.7 KiB
Python
324 lines
9.7 KiB
Python
import logging
|
|
from abc import ABC, abstractmethod
|
|
from typing import Any, Dict, List, Optional, Tuple, Union
|
|
|
|
from netmiko import BaseConnection, ConnectHandler, NetmikoBaseException
|
|
from netmiko.cisco import CiscoAsaSSH
|
|
|
|
_log = logging.getLogger(__name__)
|
|
|
|
|
|
# TODO: Move to separate file
|
|
class SSHConnectionError(Exception):
    """Error raised when an SSH session cannot be created or used.

    Creating an instance writes an error-level log record that names the
    concrete exception class. Positional arguments are forwarded to
    ``Exception``; keyword arguments are accepted but not forwarded.
    """

    def __init__(self, *args: object, **kwargs: object) -> None:
        exc_name = type(self).__name__
        message = (
            "During performing operation with continent following error occurred: "
            f"{exc_name}"
        )
        _log.error(message)
        super().__init__(*args)
|
|
|
|
|
|
class GenericSSHSession:
    """Connection settings plus the live netmiko connection for one device.

    A freshly constructed instance only carries defaults; ``session`` stays
    ``None`` until something assigns a connected netmiko object to it.
    """

    def __init__(self) -> None:
        self.device_type: str = "generic"
        self.ip: str = ""
        self.username: str = ""
        self.password: str = ""
        self.port: int = 22
        self.verbose: bool = False
        # Live connection object; None while not connected.
        self.session: Optional[BaseConnection] = None
        # Passed to netmiko as ``read_timeout`` for every command.
        self.command_read_timeout: int = 30
        # Pattern that marks the end of command output; empty/None means
        # "let netmiko detect the prompt" (see send_command).
        self.excpected_end_of_command_string: Optional[str] = ""
        self.additional_connect_data: Optional[dict] = dict()

    def __del__(self) -> None:
        """Disconnect the underlying session, never raising from the finalizer."""
        # Finalizers can run on partially initialised objects or during
        # interpreter shutdown, when attributes or module globals may be
        # gone, so both steps are strictly best-effort.
        try:
            _log.info(f"Destroying connection to SSH on {self.ip}:{self.port}")
        except Exception:
            pass
        session = getattr(self, "session", None)
        if session:
            try:
                session.disconnect()
            except Exception:
                pass

    def __str__(self) -> str:
        """Return a human-readable summary of the session settings.

        The password is masked so the summary is safe to log.
        """
        masked_password = "********" if self.password else ""
        return (
            f"Current SSH Session type: {self.__class__.__name__}; "
            f"Current SSH Session params: Device type: {self.device_type}; "
            f"IP: {self.ip}; Username: {self.username}; Password: {masked_password}; "
            f"Port: {self.port}; Verbose level: {self.verbose}; "
            f"Commands timeout: {self.command_read_timeout} "
            f"Expected EOL template: {self.excpected_end_of_command_string} "
            f"Additional connect data: {self.additional_connect_data} "
        )

    def send_command(
        self, command: str, command_type: Optional[str]
    ) -> Union[str, List[Any], Dict[str, Any]]:
        """Run ``command`` on the device and return netmiko's output.

        Args:
            command: Command line to execute.
            command_type: Accepted for interface compatibility; not used here.

        Returns:
            Whatever the underlying ``session.send_command`` returns.

        Raises:
            SSHConnectionError: If there is no session, or netmiko reports
                a failure while executing the command.
        """
        if not self.session:
            _log.error("Cannot send command without session")
            raise SSHConnectionError("Cannot send command without session")
        # netmiko uses any non-None expect_string as the regex to wait for,
        # so the default "" would match immediately and cut the output
        # short. Map empty values to None to fall back to prompt detection.
        expect_string = self.excpected_end_of_command_string or None
        try:
            return self.session.send_command(
                command,
                read_timeout=self.command_read_timeout,
                expect_string=expect_string,
            )
        except NetmikoBaseException as err:
            raise SSHConnectionError(
                f"Command execution failed on {self.ip}:{self.port}"
            ) from err
|
|
|
|
|
|
class CiscoAsaSSHSession(GenericSSHSession):
    """Session variant whose device type is preset to ``cisco_asa``."""

    def __init__(self) -> None:
        super().__init__()
        self.device_type: str = "cisco_asa"

    def send_command(
        self, command: str, command_type: Optional[str]
    ) -> Union[str, List[Any], Dict[str, Any]]:
        """Run ``command`` with netmiko's default settings and return the output.

        Unlike the parent implementation, no read timeout or expect string
        is passed to the underlying session. ``command_type`` is not used.

        Raises:
            SSHConnectionError: If there is no session, or netmiko raises
                ``NetmikoBaseException`` while running the command.
        """
        connection = self.session
        if not connection:
            _log.error("Cannot send command without session")
            raise SSHConnectionError()
        try:
            return connection.send_command(command)
        except NetmikoBaseException:
            raise SSHConnectionError()
|
|
|
|
|
|
class SSHSessionBuilderABC(ABC):
    """Interface of a fluent builder that assembles an SSH session.

    Every ``set_*`` method and ``create_session`` is declared to return a
    builder so calls can be chained; ``ssh_session`` hands out the result.
    """

    @abstractmethod
    def ssh_session(self) -> GenericSSHSession:
        """Return the session that has been built."""

    @abstractmethod
    def set_device_type(self, device_type: str) -> "SSHSessionBuilderABC":
        """Set the device type of the session being built."""

    @abstractmethod
    def set_ip(self, ip: str) -> "SSHSessionBuilderABC":
        """Set the address of the target device."""

    @abstractmethod
    def set_username(self, username: str) -> "SSHSessionBuilderABC":
        """Set the login user name."""

    @abstractmethod
    def set_password(self, password: str) -> "SSHSessionBuilderABC":
        """Set the login password."""

    @abstractmethod
    def set_port(self, port: int) -> "SSHSessionBuilderABC":
        """Set the port to connect to."""

    @abstractmethod
    def set_verbose_level(self, verbose_level: bool) -> "SSHSessionBuilderABC":
        """Set the verbose flag of the session."""

    @abstractmethod
    def set_command_timeout(self, timeout: int) -> "SSHSessionBuilderABC":
        """Set the timeout applied to commands."""

    @abstractmethod
    def set_eol_template_symbol(self, symbol: str) -> "SSHSessionBuilderABC":
        """Set the string expected at the end of command output."""

    @abstractmethod
    def set_additional_connect_data(
        self, additional_connect_data: dict
    ) -> "SSHSessionBuilderABC":
        """Set extra connection parameters supplied as a dict."""

    @abstractmethod
    def create_session(self) -> "SSHSessionBuilderABC":
        """Create the session from the values configured so far."""
|
|
|
|
|
|
class GenericSSHSessionBuilder(SSHSessionBuilderABC):
    """Fluent builder that produces ``GenericSSHSession`` objects.

    Setters write straight onto the session under construction and return
    the builder. ``create_session`` connects it through netmiko's
    ``ConnectHandler``; ``ssh_session`` hands it out and starts a new one.
    """

    def __init__(self) -> None:
        self.reset()

    def reset(self) -> None:
        """Discard the current product and start from a blank session."""
        self._product = GenericSSHSession()

    def ssh_session(self) -> GenericSSHSession:
        """Return the built session and reset the builder for the next one."""
        finished = self._product
        self.reset()
        return finished

    def set_device_type(self, device_type: str) -> "SSHSessionBuilderABC":
        """Store the device type on the product."""
        self._product.device_type = device_type
        return self

    def set_ip(self, ip: str) -> "SSHSessionBuilderABC":
        """Store the device address on the product."""
        self._product.ip = ip
        return self

    def set_username(self, username: str) -> "SSHSessionBuilderABC":
        """Store the login user name on the product."""
        self._product.username = username
        return self

    def set_password(self, password: str) -> "SSHSessionBuilderABC":
        """Store the login password on the product."""
        self._product.password = password
        return self

    def set_port(self, port: int) -> "SSHSessionBuilderABC":
        """Store the port on the product."""
        self._product.port = port
        return self

    def set_verbose_level(self, verbose_level: bool) -> "SSHSessionBuilderABC":
        """Store the verbose flag on the product."""
        self._product.verbose = verbose_level
        return self

    def set_command_timeout(self, timeout: int) -> "SSHSessionBuilderABC":
        """Store the command read timeout on the product."""
        self._product.command_read_timeout = timeout
        return self

    def set_eol_template_symbol(self, symbol: str) -> "SSHSessionBuilderABC":
        """Store the expected end-of-command string on the product."""
        self._product.excpected_end_of_command_string = symbol
        return self

    def set_additional_connect_data(
        self, additional_connect_data: dict
    ) -> "SSHSessionBuilderABC":
        """Store extra connection parameters on the product."""
        self._product.additional_connect_data = additional_connect_data
        return self

    def create_session(self) -> "SSHSessionBuilderABC":
        """Connect via ``ConnectHandler`` and attach the session to the product.

        Raises:
            SSHConnectionError: If netmiko raises ``NetmikoBaseException``.
        """
        product = self._product
        # Basic connection settings taken from the product.
        connection_data = dict(
            device_type=product.device_type,
            host=product.ip,
            username=product.username,
            password=product.password,
            port=product.port,
            verbose=product.verbose,
        )
        # Extra settings, when present, are merged over the basic ones.
        extra = product.additional_connect_data
        if extra:
            connection_data.update(extra)
        # Creating the session.
        try:
            product.session = ConnectHandler(**connection_data)
        except NetmikoBaseException:
            raise SSHConnectionError()
        return self
|
|
|
|
|
|
class CiscoAsaSSHSessionBuilder(GenericSSHSessionBuilder):
    """Builder that produces ``CiscoAsaSSHSession`` objects.

    Connects with netmiko's ``CiscoAsaSSH`` class directly and calls
    ``asa_login()`` on the new connection.
    """

    def __init__(self) -> None:
        self.reset()

    def reset(self) -> None:
        """Discard the current product and start from a blank ASA session."""
        self._product = CiscoAsaSSHSession()

    def create_session(self) -> "SSHSessionBuilderABC":
        """Connect via ``CiscoAsaSSH``, attach the session, then log in.

        Raises:
            SSHConnectionError: If netmiko raises ``NetmikoBaseException``
                while connecting or during ``asa_login()``.
        """
        product = self._product
        # Basic connection settings taken from the product.
        connection_data = dict(
            device_type=product.device_type,
            host=product.ip,
            username=product.username,
            password=product.password,
            port=product.port,
            verbose=product.verbose,
        )
        # Extra settings, when present, are merged over the basic ones.
        extra = product.additional_connect_data
        if extra:
            connection_data.update(extra)
        # Creating the session; it is attached before the login step runs.
        try:
            connection = CiscoAsaSSH(**connection_data)
            product.session = connection
            connection.asa_login()
        except NetmikoBaseException:
            raise SSHConnectionError()
        return self
|
|
|
|
|
|
class SSHSessionManager:
    """Drives a session builder through common build recipes.

    A builder must be assigned to ``builder`` before any ``build_*`` method
    is called; otherwise ``SSHConnectionError`` is raised.
    """

    def __init__(self) -> None:
        self._builder: Optional["SSHSessionBuilderABC"] = None

    @property
    def builder(self) -> Optional["SSHSessionBuilderABC"]:
        """The builder currently in use, or ``None`` if none was assigned."""
        return self._builder

    @builder.setter
    def builder(self, builder: "SSHSessionBuilderABC") -> None:
        self._builder = builder

    def build_default_session(
        self,
        ip: str,
        username: str,
        password: str,
        port: int = 22,
    ) -> "GenericSSHSession":
        """Build and connect a session using only address and credentials.

        Returns:
            The session handed out by the builder's ``ssh_session()``.

        Raises:
            SSHConnectionError: If no builder has been assigned.
        """
        if not self._builder:
            raise SSHConnectionError("No SSH session builder has been set")
        (
            self._builder.set_ip(ip)
            .set_username(username)
            .set_password(password)
            .set_port(port)
            .create_session()
        )
        return self._builder.ssh_session()

    def build_configured_session(
        self,
        device_type: str,
        ip: str,
        username: str,
        password: str,
        port: int = 22,
        verbose_level: bool = False,
        command_timeout: int = 10,
        eol_template_symbol: str = "#",
        additional_connect_data: Optional[dict] = None,
    ) -> "GenericSSHSession":
        """Build and connect a session with every builder option set.

        Args:
            additional_connect_data: Extra connection parameters. ``None``
                (the default) means no extras. The builder always receives
                a fresh dict, so sessions never share or mutate the
                caller's mapping.

        Returns:
            The session handed out by the builder's ``ssh_session()``.

        Raises:
            SSHConnectionError: If no builder has been assigned.
        """
        if not self._builder:
            raise SSHConnectionError("No SSH session builder has been set")
        # A per-call copy replaces the former shared mutable default.
        extra = dict(additional_connect_data) if additional_connect_data else {}
        (
            self._builder.set_device_type(device_type)
            .set_ip(ip)
            .set_username(username)
            .set_password(password)
            .set_port(port)
            .set_verbose_level(verbose_level)
            .set_command_timeout(command_timeout)
            .set_eol_template_symbol(eol_template_symbol)
            .set_additional_connect_data(extra)
        )
        self._builder.create_session()
        return self._builder.ssh_session()

    def build_enabled_cisco_asa_session(
        self,
        ip: str,
        username: str,
        password: str,
        enable_password: str,
        port: int = 22,
    ) -> "GenericSSHSession":
        """Build a Cisco ASA session, passing the enable password as ``secret``.

        Raises:
            SSHConnectionError: If no builder has been assigned, or the
                assigned builder is not a ``CiscoAsaSSHSessionBuilder``.
        """
        if not self._builder:
            raise SSHConnectionError("No SSH session builder has been set")
        if not isinstance(self._builder, CiscoAsaSSHSessionBuilder):
            raise SSHConnectionError(
                "A CiscoAsaSSHSessionBuilder is required to build a Cisco ASA session"
            )
        (
            self._builder.set_ip(ip)
            .set_username(username)
            .set_password(password)
            .set_port(port)
            .set_additional_connect_data({"secret": enable_password})
            .create_session()
        )
        return self._builder.ssh_session()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestKek
|
|
TestKek which
|
|
TestKek
|
|
|