old_console/storage/tests/test.py
2024-11-02 14:12:45 +03:00

264 lines
9.9 KiB
Python

import glob
import os
import pytest
from django.conf import settings
from django.contrib.auth.models import User
from django.urls import reverse
from rest_framework import status
from rest_framework.test import APIRequestFactory, force_authenticate, APIClient
from core.utils import dtnow
from devices.models.device import DeviceType
from devices.models.endpoint_device import EndpointModel
from perms.models import Perm
from storage.models import DataStorage
from storage.views import StorageViewSet
TMP_DIR_EXPORT = '/tmp/export'
TEST_FILES = os.path.join(settings.BASE_DIR, 'storage/tests/test_files/')
@pytest.mark.django_db
class TestStorage(object):
@pytest.fixture(autouse=True)
def setup_tests(self, django_user_model):
self.admin_user = django_user_model.objects.get(username='admin')
os.makedirs(TMP_DIR_EXPORT, exist_ok=True)
yield
files = glob.glob(f'{TMP_DIR_EXPORT}/*')
for file in files:
os.remove(os.path.join(TMP_DIR_EXPORT, file))
@pytest.mark.unit
@pytest.mark.skip(reason="Fix in 1.5")
def test_user_can_download_file(self, client, add_user_with_permissions, get_url):
u_name = 'pro100ton3333'
u_pass = 'ponala61'
add_user_with_permissions(username=u_name, password=u_pass)
client.login(username=u_name, password=u_pass)
file1 = open('file1', 'w')
file1.write('!!!!!!!!!!!!!!!')
file1.close()
user = User.objects.get(username=u_name)
store1 = DataStorage(type=DataStorage.Type.UNKNOWN,
format=DataStorage.Format.UNKNOWN,
user=user,
size=1,
crc={'type': 'bad'},
file='file1')
store1.save()
os.system('cp file1 ' + store1.get_full_path())
response = client.get(get_url('store-download', kwargs={"id": store1.pk}))
os.remove('file1')
os.remove(store1.get_full_path())
assert response['Content-Type'] == 'text/html; charset=utf-8'
@pytest.mark.unit
@pytest.mark.skip(reason="Fix in 1.5")
def test_download_update_last_access(self,
client,
add_user_with_permissions,
get_url):
u_name = 'pro100ton3333'
u_pass = 'ponala61'
add_user_with_permissions(username=u_name, password=u_pass)
client.login(username=u_name, password=u_pass)
file1 = open('file1', 'w')
file1.write('!!!!!!!!!!!!!!!')
file1.close()
user = User.objects.get(username=u_name)
dt = dtnow(days=-1)
store1 = DataStorage(type=DataStorage.Type.UNKNOWN,
format=DataStorage.Format.UNKNOWN,
user=user,
size=1,
crc={'type': 'bad'},
file='file1')
store1.save()
os.system('cp file1 ' + store1.get_full_path())
response = client.get(get_url('store-download', kwargs={"id": store1.pk}))
os.remove('file1')
os.remove(store1.get_full_path())
assert response['Content-Type'] == 'text/html; charset=utf-8'
store = DataStorage.objects.get(pk=store1.pk)
assert store.last_access != dt
@pytest.mark.unit
def test_get_data_current_user_and_superuser(self, add_user_with_permissions):
url = reverse('store-list')
request = APIRequestFactory().get(url)
view = StorageViewSet.as_view({'get': 'list'})
superuser = add_user_with_permissions(username="superuser", password="pass", is_superuser=True)
user = add_user_with_permissions(username="user", password='pass', permissions=[Perm.can_view_storage])
store = DataStorage(type=DataStorage.Type.UNKNOWN,
format=DataStorage.Format.UNKNOWN,
user=user,
size=1,
crc={'type': 'bad'},
file='file1')
store.save()
force_authenticate(request, user=user)
response = view(request)
assert response.status_code == 200
assert int(response.data.get('count')) == 1
force_authenticate(request, user=superuser)
response = view(request)
assert response.status_code == 200
assert int(response.data.get('count')) == 1
@pytest.mark.unit
def test_get_data_another_user(self, add_user_with_permissions):
url = reverse('store-list')
request = APIRequestFactory().get(url)
view = StorageViewSet.as_view({'get': 'list'})
superuser = add_user_with_permissions(username="superuser", password="pass", is_superuser=True)
user = add_user_with_permissions(username="user", password='pass', permissions=[Perm.can_view_storage])
store = DataStorage(type=DataStorage.Type.UNKNOWN,
format=DataStorage.Format.UNKNOWN,
user=superuser,
size=1,
crc={'type': 'bad'},
file='file1')
store.save()
force_authenticate(request, user=user)
response = view(request)
assert response.status_code == 200
assert int(response.data.get('count')) == 0
@pytest.mark.unit
def test_get_rotation_file(self, add_user_with_permissions):
"""Test show rotation file if user has `can_download_rotation_files`."""
superuser = add_user_with_permissions(username="superuser", password="pass", is_superuser=True)
user_with_perms = add_user_with_permissions(
username="user1", password='PassWord123',
permissions=[Perm.can_view_storage, Perm.can_download_rotation_files]
)
user_without_perm = add_user_with_permissions(
username="user2", password='PassWord123', permissions=[Perm.can_view_storage]
)
store = DataStorage(type=DataStorage.Type.DB_DUMP,
format=DataStorage.Format.UNKNOWN,
user=superuser,
size=1,
crc={'type': 'bad'},
file='file1',
description='Table rotation')
store.save()
url = reverse('store-list')
request = APIRequestFactory().get(url)
view = StorageViewSet.as_view({'get': 'list'})
force_authenticate(request, user=user_with_perms)
response = view(request)
assert response.status_code == 200
assert response.data.get('count') == 1
force_authenticate(request, user=user_without_perm)
response = view(request)
assert response.status_code == 200
assert not response.data.get('count')
@pytest.mark.unit
def test_delete_file(self, add_user_with_permissions):
"""Test delete file if user has `can_view_storage`."""
user_with_perms = add_user_with_permissions(
username="user1", password='PassWord123',
permissions=[Perm.can_view_storage]
)
store = DataStorage(type=DataStorage.Type.UNKNOWN,
format=DataStorage.Format.UNKNOWN,
user=user_with_perms,
size=1,
crc={'type': 'bad'},
file='file1',
description='Table rotation')
store.save()
client = APIClient()
client.force_authenticate(user=user_with_perms)
url = reverse('store-list')
response = client.get(url)
assert response.data.get('count') == 1
url = reverse('store-detail', kwargs={'pk': store.pk})
response = client.delete(url)
url = reverse('store-list')
response = client.get(url)
assert response.data.get('count') == 0
@pytest.mark.unit
def test_download_file(self, add_user_with_permissions):
user_with_perms = add_user_with_permissions(
username="user1", password='PassWord123',
permissions=[Perm.can_view_storage]
)
store = DataStorage(type=DataStorage.Type.UNKNOWN,
format=DataStorage.Format.UNKNOWN,
user=user_with_perms,
size=1,
crc={'type': 'bad'},
file='file1',
description='Table rotation')
store.save()
file1 = open('file1', 'w')
file1.write('!!!!!!!!!!!!!!!')
file1.close()
client = APIClient()
client.force_authenticate(user=user_with_perms)
url = reverse('store-download', kwargs={'pk': store.pk})
response = client.get(url)
os.system('cp file1 ' + store.get_full_path())
response = client.get(reverse('store-download', kwargs={"pk": store.pk}))
os.remove('file1')
os.remove(store.get_full_path())
assert response.status_code == 200
@pytest.mark.unit
def test_upload_antivirus_database(self, api_client):
"""Test upload antivirus database to storage"""
api_client.force_authenticate(self.admin_user)
ep_test = EndpointModel.objects.create(name='EP_TEST_1', type=DeviceType.ENDPOINT, ip='127.0.0.100',
port='5555', antivirus_update_db=False)
file_path = os.path.join(TEST_FILES, 'antivirus_update.zip')
file = open(file_path, 'rb')
url = reverse('store-antivirus')
data = {'file': file}
response = api_client.post(url, data)
assert response.json()['status'] == 'ok'
assert response.status_code == status.HTTP_200_OK
assert DataStorage.objects.filter(type=DataStorage.Type.CLAMAV).count() == 1
assert EndpointModel.objects.get(pk=ep_test.pk).antivirus_update_db is True