95 lines
3 KiB
Python
95 lines
3 KiB
Python
import logging
|
|
from abc import ABC, abstractmethod
|
|
from typing import Optional, Type
|
|
|
|
from asgiref.sync import async_to_sync
|
|
from channels.layers import get_channel_layer
|
|
from django.contrib.auth.models import User
|
|
|
|
from notifications.models import Notification
|
|
from notifications.serializers import NotificationSerializer
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
_log = logging.getLogger(__name__)
|
|
|
|
|
|
class SenderABC(ABC):
    """Abstract interface for objects that can send data to a group or a user.

    Concrete senders must implement both methods below.
    """

    @abstractmethod
    def send_to_group(self, group: Optional[str], data: Optional[dict] = None):
        """Send ``data`` to ``group``.

        Args:
            group: Name of the target group; may be ``None``.
            data: Payload to send; may be ``None``.
        """

    @abstractmethod
    def send_to_user(self, user: User, data: Optional[dict] = None):
        """Send ``data`` to ``user``.

        Args:
            user: The user that should receive the payload.
            data: Payload to send; may be ``None``.
        """
|
|
|
|
|
|
class WebSockerSender(SenderABC):
    """``SenderABC`` implementation backed by the channel layer.

    Messages are dicts of the form ``{'type': 'notification', 'data': ...}``
    handed to the layer returned by ``get_channel_layer()``.
    """

    # Group used by ``send_to_group`` when the caller passes a falsy group.
    DEFAULT_GROUP = 'notification'

    def __init__(self):
        """Wrap the channel layer's send methods for synchronous use."""
        layer = get_channel_layer()
        group_send_sync = async_to_sync(layer.group_send)
        direct_send_sync = async_to_sync(layer.send)
        self._send_to_group_method = group_send_sync
        self._send_to_user_method = direct_send_sync

    def send_to_group(self, group: Optional[str], data: Optional[dict] = None):
        """Send ``data`` to ``group``, or to ``DEFAULT_GROUP`` if it is falsy.

        A falsy ``data`` is replaced by an empty dict.
        """
        payload = data if data else dict()
        target_group = group if group else self.DEFAULT_GROUP
        message = {
            'type': 'notification',
            'data': payload,
        }
        self._send_to_group_method(target_group, message)

    def send_to_user(self, user: User, data: Optional[dict] = None):
        """Send ``data`` to the channel stored on ``user.userinfo``.

        A falsy ``data`` is replaced by an empty dict. Nothing is sent when
        ``user.userinfo.channel_name`` is falsy.
        """
        payload = data if data else dict()
        channel_name = user.userinfo.channel_name
        if not channel_name:
            # No channel recorded for this user: silently skip.
            return
        message = {
            'type': 'notification',
            'data': payload,
        }
        self._send_to_user_method(channel_name, message)
|
|
|
|
|
|
class NotificationService:
    """Create (when needed) and dispatch notifications through a sender.

    The sender is an instance of a ``SenderABC`` subclass; by default
    ``WebSockerSender`` is used.
    """

    def __init__(self, sender_class: Optional[Type[SenderABC]] = WebSockerSender):
        """Instantiate the sender.

        Args:
            sender_class: Sender class to instantiate with no arguments.
                ``None`` selects the default ``WebSockerSender``.
        """
        # The annotation allows None; fall back to the default instead of
        # failing on ``None()``.
        if sender_class is None:
            sender_class = WebSockerSender
        self.sender = sender_class()

    def send(self,
             notification: Optional[Notification] = None,
             group: Optional[str] = None,
             user: Optional[User] = None,
             data: Optional[dict] = None):
        """Send a notification to a group or to a user.

        Args:
            notification: Notification to send. When falsy, a new one is
                created from ``data``.
            group: Target group name. Takes precedence over ``user``.
            user: Target user, used only when ``group`` is falsy.
            data: Input for ``NotificationSerializer`` used to create the
                notification when none is supplied.

        Returns:
            None. If the notification cannot be created, a warning is logged
            and nothing is sent. If neither ``group`` nor ``user`` is given,
            nothing is sent either.
        """
        if not notification:
            # Keep the created instance: it is the object that must be
            # serialized and sent below.
            notification = self._create_notification(data)
            if not notification:
                _log.warning('Notification not created')
                return
        if group:
            self._send_notification_to_group(notification, group)
        elif user:
            self._send_notification_to_user(notification, user)

    def _send_notification_to_group(self, notification: Notification, group: str):
        """Serialize ``notification`` and pass it to the sender for ``group``."""
        data = self._serialize_notification(notification)
        self.sender.send_to_group(group=group, data=data)

    def _send_notification_to_user(self, notification: Notification, user: User):
        """Serialize ``notification`` and pass it to the sender for ``user``."""
        data = self._serialize_notification(notification)
        self.sender.send_to_user(user=user, data=data)

    @staticmethod
    def _create_notification(data: Optional[dict]) -> Optional[Notification]:
        """Validate ``data`` and save a new notification.

        Returns:
            The object returned by ``serializer.save()`` when ``data`` is
            valid; otherwise ``None`` after logging the serializer errors.
        """
        serializer = NotificationSerializer(data=data)
        if serializer.is_valid():
            return serializer.save()
        _log.warning('Invalid Notification data: %s', serializer.errors)
        return None

    @staticmethod
    def _serialize_notification(notification) -> dict:
        """Return the ``NotificationSerializer`` representation of ``notification``."""
        return NotificationSerializer(instance=notification).data
|