166 lines
7.3 KiB
Python
166 lines
7.3 KiB
Python
import json
|
|
import logging
|
|
|
|
from rest_framework import serializers
|
|
from rest_framework.exceptions import ValidationError
|
|
|
|
from assets.models.assets import OperatingSystem, AssetListGroup, AssetManufacturer
|
|
from assets.serializers.assets import AssetManufacturerSerializer, OsNameSerializer, AssetGroupSerializer
|
|
from core.serializers import ModelLocalizedSerializer
|
|
from core.validators import ValidateFileExtension
|
|
from correlation.constants import ActionType
|
|
from correlation.fields import RuleGroupRelatedField, RuleRelatedField
|
|
from correlation.models import Rule, Group
|
|
from incident.models import IncidentEffect, IncidentRecommendations, IncidentCategory
|
|
from incident.serializers.incident import (IncidentRecommendationsSerializer, IncidentEffectSerializer,
|
|
IncidentCategorySerializer)
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
_log = logging.getLogger(__name__)
|
|
|
|
|
|
class RuleSerializer(ModelLocalizedSerializer):
    """Serializer for correlation ``Rule`` objects.

    Exposes ``get_type_display`` as the ``kind`` field and validates the
    ``rule_json`` / ``actions_json`` payloads, the SID against enabled rules,
    and status / revision conflicts when an existing rule is edited.
    """

    kind = serializers.CharField(source='get_type_display')
    group = RuleGroupRelatedField(queryset=Group.objects.all(), allow_null=True)

    class Meta:
        model = Rule
        fields = ["id", "name", "description", "sid", "rev", "kind", "group", "status", "created", "updated", "multi",
                  "archived", "type", "depth", "rule_json", "actions_json"]

    def create(self, validated_data):
        """Create a rule after dropping the ``get_type_display`` entry.

        The entry comes from the ``kind`` field's ``source``; it is removed
        before delegating to the parent ``create``. A default is supplied so a
        payload without it does not raise ``KeyError`` (same as ``update``).
        """
        validated_data.pop("get_type_display", None)
        return super().create(validated_data)

    def update(self, request, validated_data, *args, **kwargs):
        """Update a rule after dropping the ``get_type_display`` entry.

        NOTE(review): the first argument is named ``request`` but is forwarded
        to ``super().update`` in the position DRF uses for the model instance;
        the name is kept so existing callers keep working.
        """
        validated_data.pop("get_type_display", None)
        return super().update(request, validated_data, *args, **kwargs)

    def validate_rule_json(self, value):
        """Validate the rule body and force its ``field`` / ``type`` entries.

        Args:
            value: Parsed ``rule_json`` payload; must be a dict whose keys are
                limited to ``type``, ``field`` and ``operands``.

        Returns:
            The same dict with ``field`` set to ``'NULL'`` and ``type`` set to
            ``'query_string'``.

        Raises:
            serializers.ValidationError: If the payload is not a dict, cannot
                be serialised to JSON, or contains an unknown key.
        """
        list_fields = ['type', 'field', 'operands']

        # Anything but a dict would crash on the key assignments below.
        if not isinstance(value, dict):
            raise serializers.ValidationError("Invalid value: rule_json must be a JSON object")

        # json.dumps signals unserialisable data with TypeError/ValueError.
        try:
            json.dumps(value)
        except (TypeError, ValueError) as err:
            raise serializers.ValidationError(f"Invalid JSON: {err}") from err

        for field in value:
            if field not in list_fields:
                raise serializers.ValidationError(
                    f"Invalid value: unknown field - {field}, allowed {list_fields}")

        value['field'] = 'NULL'
        value['type'] = 'query_string'
        return value

    def validate_actions_json(self, value):
        """Validate one action or a list of actions.

        Every action must be a JSON-serialisable dict with a ``type`` key whose
        value is one of the ``ActionType`` choices.

        Args:
            value: A single action dict or a list of action dicts.

        Returns:
            ``value`` unchanged.

        Raises:
            ValidationError: If an action is not a dict, is not serialisable,
                has no ``type`` key, or has an unknown ``type``.
        """
        action_type_list = [item[0] for item in ActionType.choices]
        # A lone action is checked with the same rules as a list of actions.
        actions = value if isinstance(value, list) else [value]

        for item in actions:
            if not isinstance(item, dict):
                raise ValidationError(f"Invalid JSON: action must be an object - {item}")
            try:
                json.dumps(item)
            except (TypeError, ValueError) as err:
                raise ValidationError(f"Invalid JSON: {err} - {value}") from err
            if "type" not in item:
                raise ValidationError("Invalid JSON: type not found ")
            if item["type"] not in action_type_list:
                raise ValidationError(
                    f"Invalid JSON: unknown action type - {item['type']}, allowed {action_type_list}")
        return value

    def validate_sid(self, sid):
        """Reject a SID that is already used by an enabled rule.

        When an existing rule is edited, enabled rules carrying the rule's own
        current SID are excluded from the check, so keeping the SID unchanged
        always passes.

        Raises:
            ValidationError: If an enabled rule with this SID exists.
        """
        qs = Rule.objects.filter(sid=sid, status=True)
        if getattr(self.instance, 'id', None):  # self.instance is Rule or None
            qs = qs.exclude(sid=self.instance.sid)
        if qs.exists():
            raise ValidationError('This SID is already in use')
        return sid

    def validate(self, attrs):
        """Cross-field checks that apply only when editing an existing rule.

        Raises:
            ValidationError: If a disabled rule is being enabled while another
                enabled rule has the same SID, or if a rule with the same SID
                and the next revision number already exists.
        """
        if not self.instance:  # creating Rule
            return attrs

        current_sid = self.instance.sid

        # Enabling a disabled rule is refused while another rule with the same
        # SID is already enabled.
        if self.instance.status is False and attrs.get('status', False):
            if Rule.objects.filter(sid=current_sid, status=True).exists():
                raise ValidationError(
                    {'status': f'There is already an enabled rule with this sid: {current_sid}'})

        # Editing is refused when a rule with the current SID and REV+1 is
        # already stored.
        next_rev = self.instance.rev + 1
        if Rule.objects.filter(sid=current_sid, rev=next_rev).exists():
            raise ValidationError(
                {'sid': f'Check that there is no rule with SID {current_sid} and REV {next_rev} '
                        f'and try again'})
        return attrs
|
|
|
|
|
|
class RuleExportSerializer(ModelLocalizedSerializer):
    """Serializer used to export ``Rule`` objects.

    ``group`` is exported as the group name and ``actions_json`` is exported
    with referenced primary keys replaced by serialized objects.
    """

    group = serializers.SerializerMethodField()
    actions_json = serializers.SerializerMethodField()

    class Meta:
        model = Rule
        fields = '__all__'

    def get_actions_json(self, obj):
        """Return an export-ready copy of ``obj.actions_json``.

        The stored actions are never modified: each action is copied before
        its keys are rewritten, so the model instance keeps its original data.
        A single action dict is exported as a single dict; empty or missing
        actions are returned as they are.
        """
        actions = obj.actions_json
        if not actions:
            return actions
        if isinstance(actions, dict):
            return self._export_action(actions)
        return [self._export_action(action) for action in actions]

    @staticmethod
    def _export_action(action):
        """Build the exported form of one action without touching the input."""
        if not isinstance(action, dict):
            return action

        # Only top-level keys are reassigned, so a shallow copy is sufficient.
        exported = dict(action)
        action_type = exported.get('type')

        if action_type == 'incident':
            if 'effects' in exported:
                exported['effects'] = IncidentEffectSerializer(
                    IncidentEffect.objects.filter(pk__in=[int(pk) for pk in exported['effects']]),
                    many=True).data
            if 'close_recommendations' in exported:
                exported['close_recommendations'] = IncidentRecommendationsSerializer(
                    IncidentRecommendations.objects.filter(
                        pk__in=[int(pk) for pk in exported['close_recommendations']]),
                    many=True).data
            # The assignee is always blanked in the export.
            if 'assigned_to' in exported:
                exported['assigned_to'] = ''
            category = exported.get('category', '')
            if category not in ('', None):
                exported['category'] = IncidentCategorySerializer(
                    IncidentCategory.objects.filter(pk=int(category)), many=True).data
        elif action_type == 'asset':
            os_pk = exported.get('os', '')
            if os_pk not in ('', None):
                exported['os'] = OsNameSerializer(
                    OperatingSystem.objects.filter(pk=int(os_pk)), many=True).data
            group_pk = exported.get('group', '')
            if group_pk not in ('', None):
                exported['group'] = AssetGroupSerializer(
                    AssetListGroup.objects.filter(pk=int(group_pk)), many=True).data
            manufacturer_pk = exported.get('manufacturer', '')
            if manufacturer_pk not in ('', None):
                exported['manufacturer'] = AssetManufacturerSerializer(
                    AssetManufacturer.objects.filter(pk=int(manufacturer_pk)), many=True).data
            # Vulnerabilities are blanked in the export.
            if 'vulnerabilities' in exported:
                exported['vulnerabilities'] = ''
        return exported

    def get_group(self, obj: Rule):
        """Return the name of the rule's group, or ``""`` when it has none."""
        return obj.group.name if obj.group else ""
|
|
|
|
|
|
class GroupSerializer(serializers.ModelSerializer):
    """Serializer for rule ``Group`` objects together with their rules."""

    # ``default`` is the callable ``list`` rather than a literal ``[]``: a
    # literal would be one list object shared by every use of the default.
    rules = RuleRelatedField(queryset=Rule.objects.all(), default=list, many=True)

    class Meta:
        model = Group
        fields = ["id", "name", "description", "rules"]
|
|
|
|
|
|
class ImporRulesFileSerializer(serializers.Serializer):
    """Serializer for a rules-import upload.

    Declares a single ``uploaded_file`` field that is checked by
    ``ValidateFileExtension(['.json'])``.
    """

    uploaded_file = serializers.FileField(
        validators=[
            ValidateFileExtension(['.json']),
        ],
    )
|
|
|
|
|
|
class RuleImportSerializer(ModelLocalizedSerializer):
    """Serializer for imported ``Rule`` data, limited to the fields listed in ``Meta``."""

    class Meta:
        model = Rule
        fields = [
            "name",
            "description",
            "group",
            "type",
            "status",
            "depth",
            "actions_json",
            "rule_json",
            "multi",
            "sid",
            "rev",
        ]
|