290 lines
12 KiB
Python
290 lines
12 KiB
Python
import json
|
|
import logging
|
|
import re
|
|
import warnings
|
|
from hashlib import sha256
|
|
from http import HTTPStatus
|
|
from typing import Dict
|
|
|
|
import requests
|
|
from asgiref.sync import async_to_sync
|
|
from channels.layers import get_channel_layer
|
|
from django.conf import settings
|
|
from django.core.cache import caches
|
|
from django.utils.translation import gettext_lazy
|
|
from packaging import version
|
|
from urllib3.exceptions import InsecureRequestWarning
|
|
|
|
from core.utils import catch_exception
|
|
from devices.constants import FIREWALL_TIMEOUT, MINIMAL_AIF_VERSION
|
|
from devices.enums import ArmaIndustrialFirewallStatus
|
|
from devices.models.firewall import ArmaIndustrialFirewall
|
|
from devices.services.firewall.exception import (
|
|
ConnectionException, InvalidCredentialException, IncompatibilityVersionException, RemoteException,
|
|
NoContentDispositionException, InvalidResponseException, FailedUploadException, InvalidFileException)
|
|
|
|
_log = logging.getLogger(__name__)
|
|
|
|
|
|
class FirewallService:
|
|
SUCCESS_RESPONSE = {'status': 'ok'}
|
|
|
|
def __init__(self, firewall: ArmaIndustrialFirewall = None):
|
|
self.firewall = firewall
|
|
|
|
@catch_exception
|
|
def check_status(self) -> dict:
|
|
firewall_status = caches['redis'].get(f'firewall_{self.firewall.pk}_status')
|
|
return firewall_status
|
|
|
|
@catch_exception
|
|
def download_file(self, type_file):
|
|
type_url_mapping = {
|
|
'config': ('/api/core/system/downloadConfig', 'post'),
|
|
'rulesets': ('/api/core/system/downloadIdsRulesets', 'get')
|
|
}
|
|
|
|
additional_url_path, method = type_url_mapping[type_file]
|
|
url = self.get_addr(additional_url_path)
|
|
|
|
if type_file == 'config':
|
|
hashed_key = self._get_hashed_key()
|
|
data = {"encrypt_password": hashed_key,
|
|
"encrypt_repeat_password": hashed_key,
|
|
"exclude_rrd": "on"}
|
|
else:
|
|
data = {}
|
|
|
|
response = self.send_request(url, method, data=data)
|
|
filename_header = response.headers.get('content-disposition', None)
|
|
|
|
if filename_header is None:
|
|
_log.error(f'No content disposition during downloading file from ARMA IF {response.content.decode("utf8")}')
|
|
raise NoContentDispositionException
|
|
filename = re.search("filename=(.+)", filename_header).group(1)
|
|
return response.content, filename
|
|
|
|
@catch_exception
|
|
def upload_file(self, file, type_file):
|
|
type_url_mapping = {
|
|
'config': ('/api/core/system/restoreConfig', 'post'),
|
|
'rulesets': ('/api/ids/service/addUserlocalRulesets', 'post')
|
|
}
|
|
|
|
additional_url_path, method = type_url_mapping[type_file]
|
|
url = self.get_addr(additional_url_path)
|
|
|
|
if type_file == 'config':
|
|
kwargs = {'data': {'decrypt_password': self._get_hashed_key()},
|
|
'files': {'conffile': file}}
|
|
else:
|
|
kwargs = {'files': {'uploadRulesetFiles[]': file}}
|
|
|
|
response = self.send_request(url, method, **kwargs)
|
|
response_js = self._get_json(response)
|
|
|
|
if response.status_code == requests.codes.ok:
|
|
if response_js.get('status', '') == 'ok':
|
|
if type_file == 'config':
|
|
return self.SUCCESS_RESPONSE
|
|
else:
|
|
return self._parse_rulesets_response(response_js)
|
|
elif response_js.get('status', '') == 'invalid':
|
|
_log.error(f'InvalidFile: {response_js}')
|
|
raise InvalidFileException
|
|
elif response_js.get('status', '') == 'failed':
|
|
_log.error(f'FailedUpload: {response_js}')
|
|
raise FailedUploadException
|
|
else:
|
|
_log.error(f'Unable to read firewall response: {response_js}')
|
|
raise InvalidResponseException
|
|
else:
|
|
_log.error('Error uploading config to aif: ' + str(response.text))
|
|
raise FailedUploadException
|
|
|
|
@catch_exception
|
|
def check_connection(self, connection_data=None):
|
|
"""
|
|
API for checking the connection with firewall before adding it to database. Request method must be post, and
|
|
request body should contain JSON with following keys: 'ip', 'key', 'secret'. Otherwise API will
|
|
send JSON response with error message which signals that provided data is incorrect.
|
|
Returns different JSON responses, which depends on response conditions, like following:
|
|
status = 'ok' when connection has been established
|
|
status = 'error' with different codes:
|
|
code = 'unauthorized' if wrong authentication credentials provided
|
|
code = 'connection_error' if could not get IP address, provided by user
|
|
"""
|
|
if not connection_data:
|
|
connection_data = {
|
|
'ip': self.firewall.ip,
|
|
'key': self.firewall.key,
|
|
'secret': self.firewall.secret,
|
|
}
|
|
_log.info(f'Try connect to firewall: {connection_data}')
|
|
|
|
session = requests.Session()
|
|
session.auth = (connection_data['key'], connection_data['secret'])
|
|
session.verify = False
|
|
|
|
# Forming request URL to check connection with firewall
|
|
request_url = f'http://{connection_data["ip"]}/api/core/system/info'
|
|
|
|
try:
|
|
firewall_response = session.get(request_url, timeout=FIREWALL_TIMEOUT)
|
|
|
|
if firewall_response.status_code == HTTPStatus.UNAUTHORIZED:
|
|
_log.error(f'Error: Invalid credential provided to connect to firewall with ip {connection_data["ip"]}')
|
|
raise InvalidCredentialException
|
|
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
|
_log.error(f'Error: {str(e)}')
|
|
raise ConnectionException
|
|
|
|
firewall_info = firewall_response.json()
|
|
firewall_version = firewall_info['items']['product_version']
|
|
|
|
if not self.firewall_version_validator(firewall_version):
|
|
_log.error(
|
|
f'The firewall version is incompatible with the current console version. IP:{connection_data["ip"]}')
|
|
raise IncompatibilityVersionException
|
|
|
|
@catch_exception
|
|
def reboot(self):
|
|
additional_url_path, method = '/api/core/system/reboot', 'get'
|
|
url = self.get_addr(additional_url_path)
|
|
response = self.send_request(url, method)
|
|
response_js = self._get_json(response)
|
|
|
|
if response_js.get('status', '') != 'ok': # Remote error
|
|
_log.error(f'Unable to reboot firewall. Response: {response_js}')
|
|
raise RemoteException
|
|
return self.SUCCESS_RESPONSE
|
|
|
|
@catch_exception
|
|
def rule_fields(self, request):
|
|
additional_url_path, method = '/api/firewall/filter/getRule/', 'get'
|
|
url = self.get_addr(additional_url_path)
|
|
response = self.send_request(url, method, request)
|
|
response_js = self._get_json(response)
|
|
return response_js
|
|
|
|
def send_request(self, request_url, method_str, request=None, **kwargs):
|
|
try:
|
|
session = self.get_session(request)
|
|
method = getattr(session, method_str)
|
|
firewall_response = method(request_url, timeout=FIREWALL_TIMEOUT, **kwargs)
|
|
if firewall_response.status_code == HTTPStatus.UNAUTHORIZED:
|
|
_log.error('Invalid credentials provided to connect to firewall')
|
|
raise InvalidCredentialException
|
|
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
|
_log.error(f'Error: {str(e)}')
|
|
raise ConnectionException
|
|
except requests.exceptions.RequestException as e:
|
|
_log.error(f'Firewall is offline. Error: {str(e)}')
|
|
raise ConnectionException
|
|
return firewall_response
|
|
|
|
def get_info(self):
|
|
"""
|
|
Used only in 'devices/tasks/firewall.py'.
|
|
Returns only a dict without raising an exception so that the correct data is written to redis
|
|
"""
|
|
try:
|
|
session = self.get_session()
|
|
addr = self.get_addr('api/core/system/info')
|
|
response = session.get(addr, timeout=FIREWALL_TIMEOUT)
|
|
if response.status_code == requests.codes.UNAUTHORIZED:
|
|
return {'status': ArmaIndustrialFirewallStatus.unauthorized}
|
|
try:
|
|
response_js = response.json()
|
|
except ValueError as e:
|
|
_log.warning(f'Invalid output from firewall: {e}')
|
|
return {'status': ArmaIndustrialFirewallStatus.error,
|
|
'detail': gettext_lazy('Invalid answer from firewall')}
|
|
if response_js.get('status', '') != 'ok' or response_js.get('items', {}).get('product_id', '') != 'armaif':
|
|
return {'status': ArmaIndustrialFirewallStatus.error} # remote error
|
|
return {'status': ArmaIndustrialFirewallStatus.online, 'data': response_js['items']} # success
|
|
except requests.exceptions.RequestException as e:
|
|
_log.error(f'Invalid output from firewall: {e}')
|
|
return {'status': ArmaIndustrialFirewallStatus.offline} # unreachable
|
|
|
|
def get_session(self, request=None):
|
|
# Disabling warnings spam about certificates when ARMAIF in HTTPS mode
|
|
warnings.simplefilter('ignore', InsecureRequestWarning)
|
|
session = requests.Session()
|
|
session.auth = (self.firewall.key, self.firewall.secret)
|
|
session.verify = False
|
|
if request:
|
|
if request.LANGUAGE_CODE.lower() != "en":
|
|
lang = f"{request.LANGUAGE_CODE.lower()}, en;q=0.8"
|
|
else:
|
|
lang = request.LANGUAGE_CODE.lower()
|
|
session.headers.update({"Accept-Language": lang})
|
|
return session
|
|
|
|
def get_addr(self, add_path=''):
|
|
return 'https://{}/{}'.format(self.firewall.ip, add_path.lstrip('/'))
|
|
|
|
@staticmethod
|
|
def firewall_version_validator(fw_version):
|
|
"""
|
|
# TODO: Due to the fact that version method is not completely valid for this purpose, we need to write
|
|
our own method for comparing AIF versions
|
|
Function for checking ARMAIF version compatability. Correct ARMAIF version format for this validator is:
|
|
'[numerical_version]-[mod_to_version]', numerical version must be in the following format: x.y.z, where:
|
|
x: int, major version
|
|
y: int, minor version
|
|
z: int, patch number
|
|
Also if version mod starts with `rc`, which means 'release candidate' - also returning True
|
|
:param fw_version: armaif version to check
|
|
:return: True if version os valid, False otherwise
|
|
"""
|
|
# Remove additions
|
|
fw_version_pure_version = fw_version.split('-')[0]
|
|
# Compare with minimal allowed version
|
|
return not version.parse(fw_version_pure_version) < version.parse(MINIMAL_AIF_VERSION)
|
|
|
|
@staticmethod
|
|
def _get_hashed_key():
|
|
hashed_key = sha256(settings.SECRET_KEY.encode()).hexdigest()
|
|
return hashed_key
|
|
|
|
@staticmethod
|
|
def _get_json(response: requests.Response) -> dict:
|
|
try:
|
|
return response.json()
|
|
except json.decoder.JSONDecodeError:
|
|
_log.error(f'Unable to read firewall response: {response.text}')
|
|
raise InvalidResponseException
|
|
|
|
@staticmethod
|
|
def _parse_rulesets_response(response_js):
|
|
results_json = {
|
|
'status': 'ok',
|
|
'success': 0,
|
|
'failed': 0
|
|
}
|
|
loaded_files = response_js.get('files')
|
|
for f_name, f_info in loaded_files.items():
|
|
if f_info.get('status') in ['ok', 'warning']:
|
|
results_json['success'] += 1
|
|
else:
|
|
results_json['failed'] += 1
|
|
msg = f_info.get('msg', f_info.get('trmsg', 'unknown reason'))
|
|
_log.error(f'File {f_name} failed to upload to IDS with the following message: {msg}')
|
|
return results_json
|
|
|
|
|
|
def get_all_aif_with_status_map() -> Dict[str, Dict[str, str]]:
|
|
"""Return mapping Firewalls id with status."""
|
|
firewalls = ArmaIndustrialFirewall.objects.all()
|
|
id_status_map = {str(fw.pk): FirewallService(fw).check_status() for fw in firewalls}
|
|
return id_status_map
|
|
|
|
|
|
def firewalls_status_notification_to_ws():
|
|
"""Notification about firewalls status."""
|
|
data = get_all_aif_with_status_map()
|
|
channel_layer = get_channel_layer()
|
|
async_to_sync(
|
|
channel_layer.group_send
|
|
)('notification_incidents', {'type': 'notification', 'data': {'firewalls_status': data}})
|