old_console/networkmap/tests/test_api.py
2024-11-02 14:12:45 +03:00

346 lines
20 KiB
Python

import json
import logging
import os
from unittest import mock
import pytest
from django.db.models import Q
from django.urls import reverse
from assets.models.assets import Asset
from console.models import Connection, ConnectionType
from console.tests.test_utils import PermApiBaseTest, wait_db_element
from core.utils import fmt_input, dtnow
from networkmap.api import (auto_netmap_protocol_filter, auto_netmap_time_filter, auto_netmap_assets_filter,
apply_auto_netmap_filters)
from networkmap.models import NetworkMap, NetworkMapBackgroundImage
from networkmap.serializers import AutoNetmapElementsSerializer, AutoNetmapConnectionsSerializer
from networkmap.tests.netmap_test_utils import get_mocked_elk_data, create_new_background_image
from perms.models import Perm
# Module-level logger. Named after the module (instead of using the root logger)
# so that records emitted from these tests can be attributed and filtered.
_log = logging.getLogger(__name__)

# Parent of the directory that contains this file.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Directory that contains this file (despite the name, not the repository root).
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))

# JSON fixtures passed to ``get_mocked_elk_data`` by the ELK related tests.
# Every path component is a separate ``os.path.join`` argument so that no
# separator is hard-coded.
ELK_TEST_DATA = os.path.join(BASE_DIR, "tests", "elk_test_data.json")
UPDATE_CONNECTION_API_ELK_DATA = os.path.join(BASE_DIR, "tests", "test_data", "connection_update_api_data.json")
CONNECTION_PROTOCOL_TEST_DATA = os.path.join(BASE_DIR, "tests", "test_data", "connection_protocol_data.json")

# Payload that is JSON-encoded and posted as ``image_data`` by the background image tests.
IMAGE_BOUNDS_DATA = {
    "bounds": {"x": 42, "y": 98, "width": 1110, "height": 484},
    "locked": True,
    "name": "test_pic_name",
    "visible": True,
}

# Permission paired with every URL name listed in TEST_LINKS.
netmap_test_permission = Perm.can_view_network

# Entries have the shape [permissions, url_name]; the disabled entries carry
# an additional dict of URL kwargs.
TEST_LINKS = [
    [[netmap_test_permission], 'netmap-maps-list'],
    [[netmap_test_permission], 'netmap-elements-list'],
    [[netmap_test_permission], 'netmap-groups-list'],
    [[netmap_test_permission], 'netmap-in-danger-list'],
    [[netmap_test_permission], 'auto-netmap-elements-list'],
    [[netmap_test_permission], 'auto-netmap-connections-list'],
    # [[netmap_test_permission], 'api_handle_filter_form', dict(form_type='assets')],
    # [[netmap_test_permission], 'api_handle_filter_form', dict(form_type='time')],
    # [[netmap_test_permission], 'api_handle_filter_form', dict(form_type='protocol')],
    # [[netmap_test_permission], 'api_handle_filter_form', dict(form_type='reset')],
    [[netmap_test_permission], 'background_images-list'],
]

# Sets of asset IPs created before the update-connection API test runs
# (used as the ``pre_loaded_assets`` parametrization).
TEST_ASSETS_IP = [
    [],
    ['192.168.1.20'],
    ['192.168.1.21'],
    ['192.168.1.20', '192.168.1.21'],
]

TEST_IMAGE = os.path.join(BASE_DIR, "tests", "test_data", "controller.png")
# Do not modify the skipped tests below until work on the new device map starts.
@pytest.mark.django_db
class TestApiTwo:
    """
    Tests for the auto network map filter functions, for connection creation from mocked ELK data
    and for the network map background image endpoints.

    Every test runs with the test client logged in as a superuser (see ``setup_tests``).
    The logic of the tests marked with ``skip`` is deliberately kept as it was; known problems
    in them are only flagged with ``NOTE(review)`` comments.
    """

    @pytest.fixture(autouse=True)
    def setup_tests(self, client, add_user_with_permissions):
        """ Fixture logging the test client in as a superuser before every test of this class """
        username = 'user_admin'
        password = 'bar'
        # Drop whatever session the client may still hold before logging in as the admin
        client.logout()
        admin = add_user_with_permissions(username=username, password=password, is_superuser=True)
        client.force_login(admin)

    @pytest.mark.unit
    @pytest.mark.skip
    def test_check_connections_creation_from_elk_data(self, create_elk_function_test_data):
        """
        Test for checking that processing the mocked ELK data creates connections between testing assets
        :param create_elk_function_test_data: fixture requested for the ELK tests
            (presumably creates the assets referenced by the ELK data - confirm)
        :return:
            assert 1: intended to check that the connection DB state changed (see the review note below)
            assert 2: true if exactly 5 connection instances exist after processing
        """
        with mock.patch("elasticsearch.Elasticsearch.search") as mocked_search, \
                mock.patch("elasticsearch.Elasticsearch.update_by_query") as mocked_update:
            connections_before = Connection.objects.all()
            get_mocked_elk_data(ELK_TEST_DATA, mocked_search, mocked_update)
            connections_after = Connection.objects.all()
            # NOTE(review): both names hold lazy querysets and Django querysets do not implement
            # equality, so two separately built querysets always compare unequal - this assert
            # cannot fail. Compare snapshots of IDs taken before/after when this test is revived.
            assert connections_after != connections_before
            assert len(connections_after) == 5

    @pytest.mark.unit
    @pytest.mark.skip
    def test_check_connections_update_from_elk_data(self, create_elk_function_test_data):
        """
        Test for checking if an existing connection is updated after processing the mocked ELK data
        :param create_elk_function_test_data: fixture requested for the ELK tests
            (presumably creates the assets looked up by IP below - confirm)
        :return:
            assert: true if the ``updated`` value of the test connection changed after processing
        """
        # Create connection, which will be updated by the task
        test_connection_type = ConnectionType.objects.create(name='TCP')
        test_connection = Connection.objects.create(src_asset=Asset.objects.get(ip='192.168.1.21'),
                                                    dst_asset=Asset.objects.get(ip='10.10.1.21'),
                                                    connection_protocol=test_connection_type)
        test_connection_update_time_before = test_connection.updated
        with mock.patch("elasticsearch.Elasticsearch.search") as mocked_search, \
                mock.patch("elasticsearch.Elasticsearch.update_by_query") as mocked_update:
            get_mocked_elk_data(ELK_TEST_DATA, mocked_search, mocked_update)
            # Re-read the connection from the DB to see the value written during processing
            test_connection_after_test = Connection.objects.get(src_asset=Asset.objects.get(ip='192.168.1.21'),
                                                                dst_asset=Asset.objects.get(ip='10.10.1.21'))
            test_connection_update_time_after = test_connection_after_test.updated
            assert test_connection_update_time_before != test_connection_update_time_after

    @pytest.mark.unit
    def test_auto_netmap_assist_filter_without_neighbours(self, create_filter_test_data):
        """
        Test for checking the correct execution of asset filter function, without neighbours
        :param create_filter_test_data: fixture for creating filters testing database
        :return:
            assert 1: true if filtered assets ID's match the selected assets ID's
            assert 2: true if filtered connections ID's match the connections whose source and
                destination are both among the selected assets
            Result of filter functions work is looking like this:
            [filtered_assets_queryset, filtered_connections_queryset].
        """
        selected_assets = Asset.objects.filter(ip__in=['192.168.1.21', '192.168.1.22'])
        expected_connections = Connection.objects.filter(
            Q(src_asset__in=selected_assets) & Q(dst_asset__in=selected_assets))
        selected_asset_ids = list(selected_assets.values_list('id', flat=True))
        result_list = auto_netmap_assets_filter(selected_asset_ids, False)
        # None of the querysets is explicitly ordered here, so the ID's are sorted before the
        # comparison: row order no longer matters, while duplicated rows are still detected.
        assert sorted(result_list[0].values_list('id', flat=True)) == sorted(selected_asset_ids)
        assert sorted(result_list[1].values_list('id', flat=True)) == sorted(
            expected_connections.values_list('id', flat=True))

    @pytest.mark.unit
    def test_auto_netmap_assist_filter_with_neighbours(self, create_filter_test_data):
        """
        Test for checking the correct execution of asset filter function, with neighbours
        :param create_filter_test_data: fixture for creating filters testing database
        :return:
            assert: true if filtered assets ID's match the ID's of the selected assets plus the
                two additional assets expected as their neighbours
            Result of filter functions work is looking like this:
            [filtered_assets_queryset, filtered_connections_queryset].
        """
        selected_assets = Asset.objects.filter(ip__in=['192.168.1.21', '192.168.1.22'])
        selected_asset_ids = list(selected_assets.values_list('id', flat=True))
        result_list = auto_netmap_assets_filter(selected_asset_ids, True)
        # ID's of assets, which should be in results of work of filter function
        expected_asset_ids = Asset.objects.filter(
            ip__in=['192.168.1.21', '192.168.1.22', '192.168.1.23', '192.168.1.24']).values_list('id', flat=True)
        assert set(result_list[0].values_list('id', flat=True)) == set(expected_asset_ids)

    @pytest.mark.unit
    def test_auto_netmap_protocol_filter(self, create_filter_test_data):
        """
        Test for checking the correct execution of protocol filter function
        :param create_filter_test_data: fixture for creating filters testing database
        :return:
            assert: true if filtered connections ID's match the ID's of all UDP connections
            Result of filter functions work is looking like this:
            [filtered_assets_queryset, filtered_connections_queryset].
        """
        selected_protocol = Connection.ProtocolType.UDP
        result_list = auto_netmap_protocol_filter([selected_protocol])
        # ID's of connections, which should be in results of work of filter function
        expected_connection_ids = Connection.objects.filter(
            connection_protocol=selected_protocol).values_list('id', flat=True)
        assert set(result_list[1].values_list('id', flat=True)) == set(expected_connection_ids)

    @pytest.mark.unit
    def test_auto_netmap_time_range_filter(self, create_filter_test_data):
        """
        Test for checking the correct execution of time range filter function
        :param create_filter_test_data: fixture for creating filters testing database
        :return:
            assert: true if filtered connections ID's match the ID's of the connections with UDP protocol
            Result of filter functions work is looking like this:
            [filtered_assets_queryset, filtered_connections_queryset].
        """
        # Range in the "<start> - <end>" form, built from dtnow(days=9) and dtnow(days=11)
        time_range_input = f'{fmt_input(dtnow(days=9))} - {fmt_input(dtnow(days=11))}'
        result_list = auto_netmap_time_filter(time_range_input)
        # NOTE(review): the protocol is looked up as a ConnectionType row here, while the protocol
        # filter test uses Connection.ProtocolType.UDP - confirm which one matches the current model.
        expected_connection_ids = Connection.objects.filter(
            connection_protocol=ConnectionType.objects.get(name='UDP')).values_list('id', flat=True)
        assert set(result_list[1].values_list('id', flat=True)) == set(expected_connection_ids)

    @pytest.mark.unit
    def test_auto_netmap_combined_filters(self, create_filter_test_data):
        """
        Test for checking the work of applying multiple filters function
        :param create_filter_test_data: fixture for creating filters testing database
        :return:
            assert: true if the result equals the dict with serialized UDP connections under 'edges'
                and serialized selected assets under 'nodes'
        """
        input_filter_assets = list(
            Asset.objects.filter(ip__in=['192.168.1.22', '192.168.1.24']).values_list('id', flat=True))
        input_protocol_filter = Connection.ProtocolType.UDP
        input_time_filter = f'{fmt_input(dtnow(days=9))} - {fmt_input(dtnow(days=11))}'
        filter_input_data = {
            'assets_filter': {
                'assets': input_filter_assets,
                'include_neighbours': True
            },
            'protocol_filter': [input_protocol_filter],
            'time_filter': input_time_filter
        }
        result_list = apply_auto_netmap_filters(filter_input_data)
        # NOTE(review): reverse() presumably relies on a default ordering of the Asset model - confirm
        expected_assets = Asset.objects.filter(pk__in=input_filter_assets).reverse()
        expected_nodes = AutoNetmapElementsSerializer(expected_assets, many=True)
        expected_edges = AutoNetmapConnectionsSerializer(
            Connection.objects.filter(connection_protocol=input_protocol_filter), many=True)
        expected_data = {
            'edges': expected_edges.data,
            'nodes': expected_nodes.data,
        }
        assert result_list == expected_data

    @pytest.mark.unit
    @pytest.mark.skip
    @pytest.mark.parametrize('pre_loaded_assets', TEST_ASSETS_IP)
    def test_assets_created_from_update_connection_api(self, pre_loaded_assets):
        """
        This test checks if update connection api creates proper assets if ELK data has IP's of assets, that are not in
        DB. Also checks if API does not create duplicate assets
        :param pre_loaded_assets: list of asset IP's to create before the ELK data is processed
        """
        # Creating assets (nothing is created for an empty list)
        for asset_ip in pre_loaded_assets:
            Asset.objects.create(ip=asset_ip, sensor='armaif_1')
        with mock.patch("elasticsearch.Elasticsearch.search") as mocked_search, \
                mock.patch("elasticsearch.Elasticsearch.update_by_query") as mocked_update:
            connections_before = Connection.objects.all()
            assets_before = Asset.objects.all()
            get_mocked_elk_data(UPDATE_CONNECTION_API_ELK_DATA, mocked_search, mocked_update)
            connections_after = Connection.objects.all()
            assets_after = Asset.objects.all()
            # NOTE(review): the two "!=" asserts compare separately built lazy querysets and
            # therefore cannot fail; only the count() asserts actually check the DB state.
            assert assets_before != assets_after
            assert assets_after.count() == 2
            assert connections_after != connections_before
            assert connections_after.count() == 1

    @pytest.mark.unit
    @pytest.mark.skip
    def test_upd_tcp_protocol_connection_creation(self):
        """ Test for checking if events parsed correctly and creates both TCP and UDP connections """
        with mock.patch("elasticsearch.Elasticsearch.search") as mocked_search, \
                mock.patch("elasticsearch.Elasticsearch.update_by_query") as mocked_update:
            connections_before = Connection.objects.all()
            assets_before = Asset.objects.all()
            get_mocked_elk_data(CONNECTION_PROTOCOL_TEST_DATA, mocked_search, mocked_update)
            connections_after = Connection.objects.all()
            assets_after = Asset.objects.all()
            # NOTE(review): the two "!=" asserts compare separately built lazy querysets and
            # therefore cannot fail; only the count() and get() checks actually check the DB state.
            assert assets_before != assets_after
            assert assets_after.count() == 4
            assert connections_after != connections_before
            assert connections_after.count() == 2
            # get() raises unless exactly one connection of the given protocol exists
            assert Connection.objects.get(connection_protocol=Connection.ProtocolType.UDP)
            assert Connection.objects.get(connection_protocol=Connection.ProtocolType.TCP)

    @pytest.mark.skip('Must be rewrite for API')
    def test_update_background_image_for_networkmap(self, client, test_server):
        """ Test for checking if posting image data changes bounds, locked and visible of a background image """
        test_map = NetworkMap.objects.create(name='test_map')
        create_new_background_image(test_map, client, 'test_pic_name')
        netmap_before = wait_db_element(lambda: NetworkMapBackgroundImage.objects.get(name='test_pic_name'))
        support_images_data = json.dumps(IMAGE_BOUNDS_DATA)
        url = reverse('background_images-update-image-data')
        user_input = {'current_map_id': test_map.pk, 'image_data': support_images_data}
        client.post(url, user_input, follow=True)
        netmap_after = wait_db_element(lambda: NetworkMapBackgroundImage.objects.get(name='test_pic_name'))
        assert netmap_after.bounds != netmap_before.bounds
        assert netmap_after.locked != netmap_before.locked
        assert netmap_after.visible != netmap_before.visible

    @pytest.mark.skip('Must be rewrite for API')
    def test_add_new_background_image_for_networkmap(self, client, test_server):
        """ Test for checking if a new background image appears in the DB after creation """
        test_map = NetworkMap.objects.create(name='test_map')
        create_new_background_image(test_map, client, 'test_pic_name')
        assert wait_db_element(lambda: NetworkMapBackgroundImage.objects.get(name='test_pic_name'))

    @pytest.mark.skip('Must be rewrite for API')
    def test_add_new_background_image_with_same_name_for_networkmap(self, client, test_server, caplog):
        """ Test for checking if creating a second image with the same name for the same map is logged """
        test_map = NetworkMap.objects.create(name='test_map')
        create_new_background_image(test_map, client, 'test_pic_name')
        create_new_background_image(test_map, client, 'test_pic_name')
        assert 'Background image with this name already exist for current network' in caplog.text

    @pytest.mark.skip('Must be rewrite for API')
    def test_delete_network_background_image(self, client, test_server):
        """ Test for checking if the number of background images changes after a delete request """
        test_map = NetworkMap.objects.create(name='test_map')
        create_new_background_image(test_map, client, 'test_pic_name_1')
        create_new_background_image(test_map, client, 'test_pic_name_2')
        background_images_before = NetworkMapBackgroundImage.objects.all().count()
        test_image = wait_db_element(lambda: NetworkMapBackgroundImage.objects.get(name='test_pic_name_1'))
        url = reverse('background_images-delete-background-image')
        user_input = {'current_map_id': test_map.pk, 'image_data': test_image.pk}
        client.post(url, user_input, follow=True)
        background_images_after = NetworkMapBackgroundImage.objects.all().count()
        assert background_images_after != background_images_before

    @pytest.mark.skip('Must be rewrite for API')
    def test_corrupted_image_data_when_updating_background_image_in_db(self, client, test_server):
        """ Test for checking if image data that is not valid JSON is rejected by the update endpoint """
        test_map = NetworkMap.objects.create(name='test_map')
        create_new_background_image(test_map, client, 'test_pic_name')
        wait_db_element(lambda: NetworkMapBackgroundImage.objects.get(name='test_pic_name'))
        support_images_data = 'kdasod11'
        url = reverse('background_images-update-image-data')
        user_input = {'current_map_id': test_map.pk, 'image_data': support_images_data}
        response = client.post(url, user_input, follow=True)
        assert '"error_message": "Image data is not valid"' in response.content.decode("utf-8")

    @pytest.mark.skip('Must be rewrite for API')
    def test_correct_post_arguments_check_for_background_image_update_api_map(self, client, caplog):
        """ Test for checking the update endpoint response when 'current_map_id' is not sent """
        test_map = NetworkMap.objects.create(name='test_map')
        create_new_background_image(test_map, client, 'test_pic_name')
        wait_db_element(lambda: NetworkMapBackgroundImage.objects.get(name='test_pic_name'))
        support_images_data = json.dumps(IMAGE_BOUNDS_DATA)
        url = reverse('background_images-update-image-data')
        user_input = {'image_data': support_images_data}
        response = client.post(url, user_input, follow=True)
        response_text = response.content.decode("utf-8")
        assert '"error_message": "Necessary data for request not provided or corrupted"' in response_text

    @pytest.mark.skip('Must be rewrite for API')
    def test_correct_post_arguments_check_for_background_image_update_api_image(self, client, caplog):
        """ Test for checking the update endpoint response when 'image_data' is not sent """
        test_map = NetworkMap.objects.create(name='test_map')
        create_new_background_image(test_map, client, 'test_pic_name')
        wait_db_element(lambda: NetworkMapBackgroundImage.objects.get(name='test_pic_name'))
        url = reverse('background_images-update-image-data')
        user_input = {'current_map_id': test_map.pk}
        response = client.post(url, user_input, follow=True)
        assert '"error_message": "Some mandatory arguments are missing"' in response.content.decode("utf-8")

    @pytest.mark.skip('Must be rewrite for API')
    def test_correct_post_arguments_check_for_background_image_delete_api_map(self, client, caplog):
        """ Test for checking the delete endpoint response when 'current_map_id' is not sent """
        test_map = NetworkMap.objects.create(name='test_map')
        create_new_background_image(test_map, client, 'test_pic_name')
        test_image = wait_db_element(lambda: NetworkMapBackgroundImage.objects.get(name='test_pic_name'))
        support_images_data = test_image.pk
        url = reverse('background_images-delete-background-image')
        user_input = {'image_data': support_images_data}
        response = client.post(url, user_input, follow=True)
        response_text = response.content.decode("utf-8")
        assert '"error_message": "Necessary data for request not provided or corrupted"' in response_text

    @pytest.mark.skip('Must be rewrite for API')
    def test_correct_post_arguments_check_for_background_image_delete_api_image(self, client, caplog):
        """ Test for checking the delete endpoint response when 'image_data' is not sent """
        test_map = NetworkMap.objects.create(name='test_map')
        create_new_background_image(test_map, client, 'test_pic_name')
        wait_db_element(lambda: NetworkMapBackgroundImage.objects.get(name='test_pic_name'))
        url = reverse('background_images-delete-background-image')
        user_input = {'current_map_id': test_map.pk}
        response = client.post(url, user_input, follow=True)
        response_text = response.content.decode("utf-8")
        assert '"error_message": "Necessary data for request not provided or corrupted"' in response_text