old_console/devices/services/firewall/firewall.py
2024-11-02 14:12:45 +03:00

290 lines
12 KiB
Python

import json
import logging
import re
import warnings
from hashlib import sha256
from http import HTTPStatus
from typing import Dict
import requests
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.conf import settings
from django.core.cache import caches
from django.utils.translation import gettext_lazy
from packaging import version
from urllib3.exceptions import InsecureRequestWarning
from core.utils import catch_exception
from devices.constants import FIREWALL_TIMEOUT, MINIMAL_AIF_VERSION
from devices.enums import ArmaIndustrialFirewallStatus
from devices.models.firewall import ArmaIndustrialFirewall
from devices.services.firewall.exception import (
ConnectionException, InvalidCredentialException, IncompatibilityVersionException, RemoteException,
NoContentDispositionException, InvalidResponseException, FailedUploadException, InvalidFileException)
# Module-level logger, named after this module's import path.
_log = logging.getLogger(__name__)
class FirewallService:
    """Client for the HTTP API of an ARMA Industrial Firewall (AIF).

    Wraps one ``ArmaIndustrialFirewall`` record and exposes status lookup,
    file download/upload, a connection check, reboot and rule-field queries.
    Requests are authenticated with the firewall's ``key``/``secret`` pair and
    are sent with TLS certificate verification disabled.

    Several methods are wrapped in ``catch_exception`` (from ``core.utils``);
    how that decorator treats the exceptions raised here is not visible in
    this module, so the ``:raises:`` notes below describe what the method
    bodies raise before the decorator sees them.
    """

    # Payload returned by operations that have nothing else to report.
    SUCCESS_RESPONSE = {'status': 'ok'}

    def __init__(self, firewall: ArmaIndustrialFirewall = None):
        """Bind the service to ``firewall``.

        ``firewall`` may be left as ``None`` when only ``check_connection`` is
        used with explicit ``connection_data``; methods that build a URL or a
        session read ``self.firewall``.
        """
        self.firewall = firewall

    @catch_exception
    def check_status(self) -> dict:
        """Return the status stored in the ``redis`` cache for this firewall.

        The value is read from the key ``firewall_<pk>_status`` and returned
        as-is (Django's cache ``get`` yields ``None`` for a missing key).
        """
        return caches['redis'].get(f'firewall_{self.firewall.pk}_status')

    @catch_exception
    def download_file(self, type_file):
        """Download a file from the firewall.

        :param type_file: ``'config'`` or ``'rulesets'``; any other value
            raises ``KeyError`` from the endpoint lookup
        :return: tuple ``(content, filename)`` where ``content`` is the raw
            response body and ``filename`` is taken from the
            ``content-disposition`` response header
        :raises NoContentDispositionException: the header is missing or does
            not contain a ``filename=`` part
        """
        type_url_mapping = {
            'config': ('/api/core/system/downloadConfig', 'post'),
            'rulesets': ('/api/core/system/downloadIdsRulesets', 'get')
        }
        additional_url_path, method = type_url_mapping[type_file]
        url = self.get_addr(additional_url_path)
        if type_file == 'config':
            # The config archive is encrypted with a key derived from SECRET_KEY.
            hashed_key = self._get_hashed_key()
            data = {"encrypt_password": hashed_key,
                    "encrypt_repeat_password": hashed_key,
                    "exclude_rrd": "on"}
        else:
            data = {}
        response = self.send_request(url, method, data=data)
        filename_header = response.headers.get('content-disposition')
        if filename_header is None:
            # The body may be binary; never let the log call itself fail on decoding.
            body = response.content.decode('utf8', errors='replace')
            _log.error(f'No content disposition during downloading file from ARMA IF {body}')
            raise NoContentDispositionException
        filename_match = re.search(r'filename=(.+)', filename_header)
        if filename_match is None:
            _log.error(f'No filename in content disposition header from ARMA IF: {filename_header}')
            raise NoContentDispositionException
        return response.content, filename_match.group(1)

    @catch_exception
    def upload_file(self, file, type_file):
        """Upload a configuration or an IDS ruleset file to the firewall.

        :param file: file object passed to ``requests`` as a multipart upload
        :param type_file: ``'config'`` or ``'rulesets'``; any other value
            raises ``KeyError`` from the endpoint lookup
        :return: ``SUCCESS_RESPONSE`` for a config upload, or the per-file
            counters built by ``_parse_rulesets_response`` for rulesets
        :raises InvalidResponseException: the reply is not JSON or carries an
            unknown ``status``
        :raises InvalidFileException: the firewall answered ``status == 'invalid'``
        :raises FailedUploadException: non-200 reply or ``status == 'failed'``
        """
        type_url_mapping = {
            'config': ('/api/core/system/restoreConfig', 'post'),
            'rulesets': ('/api/ids/service/addUserlocalRulesets', 'post')
        }
        additional_url_path, method = type_url_mapping[type_file]
        url = self.get_addr(additional_url_path)
        if type_file == 'config':
            kwargs = {'data': {'decrypt_password': self._get_hashed_key()},
                      'files': {'conffile': file}}
        else:
            kwargs = {'files': {'uploadRulesetFiles[]': file}}
        response = self.send_request(url, method, **kwargs)
        response_js = self._get_json(response)
        if response.status_code != requests.codes.ok:
            _log.error('Error uploading config to aif: ' + str(response.text))
            raise FailedUploadException
        status = response_js.get('status', '')
        if status == 'ok':
            if type_file == 'config':
                return self.SUCCESS_RESPONSE
            return self._parse_rulesets_response(response_js)
        if status == 'invalid':
            _log.error(f'InvalidFile: {response_js}')
            raise InvalidFileException
        if status == 'failed':
            _log.error(f'FailedUpload: {response_js}')
            raise FailedUploadException
        _log.error(f'Unable to read firewall response: {response_js}')
        raise InvalidResponseException

    @catch_exception
    def check_connection(self, connection_data=None):
        """Check that a firewall is reachable, accepts the credentials and is compatible.

        :param connection_data: optional dict with the keys ``'ip'``, ``'key'``
            and ``'secret'``; when empty or omitted the values are taken from
            ``self.firewall``
        :return: ``None`` when every check passes
        :raises ConnectionException: the request failed or timed out
        :raises InvalidCredentialException: the firewall answered 401
        :raises InvalidResponseException: the reply is not JSON or has no
            ``items.product_version``
        :raises IncompatibilityVersionException: the reported version is
            rejected by ``firewall_version_validator``

        NOTE(review): this probe uses plain ``http://`` while ``get_addr``
        builds ``https://`` URLs -- confirm that this is intentional.
        """
        if not connection_data:
            connection_data = {
                'ip': self.firewall.ip,
                'key': self.firewall.key,
                'secret': self.firewall.secret,
            }
        # Log the address only: 'key' and 'secret' are credentials and must not reach the logs.
        _log.info(f'Try connect to firewall: {connection_data["ip"]}')
        session = requests.Session()
        session.auth = (connection_data['key'], connection_data['secret'])
        session.verify = False
        # Forming request URL to check connection with firewall
        request_url = f'http://{connection_data["ip"]}/api/core/system/info'
        try:
            firewall_response = session.get(request_url, timeout=FIREWALL_TIMEOUT)
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
            _log.error(f'Error: {str(e)}')
            raise ConnectionException from e
        finally:
            # The body has already been read, so the pooled connections can be released.
            session.close()
        if firewall_response.status_code == HTTPStatus.UNAUTHORIZED:
            _log.error(f'Error: Invalid credential provided to connect to firewall with ip {connection_data["ip"]}')
            raise InvalidCredentialException
        firewall_info = self._get_json(firewall_response)
        try:
            firewall_version = firewall_info['items']['product_version']
        except (KeyError, TypeError) as e:
            _log.error(f'Unable to read firewall response: {firewall_info}')
            raise InvalidResponseException from e
        if not self.firewall_version_validator(firewall_version):
            _log.error(
                f'The firewall version is incompatible with the current console version. IP:{connection_data["ip"]}')
            raise IncompatibilityVersionException

    @catch_exception
    def reboot(self):
        """Ask the firewall to reboot.

        :return: ``SUCCESS_RESPONSE`` when the firewall answers ``status == 'ok'``
        :raises RemoteException: the firewall answered with any other status
        :raises InvalidResponseException: the reply is not JSON
        """
        additional_url_path, method = '/api/core/system/reboot', 'get'
        url = self.get_addr(additional_url_path)
        response = self.send_request(url, method)
        response_js = self._get_json(response)
        if response_js.get('status', '') != 'ok':  # Remote error
            _log.error(f'Unable to reboot firewall. Response: {response_js}')
            raise RemoteException
        return self.SUCCESS_RESPONSE

    @catch_exception
    def rule_fields(self, request):
        """Return the decoded JSON reply of the firewall's ``getRule`` endpoint.

        :param request: object forwarded to ``get_session``, which reads its
            ``LANGUAGE_CODE`` to set the ``Accept-Language`` header
        :raises InvalidResponseException: the reply is not JSON
        """
        additional_url_path, method = '/api/firewall/filter/getRule/', 'get'
        url = self.get_addr(additional_url_path)
        response = self.send_request(url, method, request)
        return self._get_json(response)

    def send_request(self, request_url, method_str, request=None, **kwargs):
        """Send one authenticated request to the firewall and return the response.

        :param request_url: full URL to call
        :param method_str: name of the ``requests.Session`` method to use,
            e.g. ``'get'`` or ``'post'``
        :param request: optional object forwarded to ``get_session``
        :param kwargs: extra keyword arguments passed to the session method
        :return: the ``requests.Response`` object
        :raises InvalidCredentialException: the firewall answered 401
        :raises ConnectionException: any ``requests`` transport error
        """
        session = self.get_session(request)
        try:
            method = getattr(session, method_str)
            firewall_response = method(request_url, timeout=FIREWALL_TIMEOUT, **kwargs)
            if firewall_response.status_code == HTTPStatus.UNAUTHORIZED:
                _log.error('Invalid credentials provided to connect to firewall')
                raise InvalidCredentialException
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
            _log.error(f'Error: {str(e)}')
            raise ConnectionException from e
        except requests.exceptions.RequestException as e:
            _log.error(f'Firewall is offline. Error: {str(e)}')
            raise ConnectionException from e
        finally:
            # Release the session's connection pool; it is created per request.
            session.close()
        return firewall_response

    def get_info(self):
        """
        Used only in 'devices/tasks/firewall.py'.
        Returns only a dict without raising an exception so that the correct data is written to redis.

        The dict always has a ``'status'`` key holding an
        ``ArmaIndustrialFirewallStatus`` member; ``'data'`` is added when the
        firewall is online and ``'detail'`` when its reply is not JSON.
        """
        session = self.get_session()
        try:
            addr = self.get_addr('api/core/system/info')
            response = session.get(addr, timeout=FIREWALL_TIMEOUT)
        except requests.exceptions.RequestException as e:
            _log.error(f'Firewall is unreachable: {e}')
            return {'status': ArmaIndustrialFirewallStatus.offline}  # unreachable
        finally:
            session.close()
        if response.status_code == requests.codes.UNAUTHORIZED:
            return {'status': ArmaIndustrialFirewallStatus.unauthorized}
        try:
            response_js = response.json()
        except ValueError as e:
            _log.warning(f'Invalid output from firewall: {e}')
            return {'status': ArmaIndustrialFirewallStatus.error,
                    'detail': gettext_lazy('Invalid answer from firewall')}
        # A JSON reply that is not an object (or whose 'items' is not one) is a remote error too;
        # guarding here keeps the "never raises" promise of this method.
        items = response_js.get('items') if isinstance(response_js, dict) else None
        if (not isinstance(items, dict)
                or response_js.get('status', '') != 'ok'
                or items.get('product_id', '') != 'armaif'):
            return {'status': ArmaIndustrialFirewallStatus.error}  # remote error
        return {'status': ArmaIndustrialFirewallStatus.online, 'data': items}  # success

    def get_session(self, request=None):
        """Create a ``requests.Session`` authenticated for this firewall.

        :param request: optional object with a ``LANGUAGE_CODE`` attribute;
            when given, the session sends a matching ``Accept-Language`` header
            (with English as a lower-priority fallback for other languages)
        :return: a new session with certificate verification disabled
        """
        # Disabling warnings spam about certificates when ARMAIF in HTTPS mode
        warnings.simplefilter('ignore', InsecureRequestWarning)
        session = requests.Session()
        session.auth = (self.firewall.key, self.firewall.secret)
        session.verify = False
        if request:
            lang_code = request.LANGUAGE_CODE.lower()
            lang = lang_code if lang_code == "en" else f"{lang_code}, en;q=0.8"
            session.headers.update({"Accept-Language": lang})
        return session

    def get_addr(self, add_path=''):
        """Return the ``https://`` URL of ``add_path`` on this firewall.

        Leading slashes of ``add_path`` are dropped so that the result has
        exactly one slash between the host and the path.
        """
        return 'https://{}/{}'.format(self.firewall.ip, add_path.lstrip('/'))

    @staticmethod
    def firewall_version_validator(fw_version):
        """
        # TODO: Due to the fact that version method is not completely valid for this purpose, we need to write
        our own method for comparing AIF versions
        Function for checking ARMAIF version compatibility. Expected ARMAIF version format for this validator is:
        '[numerical_version]-[mod_to_version]', numerical version must be in the following format: x.y.z, where:
        x: int, major version
        y: int, minor version
        z: int, patch number
        Everything after the first '-' (for example an `rc` release-candidate mod) is ignored, so only the
        numerical part is compared with MINIMAL_AIF_VERSION.
        :param fw_version: armaif version to check
        :return: True if the version is not lower than MINIMAL_AIF_VERSION, False otherwise (including the
            case when the numerical part cannot be parsed as a version)
        """
        # Remove additions
        fw_version_pure_version = fw_version.split('-')[0]
        try:
            parsed_version = version.parse(fw_version_pure_version)
        except version.InvalidVersion:
            # An unparsable version cannot be proven compatible.
            _log.warning(f'Unable to parse firewall version: {fw_version}')
            return False
        # Compare with minimal allowed version
        return parsed_version >= version.parse(MINIMAL_AIF_VERSION)

    @staticmethod
    def _get_hashed_key():
        """Return the hex SHA-256 digest of ``settings.SECRET_KEY``."""
        return sha256(settings.SECRET_KEY.encode()).hexdigest()

    @staticmethod
    def _get_json(response: requests.Response) -> dict:
        """Decode the JSON body of ``response``.

        :raises InvalidResponseException: the body is not valid JSON
        """
        try:
            return response.json()
        except ValueError as e:
            # ValueError is the common base of the JSON decode errors `requests` can raise,
            # and matches what `get_info` catches.
            _log.error(f'Unable to read firewall response: {response.text}')
            raise InvalidResponseException from e

    @staticmethod
    def _parse_rulesets_response(response_js):
        """Summarise a ruleset upload reply.

        :param response_js: decoded reply; its ``'files'`` entry maps a file
            name to a dict with ``'status'`` and optionally ``'msg'``/``'trmsg'``
        :return: ``{'status': 'ok', 'success': <count>, 'failed': <count>}``;
            files with status ``'ok'`` or ``'warning'`` count as successful
        """
        results_json = {
            'status': 'ok',
            'success': 0,
            'failed': 0
        }
        # A reply without 'files' is treated as "no files processed".
        loaded_files = response_js.get('files') or {}
        for f_name, f_info in loaded_files.items():
            if f_info.get('status') in ['ok', 'warning']:
                results_json['success'] += 1
            else:
                results_json['failed'] += 1
                msg = f_info.get('msg', f_info.get('trmsg', 'unknown reason'))
                _log.error(f'File {f_name} failed to upload to IDS with the following message: {msg}')
        return results_json
def get_all_aif_with_status_map() -> Dict[str, Dict[str, str]]:
    """Map every firewall's primary key (as a string) to its ``check_status()`` result."""
    status_by_id = {}
    for firewall in ArmaIndustrialFirewall.objects.all():
        service = FirewallService(firewall)
        status_by_id[str(firewall.pk)] = service.check_status()
    return status_by_id
def firewalls_status_notification_to_ws():
    """Send the current firewall statuses to the 'notification_incidents' channel group."""
    message = {
        'type': 'notification',
        'data': {'firewalls_status': get_all_aif_with_status_map()},
    }
    # group_send is a coroutine function; wrap it so it can be called from sync code.
    send_to_group = async_to_sync(get_channel_layer().group_send)
    send_to_group('notification_incidents', message)