old_console/devices/services/sensor/rabbitmq.py
2024-11-02 14:12:45 +03:00

241 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
from functools import partial
from typing import Optional, Union, Tuple
import pika
from django.conf import settings
from pika.adapters.blocking_connection import BlockingChannel
from pika.exchange_type import ExchangeType
from rest_framework import status
from rest_framework.exceptions import APIException
from devices.models.sensor import ArmaSensor
from devices.services.sensor.enums import SystemMessage, ZeekMessage, VectorMessage, SuricataMessage
_log = logging.getLogger(__name__)
class SensorResponseException(APIException):
status_code = status.HTTP_400_BAD_REQUEST
class SensorManagement:
"""Service for sending message and receiving response using RabbitMQ"""
# Префикс, с которого начинаются все очереди для сенсоров. Каждый сенсор слушает свою очередь
SENSOR_QUEUE_PREFIX = 'sensor_'
# "Обменник", к которому можно подключить очередь сенсора.
# Консоль может отправить одно сообщение в этот "обменник" и оно будет доставлено во все подключённые очереди
SENSORS_EXCHANGE = 'sensors'
# если отправляется сообщение, на которое ожидается ответ, то в этом атрибуте хранится список таких ответов
_rpc_responses = tuple()
def __init__(self, proceed=True):
self.connection, self.channel = self._get_connection_data()
# Флаг означает, что ожидание и обработка ответов от сенсора ещё продолжается
self._proceed = proceed
def send_message(self,
sensor: ArmaSensor = None,
message_type: Union[
SystemMessage, VectorMessage, ZeekMessage, SuricataMessage] = SystemMessage.ping,
body: Union[dict, bytes, str] = None,
wait_response: bool = False,
decode_response: bool = True,
time_limit: int = 5) -> Optional[dict]:
"""Method for sending messages.
If we requested a response from the sensor, then it returns dict(). Otherwise, it will return nothing.
Throws an exception if the response from the sensor cannot be decoded.
:param sensor: ArmaSensor object. Specify if you want to send the message only to its queue.
If it is not transmitted, the message will be sent to all sensors.
:param message_type: A string that specifies what type of message will be sent.
Depending on this type, different logic will be called on the sensor side.
If the sensor does not know how to process any type, then it will return an error
:param body: The payload that can be sent along with the message.
Used on the sensor side. We only encode and send
:param wait_response: A flag that determines whether we are waiting for a response to the message or not.
If False, then we send the message and immediately close the connection.
Otherwise, we block the process until we wait for a response.
:param decode_response: Whether to decode the message that is returned from the sensor
:param time_limit:
If we send a message and a response is required, how long to wait for it. Specified in seconds.
If we sent a message to all sensors, but after a while not all of them answered,
then we will return only the received response
:return:
"""
# Шаг 1. Определяем куда будем отправлять сообщение. Если передан сенсор, то только ему. Иначе отправим всем
if sensor:
queue = f'{self.SENSOR_QUEUE_PREFIX}{sensor.uuid}' # очередь сенсора, которую будет слушать только он
exchange = ''
required_responses = 1 # если отправляем сообщение только одному сенсору, то мы ожидаем только один ответ
else:
exchange, queue = self._get_sensors_exchange(), ''
required_responses = ArmaSensor.objects.count() # ожидаем количество ответов, по количеству сенсоров
# Шаг 2. Готовим сообщение к отправке
# 2.1 Приводим необязательную полезную нагрузку к байтам
bytes_body = self._convert_to_bytes(body)
# 2.2 Готовим мета-информацию (свойства) для сообщения
properties = {
# Тип сообщения. по нему сенсор определит что именно ему нужно сделать
'type': message_type.value,
# Как долго отправленное сообщение будет храниться в очереди.
# По истечение времени если его никто не получит, то оно будет удалено
'expiration': '5000'
}
# 2.3 Если мы отправляем сообщение и хотим получить на него ответ, то должны указать:
# - в какую очередь сенсор должен направить ответ
# - какая функция будет вызываться при получении ответа в эту очередь
if wait_response:
on_message_callback = partial(self._response_callback, decode_response=decode_response)
properties['reply_to'] = self._prepare_callback_queue(on_message_callback)
# конвертируем подготовленные свойства сообщения в особый формат
properties = pika.BasicProperties(**properties)
# Шаг 3. Отправляем сообщение. Указываем получателей, тело сообщения и его свойства
self.channel.basic_publish(exchange=exchange, routing_key=queue, body=bytes_body, properties=properties)
# Шаг 4. Получаем ответы от сенсора. Если мы их не запрашивали, то из функции вернётся None
response = self._get_responses(sensor, wait_response, required_responses, time_limit)
# Шаг 5. Закрываем соединение
self.connection.close()
return response
def _get_responses(self, sensor, wait_response, required_responses, time_limit) -> Union[list, dict]:
"""A function that counts messages and converts them to json format"""
def _stop_proceed_after_timeout():
self._proceed = False
# Определяем функцию и через какое время она будет вызвана.
# Функция выключит флаг обработки и условие 'while' ниже не будет выполняться.
# Этот метод не блокирует выполнение
self.connection.call_later(time_limit, _stop_proceed_after_timeout)
# псевдокод:
# пока мы ожидаем ответа и получены не все требуемые ответы и время ещё не вышло: ...
while wait_response and (len(self._rpc_responses) < required_responses) and self._proceed:
self.connection.process_data_events()
# когда будет произведён выход из цикла (после получения всех ответов или по истечение времени) мы начнём
# обработку ответов
if wait_response and sensor:
# если мы ожидаем ответ только от одного сенсора и этот ответ получен,
# то мы проверяем его статус и возвращаем json ответ
if self._rpc_responses:
response = self._check_response_status(self._rpc_responses[0])
return response
# если ответ не получен, то возбуждаем исключение
else:
raise SensorResponseException({
'status': 'error',
'detail': 'sensor did not send a response'
})
elif wait_response:
# Если ожидаем ответ от всех сенсоров и закончилось время обработки, то возвращаем всё, что успели получить.
return self._rpc_responses
def _check_response_status(self, response):
"""Check response status"""
# Проверяем статус ответа. При необходимости возбуждаем исключение, чтобы на фронт был отдан ответ с 400 кодом
# Данное поле считать соглашением между консолью и сенсором
# Если сенсор не пришлёт это поле или пришлёт что-то другое, то считается, что это корректный ответ без ошибки
if isinstance(response, dict) and response.get('status', None) == 'error':
raise SensorResponseException(response)
return response
def _get_sensors_exchange(self) -> str:
"""Determine the exchanger and its type."""
# 'fanout' означает, что сообщение будет отправлено во все очереди, связанные с обменником
self.channel.exchange_declare(self.SENSORS_EXCHANGE, exchange_type=ExchangeType.fanout.value, durable=True)
return self.SENSORS_EXCHANGE
def _prepare_callback_queue(self, callback_func) -> str:
"""We create a queue in which we expect a response"""
# Создаём одноразовую очередь для ожидания в ней ответа. Она удалится после закрытия соединения
result = self.channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue
# Указываем функцию, которая будет обрабатывать сообщения во временной очереди
self.channel.basic_consume(queue=callback_queue, on_message_callback=callback_func, auto_ack=True)
return callback_queue
@staticmethod
def _convert_to_bytes(body) -> bytes:
"""Convert body to bytes"""
if body is None:
body = b''
if isinstance(body, bytes):
return body
try:
str_json = json.dumps(body)
bytes_body = str_json.encode()
return bytes_body
except TypeError:
raise SensorResponseException({
'status': 'error',
'detail': 'unable to encode data to send'
})
def _response_callback(self, channel, method, properties, body, decode_response: bool):
"""A function that processes messages in the response queue
Receives response, converts to json and writes to attribute self._rpc_responses
"""
response = self._prepare_response(body, decode_response)
self._rpc_responses += (response, )
def _prepare_response(self, rpc_response: Optional[bytes], decode_rcp_response: bool) -> Union[dict, bytes]:
if not rpc_response:
response = {
'status': 'error',
'detail': 'sensor doesnt return response'
}
return response if decode_rcp_response else json.dumps(response)
if decode_rcp_response:
response = self._decode_rpc_response(rpc_response)
else:
response = rpc_response
return response
@staticmethod
def _decode_rpc_response(rcp_response: bytes) -> dict:
try:
response = json.loads(rcp_response)
except json.JSONDecodeError:
response = {
'status': 'error',
'detail': 'failed to decode sensor response to json'
}
return response
@staticmethod
def _get_connection_data() -> Tuple[pika.BlockingConnection, BlockingChannel]:
"""Create connection to RabbitMQ"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host=settings.RABBIT_HOST,
port=settings.RABBIT_PORT))
channel = connection.channel()
return connection, channel