import logging from abc import ABC, abstractmethod from typing import Any, Dict, List, Optional, Tuple, Union from netmiko import BaseConnection, ConnectHandler, NetmikoBaseException from netmiko.cisco import CiscoAsaSSH _log = logging.getLogger(__name__) # TODO: Move to separate file class SSHConnectionError(Exception): def __init__(self, *args: object, **kwargs: object) -> None: _log.error( "During performing operation with continent following error occurred: " "{exc_name}".format(exc_name=self.__class__.__name__) ) super().__init__(*args) class GenericSSHSession: def __init__(self) -> None: self.device_type: str = "generic" self.ip: str = "" self.username: str = "" self.password: str = "" self.port: int = 22 self.verbose: bool = False self.session: Optional[BaseConnection] = None self.command_read_timeout: int = 30 self.excpected_end_of_command_string: Optional[str] = "" self.additional_connect_data: Optional[dict] = dict() def __del__(self): _log.info(f"Destroying connection to SSH on {self.ip}:{self.port}") if self.session: self.session.disconnect() def __str__(self) -> str: return ( f"Current SSH Session type: {self.__class__.__name__}; " f"Current SSH Session params: Device type: {self.device_type}; " f"IP: {self.ip}; Username: {self.username}; Password: {self.password}; " f"Port: {self.port}; Verbose level: {self.verbose}; " f"Commands timeout: {self.command_read_timeout} " f"Expected EOL template: {self.excpected_end_of_command_string} " f"Additional connect data: {self.additional_connect_data} " ) def send_command( self, command: str, command_type: Optional[str] ) -> Union[str, List[Any], Dict[str, Any]]: if not self.session: _log.error("Cannot send command without session") raise SSHConnectionError() try: result = self.session.send_command( command, read_timeout=self.command_read_timeout, expect_string=self.excpected_end_of_command_string, ) except NetmikoBaseException: raise SSHConnectionError() return result class CiscoAsaSSHSession(GenericSSHSession): def __init__(self) -> None: super().__init__() self.device_type: str = "cisco_asa" def send_command( self, command: str, command_type: Optional[str] ) -> Union[str, List[Any], Dict[str, Any]]: if not self.session: _log.error("Cannot send command without session") raise SSHConnectionError() try: result = self.session.send_command(command) except NetmikoBaseException: raise SSHConnectionError() return result class SSHSessionBuilderABC(ABC): @abstractmethod def ssh_session(self) -> GenericSSHSession: pass @abstractmethod def set_device_type(self, device_type: str) -> "SSHSessionBuilderABC": pass @abstractmethod def set_ip(self, ip: str) -> "SSHSessionBuilderABC": pass @abstractmethod def set_username(self, username: str) -> "SSHSessionBuilderABC": pass @abstractmethod def set_password(self, password: str) -> "SSHSessionBuilderABC": pass @abstractmethod def set_port(self, port: int) -> "SSHSessionBuilderABC": pass @abstractmethod def set_verbose_level(self, verbose_level: bool) -> "SSHSessionBuilderABC": pass @abstractmethod def set_command_timeout(self, timeout: int) -> "SSHSessionBuilderABC": pass @abstractmethod def set_eol_template_symbol(self, symbol: str) -> "SSHSessionBuilderABC": pass @abstractmethod def set_additional_connect_data( self, additional_connect_data: dict ) -> "SSHSessionBuilderABC": pass @abstractmethod def create_session(self) -> "SSHSessionBuilderABC": pass class GenericSSHSessionBuilder(SSHSessionBuilderABC): def __init__(self) -> None: self.reset() def reset(self) -> None: self._product = GenericSSHSession() def ssh_session(self) -> GenericSSHSession: new_session = self._product self.reset() return new_session def set_device_type(self, device_type: str) -> "SSHSessionBuilderABC": self._product.device_type = device_type return self def set_ip(self, ip: str) -> "SSHSessionBuilderABC": self._product.ip = ip return self def set_username(self, username: str) -> "SSHSessionBuilderABC": self._product.username = username return self def set_password(self, password: str) -> "SSHSessionBuilderABC": self._product.password = password return self def set_port(self, port: int) -> "SSHSessionBuilderABC": self._product.port = port return self def set_verbose_level(self, verbose_level: bool) -> "SSHSessionBuilderABC": self._product.verbose = verbose_level return self def set_command_timeout(self, timeout: int) -> "SSHSessionBuilderABC": self._product.command_read_timeout = timeout return self def set_eol_template_symbol(self, symbol: str) -> "SSHSessionBuilderABC": self._product.excpected_end_of_command_string = symbol return self def set_additional_connect_data( self, additional_connect_data: dict ) -> "SSHSessionBuilderABC": self._product.additional_connect_data = additional_connect_data return self def create_session(self) -> "SSHSessionBuilderABC": # Preparing data for connection connection_data = { "device_type": self._product.device_type, "host": self._product.ip, "username": self._product.username, "password": self._product.password, "port": self._product.port, "verbose": self._product.verbose, } # If additional connect data present - concat it with basic settings if self._product.additional_connect_data: connection_data.update(self._product.additional_connect_data) # Createing session try: self._product.session = ConnectHandler(**connection_data) except NetmikoBaseException: raise SSHConnectionError() return self class CiscoAsaSSHSessionBuilder(GenericSSHSessionBuilder): def __init__(self) -> None: self.reset() def reset(self) -> None: self._product = CiscoAsaSSHSession() def create_session(self) -> "SSHSessionBuilderABC": # Preparing data for connection connection_data = { "device_type": self._product.device_type, "host": self._product.ip, "username": self._product.username, "password": self._product.password, "port": self._product.port, "verbose": self._product.verbose, } # If additional connect data present - concat it with basic settings if self._product.additional_connect_data: connection_data.update(self._product.additional_connect_data) # Createing session try: self._product.session = CiscoAsaSSH(**connection_data) self._product.session.asa_login() except NetmikoBaseException: raise SSHConnectionError() return self class SSHSessionManager: def __init__(self) -> None: self._builder = None @property def builder(self) -> SSHSessionBuilderABC: return self._builder @builder.setter def builder(self, builder: SSHSessionBuilderABC): self._builder = builder def build_default_session( self, ip: str, username: str, password: str, port: int = 22, ) -> GenericSSHSession: if not self._builder: raise SSHConnectionError() self._builder.set_ip(ip).set_username(username).set_password(password).set_port( port ).create_session() return self._builder.ssh_session() def build_configured_session( self, device_type: str, ip: str, username: str, password: str, port: int = 22, verbose_level: bool = False, command_timeout: int = 10, eol_template_symbol: str = "#", additional_connect_data: dict = dict(), ) -> GenericSSHSession: if not self._builder: raise SSHConnectionError() self._builder.set_device_type(device_type).set_ip(ip).set_username( username ).set_password(password).set_port(port).set_verbose_level( verbose_level ).set_command_timeout( command_timeout ).set_eol_template_symbol( eol_template_symbol ).set_additional_connect_data( additional_connect_data ) self._builder.create_session() return self._builder.ssh_session() def build_enabled_cisco_asa_session( self, ip: str, username: str, password: str, enable_password: str, port: int = 22, ) -> GenericSSHSession: if not self._builder: raise SSHConnectionError() if not isinstance(self._builder, CiscoAsaSSHSessionBuilder): raise SSHConnectionError() self._builder.set_ip(ip).set_username(username).set_password(password).set_port( port ).set_additional_connect_data({"secret": enable_password}).create_session() return self._builder.ssh_session()