319 lines
15 KiB
Python
319 lines
15 KiB
Python
import json
|
|
import logging
|
|
|
|
from django.conf import settings
|
|
from django.utils.translation import gettext_lazy
|
|
from packaging import version
|
|
from rest_framework.exceptions import APIException
|
|
|
|
from assets.models.assets import OperatingSystem, AssetManufacturer, AssetListGroup
|
|
from correlation.constants import EXPORT_TMP_PATH, MODEL_NAME_MAPPER
|
|
from correlation.constants import RuleParseErrors
|
|
from correlation.models import Group, Rule
|
|
from correlation.serializers import RuleImportSerializer
|
|
from incident.models import IncidentRecommendations, IncidentEffect, IncidentCategory
|
|
|
|
_log = logging.getLogger(__name__)
|
|
|
|
|
|
class ImportException(APIException):
    """API error raised when an uploaded rules file cannot be imported.

    Reported to the client with HTTP status 400.
    """

    status_code = 400
|
|
|
|
|
|
class ImportRulesService:
    """Import correlation rules from an uploaded JSON export file.

    ``run_import()`` is the entry point. It returns a per-rule report: a list
    of dicts with ``rule_name``, ``import_status`` and ``report_message`` keys.
    """

    # [action key, is_single] pairs checked for every action of the given type.
    # is_single=True: the value is a one-element list of {'name': ...} dicts
    # is_single=False: the value is a list of names
    _ASSET_CHECKS = (
        ['os', True],
        ['group', True],
        ['manufacturer', True],
        ['vulnerabilities', False],
    )
    _INCIDENT_CHECKS = (
        ['effects', False],
        ['close_recommendations', False],
        ['assigned_to', True],
        ['category', True],
    )

    def __init__(self, file, check_version=True):
        """
        :param file: uploaded file object, must provide a ``chunks()`` iterator
        :param check_version: when True, ``run_import`` rejects files whose
            ``meta.version`` is lower than the configured minimal version
        """
        self.file = file
        self.check_rules_version = check_version
        self.current_console_version = getattr(settings, 'SITE_INFO')['version']

        # parsed content of the uploaded file, filled in by run_import()
        self.json_data = None

        # storage to store the full result of the import
        self.result_log = []

        # list of all errors (as string) during import
        self.fails_list = []

    def run_import(self):
        """ Main method

        Stores the upload at EXPORT_TMP_PATH, parses it, creates the missing
        reference objects and then imports the rules one by one.

        :return: list of per-rule report dicts (``self.result_log``)
        :raises ImportException: when the file is not valid JSON, has an
            unexpected top-level structure or fails the version check
        """
        # NOTE(review): EXPORT_TMP_PATH looks like a single fixed path; if so, two
        # simultaneous imports would overwrite each other's upload - confirm.
        with open(EXPORT_TMP_PATH, 'wb') as f:
            for chunk in self.file.chunks():
                f.write(chunk)
        try:
            # Read as bytes: json detects UTF-8/16/32 itself, so the result does
            # not depend on the locale encoding of the server.
            with open(EXPORT_TMP_PATH, 'rb') as f:
                self.json_data = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError):
            raise ImportException(
                {'status': 'error', 'message': gettext_lazy("Incorrect JSON data in provided file")})

        # Everything below indexes json_data as a dict
        if not isinstance(self.json_data, dict):
            raise ImportException(
                {'status': 'error', 'message': gettext_lazy("Incorrect JSON data in provided file")})

        if self.check_rules_version:
            self.check_version()

        if not isinstance(self.json_data.get('rules'), list):
            raise ImportException(
                {'status': 'error', 'message': gettext_lazy("Incorrect JSON data in provided file")})

        self.create_missing_objects()

        for rule in self.json_data['rules']:
            # A malformed entry may have no usable name; reporting must never fail because of it
            rule_name = rule.get('name') if isinstance(rule, dict) else None
            try:
                import_status, report_message = self._import_rule(rule)
            except Exception as exc:
                # One broken rule must not abort the import of the remaining ones
                _log.exception(f'Unexpected error while importing rule {rule_name}')
                import_status, report_message = 'failed', f'Unknown error: {str(exc)}'
            self.result_log.append({'rule_name': rule_name, 'import_status': import_status,
                                    'report_message': report_message})

        _log.info(f'Rule import completed. List of errors, occurred during import process: {self.fails_list}')

        return self.result_log

    def _import_rule(self, rule):
        """ Validate and save one rule

        :param rule: rule dict taken from the ``rules`` list of the imported file
        :return: tuple (import_status, report_message) for the import report
        """
        rule_name = rule.get('name')

        # start validation
        if not self.validate_action_list(rule):
            reason = self.fails_list[-1]  # get last added error
            _log.info(f'Rule {rule_name} is invalid: {reason}')
            return 'failed', reason

        new_rule = RuleImportSerializer(data=rule)
        if not new_rule.is_valid():
            _log.error(f'Validation errors: {new_rule.errors}')
            return 'failed', f'Validation errors: {new_rule.errors}'

        # NOTE(review): validate_sid_rev may archive the existing rule before the new
        # one is saved and no transaction is visible here - confirm this is acceptable.
        if not self.validate_sid_rev(rule):
            return 'failed', self.fails_list[-1]  # get last added error

        new_rule.save()
        _log.info(f'Creating new rule {rule_name}')
        return 'success', str(gettext_lazy('Imported successfully'))

    def check_version(self):
        """ Check that the imported file was exported by a compatible console version

        :raises ImportException: when ``meta.version`` is missing, malformed or
            lower than ``settings.MINIMAL_VERSION_CORRELATION_RULES``
        """
        try:
            importing_version = version.parse(self.json_data['meta']['version'])
        except (KeyError, TypeError, version.InvalidVersion) as exc:
            # no 'meta'/'version' key, 'meta' is not a dict, or the version is unparsable
            raise ImportException(
                {'status': 'error', 'message': f'{gettext_lazy("Version incompatible")}'}) from exc

        minimum_version = version.parse(settings.MINIMAL_VERSION_CORRELATION_RULES)
        if not importing_version >= minimum_version:
            raise ImportException(
                {'status': 'error', 'message': f'{gettext_lazy("Version incompatible")}'})

    def create_missing_objects(self):
        """
        Create the reference objects (recommendations, effects, categories, operating systems,
        manufacturers, asset groups, rule groups) mentioned in the file but missing in the database.

        Each attempt to create an object in the database is wrapped in try/except.
        If the object was not created, then at later stages the rule will be marked as invalid,
        because the required object does not exist in the database
        """
        # creating recommendations
        self._ensure_named_objects(IncidentRecommendations, self.json_data.get('close_recommendations', []))
        # creating effects
        self._ensure_named_objects(IncidentEffect, self.json_data.get('effects', []))

        for rule in self.json_data.get('rules', []):
            if not isinstance(rule, dict):
                # malformed entry, it is reported as failed by run_import
                continue

            actions = rule.get('actions_json')
            for action in actions if isinstance(actions, list) else []:
                if not isinstance(action, dict):
                    continue
                action_type = action.get('type')
                if action_type == 'incident':
                    # there is only one incident category for each rule, but it in list
                    self._ensure_named_objects(IncidentCategory, action.get('category'))
                elif action_type == 'asset':
                    # there is only one OperatingSystem for each rule, but it in list
                    self._ensure_named_objects(OperatingSystem, action.get('os'), with_description=False)
                    self._ensure_named_objects(AssetManufacturer, action.get('manufacturer'))
                    self._ensure_named_objects(AssetListGroup, action.get('group'))

            # creating rules group
            # NOTE(review): a rule without 'group' is looked up with name=None, as before - confirm intended
            rule_group, created = Group.objects.get_or_create(name=rule.get('group'))
            if created:
                _log.info(f"Creating new rule group {rule_group.name} for rule {rule.get('name')}")
            # the group name is replaced by its primary key in the rule data
            rule['group'] = rule_group.pk

    @staticmethod
    def _ensure_named_objects(model, items, with_description=True):
        """ Get or create a ``model`` instance for every item of ``items``

        :param model: model class with a ``name`` field
        :param items: list of dicts with 'name' and optional 'description' keys;
            any other value (e.g. '' for a field that is not set) is ignored
        :param with_description: pass the item description as a default for new instances
        """
        if not isinstance(items, list):
            return
        for item in items:
            try:
                lookup = {'name': item['name']}
                if with_description:
                    lookup['defaults'] = {'description': item.get('description', '')}
                model.objects.get_or_create(**lookup)
            except (AttributeError, KeyError, TypeError):
                # best effort: a malformed item is skipped, the rule that needs it fails validation later
                continue

    def validate_action_list(self, rule):
        """ Check the 'asset' and 'incident' actions of the rule

        Referenced object names inside the checked actions are replaced by primary keys
        (see ``_parse_check_list``). Actions of other types are not checked.

        :param rule: rule dict with an ``actions_json`` list
        :return: True if all actions are valid, False otherwise (the reason is added to ``fails_list``)
        """
        for action in rule['actions_json']:
            action_type = action['type']
            if action_type == 'asset':
                check_list = self._ASSET_CHECKS
            elif action_type == 'incident':
                check_list = self._INCIDENT_CHECKS
            else:
                continue
            if not self._parse_check_list(action, check_list):
                return False
        return True

    def validate_sid_rev(self, rule):
        """ Decide by SID and REV whether the rule should be imported

        If an active rule with the same SID exists and the importing REV is greater,
        the existing rule is archived; if the REV is equal or lower, the import is refused.

        :param rule: rule dict with 'sid', 'rev' and 'name' keys
        :return: True if the rule should be saved, False otherwise (the reason is added to ``fails_list``)
        """
        try:
            # First, check if rule with same SID already exists in DB
            exist_rule = Rule.objects.get(sid=rule['sid'], status=True)
            imported_rev = rule['rev']
            if exist_rule.rev == imported_rev:
                # If SID and REV are equal - skip the rule import
                failure = gettext_lazy(
                    '{rule_name} did not import because same rule already exists in database').format(
                    rule_name=rule['name'])
            elif exist_rule.rev > imported_rev:
                # If REV of importing rule is less than REV of rule in DB - skip the rule import
                failure = gettext_lazy(
                    '{rule_name} did not import because newer version of rule exists in database').format(
                    rule_name=rule["name"])
            elif exist_rule.rev < imported_rev:
                # Importing rule is newer, the existing one is archived below
                failure = None
            else:
                # Add mistake if all checks are failed
                failure = f'{rule["name"]} invalid SID or REV arguments'
        except Rule.DoesNotExist:
            # If there are no rules like this - the rule is valid
            return True
        except KeyError:
            failure = gettext_lazy('Incorrect format of importing rule')
        except (TypeError, ValueError):
            # SID has a wrong type for the lookup or REV can not be compared with the stored one
            failure = f'{rule.get("name")} invalid SID or REV arguments'

        if failure is not None:
            self.fails_list.append(failure)
            return False

        # Kept outside of the try block, so errors of save() are not reported as SID/REV errors
        exist_rule.archived = True
        exist_rule.save(is_being_parsed=True)
        return True

    def _parse_check_list(self, action, check_list):
        """ Function to parse all available rule's action field
        :param action: current checking action from actions_json, checked values are
            replaced in place by the parsed ones (primary keys)
        :param check_list: list of [action key, is_single] pairs, that should be checked
        :return:
            True if action is valid
            False otherwise (the reason is added to ``fails_list``)
        """
        for action_key, is_single in check_list:
            is_ok, payload = self._name_description_model_parser(action, action_key, is_single)
            if not is_ok:
                self.fails_list.append(payload)
                return False
            action[action_key] = payload
        return True

    def _name_description_model_parser(self, action, action_key, is_single):
        """ Function for checking if model instance exists in DB and add it if positive.
        Otherwise, return false with error log
        :param action: complete parameter of action in actions_json field of importing rule
        :param action_key: NameDescription model name reference in action
        :param is_single: True if in parsing action model with references as action_key has ChoiceField,
            False if MultipleChoiceField
        :return: list containing two elements, which could be following:
            [False, 'error_log'],
            [True, parsed value] - primary key as string (list of them if not is_single),
                or an empty value when there is nothing to reference
        """

        def generate_error_message(error_type, error_element=''):
            if error_type == RuleParseErrors.NAME_DESCRIPTION_ERROR:
                translation_template_message = gettext_lazy(
                    '"{instance_name}" instance with "{instance_arg}" name does not exist')
                return translation_template_message.format(
                    instance_name=MODEL_NAME_MAPPER[action_key]._meta.verbose_name.title(),
                    instance_arg=error_element)
            if error_type == RuleParseErrors.FORMAT_ERROR:
                return f'{MODEL_NAME_MAPPER[action_key]._meta.model_name} value has wrong format'
            return 'Unknown error occurred'

        if is_single:
            if action_key == 'assigned_to':  # always clear this field
                return [True, ""]
            # a missing key is treated as "not set", same as in the multiple choice branch
            value = action.get(action_key, "")
            if value == "":
                return [True, ""]
            try:
                instance_name = value[0]['name']
            except (IndexError, KeyError, TypeError):
                # not a non-empty list of {'name': ...} dicts
                return [False, generate_error_message(error_type=RuleParseErrors.FORMAT_ERROR)]
            model = MODEL_NAME_MAPPER[action_key]
            try:
                return [True, str(model.objects.get(name=instance_name).pk)]
            except model.DoesNotExist:
                return [False, generate_error_message(error_type=RuleParseErrors.NAME_DESCRIPTION_ERROR,
                                                      error_element=instance_name)]

        if action_key not in action:
            return [True, ""]
        if action_key == 'vulnerabilities':  # always clear this field
            return [True, []]
        if not isinstance(action[action_key], list):
            return [False, generate_error_message(error_type=RuleParseErrors.FORMAT_ERROR)]

        model = MODEL_NAME_MAPPER[action_key]
        pk_list = []
        for element in action[action_key]:
            try:
                pk_list.append(str(model.objects.get(name=element).pk))
            except model.DoesNotExist:
                return [False, generate_error_message(error_type=RuleParseErrors.NAME_DESCRIPTION_ERROR,
                                                      error_element=element)]
            except TypeError:
                return [False,
                        generate_error_message(error_type=RuleParseErrors.FORMAT_ERROR, error_element=element)]
        return [True, pk_list]
|