# old_console/devices/tests/test_endpoint_device_service.py
# 2024-11-02 14:12:45 +03:00
#
# 201 lines
# 8.5 KiB
# Python

import glob
import os
import tempfile
from unittest.mock import patch
import pytest
from devices.enums import DeviceType, EndpointRotationType
from devices.models.endpoint_device import EndpointModel
from devices.services.endpoint.endpoint_services import EndpointManagementService, EndpointDownloadConfigService, \
EndpointUploadConfigService
# Temporary directory created once at import time and shared by every test in
# this module; the management-service tests patch it in as
# ``devices.services.vector.VECTOR_CONFIG_DIR`` and inspect the files in it.
TMP_DIR_VECTOR = tempfile.TemporaryDirectory()
@pytest.mark.django_db
@pytest.mark.unit
class TestEndpointManagementService:
    """Tests for the ``EndpointManagementService`` create/update/destroy methods.

    Every test patches ``devices.services.vector.VECTOR_CONFIG_DIR`` to the
    shared temporary directory and then checks which files exist there.
    """

    @pytest.fixture(autouse=True)
    def setup_test(self):
        """Create an endpoint before each test and empty the temp dir afterwards."""
        os.makedirs(TMP_DIR_VECTOR.name, exist_ok=True)
        self.endpoint = EndpointModel.objects.create(
            type=DeviceType.ENDPOINT, ip='127.0.0.1', port=5555,
        )
        yield
        # glob already returns paths prefixed with the directory, so each one
        # can be removed directly without joining the directory on again.
        for path in glob.glob(f'{TMP_DIR_VECTOR.name}/*'):
            os.remove(path)

    def _config_path(self):
        """Return the path where the config for ``self.endpoint`` is expected."""
        return os.path.join(TMP_DIR_VECTOR.name, f'endpoint_{self.endpoint.pk}.toml')

    @patch('devices.services.vector.VECTOR_CONFIG_DIR', TMP_DIR_VECTOR.name)
    def test_create(self):
        """Test Creation Endpoint with valid data and creation vector config."""
        assert not os.listdir(TMP_DIR_VECTOR.name)

        service = EndpointManagementService(self.endpoint)
        service.create()

        # Exactly one file must appear, named after the endpoint's pk.
        assert len(os.listdir(TMP_DIR_VECTOR.name)) == 1
        config_path = self._config_path()
        assert os.path.exists(config_path)
        with open(config_path, 'r') as file:
            data = file.read()
        assert f'0.0.0.0:{self.endpoint.port}' in data

    @patch('devices.services.vector.VECTOR_CONFIG_DIR', TMP_DIR_VECTOR.name)
    def test_update(self):
        """Test Updating Endpoint with valid data and updating vector config"""
        assert not os.listdir(TMP_DIR_VECTOR.name)

        service = EndpointManagementService(self.endpoint)
        service.create()
        assert len(os.listdir(TMP_DIR_VECTOR.name)) == 1

        self.endpoint.port = 7777
        self.endpoint.save()
        service.update()

        # The update must rewrite the existing file, not add a second one.
        assert len(os.listdir(TMP_DIR_VECTOR.name)) == 1
        config_path = self._config_path()
        assert os.path.exists(config_path)
        with open(config_path, 'r') as file:
            data = file.read()
        assert '0.0.0.0:7777' in data

    @patch('devices.services.vector.VECTOR_CONFIG_DIR', TMP_DIR_VECTOR.name)
    def test_destroy(self):
        """Test destroy endpoint data with delete vector config"""
        assert not os.listdir(TMP_DIR_VECTOR.name)

        service = EndpointManagementService(self.endpoint)
        service.create()
        assert len(os.listdir(TMP_DIR_VECTOR.name)) == 1

        service.destroy()
        assert not os.listdir(TMP_DIR_VECTOR.name)
@pytest.mark.django_db
@pytest.mark.unit
class TestEndpointDownloadConfigService:
    """Checks the settings dictionary produced by ``EndpointDownloadConfigService``."""

    @pytest.fixture(autouse=True)
    def setup_test(self):
        """Create an endpoint with ``event_rotation_type`` set to ``SIZE``."""
        endpoint_fields = {
            'type': DeviceType.ENDPOINT,
            'ip': '127.0.0.1',
            'port': 5555,
            'event_rotation_type': EndpointRotationType.SIZE,
        }
        self.endpoint = EndpointModel.objects.create(**endpoint_fields)

    def test_convert_endpoint_settings_to_dict(self):
        """The result is a dict holding every expected section and the rotation values."""
        settings = EndpointDownloadConfigService(self.endpoint.pk).save_endpoint_settings_to_dict()

        assert isinstance(settings, dict)
        expected_sections = (
            'rotation',
            'device_control',
            'integrity_control',
            'white_list',
            'usb_control',
            'antivirus',
        )
        for section in expected_sections:
            assert section in settings

        rotation = settings['rotation']
        assert rotation['type'] == EndpointRotationType.SIZE.value
        assert rotation['size'] == 100
@pytest.mark.django_db
@pytest.mark.unit
class TestEndpointUploadConfigService:
    """Tests for ``EndpointUploadConfigService`` data preparation and upload.

    The fixture endpoint starts with its feature flags enabled, while the raw
    payload carries them disabled, so the tests can observe the values change.
    """

    # Whitelist entries expected after the raw JSON payload has been decoded.
    EXPECTED_WHITE_LIST_PATHS = [
        "%HKEY_LOCAL_MACHINE\\SOFTWARE\\Microsoft\\Windows NT\\CurrentVersion\\SystemRoot%",
        "%HKEY_LOCAL_MACHINE\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\ProgramFilesDir%",
    ]

    # Fields that the payload is expected to leave falsy.
    DISABLED_FLAGS = (
        'device_control_enabled',
        'integrity_control_enabled',
        'whitelist_enabled',
        'whitelist_admin',
        'antivirus_enabled',
        'antivirus_remove_infected_files',
    )

    @pytest.fixture(autouse=True)
    def setup_test(self):
        """Create the endpoint under test and the raw config payload to upload."""
        self.endpoint = EndpointModel.objects.create(
            type=DeviceType.ENDPOINT,
            ip='127.0.0.1',
            port=5555,
            event_rotation_type=EndpointRotationType.SIZE,
            device_control_enabled=True,
            integrity_control_enabled=True,
            scan_paths=[],
            integrity_control_timeout=10,
            whitelist_enabled=True,
            whitelist_admin=True,
            white_list_paths=[],
            antivirus_enabled=True,
            antivirus_paths=[],
            antivirus_remove_infected_files=True,
        )
        # NOTE(review): the "wl_appy" key is kept exactly as in the original
        # payload - confirm whether this spelling is what is really sent.
        self.raw_data = (
            b'{"dc_enabled":false,"dc_apply":true,"prohibit_floppy_read":false,'
            b'"prohibit_floppy_write":false,"prohibit_cd_enabled":false,"prohibit_removable_read":false,'
            b'"prohibit_removable_write":false,"prohibit_tape_read":false,"prohibit_tape_write":false,'
            b'"prohibit_wpd_read":false,"prohibit_wpd_write":false,"ic_enabled":false,"scan_folders":[],'
            b'"ic_timeout":30,"rescan_enabled":false,"rescan_timeout":1000,"wl_enable":false,'
            b'"wl_appy":true,"wl_admin":false,"white_list":['
            b'"%HKEY_LOCAL_MACHINE\\\\SOFTWARE\\\\Microsoft\\\\Windows '
            b'NT\\\\CurrentVersion\\\\SystemRoot%",'
            b'"%HKEY_LOCAL_MACHINE\\\\SOFTWARE\\\\Microsoft\\\\Windows\\\\CurrentVersion'
            b'\\\\ProgramFilesDir%"],"gui_enabled":true,"gui_port":4509,"updated":"","ip":"",'
            b'"usb_control_enabled":false,"usb_allowed_storage":[],"usb_allowed_classes":[],'
            b'"usb_hid_allow_other_subclasses":false,"usb_hid_allowed_subclasses":[],'
            b'"usb_hid_deny_subclasses":[2,4,6],"usb_connected":[],"clamav_enabled":false,'
            b'"clamav_scan_on_add":false,"clamav_live_scan":false,"clamav_stop_all_tasks":false,'
            b'"clamav_paths":null,"clamav_remove_infected_files":false,"clamav_last_update":"09-01-2022",'
            b'"event_rotation_type":1,"event_rotation_size":100,"event_rotation_period":1,'
            b'"event_rotation_time":"12:57:01"}'
        )

    def test_preparation_data(self):
        """``service.data`` exposes the payload under the expected field names."""
        service = EndpointUploadConfigService(self.endpoint.pk, self.raw_data)
        data = service.data

        for flag in self.DISABLED_FLAGS:
            assert flag in data
            assert not data[flag]

        assert 'scan_paths' in data
        assert data['scan_paths'] == []
        assert 'integrity_control_timeout' in data
        assert data['integrity_control_timeout'] == 30
        assert 'white_list_paths' in data
        assert data['white_list_paths'] == self.EXPECTED_WHITE_LIST_PATHS
        assert 'antivirus_paths' in data
        assert data['antivirus_paths'] is None

    def test_valid_update_data(self):
        """``upload()`` reports ``'ok'`` and the stored endpoint holds the payload values."""
        service = EndpointUploadConfigService(self.endpoint.pk, self.raw_data)
        result = service.upload()
        assert result['status'] == 'ok'

        # Re-read from the database so the assertions see the saved state.
        endpoint = EndpointModel.objects.get(pk=self.endpoint.pk)
        for flag in self.DISABLED_FLAGS:
            assert not getattr(endpoint, flag)

        assert endpoint.scan_paths == []
        assert endpoint.integrity_control_timeout == 30
        assert endpoint.white_list_paths == self.EXPECTED_WHITE_LIST_PATHS
        assert endpoint.antivirus_paths is None