sandbox/ssh_connection_builder.py
2024-11-20 13:19:18 +03:00

324 lines
9.7 KiB
Python

import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple, Union
from netmiko import BaseConnection, ConnectHandler, NetmikoBaseException
from netmiko.cisco import CiscoAsaSSH
_log = logging.getLogger(__name__)
# TODO: Move to separate file
class SSHConnectionError(Exception):
    """Error raised when an SSH session cannot be created or used.

    Constructing the exception writes an error record to the module logger
    that names the concrete exception class. Positional arguments are
    forwarded to ``Exception``; keyword arguments are accepted but unused.
    """

    def __init__(self, *args: object, **kwargs: object) -> None:
        # Resolve the class name at runtime so subclasses are reported
        # under their own name.
        exc_name = self.__class__.__name__
        template = (
            "During performing operation with continent following error occurred: "
            "{exc_name}"
        )
        _log.error(template.format(exc_name=exc_name))
        super().__init__(*args)
class GenericSSHSession:
    """Holds SSH connection parameters and the live netmiko connection.

    Instances start with default parameters and no connection; a builder
    fills in the attributes and assigns ``session`` once connected.
    """

    def __init__(self) -> None:
        self.device_type: str = "generic"
        self.ip: str = ""
        self.username: str = ""
        self.password: str = ""
        self.port: int = 22
        self.verbose: bool = False
        # Live connection; stays None until a builder connects it.
        self.session: Optional[BaseConnection] = None
        self.command_read_timeout: int = 30
        # Pattern marking the end of command output ("" means "not set").
        self.excpected_end_of_command_string: Optional[str] = ""
        self.additional_connect_data: Optional[dict] = dict()

    def __del__(self) -> None:
        """Log the teardown and disconnect the session, if any.

        A finalizer can run on a partially initialised object or while the
        interpreter is shutting down, so nothing in here may raise.
        """
        try:
            _log.info(
                f"Destroying connection to SSH on "
                f"{getattr(self, 'ip', '')}:{getattr(self, 'port', '')}"
            )
        except Exception:
            # Logging is best-effort during finalisation.
            pass
        session = getattr(self, "session", None)
        if session:
            try:
                session.disconnect()
            except Exception:
                # The connection may already be dead; nothing useful can be
                # done about it from a finalizer.
                pass

    def __str__(self) -> str:
        """Return a human-readable summary of the session parameters.

        The password is masked so the summary is safe to write to logs.
        """
        masked_password = "***" if self.password else ""
        return (
            f"Current SSH Session type: {self.__class__.__name__}; "
            f"Current SSH Session params: Device type: {self.device_type}; "
            f"IP: {self.ip}; Username: {self.username}; Password: {masked_password}; "
            f"Port: {self.port}; Verbose level: {self.verbose}; "
            f"Commands timeout: {self.command_read_timeout} "
            f"Expected EOL template: {self.excpected_end_of_command_string} "
            f"Additional connect data: {self.additional_connect_data} "
        )

    def send_command(
        self, command: str, command_type: Optional[str] = None
    ) -> Union[str, List[Any], Dict[str, Any]]:
        """Send ``command`` over the session and return its output.

        Args:
            command: Command line to execute on the remote device.
            command_type: Unused by this implementation; kept for interface
                compatibility.

        Returns:
            Whatever the underlying ``session.send_command`` returns.

        Raises:
            SSHConnectionError: If there is no session, or the underlying
                call raises ``NetmikoBaseException``.
        """
        if not self.session:
            _log.error("Cannot send command without session")
            raise SSHConnectionError("Cannot send command without session")
        try:
            return self.session.send_command(
                command,
                read_timeout=self.command_read_timeout,
                # An empty pattern would match any output immediately, so an
                # unset value is passed as None to let netmiko use its
                # default prompt detection.
                expect_string=self.excpected_end_of_command_string or None,
            )
        except NetmikoBaseException as exc:
            # Keep the original failure as the cause for debugging.
            raise SSHConnectionError(str(exc)) from exc
class CiscoAsaSSHSession(GenericSSHSession):
    """SSH session preset for Cisco ASA devices (``device_type="cisco_asa"``)."""

    def __init__(self) -> None:
        super().__init__()
        self.device_type: str = "cisco_asa"

    def send_command(
        self, command: str, command_type: Optional[str] = None
    ) -> Union[str, List[Any], Dict[str, Any]]:
        """Send ``command`` over the session and return its output.

        Unlike the base implementation, no read timeout or expected
        end-of-output pattern is passed to the underlying call.

        Args:
            command: Command line to execute on the remote device.
            command_type: Unused by this implementation; kept for interface
                compatibility.

        Returns:
            Whatever the underlying ``session.send_command`` returns.

        Raises:
            SSHConnectionError: If there is no session, or the underlying
                call raises ``NetmikoBaseException``.
        """
        if not self.session:
            _log.error("Cannot send command without session")
            raise SSHConnectionError("Cannot send command without session")
        try:
            return self.session.send_command(command)
        except NetmikoBaseException as exc:
            # Keep the original failure as the cause for debugging.
            raise SSHConnectionError(str(exc)) from exc
class SSHSessionBuilderABC(ABC):
    """Interface for fluent SSH session builders.

    Every ``set_*`` method and ``create_session`` is declared to return a
    builder so calls can be chained; ``ssh_session`` returns the session.
    """

    @abstractmethod
    def ssh_session(self) -> GenericSSHSession:
        """Return the session object produced by this builder."""

    @abstractmethod
    def set_device_type(self, device_type: str) -> "SSHSessionBuilderABC":
        """Set the device type of the session being built."""

    @abstractmethod
    def set_ip(self, ip: str) -> "SSHSessionBuilderABC":
        """Set the host address of the session being built."""

    @abstractmethod
    def set_username(self, username: str) -> "SSHSessionBuilderABC":
        """Set the login user name."""

    @abstractmethod
    def set_password(self, password: str) -> "SSHSessionBuilderABC":
        """Set the login password."""

    @abstractmethod
    def set_port(self, port: int) -> "SSHSessionBuilderABC":
        """Set the port to connect to."""

    @abstractmethod
    def set_verbose_level(self, verbose_level: bool) -> "SSHSessionBuilderABC":
        """Set the verbose flag."""

    @abstractmethod
    def set_command_timeout(self, timeout: int) -> "SSHSessionBuilderABC":
        """Set the command timeout."""

    @abstractmethod
    def set_eol_template_symbol(self, symbol: str) -> "SSHSessionBuilderABC":
        """Set the end-of-output template symbol."""

    @abstractmethod
    def set_additional_connect_data(
        self, additional_connect_data: dict
    ) -> "SSHSessionBuilderABC":
        """Set extra connection parameters."""

    @abstractmethod
    def create_session(self) -> "SSHSessionBuilderABC":
        """Create the session from the configured parameters."""
class GenericSSHSessionBuilder(SSHSessionBuilderABC):
    """Fluent builder that configures and connects a ``GenericSSHSession``.

    Each ``set_*`` call stores one parameter on the session under
    construction and returns the builder. ``create_session`` opens the
    connection; ``ssh_session`` hands the session over and starts a new one.
    """

    def __init__(self) -> None:
        self.reset()

    def reset(self) -> None:
        """Start over with a fresh, unconfigured session object."""
        self._product = GenericSSHSession()

    def ssh_session(self) -> GenericSSHSession:
        """Return the built session and reset the builder for the next one."""
        new_session = self._product
        self.reset()
        return new_session

    def set_device_type(self, device_type: str) -> "SSHSessionBuilderABC":
        """Set the netmiko device type."""
        self._product.device_type = device_type
        return self

    def set_ip(self, ip: str) -> "SSHSessionBuilderABC":
        """Set the host address."""
        self._product.ip = ip
        return self

    def set_username(self, username: str) -> "SSHSessionBuilderABC":
        """Set the login user name."""
        self._product.username = username
        return self

    def set_password(self, password: str) -> "SSHSessionBuilderABC":
        """Set the login password."""
        self._product.password = password
        return self

    def set_port(self, port: int) -> "SSHSessionBuilderABC":
        """Set the port to connect to."""
        self._product.port = port
        return self

    def set_verbose_level(self, verbose_level: bool) -> "SSHSessionBuilderABC":
        """Set the ``verbose`` flag passed to the connection."""
        self._product.verbose = verbose_level
        return self

    def set_command_timeout(self, timeout: int) -> "SSHSessionBuilderABC":
        """Set the read timeout the session uses when sending commands."""
        self._product.command_read_timeout = timeout
        return self

    def set_eol_template_symbol(self, symbol: str) -> "SSHSessionBuilderABC":
        """Set the pattern the session expects at the end of command output."""
        self._product.excpected_end_of_command_string = symbol
        return self

    def set_additional_connect_data(
        self, additional_connect_data: dict
    ) -> "SSHSessionBuilderABC":
        """Set extra keyword arguments merged into the connection parameters."""
        self._product.additional_connect_data = additional_connect_data
        return self

    def _connection_data(self) -> Dict[str, Any]:
        """Assemble the keyword arguments for opening the connection."""
        connection_data: Dict[str, Any] = {
            "device_type": self._product.device_type,
            "host": self._product.ip,
            "username": self._product.username,
            "password": self._product.password,
            "port": self._product.port,
            "verbose": self._product.verbose,
        }
        # Additional data is applied last, so it overrides the basic settings
        # on key collisions.
        if self._product.additional_connect_data:
            connection_data.update(self._product.additional_connect_data)
        return connection_data

    def create_session(self) -> "SSHSessionBuilderABC":
        """Open the connection and store it on the session under construction.

        Returns:
            This builder, for chaining.

        Raises:
            SSHConnectionError: If ``ConnectHandler`` raises
                ``NetmikoBaseException``.
        """
        connection_data = self._connection_data()
        # Creating session
        try:
            self._product.session = ConnectHandler(**connection_data)
        except NetmikoBaseException as exc:
            # Keep the original failure as the cause for debugging.
            raise SSHConnectionError(str(exc)) from exc
        return self
class CiscoAsaSSHSessionBuilder(GenericSSHSessionBuilder):
    """Builder that produces ``CiscoAsaSSHSession`` objects.

    Connects through ``CiscoAsaSSH`` directly and calls ``asa_login()`` on
    the new connection.
    """

    def __init__(self) -> None:
        # The parent initialiser calls ``reset()``, which dispatches to the
        # override below.
        super().__init__()

    def reset(self) -> None:
        """Start over with a fresh, unconfigured Cisco ASA session object."""
        self._product = CiscoAsaSSHSession()

    def create_session(self) -> "SSHSessionBuilderABC":
        """Open the Cisco ASA connection and store it on the session.

        Returns:
            This builder, for chaining.

        Raises:
            SSHConnectionError: If connecting or ``asa_login()`` raises
                ``NetmikoBaseException``.
        """
        # Preparing data for connection
        connection_data = {
            "device_type": self._product.device_type,
            "host": self._product.ip,
            "username": self._product.username,
            "password": self._product.password,
            "port": self._product.port,
            "verbose": self._product.verbose,
        }
        # Additional data is applied last, so it overrides the basic settings
        # on key collisions.
        if self._product.additional_connect_data:
            connection_data.update(self._product.additional_connect_data)
        # Creating session
        try:
            self._product.session = CiscoAsaSSH(**connection_data)
            self._product.session.asa_login()
        except NetmikoBaseException as exc:
            # Keep the original failure as the cause for debugging.
            raise SSHConnectionError(str(exc)) from exc
        return self
class SSHSessionManager:
    """Drives an SSH session builder through common construction recipes.

    A builder must be assigned through the ``builder`` property before any
    ``build_*`` method is called.
    """

    def __init__(self) -> None:
        self._builder: Optional[SSHSessionBuilderABC] = None

    @property
    def builder(self) -> Optional[SSHSessionBuilderABC]:
        """The builder currently in use, or ``None`` if none was assigned."""
        return self._builder

    @builder.setter
    def builder(self, builder: SSHSessionBuilderABC) -> None:
        self._builder = builder

    def _require_builder(self) -> SSHSessionBuilderABC:
        """Return the assigned builder or raise if there is none."""
        if not self._builder:
            raise SSHConnectionError("SSH session builder is not set")
        return self._builder

    def build_default_session(
        self,
        ip: str,
        username: str,
        password: str,
        port: int = 22,
    ) -> GenericSSHSession:
        """Build and connect a session, setting only host and credentials.

        Raises:
            SSHConnectionError: If no builder is assigned. Errors raised by
                the builder while connecting propagate unchanged.
        """
        builder = self._require_builder()
        builder.set_ip(ip).set_username(username).set_password(password).set_port(
            port
        ).create_session()
        return builder.ssh_session()

    def build_configured_session(
        self,
        device_type: str,
        ip: str,
        username: str,
        password: str,
        port: int = 22,
        verbose_level: bool = False,
        command_timeout: int = 10,
        eol_template_symbol: str = "#",
        additional_connect_data: Optional[dict] = None,
    ) -> GenericSSHSession:
        """Build and connect a session with every parameter set explicitly.

        Args:
            additional_connect_data: Extra connection parameters. ``None``
                (the default) means no extra parameters.

        Raises:
            SSHConnectionError: If no builder is assigned. Errors raised by
                the builder while connecting propagate unchanged.
        """
        builder = self._require_builder()
        # A fresh dict per call: the builder stores this object on the
        # session, so a shared default or the caller's own dict must not be
        # aliased across sessions.
        extra_data = dict(additional_connect_data) if additional_connect_data else {}
        builder.set_device_type(device_type).set_ip(ip).set_username(
            username
        ).set_password(password).set_port(port).set_verbose_level(
            verbose_level
        ).set_command_timeout(
            command_timeout
        ).set_eol_template_symbol(
            eol_template_symbol
        ).set_additional_connect_data(
            extra_data
        )
        builder.create_session()
        return builder.ssh_session()

    def build_enabled_cisco_asa_session(
        self,
        ip: str,
        username: str,
        password: str,
        enable_password: str,
        port: int = 22,
    ) -> GenericSSHSession:
        """Build and connect a Cisco ASA session with an enable password.

        The enable password is passed to the connection as ``secret``.

        Raises:
            SSHConnectionError: If no builder is assigned or the assigned
                builder is not a ``CiscoAsaSSHSessionBuilder``. Errors raised
                by the builder while connecting propagate unchanged.
        """
        builder = self._require_builder()
        if not isinstance(builder, CiscoAsaSSHSessionBuilder):
            raise SSHConnectionError(
                "Enabled Cisco ASA session requires CiscoAsaSSHSessionBuilder"
            )
        builder.set_ip(ip).set_username(username).set_password(password).set_port(
            port
        ).set_additional_connect_data({"secret": enable_password}).create_session()
        return builder.ssh_session()
class TestKek
TestKek which
TestKek