201 lines
8.5 KiB
Python
201 lines
8.5 KiB
Python
import glob
|
|
import os
|
|
import tempfile
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
from devices.enums import DeviceType, EndpointRotationType
|
|
from devices.models.endpoint_device import EndpointModel
|
|
from devices.services.endpoint.endpoint_services import EndpointManagementService, EndpointDownloadConfigService, \
|
|
EndpointUploadConfigService
|
|
|
|
# Scratch directory shared by the tests in this module; tests patch
# `devices.services.vector.VECTOR_CONFIG_DIR` with this directory's path.
TMP_DIR_VECTOR = tempfile.TemporaryDirectory()
|
|
|
|
|
|
@pytest.mark.django_db
@pytest.mark.unit
class TestEndpointManagementService:
    """Checks for EndpointManagementService create/update/destroy.

    Every test runs with `VECTOR_CONFIG_DIR` patched to the module's
    temporary directory and inspects the files found there.
    """

    @pytest.fixture(autouse=True)
    def setup_test(self):
        """Create a fresh endpoint before each test; empty the temp dir after it."""
        os.makedirs(TMP_DIR_VECTOR.name, exist_ok=True)
        self.endpoint = EndpointModel.objects.create(
            type=DeviceType.ENDPOINT,
            ip='127.0.0.1',
            port=5555,
        )
        yield
        # Teardown: drop whatever the test left behind so the next test
        # starts from an empty directory.
        for leftover in glob.glob(f'{TMP_DIR_VECTOR.name}/*'):
            os.remove(os.path.join(TMP_DIR_VECTOR.name, leftover))

    @staticmethod
    def _config_entries():
        """Return the names currently present in the temporary config dir."""
        return os.listdir(TMP_DIR_VECTOR.name)

    def _read_endpoint_config(self):
        """Assert the endpoint's `.toml` file exists and return its text."""
        config_path = os.path.join(
            TMP_DIR_VECTOR.name,
            f'endpoint_{self.endpoint.pk}.toml',
        )
        assert os.path.exists(config_path)
        with open(config_path, 'r') as handle:
            return handle.read()

    @patch('devices.services.vector.VECTOR_CONFIG_DIR', TMP_DIR_VECTOR.name)
    def test_create(self):
        """create() writes exactly one config file that mentions the endpoint port."""
        assert len(self._config_entries()) == 0

        manager = EndpointManagementService(self.endpoint)
        manager.create()

        assert len(self._config_entries()) == 1
        config_text = self._read_endpoint_config()
        assert f'0.0.0.0:{self.endpoint.port}' in config_text

    @patch('devices.services.vector.VECTOR_CONFIG_DIR', TMP_DIR_VECTOR.name)
    def test_update(self):
        """update() rewrites the single config file with the new port."""
        assert len(self._config_entries()) == 0

        manager = EndpointManagementService(self.endpoint)
        manager.create()
        assert len(self._config_entries()) == 1

        self.endpoint.port = 7777
        self.endpoint.save()
        manager.update()

        # Still one file: the update must not leave a second config behind.
        assert len(self._config_entries()) == 1
        config_text = self._read_endpoint_config()
        assert '0.0.0.0:7777' in config_text

    @patch('devices.services.vector.VECTOR_CONFIG_DIR', TMP_DIR_VECTOR.name)
    def test_destroy(self):
        """destroy() removes the config file that create() produced."""
        assert len(self._config_entries()) == 0

        manager = EndpointManagementService(self.endpoint)
        manager.create()
        assert len(self._config_entries()) == 1

        manager.destroy()
        assert len(self._config_entries()) == 0
|
|
|
|
|
|
@pytest.mark.django_db
@pytest.mark.unit
class TestEndpointDownloadConfigService:
    """Checks for the dict produced by EndpointDownloadConfigService."""

    @pytest.fixture(autouse=True)
    def setup_test(self):
        """Create an endpoint whose event rotation type is SIZE."""
        endpoint_fields = {
            'type': DeviceType.ENDPOINT,
            'ip': '127.0.0.1',
            'port': 5555,
            'event_rotation_type': EndpointRotationType.SIZE,
        }
        self.endpoint = EndpointModel.objects.create(**endpoint_fields)

    def test_convert_endpoint_settings_to_dict(self):
        """The exported settings are a dict with every expected section present."""
        exporter = EndpointDownloadConfigService(self.endpoint.pk)
        exported = exporter.save_endpoint_settings_to_dict()

        assert isinstance(exported, dict)

        expected_sections = (
            'rotation',
            'device_control',
            'integrity_control',
            'white_list',
            'usb_control',
            'antivirus',
        )
        for section in expected_sections:
            assert section in exported

        rotation = exported['rotation']
        assert rotation['type'] == EndpointRotationType.SIZE.value
        assert rotation['size'] == 100
|
|
|
|
|
|
@pytest.mark.django_db
@pytest.mark.unit
class TestEndpointUploadConfigService:
    """Checks for EndpointUploadConfigService parsing and persisting an uploaded config.

    The fixture creates an endpoint with the feature flags switched on, while
    the uploaded payload switches them off, so each assertion below can only
    pass if the value really came from the payload.
    """

    # White-list entries carried by `raw_data`, after JSON unescaping.
    EXPECTED_WHITE_LIST_PATHS = [
        "%HKEY_LOCAL_MACHINE\\SOFTWARE\\Microsoft\\Windows NT\\CurrentVersion\\SystemRoot%",
        "%HKEY_LOCAL_MACHINE\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\ProgramFilesDir%",
    ]

    @pytest.fixture(autouse=True)
    def setup_test(self):
        """Create the endpoint under test and the raw JSON payload to upload."""
        self.endpoint = EndpointModel.objects.create(
            type=DeviceType.ENDPOINT,
            ip='127.0.0.1',
            port=5555,
            event_rotation_type=EndpointRotationType.SIZE,

            device_control_enabled=True,

            integrity_control_enabled=True,
            scan_paths=[],
            integrity_control_timeout=10,

            whitelist_enabled=True,
            whitelist_admin=True,
            white_list_paths=[],

            antivirus_enabled=True,
            antivirus_paths=[],
            antivirus_remove_infected_files=True,
        )

        # NOTE(review): the "wl_appy" key looks like a typo for "wl_apply";
        # it is kept byte-for-byte because it may be the real wire key -
        # confirm against the endpoint agent before changing it.
        self.raw_data = (
            b'{"dc_enabled":false,"dc_apply":true,"prohibit_floppy_read":false,'
            b'"prohibit_floppy_write":false,"prohibit_cd_enabled":false,"prohibit_removable_read":false,'
            b'"prohibit_removable_write":false,"prohibit_tape_read":false,"prohibit_tape_write":false,'
            b'"prohibit_wpd_read":false,"prohibit_wpd_write":false,"ic_enabled":false,"scan_folders":[],'
            b'"ic_timeout":30,"rescan_enabled":false,"rescan_timeout":1000,"wl_enable":false,'
            b'"wl_appy":true,"wl_admin":false,"white_list":['
            b'"%HKEY_LOCAL_MACHINE\\\\SOFTWARE\\\\Microsoft\\\\Windows '
            b'NT\\\\CurrentVersion\\\\SystemRoot%",'
            b'"%HKEY_LOCAL_MACHINE\\\\SOFTWARE\\\\Microsoft\\\\Windows\\\\CurrentVersion'
            b'\\\\ProgramFilesDir%"],"gui_enabled":true,"gui_port":4509,"updated":"","ip":"",'
            b'"usb_control_enabled":false,"usb_allowed_storage":[],"usb_allowed_classes":[],'
            b'"usb_hid_allow_other_subclasses":false,"usb_hid_allowed_subclasses":[],'
            b'"usb_hid_deny_subclasses":[2,4,6],"usb_connected":[],"clamav_enabled":false,'
            b'"clamav_scan_on_add":false,"clamav_live_scan":false,"clamav_stop_all_tasks":false,'
            b'"clamav_paths":null,"clamav_remove_infected_files":false,"clamav_last_update":"09-01-2022",'
            b'"event_rotation_type":1,"event_rotation_size":100,"event_rotation_period":1,'
            b'"event_rotation_time":"12:57:01"}'
        )

    def test_preparation_data(self):
        """`service.data` exposes the payload values under the model field names."""
        service = EndpointUploadConfigService(self.endpoint.pk, self.raw_data)

        assert 'device_control_enabled' in service.data
        assert not service.data['device_control_enabled']

        assert 'integrity_control_enabled' in service.data
        assert not service.data['integrity_control_enabled']

        assert 'scan_paths' in service.data
        assert service.data['scan_paths'] == []

        assert 'integrity_control_timeout' in service.data
        assert service.data['integrity_control_timeout'] == 30

        assert 'whitelist_enabled' in service.data
        assert not service.data['whitelist_enabled']

        assert 'whitelist_admin' in service.data
        assert not service.data['whitelist_admin']

        assert 'white_list_paths' in service.data
        assert service.data['white_list_paths'] == self.EXPECTED_WHITE_LIST_PATHS

        assert 'antivirus_enabled' in service.data
        assert not service.data['antivirus_enabled']

        assert 'antivirus_paths' in service.data
        # The payload sends "clamav_paths": null, so None is expected here.
        assert service.data['antivirus_paths'] is None

        assert 'antivirus_remove_infected_files' in service.data
        assert not service.data['antivirus_remove_infected_files']

    def test_valid_update_data(self):
        """upload() reports 'ok' and the stored endpoint carries the payload values."""
        service = EndpointUploadConfigService(self.endpoint.pk, self.raw_data)
        result = service.upload()
        assert result['status'] == 'ok'

        # Re-read from the database instead of trusting the in-memory instance.
        endpoint = EndpointModel.objects.get(pk=self.endpoint.pk)

        assert not endpoint.device_control_enabled

        assert not endpoint.integrity_control_enabled
        assert endpoint.scan_paths == []
        assert endpoint.integrity_control_timeout == 30

        assert not endpoint.whitelist_enabled
        assert not endpoint.whitelist_admin
        assert endpoint.white_list_paths == self.EXPECTED_WHITE_LIST_PATHS

        assert not endpoint.antivirus_enabled
        assert endpoint.antivirus_paths is None
        assert not endpoint.antivirus_remove_infected_files
|