import os.path import tempfile from subprocess import Popen, PIPE from unittest.mock import patch import pytest from django.conf import settings from django.core.files.uploadedfile import SimpleUploadedFile from django.urls import reverse from rest_framework.test import APIClient from core.models import TLSSettings from core.services.tls_settings import generate_cert BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) TEST_CERT = os.path.join(BASE_DIR, "tests", "test_data/test_certificate.crt") TEST_KEY = os.path.join(BASE_DIR, "tests", "test_data/test_certificate.key") TMP_DIR = tempfile.TemporaryDirectory() api_client = APIClient() def mock_func_for_test(): pass class TestCertificateGeneration(object): cert_file_name = "certificate.crt" key_file_name = "certificate.key" @pytest.mark.unit def test_can_create_cert(self): with tempfile.TemporaryDirectory() as tmp_dir_name: generate_cert(os.path.join(tmp_dir_name, self.cert_file_name), os.path.join(tmp_dir_name, self.key_file_name)) assert os.path.exists(os.path.join(tmp_dir_name, self.cert_file_name)) assert os.path.exists(os.path.join(tmp_dir_name, self.key_file_name)) @pytest.mark.unit def test_got_error_when_bad_path(self): with pytest.raises(RuntimeError): generate_cert(os.path.join('arr', self.cert_file_name), os.path.join('arr', self.key_file_name)) @pytest.mark.unit def test_check_certificate(self): with tempfile.TemporaryDirectory() as tmp_dir_name: generate_cert(os.path.join(tmp_dir_name, self.cert_file_name), os.path.join(tmp_dir_name, self.key_file_name)) proc = Popen( ['openssl', 'x509', '-in', os.path.join(tmp_dir_name, self.cert_file_name), '-text', '-noout'], stdin=PIPE, stdout=PIPE, stderr=PIPE) output, error = proc.communicate() assert proc.returncode == 0 output = output.decode('utf-8') assert f"Issuer: C = {settings.TLS_CERT_COUNTRY}, ST = {settings.TLS_CERT_STATE}, L = {settings.TLS_CERT_LOCALITY}, O = {settings.TLS_CERT_ORIG_NAME}, CN = {settings.TLS_CERT_COMMON_NAME}" in output assert f"Subject: C = {settings.TLS_CERT_COUNTRY}, ST = {settings.TLS_CERT_STATE}, L = {settings.TLS_CERT_LOCALITY}, O = {settings.TLS_CERT_ORIG_NAME}, CN = {settings.TLS_CERT_COMMON_NAME}" in output class TestCertificateUpload: api_url = reverse('tls-settings') @pytest.fixture(autouse=True) def setup_test(self, add_user_with_permissions): nginx_http_config_path = os.path.join(TMP_DIR.name, settings.NGINX_HTTP_CONFIG_FILENAME) open(nginx_http_config_path, 'a').close() self.user = add_user_with_permissions(username='admintest', password='adminpass', is_superuser=True) self.cert = SimpleUploadedFile(name='testcert.crt', content=open(TEST_CERT, 'rb').read(), content_type='application/pkix-cert') self.key = SimpleUploadedFile(name='testcert.key', content=open(TEST_KEY, 'rb').read(), content_type='application/x-iwork-keynote-sffkey') yield os.remove(nginx_http_config_path) @pytest.mark.unit @patch('django.conf.settings.NGINX_SITES_AVAILABLE', TMP_DIR.name) @patch('core.views.view_settings.restart_nginx', mock_func_for_test) def test_valid_upload_cert_and_key_in_enabled_cert(self, client) -> None: api_client.force_authenticate(self.user) instance_before = TLSSettings.get_solo() assert not instance_before.enabled assert not instance_before.certificate.name assert not instance_before.key.name form_data = {'enabled': False, 'certificate': self.cert, 'key': self.key} response = api_client.patch(self.api_url, form_data) assert response.status_code == 200 instance_after = TLSSettings.get_solo() instance_after.clear_cache() assert not instance_after.enabled assert instance_after.certificate.name == 'certificate.crt' assert instance_after.key.name == 'certificate.key' @pytest.mark.unit def test_not_valid_upload_cert_without_key(self) -> None: api_client.force_authenticate(self.user) instance_before = TLSSettings.get_solo() assert not instance_before.enabled assert not instance_before.certificate.name assert not instance_before.key.name form_data = { 'enabled': False, 'certificate': self.cert } response = api_client.patch(self.api_url, form_data) assert response.status_code == 400 instance_after = TLSSettings.get_solo() instance_after.clear_cache() assert not instance_after.enabled assert not instance_after.certificate.name assert not instance_after.key.name @pytest.mark.unit def test_not_valid_upload_without_cert_and_key(self) -> None: api_client.force_authenticate(self.user) instance_before = TLSSettings.get_solo() assert not instance_before.enabled assert not instance_before.certificate.name assert not instance_before.key.name form_data = { 'enabled': True } response = api_client.patch(self.api_url, form_data) assert response.status_code == 400 instance_after = TLSSettings.get_solo() instance_after.clear_cache() assert not instance_after.enabled assert not instance_after.certificate.name assert not instance_after.key.name @pytest.mark.unit @patch('django.conf.settings.NGINX_SITES_AVAILABLE', TMP_DIR.name) @patch('core.views.view_settings.restart_nginx', mock_func_for_test) def test_remove_ssl_certificate(self) -> None: api_client.force_authenticate(self.user) before_settings = TLSSettings.get_solo() before_settings.certificate.name = 'certificate.cert' before_settings.key.name = 'certificate.key' before_settings.enabled = True before_settings.save() before_settings.clear_cache() assert before_settings.certificate.name == 'certificate.cert' assert before_settings.key.name == 'certificate.key' assert before_settings.enabled response = api_client.delete(self.api_url) assert response.status_code == 200 settings_after_remove = TLSSettings.get_solo() settings_after_remove.clear_cache() assert not settings_after_remove.enabled assert not settings_after_remove.certificate.name assert not settings_after_remove.key.name @pytest.mark.unit @patch('django.conf.settings.NGINX_SITES_AVAILABLE', TMP_DIR.name) @patch('core.views.view_settings.restart_nginx', mock_func_for_test) def test_create_new_cert_via_api(self) -> None: api_client.force_authenticate(self.user) instance_before = TLSSettings.get_solo() instance_before.clear_cache() assert not instance_before.enabled assert not instance_before.certificate.name assert not instance_before.key.name response = api_client.post(self.api_url) assert response.status_code == 200 instance_after = TLSSettings.get_solo() instance_after.clear_cache() assert instance_after.certificate.name == 'certificate.crt' assert instance_after.key.name == 'certificate.key' assert instance_after.enabled is False