import logging
import os
import shutil
from shutil import copyfile
from subprocess import Popen, PIPE

from django.conf import settings

from core.models import TLSSettings
from core import constants

_log = logging.getLogger(__name__)


def remove_ssl_certificate() -> None:
    """Remove ssl certificate from model and folder

    Clears the certificate and key on the TLSSettings singleton, disables
    it, saves it and clears its cache, then deletes the default cert and key
    files from MEDIA_ROOT.
    """
    instance = TLSSettings.get_solo()
    instance.certificate = None
    instance.key = None
    instance.enabled = False
    instance.save()
    instance.clear_cache()

    cert_path = os.path.join(settings.MEDIA_ROOT, constants.DEFAULT_CERT_FILENAME)
    key_path = os.path.join(settings.MEDIA_ROOT, constants.DEFAULT_KEY_FILENAME)
    # Remove each file independently: if only one of the two is present the
    # other must still be deleted, and a missing file is not an error here.
    for path in (cert_path, key_path):
        try:
            os.remove(path)
        except FileNotFoundError:
            _log.debug("Nothing to remove at %s", path)


def update_nginx(https_enabled: bool) -> None:
    """Update NGINX config

    Copies the stored certificate and key (when both are set) to
    settings.TLS_CERT_FILENAME / settings.TLS_CERT_KEY_FILENAME, then copies
    the HTTPS or HTTP config template over the enabled config file inside
    settings.NGINX_SITES_AVAILABLE.

    If HTTPS is requested but the certificate or key is missing, an error is
    logged and the function returns without touching the NGINX config.

    @param https_enabled: if True - enable HTTPS
    """
    instance = TLSSettings.get_solo()
    if instance.certificate and instance.key:
        _log.info("Update cert and key from MEDIA_ROOT")
        cert_path = os.path.join(settings.MEDIA_ROOT, instance.certificate.name)
        key_path = os.path.join(settings.MEDIA_ROOT, instance.key.name)
        copyfile(cert_path, settings.TLS_CERT_FILENAME)
        copyfile(key_path, settings.TLS_CERT_KEY_FILENAME)

    if https_enabled and (not instance.certificate or not instance.key):
        _log.error("No cert or key provided to update_nginx with enabled HTTPS")
        return

    # Select new one; the destination is the same for both modes.
    if https_enabled:
        src_name = settings.NGINX_HTTPS_CONFIG_FILENAME
    else:
        src_name = settings.NGINX_HTTP_CONFIG_FILENAME
    src_file = os.path.join(settings.NGINX_SITES_AVAILABLE, src_name)
    dst_file = os.path.join(settings.NGINX_SITES_AVAILABLE,
                            settings.NGINX_ENABLED_CONFIG_FILENAME)
    shutil.copyfile(src_file, dst_file)


def restart_nginx() -> None:
    """Reload NGINX via ``sudo systemctl reload nginx.service``

    @raises RuntimeError: if the command exits with a non-zero status
    """
    # TODO: check config: nginx -T
    proc = Popen(['sudo', 'systemctl', 'reload', 'nginx.service'],
                 stdin=PIPE, stdout=PIPE, stderr=PIPE)
    _, error = proc.communicate()
    if proc.returncode != 0:
        message = f"Can't reload nginx: {error.decode('utf-8')}"
        _log.error(message)
        raise RuntimeError(message)


def generate_cert(cert_filename: str, key_filename: str) -> None:
    """
    Generate new TLS certificate

    Runs ``openssl req -x509 -nodes`` with the subject, validity period and
    RSA key size taken from the TLS_CERT_* settings.

    @raises RuntimeError: if some error occurs
    @param cert_filename: Where to write certificate
    @param key_filename: Where to write key
    """
    _log.info(f"Generating new cert {cert_filename} and key {key_filename}")
    subj = (
        f"/C={settings.TLS_CERT_COUNTRY}"
        f"/ST={settings.TLS_CERT_STATE}"
        f"/L={settings.TLS_CERT_LOCALITY}"
        f"/O={settings.TLS_CERT_ORIG_NAME}"
        f"/CN={settings.TLS_CERT_COMMON_NAME}"
    )
    args = [
        'openssl', 'req', '-x509', '-nodes',
        '-days', str(settings.TLS_CERT_DAYS),
        '-newkey', f'rsa:{settings.TLS_CERT_KEY_SIZE}',
        '-keyout', key_filename,
        '-out', cert_filename,
        '-subj', subj,
    ]
    _log.info(" ".join(args))

    proc = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
    output, error = proc.communicate()
    output = output.decode("utf-8")
    error = error.decode("utf-8")

    if proc.returncode != 0:
        _log.error(f"Can't generate TLS cert: {output}{error}")
        raise RuntimeError(f"Can't create certificate: {output}{error}")
    # A zero exit status is not trusted on its own: any mention of 'error'
    # on either stream is also treated as a failure.
    if 'error' in output:
        _log.error(f"Got some error {output}")
        raise RuntimeError("Can't create certificate:\n" + output)
    if 'error' in error:
        _log.error(f"Got some error {error}")
        raise RuntimeError("Can't create certificate:\n" + error)


def handle_uploaded_file(content, filename) -> None:
    """Write uploaded content to a file chunk by chunk

    Makes sure settings.MEDIA_ROOT exists before writing.

    @param content: object exposing ``chunks()`` that yields bytes
        (presumably a Django UploadedFile - confirm against callers)
    @param filename: path of the file to write
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(settings.MEDIA_ROOT, exist_ok=True)
    with open(filename, 'wb+') as destination:
        for chunk in content.chunks():
            destination.write(chunk)


def update_model(tls_settings) -> None:
    """Point the model at the default cert/key files and fix their modes

    Only mutates the passed object; it does not call save().

    @param tls_settings: object with ``certificate`` and ``key`` file fields
    """
    tls_settings.certificate.name = constants.DEFAULT_CERT_FILENAME
    tls_settings.key.name = constants.DEFAULT_KEY_FILENAME
    os.chmod(os.path.join(settings.MEDIA_ROOT, constants.DEFAULT_CERT_FILENAME), 0o644)
    # NOTE(review): 0o644 leaves the private key world-readable. Kept as is
    # because other processes may rely on it - confirm whether 0o600 or
    # 0o640 would be sufficient.
    os.chmod(os.path.join(settings.MEDIA_ROOT, constants.DEFAULT_KEY_FILENAME), 0o644)


def create_cert(tls_settings) -> None:
    """Generate a certificate at the default paths and update the model

    @raises RuntimeError: if certificate generation fails
    @param tls_settings: object with ``certificate`` and ``key`` file fields
    """
    generate_cert(
        os.path.join(settings.MEDIA_ROOT, constants.DEFAULT_CERT_FILENAME),
        os.path.join(settings.MEDIA_ROOT, constants.DEFAULT_KEY_FILENAME),
    )
    update_model(tls_settings)