236 lines
10 KiB
Python
236 lines
10 KiB
Python
import os
|
|
from unittest import mock
|
|
|
|
import pytest
|
|
import requests
|
|
from django.urls import reverse
|
|
from rest_framework import status
|
|
|
|
from devices.models.device import Device, DeviceGroup
|
|
from devices.models.firewall import ArmaIndustrialFirewall
|
|
from devices.services.firewall import InvalidCredentialException, IncompatibilityVersionException, ConnectionException, \
|
|
InvalidResponseException, FailedUploadException, InvalidFileException
|
|
|
|
FIREWALL_DATA = {
|
|
"name": "ADD IFTEST",
|
|
"ip": "192.168.56.103",
|
|
"key": "nWM0Pnj4w3DJHbkfIRQ2CbUdqIc0TMUYIHohRCSqWJ5TycVfLo3JlIyurmOXN7MaRMQv/hlUIPbD89Ng",
|
|
"secret": "veREg8dbHC/V4hSCi6LBzuQ0NF5eeS/50d7K7Ahut6X0N/77peVQE5ucIJ/fyKhp0RNlbCHEcen2Rk8U",
|
|
"port": 5000,
|
|
"type": 'firewall'
|
|
}
|
|
|
|
TEST_REQUEST_RESPONSE_LIST = [
|
|
requests.exceptions.ConnectTimeout,
|
|
requests.exceptions.ConnectionError,
|
|
requests.exceptions.Timeout
|
|
]
|
|
|
|
check_connection_exceptions = [InvalidCredentialException, IncompatibilityVersionException, ConnectionException]
|
|
upload_file_exceptions = [InvalidFileException, FailedUploadException, InvalidResponseException]
|
|
|
|
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
TEST_FILES = os.path.join(BASE_DIR, "tests", "test_files")
|
|
|
|
|
|
def _get_json_mock(value):
|
|
return value
|
|
|
|
|
|
@mock.patch('devices.services.firewall.firewall.FIREWALL_TIMEOUT', 1)
|
|
@pytest.mark.django_db
|
|
class TestFirewallAPI:
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def setup_tests(self, api_client, django_user_model, add_user_with_permissions):
|
|
self.user = add_user_with_permissions(username='test_admin', password='test_admin_pass', is_superuser=True)
|
|
|
|
@pytest.mark.unit
|
|
def test_getting_list_of_firewalls(self, api_client):
|
|
api_client.force_authenticate(self.user)
|
|
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
|
|
url = reverse('firewall-list')
|
|
response = api_client.get(url)
|
|
assert response.json()['count'] == 1
|
|
assert response.json()['results'][0]['id'] == firewall.id
|
|
|
|
@pytest.mark.unit
|
|
def test_getting_firewall(self, api_client):
|
|
api_client.force_authenticate(self.user)
|
|
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
|
|
url = reverse('firewall-detail', args=[firewall.pk])
|
|
response = api_client.get(url)
|
|
assert response.status_code == status.HTTP_200_OK
|
|
assert response.json()['id'] == firewall.id
|
|
|
|
@pytest.mark.unit
|
|
def test_getting_firewall_with_invalid_id(self, api_client):
|
|
api_client.force_authenticate(self.user)
|
|
url = reverse('firewall-detail', args=[56789])
|
|
response = api_client.get(url)
|
|
assert response.status_code == status.HTTP_404_NOT_FOUND
|
|
|
|
@pytest.mark.unit
|
|
@mock.patch('devices.services.firewall.firewall.FirewallService.check_connection')
|
|
def test_updating_firewall_with_valid_data(self, mock_check, api_client):
|
|
api_client.force_authenticate(self.user)
|
|
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
|
|
url = reverse('firewall-detail', args=[firewall.id])
|
|
new_data = FIREWALL_DATA.copy()
|
|
new_data['port'] = 4545
|
|
new_data['name'] = 'new_firewall'
|
|
response = api_client.patch(url, data=new_data)
|
|
assert response.status_code == status.HTTP_200_OK
|
|
assert response.json()['id'] == firewall.id
|
|
assert response.json()['port'] == new_data['port']
|
|
assert response.json()['port'] == new_data['port']
|
|
assert response.json()['name'] == new_data['name']
|
|
|
|
@pytest.mark.unit
|
|
def test_updating_firewall_with_invalid_port(self, api_client):
|
|
api_client.force_authenticate(self.user)
|
|
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
|
|
url = reverse('firewall-detail', args=[firewall.id])
|
|
new_data = {
|
|
'name': 'new_firewall',
|
|
'port': 99999
|
|
}
|
|
response = api_client.patch(url, data=new_data)
|
|
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
|
assert 'port' in response.json()
|
|
assert response.json()['port'] == ['Ensure this value is less than or equal to 65535.']
|
|
|
|
@pytest.mark.unit
|
|
def test_updating_firewall_with_connection_error(self, api_client):
|
|
api_client.force_authenticate(self.user)
|
|
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
|
|
firewall_id = firewall.id
|
|
url = reverse('firewall-detail', args=[firewall_id])
|
|
new_data = FIREWALL_DATA.copy()
|
|
new_data.update({'name': 'new_firewall', 'port': 9999})
|
|
with mock.patch('requests.Session.get', side_effect=ConnectionException):
|
|
response = api_client.patch(url, data=new_data)
|
|
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
|
assert response.json()['detail'] == 'There was a problem connecting to the firewall'
|
|
|
|
firewall = ArmaIndustrialFirewall.objects.get(pk=firewall_id)
|
|
assert firewall.name == FIREWALL_DATA['name']
|
|
assert firewall.port == FIREWALL_DATA['port']
|
|
|
|
@pytest.mark.unit
|
|
@mock.patch('devices.services.firewall.firewall.FirewallService.check_connection')
|
|
def test_updating_firewall_null_group(self, mock_check, api_client):
|
|
api_client.force_authenticate(self.user)
|
|
device_group = DeviceGroup.objects.create(name='IF test group')
|
|
data = FIREWALL_DATA.copy()
|
|
data['group'] = device_group
|
|
firewall = ArmaIndustrialFirewall.objects.create(**data)
|
|
url = reverse('firewall-detail', args=[firewall.id])
|
|
data['group'] = ''
|
|
response = api_client.patch(url, data=data)
|
|
assert response.status_code == status.HTTP_200_OK
|
|
assert response.data['group'] is None
|
|
|
|
@pytest.mark.unit
|
|
def test_creating_firewall_without_ip(self, api_client):
|
|
api_client.force_authenticate(self.user)
|
|
url = reverse('firewall-list')
|
|
data = FIREWALL_DATA.copy()
|
|
del data['ip']
|
|
response = api_client.post(url, data=data)
|
|
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
|
assert 'ip' in response.json()
|
|
assert response.json()['ip'] == ['This field is required.']
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.parametrize('req_res', TEST_REQUEST_RESPONSE_LIST)
|
|
def test_create_firewall_with_network_error(self, req_res, api_client):
|
|
api_client.force_authenticate(self.user)
|
|
with mock.patch('requests.Session.get', side_effect=req_res):
|
|
url = reverse('firewall-list')
|
|
response = api_client.post(url, data=FIREWALL_DATA)
|
|
assert response.json()['detail'] == 'There was a problem connecting to the firewall'
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.parametrize('exc', check_connection_exceptions)
|
|
def test_create_firewall_with_error_check_connection(self, exc, api_client):
|
|
api_client.force_authenticate(self.user)
|
|
with mock.patch('devices.services.firewall.FirewallService.check_connection', side_effect=exc):
|
|
url = reverse('firewall-list')
|
|
data = FIREWALL_DATA.copy()
|
|
data['key'] = 'invalid'
|
|
response = api_client.post(url, data=data)
|
|
assert response.json()['detail'] == exc.default_detail['detail']
|
|
|
|
@pytest.mark.unit
|
|
def test_delete_firewall(self, api_client):
|
|
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
|
|
assert ArmaIndustrialFirewall.objects.filter(id=firewall.id).exists()
|
|
assert Device.objects.filter(id=firewall.id).exists()
|
|
|
|
api_client.force_authenticate(self.user)
|
|
url = reverse('firewall-detail', args=[firewall.id])
|
|
response = api_client.delete(url)
|
|
assert response.status_code == status.HTTP_204_NO_CONTENT
|
|
assert not ArmaIndustrialFirewall.objects.filter(id=firewall.id).exists()
|
|
assert not Device.objects.filter(id=firewall.id).exists()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.parametrize('exc', upload_file_exceptions)
|
|
def test_upload_firewall_config_error(self, exc, api_client):
|
|
api_client.force_authenticate(self.user)
|
|
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
|
|
|
|
file_path = os.path.join(TEST_FILES, 'config.xml')
|
|
file = open(file_path, 'r')
|
|
url = reverse('firewall-upload-config', args=[firewall.id])
|
|
data = {'conffile': file}
|
|
with mock.patch('devices.services.firewall.FirewallService.upload_file', side_effect=exc):
|
|
response = api_client.post(url, data)
|
|
assert response.json()['detail'] == exc.default_detail['detail']
|
|
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
|
|
|
@pytest.mark.unit
|
|
def test_upload_firewall_config_without_file(self, api_client):
|
|
api_client.force_authenticate(self.user)
|
|
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
|
|
|
|
url = reverse('firewall-upload-config', args=[firewall.id])
|
|
response = api_client.post(url)
|
|
assert 'conffile' in response.json()
|
|
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
|
|
|
@pytest.mark.unit
|
|
@mock.patch('devices.services.firewall.FirewallService.upload_file', lambda *args: {'status': 'ok'})
|
|
def test_upload_firewall_config_success(self, api_client):
|
|
api_client.force_authenticate(self.user)
|
|
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
|
|
|
|
file_path = os.path.join(TEST_FILES, 'config.xml')
|
|
file = open(file_path, 'r')
|
|
url = reverse('firewall-upload-config', args=[firewall.id])
|
|
data = {'conffile': file}
|
|
|
|
response = api_client.post(url, data)
|
|
assert response.json()['status'] == 'ok'
|
|
assert response.status_code == status.HTTP_200_OK
|
|
|
|
@pytest.mark.unit
|
|
@mock.patch('devices.services.firewall.FirewallService.reboot', lambda *args: {'status': 'ok'})
|
|
def test_reboot_firewall_success(self, api_client):
|
|
api_client.force_authenticate(self.user)
|
|
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
|
|
|
|
url = reverse('firewall-reboot', args=[firewall.id])
|
|
response = api_client.post(url)
|
|
assert response.json()['status'] == 'ok'
|
|
assert response.status_code == status.HTTP_200_OK
|
|
|
|
# Without IF parallel execution
|
|
@pytest.mark.integration
|
|
def test_get_firewall_status(self, api_client):
|
|
api_client.force_authenticate(self.user)
|
|
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
|
|
url = reverse('firewall-status', args=[firewall.id])
|
|
response = api_client.get(url)
|
|
assert response.status_code == status.HTTP_200_OK
|