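from elasticsearch import ElasticsearchException

# NOTE(review): `ElasticsearchException` is the base class exported by the
# 7.x client; confirm the name still exists on the client version pinned by
# this project before relying on it in new tests.


def _valid_search_response() -> dict:
    """Build a fresh copy of the canned ``search`` response.

    A new dict is built on every call so that a test which mutates the
    returned document cannot leak that mutation into a later test.

    The ``event_src_msg`` value is a raw CEF line; CEF escapes ``=`` inside
    extension values as ``\\=``, and the doubled backslash in the Python
    literal below is what produces that single backslash at runtime.
    """
    event_src_msg = (
        '<14>CEF:0|InfoWatch ARMA|ARMAIF|3.5|lighttpdaccess|Lighttpd Access|8|'
        'rt=1648644419531 deviceFacility=lighttpd dvcpid=79894 '
        'src=192.168.2.106 dst=192.168.2.1 requestMethod=GET '
        'request=/widgets/api/get.php?load\\=system%2Cgateway%2Cinterfaces'
        '&_\\=1582284700985 app=HTTP/1.1 cs1=200 cs2=2425 '
        'cs1Label=responseCode cs2Label=bodyLength '
        'requestContext=http://192.168.2.1/index.php '
        'requestClientApplication=Mozilla/5.0 (Windows NT 10.0; Win64; x64; '
        'rv:69.0) Gecko/20100101 Firefox/69.0 '
        '__line=Feb 21 11:34:33 arma lighttpd[79894]: 192.168.2.106 '
        '192.168.2.1 - [21/Feb/2020:11:34:33 +0000] '
        '"GET /widgets/api/get.php?load\\=system%2Cgateway%2Cinterfaces'
        '&_\\=1582284700985 HTTP/1.1" 200 2425 '
        '"http://192.168.2.1/index.php" '
        '"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:69.0) '
        'Gecko/20100101 Firefox/69.0"'
    )
    event_hash = (
        '8e24a7cb344bcb40451894acef8145a25d56016d18beb35948b58cbb5f84b12b'
    )
    aggregated_id = f'3658582492_{event_hash}'
    source = {
        'device_version': '3.5',
        'sign_subcategory': 'Auth',
        'device_product': 'Industrial Firerwall',
        'event_src_msg': event_src_msg,
        'device_action': '',
        'destination_port': 0,
        'destination_user': '',
        'event_count': 1,
        'event_severity': 8,
        'aggregated_id': aggregated_id,
        'Index': 'arma-2022.03.30',
        'type': 'armaif_2',
        'event_timestamp': '2022-03-30T09:46:59.54129517Z',
        'source_ip': '192.168.2.106',
        'sign_name': 'Lighttpd Access',
        'destination_ip': '192.168.2.1',
        'source_port': 0,
        'event_first': '2022-03-30T09:46:59.54129517Z',
        'source_user': '',
        'source_host': '',
        'Updated': 0,
        'sign_id': 'lighttpdaccess',
        'event_protocol': '',
        'destination_host': '',
        'source_mac': '',
        '@created': '2022-03-30T09:47:52.414485952Z',
        'rule_tags': ['2'],
        'event_last': '2022-03-30T09:46:59.54129517Z',
        '@timestamp': '2022-03-30T09:46:59.53439731Z',
        'event_id': '6b1c34f6-bb8e-43d5-8bf9-39fcbfd45e09',
        'sign_category': 'HTTP',
        'event_hash': event_hash,
        'celery_done': True,
        'device_vendor': 'InfoWatch ARMA',
    }
    return {
        'took': 0,
        'timed_out': False,
        '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},
        'hits': {
            # 'total' claims 31 matches while only one hit is embedded; code
            # under test that pages by 'total' will see the mismatch.
            'total': {'value': 31, 'relation': 'eq'},
            'max_score': 1.0,
            'hits': [{
                '_index': 'aggregated-2022.03.30',
                '_type': '_doc',
                '_id': aggregated_id,
                '_score': 1.0,
                '_source': source,
            }],
        },
    }


def mocked_elk_query_valid(*args, **kwargs):
    """Return a mock ELK client whose ``search`` yields one valid hit.

    ``search`` ignores every argument it receives, so a test that needs to
    verify *what* was queried must spy on the call separately; this mock
    only supplies the response shape.
    """
    class MockELK:
        def search(self, *args, **kwargs) -> dict:
            return _valid_search_response()

    return MockELK()


def mocked_elk_query_raise_elk_exception(*args, **kwargs):
    """Return a mock ELK client whose ``search`` raises ``ElasticsearchException``.

    Raises:
        ElasticsearchException: from every ``search`` call, with the message
            ``'Test exception'``.
    """
    class MockELK:
        def search(self, *args, **kwargs):
            raise ElasticsearchException('Test exception')

    return MockELK()


def mock_elk_indexes_valid(*args, **kwargs):
    """Return a mock ELK client exposing ``indices.get``.

    ``get`` accepts both positional and keyword arguments (the real client's
    ``indices.get`` takes ``index=...`` as a keyword, and rejecting kwargs
    here made the mock fail on that call style) and returns two fixed
    ``aggregated-*`` index names regardless of what was asked for.
    """
    class MockELKIndexes:
        def get(self, *args, **kwargs) -> dict:
            return {
                'aggregated-2022.05.20': 'aggregated-2022.05.20',
                'aggregated-2022.05.21': 'aggregated-2022.05.21',
            }

    class MockELK:
        indices = MockELKIndexes()

    return MockELK()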