"""Tests for the sensor REST endpoints and the sensor service message endpoints."""
import os
|
|
from unittest import mock
|
|
|
|
import pytest
|
|
from django.core.files.uploadedfile import SimpleUploadedFile
|
|
from django.urls import reverse
|
|
from rest_framework import status
|
|
|
|
from devices.models.device import Device
|
|
from devices.models.sensor import ArmaSensor
|
|
from devices.services.sensor.rabbitmq import SensorResponseException
|
|
|
|
# Parent of the directory that contains this file; test fixture files are
# resolved relative to it.
BASE_DIR = os.path.dirname(
    os.path.dirname(
        os.path.abspath(__file__),
    ),
)

# Keyword arguments used to create the ArmaSensor row the tests operate on.
SENSOR_DATA = dict(
    name="SENSOR_TEST",
    ip="192.168.56.103",
    port=5000,
    type='firewall',
)

# Action names substituted into the 'sensor-vector-message' route.
VECTOR_URL_NAMES = [
    'status',
    'start',
    'stop',
    'restart',
    'reload',
    'service_info',
]

# Action names substituted into the 'sensor-zeek-message' route.
ZEEK_URL_NAMES = [
    'status',
    'start',
    'stop',
    'restart',
]

# Zeek action names; the POST suffix suggests they are sent via POST (not referenced in the visible code).
ZEEK_URL_NAMES_POST = [
    'protocols_disable',
    'settings_update',
]
|
|
|
|
|
|
def mock_sensor_management(response=None):
    """Build a replacement class for ``SensorManagement`` to use with ``mock.patch``.

    Args:
        response: Value that ``send_message`` returns. When left as ``None``
            (the default), ``send_message`` raises ``SensorResponseException``
            with ``{'status': 'error'}`` instead.

    Returns:
        The ``MockSensorManagement`` class itself (not an instance).
    """

    class MockSensorManagement:
        """Minimal fake exposing ``_get_connection_data`` and ``send_message``."""

        def _get_connection_data(self):
            # Fixed placeholder pair.
            return 1, 2

        def send_message(*args, **kwargs):
            # ``self`` and any call arguments are absorbed by *args/**kwargs;
            # none of them influence the outcome.
            # Compare against None explicitly so a falsy but deliberately
            # configured response (e.g. {} or []) is still returned.
            if response is not None:
                return response
            raise SensorResponseException({'status': 'error'})

    return MockSensorManagement
|
|
|
|
|
|
def mock_sensor_redis():
    """Return a fake class, patched in for ``SensorService``, that always reports online."""

    class MockSensorRedis:
        """Accepts any constructor arguments and exposes a fixed status."""

        def __init__(self, *_args, **_kwargs):
            """Ignore whatever the caller passes."""

        def get_status(self):
            """Return the constant online status payload."""
            return dict(status='online')

    return MockSensorRedis
|
|
|
|
|
|
@pytest.mark.django_db
class TestSensorAPI:
    """Tests for the 'sensor-list' and 'sensor-detail' routes."""

    @pytest.fixture(autouse=True)
    def setup_tests(self, api_client, django_user_model, add_user_with_permissions):
        """Create the superuser that every test authenticates as."""
        self.user = add_user_with_permissions(username='test_admin', password='test_admin_pass', is_superuser=True)

    @pytest.mark.unit
    @mock.patch('devices.views.sensor.SensorManagement', mock_sensor_management(response='ok'))
    @mock.patch('devices.serializers.sensor_serializers.SensorService', mock_sensor_redis())
    def test_getting_list_of_sensors(self, api_client):
        """Listing returns exactly the one sensor that was created."""
        api_client.force_authenticate(self.user)
        sensor = ArmaSensor.objects.create(**SENSOR_DATA)
        url = reverse('sensor-list')
        response = api_client.get(url)
        # Check the status first so a failing request is reported as such,
        # consistent with the other tests in this class.
        assert response.status_code == status.HTTP_200_OK
        payload = response.json()
        assert payload['count'] == 1
        assert payload['results'][0]['id'] == sensor.id

    @pytest.mark.unit
    @mock.patch('devices.views.sensor.SensorManagement', mock_sensor_management(response='ok'))
    @mock.patch('devices.serializers.sensor_serializers.SensorService', mock_sensor_redis())
    def test_getting_sensor(self, api_client):
        """Retrieving an existing sensor by primary key returns it."""
        api_client.force_authenticate(self.user)
        sensor = ArmaSensor.objects.create(**SENSOR_DATA)
        url = reverse('sensor-detail', args=[sensor.pk])
        response = api_client.get(url)
        assert response.status_code == status.HTTP_200_OK
        assert response.json()['id'] == sensor.id

    @pytest.mark.unit
    @mock.patch('devices.views.sensor.SensorManagement', mock_sensor_management(response='ok'))
    @mock.patch('devices.views.sensor.SensorService', mock_sensor_redis())
    def test_getting_sensor_with_invalid_id(self, api_client):
        """Retrieving an id with no matching sensor yields 404."""
        api_client.force_authenticate(self.user)
        url = reverse('sensor-detail', args=[56789])
        response = api_client.get(url)
        assert response.status_code == status.HTTP_404_NOT_FOUND

    @pytest.mark.unit
    @mock.patch('devices.views.sensor.SensorManagement', mock_sensor_management(response='ok'))
    @mock.patch('devices.views.sensor.SensorService', mock_sensor_redis())
    @mock.patch('devices.serializers.sensor_serializers.SensorService', mock_sensor_redis())
    def test_updating_sensor_with_valid_data(self, api_client):
        """PATCH with a new name and port is accepted and echoed back."""
        api_client.force_authenticate(self.user)
        sensor = ArmaSensor.objects.create(**SENSOR_DATA)
        url = reverse('sensor-detail', args=[sensor.id])
        new_data = SENSOR_DATA.copy()
        new_data['port'] = 4545
        new_data['name'] = 'new_sensor'
        response = api_client.patch(url, data=new_data)
        assert response.status_code == status.HTTP_200_OK
        payload = response.json()
        assert payload['id'] == sensor.id
        assert payload['port'] == new_data['port']
        assert payload['name'] == new_data['name']

    @pytest.mark.unit
    def test_updating_sensor_with_invalid_port(self, api_client):
        """PATCH with a port above 65535 is rejected with a field error."""
        api_client.force_authenticate(self.user)
        sensor = ArmaSensor.objects.create(**SENSOR_DATA)
        url = reverse('sensor-detail', args=[sensor.id])
        new_data = {
            'name': 'new_sensor',
            'port': 99999,
        }
        response = api_client.patch(url, data=new_data)
        assert response.status_code == status.HTTP_400_BAD_REQUEST
        payload = response.json()
        assert 'port' in payload
        assert payload['port'] == ['Ensure this value is less than or equal to 65535.']

    @pytest.mark.unit
    def test_creating_sensor_without_ip(self, api_client):
        """POST without the 'ip' field is rejected as a required-field error."""
        api_client.force_authenticate(self.user)
        url = reverse('sensor-list')
        data = SENSOR_DATA.copy()
        del data['ip']
        response = api_client.post(url, data=data)
        assert response.status_code == status.HTTP_400_BAD_REQUEST
        payload = response.json()
        assert 'ip' in payload
        assert payload['ip'] == ['This field is required.']

    @pytest.mark.unit
    def test_delete_sensor(self, api_client):
        """DELETE removes both the ArmaSensor row and the Device row with the same id."""
        sensor = ArmaSensor.objects.create(**SENSOR_DATA)
        # Both rows must exist up front, otherwise the final checks prove nothing.
        assert ArmaSensor.objects.filter(id=sensor.id).exists()
        assert Device.objects.filter(id=sensor.id).exists()

        api_client.force_authenticate(self.user)
        url = reverse('sensor-detail', args=[sensor.id])
        response = api_client.delete(url)
        assert response.status_code == status.HTTP_204_NO_CONTENT
        assert not ArmaSensor.objects.filter(id=sensor.id).exists()
        assert not Device.objects.filter(id=sensor.id).exists()
|
|
|
|
|
|
@pytest.mark.unit
@pytest.mark.django_db
class TestSensorServicesAPI:
    """Tests for the sensor system, vector and zeek message routes.

    ``SensorManagement`` is patched in every test, so the response body is
    whatever the mock was configured to return (or the error it raises).
    """

    @pytest.fixture(autouse=True)
    def setup_tests(self, api_client, django_user_model, add_user_with_permissions):
        """Create the superuser and the sensor shared by every test."""
        self.user = add_user_with_permissions(username='test_admin', password='test_admin_pass', is_superuser=True)
        self.sensor = ArmaSensor.objects.create(**SENSOR_DATA)

    @mock.patch('devices.views.sensor.SensorManagement', mock_sensor_management(response={'status': 'ok'}))
    def test_sensor_interfaces(self, api_client):
        """The 'interfaces' system message returns the mocked response."""
        api_client.force_authenticate(self.user)
        url = reverse('sensor-system-message', args=[self.sensor.id, 'interfaces'])
        response = api_client.get(url)
        assert response.status_code == status.HTTP_200_OK
        assert response.json() == {'status': 'ok'}

    @pytest.mark.parametrize('url_name', VECTOR_URL_NAMES)
    @mock.patch('devices.views.sensor.SensorManagement', mock_sensor_management(response={'status': 'ok'}))
    def test_sensor_vector(self, url_name, api_client):
        """Each vector action returns the mocked response with 200."""
        api_client.force_authenticate(self.user)
        url = reverse('sensor-vector-message', args=[self.sensor.id, url_name])
        response = api_client.get(url)
        assert response.status_code == status.HTTP_200_OK
        assert response.json() == {'status': 'ok'}

    @pytest.mark.parametrize('url_name', VECTOR_URL_NAMES)
    @mock.patch('devices.views.sensor.SensorManagement', mock_sensor_management())
    def test_sensor_vector_with_error(self, url_name, api_client):
        """Each vector action yields 400 when the mock raises SensorResponseException."""
        api_client.force_authenticate(self.user)
        url = reverse('sensor-vector-message', args=[self.sensor.id, url_name])
        response = api_client.get(url)
        assert response.status_code == status.HTTP_400_BAD_REQUEST
        assert response.json() == {'status': 'error'}

    @pytest.mark.parametrize('url_name', ZEEK_URL_NAMES)
    @mock.patch('devices.views.sensor.SensorManagement', mock_sensor_management(response={'status': 'ok'}))
    def test_sensor_zeek(self, url_name, api_client):
        """Each zeek action returns the mocked response with 200."""
        api_client.force_authenticate(self.user)
        url = reverse('sensor-zeek-message', args=[self.sensor.id, url_name])
        response = api_client.get(url)
        assert response.status_code == status.HTTP_200_OK
        assert response.json() == {'status': 'ok'}

    @pytest.mark.parametrize('url_name', ZEEK_URL_NAMES)
    @mock.patch('devices.views.sensor.SensorManagement', mock_sensor_management())
    def test_sensor_zeek_with_error(self, url_name, api_client):
        """Each zeek action yields 400 when the mock raises SensorResponseException."""
        api_client.force_authenticate(self.user)
        url = reverse('sensor-zeek-message', args=[self.sensor.id, url_name])
        response = api_client.get(url)
        assert response.status_code == status.HTTP_400_BAD_REQUEST
        assert response.json() == {'status': 'error'}

    @mock.patch('devices.views.sensor.SensorManagement', mock_sensor_management(response={'status': 'ok'}))
    def test_sensor_zeek_update_settings(self, api_client):
        """Uploading a zeek settings file returns the mocked response with 200."""
        test_file = os.path.join(BASE_DIR, 'tests/test_files/good_local.zeek')
        # Read inside a context manager so the fixture file handle is closed
        # deterministically rather than left to the garbage collector.
        with open(test_file, 'rb') as source:
            upload = SimpleUploadedFile(name='good_local.zeek', content=source.read())
        api_client.force_authenticate(self.user)
        url = reverse('sensor-zeek-settings-update', args=[self.sensor.id])
        data = {'file': upload}
        response = api_client.post(url, data=data)
        assert response.status_code == status.HTTP_200_OK
        assert response.json() == {'status': 'ok'}

    @mock.patch('devices.views.sensor.SensorManagement', mock_sensor_management(response={'status': 'ok'}))
    def test_sensor_zeek_protocols_disable(self, api_client):
        """Posting a list of protocols to disable returns the mocked response with 200."""
        api_client.force_authenticate(self.user)
        url = reverse('sensor-zeek-protocols-disable', args=[self.sensor.id])
        data = {
            'disable_protocols': ['test', 'aaa', '10101001'],
        }
        response = api_client.post(url, data=data)
        assert response.status_code == status.HTTP_200_OK
        assert response.json() == {'status': 'ok'}
|