import json
import logging
import pytest
from django.contrib.auth import get_user_model
from django.urls import reverse
from rest_framework import status
from assets.models.assets import Asset, OperatingSystem
from incident.models import Incident
from perms.models import Perm
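# Module-level logger named after this module rather than the root logger,
# so that records emitted by these tests can be filtered by their source.
_log = logging.getLogger(__name__)

TIMEOUT = 10  # time before timeout exception appears (not referenced in this file)

# User model class as returned by Django's get_user_model()
User = get_user_model()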
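@pytest.mark.django_db
class TestAssetPagesAccess(object):
    """ API tests for the asset endpoints: permissions, update, delete, validation, filtering

    NOTE(review): as far as this file shows, the only negative authorization
    check is the PATCH test for 'test_perms'. The 'test_no_perms' user created
    in setup_tests is never used, and there is no 403 test for DELETE or for
    an unauthenticated client - worth covering, since every test here goes
    through force_authenticate().
    """

    @pytest.fixture(autouse=True)
    def setup_tests(self, django_user_model, add_user_with_permissions):
        """ Fixture for preparing database for tests

        :param django_user_model: pytest-django fixture for the User model (not used in the body)
        :param add_user_with_permissions: fixture for adding new user
        :return: nothing; the database is prepared for tests with:
            superuser 'foo'
            user 'test_no_perms' created without a permissions argument
            user 'test_right_perms' with view-list, view, edit and delete asset permissions
            user 'test_perms' with an explicitly empty permission list
            operating system 'MACOS' and five assets named 'test0'..'test4'
        """
        username = 'foo'
        password = 'bar'

        add_user_with_permissions(username=username, password=password,
                                  is_superuser=True)
        add_user_with_permissions(username='test_no_perms', password='1')
        add_user_with_permissions(username='test_right_perms', password='1',
                                  permissions=[Perm.can_view_assets_list,
                                               Perm.can_view_asset,
                                               Perm.can_edit_asset,
                                               Perm.can_delete_asset])
        add_user_with_permissions(username='test_perms', password='1',
                                  permissions=[])
        operating_system = OperatingSystem.objects.create(name='MACOS')
        for i in range(5):
            Asset.objects.create(name=f'test{i}', ip='1.1.1.1', os=operating_system)

    # TODO: Need to fix this
    @pytest.mark.skip(reason='TODO: Need to fix this')
    @pytest.mark.integration
    def test_export_assets_in_csv_api(self, add_user_with_permissions, api_client):
        """ Test for checking if user with export permissions gets 200 from the CSV export (skipped) """
        username = 'user'
        password = 'pro100ton'
        add_user_with_permissions(username=username,
                                  password=password,
                                  permissions=[Perm.can_view_network,
                                               Perm.can_work_with_incidents,
                                               Perm.can_export_incidents_list,
                                               Perm.can_export_assets]
                                  )
        user = User.objects.get(username=username)
        api_client.force_authenticate(user)

        response = api_client.get(reverse('asset-csv-export'))
        assert response.status_code == status.HTTP_200_OK

    @pytest.mark.unit
    def test_user_w_perm_can_edit_asset(self, api_client):
        """ Test for checking if user with right permissions can access edit asset page """
        user = User.objects.get(username='test_right_perms')
        api_client.force_authenticate(user)
        asset_pk = Asset.objects.get(name='test0').pk
        url = reverse('asset-detail', args=[asset_pk])
        response = api_client.patch(url)
        assert response.status_code == status.HTTP_200_OK

    @pytest.mark.unit
    def test_user_wo_perm_cant_edit_asset(self, api_client):
        """ Test for checking if user without right permissions cannot edit asset """
        user = User.objects.get(username='test_perms')
        api_client.force_authenticate(user)
        asset = Asset.objects.get(name='test0')
        model_before = asset.model
        url = reverse('asset-detail', args=[asset.pk])
        # Send a real change: an empty PATCH cannot tell "denied before the write"
        # apart from "written, then answered 403".
        response = api_client.patch(url, {'model': 'must_not_be_saved'})
        assert response.status_code == status.HTTP_403_FORBIDDEN
        # The denial must also leave the stored asset untouched
        assert Asset.objects.get(pk=asset.pk).model == model_before

    @pytest.mark.unit
    def test_asset_updated_correctly(self, api_client):
        """ Test for checking if asset is saved correctly after update """
        user = User.objects.get(username='test_right_perms')
        api_client.force_authenticate(user)
        asset_pk = Asset.objects.get(name='test0').pk
        url = reverse('asset-detail', args=[asset_pk])
        response = api_client.patch(url, {'model': 'test_model', 'ports': '[5000,6000]'})
        # Status first: a failed request then reports its code instead of a KeyError on the body
        assert response.status_code == status.HTTP_200_OK
        assert response.json()['model'] == 'test_model'
        updated = Asset.objects.get(name='test0')
        assert updated.model == 'test_model'
        assert updated.ports == json.loads("[5000,6000]")

    @pytest.mark.unit
    def test_asset_is_deleted_correctly(self, api_client):
        """ Test for checking if asset is deleted correctly """
        user = User.objects.get(username='test_right_perms')
        api_client.force_authenticate(user)
        asset_pk = Asset.objects.get(name='test0').pk
        url = reverse('asset-detail', args=[asset_pk])
        response = api_client.delete(url)
        assert response.status_code == status.HTTP_200_OK
        assert not Asset.objects.filter(name='test0').exists()
        # setup_tests creates five assets, so exactly one must be gone
        assert Asset.objects.count() == 4

    @pytest.mark.unit
    def test_check_serializer_validation(self, api_client):
        """ Test for checking if serializer returns error with invalid data """
        user = User.objects.get(username='test_right_perms')
        api_client.force_authenticate(user)
        asset = Asset.objects.get(name='test0')
        os_before = asset.os
        url = reverse('asset-detail', args=[asset.pk])
        response = api_client.patch(url, {'os': 'bad'})
        assert response.status_code == status.HTTP_400_BAD_REQUEST
        assert 'os' in response.data
        # A rejected payload must not change the stored value
        assert Asset.objects.get(name='test0').os == os_before

    @pytest.mark.merge
    def test_asset_correctly_count_number_of_incidents(self, api_client):
        """ Test for checking if asset list reports the number of incidents per asset """
        user = User.objects.get(username='test_right_perms')
        api_client.force_authenticate(user)
        asset = Asset.objects.get(name='test0')
        incident = Incident.objects.create(title='test_inc', importance=10, event_count=10, events='')
        asset.incidents.add(incident)
        url = reverse('asset-list')
        response = api_client.get(url)
        assert response.status_code == status.HTTP_200_OK
        # Look the counts up by asset id so the test does not depend on list ordering
        counts = {item['id']: item['count_incidents'] for item in response.data['results']}
        assert counts[asset.pk] == 1
        other_counts = [count for pk, count in counts.items() if pk != asset.pk]
        assert other_counts
        assert all(count == 0 for count in other_counts)

    @pytest.mark.merge
    def test_filter_by_incidents(self, api_client):
        """ Test for checking filter returns asset """
        user = User.objects.get(username='test_right_perms')
        api_client.force_authenticate(user)
        incident = Incident.objects.create(title='test_inc', importance=10, event_count=10, events='')
        incident_id = incident.incident_id

        asset = Asset.objects.get(name='test0')
        asset.incidents.add(incident)

        url = reverse('asset-list')
        response = api_client.get(url, **{'QUERY_STRING': f'incidents={incident_id}'})
        assert response.status_code == status.HTTP_200_OK
        _log.info(response.data)
        _log.info(incident_id)
        assert response.data['count'] == 1
        assert response.data['results'][0]['id'] == asset.pk

        # A malformed filter value must be rejected with 400
        bad_response = api_client.get(url, **{'QUERY_STRING': 'incidents=bad'})
        assert bad_response.status_code == status.HTTP_400_BAD_REQUEST

    @pytest.mark.unit
    def test_create_asset_with_bad_status(self, api_client):
        """ The asset status is 0 by default; check that a status sent by the client is not stored """
        user = User.objects.get(username='foo')
        api_client.force_authenticate(user)
        url = reverse('logstash-asset-list')
        response = api_client.post(url,
                                   data={
                                       "asset_type": "",
                                       "description": "Description",
                                       "group": "",
                                       "ip": "127.0.0.1",
                                       "manufacturer": "",
                                       "model": "",
                                       "name": "192.168.1.101",
                                       "os": "",
                                       "ports": "[5000]",
                                       "sensor": "armaif_1",
                                       "status": "25622",
                                       "type": "asset",
                                   }, format="json")

        assert response.status_code == status.HTTP_201_CREATED
        asset = Asset.objects.get(name='192.168.1.101')
        # The request carried status "25622"; the server-side default must win
        assert asset.status == 0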