56 lines
1.9 KiB
Python
56 lines
1.9 KiB
Python
import logging
|
||
import os
|
||
|
||
from channels.auth import AuthMiddlewareStack
|
||
from channels.db import database_sync_to_async
|
||
from channels.routing import ProtocolTypeRouter, URLRouter
|
||
from django.contrib.auth.models import AnonymousUser
|
||
from django.core.asgi import get_asgi_application
|
||
from rest_framework.authtoken.models import Token
|
||
|
||
from console.routing import websocket_urlpatterns
|
||
|
||
# Module-level logger, named after this module's import path.
_log = logging.getLogger(__name__)
|
||
|
||
# Fall back to the dev settings module; setdefault() leaves an already
# exported DJANGO_SETTINGS_MODULE untouched.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'console.settings.dev')
|
||
|
||
# Build the standard Django ASGI application (plain HTTP handling).
# NOTE(review): the Channels deployment docs recommend calling
# get_asgi_application() *before* importing anything that loads ORM models;
# here the model imports at the top of the file run first - confirm that the
# import order works for every way this module is launched.
django_asgi_app = get_asgi_application()
|
||
|
||
|
||
class TokenHeaderAuthMiddleware:
    """Authenticate a WebSocket connection by a token in the ``Authorization`` header.

    Meant to be the innermost authentication layer: it only acts when the
    preceding session authentication failed, i.e. when ``scope['user']`` is
    still an ``AnonymousUser``.

    A missing, malformed, unknown or inactive-user token never aborts the
    connection; the scope is simply passed on with the anonymous user.

    Example header::

        Authorization: Token vgft67uhgtreerfcgvh678uihvhkugct7iyukv
    """

    def __init__(self, inner):
        """Store the wrapped ASGI application that is called after this layer."""
        self.inner = inner

    async def __call__(self, scope, receive, send):
        """Populate ``scope['user']`` from the token header, then call the inner app."""
        headers = dict(scope['headers'])
        if b'authorization' in headers and scope['user'] == AnonymousUser():
            token_key = self._extract_token_key(headers[b'authorization'])
            if token_key is not None:
                user = await self.get_user_by_token(token_key)
                if user:
                    scope['user'] = user
        return await self.inner(scope, receive, send)

    @staticmethod
    def _extract_token_key(raw_header):
        """Return the key from a ``Token <key>`` header value, or ``None`` if malformed.

        The header comes straight from the client, so it may be undecodable
        or have any number of whitespace-separated fields.
        """
        try:
            parts = raw_header.decode().split()
        except UnicodeDecodeError:
            return None
        if len(parts) != 2 or parts[0] != 'Token':
            return None
        return parts[1]

    @database_sync_to_async
    def get_user_by_token(self, token):
        """Return the active user owning ``token``, or ``None`` if there is none."""
        try:
            # select_related fetches the user in the same query as the token.
            user = Token.objects.select_related('user').get(key=token).user
        except Token.DoesNotExist:
            return None
        # Deactivated accounts must not authenticate, matching DRF's own
        # TokenAuthentication and Django's session backend.
        return user if user.is_active else None
|
||
|
||
|
||
# Root ASGI application: dispatches every incoming connection by protocol type.
application = ProtocolTypeRouter(
    {
        # Plain HTTP is served by the regular Django ASGI application built
        # earlier in this module. Current Channels releases no longer add an
        # HTTP handler implicitly, so it has to be routed explicitly.
        "http": django_asgi_app,
        # WebSockets: session authentication first (AuthMiddlewareStack), then
        # the token-header fallback, then URL routing to the consumers.
        "websocket": AuthMiddlewareStack(
            TokenHeaderAuthMiddleware(URLRouter(websocket_urlpatterns))
        ),
    }
)
|