import json import logging from rest_framework import serializers from rest_framework.exceptions import ValidationError from assets.models.assets import OperatingSystem, AssetListGroup, AssetManufacturer from assets.serializers.assets import AssetManufacturerSerializer, OsNameSerializer, AssetGroupSerializer from core.serializers import ModelLocalizedSerializer from core.validators import ValidateFileExtension from correlation.constants import ActionType from correlation.fields import RuleGroupRelatedField, RuleRelatedField from correlation.models import Rule, Group from incident.models import IncidentEffect, IncidentRecommendations, IncidentCategory from incident.serializers.incident import (IncidentRecommendationsSerializer, IncidentEffectSerializer, IncidentCategorySerializer) _log = logging.getLogger(__name__) class RuleSerializer(ModelLocalizedSerializer): kind = serializers.CharField(source='get_type_display') group = RuleGroupRelatedField(queryset=Group.objects.all(), allow_null=True) class Meta: model = Rule fields = ["id", "name", "description", "sid", "rev", "kind", "group", "status", "created", "updated", "multi", "archived", "type", "depth", "rule_json", "actions_json"] def create(self, validated_data): validated_data.pop("get_type_display") return super().create(validated_data) def update(self, request, validated_data, *args, **kwargs): validated_data.pop("get_type_display", None) return super().update(request, validated_data, *args, **kwargs) def validate_rule_json(self, value): list_fields = ['type', 'field', 'operands'] try: str_json = json.dumps(value) json.loads(str_json) for field in value: if field not in list_fields: raise serializers.ValidationError(f"Invalid value: unknown field - {field},\ allowed {list_fields}") except json.decoder.JSONDecodeError as err: raise serializers.ValidationError(f"Invalid JSON: {err}") value['field'] = 'NULL' value['type'] = 'query_string' return value def validate_actions_json(self, value): action_type_list = [item[0] for item in ActionType.choices] try: if type(value) == list: for item in value: str_json = json.dumps(item) json.loads(str_json) if item["type"] not in action_type_list: raise ValidationError( f"Invalid JSON: unknown action type - {item['type']}, allowed {action_type_list}") else: str_json = json.dumps(value) json.loads(str_json) if value["type"] not in action_type_list: raise ValidationError( f"Invalid JSON: unknown action type - {value['type']}, allowed {action_type_list}") except json.decoder.JSONDecodeError as err: raise ValidationError(f"Invalid JSON: {err} - {value}") except KeyError: raise ValidationError(f"Invalid JSON: type not found ") return value def validate_sid(self, sid): qs = Rule.objects.filter(sid=sid, status=True) current_rule_id = getattr(self.instance, 'id', None) # self.instance is Rule or None if current_rule_id: if qs.exclude(sid=self.instance.sid).exists(): raise ValidationError('This SID is already in use') return sid else: if qs.exists(): raise ValidationError('This SID is already in use') return sid def validate(self, attrs): if not self.instance: # creating Rule return attrs # if the rule is disabled and we are trying to enable it, but there is already an enabled rule - return an error if self.instance.status is False and attrs.get('status', False): if Rule.objects.filter(sid=self.instance.sid, status=True).exists(): raise ValidationError({'status': f'There is already an enabled rule with this sid: {self.instance.sid}'}) # if we are editing a rule, but there is already a saved rule with the current SID and REV+1 - return an error if Rule.objects.filter(sid=self.instance.sid, rev=self.instance.rev+1).exists(): raise ValidationError( {'sid': f'Check that there is no rule with SID {self.instance.sid} and REV {self.instance.rev+1} ' f'and try again'}) return attrs class RuleExportSerializer(ModelLocalizedSerializer): group = serializers.SerializerMethodField() actions_json = serializers.SerializerMethodField() class Meta: model = Rule fields = '__all__' def get_actions_json(self, obj): export_actions = obj.actions_json for action in export_actions: if action['type'] == 'incident': if 'effects' in action: action['effects'] = IncidentEffectSerializer( IncidentEffect.objects.filter(pk__in=list(map(int, action['effects']))), many=True).data if 'close_recommendations' in action: action['close_recommendations'] = IncidentRecommendationsSerializer( IncidentRecommendations.objects.filter(pk__in=list(map(int, action['close_recommendations']))), many=True).data if action['assigned_to'] != '': action['assigned_to'] = '' if action['category'] != '': action['category'] = IncidentCategorySerializer( IncidentCategory.objects.filter(pk=int(action['category'])), many=True).data if action['type'] == 'asset': if action['os'] != '': action['os'] = OsNameSerializer(OperatingSystem.objects.filter(pk=int(action['os'])), many=True).data if action['group'] != '': action['group'] = AssetGroupSerializer(AssetListGroup.objects.filter(pk=int(action['group'])), many=True).data if action['manufacturer'] != '': action['manufacturer'] = AssetManufacturerSerializer( AssetManufacturer.objects.filter(pk=int(action['manufacturer'])), many=True).data if 'vulnerabilities' in action: action['vulnerabilities'] = '' return export_actions def get_group(self, obj: Rule): if obj.group: return obj.group.name else: return "" class GroupSerializer(serializers.ModelSerializer): rules = RuleRelatedField(queryset=Rule.objects.all(), default=[], many=True) class Meta: model = Group fields = ["id", "name", "description", "rules"] class ImporRulesFileSerializer(serializers.Serializer): uploaded_file = serializers.FileField(validators=[ValidateFileExtension(['.json'])]) class RuleImportSerializer(ModelLocalizedSerializer): class Meta: model = Rule fields = [ "name", "description", "group", "type", "status", "depth", "actions_json", "rule_json", "multi", "sid", "rev" ]