old_console/storage/tests/test_export.py
2024-11-02 14:12:45 +03:00

191 lines
7.4 KiB
Python

import csv
import json
import os
import shutil
from unittest import mock
import datetime
import pytest
from django.conf import settings
from correlation.models import Rule
from devices.models.device import Device
from incident.models import IncidentRecommendations
from storage.export import export, ExportException, ExportToCSV, ExportToJSON
from storage.models import DataStorage, get_storage_path
TMP_DIR_EXPORT = '/tmp/export'
@pytest.mark.django_db
class TestExport:
@pytest.fixture(autouse=True)
def setup_tests(self, django_user_model):
self.admin_user = django_user_model.objects.get(username='admin')
os.makedirs(TMP_DIR_EXPORT, exist_ok=True)
yield
shutil.rmtree(TMP_DIR_EXPORT)
@pytest.mark.unit
def test_export_csv_task(self, api_client):
test_csv_device1 = Device.objects.create(ip='2.2.2.2', port='2500', type='firewall', name='FIREWALL')
test_csv_device2 = Device.objects.create(ip='3.3.3.3', port='9999', type='firewall', name='ENDPOINT')
queryset = Device.objects.all()
model_name = queryset.model._meta.model_name
datastorage_id = export('csv', self.admin_user.pk, model_name, query_parameters={})
datastorage = DataStorage.objects.get(pk=datastorage_id)
full_name = datastorage.get_full_path()
assert os.path.exists(full_name)
with open(full_name, 'r') as csv_file:
reader = csv.DictReader(csv_file)
parsed_file = []
for row in reader:
parsed_file.append(row)
assert len(parsed_file) == 2
assert parsed_file[0]['id'] == str(test_csv_device1.pk)
assert parsed_file[0]['port'] == '2500'
assert parsed_file[0]['name'] == 'FIREWALL'
assert parsed_file[1]['id'] == str(test_csv_device2.pk)
assert parsed_file[1]['port'] == '9999'
assert parsed_file[1]['name'] == 'ENDPOINT'
os.remove(datastorage.get_full_path())
@pytest.mark.unit
def test_export_csv_task_with_parameters(self, api_client):
test_csv_device1 = Device.objects.create(ip='2.2.2.2', port='2500', type='firewall', name='FIREWALL')
test_csv_device2 = Device.objects.create(ip='3.3.3.3', port='9999', type='firewall', name='ENDPOINT')
queryset = Device.objects.all()
model_name = queryset.model._meta.model_name
# export only one device - 'test_csv_device1'
datastorage_id = export('csv', self.admin_user.pk, model_name, query_parameters={'name': 'FIREWALL'})
datastorage = DataStorage.objects.get(pk=datastorage_id)
full_name = datastorage.get_full_path()
assert os.path.exists(full_name)
with open(full_name, 'r') as csv_file:
reader = csv.DictReader(csv_file)
parsed_file = []
for row in reader:
parsed_file.append(row)
assert len(parsed_file) == 1
assert parsed_file[0]['id'] == str(test_csv_device1.pk)
assert parsed_file[0]['port'] == '2500'
assert parsed_file[0]['name'] == 'FIREWALL'
os.remove(datastorage.get_full_path())
@pytest.mark.django_db
class TestExportService:
@pytest.fixture(autouse=True)
def setup_tests(self, django_user_model):
self.test_csv_device1 = Device.objects.create(ip='2.2.2.2', port='2500', type='firewall', name='FIREWALL')
self.test_csv_device2 = Device.objects.create(ip='3.3.3.3', port='9999', type='firewall', name='ENDPOINT')
self.admin_user = django_user_model.objects.get(username='admin')
os.makedirs(TMP_DIR_EXPORT, exist_ok=True)
yield
shutil.rmtree(TMP_DIR_EXPORT)
@pytest.mark.unit
def test_pass_invalid_export_type(self):
with pytest.raises(ExportException) as exc:
export('blah-blah', 1, '1', {})
assert str(exc.value) == '"blah-blah" export type not possible'
@pytest.mark.unit
def test_create_default_store(self):
adapter = ExportToCSV(1, '1', {})
store = adapter.create_store(self.admin_user)
assert store.user == self.admin_user
assert store.format == 3
assert store.type == 2
assert store.size == 0
assert store.file == 'table'
@pytest.mark.unit
def test_get_file_path(self):
adapter = ExportToCSV(1, '1', {})
store = DataStorage(type=DataStorage.Type.CSV_EXPORT, format=DataStorage.Format.CSV,
user=self.admin_user, file='table', size=0)
store.save()
file_name = get_storage_path(store, 'export.csv')
file_path = adapter.get_file_path(file_name)
assert file_path == os.path.join(settings.MEDIA_ROOT, file_name)
@pytest.mark.unit
def test_get_export_data(self):
adapter = ExportToCSV(self.admin_user.pk, 'device', {})
data = adapter.get_export_data(Device)
assert len(data) == 2
device = data[0]
assert device['id'] == self.test_csv_device1.pk
assert device['type'] == self.test_csv_device1.type
assert device['name'] == self.test_csv_device1.name
assert device['description'] == self.test_csv_device1.description
assert device['group'] == self.test_csv_device1.group
@pytest.mark.unit
@mock.patch('storage.export.MEDIA_ROOT', TMP_DIR_EXPORT)
@mock.patch('storage.models.MEDIA_ROOT', TMP_DIR_EXPORT)
def test_run_export(self):
service = ExportToCSV(self.admin_user.pk, 'device', {})
data_storage_id = service.run_export()
store = DataStorage.objects.get(id=data_storage_id)
os.path.exists(store.get_full_path())
@pytest.mark.django_db
class TestExportRule:
@pytest.fixture(autouse=True)
def setup_test(self, django_user_model):
self.admin_user = django_user_model.objects.get(username='admin')
recommendation_in_rule = IncidentRecommendations.objects.create(name='rec_name', description='rec_desc')
recommendation_without_rule = IncidentRecommendations.objects.create(name='TEST', description='TEST')
Rule.objects.create(
name='Test',
type=0,
status=True,
rev=1,
sid=2,
depth=datetime.timedelta(minutes=10),
rule_json={
"type": "query_string",
"field": "",
"operands": "event_severity:>=6",
},
actions_json=[{
"type": "incident",
"title": "{{.source_ip}}",
"comment": "",
"category": "",
"importance": "50",
"assigned_to": "",
"description": "{{.source_ip}}",
"close_recommendations": [recommendation_in_rule.pk]
}],
)
@pytest.mark.unit
@mock.patch('storage.export.MEDIA_ROOT', TMP_DIR_EXPORT)
@mock.patch('storage.models.MEDIA_ROOT', TMP_DIR_EXPORT)
def test_export_all_effects_recommendations(self):
service = ExportToJSON(self.admin_user.pk, 'rule', {})
data_storage_id = service.run_export()
store = DataStorage.objects.get(id=data_storage_id)
with open(store.get_full_path(), 'r') as f:
content = json.loads(f.read())
assert os.path.exists(store.get_full_path())
assert len(content['rules'][0]['actions_json'][0]['close_recommendations']) == 1 # one recommendation in rule
assert len(content['close_recommendations']) == 2 # but exist two recommendations