old_console/console/api/users.py
2024-11-02 14:12:45 +03:00

332 lines
13 KiB
Python

import json
import logging

from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.models import User, Permission, Group
from django.http import HttpResponseNotAllowed, JsonResponse
from django.shortcuts import get_object_or_404
from rest_framework import status
from rest_framework.decorators import api_view
from rest_framework.response import Response

from console import conslog
from console.serializers import AllPermsSerializer, GroupNameSerializer
from core.decorators import log_url
from perms.models import Perm
from perms.services.get_permissions import get_all_linked_permissions, get_linked_permissions_name
from users.serializers import UserSerializers
# Module-level logger; the views below hand it to the conslog helpers.
_log = logging.getLogger(__name__)
@log_url
@login_required
@permission_required(Perm.perm_req(Perm.can_add_group), raise_exception=True)
def all_perms(requset):
    """ API for getting list of all console permissions

    The permission named 'Can view vulnerabilities' is left out of the result.

    :param requset: request object
    :return: JSON list with the serialized permissions; HTTP 405 for any
        method other than GET
    """
    # This is a plain Django view (no @api_view), so a DRF Response has no
    # renderer attached and cannot be rendered here. Django's own 405
    # response is used instead; it also sets the Allow header.
    if requset.method != 'GET':
        return HttpResponseNotAllowed(['GET'])

    permissions = Permission.objects.exclude(name='Can view vulnerabilities')
    serializer = AllPermsSerializer(permissions, many=True)
    # safe=False because the serialized payload is a list, not a dict
    return JsonResponse(serializer.data, safe=False)
@log_url
@login_required
def get_linked_permissions(request):
    """ API that forwards the requested permissions to get_linked_permissions_name
    :param request: request object; the values are read from the repeated
        'permissions[]' query parameter (empty list when absent)
    :return: JSON built from whatever get_linked_permissions_name returns
    """
    requested = request.GET.getlist('permissions[]', [])
    return JsonResponse(get_linked_permissions_name(requested))
@log_url
@login_required
@permission_required(Perm.perm_req(Perm.can_add_group), raise_exception=True)
def user_perms(request, pk):
    """ API for getting user permissions

    Only permissions related to the user through the ``user`` relation of
    Permission are returned.

    :param request: request object
    :param pk: target user primary key
    :return: JSON list with the target user permissions; HTTP 404 when no
        user has this primary key; HTTP 405 for any method other than GET
    """
    # A DRF Response cannot be rendered outside @api_view, so the rejection
    # uses Django's own 405 response (which also sets the Allow header).
    if request.method != 'GET':
        return HttpResponseNotAllowed(['GET'])

    target_user = get_object_or_404(User, pk=pk)
    # Named differently from the view itself to avoid shadowing it
    assigned_perms = Permission.objects.filter(user=target_user)
    serializer = AllPermsSerializer(assigned_perms, many=True)
    return JsonResponse(serializer.data, safe=False)
@log_url
@login_required
@permission_required(Perm.perm_req(Perm.can_add_group), raise_exception=True)
def all_groups(requset):
    """ API for getting all console groups
    :param requset: request object
    :return: JSON list with all console groups; HTTP 405 for any method
        other than GET
    """
    # A DRF Response cannot be rendered outside @api_view, so the rejection
    # uses Django's own 405 response (which also sets the Allow header).
    if requset.method != 'GET':
        return HttpResponseNotAllowed(['GET'])

    serializer = GroupNameSerializer(Group.objects.all(), many=True)
    return JsonResponse(serializer.data, safe=False)
@log_url
@login_required
@permission_required(Perm.perm_req(Perm.can_add_group), raise_exception=True)
def group_perms(request, g_name):
    """ API for getting target group permissions

    Only permissions of the "perms" app are considered, and the permission
    named 'Can view vulnerabilities' is left out. An unknown group name
    yields an empty list.

    :param request: request object
    :param g_name: target group name
    :return: JSON list with the target group permissions; HTTP 405 for any
        method other than GET
    """
    # A DRF Response cannot be rendered outside @api_view, so the rejection
    # uses Django's own 405 response (which also sets the Allow header).
    if request.method != 'GET':
        return HttpResponseNotAllowed(['GET'])

    target_group = Group.objects.filter(name=g_name)
    g_perms = Permission.objects.filter(
        content_type__app_label="perms",
        group__in=target_group,
    ).exclude(name='Can view vulnerabilities')
    serializer = AllPermsSerializer(g_perms, many=True)
    return JsonResponse(serializer.data, safe=False)
@log_url
@login_required
@permission_required(Perm.perm_req(Perm.can_add_group), raise_exception=True)
def all_perms_wo_group(request, g_name):
    """ API for getting permissions which the target group does not have

    Only permissions of the "perms" app are considered, and the permission
    named 'Can view vulnerabilities' is always left out.

    :param request: request object
    :param g_name: target group name
    :return: JSON list with the permissions which the target group does not
        have; HTTP 405 for any method other than GET
    """
    # A DRF Response cannot be rendered outside @api_view, so the rejection
    # uses Django's own 405 response (which also sets the Allow header).
    if request.method != 'GET':
        return HttpResponseNotAllowed(['GET'])

    target_group = Group.objects.filter(name=g_name)
    g_perms = Permission.objects.filter(content_type__app_label="perms", group__in=target_group)
    # Codenames the group already owns; these are removed from the result
    owned_codenames = [perm.codename for perm in g_perms]

    exc_perms = Permission.objects.filter(content_type__app_label="perms").exclude(
        codename__in=owned_codenames
    ).exclude(name='Can view vulnerabilities')
    serializer = AllPermsSerializer(exc_perms, many=True)
    return JsonResponse(serializer.data, safe=False)
@log_url
@login_required
@permission_required(Perm.perm_req(Perm.can_add_group), raise_exception=True)
def group_users(request, g_name):
    """ API for getting users which are assigned to the target group

    Users whose username starts with 'deleted_' are left out.

    :param request: request object
    :param g_name: target group name
    :return: JSON list with the target group users; HTTP 405 for any method
        other than GET
    """
    # A DRF Response cannot be rendered outside @api_view, so the rejection
    # uses Django's own 405 response (which also sets the Allow header).
    if request.method != 'GET':
        return HttpResponseNotAllowed(['GET'])

    users = User.objects.filter(groups__name=g_name).exclude(username__startswith='deleted_')
    serializer = UserSerializers(users, many=True)
    return JsonResponse(serializer.data, safe=False)
@log_url
@login_required
@permission_required(Perm.perm_req(Perm.can_add_group), raise_exception=True)
def group_users_excluded(request, g_name):
    """ API for getting active users which are not members of the target group
    :param request: request object
    :param g_name: target group name
    :return: JSON list with the serialized users
    """
    # Usernames of the current members; everyone else is a candidate
    member_usernames = [member.username for member in User.objects.filter(groups__name=g_name)]
    candidates = User.objects.exclude(username__in=member_usernames).filter(is_active=True)
    return JsonResponse(UserSerializers(candidates, many=True).data, safe=False)
@log_url
@login_required
@permission_required(Perm.perm_req(Perm.can_add_group), raise_exception=True)
@api_view(['POST'])
def update_group_permission(request, g_name):
    """ API for replacing the target group permissions with the set provided by the user

    The POST field 'perms' carries a JSON list of objects; the 'codename' of
    each object is collected and expanded through get_all_linked_permissions
    before being applied to the group.

    :param request: request object
    :param g_name: target group name
    :return: JSON list with the serialized permissions now assigned to the
        group; HTTP 400 when 'perms' is missing or is not valid JSON;
        HTTP 404 when the group does not exist
    """
    # json.loads raises TypeError for a missing field (None) and ValueError
    # for malformed JSON; both are client errors, not server errors.
    try:
        table_data = json.loads(request.POST.get('perms'))
    except (TypeError, ValueError):
        return JsonResponse({'perms': ['A JSON list of permissions is required.']}, status=400)

    target_group = get_object_or_404(Group, name=g_name)

    # Codenames chosen by the user, expanded with their linked permissions
    perm_codenames = [item.get('codename') for item in table_data]
    perm_codenames = get_all_linked_permissions(perm_codenames)
    permissions = Permission.objects.filter(codename__in=perm_codenames)

    # set() replaces the whole relation, so the group is never left empty if
    # an earlier step fails and no separate clear() is needed.
    target_group.permissions.set(permissions)

    serializer = AllPermsSerializer(permissions, many=True)
    log_message = f"User [{request.user}] updated the [[{g_name}]] group permissions. New group permissions: {perm_codenames}"
    conslog.add_info_log(log_message, _log)
    return JsonResponse(serializer.data, safe=False)
@log_url
@login_required
@permission_required(Perm.perm_req(Perm.can_add_group), raise_exception=True)
@api_view(['POST'])
def update_group_users(request, g_name):
    """ API for replacing the set of users assigned to the target group

    The POST field 'users' carries a JSON list of objects; the 'username' of
    each object names a user that must belong to the group afterwards.

    :param request: request object
    :param g_name: target group name from the user
    :return: JSON list with the serialized users of the group after the
        update; HTTP 400 when 'users' is missing or is not valid JSON;
        HTTP 404 when the group or one of the usernames does not exist
    """
    # json.loads raises TypeError for a missing field (None) and ValueError
    # for malformed JSON; both are client errors, not server errors.
    try:
        users_new = json.loads(request.POST.get('users'))
    except (TypeError, ValueError):
        return JsonResponse({'users': ['A JSON list of users is required.']}, status=400)

    target_group = get_object_or_404(Group, name=g_name)
    usernames_after = [user.get('username') for user in users_new]

    # Resolve every requested user BEFORE touching the membership, so an
    # unknown username aborts the request without leaving the group cleared
    # or half-populated.
    new_members = [get_object_or_404(User, username=username) for username in usernames_after]
    target_group.user_set.set(new_members)

    # Re-read the membership so the response reflects the stored state
    serializer = UserSerializers(User.objects.filter(groups__name=g_name), many=True)
    log_message = f"User [{request.user}] updated the [[{g_name}]] group users. New group users: {usernames_after}"
    conslog.add_info_log(log_message, _log)
    return JsonResponse(serializer.data, safe=False)
@log_url
@login_required
@permission_required(Perm.perm_req(Perm.can_add_group), raise_exception=True)
@api_view(['POST'])
def delete_group_api(request, g_name):
    """ API for deleting the group
    :param request: request object
    :param g_name: target group name
    :return: JSON with the serialized data of the deleted group; HTTP 404
        when the group does not exist
    """
    target_group = get_object_or_404(Group, name=g_name)
    # Serialize first: the same instance is deleted below, and delete()
    # resets its primary key.
    deleted_group_data = GroupNameSerializer(target_group).data
    target_group.delete()

    log_message = f"User [{request.user}] deleted [[{g_name}]] group"
    conslog.add_info_log(log_message, _log)
    return JsonResponse(deleted_group_data)
@log_url
@login_required
@permission_required(Perm.perm_req(Perm.can_add_group), raise_exception=True)
@api_view(['POST'])
def add_new_group(request, g_name):
    """ API for creating a new group
    :param request: request object
    :param g_name: name of the group to create
    :return: serialized group data on success, serializer errors with HTTP 400 otherwise
    """
    # The serializer needs a mapping, whereas g_name arrives as a plain string
    serializer = GroupNameSerializer(data={'name': g_name})

    if not serializer.is_valid():
        log_message = f"User [{request.user}] failed to create a new group [[{g_name}]] due to the error: {serializer.errors}"
        conslog.add_error_log(log_message, _log)
        return JsonResponse(serializer.errors, status=400)

    serializer.save()
    log_message = f"User [{request.user}] created a [[{g_name}]] group"
    conslog.add_info_log(log_message, _log)
    return Response(serializer.data)
@log_url
@login_required
@permission_required(Perm.perm_req(Perm.can_add_group), raise_exception=True)
@api_view(['POST'])
def rename_group(request, g_name, new_g_name):
    """ API for renaming the target group
    :param request: request object
    :param g_name: target group name
    :param new_g_name: new target group name
    :return: serialized new name on success; serializer errors with HTTP 400
        when the new name is invalid; HTTP 404 when the target group does
        not exist
    """
    # The serializer needs a mapping, whereas the name arrives as a plain string
    serializer = GroupNameSerializer(data={'name': new_g_name})

    if not serializer.is_valid():
        log_message = f"User [{request.user}] failed to rename [[{g_name}]], due to the error: {serializer.errors}"
        conslog.add_error_log(log_message, _log)
        return JsonResponse(serializer.errors, status=400)

    # A missing group is a client error (404), not an unhandled DoesNotExist
    upd_group = get_object_or_404(Group, name=g_name)
    upd_group.name = serializer.data['name']
    upd_group.save()

    log_message = f"User [{request.user}] renamed [[{g_name}]] group with the name: [[{new_g_name}]]"
    conslog.add_info_log(log_message, _log)
    return Response(serializer.data)
@log_url
@login_required
@permission_required(Perm.perm_req(Perm.can_add_group), raise_exception=True)
@api_view(['POST'])
def copy_group(request, g_name, new_g_name):
    """ API for copying the target group

    Creates a group named new_g_name and gives it the same permissions as
    the group named g_name. Group users are not copied.

    :param request: request object
    :param g_name: target group name
    :param new_g_name: new target group name
    :return: serialized new name on success; serializer errors with HTTP 400
        when the new name is invalid; HTTP 404 when the source group does
        not exist
    """
    # The serializer needs a mapping, whereas the name arrives as a plain string
    serializer = GroupNameSerializer(data={'name': new_g_name})

    if not serializer.is_valid():
        log_message = f"User [{request.user}] failed to create copy of the [[{g_name}]], due to the error: {serializer.errors}"
        conslog.add_error_log(log_message, _log)
        return JsonResponse(serializer.errors, status=400)

    # Look the source up before creating anything, so a missing source
    # group does not leave an empty copy behind.
    source_group = get_object_or_404(Group, name=g_name)
    new_group = Group.objects.create(name=serializer.data['name'])

    # Reuse the Permission rows directly. Looking them up again by codename
    # is ambiguous, because codenames are unique only per content type.
    new_group.permissions.set(source_group.permissions.all())

    log_message = f"User [{request.user}] created a copy of the [[{g_name}]] group with the name: [[{new_g_name}]]"
    conslog.add_info_log(log_message, _log)
    return Response(serializer.data)