import json
import logging

from django.conf import settings
from django.utils.translation import gettext_lazy
from packaging import version
from rest_framework.exceptions import APIException

from assets.models.assets import OperatingSystem, AssetManufacturer, AssetListGroup
from correlation.constants import EXPORT_TMP_PATH, MODEL_NAME_MAPPER
from correlation.constants import RuleParseErrors
from correlation.models import Group, Rule
from correlation.serializers import RuleImportSerializer
from incident.models import IncidentRecommendations, IncidentEffect, IncidentCategory

_log = logging.getLogger(__name__)


class ImportException(APIException):
    """API error (HTTP 400) raised when the uploaded file cannot be imported at all."""

    status_code = 400


class ImportRulesService:
    """Import correlation rules from an uploaded JSON file.

    The file is parsed, optionally checked against the minimal supported
    rules version, objects referenced by the rules are created when missing,
    and then every rule is validated and saved independently.  A failure of
    one rule never aborts the import of the others; the per-rule outcome is
    collected in ``result_log``.
    """

    def __init__(self, file, check_version=True):
        """
        :param file: uploaded file object exposing ``chunks()``
        :param check_version: when True, ``run_import`` rejects files whose
            ``meta.version`` is missing or below the configured minimum
        """
        self.file = file
        self.check_rules_version = check_version
        self.current_console_version = getattr(settings, 'SITE_INFO')['version']
        # parsed content of the uploaded file; filled in by run_import()
        self.json_data = None
        # storage to store the full result of the import
        self.result_log = []
        # list of all errors (as string) during import
        self.fails_list = []

    def run_import(self):
        """Main method: parse the uploaded file and import every rule in it.

        :return: ``result_log`` - one dict per rule with the keys
            ``rule_name``, ``import_status`` and ``report_message``
        :raises ImportException: if the file is not valid JSON, does not have
            the expected top-level structure, or fails the version check
        """
        # NOTE(review): EXPORT_TMP_PATH is a single fixed path, so two imports
        # running at the same time would overwrite each other's upload.
        with open(EXPORT_TMP_PATH, 'wb') as f:
            for chunk in self.file.chunks():
                f.write(chunk)
        try:
            # Read as bytes: json detects the UTF-8/16/32 encoding itself,
            # instead of depending on the locale encoding of the server.
            with open(EXPORT_TMP_PATH, 'rb') as f:
                self.json_data = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError) as exc:
            raise self._incorrect_file_error() from exc

        # Everything below indexes json_data as a mapping.
        if not isinstance(self.json_data, dict):
            raise self._incorrect_file_error()

        if self.check_rules_version:
            self.check_version()

        if not isinstance(self.json_data.get('rules'), list):
            raise self._incorrect_file_error()

        self.create_missing_objects()
        for rule in self.json_data['rules']:
            self.result_log.append(self._import_rule(rule))

        _log.info(f'Rule import completed. List of errors, occurred during import process: {self.fails_list}')
        return self.result_log

    @staticmethod
    def _incorrect_file_error():
        """Build the exception reported for an unreadable or malformed file."""
        return ImportException(
            {'status': 'error', 'message': gettext_lazy("Incorrect JSON data in provided file")})

    @staticmethod
    def _result_entry(rule_name, import_status, report_message):
        """Build one ``result_log`` record."""
        return {'rule_name': rule_name,
                'import_status': import_status,
                'report_message': report_message}

    def _import_rule(self, rule):
        """Validate and save a single rule; return its ``result_log`` record.

        Never raises: any unexpected error is logged and reported as a failed
        import of this rule only.
        """
        # Resolved once and defensively: the error handler below needs the
        # name as well and must not fail itself on a malformed rule.
        rule_name = rule.get('name') if isinstance(rule, dict) else None
        try:
            # start validation
            if not self.validate_action_list(rule):
                reason = self.fails_list[-1]  # get last added error
                _log.info(f'Rule {rule_name} is invalid: {reason}')
                return self._result_entry(rule_name, 'failed', reason)

            new_rule = RuleImportSerializer(data=rule)
            if not new_rule.is_valid():
                _log.error(f'Validation errors: {new_rule.errors}')
                return self._result_entry(
                    rule_name, 'failed', f'Validation errors: {new_rule.errors}')

            if not self.validate_sid_rev(rule):
                # get last added error
                return self._result_entry(rule_name, 'failed', self.fails_list[-1])

            new_rule.save()
            _log.info(f'Creating new rule {rule_name}')
            return self._result_entry(
                rule_name, 'success', str(gettext_lazy('Imported successfully')))
        except Exception as exc:
            # Per-rule boundary: keep importing the remaining rules, but do
            # not lose the traceback.
            _log.exception('Unexpected error while importing rule %s', rule_name)
            return self._result_entry(rule_name, 'failed', f'Unknown error: {str(exc)}')

    def check_version(self):
        """Check that the file was exported by a supported console version.

        :raises ImportException: if ``meta.version`` is missing, cannot be
            parsed, or is lower than
            ``settings.MINIMAL_VERSION_CORRELATION_RULES``
        """
        incompatible = {'status': 'error',
                        'message': str(gettext_lazy("Version incompatible"))}
        try:
            importing_version = version.parse(self.json_data['meta']['version'])
        except (KeyError, TypeError, ValueError) as exc:
            # KeyError: no meta/version; TypeError: meta is not a mapping or
            # version is not a string; ValueError: packaging's InvalidVersion.
            raise ImportException(incompatible) from exc

        minimum_version = version.parse(settings.MINIMAL_VERSION_CORRELATION_RULES)
        if importing_version < minimum_version:
            raise ImportException(incompatible)

    @staticmethod
    def _create_named_objects(model, items, with_description=True):
        """Best-effort ``get_or_create`` of ``model`` rows for ``items``.

        :param model: model class looked up / created by ``name``
        :param items: expected to be a list of dicts with a ``name`` key and
            an optional ``description``; anything else is silently skipped
        :param with_description: pass ``description`` as a creation default
        """
        if not isinstance(items, list):
            # '' means "not set" in the export format; other shapes are
            # reported later, when the rule itself is validated.
            return
        for item in items:
            try:
                lookup = {'name': item['name']}
                if with_description:
                    lookup['defaults'] = {'description': item.get('description', '')}
            except (AttributeError, KeyError, TypeError):
                continue
            model.objects.get_or_create(**lookup)

    def create_missing_objects(self):
        """Create database objects referenced by the imported rules.

        Creation is best-effort: malformed entries are skipped here.
        If the object was not created, then at later stages the rule will be
        marked as invalid, because the required object does not exist in the
        database.  As a side effect every rule's ``group`` value is replaced
        by the primary key of the (possibly newly created) rule group.
        """
        # creating recommendations
        self._create_named_objects(
            IncidentRecommendations, self.json_data.get('close_recommendations', []))
        # creating effects
        self._create_named_objects(IncidentEffect, self.json_data.get('effects', []))

        for rule in self.json_data.get('rules', []):
            if not isinstance(rule, dict):
                continue
            actions = rule.get('actions_json')
            for action in actions if isinstance(actions, list) else []:
                if not isinstance(action, dict):
                    continue
                action_type = action.get('type')
                if action_type == 'incident':
                    # there is only one incident category for each rule, but it in list
                    self._create_named_objects(IncidentCategory, action.get('category'))
                elif action_type == 'asset':
                    # there is only one OperatingSystem for each rule, but it in list
                    self._create_named_objects(
                        OperatingSystem, action.get('os'), with_description=False)
                    self._create_named_objects(AssetManufacturer, action.get('manufacturer'))
                    self._create_named_objects(AssetListGroup, action.get('group'))

            # creating rules group
            rule_group, created = Group.objects.get_or_create(name=rule.get('group'))
            if created:
                _log.info(f"Creating new rule group {rule_group.name} for rule {rule.get('name')}")
            rule['group'] = rule_group.pk

    def validate_action_list(self, rule):
        """Resolve object references in every action of ``rule``.

        Actions of type ``asset`` and ``incident`` have their reference
        fields replaced by primary keys (see ``_parse_check_list``); other
        action types are left untouched.

        :return: True if all actions are valid, False after the first invalid
            one (its error is the last entry of ``fails_list``)
        """
        for action in rule['actions_json']:
            if action['type'] == 'asset':
                check_list = [
                    ['os', True],
                    ['group', True],
                    ['manufacturer', True],
                    ['vulnerabilities', False],
                ]
            elif action['type'] == 'incident':
                check_list = [
                    ['effects', False],
                    ['close_recommendations', False],
                    ['assigned_to', True],
                    ['category', True],
                ]
            else:
                continue
            if not self._parse_check_list(action, check_list):
                return False
        return True

    def validate_sid_rev(self, rule):
        """Decide whether ``rule`` may be saved, based on its SID and REV.

        If a rule with the same SID and ``status=True`` exists:
        equal REV or a newer REV in the database rejects the import; an older
        REV in the database archives the existing rule and allows the import.

        :return: True if the rule may be saved; False otherwise (the reason
            is appended to ``fails_list``)
        """
        try:
            # First, check if rule with same SID already exists in DB
            exist_rule = Rule.objects.get(sid=rule['sid'], status=True)
            importing_rev = rule['rev']

            if exist_rule.rev == importing_rev:
                # Same SID and REV - skip the import of this rule
                self.fails_list.append(
                    gettext_lazy('{rule_name} did not import because same rule already exists in database').format(
                        rule_name=rule['name']))
                return False

            try:
                existing_is_newer = exist_rule.rev > importing_rev
                existing_is_older = exist_rule.rev < importing_rev
            except TypeError:
                # REV values of incomparable types (e.g. str vs int)
                existing_is_newer = existing_is_older = False

            if existing_is_newer:
                # REV of importing rule is less than REV of rule in DB
                self.fails_list.append(
                    gettext_lazy('{rule_name} did not import because newer version of rule exists in database').format(
                        rule_name=rule["name"]))
                return False

            if existing_is_older:
                # REV of importing rule is greater than REV of rule in DB:
                # archive the existing rule.
                # NOTE(review): the lookup above filters on status=True while
                # only `archived` is changed here - confirm that archived
                # rules cannot match that lookup on a later import.
                exist_rule.archived = True
                exist_rule.save(is_being_parsed=True)
                return True

            # Add mistake if all checks are failed
            self.fails_list.append(f'{rule["name"]} invalid SID or REV arguments')
            return False
        except Rule.DoesNotExist:
            # If there are no rules like this - the rule is valid
            return True
        except KeyError:
            # str(): fails_list holds plain strings, not lazy translations
            self.fails_list.append(str(gettext_lazy('Incorrect format of importing rule')))
            return False

    def _parse_check_list(self, action, check_list):
        """
        Function to parse all available rule's action field

        On success the checked fields of ``action`` are overwritten in place
        with the resolved values.

        :param action: current checking action from actions_json
        :param check_list: list of ``[field name, is_single]`` pairs, that should be checked
        :return: True if action is valid False otherwise
        """
        for action_key, is_single in check_list:
            is_ok, payload = self._name_description_model_parser(action, action_key, is_single)
            if not is_ok:
                self.fails_list.append(payload)
                return False
            action[action_key] = payload
        return True

    def _name_description_model_parser(self, action, action_key, is_single):
        """
        Function for checking if model instance exists in DB and add it if positive.
        Otherwise, return false with error log

        :param action: complete parameter of action in actions_json field of importing rule
        :param action_key: NameDescription model name reference in action
        :param is_single: True if in parsing action model with references as action_key has ChoiceField,
            False if MultipleChoiceField
        :return: list containing two elements: ``[False, 'error message']`` on failure,
            otherwise ``[True, value]`` where value is a primary key string, a list of
            primary key strings, or an empty value when the field is unset
        """
        def generate_error_message(error_type, error_element=''):
            if error_type == RuleParseErrors.NAME_DESCRIPTION_ERROR:
                translation_template_message = gettext_lazy(
                    '"{instance_name}" instance with "{instance_arg}" name does not exist')
                return translation_template_message.format(
                    instance_name=MODEL_NAME_MAPPER[action_key]._meta.verbose_name.title(),
                    instance_arg=error_element)
            if error_type == RuleParseErrors.FORMAT_ERROR:
                return f'{MODEL_NAME_MAPPER[action_key]._meta.model_name} value has wrong format'
            return 'Unknown error occurred'

        if is_single:
            if action_key == 'assigned_to':
                # always clear this field
                return [True, ""]
            try:
                raw_value = action[action_key]
                if raw_value == "":
                    return [True, ""]
                # single references are exported as a one-element list
                instance_name = raw_value[0]['name']
            except (KeyError, IndexError, TypeError):
                # missing key or unexpected shape of the value
                return [False, generate_error_message(error_type=RuleParseErrors.FORMAT_ERROR)]
            model = MODEL_NAME_MAPPER[action_key]
            try:
                existing_model_instance = model.objects.get(name=instance_name)
            except model.DoesNotExist:
                return [False, generate_error_message(error_type=RuleParseErrors.NAME_DESCRIPTION_ERROR,
                                                      error_element=instance_name)]
            return [True, str(existing_model_instance.pk)]

        if action_key not in action:
            return [True, ""]
        if action_key == 'vulnerabilities':
            # always clear this field
            return [True, []]
        if not isinstance(action[action_key], list):
            return [False, generate_error_message(error_type=RuleParseErrors.FORMAT_ERROR)]

        model = MODEL_NAME_MAPPER[action_key]
        resolved_pks = []
        for element in action[action_key]:
            try:
                tmp_model_instance = model.objects.get(name=element)
            except model.DoesNotExist:
                return [False, generate_error_message(error_type=RuleParseErrors.NAME_DESCRIPTION_ERROR,
                                                      error_element=element)]
            except TypeError:
                return [False, generate_error_message(error_type=RuleParseErrors.FORMAT_ERROR,
                                                      error_element=element)]
            resolved_pks.append(str(tmp_model_instance.pk))
        return [True, resolved_pks]