46 lines
1.8 KiB
Python
46 lines
1.8 KiB
Python
import pytest
|
|
from channels.db import database_sync_to_async
|
|
from channels.testing import WebsocketCommunicator
|
|
|
|
from notifications.models import Notification
|
|
from notifications.services.notification_sender import NotificationService
|
|
from notifications.services.ws import WSNotification
|
|
|
|
|
|
@pytest.mark.integration
|
|
@pytest.mark.asyncio
|
|
class TestWebSocket:
|
|
|
|
async def test_connection_to_ws(self, admin_user):
|
|
communicator = WebsocketCommunicator(WSNotification.as_asgi(), '/testws/')
|
|
communicator.scope['user'] = admin_user
|
|
connected, _ = await communicator.connect()
|
|
assert connected
|
|
message = await communicator.receive_json_from()
|
|
assert 'firewalls_status' in message
|
|
assert 'incident_count' in message
|
|
assert not message['incident_count']
|
|
assert message['firewalls_status'] == {}
|
|
await communicator.disconnect()
|
|
|
|
async def test_receive_notification(self, admin_user):
|
|
communicator = WebsocketCommunicator(WSNotification.as_asgi(), '/testws/')
|
|
communicator.scope['user'] = admin_user
|
|
connected, _ = await communicator.connect()
|
|
assert connected
|
|
await communicator.receive_json_from() # method send_init_data returns first message, skip it
|
|
|
|
@database_sync_to_async
|
|
def send_notification():
|
|
notification = Notification.objects.create(text='Message from test')
|
|
NotificationService().send(notification, user=admin_user)
|
|
return notification
|
|
|
|
notification = await send_notification()
|
|
|
|
message = await communicator.receive_json_from()
|
|
assert message['id'] == notification.pk
|
|
assert message['text'] == 'Message from test'
|
|
assert await communicator.receive_nothing(timeout=1) is True # no messages for a second
|
|
|
|
await communicator.disconnect()
|