old_console/devices/tests/test_firewall_api.py
2024-11-02 14:12:45 +03:00

236 lines
10 KiB
Python

import os
from unittest import mock
import pytest
import requests
from django.urls import reverse
from rest_framework import status
from devices.models.device import Device, DeviceGroup
from devices.models.firewall import ArmaIndustrialFirewall
from devices.services.firewall import InvalidCredentialException, IncompatibilityVersionException, ConnectionException, \
InvalidResponseException, FailedUploadException, InvalidFileException
FIREWALL_DATA = {
"name": "ADD IFTEST",
"ip": "192.168.56.103",
"key": "nWM0Pnj4w3DJHbkfIRQ2CbUdqIc0TMUYIHohRCSqWJ5TycVfLo3JlIyurmOXN7MaRMQv/hlUIPbD89Ng",
"secret": "veREg8dbHC/V4hSCi6LBzuQ0NF5eeS/50d7K7Ahut6X0N/77peVQE5ucIJ/fyKhp0RNlbCHEcen2Rk8U",
"port": 5000,
"type": 'firewall'
}
TEST_REQUEST_RESPONSE_LIST = [
requests.exceptions.ConnectTimeout,
requests.exceptions.ConnectionError,
requests.exceptions.Timeout
]
check_connection_exceptions = [InvalidCredentialException, IncompatibilityVersionException, ConnectionException]
upload_file_exceptions = [InvalidFileException, FailedUploadException, InvalidResponseException]
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
TEST_FILES = os.path.join(BASE_DIR, "tests", "test_files")
def _get_json_mock(value):
return value
@mock.patch('devices.services.firewall.firewall.FIREWALL_TIMEOUT', 1)
@pytest.mark.django_db
class TestFirewallAPI:
@pytest.fixture(autouse=True)
def setup_tests(self, api_client, django_user_model, add_user_with_permissions):
self.user = add_user_with_permissions(username='test_admin', password='test_admin_pass', is_superuser=True)
@pytest.mark.unit
def test_getting_list_of_firewalls(self, api_client):
api_client.force_authenticate(self.user)
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
url = reverse('firewall-list')
response = api_client.get(url)
assert response.json()['count'] == 1
assert response.json()['results'][0]['id'] == firewall.id
@pytest.mark.unit
def test_getting_firewall(self, api_client):
api_client.force_authenticate(self.user)
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
url = reverse('firewall-detail', args=[firewall.pk])
response = api_client.get(url)
assert response.status_code == status.HTTP_200_OK
assert response.json()['id'] == firewall.id
@pytest.mark.unit
def test_getting_firewall_with_invalid_id(self, api_client):
api_client.force_authenticate(self.user)
url = reverse('firewall-detail', args=[56789])
response = api_client.get(url)
assert response.status_code == status.HTTP_404_NOT_FOUND
@pytest.mark.unit
@mock.patch('devices.services.firewall.firewall.FirewallService.check_connection')
def test_updating_firewall_with_valid_data(self, mock_check, api_client):
api_client.force_authenticate(self.user)
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
url = reverse('firewall-detail', args=[firewall.id])
new_data = FIREWALL_DATA.copy()
new_data['port'] = 4545
new_data['name'] = 'new_firewall'
response = api_client.patch(url, data=new_data)
assert response.status_code == status.HTTP_200_OK
assert response.json()['id'] == firewall.id
assert response.json()['port'] == new_data['port']
assert response.json()['port'] == new_data['port']
assert response.json()['name'] == new_data['name']
@pytest.mark.unit
def test_updating_firewall_with_invalid_port(self, api_client):
api_client.force_authenticate(self.user)
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
url = reverse('firewall-detail', args=[firewall.id])
new_data = {
'name': 'new_firewall',
'port': 99999
}
response = api_client.patch(url, data=new_data)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert 'port' in response.json()
assert response.json()['port'] == ['Ensure this value is less than or equal to 65535.']
@pytest.mark.unit
def test_updating_firewall_with_connection_error(self, api_client):
api_client.force_authenticate(self.user)
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
firewall_id = firewall.id
url = reverse('firewall-detail', args=[firewall_id])
new_data = FIREWALL_DATA.copy()
new_data.update({'name': 'new_firewall', 'port': 9999})
with mock.patch('requests.Session.get', side_effect=ConnectionException):
response = api_client.patch(url, data=new_data)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert response.json()['detail'] == 'There was a problem connecting to the firewall'
firewall = ArmaIndustrialFirewall.objects.get(pk=firewall_id)
assert firewall.name == FIREWALL_DATA['name']
assert firewall.port == FIREWALL_DATA['port']
@pytest.mark.unit
@mock.patch('devices.services.firewall.firewall.FirewallService.check_connection')
def test_updating_firewall_null_group(self, mock_check, api_client):
api_client.force_authenticate(self.user)
device_group = DeviceGroup.objects.create(name='IF test group')
data = FIREWALL_DATA.copy()
data['group'] = device_group
firewall = ArmaIndustrialFirewall.objects.create(**data)
url = reverse('firewall-detail', args=[firewall.id])
data['group'] = ''
response = api_client.patch(url, data=data)
assert response.status_code == status.HTTP_200_OK
assert response.data['group'] is None
@pytest.mark.unit
def test_creating_firewall_without_ip(self, api_client):
api_client.force_authenticate(self.user)
url = reverse('firewall-list')
data = FIREWALL_DATA.copy()
del data['ip']
response = api_client.post(url, data=data)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert 'ip' in response.json()
assert response.json()['ip'] == ['This field is required.']
@pytest.mark.unit
@pytest.mark.parametrize('req_res', TEST_REQUEST_RESPONSE_LIST)
def test_create_firewall_with_network_error(self, req_res, api_client):
api_client.force_authenticate(self.user)
with mock.patch('requests.Session.get', side_effect=req_res):
url = reverse('firewall-list')
response = api_client.post(url, data=FIREWALL_DATA)
assert response.json()['detail'] == 'There was a problem connecting to the firewall'
@pytest.mark.unit
@pytest.mark.parametrize('exc', check_connection_exceptions)
def test_create_firewall_with_error_check_connection(self, exc, api_client):
api_client.force_authenticate(self.user)
with mock.patch('devices.services.firewall.FirewallService.check_connection', side_effect=exc):
url = reverse('firewall-list')
data = FIREWALL_DATA.copy()
data['key'] = 'invalid'
response = api_client.post(url, data=data)
assert response.json()['detail'] == exc.default_detail['detail']
@pytest.mark.unit
def test_delete_firewall(self, api_client):
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
assert ArmaIndustrialFirewall.objects.filter(id=firewall.id).exists()
assert Device.objects.filter(id=firewall.id).exists()
api_client.force_authenticate(self.user)
url = reverse('firewall-detail', args=[firewall.id])
response = api_client.delete(url)
assert response.status_code == status.HTTP_204_NO_CONTENT
assert not ArmaIndustrialFirewall.objects.filter(id=firewall.id).exists()
assert not Device.objects.filter(id=firewall.id).exists()
@pytest.mark.unit
@pytest.mark.parametrize('exc', upload_file_exceptions)
def test_upload_firewall_config_error(self, exc, api_client):
api_client.force_authenticate(self.user)
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
file_path = os.path.join(TEST_FILES, 'config.xml')
file = open(file_path, 'r')
url = reverse('firewall-upload-config', args=[firewall.id])
data = {'conffile': file}
with mock.patch('devices.services.firewall.FirewallService.upload_file', side_effect=exc):
response = api_client.post(url, data)
assert response.json()['detail'] == exc.default_detail['detail']
assert response.status_code == status.HTTP_400_BAD_REQUEST
@pytest.mark.unit
def test_upload_firewall_config_without_file(self, api_client):
api_client.force_authenticate(self.user)
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
url = reverse('firewall-upload-config', args=[firewall.id])
response = api_client.post(url)
assert 'conffile' in response.json()
assert response.status_code == status.HTTP_400_BAD_REQUEST
@pytest.mark.unit
@mock.patch('devices.services.firewall.FirewallService.upload_file', lambda *args: {'status': 'ok'})
def test_upload_firewall_config_success(self, api_client):
api_client.force_authenticate(self.user)
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
file_path = os.path.join(TEST_FILES, 'config.xml')
file = open(file_path, 'r')
url = reverse('firewall-upload-config', args=[firewall.id])
data = {'conffile': file}
response = api_client.post(url, data)
assert response.json()['status'] == 'ok'
assert response.status_code == status.HTTP_200_OK
@pytest.mark.unit
@mock.patch('devices.services.firewall.FirewallService.reboot', lambda *args: {'status': 'ok'})
def test_reboot_firewall_success(self, api_client):
api_client.force_authenticate(self.user)
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
url = reverse('firewall-reboot', args=[firewall.id])
response = api_client.post(url)
assert response.json()['status'] == 'ok'
assert response.status_code == status.HTTP_200_OK
# Without IF parallel execution
@pytest.mark.integration
def test_get_firewall_status(self, api_client):
api_client.force_authenticate(self.user)
firewall = ArmaIndustrialFirewall.objects.create(**FIREWALL_DATA)
url = reverse('firewall-status', args=[firewall.id])
response = api_client.get(url)
assert response.status_code == status.HTTP_200_OK