"""Tests for the console management commands and the rules import service."""
import json
import os
from pathlib import Path
from unittest import mock
from unittest.mock import patch

import pytest
import tomli
from django.core.files.uploadedfile import SimpleUploadedFile

from console.management.commands.create_vector_configs import Command as CreateVectorConfig
from console.management.commands.load_rules import Command
from correlation.models import Rule
from correlation.services.import_service import ImportRulesService
from incident.models import IncidentEffect, IncidentCategory, IncidentRecommendations

# Rules fixture used by every test in this module (path is relative to the
# directory the test run is started from).
RULES_JSON_PATH = 'console/management/commands/rules_console.json'


@pytest.fixture(autouse=True)
def test_dir(tmp_path):
    """Point ``VECTOR_CONFIG_DIR`` at pytest's ``tmp_path`` for each test.

    Yields the object bound by ``patch(...) as``; because an explicit
    replacement is passed to ``patch``, that is ``tmp_path`` itself.
    """
    with patch('console.management.commands.create_vector_configs.VECTOR_CONFIG_DIR', tmp_path) as patched_dir:
        yield patched_dir


def mock_correlator_task(*args, **kwargs):
    """No-op replacement for ``correlation.tasks.update_correlator_tasks``."""
    pass


@pytest.mark.unit
@pytest.mark.django_db
class TestConsoleCommands:
    """Checks for rule loading, rule import and vector config generation."""

    @pytest.fixture(autouse=True)
    def setup_tests(self):
        """Load the rules fixture and pre-compute the expected object counts."""
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(RULES_JSON_PATH, 'r', encoding='utf-8') as json_file:
            self.json_data = json.load(json_file)
        self.count_recommendations = self.calculate_recommendations(self.json_data)
        self.count_effects = self.calculate_effects(self.json_data)
        self.count_category = self.calculate_category(self.json_data)
        self.count_rules = self.calculate_rules(self.json_data)

    def test_create_recommendation_and_effects(self):
        """Importing the rules file creates recommendations, effects and categories."""
        with open(RULES_JSON_PATH, 'rb') as test_file:
            uploaded = SimpleUploadedFile("rules.json", test_file.read())
        with mock.patch('correlation.tasks.update_correlator_tasks', mock_correlator_task):
            ImportRulesService(uploaded).run_import()
        assert IncidentRecommendations.objects.count() == self.count_recommendations
        assert IncidentEffect.objects.count() == self.count_effects
        assert IncidentCategory.objects.count() == self.count_category

    def test_rule_create(self):
        """Running the ``load_rules`` command creates one ``Rule`` per fixture rule."""
        # ``setup_tests`` (autouse) has already loaded the fixture and computed
        # ``count_rules``; there is no need to read the file again here.
        with mock.patch('correlation.tasks.update_correlator_tasks', mock_correlator_task):
            command = Command()
            command.handle()
        assert Rule.objects.count() == self.count_rules

    def test_crete_config_vector(self, test_dir):
        """Running ``create_vector_configs`` leaves eight files in the patched directory."""
        command = CreateVectorConfig()
        command.handle()
        assert len(os.listdir(test_dir)) == 8

        config_path = Path(test_dir) / 'mc_logs_es.toml'  # one of eight
        source_content = config_path.read_text()
        parse_content = tomli.loads(source_content)
        es_sink = parse_content['sinks']['es_logs_to_es']
        assert es_sink['auth']['user'] == "elastic"
        assert es_sink['auth']['password'] == "changeme"
        assert es_sink['endpoint'] == "http://elasticsearch:9200"

    def calculate_recommendations(self, data: dict) -> int:
        """Return the number of entries under ``close_recommendations``."""
        return len(data['close_recommendations'])

    def calculate_effects(self, data: dict) -> int:
        """Return the number of entries under ``effects``."""
        return len(data['effects'])

    def calculate_category(self, data: dict) -> int:
        """Return the number of distinct category names used by incident actions.

        Actions whose ``category`` is missing or empty (``''``, ``[]``,
        ``None``) are skipped instead of raising.
        """
        names = set()
        for rule in data.get('rules', []):
            for action in rule['actions_json']:
                if action['type'] != 'incident':
                    continue
                categories = action.get('category')
                if not categories:
                    continue
                # Each rule has a single incident category, but it is stored
                # inside a list, so only the first element is read.
                names.add(categories[0]['name'])
        return len(names)

    def calculate_rules(self, data: dict) -> int:
        """Return the number of entries under ``rules``."""
        return len(data['rules'])