import json
import logging
import re
import warnings
from hashlib import sha256
from http import HTTPStatus
from typing import Dict

import requests
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.conf import settings
from django.core.cache import caches
from django.utils.translation import gettext_lazy
from packaging import version
from urllib3.exceptions import InsecureRequestWarning

from core.utils import catch_exception
from devices.constants import FIREWALL_TIMEOUT, MINIMAL_AIF_VERSION
from devices.enums import ArmaIndustrialFirewallStatus
from devices.models.firewall import ArmaIndustrialFirewall
from devices.services.firewall.exception import (
    ConnectionException,
    InvalidCredentialException,
    IncompatibilityVersionException,
    RemoteException,
    NoContentDispositionException,
    InvalidResponseException,
    FailedUploadException,
    InvalidFileException)

_log = logging.getLogger(__name__)


class FirewallService:
    """Client for the HTTP API of a single ARMA Industrial Firewall.

    Wraps the requests made to the firewall (file download/upload, reboot,
    connection check, info polling) and translates transport and remote
    failures into the exceptions of `devices.services.firewall.exception`.
    """

    SUCCESS_RESPONSE = {'status': 'ok'}

    # file type -> (API path, HTTP method name) used by `download_file`
    _DOWNLOAD_ENDPOINTS = {
        'config': ('/api/core/system/downloadConfig', 'post'),
        'rulesets': ('/api/core/system/downloadIdsRulesets', 'get'),
    }
    # file type -> (API path, HTTP method name) used by `upload_file`
    _UPLOAD_ENDPOINTS = {
        'config': ('/api/core/system/restoreConfig', 'post'),
        'rulesets': ('/api/ids/service/addUserlocalRulesets', 'post'),
    }

    def __init__(self, firewall: ArmaIndustrialFirewall = None):
        """
        :param firewall: firewall model instance the service talks to; may be omitted
            when only `check_connection` is called with explicit connection data
        """
        self.firewall = firewall

    @catch_exception
    def check_status(self) -> dict:
        """Return the status stored in the 'redis' cache under `firewall_<pk>_status`."""
        firewall_status = caches['redis'].get(f'firewall_{self.firewall.pk}_status')
        return firewall_status

    @catch_exception
    def download_file(self, type_file):
        """Download a file from the firewall.

        :param type_file: 'config' or 'rulesets'
        :return: tuple `(content_bytes, filename)`, the filename being taken from the
            `content-disposition` response header
        :raises NoContentDispositionException: if the header is missing or carries no filename
        """
        additional_url_path, method = self._DOWNLOAD_ENDPOINTS[type_file]
        url = self.get_addr(additional_url_path)
        if type_file == 'config':
            hashed_key = self._get_hashed_key()
            data = {"encrypt_password": hashed_key, "encrypt_repeat_password": hashed_key, "exclude_rrd": "on"}
        else:
            data = {}
        response = self.send_request(url, method, data=data)

        filename_header = response.headers.get('content-disposition', None)
        if filename_header is None:
            # errors="replace": the body may be binary, logging must never raise here
            body = response.content.decode("utf8", errors="replace")
            _log.error(f'No content disposition during downloading file from ARMA IF {body}')
            raise NoContentDispositionException

        filename_match = re.search("filename=(.+)", filename_header)
        if filename_match is None:
            # Header is present but has no `filename=` part
            _log.error(f'No filename in content disposition from ARMA IF: {filename_header}')
            raise NoContentDispositionException
        return response.content, filename_match.group(1)

    @catch_exception
    def upload_file(self, file, type_file):
        """Upload a file to the firewall.

        :param file: file object passed to the request as multipart data
        :param type_file: 'config' or 'rulesets'
        :return: `SUCCESS_RESPONSE` for a config, upload statistics
            (see `_parse_rulesets_response`) for rulesets
        :raises InvalidFileException: remote status is 'invalid'
        :raises FailedUploadException: remote status is 'failed' or HTTP status is not 200
        :raises InvalidResponseException: answer is not JSON or has an unknown status
        """
        additional_url_path, method = self._UPLOAD_ENDPOINTS[type_file]
        url = self.get_addr(additional_url_path)
        if type_file == 'config':
            kwargs = {'data': {'decrypt_password': self._get_hashed_key()}, 'files': {'conffile': file}}
        else:
            kwargs = {'files': {'uploadRulesetFiles[]': file}}

        response = self.send_request(url, method, **kwargs)
        response_js = self._get_json(response)

        if response.status_code != requests.codes.ok:
            _log.error('Error uploading config to aif: ' + str(response.text))
            raise FailedUploadException

        remote_status = response_js.get('status', '')
        if remote_status == 'ok':
            if type_file == 'config':
                return self.SUCCESS_RESPONSE
            return self._parse_rulesets_response(response_js)
        if remote_status == 'invalid':
            _log.error(f'InvalidFile: {response_js}')
            raise InvalidFileException
        if remote_status == 'failed':
            _log.error(f'FailedUpload: {response_js}')
            raise FailedUploadException
        _log.error(f'Unable to read firewall response: {response_js}')
        raise InvalidResponseException

    @catch_exception
    def check_connection(self, connection_data=None):
        """Check that the firewall is reachable, accepts the credentials and has a compatible version.

        Requests `/api/core/system/info` and validates the reported `product_version`.
        Returns nothing on success; every failure is reported through an exception.

        :param connection_data: dict with keys 'ip', 'key', 'secret'; when empty or omitted,
            the values are taken from `self.firewall`
        :raises InvalidCredentialException: the firewall answered 401 Unauthorized
        :raises ConnectionException: connection error or timeout
        :raises InvalidResponseException: the answer is not JSON or has no `items.product_version`
        :raises IncompatibilityVersionException: the firewall version is below the minimal allowed one
        """
        if not connection_data:
            connection_data = {
                'ip': self.firewall.ip,
                'key': self.firewall.key,
                'secret': self.firewall.secret,
            }
        # Only the address is logged: the dict also carries the API key and secret
        _log.info(f'Try connect to firewall: {connection_data["ip"]}')

        session = requests.Session()
        session.auth = (connection_data['key'], connection_data['secret'])
        session.verify = False

        # Forming request URL to check connection with firewall
        # NOTE(review): plain http is used here while `get_addr` builds https URLs - confirm this is intended
        request_url = f'http://{connection_data["ip"]}/api/core/system/info'
        try:
            firewall_response = session.get(request_url, timeout=FIREWALL_TIMEOUT)
            if firewall_response.status_code == HTTPStatus.UNAUTHORIZED:
                _log.error(f'Error: Invalid credential provided to connect to firewall with ip {connection_data["ip"]}')
                raise InvalidCredentialException
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
            _log.error(f'Error: {str(e)}')
            raise ConnectionException from e

        firewall_info = self._get_json(firewall_response)
        try:
            firewall_version = firewall_info['items']['product_version']
        except (KeyError, TypeError) as e:
            _log.error(f'Unable to read firewall response: {firewall_info}')
            raise InvalidResponseException from e

        if not self.firewall_version_validator(firewall_version):
            _log.error(
                f'The firewall version is incompatible with the current console version. IP:{connection_data["ip"]}')
            raise IncompatibilityVersionException

    @catch_exception
    def reboot(self):
        """Ask the firewall to reboot.

        :return: `SUCCESS_RESPONSE` when the firewall answers with status 'ok'
        :raises RemoteException: the firewall answered with any other status
        """
        additional_url_path, method = '/api/core/system/reboot', 'get'
        url = self.get_addr(additional_url_path)
        response = self.send_request(url, method)
        response_js = self._get_json(response)
        if response_js.get('status', '') != 'ok':
            # Remote error
            _log.error(f'Unable to reboot firewall. Response: {response_js}')
            raise RemoteException
        return self.SUCCESS_RESPONSE

    @catch_exception
    def rule_fields(self, request):
        """Return the decoded JSON answer of `/api/firewall/filter/getRule/`.

        :param request: incoming request; its language is forwarded to the firewall (see `get_session`)
        """
        additional_url_path, method = '/api/firewall/filter/getRule/', 'get'
        url = self.get_addr(additional_url_path)
        response = self.send_request(url, method, request)
        return self._get_json(response)

    def send_request(self, request_url, method_str, request=None, **kwargs):
        """Send a request to the firewall and return the raw response.

        :param request_url: full URL to call
        :param method_str: name of the `requests.Session` method to use ('get', 'post', ...)
        :param request: optional incoming request, passed to `get_session`
        :param kwargs: extra keyword arguments for the session method (data, files, ...)
        :raises InvalidCredentialException: the firewall answered 401 Unauthorized
        :raises ConnectionException: any `requests` transport error
        """
        try:
            session = self.get_session(request)
            method = getattr(session, method_str)
            firewall_response = method(request_url, timeout=FIREWALL_TIMEOUT, **kwargs)
            if firewall_response.status_code == HTTPStatus.UNAUTHORIZED:
                _log.error('Invalid credentials provided to connect to firewall')
                raise InvalidCredentialException
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
            _log.error(f'Error: {str(e)}')
            raise ConnectionException from e
        except requests.exceptions.RequestException as e:
            _log.error(f'Firewall is offline. Error: {str(e)}')
            raise ConnectionException from e
        return firewall_response

    def get_info(self):
        """
        Used only in 'devices/tasks/firewall.py'.
        Returns only a dict without raising an exception so that the correct data is written to redis

        :return: dict with key 'status' (an `ArmaIndustrialFirewallStatus` member), plus
            'data' with the firewall info on success or 'detail' when the answer is not JSON
        """
        try:
            session = self.get_session()
            addr = self.get_addr('api/core/system/info')
            response = session.get(addr, timeout=FIREWALL_TIMEOUT)
            if response.status_code == requests.codes.UNAUTHORIZED:
                return {'status': ArmaIndustrialFirewallStatus.unauthorized}
            try:
                response_js = response.json()
            except ValueError as e:
                _log.warning(f'Invalid output from firewall: {e}')
                return {'status': ArmaIndustrialFirewallStatus.error,
                        'detail': gettext_lazy('Invalid answer from firewall')}
            if response_js.get('status', '') != 'ok' or response_js.get('items', {}).get('product_id', '') != 'armaif':
                return {'status': ArmaIndustrialFirewallStatus.error}  # remote error
            return {'status': ArmaIndustrialFirewallStatus.online, 'data': response_js['items']}  # success
        except requests.exceptions.RequestException as e:
            _log.error(f'Invalid output from firewall: {e}')
            return {'status': ArmaIndustrialFirewallStatus.offline}  # unreachable

    def get_session(self, request=None):
        """Create a session authenticated with the firewall key/secret, certificate checks disabled.

        :param request: optional incoming request; when given, its `LANGUAGE_CODE`
            is sent to the firewall in the `Accept-Language` header
        """
        # Disabling warnings spam about certificates when ARMAIF in HTTPS mode
        warnings.simplefilter('ignore', InsecureRequestWarning)
        session = requests.Session()
        session.auth = (self.firewall.key, self.firewall.secret)
        session.verify = False
        if request:
            language = request.LANGUAGE_CODE.lower()
            # Any non-English language gets English as a lower-priority fallback
            lang = language if language == "en" else f"{language}, en;q=0.8"
            session.headers.update({"Accept-Language": lang})
        return session

    def get_addr(self, add_path=''):
        """Build an https URL for `add_path` on the firewall; leading slashes of the path are dropped."""
        return 'https://{}/{}'.format(self.firewall.ip, add_path.lstrip('/'))

    @staticmethod
    def firewall_version_validator(fw_version):
        """
        # TODO: Due to the fact that version method is not completely valid for this purpose, we need to write our own
        method for comparing AIF versions

        Function for checking ARMAIF version compatibility.
        Correct ARMAIF version format for this validator is: '[numerical_version]-[mod_to_version]',
        numerical version must be in the following format: x.y.z, where:
            x: int, major version
            y: int, minor version
            z: int, patch number
        Everything after the first '-' (for example an `rc` release-candidate mod) is ignored,
        only the numerical part is compared with `MINIMAL_AIF_VERSION`.

        :param fw_version: armaif version to check
        :return: True if the numerical version is not lower than the minimal allowed one, False otherwise
        """
        # Remove additions
        fw_version_pure_version = fw_version.split('-')[0]
        # Compare with minimal allowed version
        return version.parse(fw_version_pure_version) >= version.parse(MINIMAL_AIF_VERSION)

    @staticmethod
    def _get_hashed_key():
        """Return the hex SHA-256 digest of `settings.SECRET_KEY` (used as the config archive password)."""
        hashed_key = sha256(settings.SECRET_KEY.encode()).hexdigest()
        return hashed_key

    @staticmethod
    def _get_json(response: requests.Response) -> dict:
        """Decode the JSON body of `response`.

        :raises InvalidResponseException: the body is not valid JSON
        """
        try:
            return response.json()
        except ValueError as e:
            # ValueError is the common base of the stdlib, simplejson and requests JSON decode errors
            _log.error(f'Unable to read firewall response: {response.text}')
            raise InvalidResponseException from e

    @staticmethod
    def _parse_rulesets_response(response_js):
        """Count uploaded ruleset files by result.

        :param response_js: decoded firewall answer; its 'files' entry maps file name -> file info
        :return: dict `{'status': 'ok', 'success': <count>, 'failed': <count>}`; files with
            status 'ok' or 'warning' count as success, every other file is logged and counted as failed
        """
        results_json = {
            'status': 'ok',
            'success': 0,
            'failed': 0
        }
        # A missing or null 'files' entry means there is nothing to count
        loaded_files = response_js.get('files') or {}
        for f_name, f_info in loaded_files.items():
            if f_info.get('status') in ['ok', 'warning']:
                results_json['success'] += 1
            else:
                results_json['failed'] += 1
                msg = f_info.get('msg', f_info.get('trmsg', 'unknown reason'))
                _log.error(f'File {f_name} failed to upload to IDS with the following message: {msg}')
        return results_json


def get_all_aif_with_status_map() -> Dict[str, Dict[str, str]]:
    """Return mapping Firewalls id with status."""
    firewalls = ArmaIndustrialFirewall.objects.all()
    id_status_map = {str(fw.pk): FirewallService(fw).check_status() for fw in firewalls}
    return id_status_map


def firewalls_status_notification_to_ws():
    """Notification about firewalls status.

    Sends the id -> status map to the 'notification_incidents' channel group.
    """
    data = get_all_aif_with_status_map()
    channel_layer = get_channel_layer()
    async_to_sync(
        channel_layer.group_send
    )('notification_incidents', {'type': 'notification', 'data': {'firewalls_status': data}})