from unittest.mock import patch import pytest from events.constants import ELK_CONNECT_ERROR_JSON from events.services.elk_string_search import ELKStringQuerySearchService, ELKIndexListService from events.tests.utils import mocked_elk_query_raise_elk_exception mocked_target_elk = 'events.services.elk_string_search.connect_to_elasticsearch_instance' @pytest.mark.unit class TestELKStringQuerySearchService: """Test service `sELKStringQuerySearchService`""" index = 'aggregated-2022.04.04' @pytest.mark.unit @patch(mocked_target_elk, side_effect=lambda *args, **kwargs: None) def test_create_elk_is_none(self, *args): """Test if create ELK instance is return None""" service = ELKStringQuerySearchService(index=self.index, query_params={}) data, status = service.data() assert status == 400 assert isinstance(data, dict) assert data == ELK_CONNECT_ERROR_JSON @pytest.mark.unit @patch(mocked_target_elk) @pytest.mark.parametrize('page_size, page, check_data', ( (10, 1, 0), (10, 2, 10), (10, 3, 20), (9, 1, 0), (9, 2, 9), (9, 3, 18), (1, 1, 0), (1, 2, 1), (1, 3, 2), )) def test_create_pagination(self, mock, page_size: int, page: int, check_data: int): query_params = {'page': page, 'page_size': page_size} service = ELKStringQuerySearchService(index=self.index, query_params=query_params) _from, size = service._pagination() assert _from == check_data assert size == page_size @pytest.mark.parametrize('ordering, check_data', ( ('event_timestamp', [{'event_timestamp': 'asc'}]), ('-event_timestamp', [{'event_timestamp': 'desc'}]), ('@created', [{'@created': 'asc'}]), ('-@created', [{'@created': 'desc'}]), ('event_severity', [{'event_severity': 'asc'}]), ('-event_severity', [{'event_severity': 'desc'}]), ('event_src_msg', [{'event_src_msg.keyword': 'asc'}]), ('-event_src_msg', [{'event_src_msg.keyword': 'desc'}]), ('sign_name', [{'sign_name.keyword': 'asc'}]), ('-sign_name', [{'sign_name.keyword': 'desc'}]), ('sign_category', [{'sign_category.keyword': 'asc'}]), ('-sign_category', [{'sign_category.keyword': 'desc'}]), ('source_ip', [{'source_ip.keyword': 'asc'}]), ('-source_ip', [{'source_ip.keyword': 'desc'}]), ('destination_ip', [{'destination_ip.keyword': 'asc'}]), ('-destination_ip', [{'destination_ip.keyword': 'desc'}]), )) @patch(mocked_target_elk) def test_drf_ordering_to_sort_elk(self, _, ordering: str, check_data: list): result = {'event_timestamp': 'asc'} query_params = {'page': 1, 'page_size': 10, 'ordering': ordering} service = ELKStringQuerySearchService(index=self.index, query_params=query_params) sort_data = service._sorting_data() assert sort_data == check_data class TestELKIndexListService: pattern = 'aggregated-*' @pytest.mark.unit @patch(mocked_target_elk) @pytest.mark.parametrize('index, pattern', ( ('', '*'), (pattern, pattern), ('aggregated-', pattern), ('*', '*'), )) def test_get_pattern(self, mock, index: str, pattern: str): query_params = {'index': index} service = ELKIndexListService(query_params) data = service._get_pattern() assert data == pattern @pytest.mark.unit @patch(mocked_target_elk) @pytest.mark.parametrize('pattern, indexes, check_data', ( (pattern, ['aggregated-2022.05.20', 'aggregated-2022.05.21'], [{'value': 'aggregated-2022.05.20', 'label': '2022.05.20'}, {'value': 'aggregated-2022.05.21', 'label': '2022.05.21'}] ), ('', ['aggregated-2022.05.20', 'aggregated-2022.05.21'], [{'value': 'aggregated-2022.05.20', 'label': 'aggregated-2022.05.20'}, {'value': 'aggregated-2022.05.21', 'label': 'aggregated-2022.05.21'}] ), (pattern, [], []), )) def test_get_data_mapping(self, mock, pattern: str, indexes: list, check_data: list): query_params = {'index': pattern} service = ELKIndexListService(query_params) data = service._data(indexes) assert data == check_data