old_console/correlation/tasks.py
2024-11-02 14:12:45 +03:00

101 lines
3.1 KiB
Python

from __future__ import absolute_import, unicode_literals
import subprocess
from time import sleep
import requests
from celery import shared_task
from celery.utils.log import get_task_logger
from django.conf import settings
from django.db.models.signals import post_delete, post_save
from django.dispatch import receiver
from correlation.constants import redis_instance, STARTUP_UPDATE_NAME
from correlation.models import Rule
from correlation.services.rules import to_correlator_data
_log = get_task_logger(__name__)
@shared_task
def initial_task_update(*args, **kwargs):
    """Push the enabled rules to the correlator once at startup.

    The Redis key ``STARTUP_UPDATE_NAME`` acts as a "done" flag: when it
    already holds ``"True"`` the task returns immediately. Otherwise the rules
    are pushed without clearing the correlator store, retrying once per second
    for up to 180 attempts; the flag is set after the first successful push.

    Args:
        *args: Unused.
        **kwargs: Unused.

    Returns:
        None.
    """
    _log.info("Init correlator rules")
    done = redis_instance.get(STARTUP_UPDATE_NAME)
    # A Redis client that does not decode responses returns bytes (redis-py
    # does so unless ``decode_responses=True``), so normalise before comparing;
    # otherwise the guard below would never match.
    if isinstance(done, bytes):
        done = done.decode("utf-8", errors="replace")
    if done == "True":
        _log.info("Already done")
        return
    for _ in range(180):
        if update_correlator_tasks(False):
            redis_instance.set(STARTUP_UPDATE_NAME, "True")
            return
        sleep(1)  # give the correlator time to come up before retrying
    _log.error("Can't set initial correlation rules")
@shared_task
def update_correlator_tasks(clear_store=True):
    """Send every enabled rule to the correlator.

    Args:
        clear_store: When true, first call the correlator's ``/clear/``
            endpoint and abort if that call fails or does not answer 200.

    Returns:
        bool: ``True`` when there are no enabled rules or when the correlator
        answers 200 to the ``/add_many/`` request; ``False`` on any request
        error or non-200 response.
    """
    # Without a timeout ``requests`` waits indefinitely on an unresponsive
    # correlator. The optional CORRELATOR_REQUEST_TIMEOUT setting (seconds)
    # overrides the default.
    timeout = getattr(settings, 'CORRELATOR_REQUEST_TIMEOUT', 30)

    # First - remove all rules
    if clear_store:
        try:
            resp = requests.get(settings.CORRELATOR_URL + '/clear/',
                                timeout=timeout)
        except requests.RequestException:
            _log.error("Can't send clear request")
            return False
        if resp.status_code != 200:
            _log.error("Store wasn't cleared")
            return False

    # Next - add new rules
    rules = Rule.objects.filter(status=True).order_by("pk")
    data = [to_correlator_data(rule) for rule in rules]
    if not data:
        _log.info("No rules found. Correlator initialization complete")
        return True

    url = settings.CORRELATOR_URL + '/add_many/'
    # NOTE(review): 'Content-Encoding' normally names a compression scheme,
    # not a charset; kept unchanged because the correlator may rely on it.
    headers = {'Content-type': 'application/json',
               'Accept': 'text/plain',
               'Content-Encoding': 'utf-8'}
    _log.debug(f"Sending {len(data)} rules to correlator")
    try:
        resp = requests.post(url, json=data, headers=headers, timeout=timeout)
    except requests.RequestException as ex:
        # Catch the base class (as the clear request does) so timeouts and
        # other request failures are reported instead of propagating.
        _log.error("Can't send rules to correlator. Reason -> {}".format(ex))
        return False
    _log.debug(f"Response code: {resp.status_code}")
    _log.debug(f"Response: {resp.text}")
    if resp.status_code != 200:
        return False
    _log.info(f"{len(data)} rules sended to correlator")
    return True
@receiver(post_delete, sender=Rule)
@receiver(post_save, sender=Rule)
def sync_rules(sender, **kwargs):
    """Re-sync the correlator rules after a Rule is saved or deleted.

    Args:
        sender: The model class that sent the signal.
        **kwargs: Signal arguments; ``instance`` (the affected rule) is
            required, ``signal`` is used to name the action in the log.
    """
    rule = kwargs['instance']
    # Django passes the dispatching signal as ``signal``; use it so the log
    # does not report a save as a delete.
    action = "save" if kwargs.get('signal') is post_save else "delete"
    if update_correlator_tasks(True):
        _log.info(f"Sync correlator rules after [{rule.name}] {action}")
    else:
        _log.error(f"Can't sync correlator rules after [{rule.name}] {action}")
@shared_task
def reboot_correlator_task():
    """HOTFIX task: restart the correlator service and re-sync its rules.

    Runs ``sudo systemctl restart amccorrelator`` and then tries up to five
    times to push the rules, sleeping 30 seconds after each failed attempt.
    A failed restart is logged, but the sync is still attempted.

    Returns:
        None.
    """
    _log.info('Rebut correlator')
    restart = subprocess.run(['sudo', 'systemctl', 'restart', 'amccorrelator'],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if restart.returncode != 0:
        # Report the failure instead of discarding the command result.
        _log.error("Correlator restart failed with code %s: %s",
                   restart.returncode,
                   restart.stderr.decode("utf-8", errors="replace").strip())
    for _ in range(5):
        if update_correlator_tasks():
            break
        sleep(30)  # wait correlator
    else:
        # Every attempt failed; make that visible in the task log.
        _log.error("Can't sync correlator rules after restart")