old_console/correlation/tests/test_api.py
2024-11-02 14:12:45 +03:00

462 lines
18 KiB
Python

import datetime
import logging
from unittest.mock import patch
import pytest
from django.core.exceptions import ObjectDoesNotExist
from django.urls import reverse
from rest_framework import status
from correlation.constants import Type
from correlation.models import Rule, Group
_log = logging.getLogger(__name__)
def mock_correlator_task(*args, **kwargs):
pass
@pytest.mark.django_db(transaction=True)
class TestRuleOrdering:
@pytest.fixture(autouse=True)
def setup_test(self, add_user_with_permissions):
"""Create two rules with different status."""
self.user = add_user_with_permissions(username='superuser', password='pwd123', is_superuser=True)
Rule.objects.all().delete()
rule_type = Type.system
rule_data = {
'name': '1 Test with Status True',
'type': rule_type,
'status': True,
'rev': 1,
'sid': 2,
'depth': datetime.timedelta(minutes=10),
'rule_json': {
"type": "query_string",
"field": "",
"operands": "event_severity:>=6",
},
'actions_json': {
"type": "incident",
"title": "{{.SignName}}",
"comment": "",
"category": "",
"importance": "50",
"assigned_to": "",
"description": "{{.EventSrcMsg}}"
}
}
self.rule1 = Rule.objects.create(
**rule_data
)
rule_data_disable = rule_data
rule_data_disable['status'] = False
rule_data_disable['rev'] = 2
self.rule1_disable = Rule.objects.create(**rule_data_disable)
self.rule2 = Rule.objects.create(
name='2 Test with Status False',
type=rule_type,
status=False,
rev=1,
sid=3,
depth=datetime.timedelta(minutes=10),
rule_json={
"type": "query_string",
"field": "",
"operands": "event_severity:>=6",
},
actions_json={
"type": "incident",
"title": "{{.SignName}}",
"comment": "",
"category": "",
"importance": "50",
"assigned_to": "",
"description": "{{.EventSrcMsg}}"
},
)
self.rule2.status = False
self.rule2.save()
@pytest.mark.integration
def test_rules_list(self, client, add_user_with_permissions):
user = add_user_with_permissions(username='test_user1', password='pwd123', is_superuser=True)
client.force_login(user=user)
url = reverse('rules-list')
response = client.get(url)
data_json = response.json()
assert response.status_code == 200
assert len(data_json) == 4
@pytest.mark.integration
@patch('correlation.tasks.update_correlator_tasks', mock_correlator_task)
@patch('correlation.api.update_correlator_tasks', mock_correlator_task)
def test_add_rule(self, client, add_user_with_permissions):
user = add_user_with_permissions(username='test_user1', password='pwd123', is_superuser=True)
client.force_login(user=user)
url = reverse('rules-list')
data = {
'name': 'New Test with Status False',
'sid': 3,
'rev': 4,
'kind': 'System',
'group': None,
'status': False,
'multi': False,
'archived': False,
'type': 0,
'depth': '00:20:00',
'rule_json': {
'type': 'query_string',
'field': 'query field',
'operands': 'event_severity:>=666'
},
'actions_json': {
'type': 'incident',
'title': '{{.SignName}}',
'comment': 'New comment',
'category': 'New category',
'importance': '50',
'assigned_to': '',
'description': '{{.EventSrcMsg}}'
}
}
response = client.post(url, data=data, content_type='application/json')
assert response.status_code == 201
rule = Rule.objects.get(name='New Test with Status False')
assert rule.sid == 3
assert rule.rev == 4
assert rule.group == None
assert not rule.status
assert not rule.archived
assert rule.type == 0
assert rule.depth == datetime.timedelta(seconds=1200)
assert rule.rule_json == {
'type': 'query_string',
'field': 'NULL',
'operands': 'event_severity:>=666'
}
assert rule.actions_json == {
'type': 'incident',
'title': '{{.SignName}}',
'comment': 'New comment',
'category': 'New category',
'importance': '50',
'assigned_to': '',
'description': '{{.EventSrcMsg}}'
}
@pytest.mark.integration
@patch('correlation.tasks.update_correlator_tasks', mock_correlator_task)
@patch('correlation.api.update_correlator_tasks', mock_correlator_task)
def test_delete_rule(self, client, add_user_with_permissions):
user = add_user_with_permissions(username='test_user1', password='pwd123', is_superuser=True)
client.force_login(user=user)
rule = Rule.objects.get(name='2 Test with Status False')
rules_before = Rule.objects.count()
url = reverse('rules-detail', kwargs={"pk": rule.pk})
response = client.delete(url)
assert response.status_code == 204
assert Rule.objects.count() == rules_before - 1
@pytest.mark.unit
@pytest.mark.skip(reason='1.5 waiting of products')
def test_edit_rule(self, client, add_user_with_permissions):
user = add_user_with_permissions(username='test_user1', password='pwd123', is_superuser=True)
client.force_login(user=user)
rule = Rule.objects.get(name='2 Test with Status False')
url = reverse('rules-detail', kwargs={"pk": rule.pk})
data = {'name': '2 Test with Status False', 'sid': 5, 'rev': 6, 'kind': 'System', 'group': 'None',
'status': True, 'multi': True,
'archived': True, 'type': 1, 'depth': '00:20:00',
'rule_json': {'type': 'query_string', 'field': 'field', 'operands': 'event_severity:>=50'},
'actions_json': {'type': 'incident', 'title': '{{.SignName}}', 'comment': 'new comment', 'category': '',
'importance': '500', 'assigned_to': 'user', 'description': '{{.EventSrcMsg}}'}}
response = client.patch(url, data=data, content_type='application/json')
assert response.status_code == 200
rule = Rule.objects.get(name='2 Test with Status False')
assert rule.sid == 5
assert rule.rev == 7
assert rule.type == 1
assert not rule.group
assert not rule.status
assert rule.multi
assert rule.archived
assert rule.depth == datetime.timedelta(seconds=1200)
assert rule.rule_json == {'type': 'query_string', 'field': 'field', 'operands': 'event_severity:>=50'}
assert rule.actions_json == {'type': 'incident', 'title': '{{.SignName}}', 'comment': 'new comment',
'category': '', 'importance': '500', 'assigned_to': 'user',
'description': '{{.EventSrcMsg}}'}
@pytest.mark.integration
@patch('correlation.tasks.update_correlator_tasks', mock_correlator_task)
@patch('correlation.api.update_correlator_tasks', mock_correlator_task)
def test_enable_rule_but_this_sid_already_enable(self, api_client):
api_client.force_login(user=self.user)
url = reverse('rules-detail', kwargs={"pk": self.rule1_disable.pk})
response = api_client.patch(url, data={'status': True})
assert 'status' in response.json()
assert response.json()['status'] == ['There is already an enabled rule with this sid: 2']
@pytest.mark.integration
@patch('correlation.tasks.update_correlator_tasks', mock_correlator_task)
@patch('correlation.api.update_correlator_tasks', mock_correlator_task)
def test_edit_rule_but_rule_with_same_sid_disable(self, api_client):
api_client.force_login(user=self.user)
url = reverse('rules-detail', kwargs={"pk": self.rule1.pk})
response = api_client.patch(url, data={'description': '-----'})
assert 'sid' in response.json()
assert response.json()['sid'] == ['Check that there is no rule with SID 2 and REV 2 and try again']
@pytest.mark.integration
@patch('correlation.tasks.update_correlator_tasks', mock_correlator_task)
@patch('correlation.api.update_correlator_tasks', mock_correlator_task)
def test_expansion_rule_json_when_edit(self, api_client):
api_client.force_login(user=self.user)
url = reverse('rules-detail', kwargs={"pk": self.rule2.pk})
response = api_client.patch(url, data={'rule_json': {'operands': 'test'}}, format='json')
assert response.json()['rule_json'] == {
'operands': 'test',
'field': 'NULL',
'type': 'query_string'
}
@pytest.mark.integration
@patch('correlation.tasks.update_correlator_tasks', mock_correlator_task)
@patch('correlation.api.update_correlator_tasks', mock_correlator_task)
def test_edit_rule_only_status(self, api_client):
api_client.force_login(user=self.user)
url = reverse('rules-detail', kwargs={"pk": self.rule2.pk})
response = api_client.patch(url, data={'status': True})
assert response.json()['rev'] == 1
assert Rule.objects.first().rev == 1
@pytest.mark.integration
@patch('correlation.tasks.update_correlator_tasks', mock_correlator_task)
@patch('correlation.api.update_correlator_tasks', mock_correlator_task)
def test_add_rule_error(self, client, add_user_with_permissions):
user = add_user_with_permissions(username='test_user1', password='pwd123', is_superuser=True)
client.force_login(user=user)
url = reverse('rules-list')
data = {
'name': 'New Test with Status False',
'sid': 3,
'rev': 4,
'kind': 'System',
'group': 'None',
'status': False,
'multi': False,
'archived': False,
'type': 0,
'depth': '00:20:00',
'rule_json': {
'type': 'query_string',
'field': 'query field',
'operands': 'event_severity:>=666'
},
}
response = client.post(url, data=data, content_type='application/json')
assert response.status_code == 400
@pytest.mark.integration
@patch('correlation.tasks.update_correlator_tasks', mock_correlator_task)
@patch('correlation.api.update_correlator_tasks', mock_correlator_task)
def test_add_rule_with_non_unique_sid(self, api_client, add_user_with_permissions):
user = add_user_with_permissions(username='test_user1', password='pwd123', is_superuser=True)
api_client.force_authenticate(user)
url = reverse('rules-list')
data = {
'name': 'New Test',
'sid': 3,
'rev': 4,
'kind': 'System',
'group': None,
'status': True,
'multi': False,
'archived': False,
'type': 0,
'depth': '00:20:00',
'rule_json': {
'type': 'query_string',
'field': 'query field',
'operands': 'event_severity:>=666'
},
'actions_json': {
'type': 'incident',
'title': '{{.SignName}}',
'comment': 'New comment',
'category': 'New category',
'importance': '50',
'assigned_to': '',
'description': '{{.EventSrcMsg}}'
}
}
response = api_client.post(url, data=data, format='json')
assert response.status_code == status.HTTP_201_CREATED
data['rev'] = 111111
response = api_client.post(url, data=data, format='json')
assert response.status_code == 400
assert 'sid' in response.json()
@pytest.mark.django_db
class TestCorrelationGroups:
@pytest.fixture(autouse=True)
def setup_test(self, django_user_model):
self.admin = django_user_model.objects.first()
test_rule = dict(
name='Rule 1',
type=1,
status=True,
rev=1,
sid=2,
depth=datetime.timedelta(minutes=10),
rule_json={
"type": "query_string",
"field": "",
"operands": "event_severity:>=6",
},
actions_json={
"type": "incident",
"title": "{{.SignName}}",
"comment": "",
"category": "",
"importance": "50",
"assigned_to": "",
"description": "{{.EventSrcMsg}}"
},
)
self.rule1 = Rule.objects.create(**test_rule)
self.group = Group.objects.create(name="test_group", description="test description")
@pytest.mark.unit
def test_get_group(self, api_client):
api_client.force_authenticate(self.admin)
response = api_client.get(reverse('rules-groups-list'))
assert response.status_code == 200
data_json = response.json()
assert len(data_json['results']) == 1 # there is one group
assert data_json['count'] == 1
print(data_json)
assert data_json['results'][0] == {
'id': self.group.pk,
'name': 'test_group',
'description': 'test description',
'rules': []
}
@pytest.mark.unit
def test_add_group(self, api_client):
api_client.force_authenticate(self.admin)
groups_before = Group.objects.count()
data = {
"name": "New group",
"description": "Group description"
}
response = api_client.post(reverse('rules-groups-list'), data=data)
assert response.status_code == 201
assert response.json()['name'] == data['name']
assert response.json()['description'] == data['description']
groups_after = Group.objects.count()
assert groups_before != groups_after
group = Group.objects.get(name="New group")
assert group.description == "Group description"
@pytest.mark.unit
def test_delete_group(self, api_client):
api_client.force_authenticate(self.admin)
group = Group.objects.create(name="group", description="test description")
url = reverse('rules-groups-detail', kwargs={"pk": group.pk})
response = api_client.delete(url)
assert response.status_code == 204
with pytest.raises(ObjectDoesNotExist):
Group.objects.get(pk=group.pk)
@pytest.mark.unit
def test_update_group(self, api_client):
api_client.force_authenticate(self.admin)
group = Group.objects.create(name="group", description="test description")
data = {
"name": "New group",
"description": "Group description"
}
url = reverse('rules-groups-detail', kwargs={"pk": group.pk})
response = api_client.patch(url, data=data)
assert response.status_code == 200
group = Group.objects.get(name="New group")
assert group.description == "Group description"
@pytest.mark.unit
def test_partial_update_group_description(self, api_client):
api_client.force_authenticate(self.admin)
group = Group.objects.create(name="group", description="test description")
data = {
"description": "Group description"
}
url = reverse('rules-groups-detail', kwargs={"pk": group.pk})
response = api_client.patch(url, data=data)
assert response.status_code == 200
group = Group.objects.get(name="group")
assert group.description == "Group description"
@pytest.mark.unit
def test_partial_update_group_name(self, api_client):
api_client.force_authenticate(self.admin)
group = Group.objects.create(name="test", description="test description")
data = {
"name": "New group",
}
url = reverse('rules-groups-detail', kwargs={"pk": group.pk})
response = api_client.patch(url, data=data)
assert response.status_code == 200
group = Group.objects.get(name="New group")
assert group.description == "test description"
@pytest.mark.unit
def test_add_rule_to_group(self, api_client):
api_client.force_authenticate(self.admin)
group = Group.objects.create(name="test", description="test description")
url = reverse('rules-groups-detail', kwargs={"pk": group.pk})
response = api_client.get(url)
assert response.json()['rules'] == []
data = {"rules": [self.rule1.pk]}
response = api_client.patch(url, data=data)
assert response.status_code == 200
assert len(response.json()['rules']) == 1
@pytest.mark.unit
def test_edit_group_in_rule(self, api_client):
api_client.force_authenticate(self.admin)
group = Group.objects.create(name="test", description="test description")
url = reverse('rules-detail', kwargs={"pk": self.rule1.pk})
response = api_client.get(url)
assert response.json()['group'] == None
data = {"group": [group.pk]}
response = api_client.patch(url, data=data)
assert response.status_code == 200
assert response.json()['group'] == {'id': group.pk, 'name': 'test'}