"""API tests for the endpoint device views (CRUD, keepalive, config, antivirus)."""
import glob
import os
from unittest.mock import patch

import pytest
from django.conf import settings
from django.urls import reverse
from rest_framework import status

from devices.enums import DeviceType
from devices.models.device import DeviceGroup
from devices.models.endpoint_device import EndpointModel
from devices.tests.endpoint_utils import (
    mock_endpoint_status_service_offline,
    mock_redis_return_online,
    mock_vector_service_rise_exception,
)

# Scratch directory that replaces VECTOR_CONFIG_DIR in the patched tests.
TMP_DIR_VECTOR = '/tmp/endpoint/vector'
# Directory holding static fixtures (e.g. the antivirus update archive).
TEST_FILES = os.path.join(settings.BASE_DIR, 'devices/tests/test_files/')


@pytest.mark.django_db
class TestEndpointAPI:
    """Endpoint api tests."""

    @pytest.fixture(autouse=True)
    def setup_tests(self, django_user_model):
        """Create two device groups, default endpoint data and a scratch dir.

        After each test every entry left in ``TMP_DIR_VECTOR`` is removed.
        """
        # A list (not a set) keeps the insertion order deterministic.
        DeviceGroup.objects.bulk_create([
            DeviceGroup(id=1, name='Group 1'),
            DeviceGroup(id=2, name='Group 2'),
        ])
        self.data = {
            'name': 'test_endpoint',
            'type': DeviceType.ENDPOINT,
            'ip': '127.0.0.1',
            'port': '5555',
            'settings_changed': False,
            'request_config': False,
            'group': DeviceGroup.objects.first(),
        }
        self.user = django_user_model.objects.get(username='admin')
        os.makedirs(TMP_DIR_VECTOR, exist_ok=True)

        yield

        # glob already returns paths prefixed with TMP_DIR_VECTOR,
        # so they can be removed directly.
        for path in glob.glob(f'{TMP_DIR_VECTOR}/*'):
            os.remove(path)

    @pytest.fixture
    def create_antivirus_database(self, api_client):
        """Upload the test antivirus archive through the 'store-antivirus' url."""
        api_client.force_authenticate(self.user)
        file_path = os.path.join(TEST_FILES, 'antivirus_update.zip')
        url = reverse('store-antivirus')
        # The context manager guarantees the archive handle is closed
        # even when the request raises.
        with open(file_path, 'rb') as archive:
            api_client.post(url, {'file': archive})

    def test_list_api(self, api_client):
        """Test that the list api url returns the number of stored endpoints."""
        EndpointModel.objects.create(**self.data)
        endpoint_count = EndpointModel.objects.count()
        api_client.force_authenticate(self.user)

        response = api_client.get(reverse('endpoint_api-list'))

        assert response.status_code == 200
        assert response.json()['count'] == endpoint_count

    @patch('devices.services.vector.VECTOR_CONFIG_DIR', TMP_DIR_VECTOR)
    @patch('devices.services.endpoint.endpoint_get_status.RedisInterface',
           side_effect=mock_redis_return_online)
    def test_api_create_valid(self, _, api_client):
        """Test creation endpoint api url."""
        assert not EndpointModel.objects.exists()
        api_client.force_authenticate(self.user)
        data = self.data.copy()
        data['group'] = 1
        data.pop('settings_changed')

        response = api_client.post(
            reverse('endpoint_api-list'), data=data, format='json')

        assert response.status_code == 201
        response_data = response.json()
        assert response_data['status'] == {'status': 'online'}
        assert EndpointModel.objects.count() == 1
        endpoint = EndpointModel.objects.last()
        assert endpoint.settings_changed
        assert endpoint.antivirus_update_db

    @pytest.mark.skip('temporarily disabled to deal with transactions')
    @patch('devices.services.endpoint.endpoint_services.VectorService',
           side_effect=mock_vector_service_rise_exception)
    @patch('devices.services.endpoint.endpoint_get_status.RedisInterface',
           side_effect=mock_redis_return_online)
    def test_api_create_rais_err(self, _redis_mock, _vector_mock, api_client):
        """Test atomic create endpoint.

        After the exception no endpoint may be created.
        """
        # NOTE: the innermost @patch (RedisInterface) is injected first.
        assert not EndpointModel.objects.exists()
        api_client.force_authenticate(self.user)
        data = self.data.copy()
        data['group'] = 1

        response = api_client.post(
            reverse('endpoint_api-list'), data=data, format='json')

        assert response.status_code == 400
        response_data = response.json()
        assert response_data['detail'] == 'Update config test mock exception'
        assert not EndpointModel.objects.exists()

    @patch('devices.services.vector.VECTOR_CONFIG_DIR', TMP_DIR_VECTOR)
    @patch('devices.services.endpoint.endpoint_get_status.RedisInterface',
           side_effect=mock_redis_return_online)
    def test_api_update_valid(self, _, api_client):
        """Test updating endpoint url."""
        assert not EndpointModel.objects.exists()
        data = self.data.copy()
        endpoint = EndpointModel.objects.create(**data)
        api_client.force_authenticate(self.user)
        data = self.data.copy()
        data['port'] = '7777'
        data['group'] = 1

        response = api_client.patch(
            reverse('endpoint_api-detail', args=[endpoint.pk]),
            data=data, format='json')

        assert response.status_code == 200
        response_data = response.json()
        assert response_data['status'] == {'status': 'online'}
        assert EndpointModel.objects.count() == 1
        endpoint_after = EndpointModel.objects.get(pk=endpoint.pk)
        assert endpoint_after.port == 7777
        assert endpoint_after.settings_changed

    @pytest.mark.skip('temporarily disabled to deal with transactions')
    @patch('devices.services.endpoint.endpoint_services.VectorService',
           side_effect=mock_vector_service_rise_exception)
    @patch('devices.services.endpoint.endpoint_get_status.RedisInterface',
           side_effect=mock_redis_return_online)
    def test_api_update_rais_err(self, _redis_mock, _vector_mock, api_client):
        """Test atomic update endpoint.

        After the exception the stored endpoint must stay unchanged.
        """
        # NOTE: the innermost @patch (RedisInterface) is injected first.
        assert not EndpointModel.objects.exists()
        endpoint = EndpointModel.objects.create(**self.data)
        api_client.force_authenticate(self.user)
        data = self.data.copy()
        data['group'] = 1
        data['port'] = '7777'

        response = api_client.patch(
            reverse('endpoint_api-detail', args=[endpoint.pk]),
            data=data, format='json')

        assert response.status_code == 400
        response_data = response.json()
        assert response_data['detail'] == 'Update config test mock exception'
        assert EndpointModel.objects.count() == 1
        endpoint_after = EndpointModel.objects.get(pk=endpoint.pk)
        assert endpoint_after.port == 5555
        assert not endpoint_after.settings_changed

    @pytest.mark.unit
    @patch('devices.services.vector.VECTOR_CONFIG_DIR', TMP_DIR_VECTOR)
    @patch('devices.services.endpoint.endpoint_get_status.RedisInterface',
           side_effect=mock_redis_return_online)
    def test_updating_ie_null_group(self, _, api_client):
        """Test that patching with an empty group returns a null group."""
        api_client.force_authenticate(self.user)
        device_group = DeviceGroup.objects.first()
        data = self.data.copy()
        data['group'] = device_group
        endpoint = EndpointModel.objects.create(**data)
        url = reverse('endpoint_api-detail', args=[endpoint.id])
        data['group'] = ''

        response = api_client.patch(url, data=data)

        assert response.status_code == status.HTTP_200_OK
        assert response.data['group'] is None

    @patch('devices.services.vector.VECTOR_CONFIG_DIR', TMP_DIR_VECTOR)
    @patch('devices.services.endpoint.endpoint_get_status.RedisInterface',
           side_effect=mock_redis_return_online)
    @pytest.mark.unit
    def test_api_scan_paths_duplicates(self, _, api_client):
        """Test updating endpoint scan paths duplicate check."""
        assert not EndpointModel.objects.exists()
        data = self.data.copy()
        endpoint = EndpointModel.objects.create(**data)
        api_client.force_authenticate(self.user)
        data['group'] = DeviceGroup.objects.first().pk
        data['scan_paths'] = ['C:\\1\\1.txt', 'C:\\1\\1.txt']

        response = api_client.patch(
            reverse('endpoint_api-detail', args=[endpoint.pk]),
            data=data, format='json')

        assert response.status_code == 400
        assert 'scan_paths' in response.data

    @patch('devices.services.vector.VECTOR_CONFIG_DIR', TMP_DIR_VECTOR)
    def test_api_destroy_valid(self, api_client):
        """Test delete endpoint api."""
        assert not EndpointModel.objects.exists()
        endpoint = EndpointModel.objects.create(**self.data)
        api_client.force_authenticate(self.user)

        response = api_client.delete(
            reverse('endpoint_api-detail', args=[endpoint.pk]))

        assert response.status_code == 204

    @patch('devices.services.endpoint.endpoint_services.RedisInterface')
    def test_keepalive_api(self, _, api_client):
        """Test for returning data in keepalive api.

        Test error and ok statuses.
        """
        endpoint = EndpointModel.objects.create(**self.data)
        url = reverse('endpoint_api-keepalive', args=[endpoint.pk])
        not_valid_url = reverse('endpoint_api-keepalive', args=[0])
        api_client.force_authenticate(self.user)

        # Unknown endpoint id.
        response = api_client.get(not_valid_url, data={})
        assert response.status_code == 200
        assert response.json() == {
            'status': 'error', 'reason': 'no such endpoint record'}

        # Existing endpoint, GET request without a JSON body.
        response = api_client.get(url, data={})
        assert response.status_code == 200
        assert response.json() == {
            'status': 'error', 'error_message': 'json decode error'}

        # Existing endpoint, JSON body posted.
        response = api_client.post(url, data={'status': 'ok'}, format='json')
        assert response.status_code == 200
        assert response.json() == {'status': 'ok'}

    def test_download_config_format_json(self, api_client):
        """Test download config data api format=json.

        And check status `OK`.
        """
        endpoint = EndpointModel.objects.create(**self.data)
        url = reverse('endpoint_api-download', args=[endpoint.pk])
        api_client.force_authenticate(self.user)

        response = api_client.get(url)

        assert response.status_code == 200
        data = response.json()
        assert 'status' in data
        assert data['status'] == 'ok'
        assert 'config' in data

    def test_download_config_format_api(self, api_client):
        """Test download config file with format=api."""
        endpoint = EndpointModel.objects.create(**self.data)
        url = reverse('endpoint_api-download', args=[endpoint.pk]) + '?format=api'
        api_client.force_authenticate(self.user)

        response = api_client.get(url)

        assert response.status_code == 200
        assert response.headers['Content-Type'] == 'application/file'
        assert response.headers['Content-Disposition'] == (
            f'attachment; filename="endpoint_config_{endpoint.pk}.json"')
        assert isinstance(response.content, bytes)

    def test_download_config_404(self, api_client):
        """Test download endpoint config with a non-existent id."""
        url = reverse('endpoint_api-download', args=[0])
        api_client.force_authenticate(self.user)

        response = api_client.get(url)

        assert response.status_code == 404

    def test_upload_valid_config(self, api_client):
        """Test upload valid config data from endpoint."""
        endpoint = EndpointModel.objects.create(**self.data)
        url = reverse('endpoint_api-upload', args=[endpoint.pk])
        api_client.force_authenticate(self.user)
        data = {
            'wl_enable': True,
            'antivirus_enabled': True,
            'scan_paths': ['path1', 'path2'],
            'integrity_control_timeout': '123',
        }

        response = api_client.post(url, data=data, format='json')

        assert response.status_code == 200
        assert response.json() == {'status': 'ok'}

    def test_upload_not_valid_config(self, api_client):
        """Test upload not valid endpoint config and return error status."""
        endpoint = EndpointModel.objects.create(**self.data)
        url = reverse('endpoint_api-upload', args=[endpoint.pk])
        api_client.force_authenticate(self.user)

        response = api_client.post(url, {})

        assert response.status_code == 200
        assert response.json() == {
            'status': 'error', 'error_message': 'json decode error'}

    def test_upload_config_to_not_exist_endpoint(self, api_client):
        """Test return status with a non-existent endpoint ID."""
        url = reverse('endpoint_api-upload', args=[0])
        api_client.force_authenticate(self.user)

        response = api_client.post(url, data={})

        assert response.status_code == 200
        assert response.json() == {
            'status': 'error', 'reason': 'no such endpoint record'}

    @patch('devices.services.endpoint.endpoint_services.EndpointStatusService')
    def test_endpoint_config_request_valid(self, _, api_client):
        """Test that a config request returns ok and sets `request_config`.

        `EndpointStatusService` is replaced by a plain mock for this test.
        """
        endpoint = EndpointModel.objects.create(**self.data)
        url = reverse('endpoint_api-config-request', args=[endpoint.pk])
        api_client.force_authenticate(self.user)

        response = api_client.get(url)

        assert response.status_code == 200
        assert response.json() == {'status': 'ok'}
        endpoint_after = EndpointModel.objects.get(pk=endpoint.pk)
        assert endpoint_after.request_config

    def test_endpoint_config_request_404(self, api_client):
        """Test status 404 is returned if the endpoint does not exist."""
        url = reverse('endpoint_api-config-request', args=[0])
        api_client.force_authenticate(self.user)

        response = api_client.get(url)

        assert response.status_code == 404

    @patch('devices.services.endpoint.endpoint_services.EndpointStatusService',
           side_effect=mock_endpoint_status_service_offline)
    def test_endpoint_config_request_not_valid(self, _, api_client):
        """Test status 400 is returned with the offline status-service mock."""
        endpoint = EndpointModel.objects.create(**self.data)
        url = reverse('endpoint_api-config-request', args=[endpoint.pk])
        api_client.force_authenticate(self.user)

        response = api_client.get(url)

        assert response.status_code == 400

    @patch('devices.services.vector.VECTOR_CONFIG_DIR', TMP_DIR_VECTOR)
    @patch('devices.services.endpoint.endpoint_get_status.RedisInterface',
           side_effect=mock_redis_return_online)
    def test_api_change_group(self, _, api_client):
        """Test change endpoint group api url."""
        data = self.data.copy()
        endpoint = EndpointModel.objects.create(**data)
        api_client.force_authenticate(self.user)
        data['group'] = DeviceGroup.objects.last().pk

        response = api_client.patch(
            reverse('endpoint_api-detail', args=[endpoint.pk]),
            data=data, format='json')

        assert response.status_code == 200
        endpoint = EndpointModel.objects.last()
        assert endpoint.group == DeviceGroup.objects.last()

    @patch('devices.services.vector.VECTOR_CONFIG_DIR', TMP_DIR_VECTOR)
    @patch('devices.services.endpoint.endpoint_get_status.RedisInterface',
           side_effect=mock_redis_return_online)
    @pytest.mark.unit
    def test_api_antivirus_get_update_if_available(self, _, api_client,
                                                   create_antivirus_database):
        """Test Endpoint antivirus update required."""
        assert not EndpointModel.objects.exists()
        data = self.data.copy()
        data['antivirus_update_db'] = 'True'
        endpoint = EndpointModel.objects.create(**data)
        api_client.force_authenticate(self.user)
        url = reverse('endpoint_api-antivirus-update', kwargs={'pk': endpoint.pk})

        response = api_client.get(url)

        assert response.status_code == 200
        # The flag is expected to be reset once the update has been served.
        assert EndpointModel.objects.get(pk=endpoint.pk).antivirus_update_db is False

    @pytest.mark.unit
    def test_api_antivirus_get_update_if_not_available(self, api_client):
        """Test Endpoint antivirus no update required."""
        assert not EndpointModel.objects.exists()
        data = self.data.copy()
        data['antivirus_update_db'] = 'False'
        endpoint = EndpointModel.objects.create(**data)
        api_client.force_authenticate(self.user)
        url = reverse('endpoint_api-antivirus-update', kwargs={'pk': endpoint.pk})

        response = api_client.get(url)

        assert response.status_code == 400