old_console/logstash/api.py
2024-11-02 14:12:45 +03:00

110 lines
3.8 KiB
Python

import ipaddress
import logging
from rest_framework import viewsets
from rest_framework.mixins import CreateModelMixin, ListModelMixin
from assets.models.assets import Asset, AssetManufacturer
from console.models import Vulnerability
from core.mixins import ApiPermissionCheckMixin
from incident.models import Incident
from incident.services.ws_incidents import incident_count_notification_to_ws
from inputs.models import LogInput
from inputs.services.inputs import get_sensor
from logstash.serializers import (IncidentSerializer, AssetCorrelationSerializer,
VulnerabilitiesSerializer, ManufacturerSerializer)
from perms.models import Perm
_log = logging.getLogger(__name__)
class IncidentCreatorViewSet(ApiPermissionCheckMixin,
CreateModelMixin,
viewsets.GenericViewSet):
serializer_class = IncidentSerializer
console_permissions = [Perm.can_work_with_incidents]
def perform_create(self, serializer):
super().perform_create(serializer)
instance = serializer.instance
ip_list = set()
for event in instance.events:
ip_list.update(
[event["source_ip"], event["destination_ip"]])
# add sensor's ip, witch will be used if no match found
try:
_, sensor = get_sensor(instance.sensor)
ip_list.add(sensor.ip)
except RuntimeError as e:
_log.error(f"Bad sensor value: {instance.sensor} -> {str(e)}")
except LogInput.DoesNotExist:
_log.warning(f"No sensor with that type and pk: {instance.sensor}")
except AttributeError:
_log.warning(f"No sensor for this input exist")
# remove empty strings
ip_list.discard('')
cleaned_ip_list = []
for element in ip_list:
try:
ipaddress.ip_address(element)
cleaned_ip_list.append(element)
except ValueError:
continue
assets = Asset.objects.filter(ip__in=cleaned_ip_list)
for asset in assets:
asset.incidents.add(instance)
asset.save()
_log.info(f"Add incident {instance.incident_id} to asset {asset.name} ({asset.pk})")
incident_count_notification_to_ws()
class Meta:
model = Incident
class AssetCorrelatorViewSet(CreateModelMixin,
ApiPermissionCheckMixin,
viewsets.GenericViewSet):
serializer_class = AssetCorrelationSerializer
console_permissions = [Perm.can_edit_asset]
def perform_create(self, serializer):
# Get Asset id from serialized data
asset_ip = serializer.validated_data['ip']
# Perform creation of an asset, if there are no similar in DB already
if Asset.objects.filter(ip=asset_ip).exists():
_log.info(f'Asset with IP: {asset_ip} already exists. Creation aborted')
else:
serializer.save(status=0)
_log.info(f'Asset [{asset_ip}] has been created')
class Meta:
model = Asset
class AssetManufacturerViewSet(ApiPermissionCheckMixin,
ListModelMixin,
viewsets.GenericViewSet):
serializer_class = ManufacturerSerializer
console_permissions = [Perm.can_edit_asset]
queryset = AssetManufacturer.objects.order_by("name")
class Meta:
model = AssetManufacturer
class AssetVulnerabilitiesViewSet(ApiPermissionCheckMixin,
ListModelMixin,
viewsets.GenericViewSet):
serializer_class = VulnerabilitiesSerializer
console_permissions = [Perm.can_view_vulnerabilities]
queryset = Vulnerability.objects.order_by("name")
class Meta:
model = Vulnerability