import logging
from typing import Any, Dict, Optional, Tuple

from elasticsearch import Elasticsearch, ElasticsearchException

from events.constants import (
    ELK_CONNECT_ERROR_JSON,
    ELK_HOST,
    ELK_LOGIN,
    ELK_PASS,
    ELK_PORT,
    ERROR_STATUS,
)

_log = logging.getLogger(__name__)


def connect_to_elasticsearch_instance(elk_host, elk_port, elk_login, elk_password) -> Optional[Elasticsearch]:
    """Create an Elasticsearch client for the given host and credentials.

    :param elk_host: elasticsearch host name
    :param elk_port: elasticsearch port number
    :param elk_login: elasticsearch login
    :param elk_password: elasticsearch password
    :return: an ``Elasticsearch`` instance on success. If an
        ``ElasticsearchException`` is raised while creating the client, the
        error is logged (with traceback) and ``None`` is returned; callers
        must check for a falsy result before using the client.
    """
    try:
        return Elasticsearch(
            [{'host': elk_host, 'port': elk_port}],
            http_auth=(elk_login, elk_password),
        )
    except ElasticsearchException as err:
        _log.exception(f'Following error occurred when trying to perform actions with elasticsearch: {err}')
        return None


class ELKStringQuerySearchService:
    """Service for searching in ELK by index.

    Builds a ``query_string`` search body (pagination, sorting, query) from
    request-style query parameters and executes it against ``index``.
    """

    # Fields that are sorted on directly; any other ordering field gets the
    # ``.keyword`` suffix appended before being sent to ELK.
    _PLAIN_SORT_FIELDS = frozenset({'event_timestamp', '@created', 'event_severity'})

    # Field prefixes that mark the query as an explicit per-field query.
    # When none of them occurs in ``q``, ``default_field`` is set on the query.
    _EVENT_FILTERS = (
        'event_first:', 'event_last:', 'event_count:', 'event_timestamp:',
        'event_severity:', 'event_src_msg:', 'event_protocol:',
        'device_vendor:', 'device_product:', 'device_version:',
        'device_action:', 'sign_id:', 'sign_category:', 'sign_subcategory:',
        'sign_name:', 'source_ip:',
        'source_mac:',  # was 'source_mac' (missing colon, unlike every other entry)
        'source_host:', 'source_port:', 'source_user:',
        'destination_host:', 'destination_ip:', 'destination_port:',
        'destination_user:',
    )

    def __init__(self, index: str, query_params: Dict) -> None:
        """Store the search parameters and open the ELK connection.

        :param index: name (or pattern) of the index to search in
        :param query_params: mapping with optional ``page``, ``page_size``,
            ``ordering`` and ``q`` keys
        """
        _log.debug(f'Start search by index: {index}; and query: {query_params}')
        self.index = index
        self.query_params = query_params
        # May be None when the client could not be created; checked in data().
        self.es = connect_to_elasticsearch_instance(ELK_HOST, ELK_PORT, ELK_LOGIN, ELK_PASS)

    def _pagination(self) -> tuple:
        """Return ``(offset, size)`` computed from ``page`` / ``page_size``.

        Missing or non-positive values fall back to page 1 and size 10.

        :raises ValueError: if ``page`` or ``page_size`` is not an integer string
        """
        page = int(self.query_params.get('page', 1))
        size = int(self.query_params.get('page_size', 10))
        if page <= 0:
            page = 1
        if size <= 0:
            size = 10
        offset = (page - 1) * size
        return offset, size

    def _sorting_data(self) -> list:
        """Mapping DRF ordering to ELK.

        A leading ``-`` means descending order, anything else ascending.
        Expects a non-empty ``ordering`` value in ``query_params``.

        :return: single-element ELK ``sort`` list, e.g. ``[{'field': 'asc'}]``
        """
        ordering_field = self.query_params['ordering']
        if ordering_field[0] == '-':
            order = 'desc'
            ordering_field = ordering_field[1:]
        else:
            order = 'asc'
        if ordering_field not in self._PLAIN_SORT_FIELDS:
            ordering_field = f'{ordering_field}.keyword'
        return [{ordering_field: order}]

    def create_search_body(self) -> Dict:
        """Build the request body passed to ``Elasticsearch.search``.

        :return: dict with ``from``, ``size``, ``track_total_hits``, ``sort``
            and, when ``q`` is present in the query params, ``query``
        """
        try:
            _from, size = self._pagination()
        except ValueError:
            # Unparsable page / page_size: fall back to the first default page.
            _from, size = 0, 10
        search_body: Dict[str, Any] = {
            'from': _from,
            'size': size,
            'track_total_hits': 'true',
        }

        if 'ordering' in self.query_params and self.query_params['ordering'] != '':
            search_body['sort'] = self._sorting_data()
        else:
            search_body['sort'] = [{'@created': 'desc'}]

        if 'q' in self.query_params:
            query = self.query_params['q']
            # Spaces are removed so that "field : value" is still recognised
            # as a per-field filter.
            text_request = query.replace(' ', '')
            has_field_filter = any(
                event_filter in text_request for event_filter in self._EVENT_FILTERS
            )
            query_string: Dict[str, Any] = {'query': query}
            if not has_field_filter:
                query_string['default_field'] = ''
            search_body['query'] = {'query_string': query_string}

        return search_body

    def data(self) -> Tuple[Dict[str, Any], int]:
        """return elastic data and response status.

        200 if ok
        400 if bad

        A missing ``@created`` mapping reported by ELK while sorting is
        treated as "no data" and answered with status 200.
        """
        if not self.es:
            return ELK_CONNECT_ERROR_JSON, 400

        search_body = self.create_search_body()
        if not search_body:
            return {'status': ERROR_STATUS, 'error_message': 'Invalid search request body'}, 400

        try:
            _log.info(f'Sending query: {search_body}')
            return self.es.search(index=self.index, body=search_body), 200
        except ElasticsearchException as err:
            error = ''.join(map(str, err.args))
            if 'No mapping found for [@created] in order to sort on' in error:
                return {"result": "No data found"}, 200
            return {'status': ERROR_STATUS, 'error_message': err.args}, 400


class ELKIndexListService:
    """Service for getting all index or index by pattern"""

    def __init__(self, query_params: dict):
        """Store the query params, open the ELK connection, resolve the pattern.

        :param query_params: mapping with an optional ``index`` key holding
            the index name prefix / pattern
        """
        _log.debug('Start search indexes')
        self.query_params = query_params
        # May be None when the client could not be created; checked in data().
        self.es = connect_to_elasticsearch_instance(ELK_HOST, ELK_PORT, ELK_LOGIN, ELK_PASS)
        self._pattern = self._get_pattern()

    def _get_pattern(self) -> str:
        """Return the index pattern, always terminated with ``*``.

        Without query params (or without an ``index`` key) the pattern is ``*``.
        """
        pattern = ''
        if self.query_params:
            pattern = self.query_params.get('index', '*')
        if not pattern.endswith('*'):
            pattern += '*'
        return pattern

    def _data(self, indexes: list) -> list:
        """Return data mapping by pattern.

        Each index becomes ``{'value': <index>, 'label': <index with the
        pattern text (minus trailing '*') removed>}``.
        """
        prefix = self._pattern[:-1]
        return [
            {'value': index, 'label': str(index).replace(prefix, '')}
            for index in indexes
        ]

    def data(self) -> tuple:
        """Return ``(payload, status)`` with the indexes matching the pattern.

        200 with a list of ``value`` / ``label`` dicts on success,
        400 with an error payload when ELK is unavailable or the request fails.
        """
        if not self.es:
            _log.error(f'Get indexes: {ELK_CONNECT_ERROR_JSON} ')
            return ELK_CONNECT_ERROR_JSON, 400
        try:
            indexes = self.es.indices.get(self._pattern).keys()
        except ElasticsearchException as err:
            _log.error(f'Get indexes: {err}')
            # Return the message text: an exception instance is not
            # JSON-serializable and would break the response rendering.
            return {'status': ERROR_STATUS, 'error_message': str(err)}, 400
        return self._data(indexes), 200