Django+React全栈开发:自定义验证与授权

创建于:发布于:文集:Django+React全栈开发

因为之前有人问过Django的自定义用户模型,就写了这篇文章,放在我的《Django+React全栈开发》系列里凑个数,不过和后续内容关联性不大,不感兴趣可以直接跳过。

自定义User

Django原生的User模型已经足够满足一般小网站的需求,但是有时候不可避免要对用户模型做一些定制,官方文档给出了四种方法

前两个方法适用于只要扩展用户信息或增加一些处理方法而和身份验证无关,而后两者则适用于对于身份验证有定制需求。

继承AbstractUser

Django官方文档对于如何定制用户模型有着详尽的解释,这里仅仅讲讲我在某次实践中是如何使用的。

首先我们可以新建一个Django app,我们可以把验证授权相关的功能都放在这里,假定命名为core。假如我们需要多种分级的等级标识,而不仅仅是原生User模型的is_staff字段指示用户是否是管理员,例如需要三重等级,可以像下面这样编写代码:

# core/models.py
class User(AbstractUser):
    LEVEL_SET = (
        (0, 'Super User'),
        (1, 'Normal User'),
        (2, 'Internship'),
    )
    level = models.IntegerField(choices=LEVEL_SET, default=2)
 
    class Meta:
        ordering = ('date_joined',)

之后需要在项目的settings.py文件中加入:

# 字符串内容是“app名.模型名”
AUTH_USER_MODEL = 'core.User'

可以在core/admin.py中注册我们的定制模型:

from django.contrib import admin
from django.contrib.auth.admin import UserAdmin
from .models import User
 
admin.site.register(User, UserAdmin)

有一点需要注意,Django的模型需要迁移操作,对于定制的User,最好在项目刚刚开始的时候,在你还没有执行第一次python manage.py migrate的时候完成上述操作,当然如果还在开发阶段,即使之前执行过迁移操作,也可以通过删除项目中所有migrations文件夹以及sqlite文件来初始化。

此后,如果你的模型中有需要自定义用户模型做外键的需求,例如文章与文章作者,可以参考如下设置:

from django.conf import settings
from django.db import models
 
class Article(models.Model):
    author = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.CASCADE,
    )

任何需要使用到我们自定义用户模型的地方都可以这样操作。

序列化

可以参考如下代码:

# core/serializers.py
from rest_framework import serializers
from core.models import User
 
 
class UserSerializer(serializers.HyperlinkedModelSerializer):
    password = serializers.CharField(style={'input_type': 'password'}, label='密码', write_only=True)
 
    class Meta:
        model = User
        fields = ['url', 'id', 'username', 'password', 'email', 'level', 'is_active', 'date_joined']

这里我们设置password字段时加入了write_only=True这个参数,这样我们的view视图将只会在处理POSTPUTPATCH请求时(如果你允许这些请求的话)写入密码而不会在返回用户列表或详情信息时显示密码

接下来可以写个简单的视图试试:

# 别忘了引入我们自定义的模型与序列化器
class UserViewSet(viewsets.ModelViewSet):
    queryset = User.objects.get_queryset()
    serializer_class = UserSerializer

现在尝试使用POST请求创建一个新用户吧(不过要记得注册路由),最简单的方法是直接用浏览器打开访问127.0.0.1:8000/api/users/。接着使用新建的账户密码验证登录,你会发现验证失败。

为了安全起见,我们设置的密码会经过加密处理再放入数据库,同样,验证用户密码时,也会对密码加密再比对密文,这样即使是拥有查看数据库权限的人也无法查看用户密码的明文。但是这里我们的视图没有对密码进行加密就被存入了数据库,而用户验证时却是用的Django自身的API,比对的是密文,也就是验证时你提交的密码被加密,而数据库中的密码却没有加密,这样就出现了无法匹配的现象。

可以通过覆写ViewSetcreate方法来修复这个bug:

from django.contrib.auth.hashers import make_password
 
 
class UserViewSet(viewsets.ModelViewSet):
    # ......
    def create(self, request, *args, **kwargs):
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        serializer.validated_data['password'] = make_password(serializer.validated_data['password'])
        self.perform_create(serializer)
        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)

这里调用Django提供的make_password函数来生成正确的加密的密码。

既然是编写REST风格的API,那么建议对于用户的增加、修改、删除都使用这个视图。对于用户改密码的需求,可以在序列化器中添加一个old_password字段,并设置为当前密码,同时要改写视图类的partial_update方法。以下是一个我用来实现超管直接修改所有用户密码的需求(不要问我为什么会有这种需求~)的方式:

class UserViewSet(viewsets.ModelViewSet):
    # ......
    def partial_update(self, request, *args, **kwargs):
        if 'password' in request.data:
            request.data['password'] = make_password(request.data['password'])
        kwargs['partial'] = True
        return self.update(request, *args, **kwargs)

通过设置partial参数为True并将内容传递给update来实现仅针对密码部分更新。

自定义Token验证

常规情况下我们通过用户的用户名与密码来识别用户身份,最基础的方法是每次请求都需要用户名及密码,但是这非常麻烦且容易暴露敏感信息,一般不采用。比较常见的方式是基于OAuthSession以及Token的验证方式。REST framework为我们提供了可用的TokenAPI,这里介绍一下在此基础上做一些扩展。

注意:这里的Token和下一章要讲的JWT并不等同。

一般来说登录验证会使用到一些成熟的第三方库,这里拿原生的Token验证来练习一下。

Token类

这里使用的基于Token的验证就是客户端发送用户密码,服务端创建一个与用户相对应的随机字符串,之后客户端每次请求时在请求头中加上这段字符串,服务端解码Token再与数据库信息进行比对,即可通过验证。

为了使用REST framework提供的Token我们需要在settings.py中注册:

INSTALLED_APPS = [
    ...
    'rest_framework.authtoken'
]

如果你已经创建过用户,可以使用命令python manage.py shell,按如下操作:

>>> from core.models import User
>>> from rest_framework.authtoken.models import Token
 
>>> for user in User.objects.all():
>>>     Token.objects.get_or_create(user=user)

同时修改core/models.py,通过Django的信号机制,在每次新建用户时为其创建Token:

......
 
@receiver(post_save, sender=settings.AUTH_USER_MODEL)
def create_auth_token(sender, instance=None, created=False, **kwargs):
    # 接收用户创建信号,每次新建用户后自动创建token
    if created:
        Token.objects.create(user=instance)

接下来修改你需要添加权限的视图:

from rest_framework.authentication import TokenAuthentication
from rest_framework.permissions import IsAuthenticated
 
# ......
 
class ArticleViewSet(viewsets.ModelViewSet):
    authentication_classes = [TokenAuthentication]
    permission_classes = [permissions.IsAuthenticated]
    queryset = Article.objects.all()
    serializer_class = ArticleSerializer

通过authentication_classes指定要使用的验证类,有关permission_classes的内容下节在说。现在我们设置一下项目的urls.py

from rest_framework.authtoken import views
 
 
urlpatterns = [
    ......
    path('api-token-auth/', views.obtain_auth_token),
]

现在向该接口发送POST请求提交用户密码,将会得到Token,仅在将该Token放在请求头headers中,才可得到articles的正确响应,使用命令行工具httpie调试的示例如下:

$ http POST http://127.0.0.1:8000/api-token-auth/ username="user" password="password"                           
HTTP/1.1 200 OK
......
 
{
    "token": "bed522b6f41b962b5c829598e990b9f058518c9d"
}
 
$ http http://127.0.0.1:8000/articles/ 'Authorization: Token bed522b6f41b962b5c829598e990b9f058518c9d'
 

你可以尝试一下不带Authorization这一串会得到什么响应。

Token过期

但是REST framework自带的Token有着不小的缺陷,最典型的一点是这个Token没有过期机制,这意味着如果有谁截获了你的Token,就可以无限制的使用,安全风险实在太大。下面我们来试试扩展一下原生的Token验证,新建core/authentication.py

import datetime
from django.conf import settings
from django.core.cache import cache
from rest_framework.authentication import TokenAuthentication
from rest_framework import exceptions
from django.utils.translation import ugettext_lazy as _
 
# 记得要在settings.py中设置REST_FRAMEWORK_TOKEN_EXPIRE_MINUTES变量
# 这是为了方便以后调节过期时间,例如给该变量赋值为60,则为一小时过期
EXPIRE_MINUTES = getattr(settings, 'REST_FRAMEWORK_TOKEN_EXPIRE_MINUTES', 1)
 
 
class ExpiringTokenAuthentication(TokenAuthentication):
    """
    Setup token expired time
    """
    def authenticate_credentials(self, key):
        model = self.get_model()
        # 利用Django的cache减少数据库操作
        cache_user = cache.get(key)
        if cache_user:
            return cache_user, key
 
        try:
            token = model.objects.select_related('user').get(key=key)
        except model.DoesNotExist:
            raise exceptions.AuthenticationFailed(_("无效令牌"))
 
        if not token.user.is_active:
            raise exceptions.AuthenticationFailed(_("用户被禁用"))
 
        time_now = datetime.datetime.now()
 
        if token.created < time_now - datetime.timedelta(minutes=EXPIRE_MINUTES):
            token.delete()
            raise exceptions.AuthenticationFailed(_("认证信息已过期"))
 
        if token:
            # EXPIRE_MINUTES * 60 because the param is seconds
            cache.set(key, token.user, EXPIRE_MINUTES * 60)
 
        return token.user, token

同时我们可以修改core/views.py,定制验证视图,如果当前Token没有过期则返回cache中的Token,否则创建新Token:

from rest_framework.authtoken.views import ObtainAuthToken
 
# ......
 
class ObtainExpiringAuthToken(ObtainAuthToken):
    # 别忘了from rest_framework.authentication import BasicAuthentication
    # 这是通过post用户名密码获取token的视图,可不能采取token验证哦
    authentication_classes = [BasicAuthentication]
 
    def post(self, request, *args, **kwargs):
        serializer = self.serializer_class(data=request.data)
        if serializer.is_valid():
            user = serializer.validated_data['user']
            token, created = Token.objects.get_or_create(user=user)
            time_now = datetime.datetime.now()
 
            if created or (token.created < time_now - datetime.timedelta(minutes=EXPIRE_MINUTES)):
                token.delete()
                token = Token.objects.create(user=user)
                token.created = time_now
                token.save()
            # 这里可以定制返回信息
            context = {
                'id': user.id,
                'username': user.username,
                'token': token.key
            }
 
            return Response(context)
        else:
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

这样我们要修改urls.py以启用我们新的验证视图:

from core.views import ObtainExpiringAuthToken
 
 
urlpatterns = [
    ......
    path('api-token-auth/', ObtainExpiringAuthToken.as_view()),
]

现在你可以修改settings.py中的REST_FRAMEWORK_TOKEN_EXPIRE_MINUTES变量为1来看看Token过期的效果。

定制permission

既然有了验证,也就是对用户的身份进行识别是管理员、普通用户,还是未登录用户,那么肯定要针对不同类型的用户给予不同权限,否则整个验证过程就失去了意义。事实上我们之前在articlesAPI中已经使用了REST framework提供的IsAuthenticated权限,指定只有经过登录验证的用户可以访问。现在让我们设置一个基于用户级别的权限吧,新建core/permissions.py

from rest_framework import permissions
 
 
class AdministratorLevel(permissions.BasePermission):
    # 客户端向服务端发送请求后,此方法被调用,根据返回的布尔值决定用户是否拥有权限
    def has_permission(self, request, view):
        if request.user.is_authenticated:
            if request.method in permissions.SAFE_METHODS:
                return True
            # 普通管理员可修改数据
            elif request.method.upper() in ('POST', 'PUT', 'PATCH') and request.user.level == 1:
                return True
            # 超级管理员拥有所有权限
            elif request.user.level == 0:
                return True
            else:
                return False
        return False

现在可以修改articles API的视图,用我们自定义的权限类替换掉之前的IsAuthenticated,并且新建多个不同等级的用户,试试它们的权限吧。这里的if-else分支可以优化一下,不妨试试。

Throttling

顾名思义,throttling起到节流作用,它和permissions有些类似,但可以用来限制客户端的请求频率。

例如,我们想要用户的一个Token在一小时内过期,但只要用户保持活跃,那么在较长的一段时间内不必重复登录。可以添加一个通过旧Token获取新Token的接口,由前端判断如果用户在活跃状态下,那么可以在用户不知道的情况下获取新的Token。

# core/views.py
from rest_framework.views import APIView
 
# ......
 
class TokenForToken(APIView):
    authentication_classes = [ExpiringTokenAuthentication]
    permission_classes = [permissions.IsAuthenticated]
 
    def get(self, request, format=None):
        user = request.user
        # 这里有个小bug,留给读者去思考了
        token, created = Token.objects.get_or_create(user=user)
        time_now = datetime.datetime.now()
        token.delete()
        token = Token.objects.create(user=user)
        token.created = time_now
        token.save()
        return Response({'token': token.key}

urls.py中注册此视图,我们就可以用旧的Token来替换新的Token,但是如果你想要限制用户使用此方法的次数,则可以设置Throttling。如下修改settings.py

REST_FRAMEWORK = {
    'DEFAULT_THROTTLE_CLASSES': [
        'rest_framework.throttling.UserRateThrottle'
    ],
    'DEFAULT_THROTTLE_RATES': {
        'user': '10/day'
    }
}

接着在core/views.py中修改:

from rest_framework.throttling import UserRateThrottle
 
# ......
class TokenForToken(APIView):
    authentication_classes = [ExpiringTokenAuthentication]
    permission_classes = [permissions.IsAuthenticated]
    throttle_classes = [UserRateThrottle]
 
    # ......

这样可以限制每个用户每天最多请求10次。更多throttling的用法请查看REST framework官方文档。

EOF
文章有帮助?为我充个
版权声明