189 lines
7.5 KiB
Python
189 lines
7.5 KiB
Python
import os.path
|
|
import tempfile
|
|
from subprocess import Popen, PIPE
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from django.conf import settings
|
|
from django.core.files.uploadedfile import SimpleUploadedFile
|
|
from django.urls import reverse
|
|
from rest_framework.test import APIClient
|
|
|
|
from core.models import TLSSettings
|
|
from core.services.tls_settings import generate_cert
|
|
|
|
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
TEST_CERT = os.path.join(BASE_DIR, "tests", "test_data/test_certificate.crt")
|
|
TEST_KEY = os.path.join(BASE_DIR, "tests", "test_data/test_certificate.key")
|
|
TMP_DIR = tempfile.TemporaryDirectory()
|
|
|
|
api_client = APIClient()
|
|
|
|
|
|
def mock_func_for_test():
|
|
pass
|
|
|
|
|
|
class TestCertificateGeneration(object):
|
|
cert_file_name = "certificate.crt"
|
|
key_file_name = "certificate.key"
|
|
|
|
@pytest.mark.unit
|
|
def test_can_create_cert(self):
|
|
with tempfile.TemporaryDirectory() as tmp_dir_name:
|
|
generate_cert(os.path.join(tmp_dir_name, self.cert_file_name),
|
|
os.path.join(tmp_dir_name, self.key_file_name))
|
|
|
|
assert os.path.exists(os.path.join(tmp_dir_name, self.cert_file_name))
|
|
assert os.path.exists(os.path.join(tmp_dir_name, self.key_file_name))
|
|
|
|
@pytest.mark.unit
|
|
def test_got_error_when_bad_path(self):
|
|
with pytest.raises(RuntimeError):
|
|
generate_cert(os.path.join('arr', self.cert_file_name),
|
|
os.path.join('arr', self.key_file_name))
|
|
|
|
@pytest.mark.unit
|
|
def test_check_certificate(self):
|
|
with tempfile.TemporaryDirectory() as tmp_dir_name:
|
|
generate_cert(os.path.join(tmp_dir_name, self.cert_file_name),
|
|
os.path.join(tmp_dir_name, self.key_file_name))
|
|
|
|
proc = Popen(
|
|
['openssl', 'x509', '-in', os.path.join(tmp_dir_name, self.cert_file_name), '-text', '-noout'],
|
|
stdin=PIPE, stdout=PIPE, stderr=PIPE)
|
|
output, error = proc.communicate()
|
|
|
|
assert proc.returncode == 0
|
|
output = output.decode('utf-8')
|
|
assert f"Issuer: C = {settings.TLS_CERT_COUNTRY}, ST = {settings.TLS_CERT_STATE}, L = {settings.TLS_CERT_LOCALITY}, O = {settings.TLS_CERT_ORIG_NAME}, CN = {settings.TLS_CERT_COMMON_NAME}" in output
|
|
assert f"Subject: C = {settings.TLS_CERT_COUNTRY}, ST = {settings.TLS_CERT_STATE}, L = {settings.TLS_CERT_LOCALITY}, O = {settings.TLS_CERT_ORIG_NAME}, CN = {settings.TLS_CERT_COMMON_NAME}" in output
|
|
|
|
|
|
class TestCertificateUpload:
|
|
api_url = reverse('tls-settings')
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def setup_test(self, add_user_with_permissions):
|
|
nginx_http_config_path = os.path.join(TMP_DIR.name, settings.NGINX_HTTP_CONFIG_FILENAME)
|
|
open(nginx_http_config_path, 'a').close()
|
|
self.user = add_user_with_permissions(username='admintest', password='adminpass', is_superuser=True)
|
|
|
|
self.cert = SimpleUploadedFile(name='testcert.crt', content=open(TEST_CERT, 'rb').read(),
|
|
content_type='application/pkix-cert')
|
|
self.key = SimpleUploadedFile(name='testcert.key', content=open(TEST_KEY, 'rb').read(),
|
|
content_type='application/x-iwork-keynote-sffkey')
|
|
yield
|
|
os.remove(nginx_http_config_path)
|
|
|
|
@pytest.mark.unit
|
|
@patch('django.conf.settings.NGINX_SITES_AVAILABLE', TMP_DIR.name)
|
|
@patch('core.views.view_settings.restart_nginx', mock_func_for_test)
|
|
def test_valid_upload_cert_and_key_in_enabled_cert(self, client) -> None:
|
|
api_client.force_authenticate(self.user)
|
|
|
|
instance_before = TLSSettings.get_solo()
|
|
assert not instance_before.enabled
|
|
assert not instance_before.certificate.name
|
|
assert not instance_before.key.name
|
|
|
|
form_data = {'enabled': False, 'certificate': self.cert, 'key': self.key}
|
|
response = api_client.patch(self.api_url, form_data)
|
|
assert response.status_code == 200
|
|
|
|
instance_after = TLSSettings.get_solo()
|
|
instance_after.clear_cache()
|
|
assert not instance_after.enabled
|
|
assert instance_after.certificate.name == 'certificate.crt'
|
|
assert instance_after.key.name == 'certificate.key'
|
|
|
|
@pytest.mark.unit
|
|
def test_not_valid_upload_cert_without_key(self) -> None:
|
|
api_client.force_authenticate(self.user)
|
|
|
|
instance_before = TLSSettings.get_solo()
|
|
assert not instance_before.enabled
|
|
assert not instance_before.certificate.name
|
|
assert not instance_before.key.name
|
|
|
|
form_data = {
|
|
'enabled': False,
|
|
'certificate': self.cert
|
|
}
|
|
response = api_client.patch(self.api_url, form_data)
|
|
assert response.status_code == 400
|
|
|
|
instance_after = TLSSettings.get_solo()
|
|
instance_after.clear_cache()
|
|
assert not instance_after.enabled
|
|
assert not instance_after.certificate.name
|
|
assert not instance_after.key.name
|
|
|
|
@pytest.mark.unit
|
|
def test_not_valid_upload_without_cert_and_key(self) -> None:
|
|
api_client.force_authenticate(self.user)
|
|
|
|
instance_before = TLSSettings.get_solo()
|
|
assert not instance_before.enabled
|
|
assert not instance_before.certificate.name
|
|
assert not instance_before.key.name
|
|
|
|
form_data = {
|
|
'enabled': True
|
|
}
|
|
response = api_client.patch(self.api_url, form_data)
|
|
assert response.status_code == 400
|
|
|
|
instance_after = TLSSettings.get_solo()
|
|
instance_after.clear_cache()
|
|
assert not instance_after.enabled
|
|
assert not instance_after.certificate.name
|
|
assert not instance_after.key.name
|
|
|
|
@pytest.mark.unit
|
|
@patch('django.conf.settings.NGINX_SITES_AVAILABLE', TMP_DIR.name)
|
|
@patch('core.views.view_settings.restart_nginx', mock_func_for_test)
|
|
def test_remove_ssl_certificate(self) -> None:
|
|
api_client.force_authenticate(self.user)
|
|
before_settings = TLSSettings.get_solo()
|
|
before_settings.certificate.name = 'certificate.cert'
|
|
before_settings.key.name = 'certificate.key'
|
|
before_settings.enabled = True
|
|
before_settings.save()
|
|
before_settings.clear_cache()
|
|
|
|
assert before_settings.certificate.name == 'certificate.cert'
|
|
assert before_settings.key.name == 'certificate.key'
|
|
assert before_settings.enabled
|
|
|
|
response = api_client.delete(self.api_url)
|
|
assert response.status_code == 200
|
|
|
|
settings_after_remove = TLSSettings.get_solo()
|
|
settings_after_remove.clear_cache()
|
|
|
|
assert not settings_after_remove.enabled
|
|
assert not settings_after_remove.certificate.name
|
|
assert not settings_after_remove.key.name
|
|
|
|
@pytest.mark.unit
|
|
@patch('django.conf.settings.NGINX_SITES_AVAILABLE', TMP_DIR.name)
|
|
@patch('core.views.view_settings.restart_nginx', mock_func_for_test)
|
|
def test_create_new_cert_via_api(self) -> None:
|
|
api_client.force_authenticate(self.user)
|
|
|
|
instance_before = TLSSettings.get_solo()
|
|
instance_before.clear_cache()
|
|
|
|
assert not instance_before.enabled
|
|
assert not instance_before.certificate.name
|
|
assert not instance_before.key.name
|
|
|
|
response = api_client.post(self.api_url)
|
|
assert response.status_code == 200
|
|
|
|
instance_after = TLSSettings.get_solo()
|
|
instance_after.clear_cache()
|
|
assert instance_after.certificate.name == 'certificate.crt'
|
|
assert instance_after.key.name == 'certificate.key'
|
|
assert instance_after.enabled is False
|