old_console/devices/tests/test_endpoint_api.py
2024-11-02 14:12:45 +03:00

325 lines
16 KiB
Python

import glob
import os
from unittest.mock import patch
import pytest
from django.conf import settings
from django.urls import reverse
from devices.enums import DeviceType
from devices.models.device import DeviceGroup
from devices.models.endpoint_device import EndpointModel
from devices.tests.endpoint_utils import mock_vector_service_rise_exception, mock_redis_return_online, \
mock_endpoint_status_service_offline
from rest_framework import status
TMP_DIR_VECTOR = '/tmp/endpoint/vector'
TEST_FILES = os.path.join(settings.BASE_DIR, 'devices/tests/test_files/')
@pytest.mark.django_db
class TestEndpointAPI:
"""Endpoint api tests."""
@pytest.fixture(autouse=True)
def setup_tests(self, django_user_model):
DeviceGroup.objects.bulk_create({
DeviceGroup(id=1, name='Group 1'),
DeviceGroup(id=2, name='Group 2')
})
self.data = {
'name': 'test_endpoint',
'type': DeviceType.ENDPOINT,
'ip': '127.0.0.1',
'port': '5555',
'settings_changed': False,
'request_config': False,
'group': DeviceGroup.objects.first()
}
self.user = django_user_model.objects.get(username='admin')
os.makedirs(TMP_DIR_VECTOR, exist_ok=True)
yield
files = glob.glob(f'{TMP_DIR_VECTOR}/*')
for file in files:
os.remove(os.path.join(TMP_DIR_VECTOR, file))
@pytest.fixture
def create_antivirus_database(self, api_client):
api_client.force_authenticate(self.user)
file_path = os.path.join(TEST_FILES, 'antivirus_update.zip')
file = open(file_path, 'rb')
url = reverse('store-antivirus')
data = {'file': file}
api_client.post(url, data)
def test_list_api(self, api_client):
"""Test to check lis api url"""
EndpointModel.objects.create(**self.data)
endpoint_count = EndpointModel.objects.count()
api_client.force_authenticate(self.user)
response = api_client.get(reverse('endpoint_api-list'))
assert response.status_code == 200
assert response.json()['count'] == endpoint_count
@patch('devices.services.vector.VECTOR_CONFIG_DIR', TMP_DIR_VECTOR)
@patch('devices.services.endpoint.endpoint_get_status.RedisInterface', side_effect=mock_redis_return_online)
def test_api_create_valid(self, _, api_client):
"""Test creation endpoint api url."""
assert not EndpointModel.objects.exists()
api_client.force_authenticate(self.user)
data = self.data.copy()
data['group'] = 1
data.pop('settings_changed')
response = api_client.post(reverse('endpoint_api-list'), data=data, format='json')
assert response.status_code == 201
response_data = response.json()
assert response_data['status'] == {'status': 'online'}
assert EndpointModel.objects.count() == 1
endpoint = EndpointModel.objects.last()
assert endpoint.settings_changed
assert endpoint.antivirus_update_db
@pytest.mark.skip('temporarily disabled to deal with transactions')
@patch('devices.services.endpoint.endpoint_services.VectorService', side_effect=mock_vector_service_rise_exception)
@patch('devices.services.endpoint.endpoint_get_status.RedisInterface', side_effect=mock_redis_return_online)
def test_api_create_rais_err(self, vector_mok, redis_mock, api_client):
"""Test atomic create endpoint. After exception cannot create endpoint"""
assert not EndpointModel.objects.exists()
api_client.force_authenticate(self.user)
data = self.data.copy()
data['group'] = 1
response = api_client.post(reverse('endpoint_api-list'), data=data, format='json')
assert response.status_code == 400
response_data = response.json()
assert response_data['detail'] == 'Update config test mock exception'
assert not EndpointModel.objects.exists()
@patch('devices.services.vector.VECTOR_CONFIG_DIR', TMP_DIR_VECTOR)
@patch('devices.services.endpoint.endpoint_get_status.RedisInterface', side_effect=mock_redis_return_online)
def test_api_update_valid(self, _, api_client):
"""Test updating endpoint url."""
assert not EndpointModel.objects.exists()
data = self.data.copy()
endpoint = EndpointModel.objects.create(**data)
api_client.force_authenticate(self.user)
data = self.data.copy()
data['port'] = '7777'
data['group'] = 1
response = api_client.patch(reverse('endpoint_api-detail', args=[endpoint.pk]), data=data, format='json')
assert response.status_code == 200
response_data = response.json()
assert response_data['status'] == {'status': 'online'}
assert EndpointModel.objects.count() == 1
endpoint_after = EndpointModel.objects.get(pk=endpoint.pk)
assert endpoint_after.port == 7777
assert endpoint_after.settings_changed
@pytest.mark.skip('temporarily disabled to deal with transactions')
@patch('devices.services.endpoint.endpoint_services.VectorService', side_effect=mock_vector_service_rise_exception)
@patch('devices.services.endpoint.endpoint_get_status.RedisInterface', side_effect=mock_redis_return_online)
def test_api_update_rais_err(self, vector_mock, redis_mock, api_client):
"""Test atomic update endpoint. After raise exception cannot create endpoint."""
assert not EndpointModel.objects.exists()
endpoint = EndpointModel.objects.create(**self.data)
api_client.force_authenticate(self.user)
data = self.data.copy()
data['group'] = 1
data['port'] = '7777'
response = api_client.patch(reverse('endpoint_api-detail', args=[endpoint.pk]), data=data, format='json')
assert response.status_code == 400
response_data = response.json()
assert response_data['detail'] == 'Update config test mock exception'
assert EndpointModel.objects.count() == 1
endpoint_after = EndpointModel.objects.get(pk=endpoint.pk)
assert endpoint_after.port == 5555
assert not endpoint_after.settings_changed
@pytest.mark.unit
@patch('devices.services.vector.VECTOR_CONFIG_DIR', TMP_DIR_VECTOR)
@patch('devices.services.endpoint.endpoint_get_status.RedisInterface', side_effect=mock_redis_return_online)
def test_updating_ie_null_group(self, _, api_client):
api_client.force_authenticate(self.user)
device_group = DeviceGroup.objects.first()
data = self.data.copy()
data['group'] = device_group
endpoint = EndpointModel.objects.create(**data)
url = reverse('endpoint_api-detail', args=[endpoint.id])
data['group'] = ''
response = api_client.patch(url, data=data)
assert response.status_code == status.HTTP_200_OK
assert response.data['group'] is None
@patch('devices.services.vector.VECTOR_CONFIG_DIR', TMP_DIR_VECTOR)
@patch('devices.services.endpoint.endpoint_get_status.RedisInterface', side_effect=mock_redis_return_online)
@pytest.mark.unit
def test_api_scan_paths_duplicates(self, _, api_client):
"""Test updating endpoint scan paths duplicate check."""
assert not EndpointModel.objects.exists()
data = self.data.copy()
endpoint = EndpointModel.objects.create(**data)
api_client.force_authenticate(self.user)
data['group'] = DeviceGroup.objects.first().pk
data['scan_paths'] = ['C:\\1\\1.txt', 'C:\\1\\1.txt']
response = api_client.patch(reverse('endpoint_api-detail', args=[endpoint.pk]), data=data, format='json')
assert response.status_code == 400
assert 'scan_paths' in response.data
@patch('devices.services.vector.VECTOR_CONFIG_DIR', TMP_DIR_VECTOR)
def test_api_destroy_valid(self, api_client):
"""Test delete endpoint api"""
assert not EndpointModel.objects.exists()
endpoint = EndpointModel.objects.create(**self.data)
api_client.force_authenticate(self.user)
response = api_client.delete(reverse('endpoint_api-detail', args=[endpoint.pk]))
assert response.status_code == 204
@patch('devices.services.endpoint.endpoint_services.RedisInterface')
def test_keepalive_api(self, _, api_client):
"""Test for returning data in keepalive api. Test error and ok statuses."""
endpoint = EndpointModel.objects.create(**self.data)
url = reverse('endpoint_api-keepalive', args=[endpoint.pk])
not_valid_url = reverse('endpoint_api-keepalive', args=[0])
api_client.force_authenticate(self.user)
response = api_client.get(not_valid_url, data={})
assert response.status_code == 200
assert response.json() == {'status': 'error', 'reason': 'no such endpoint record'}
response = api_client.get(url, data={})
assert response.status_code == 200
assert response.json() == {'status': 'error', 'error_message': 'json decode error'}
response = api_client.post(url, data={'status': 'ok'}, format='json')
assert response.status_code == 200
assert response.json() == {'status': 'ok'}
def test_download_config_format_json(self, api_client):
"""Test download config data api format=json. And check status `OK`"""
endpoint = EndpointModel.objects.create(**self.data)
url = reverse('endpoint_api-download', args=[endpoint.pk])
api_client.force_authenticate(self.user)
response = api_client.get(url)
assert response.status_code == 200
data = response.json()
assert 'status' in data
assert data['status'] == 'ok'
assert 'config' in data
def test_download_config_format_api(self, api_client):
"""Test download config file with format=api."""
endpoint = EndpointModel.objects.create(**self.data)
url = reverse('endpoint_api-download', args=[endpoint.pk]) + '?format=api'
api_client.force_authenticate(self.user)
response = api_client.get(url)
assert response.status_code == 200
assert response.headers['Content-Type'] == 'application/file'
assert response.headers['Content-Disposition'] == f'attachment; filename="endpoint_config_{endpoint.pk}.json"'
assert isinstance(response.content, bytes)
def test_download_config_404(self, api_client):
"""Test download endpoint config with no-exists id."""
url = reverse('endpoint_api-download', args=[0])
api_client.force_authenticate(self.user)
response = api_client.get(url)
assert response.status_code == 404
def test_upload_valid_config(self, api_client):
"""Test upload valid config data from endpoint."""
endpoint = EndpointModel.objects.create(**self.data)
url = reverse('endpoint_api-upload', args=[endpoint.pk])
api_client.force_authenticate(self.user)
data = {
'wl_enable': True,
'antivirus_enabled': True,
'scan_paths': ['path1', 'path2'],
'integrity_control_timeout': '123'
}
response = api_client.post(url, data=data, format='json')
assert response.status_code == 200
assert response.json() == {'status': 'ok'}
def test_upload_not_valid_config(self, api_client):
"""Test upload not valid endpoint config and return error status"""
endpoint = EndpointModel.objects.create(**self.data)
url = reverse('endpoint_api-upload', args=[endpoint.pk])
api_client.force_authenticate(self.user)
response = api_client.post(url, {})
assert response.status_code == 200
assert response.json() == {'status': 'error', 'error_message': 'json decode error'}
def test_upload_config_to_not_exist_endpoint(self, api_client):
"""Test return status with non exist endpoint ID."""
url = reverse('endpoint_api-upload', args=[0])
api_client.force_authenticate(self.user)
response = api_client.post(url, data={})
assert response.status_code == 200
assert response.json() == {'status': 'error', 'reason': 'no such endpoint record'}
@patch('devices.services.endpoint.endpoint_services.EndpointStatusService')
def test_endpoint_config_request_valid(self, _, api_client):
"""Test set request config in True when endpoint status `online`."""
endpoint = EndpointModel.objects.create(**self.data)
url = reverse('endpoint_api-config-request', args=[endpoint.pk])
api_client.force_authenticate(self.user)
response = api_client.get(url)
assert response.status_code == 200
assert response.json() == {'status': 'ok'}
endpoint_after = EndpointModel.objects.get(pk=endpoint.pk)
assert endpoint_after.request_config
def test_endpoint_config_request_404(self, api_client):
"""Test raise status 404 if endpoint not-exists"""
url = reverse('endpoint_api-config-request', args=[0])
api_client.force_authenticate(self.user)
response = api_client.get(url)
assert response.status_code == 404
@patch('devices.services.endpoint.endpoint_services.EndpointStatusService',
side_effect=mock_endpoint_status_service_offline)
def test_endpoint_config_request_not_valid(self, _, api_client):
"""Test raise 400 error if endpoint is offline"""
endpoint = EndpointModel.objects.create(**self.data)
url = reverse('endpoint_api-config-request', args=[endpoint.pk])
api_client.force_authenticate(self.user)
response = api_client.get(url)
assert response.status_code == 400
@patch('devices.services.vector.VECTOR_CONFIG_DIR', TMP_DIR_VECTOR)
@patch('devices.services.endpoint.endpoint_get_status.RedisInterface', side_effect=mock_redis_return_online)
def test_api_change_group(self, _, api_client):
"""Test change endpoint group api url."""
data = self.data.copy()
endpoint = EndpointModel.objects.create(**data)
api_client.force_authenticate(self.user)
data['group'] = DeviceGroup.objects.last().pk
response = api_client.patch(reverse('endpoint_api-detail', args=[endpoint.pk]), data=data, format='json')
assert response.status_code == 200
endpoint = EndpointModel.objects.last()
assert endpoint.group == DeviceGroup.objects.last()
@patch('devices.services.vector.VECTOR_CONFIG_DIR', TMP_DIR_VECTOR)
@patch('devices.services.endpoint.endpoint_get_status.RedisInterface', side_effect=mock_redis_return_online)
@pytest.mark.unit
def test_api_antivirus_get_update_if_available(self, _, api_client, create_antivirus_database):
"""Test Endpoint antivirus update required"""
assert not EndpointModel.objects.exists()
data = self.data.copy()
data['antivirus_update_db'] = 'True'
endpoint = EndpointModel.objects.create(**data)
api_client.force_authenticate(self.user)
url = reverse('endpoint_api-antivirus-update', kwargs={'pk': endpoint.pk})
response = api_client.get(url)
assert response.status_code == 200
assert EndpointModel.objects.get(pk=endpoint.pk).antivirus_update_db is False
@pytest.mark.unit
def test_api_antivirus_get_update_if_not_available(self, api_client):
"""Test Endpoint antivirus no update required"""
assert not EndpointModel.objects.exists()
data = self.data.copy()
data['antivirus_update_db'] = 'False'
endpoint = EndpointModel.objects.create(**data)
api_client.force_authenticate(self.user)
url = reverse('endpoint_api-antivirus-update', kwargs={'pk': endpoint.pk})
response = api_client.get(url)
assert response.status_code == 400