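import json
import logging

import pytest
from django.contrib.auth import get_user_model
from django.urls import reverse
from rest_framework import status

from assets.models.assets import Asset, OperatingSystem
from incident.models import Incident
from perms.models import Perm

_log = logging.getLogger()
TIMEOUT = 10  # time before timeout exception appears
User = get_user_model()


@pytest.mark.django_db
class TestAssetPagesAccess(object):
    """Authorisation and behaviour tests for the asset API endpoints.

    The fixture below seeds users with different permission sets so each
    test can check that the endpoint enforces the expected permission
    (view / edit / delete) and that server-controlled fields cannot be
    overridden by the client.
    """

    @pytest.fixture(autouse=True)
    def setup_tests(self, django_user_model, add_user_with_permissions):
        """Populate the database before every test in this class.

        :param django_user_model: pytest-django handle to the User model
            (not referenced here; kept so the fixture is resolved first)
        :param add_user_with_permissions: project fixture that creates a
            user with the given permission list
        :return: nothing; the side effect is the seeded database:
            * 'foo' / 'bar' – superuser
            * 'test_no_perms' – created without an explicit permission list
              (whatever the fixture's default is)
            * 'test_right_perms' – can view the list, view, edit and delete assets
            * 'test_perms' – an explicitly empty permission list
            * five assets 'test0'..'test4' attached to one 'MACOS' OS
        """
        add_user_with_permissions(username='foo', password='bar', is_superuser=True)
        add_user_with_permissions(username='test_no_perms', password='1')
        add_user_with_permissions(
            username='test_right_perms',
            password='1',
            permissions=[
                Perm.can_view_assets_list,
                Perm.can_view_asset,
                Perm.can_edit_asset,
                Perm.can_delete_asset,
            ],
        )
        add_user_with_permissions(username='test_perms', password='1', permissions=[])
        # local name avoids shadowing the stdlib ``os`` module name
        macos = OperatingSystem.objects.create(name='MACOS')
        for i in range(5):
            Asset.objects.create(name=f'test{i}', ip='1.1.1.1', os=macos)

    # TODO: Need to fix this
    @pytest.mark.skip(reason="TODO: CSV export test needs fixing")
    @pytest.mark.integration
    def test_export_assets_in_csv_api(self, add_user_with_permissions, api_client):
        """A user holding ``can_export_assets`` can download the CSV export."""
        username = 'user'
        password = 'pro100ton'
        add_user_with_permissions(
            username=username,
            password=password,
            permissions=[
                Perm.can_view_network,
                Perm.can_work_with_incidents,
                Perm.can_export_incidents_list,
                Perm.can_export_assets,
            ],
        )
        user = User.objects.get(username=username)
        api_client.force_authenticate(user)
        response = api_client.get(reverse('asset-csv-export'))
        assert response.status_code == status.HTTP_200_OK

    @pytest.mark.unit
    def test_user_w_perm_can_edit_asset(self, api_client):
        """A user with the edit permission may PATCH an asset (200)."""
        user = User.objects.get(username='test_right_perms')
        api_client.force_authenticate(user)
        asset_pk = Asset.objects.get(name='test0').pk
        url = reverse('asset-detail', args=[asset_pk])
        response = api_client.patch(url)
        assert status.HTTP_200_OK == response.status_code

    @pytest.mark.unit
    def test_user_wo_perm_cant_edit_asset(self, api_client):
        """An authenticated user without the edit permission is refused (403).

        Authorisation check: authentication alone must not grant write
        access to an asset.
        """
        user = User.objects.get(username='test_perms')
        api_client.force_authenticate(user)
        asset_pk = Asset.objects.get(name='test0').pk
        url = reverse('asset-detail', args=[asset_pk])
        response = api_client.patch(url)
        assert status.HTTP_403_FORBIDDEN == response.status_code

    @pytest.mark.unit
    def test_asset_updated_correctly(self, api_client):
        """PATCHed ``model`` and ``ports`` are echoed back and persisted."""
        user = User.objects.get(username='test_right_perms')
        api_client.force_authenticate(user)
        asset_pk = Asset.objects.get(name='test0').pk
        url = reverse('asset-detail', args=[asset_pk])
        response = api_client.patch(url, {'model': 'test_model', 'ports': '[5000,6000]'})
        assert response.json()['model'] == 'test_model'
        assert status.HTTP_200_OK == response.status_code
        updated = Asset.objects.get(name='test0')
        assert updated.model == 'test_model'
        # ports are sent as a JSON string and stored as the decoded list
        assert updated.ports == json.loads("[5000,6000]")

    @pytest.mark.unit
    def test_asset_is_deleted_correctly(self, api_client):
        """DELETE removes exactly the targeted asset and nothing else."""
        user = User.objects.get(username='test_right_perms')
        api_client.force_authenticate(user)
        asset_pk = Asset.objects.get(name='test0').pk
        url = reverse('asset-detail', args=[asset_pk])
        response = api_client.delete(url)
        assert status.HTTP_200_OK == response.status_code
        assert not Asset.objects.filter(name='test0').exists()
        assert Asset.objects.count() == 4

    @pytest.mark.unit
    def test_check_serializer_validation(self, api_client):
        """Invalid input is rejected (400) and the stored record is untouched.

        Input-validation check: a bad ``os`` value must produce a field error
        and must not partially apply the update.
        """
        user = User.objects.get(username='test_right_perms')
        api_client.force_authenticate(user)
        asset = Asset.objects.get(name='test0')
        asset_os = asset.os
        url = reverse('asset-detail', args=[asset.pk])
        response = api_client.patch(url, {'os': 'bad'})
        assert status.HTTP_400_BAD_REQUEST == response.status_code
        assert 'os' in response.data
        assert Asset.objects.get(name='test0').os == asset_os

    @pytest.mark.merge
    def test_asset_correctly_count_number_of_incidents(self, api_client):
        """``count_incidents`` in the list view reflects the linked incidents.

        Rows are looked up by ``id`` instead of by position so the test does
        not depend on whatever ordering the endpoint applies.
        """
        user = User.objects.get(username='test_right_perms')
        api_client.force_authenticate(user)
        asset = Asset.objects.get(name='test0')
        incident = Incident.objects.create(title='test_inc', importance=10, event_count=10, events='')
        asset.incidents.add(incident)
        response = api_client.get(reverse('asset-list'))
        assert response.status_code == status.HTTP_200_OK
        counts = {row['id']: row['count_incidents'] for row in response.data['results']}
        # the page must contain more than the one asset we touched
        assert len(counts) > 1
        assert counts[asset.pk] == 1
        # no other asset received an incident
        assert all(count == 0 for pk, count in counts.items() if pk != asset.pk)

    @pytest.mark.merge
    def test_filter_by_incidents(self, api_client):
        """Filtering by incident id returns only the linked asset; a malformed id is a 400."""
        user = User.objects.get(username='test_right_perms')
        api_client.force_authenticate(user)
        incident = Incident.objects.create(title='test_inc', importance=10, event_count=10, events='')
        incident_id = incident.incident_id
        asset = Asset.objects.get(name='test0')
        asset.incidents.add(incident)
        url = reverse('asset-list')
        response = api_client.get(url, QUERY_STRING=f'incidents={incident_id}')
        assert status.HTTP_200_OK == response.status_code
        _log.info(response.data)
        _log.info(incident_id)
        assert response.data['count'] == 1
        assert response.data['results'][0]['id'] == asset.pk
        # a non-id value in the filter must be rejected, not silently ignored
        bad_response = api_client.get(url, QUERY_STRING='incidents=bad')
        assert status.HTTP_400_BAD_REQUEST == bad_response.status_code

    @pytest.mark.unit
    def test_create_asset_with_bad_status(self, api_client):
        """A client-supplied ``status`` is ignored on create.

        ``status`` is server-controlled and defaults to 0; the logstash
        endpoint must store that default even when the payload carries a
        different value, so the field cannot be set through mass assignment.
        """
        user = User.objects.get(username='foo')
        api_client.force_authenticate(user)
        url = reverse('logstash-asset-list')
        payload = {
            "asset_type": "",
            "description": "Description",
            "group": "",
            "ip": "127.0.0.1",
            "manufacturer": "",
            "model": "",
            "name": "192.168.1.101",
            "os": "",
            "ports": "[5000]",
            "sensor": "armaif_1",
            "status": "25622",
            "type": "asset",
        }
        response = api_client.post(url, data=payload, format="json")
        assert status.HTTP_201_CREATED == response.status_code
        asset = Asset.objects.get(name='192.168.1.101')
        assert asset.status == 0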