import json
import os
from typing import Optional

from celery import shared_task
from celery.utils.log import get_task_logger
from django.conf import settings
from django.contrib.auth.models import User
from django.core.cache import caches
from django.utils.translation import gettext_lazy
from elasticsearch import Elasticsearch, TransportError

from core.utils import dtnow
from incident.models import Incident
from rotation import constants
from rotation.enums import RotationType
from rotation.models import EventRotationSettings, IncidentRotationSettings
from rotation.serializers import DumpIncidentSerializer
from storage.models import DataStorage, get_storage_path

_log = get_task_logger(__name__)

MEDIA_ROOT = getattr(settings, 'MEDIA_ROOT')


def elk_search_next_page(search_body, es_instance, first_sort_value, file_descriptor):
    """Page through Elasticsearch hits with ``search_after`` and dump them.

    Used to fetch result sets larger than the default 10000-document limit
    of a single search request.

    :param search_body: body for the Elasticsearch request.
        MUST contain a PIT argument.
    :param es_instance: instance of Elasticsearch-py client
    :param first_sort_value: sort value of the last hit of the initial search
    :param file_descriptor: open file to append the received events to.
        At least one event must already have been written to it, because
        every event here is prefixed with a ',' separator.
    """
    search_body["search_after"] = first_sort_value
    search_body['track_total_hits'] = 'false'
    while True:
        page = es_instance.search(body=search_body, filter_path=['hits.hits.*'])
        # With filter_path an exhausted PIT search returns an empty dict.
        if not page:
            break
        hits = page['hits']['hits']
        for event in hits:
            # BUGFIX: write the separator BEFORE each event instead of a
            # trailing ',' after it -- the old code produced invalid JSON
            # (a dangling comma before the closing ']').
            file_descriptor.write(',')
            file_descriptor.write(json.dumps(event, ensure_ascii=False))
        if hits:
            # Continue paging from the sort value of the last hit.
            search_body["search_after"] = hits[-1]['sort']


def delete_elasticsearch_indexes_by_template(
        index_template: str,
        *,
        es: Optional[Elasticsearch] = None,
        exclude_indexes: Optional[set] = None,
) -> None:
    """Delete all indexes matching *index_template*.

    :param index_template: index name pattern, e.g. ``'aggregated-*'``
    :param es: optional pre-built Elasticsearch client; a new one is
        created from ``rotation.constants`` credentials when omitted
    :param exclude_indexes: index names to keep (e.g. today's index)
    """
    if es is None:
        es = Elasticsearch(
            [{'host': constants.ELK_HOST, 'port': constants.ELK_PORT}],
            http_auth=(constants.ELK_LOGIN, constants.ELK_PASS),
        )
    indexes_for_delete = set(es.indices.get_alias(index=index_template).keys())
    if not indexes_for_delete:
        return
    if exclude_indexes is not None:
        indexes_for_delete -= exclude_indexes
    _log.info(f'Delete indexes: {indexes_for_delete}')
    for index in indexes_for_delete:
        try:
            # ignore=404: the index may already be gone; that is not an error.
            es.indices.delete(index=index, ignore=404)
        except TransportError as err:
            _log.exception(f'Following error occurred when trying to delete elasticsearch index {index}: {err}')
            continue


def rotate_elasticsearch_events(settings_class):
    """Dump aggregated Elasticsearch events to a JSON file, then delete them.

    :param settings_class: RotationSettings class for events rotation.
        Must be SOLO (exposes ``get_solo()``).
    """
    # NOTE: renamed from `settings` to avoid shadowing django.conf.settings.
    rotation_settings = settings_class.get_solo()
    if rotation_settings.rotation_type == RotationType.SIZE:
        # BUGFIX: the cache key may be absent (cold cache / first run) and
        # `get` then returns None; comparing None with an int raises
        # TypeError in Python 3. Treat a missing counter as 0.
        current_amount_of_aggregated_events = caches['redis'].get(constants.REDIS_ELK_EVENTS_KEY) or 0
        if current_amount_of_aggregated_events < rotation_settings.size_rotation:
            _log.info(
                f'Events rotation not initiated. Events total amount {current_amount_of_aggregated_events} rotation threshold {rotation_settings.size_rotation}'
            )
            return

    _log.info('Start of aggregated events rotation')
    admin = User.objects.filter(is_superuser=True).first()
    store = DataStorage(type=DataStorage.Type.DB_DUMP,
                        format=DataStorage.Format.JSON,
                        created=dtnow(),
                        last_access=dtnow(),
                        user=admin,
                        size=0,
                        file='name',
                        description=gettext_lazy('Aggregated events rotation'))
    file_name = get_storage_path(store, 'aggregated_events_dump.json')
    fpath = os.path.join(constants.MEDIA_ROOT, file_name)
    if not os.path.exists(os.path.dirname(fpath)):
        os.makedirs(os.path.dirname(fpath), exist_ok=True)

    # Getting PIT for search_after requests
    es = Elasticsearch(
        [{'host': constants.ELK_HOST, 'port': constants.ELK_PORT}],
        http_auth=(constants.ELK_LOGIN, constants.ELK_PASS),
    )
    elk_pit = es.open_point_in_time(index='aggregated-*', keep_alive='1m')

    # Initial search with size limit and PIT argument.
    search_body = {
        "size": 10000,
        "query": {
            "range": {
                "event_timestamp": {
                    "lt": "now"
                }
            }
        },
        "pit": elk_pit,
        "sort": [
            {"event_timestamp": "asc"}
        ],
    }
    es_search = es.search(body=search_body, filter_path=['hits.hits.*'])
    events = es_search.get('hits', {}).get('hits', [])

    # Dump the ELK data as a single JSON array.
    with open(fpath, 'w') as f:
        f.write('[')
        for count, event in enumerate(events):
            # BUGFIX: comma goes BETWEEN elements, never after the last one;
            # the old code always left a trailing comma, producing a file
            # that json.loads() rejects.
            if count:
                f.write(',')
            f.write(json.dumps(event, ensure_ascii=False))
        if events:
            # Fetch pages beyond the first 10000 hits, starting after the
            # sort value of the last initial hit.
            elk_search_next_page(search_body, es, events[-1]['sort'], f)
        f.write(']')

    # NOTE(review): "now" is re-evaluated here, slightly later than in the
    # dump query -- events arriving in between are deleted without being
    # dumped. Consider capturing one fixed timestamp for both queries.
    delete_body = {
        "query": {
            "range": {
                "event_timestamp": {
                    "lt": "now"
                }
            }
        }
    }
    es.delete_by_query(index='aggregated-*', body=delete_body)

    # Keep today's index alive; drop every other aggregated-* index.
    today = dtnow().strftime('%Y.%m.%d')
    index_today = f'aggregated-{today}'
    delete_elasticsearch_indexes_by_template(index_template='aggregated-*', es=es, exclude_indexes={index_today})

    store.size = os.path.getsize(fpath)
    store.file = file_name
    store.save()
    _log.info(f'Dump ready in [{store.file.name}] with size of [{store.size}]')
    store.update_crc()


def _rotate_incidents() -> None:
    """Rotate incidents by status RESOLVED and FALSE_ALARM.

    TODO: maybe need to combine with rotate_table
    """
    rotation_settings = IncidentRotationSettings.get_solo()
    # Single queryset definition reused for the threshold check and the dump.
    incident_for_delete = Incident.objects.filter(
        notification__isnull=True,
        status__in=[Incident.Status.RESOLVED, Incident.Status.FALSE_ALARM],
    )
    if rotation_settings.rotation_type == RotationType.SIZE:
        count_incidents = incident_for_delete.count()
        if count_incidents < rotation_settings.size_rotation:
            _log.info(
                f'No incident table rotation. The number of incidents ({count_incidents}) is less than required for '
                f'rotation ({rotation_settings.size_rotation}) '
            )
            return

    # Need to dump data
    _log.info(f'Start {Incident._meta.db_table} rotation')
    admin = User.objects.filter(is_superuser=True).first()
    # NOTE(review): concatenating a gettext_lazy proxy with a plain str is
    # fragile across Django versions; format_lazy would be safer -- confirm.
    store = DataStorage(type=DataStorage.Type.DB_DUMP,
                        format=DataStorage.Format.JSON,
                        created=dtnow(),
                        last_access=dtnow(),
                        user=admin,
                        size=0,
                        file='name',
                        description=gettext_lazy('Table rotation ') + Incident._meta.model_name)
    file_name = get_storage_path(store, f'{Incident._meta.model_name}_dump.json')
    fpath = os.path.join(constants.MEDIA_ROOT, file_name)
    if not os.path.exists(os.path.dirname(fpath)):
        os.makedirs(os.path.dirname(fpath), exist_ok=True)

    count = len(incident_for_delete)
    _log.debug(f'Will be removed {count} incidents')

    # dump and delete incidents
    data = DumpIncidentSerializer(incident_for_delete, many=True).data
    with open(fpath, 'w') as f:
        f.write(json.dumps(data))
    incident_for_delete.delete()

    store.size = os.path.getsize(fpath)
    store.file = file_name
    store.save()
    _log.info(f'Dump ready in [{store.file.name}] with size of [{store.size}]')
    store.update_crc()


@shared_task
def rotate_incidents():
    """Celery entry point: rotate resolved / false-alarm incidents."""
    _rotate_incidents()


@shared_task
def rotate_elasticsearch():
    """Celery entry point: rotate aggregated Elasticsearch events."""
    rotate_elasticsearch_events(EventRotationSettings)