old_console/correlation/services/import_service.py
2024-11-02 14:12:45 +03:00

319 lines
15 KiB
Python

import json
import logging
from django.conf import settings
from django.utils.translation import gettext_lazy
from packaging import version
from rest_framework.exceptions import APIException
from assets.models.assets import OperatingSystem, AssetManufacturer, AssetListGroup
from correlation.constants import EXPORT_TMP_PATH, MODEL_NAME_MAPPER
from correlation.constants import RuleParseErrors
from correlation.models import Group, Rule
from correlation.serializers import RuleImportSerializer
from incident.models import IncidentRecommendations, IncidentEffect, IncidentCategory
# Module-level logger, named after this module's import path.
_log = logging.getLogger(__name__)
class ImportException(APIException):
    """API error raised when an uploaded rules file cannot be imported.

    Overrides the ``APIException`` status code so the failure is reported
    to the client as HTTP 400 (bad request).
    """

    status_code = 400
class ImportRulesService:
    """Import correlation rules from an uploaded JSON export file.

    Typical use: ``ImportRulesService(file).run_import()``; the returned
    list contains one report entry per rule found in the file.

    NOTE(review): the upload is spooled to the fixed ``EXPORT_TMP_PATH``,
    so two imports running at the same time would presumably overwrite each
    other's file - confirm whether callers serialize imports.
    """

    # ``(field name, is_single)`` pairs checked for every action of the
    # given type. ``is_single`` is True when the field references at most one
    # object (exported as a one-element list of ``{'name': ...}`` dicts) and
    # False when it is a list of plain object names.
    _ASSET_CHECKS = (
        ('os', True),
        ('group', True),
        ('manufacturer', True),
        ('vulnerabilities', False),
    )
    _INCIDENT_CHECKS = (
        ('effects', False),
        ('close_recommendations', False),
        ('assigned_to', True),
        ('category', True),
    )

    def __init__(self, file, check_version=True):
        """
        :param file: uploaded file object; only its ``chunks()`` method is used
        :param check_version: when True, ``run_import`` rejects files whose
            ``meta.version`` is older than
            ``settings.MINIMAL_VERSION_CORRELATION_RULES``
        """
        self.file = file
        self.check_rules_version = check_version
        self.current_console_version = getattr(settings, 'SITE_INFO')['version']
        # storage to store the full result of the import
        self.result_log = []
        # list of all errors (as string) during import
        self.fails_list = []

    def run_import(self):
        """ Main method: load the file, prepare the database, import every rule.

        :return: ``self.result_log`` - a list of dicts with the keys
            ``rule_name``, ``import_status`` ('success' or 'failed') and
            ``report_message``
        :raises ImportException: when the file is not valid JSON or, if the
            version check is enabled, when its version is incompatible
        """
        self._load_json_data()
        if self.check_rules_version:
            self.check_version()
        self.create_missing_objects()
        for rule in self._rules():
            try:
                status, message = self._import_rule(rule)
            except Exception as exc:
                # A single broken rule must not abort the whole import, but
                # the traceback is kept in the log for diagnostics.
                _log.exception('Unexpected error while importing rule %r', self._rule_name(rule))
                status, message = 'failed', f'Unknown error: {str(exc)}'
            self.result_log.append({
                'rule_name': self._rule_name(rule),
                'import_status': status,
                'report_message': message,
            })
        _log.info(f'Rule import completed. List of errors, occurred during import process: {self.fails_list}')
        return self.result_log

    def _load_json_data(self):
        """Spool the upload to ``EXPORT_TMP_PATH`` and parse it into ``self.json_data``."""
        with open(EXPORT_TMP_PATH, 'wb') as f:
            for chunk in self.file.chunks():
                f.write(chunk)
        try:
            # JSON text is UTF-8; do not depend on the process locale.
            with open(EXPORT_TMP_PATH, 'r', encoding='utf-8') as f:
                self.json_data = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError) as exc:
            raise self._incorrect_json() from exc
        if not isinstance(self.json_data, dict):
            # Every later step addresses the document by key.
            raise self._incorrect_json()

    @staticmethod
    def _incorrect_json():
        """Build the exception reported for an unreadable or malformed file."""
        return ImportException(
            {'status': 'error', 'message': gettext_lazy("Incorrect JSON data in provided file")})

    @staticmethod
    def _version_incompatible():
        """Build the exception reported for a missing, invalid or too old version."""
        return ImportException(
            {'status': 'error', 'message': f'{gettext_lazy("Version incompatible")}'})

    @staticmethod
    def _rule_name(rule):
        """Return the rule's name for reports, or '' when it cannot be read."""
        if isinstance(rule, dict):
            return rule.get('name', '')
        return ''

    def _rules(self):
        """Return the list of rules from the loaded file ([] if absent or not a list)."""
        rules = self.json_data.get('rules', [])
        return rules if isinstance(rules, list) else []

    def _import_rule(self, rule):
        """Validate and save one rule.

        :return: ``(import_status, report_message)`` for the result log
        """
        rule_name = self._rule_name(rule)
        if not self.validate_action_list(rule):
            last_error = self.fails_list[-1]  # get last added error
            _log.info(f'Rule {rule_name} is invalid: {last_error}')
            return 'failed', last_error

        new_rule = RuleImportSerializer(data=rule)
        if not new_rule.is_valid():
            _log.error(f'Validation errors: {new_rule.errors}')
            return 'failed', f'Validation errors: {new_rule.errors}'

        if not self.validate_sid_rev(rule):
            return 'failed', self.fails_list[-1]  # get last added error

        new_rule.save()
        _log.info(f'Creating new rule {rule_name}')
        return 'success', str(gettext_lazy('Imported successfully'))

    def check_version(self):
        """Reject the file unless ``meta.version`` meets the minimum supported version.

        :raises ImportException: when the version is missing, is not a valid
            version string, or is lower than
            ``settings.MINIMAL_VERSION_CORRELATION_RULES``
        """
        try:
            importing_version = version.parse(self.json_data['meta']['version'])
        except (KeyError, TypeError, version.InvalidVersion) as exc:
            # Missing key, non-dict ``meta``, non-string or unparsable version.
            raise self._version_incompatible() from exc
        minimum_version = version.parse(settings.MINIMAL_VERSION_CORRELATION_RULES)
        if importing_version >= minimum_version:
            return
        raise self._version_incompatible()

    @staticmethod
    def _get_or_create_named(model, entries, with_description=True):
        """Best-effort ``get_or_create`` for every ``{'name', 'description'}`` dict.

        Anything that is not a list is ignored, and entries that are
        malformed are skipped, so one bad entry never aborts the import.
        """
        if not isinstance(entries, list):
            return
        for entry in entries:
            try:
                lookup = {'name': entry['name']}
                if with_description:
                    lookup['defaults'] = {'description': entry.get('description', '')}
                model.objects.get_or_create(**lookup)
            except (AttributeError, KeyError, TypeError):
                continue

    def create_missing_objects(self):
        """
        Create the database objects that the imported rules refer to by name.

        Each attempt to create an object in the database is best-effort.
        If the object was not created, then at later stages the rule will be marked as invalid,
        because the required object does not exist in the database.

        Also replaces every rule's ``group`` name with the primary key of the
        matching (possibly just created) rules group.
        """
        # creating recommendations
        self._get_or_create_named(IncidentRecommendations, self.json_data.get('close_recommendations', []))
        # creating effects
        self._get_or_create_named(IncidentEffect, self.json_data.get('effects', []))

        for rule in self._rules():
            if not isinstance(rule, dict):
                # Reported as failed later, when the rule itself is imported.
                continue
            actions = rule.get('actions_json')
            for action in actions if isinstance(actions, list) else []:
                if not isinstance(action, dict):
                    continue
                action_type = action.get('type')
                if action_type == 'incident':
                    # there is only one incident category for each rule, but it in list
                    self._get_or_create_named(IncidentCategory, action.get('category'))
                elif action_type == 'asset':
                    # there is only one OperatingSystem for each rule, but it in list
                    self._get_or_create_named(OperatingSystem, action.get('os'), with_description=False)
                    self._get_or_create_named(AssetManufacturer, action.get('manufacturer'))
                    self._get_or_create_named(AssetListGroup, action.get('group'))
            # creating rules group
            group, created = Group.objects.get_or_create(name=rule.get('group'))
            if created:
                _log.info(f"Creating new rule group {group.name} for rule {rule.get('name')}")
            rule['group'] = group.pk

    def validate_action_list(self, rule):
        """Resolve the object references in every action of ``rule``.

        Actions of type 'asset' and 'incident' are checked (and rewritten in
        place to primary keys); other action types are left untouched.
        Checking stops at the first invalid action, whose error message is
        appended to ``self.fails_list``.

        :return: True if all actions are valid, False otherwise
        """
        for action in rule['actions_json']:
            action_type = action['type']
            if action_type == 'asset':
                check_list = self._ASSET_CHECKS
            elif action_type == 'incident':
                check_list = self._INCIDENT_CHECKS
            else:
                continue
            if not self._parse_check_list(action, check_list):
                return False
        return True

    def validate_sid_rev(self, rule):
        """Decide from SID/REV whether ``rule`` should be saved.

        Looks up a rule with the same SID and ``status=True``:

        * none found - the rule may be imported;
        * same REV, or a greater REV in the database - the rule is skipped
          and a message is appended to ``self.fails_list``;
        * lower REV in the database - the existing rule is archived and the
          new one may be imported.

        :return: True if the rule may be saved, False otherwise
        """
        try:
            # First, check if rule with same SID already exists in DB
            exist_rule = Rule.objects.get(sid=rule['sid'], status=True)
            if exist_rule.rev == rule['rev']:
                # SID and REV are equal - skip the import of this rule
                self.fails_list.append(
                    gettext_lazy('{rule_name} did not import because same rule already exists in database').format(
                        rule_name=rule['name']))
                return False
            if exist_rule.rev > rule['rev']:
                # The database already holds a newer revision - skip the import
                self.fails_list.append(
                    gettext_lazy('{rule_name} did not import because newer version of rule exists in database').format(
                        rule_name=rule["name"]))
                return False
            if exist_rule.rev < rule['rev']:
                # The imported revision is newer: archive the existing rule
                exist_rule.archived = True
                exist_rule.save(is_being_parsed=True)
                return True
            # None of the comparisons held - the values are not usable
            self.fails_list.append(
                f'{rule["name"]} invalid SID or REV arguments')
            return False
        except Rule.DoesNotExist:
            # If there are no rules like this - the rule is valid
            return True
        except KeyError:
            # 'sid', 'rev' or 'name' is missing from the rule
            self.fails_list.append(gettext_lazy(
                'Incorrect format of importing rule'))
            return False

    def _parse_check_list(self, action, check_list):
        """ Function to parse all available rule's action field

        Every checked field of ``action`` is replaced in place by the value
        returned from ``_name_description_model_parser``; on the first failure
        the error message is appended to ``self.fails_list``.

        :param action: current checking action from actions_json
        :param check_list: iterable of ``(field name, is_single)`` pairs, that should be checked
        :return:
            True if action is valid
            False otherwise
        """
        for action_key, is_single in check_list:
            is_ok, payload = self._name_description_model_parser(action, action_key, is_single)
            if not is_ok:
                self.fails_list.append(payload)
                return False
            action[action_key] = payload
        return True

    def _name_description_model_parser(self, action, action_key, is_single):
        """ Function for checking if model instance exists in DB and resolving it to its pk.
        Otherwise, return false with error log

        'assigned_to' and 'vulnerabilities' are never resolved: they are
        always cleared.

        :param action: complete parameter of action in actions_json field of importing rule
        :param action_key: NameDescription model name reference in action
        :param is_single: True if in parsing action model with references as action_key has ChoiceField,
            False if MultipleChoiceField
        :return: list containing two elements, which could be following:
            [False, 'error_log'],
            [True, 'pk'] / [True, ['pk', ...]] for resolved references,
            [True, ''] / [True, []] for empty or cleared fields
        """
        def generate_error_message(error_type, error_element=''):
            if error_type == RuleParseErrors.NAME_DESCRIPTION_ERROR:
                translation_template_message = gettext_lazy(
                    '"{instance_name}" instance with "{instance_arg}" name does not exist')
                return translation_template_message.format(
                    instance_name=MODEL_NAME_MAPPER[action_key]._meta.verbose_name.title(),
                    instance_arg=error_element)
            if error_type == RuleParseErrors.FORMAT_ERROR:
                return f'{MODEL_NAME_MAPPER[action_key]._meta.model_name} value has wrong format'
            return 'Unknown error occurred'

        if is_single:
            if action_key == 'assigned_to':  # always clear this field
                return [True, ""]
            # A missing key is treated like an empty value, as for multi-value fields.
            raw_value = action.get(action_key, "")
            if raw_value == "":
                return [True, ""]
            try:
                instance_name = raw_value[0]['name']
            except (IndexError, KeyError, TypeError):
                # Not a non-empty list of {'name': ...} dicts.
                return [False, generate_error_message(error_type=RuleParseErrors.FORMAT_ERROR)]
            model = MODEL_NAME_MAPPER[action_key]
            try:
                existing_model_instance = model.objects.get(name=instance_name)
            except model.DoesNotExist:
                return [False, generate_error_message(error_type=RuleParseErrors.NAME_DESCRIPTION_ERROR,
                                                      error_element=instance_name)]
            return [True, str(existing_model_instance.pk)]

        if action_key not in action:
            return [True, ""]
        if action_key == 'vulnerabilities':  # always clear this field
            return [True, []]
        if not isinstance(action[action_key], list):
            return [False, generate_error_message(error_type=RuleParseErrors.FORMAT_ERROR)]
        model = MODEL_NAME_MAPPER[action_key]
        tmp_list = []
        for element in action[action_key]:
            try:
                tmp_model_instance = model.objects.get(name=element)
                tmp_list.append(str(tmp_model_instance.pk))
            except model.DoesNotExist:
                return [False, generate_error_message(error_type=RuleParseErrors.NAME_DESCRIPTION_ERROR,
                                                      error_element=element)]
            except TypeError:
                return [False,
                        generate_error_message(error_type=RuleParseErrors.FORMAT_ERROR, error_element=element)]
        return [True, tmp_list]