# 78 lines | 2.4 KiB | Python
import logging
|
|
import os
|
|
|
|
from django.template.loader import render_to_string
|
|
|
|
from devices.constants import VECTOR_CONFIG_DIR
|
|
from devices.enums import DeviceType
|
|
from devices.models.device import Device
|
|
from events.constants import ELK_URL, ELK_LOGIN, ELK_PASS
|
|
|
|
# Module-level logger, named after this module via __name__.
_log = logging.getLogger(__name__)
|
|
|
|
|
|
class VectorServiceError(Exception):
    """Error raised by the Vector config service.

    In this module it is raised when a device's type has no matching
    Vector config type.
    """
|
|
|
|
|
|
class VectorService:
    """Create and remove the Vector config file that belongs to one device.

    The file is named ``<config_type>_<device.pk>.toml`` and is stored in
    ``VECTOR_CONFIG_DIR``. Its content is rendered from the Django template
    ``vector/config/<config_type>.toml``.
    """

    def __init__(self, device: Device):
        """Bind the service to *device* and derive its config type and name.

        Args:
            device: Device whose config file is managed.

        Raises:
            VectorServiceError: If the device type has no config type mapping.
        """
        self.device = device
        # Order matters: the config name is built from the config type.
        self.config_type = self._get_config_type()
        self.config_name = self._get_config_name()

    def update_config(self):
        """Render the device's config and write it to the config file."""
        self.check_license()
        config_content = self._create_config_content()
        self._write_to_file(config_content)

    def delete_config(self):
        """Remove the device's config file; a missing file is only logged."""
        self._delete_config_file()

    def check_license(self):
        """Placeholder for a license check; currently does nothing."""
        pass  # todo call license service

    def _get_config_name(self) -> str:
        """Return the config file name built from config type and device pk."""
        return f'{self.config_type}_{self.device.pk}.toml'

    def _create_config_content(self) -> str:
        """Render the config template for this device and return the text."""
        context = {
            "pk": self.device.pk,
            "port": self.device.port,
            "adjust_datetime": self.device.adjust_datetime,
            "elastic_url": ELK_URL,
            "elastic_login": ELK_LOGIN,
            "elastic_pass": ELK_PASS,
        }
        return render_to_string(f"vector/config/{self.config_type}.toml", context)

    def _write_to_file(self, config_content: str):
        """Write *config_content* to this device's file in VECTOR_CONFIG_DIR.

        The directory is created when it is missing. An existing file with
        the same name is overwritten.

        Args:
            config_content: Full text of the config file.

        Raises:
            OSError: If the directory cannot be created or the file cannot
                be written.
        """
        # exist_ok=True avoids the check-then-create race of calling
        # os.path.exists() before os.makedirs().
        os.makedirs(VECTOR_CONFIG_DIR, exist_ok=True)

        config_path = os.path.join(VECTOR_CONFIG_DIR, self.config_name)
        # Explicit UTF-8 so the output does not depend on the process locale.
        with open(config_path, 'w', encoding='utf-8') as f:
            f.write(config_content)
        _log.info('Created file [%s]', self.config_name)

    def _delete_config_file(self):
        """Delete this device's config file without raising on OS errors.

        A missing file is logged as a warning. Any other OS-level failure is
        logged together with its traceback instead of being reported as a
        missing file.
        """
        config_path = os.path.join(VECTOR_CONFIG_DIR, self.config_name)
        try:
            os.remove(config_path)
        except FileNotFoundError:
            _log.warning(
                'Try to remove device with pk %s but file %s does not exist',
                self.device.pk,
                self.config_name,
            )
        except OSError:
            # Still best-effort (no exception reaches the caller), but the
            # log must not claim the file is missing when it is not.
            _log.exception(
                'Failed to remove file [%s] for device with pk %s',
                self.config_name,
                self.device.pk,
            )
        else:
            _log.info('Removed file [%s]', self.config_name)

    def _get_config_type(self) -> str:
        """Return the config type string mapped to the device's type.

        Raises:
            VectorServiceError: If the device type is not in the mapping.
        """
        type_mapping = {
            DeviceType.FIREWALL: "armaif",
            DeviceType.ENDPOINT: "endpoint",
            DeviceType.SENSOR: "sensor",
        }
        try:
            return type_mapping[self.device.type]
        except KeyError:
            # The KeyError is an implementation detail of the lookup, so it
            # is suppressed from the traceback.
            raise VectorServiceError(
                f"Can't made config for type {self.device.type}"
            ) from None
|