import json
import os
from pathlib import Path
from unittest import mock
from unittest.mock import patch
import pytest
import tomli
from django.core.files.uploadedfile import SimpleUploadedFile
from console.management.commands.create_vector_configs import Command as CreateVectorConfig
from console.management.commands.load_rules import Command
from correlation.models import Rule
from correlation.services.import_service import ImportRulesService
from incident.models import IncidentEffect, IncidentCategory, IncidentRecommendations
@pytest.fixture(autouse=True)
def test_dir(tmp_path):
    """Point ``VECTOR_CONFIG_DIR`` at pytest's per-test temporary directory.

    The fixture is autouse, so every test in this module runs with the
    attribute patched. It yields whatever entering the patch returns, which
    is ``tmp_path`` because an explicit replacement value is supplied.
    """
    config_dir_setting = 'console.management.commands.create_vector_configs.VECTOR_CONFIG_DIR'
    with patch(config_dir_setting, tmp_path) as patched_dir:
        yield patched_dir
def mock_correlator_task(*args, **kwargs):
    """Accept any arguments and do nothing.

    Used as the replacement object when a test patches
    ``correlation.tasks.update_correlator_tasks``.
    """
    return None
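@pytest.mark.unit
@pytest.mark.django_db
class TestConsoleCommands:
    """Tests for the rule-import service, the load_rules command and create_vector_configs.

    The expected object counts are computed from ``rules_console.json`` by the
    ``calculate_*`` helpers below, so the assertions follow the bundle's content
    rather than hard-coded numbers.
    """

    # Rule bundle stored next to the management commands. The path is relative,
    # so the tests depend on the working directory the suite is started from.
    RULES_FIXTURE = 'console/management/commands/rules_console.json'

    @pytest.fixture(autouse=True)
    def setup_tests(self):
        """Load the rule bundle and pre-compute the counts the tests compare against."""
        # JSON is read as UTF-8 explicitly so the result does not depend on the platform's locale.
        with open(self.RULES_FIXTURE, 'r', encoding='utf-8') as json_file:
            self.json_data = json.load(json_file)
        self.count_recommendations = self.calculate_recommendations(self.json_data)
        self.count_effects = self.calculate_effects(self.json_data)
        self.count_category = self.calculate_category(self.json_data)
        self.count_rules = self.calculate_rules(self.json_data)

    def test_create_recommendation_and_effects(self):
        """Import the bundle via ImportRulesService and check the created incident objects."""
        with open(self.RULES_FIXTURE, 'rb') as test_file:
            upload = SimpleUploadedFile("rules.json", test_file.read())
        # Swap update_correlator_tasks for the no-op stub while the import runs.
        with mock.patch('correlation.tasks.update_correlator_tasks', mock_correlator_task):
            ImportRulesService(upload).run_import()
        assert IncidentRecommendations.objects.count() == self.count_recommendations
        assert IncidentEffect.objects.count() == self.count_effects
        assert IncidentCategory.objects.count() == self.count_category

    def test_rule_create(self):
        """Run the load_rules command and check the number of Rule rows.

        The expected count comes from ``setup_tests``, which has already parsed
        the bundle, so the file is not read a second time here.
        """
        with mock.patch('correlation.tasks.update_correlator_tasks', mock_correlator_task):
            Command().handle()
        assert Rule.objects.count() == self.count_rules

    def test_crete_config_vector(self, test_dir):
        """Run create_vector_configs and inspect the files written to the patched directory."""
        CreateVectorConfig().handle()
        assert len(os.listdir(test_dir)) == 8
        config_path = Path(test_dir) / 'mc_logs_es.toml'  # one of eight
        parsed = tomli.loads(config_path.read_text(encoding='utf-8'))
        es_sink = parsed['sinks']['es_logs_to_es']
        # NOTE(review): the values pinned below are a widely known default
        # user/password pair and a plaintext http:// endpoint. If the command
        # emits them for every deployment, the generated sink config carries
        # guessable credentials over an unencrypted channel. Confirm they are
        # placeholders that operators are expected to override.
        assert es_sink['auth']['user'] == "elastic"
        assert es_sink['auth']['password'] == "changeme"
        assert es_sink['endpoint'] == "http://elasticsearch:9200"

    def calculate_recommendations(self, data: dict) -> int:
        """Return the number of entries under ``close_recommendations``."""
        return len(data['close_recommendations'])

    def calculate_effects(self, data: dict) -> int:
        """Return the number of entries under ``effects``."""
        return len(data['effects'])

    def calculate_category(self, data: dict) -> int:
        """Return the number of distinct incident category names used by the rules.

        Only actions of type ``incident`` are considered. An action whose
        category is ``''``, an empty list or absent is skipped, so it does not
        raise when the first element is looked up.
        """
        names = set()
        for rule in data.get('rules', []):
            for action in rule['actions_json']:
                if action['type'] != 'incident':
                    continue
                categories = action.get('category')
                if not categories:
                    continue
                # Each rule has a single incident category, but it is wrapped in a list.
                names.add(categories[0]['name'])
        return len(names)

    def calculate_rules(self, data: dict) -> int:
        """Return the number of entries under ``rules``."""
        return len(data['rules'])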