462 lines
18 KiB
Python
462 lines
18 KiB
Python
import datetime
|
|
import logging
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from django.core.exceptions import ObjectDoesNotExist
|
|
from django.urls import reverse
|
|
from rest_framework import status
|
|
|
|
from correlation.constants import Type
|
|
from correlation.models import Rule, Group
|
|
|
|
# Module-level logger, named after this module.
_log = logging.getLogger(__name__)
|
|
|
|
|
|
def mock_correlator_task(*args, **kwargs):
    """Accept any arguments, do nothing and return ``None``.

    Used in this module as the ``patch`` replacement for
    ``update_correlator_tasks`` so the tests never run the real task.
    """
    return None
|
|
|
|
|
|
@pytest.mark.django_db(transaction=True)
class TestRuleOrdering:
    """API tests for correlation rules: list, create, delete and edit."""

    @pytest.fixture(autouse=True)
    def setup_test(self, add_user_with_permissions):
        """Create a superuser and the three rules shared by every test.

        * ``rule1`` -- sid 2, rev 1, enabled.
        * ``rule1_disable`` -- same data as ``rule1`` but rev 2 and disabled.
        * ``rule2`` -- sid 3, rev 1, disabled.
        """
        self.user = add_user_with_permissions(username='superuser', password='pwd123', is_superuser=True)

        # Remove any existing rules before creating the fixtures.
        Rule.objects.all().delete()
        rule_type = Type.system
        rule_json = {
            "type": "query_string",
            "field": "",
            "operands": "event_severity:>=6",
        }
        actions_json = {
            "type": "incident",
            "title": "{{.SignName}}",
            "comment": "",
            "category": "",
            "importance": "50",
            "assigned_to": "",
            "description": "{{.EventSrcMsg}}"
        }
        rule_data = {
            'name': '1 Test with Status True',
            'type': rule_type,
            'status': True,
            'rev': 1,
            'sid': 2,
            'depth': datetime.timedelta(minutes=10),
            'rule_json': dict(rule_json),
            'actions_json': dict(actions_json),
        }
        self.rule1 = Rule.objects.create(**rule_data)

        # Build a separate dict instead of aliasing and mutating ``rule_data``.
        rule_data_disable = {**rule_data, 'status': False, 'rev': 2}
        self.rule1_disable = Rule.objects.create(**rule_data_disable)

        self.rule2 = Rule.objects.create(
            name='2 Test with Status False',
            type=rule_type,
            status=False,
            rev=1,
            sid=3,
            depth=datetime.timedelta(minutes=10),
            rule_json=dict(rule_json),
            actions_json=dict(actions_json),
        )
        # NOTE(review): status is already False at creation; the extra save is
        # kept from the original -- confirm whether it is still needed.
        self.rule2.status = False
        self.rule2.save()

    @pytest.mark.integration
    def test_rules_list(self, client, add_user_with_permissions):
        """GET on the rules list answers 200 with a payload of length 4."""
        user = add_user_with_permissions(username='test_user1', password='pwd123', is_superuser=True)
        client.force_login(user=user)
        url = reverse('rules-list')
        response = client.get(url)
        assert response.status_code == 200
        data_json = response.json()
        # NOTE(review): setup creates three rules; 4 presumably counts the
        # top-level keys of a paginated payload -- confirm.
        assert len(data_json) == 4

    @pytest.mark.integration
    @patch('correlation.tasks.update_correlator_tasks', mock_correlator_task)
    @patch('correlation.api.update_correlator_tasks', mock_correlator_task)
    def test_add_rule(self, client, add_user_with_permissions):
        """POSTing a complete rule creates it with the submitted values."""
        user = add_user_with_permissions(username='test_user1', password='pwd123', is_superuser=True)
        client.force_login(user=user)
        url = reverse('rules-list')
        actions_json = {
            'type': 'incident',
            'title': '{{.SignName}}',
            'comment': 'New comment',
            'category': 'New category',
            'importance': '50',
            'assigned_to': '',
            'description': '{{.EventSrcMsg}}'
        }
        data = {
            'name': 'New Test with Status False',
            'sid': 3,
            'rev': 4,
            'kind': 'System',
            'group': None,
            'status': False,
            'multi': False,
            'archived': False,
            'type': 0,
            'depth': '00:20:00',
            'rule_json': {
                'type': 'query_string',
                'field': 'query field',
                'operands': 'event_severity:>=666'
            },
            'actions_json': dict(actions_json),
        }
        response = client.post(url, data=data, content_type='application/json')
        assert response.status_code == 201

        rule = Rule.objects.get(name='New Test with Status False')
        assert rule.sid == 3
        assert rule.rev == 4
        assert rule.group is None
        assert not rule.status
        assert not rule.archived
        assert rule.type == 0
        assert rule.depth == datetime.timedelta(seconds=1200)
        # The stored 'field' is expected to be 'NULL', not the posted value.
        assert rule.rule_json == {
            'type': 'query_string',
            'field': 'NULL',
            'operands': 'event_severity:>=666'
        }
        assert rule.actions_json == actions_json

    @pytest.mark.integration
    @patch('correlation.tasks.update_correlator_tasks', mock_correlator_task)
    @patch('correlation.api.update_correlator_tasks', mock_correlator_task)
    def test_delete_rule(self, client, add_user_with_permissions):
        """DELETE on a rule answers 204 and removes exactly one row."""
        user = add_user_with_permissions(username='test_user1', password='pwd123', is_superuser=True)
        client.force_login(user=user)
        rule = Rule.objects.get(name='2 Test with Status False')
        rules_before = Rule.objects.count()
        url = reverse('rules-detail', kwargs={"pk": rule.pk})
        response = client.delete(url)
        assert response.status_code == 204
        assert Rule.objects.count() == rules_before - 1

    @pytest.mark.unit
    @pytest.mark.skip(reason='1.5 waiting of products')
    def test_edit_rule(self, client, add_user_with_permissions):
        """PATCH a rule with a full payload (currently skipped)."""
        user = add_user_with_permissions(username='test_user1', password='pwd123', is_superuser=True)
        client.force_login(user=user)
        rule = Rule.objects.get(name='2 Test with Status False')
        url = reverse('rules-detail', kwargs={"pk": rule.pk})
        rule_json = {'type': 'query_string', 'field': 'field', 'operands': 'event_severity:>=50'}
        actions_json = {
            'type': 'incident',
            'title': '{{.SignName}}',
            'comment': 'new comment',
            'category': '',
            'importance': '500',
            'assigned_to': 'user',
            'description': '{{.EventSrcMsg}}',
        }
        data = {
            'name': '2 Test with Status False',
            'sid': 5,
            'rev': 6,
            'kind': 'System',
            'group': 'None',
            'status': True,
            'multi': True,
            'archived': True,
            'type': 1,
            'depth': '00:20:00',
            'rule_json': dict(rule_json),
            'actions_json': dict(actions_json),
        }
        response = client.patch(url, data=data, content_type='application/json')
        assert response.status_code == 200

        rule = Rule.objects.get(name='2 Test with Status False')
        assert rule.sid == 5
        assert rule.rev == 7
        assert rule.type == 1
        assert not rule.group
        assert not rule.status
        assert rule.multi
        assert rule.archived
        assert rule.depth == datetime.timedelta(seconds=1200)
        assert rule.rule_json == rule_json
        assert rule.actions_json == actions_json

    @pytest.mark.integration
    @patch('correlation.tasks.update_correlator_tasks', mock_correlator_task)
    @patch('correlation.api.update_correlator_tasks', mock_correlator_task)
    def test_enable_rule_but_this_sid_already_enable(self, api_client):
        """Enabling a rule whose sid already has an enabled rule is rejected."""
        api_client.force_login(user=self.user)

        url = reverse('rules-detail', kwargs={"pk": self.rule1_disable.pk})
        response = api_client.patch(url, data={'status': True})
        payload = response.json()
        assert 'status' in payload
        assert payload['status'] == ['There is already an enabled rule with this sid: 2']

    @pytest.mark.integration
    @patch('correlation.tasks.update_correlator_tasks', mock_correlator_task)
    @patch('correlation.api.update_correlator_tasks', mock_correlator_task)
    def test_edit_rule_but_rule_with_same_sid_disable(self, api_client):
        """Editing rule1 is rejected because sid 2 / rev 2 already exists."""
        api_client.force_login(user=self.user)

        url = reverse('rules-detail', kwargs={"pk": self.rule1.pk})
        response = api_client.patch(url, data={'description': '-----'})
        payload = response.json()
        assert 'sid' in payload
        assert payload['sid'] == ['Check that there is no rule with SID 2 and REV 2 and try again']

    @pytest.mark.integration
    @patch('correlation.tasks.update_correlator_tasks', mock_correlator_task)
    @patch('correlation.api.update_correlator_tasks', mock_correlator_task)
    def test_expansion_rule_json_when_edit(self, api_client):
        """A partial ``rule_json`` is returned expanded with the missing keys."""
        api_client.force_login(user=self.user)

        url = reverse('rules-detail', kwargs={"pk": self.rule2.pk})
        response = api_client.patch(url, data={'rule_json': {'operands': 'test'}}, format='json')
        assert response.json()['rule_json'] == {
            'operands': 'test',
            'field': 'NULL',
            'type': 'query_string'
        }

    @pytest.mark.integration
    @patch('correlation.tasks.update_correlator_tasks', mock_correlator_task)
    @patch('correlation.api.update_correlator_tasks', mock_correlator_task)
    def test_edit_rule_only_status(self, api_client):
        """Changing only the status leaves the rule's rev at 1."""
        api_client.force_login(user=self.user)

        url = reverse('rules-detail', kwargs={"pk": self.rule2.pk})
        response = api_client.patch(url, data={'status': True})
        assert response.json()['rev'] == 1
        # Check the stored row of the rule that was patched, not an
        # arbitrary first row of the table.
        assert Rule.objects.get(pk=self.rule2.pk).rev == 1

    @pytest.mark.integration
    @patch('correlation.tasks.update_correlator_tasks', mock_correlator_task)
    @patch('correlation.api.update_correlator_tasks', mock_correlator_task)
    def test_add_rule_error(self, client, add_user_with_permissions):
        """POSTing a payload without ``actions_json`` answers 400."""
        user = add_user_with_permissions(username='test_user1', password='pwd123', is_superuser=True)
        client.force_login(user=user)
        url = reverse('rules-list')
        data = {
            'name': 'New Test with Status False',
            'sid': 3,
            'rev': 4,
            'kind': 'System',
            'group': 'None',
            'status': False,
            'multi': False,
            'archived': False,
            'type': 0,
            'depth': '00:20:00',
            'rule_json': {
                'type': 'query_string',
                'field': 'query field',
                'operands': 'event_severity:>=666'
            },
        }
        response = client.post(url, data=data, content_type='application/json')
        assert response.status_code == 400

    @pytest.mark.integration
    @patch('correlation.tasks.update_correlator_tasks', mock_correlator_task)
    @patch('correlation.api.update_correlator_tasks', mock_correlator_task)
    def test_add_rule_with_non_unique_sid(self, api_client, add_user_with_permissions):
        """A second enabled rule with the same sid is rejected with a 'sid' error."""
        user = add_user_with_permissions(username='test_user1', password='pwd123', is_superuser=True)
        api_client.force_authenticate(user)
        url = reverse('rules-list')
        data = {
            'name': 'New Test',
            'sid': 3,
            'rev': 4,
            'kind': 'System',
            'group': None,
            'status': True,
            'multi': False,
            'archived': False,
            'type': 0,
            'depth': '00:20:00',
            'rule_json': {
                'type': 'query_string',
                'field': 'query field',
                'operands': 'event_severity:>=666'
            },
            'actions_json': {
                'type': 'incident',
                'title': '{{.SignName}}',
                'comment': 'New comment',
                'category': 'New category',
                'importance': '50',
                'assigned_to': '',
                'description': '{{.EventSrcMsg}}'
            }
        }

        response = api_client.post(url, data=data, format='json')
        assert response.status_code == status.HTTP_201_CREATED

        # Same sid, different rev: the second enabled rule must be refused.
        data['rev'] = 111111
        response = api_client.post(url, data=data, format='json')

        assert response.status_code == status.HTTP_400_BAD_REQUEST
        assert 'sid' in response.json()
|
|
|
|
|
|
@pytest.mark.django_db
class TestCorrelationGroups:
    """API tests for rule groups and for assigning rules to groups."""

    @pytest.fixture(autouse=True)
    def setup_test(self, django_user_model):
        """Create one rule and one group, and pick the user used to authenticate."""
        # NOTE(review): takes the first existing user; presumably one is
        # created outside this module -- confirm, otherwise this is None.
        self.admin = django_user_model.objects.first()
        test_rule = dict(
            name='Rule 1',
            type=1,
            status=True,
            rev=1,
            sid=2,
            depth=datetime.timedelta(minutes=10),
            rule_json={
                "type": "query_string",
                "field": "",
                "operands": "event_severity:>=6",
            },
            actions_json={
                "type": "incident",
                "title": "{{.SignName}}",
                "comment": "",
                "category": "",
                "importance": "50",
                "assigned_to": "",
                "description": "{{.EventSrcMsg}}"
            },
        )
        self.rule1 = Rule.objects.create(**test_rule)
        self.group = Group.objects.create(name="test_group", description="test description")

    @pytest.mark.unit
    def test_get_group(self, api_client):
        """The group list contains exactly the group created in setup."""
        api_client.force_authenticate(self.admin)
        response = api_client.get(reverse('rules-groups-list'))
        assert response.status_code == 200

        data_json = response.json()
        assert len(data_json['results']) == 1  # there is one group
        assert data_json['count'] == 1
        assert data_json['results'][0] == {
            'id': self.group.pk,
            'name': 'test_group',
            'description': 'test description',
            'rules': []
        }

    @pytest.mark.unit
    def test_add_group(self, api_client):
        """POSTing a group creates exactly one new group with the given data."""
        api_client.force_authenticate(self.admin)
        groups_before = Group.objects.count()

        data = {
            "name": "New group",
            "description": "Group description"
        }
        response = api_client.post(reverse('rules-groups-list'), data=data)
        assert response.status_code == 201
        payload = response.json()
        assert payload['name'] == data['name']
        assert payload['description'] == data['description']

        # One POST must add exactly one row, not merely change the count.
        assert Group.objects.count() == groups_before + 1

        group = Group.objects.get(name="New group")
        assert group.description == "Group description"

    @pytest.mark.unit
    def test_delete_group(self, api_client):
        """DELETE on a group answers 204 and the group can no longer be fetched."""
        api_client.force_authenticate(self.admin)

        group = Group.objects.create(name="group", description="test description")
        url = reverse('rules-groups-detail', kwargs={"pk": group.pk})
        response = api_client.delete(url)
        assert response.status_code == 204
        with pytest.raises(ObjectDoesNotExist):
            Group.objects.get(pk=group.pk)

    @pytest.mark.unit
    def test_update_group(self, api_client):
        """PATCH with both name and description updates both fields."""
        api_client.force_authenticate(self.admin)
        group = Group.objects.create(name="group", description="test description")
        data = {
            "name": "New group",
            "description": "Group description"
        }
        url = reverse('rules-groups-detail', kwargs={"pk": group.pk})
        response = api_client.patch(url, data=data)
        assert response.status_code == 200
        group = Group.objects.get(name="New group")
        assert group.description == "Group description"

    @pytest.mark.unit
    def test_partial_update_group_description(self, api_client):
        """PATCH with only a description changes it and keeps the name."""
        api_client.force_authenticate(self.admin)
        group = Group.objects.create(name="group", description="test description")
        data = {
            "description": "Group description"
        }
        url = reverse('rules-groups-detail', kwargs={"pk": group.pk})
        response = api_client.patch(url, data=data)
        assert response.status_code == 200
        group = Group.objects.get(name="group")
        assert group.description == "Group description"

    @pytest.mark.unit
    def test_partial_update_group_name(self, api_client):
        """PATCH with only a name changes it and keeps the description."""
        api_client.force_authenticate(self.admin)
        group = Group.objects.create(name="test", description="test description")
        data = {
            "name": "New group",
        }
        url = reverse('rules-groups-detail', kwargs={"pk": group.pk})
        response = api_client.patch(url, data=data)
        assert response.status_code == 200
        group = Group.objects.get(name="New group")
        assert group.description == "test description"

    @pytest.mark.unit
    def test_add_rule_to_group(self, api_client):
        """PATCHing ``rules`` on an empty group attaches the rule to it."""
        api_client.force_authenticate(self.admin)
        group = Group.objects.create(name="test", description="test description")

        url = reverse('rules-groups-detail', kwargs={"pk": group.pk})
        response = api_client.get(url)
        assert response.json()['rules'] == []

        data = {"rules": [self.rule1.pk]}
        response = api_client.patch(url, data=data)
        assert response.status_code == 200
        assert len(response.json()['rules']) == 1

    @pytest.mark.unit
    def test_edit_group_in_rule(self, api_client):
        """PATCHing ``group`` on a rule returns the group's id and name."""
        api_client.force_authenticate(self.admin)
        group = Group.objects.create(name="test", description="test description")

        url = reverse('rules-detail', kwargs={"pk": self.rule1.pk})
        response = api_client.get(url)
        assert response.json()['group'] is None

        data = {"group": [group.pk]}
        response = api_client.patch(url, data=data)
        assert response.status_code == 200
        assert response.json()['group'] == {'id': group.pk, 'name': 'test'}
|