88 lines
2.8 KiB
Python
88 lines
2.8 KiB
Python
from __future__ import absolute_import, unicode_literals
|
|
|
|
from time import sleep
|
|
|
|
import redis
|
|
import requests
|
|
from celery import shared_task
|
|
from celery.utils.log import get_task_logger
|
|
from django.conf import settings
|
|
from django.db.models.signals import post_delete
|
|
from django.dispatch import receiver
|
|
from correlation.services.rules import to_correlator_data
|
|
|
|
from correlation.models import Rule
|
|
|
|
# Celery task logger for this module.
_log = get_task_logger(__name__)

# Base URL of the correlator service; falls back to a local instance when
# the Django settings do not define CORRELATOR_URL.
CORRELATOR_URL = getattr(settings, 'CORRELATOR_URL', 'http://localhost:5566')

# Redis key under which the startup task records that it has completed.
STARTUP_UPDATE_NAME = "correlator_startup_tasks"

# Redis client (database 0) used to read and write the startup flag.
redis_instance = redis.StrictRedis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=0,
)
|
|
|
|
|
|
@shared_task
def initial_task_update(*args, **kwargs):
    """Push the active correlation rules to the correlator once at startup.

    A flag stored in Redis under ``STARTUP_UPDATE_NAME`` records that the
    initial push has already succeeded; when it is present the task exits
    immediately. Otherwise the push is attempted up to 180 times, one
    second apart, and the flag is written after the first success.

    Positional and keyword arguments are accepted and ignored.

    Returns:
        None. Failure after all attempts is reported through the logger.
    """
    _log.info("Init correlator rules")

    done = redis_instance.get(STARTUP_UPDATE_NAME)
    # redis-py hands back ``bytes`` unless the client was created with
    # decode_responses=True, so normalise before comparing with a str;
    # otherwise the "already done" guard can never match on Python 3.
    if isinstance(done, bytes):
        done = done.decode("utf-8", errors="replace")
    if done == "True":
        _log.info("Already done")
        return

    for _ in range(180):
        # Called synchronously (not via .delay) so the result is available
        # here; the store is not cleared on the initial push.
        if update_correlator_tasks(False):
            redis_instance.set(STARTUP_UPDATE_NAME, "True")
            return
        sleep(1)

    _log.error("Can't set initial correlation rules")
|
|
|
|
|
|
@shared_task
def update_correlator_tasks(clear_store=True):
    """Send every rule whose ``status`` is True to the correlator.

    Args:
        clear_store: When true, first call the correlator's ``/clear/``
            endpoint and abort if that call fails or does not answer 200.

    Returns:
        bool: True when there are no rules to send or the correlator
        answered 200 to ``/add_many/``; False on any request error or
        non-200 response.
    """
    # Read the setting at call time, falling back to the module default so
    # a missing CORRELATOR_URL setting does not raise AttributeError.
    base_url = getattr(settings, 'CORRELATOR_URL', CORRELATOR_URL)

    # First - remove all rules
    if clear_store:
        try:
            resp = requests.get(base_url + '/clear/')
        except requests.RequestException:
            _log.error("Can't send clear request")
            return False
        if resp.status_code != 200:
            _log.error("Store wasn't cleared")
            return False

    # Next - add new rules
    rules = Rule.objects.filter(status=True).order_by("pk")
    data = [to_correlator_data(rule) for rule in rules]

    if not data:
        _log.info("No rules found. Correlator initialization complete")
        return True

    # NOTE(review): Content-Encoding normally names a compression coding,
    # not a charset; kept unchanged because the correlator's expectations
    # are not visible from this module.
    headers = {'Content-type': 'application/json',
               'Accept': 'text/plain',
               'Content-Encoding': 'utf-8'}
    _log.debug(f"Sending {len(data)} rules to correlator")
    try:
        resp = requests.post(base_url + '/add_many/', json=data, headers=headers)
    except requests.RequestException as ex:
        # RequestException (the base class) also covers timeouts and other
        # transport errors, matching the handling of the clear request.
        _log.error("Can't send rules to correlator. Reason -> {}".format(ex))
        return False

    _log.debug(f"Response code: {resp.status_code}")
    _log.debug(f"Response: {resp.text}")
    if resp.status_code != 200:
        return False

    _log.info(f"{len(data)} rules sended to correlator")
    return True
|
|
|
|
|
|
@receiver(post_delete, sender=Rule)
def sync_rules(sender, **kwargs):
    """Re-send the correlation rules to the correlator after a rule delete.

    Connected to ``post_delete`` for ``Rule``. The deleted instance is taken
    from ``kwargs['instance']`` and is used only for log messages; the
    correlator store is cleared and refilled via ``update_correlator_tasks``.
    """
    rule = kwargs['instance']

    if update_correlator_tasks(True):
        _log.info(f"Sync correlator rules after [{rule.name}] delete")
        return

    _log.error(f"Can't sync correlator rules after [{rule.name}] delete")
|