# Custom verification token rules
1. View class
from .authentications import JWTAuthentication
class UserDetail1(APIView):
    """Return the username of the currently authenticated user.

    The class-body indentation was lost in the original listing and is
    restored here so the view parses.
    """

    # Only logged-in users may call this view.
    permission_classes = [IsAuthenticated]
    # Authenticate with the project's custom JWT rules.
    authentication_classes = [JWTAuthentication]

    def get(self, request, *args, **kwargs):
        """Respond with the requesting user's username.

        NOTE(review): ``APIResponse`` is a project helper not shown here;
        it is presumably a ``Response`` wrapper -- confirm.
        """
        return APIResponse(results={'username': request.user.username})
2. Define the custom token rules in an authentications.py file inside the api app
import jwt
from rest_framework_jwt.authentication import BaseJSONWebTokenAuthentication
from rest_framework_jwt.authentication import jwt_decode_handler
from rest_framework.exceptions import AuthenticationFailed
class JWTAuthentication(BaseJSONWebTokenAuthentication):
    """Authenticate requests whose Authorization header is ``auth <token> jwt``."""

    def authenticate(self, request):
        """Resolve the user for ``request`` from its Authorization header.

        Returns:
            ``None`` when the header is missing or does not follow the
            ``auth <token> jwt`` layout, otherwise a ``(user, token)`` tuple.

        Raises:
            AuthenticationFailed: the token has expired or cannot be decoded.
        """
        # The key must be exactly 'HTTP_AUTHORIZATION'; the previous version
        # carried a trailing space and therefore never found the header.
        jwt_token = request.META.get('HTTP_AUTHORIZATION')
        token = self.parse_jwt_token(jwt_token)
        # No usable token: this authenticator makes no authentication attempt.
        if token is None:
            return None
        try:
            # token => payload
            payload = jwt_decode_handler(token)
        except jwt.ExpiredSignature:
            raise AuthenticationFailed('Token has expired')
        except Exception:
            # Any other decode failure is reported as an invalid token;
            # ``Exception`` (not a bare except) lets SystemExit and
            # KeyboardInterrupt propagate.
            raise AuthenticationFailed('Illegal user')
        # payload => user (handled by the drf-jwt base class)
        user = self.authenticate_credentials(payload)
        return (user, token)

    def parse_jwt_token(self, jw_token):
        """Extract the token from an ``auth <token> jwt`` header value.

        Args:
            jw_token: raw header value, or ``None`` when the header is absent.

        Returns:
            The middle element (the token itself), or ``None`` when the value
            is empty or does not match the three-part layout. The ``auth``
            prefix and ``jwt`` suffix are compared case-insensitively.
        """
        # A missing header arrives as None; treat it like a malformed header
        # instead of raising AttributeError on ``.split()``.
        if not jw_token:
            return None
        tokens = jw_token.split()
        # ``split()`` strips whitespace, so the suffix is compared against
        # 'jwt' (the previous ' jwt' with a leading space could never match).
        if len(tokens) != 3 or tokens[0].lower() != 'auth' or tokens[2].lower() != 'jwt':
            return None
        return tokens[1]
# Custom drf-jwt configuration
import datetime
# drf-jwt settings. Keys must match the setting names exactly; the previous
# version had a trailing space in every key, so none of them was recognised.
JWT_AUTH = {
    # user => payload
    'JWT_PAYLOAD_HANDLER':
        'rest_framework_jwt.utils.jwt_payload_handler',
    # payload => token
    'JWT_ENCODE_HANDLER':
        'rest_framework_jwt.utils.jwt_encode_handler',
    # token => payload
    'JWT_DECODE_HANDLER':
        'rest_framework_jwt.utils.jwt_decode_handler',
    # Token lifetime
    'JWT_EXPIRATION_DELTA': datetime.timedelta(days=7),
    # Window during which a token may still be refreshed
    'JWT_REFRESH_EXPIRATION_DELTA': datetime.timedelta(days=7),
    # Authorization header prefix (a small anti-scraping measure)
    'JWT_AUTH_HEADER_PREFIX': 'JWT',
}
"""
Supporting multiple login methods when issuing a token (username, mobile phone number, email, etc.):
1. Disable the authentication and permission components on the login view
2. Take the login data submitted by the front end and hand it to the serializer class
3. The serializer validates the data and stores the logged-in user and the token on the serializer object
4. Read the logged-in user and the token back from the serializer and return them to the front end
"""
from .serializers import UserModelSerializer


# 1. View class
class LoginAPIView(APIView):
    """Log a user in and return a freshly issued token.

    HTML residue in the import line and the ``class`` statement, and the lost
    body indentation, are repaired here so the module parses.
    """

    # Login must be reachable without a token, so both checks are disabled.
    authentication_classes = []
    permission_classes = []

    def post(self, request, *args, **kwargs):
        """Validate the submitted credentials and respond with token and user data.

        Raises a validation error (via ``raise_exception=True``) when the
        serializer rejects the input.
        """
        # Deserialize and validate the submitted login data.
        user_ser = UserModelSerializer(data=request.data)
        user_ser.is_valid(raise_exception=True)
        # Validation stored the token and the user on the serializer object.
        return APIResponse(
            token=user_ser.token,
            results=UserModelSerializer(user_ser.user).data,
        )
#2. Serialization class, verify, generate token and send it out
from rest_framework import serializers
from. import models
import re
from rest_framework_jwt.serializers import jwt_payload_handler, jwt_encode_handler
class UserModelSerializer(serializers.ModelSerializer):
    """Validate login credentials and issue a JWT for the matching user.

    ``usr`` may be an email address, a mobile number or a username. After a
    successful ``is_valid()`` the serializer object exposes ``token`` and
    ``user`` attributes.
    """

    # Write-only input fields: they take part in deserialization only and are
    # not mapped to model fields.
    usr = serializers.CharField(write_only=True)
    pwd = serializers.CharField(write_only=True)

    class Meta:
        model = models.User
        # HTML residue inside the field-name strings has been removed.
        fields = ['usr', 'pwd', 'username', 'mobile', 'email']
        # Model fields are output-only for this serializer.
        extra_kwargs = {
            'username': {'read_only': True},
            'mobile': {'read_only': True},
            'email': {'read_only': True},
        }

    def validate(self, attrs):
        """Object-level hook: look the user up, check the password, issue a token.

        Args:
            attrs: field values that already passed per-field validation.

        Returns:
            ``attrs`` unchanged when the credentials are correct.

        Raises:
            serializers.ValidationError: no matching user or wrong password.
        """
        usr = attrs.get('usr')
        pwd = attrs.get('pwd')
        # Pick the lookup column from the shape of the submitted identifier.
        # The email pattern is restored from the obfuscated '[email protected]+'.
        if re.match(r'.+@.+', usr):
            user_query = models.User.objects.filter(email=usr)
        # fullmatch: exactly 11 digits, so a username that merely starts with
        # a phone-like prefix is not treated as a mobile number.
        elif re.fullmatch(r'1[3-9][0-9]{9}', usr):
            user_query = models.User.objects.filter(mobile=usr)
        else:
            user_query = models.User.objects.filter(username=usr)
        user_obj = user_query.first()
        if user_obj and user_obj.check_password(pwd):
            # user => payload => token; keep both on the serializer for the view.
            payload = jwt_payload_handler(user_obj)
            self.token = jwt_encode_handler(payload)
            self.user = user_obj
            # The token is deliberately not printed: it is a credential.
            return attrs
        raise serializers.ValidationError({'data': 'The data provided is incorrect'})
Search filtering, sorting, and pagination:
from. import models
from .serializers import CarModelSerializer
# Car’s group search interface
from rest_framework.generics import ListAPIView
# 1.drf’s SearchFilter-Search Filter
from rest_framework.filters import SearchFilter
# 2.drf's OrderingFilter-sorting filtering
from rest_framework.filters import OrderingFilter
# 3.drf's paging class-custom
from. import pagenations
class CarListAPIView(ListAPIView):
    """List endpoint for cars with search, ordering and pagination.

    The class-body indentation was lost in the original listing and is
    restored here so the view parses.
    """

    # Open endpoint: no permission or authentication checks.
    permission_classes = []
    authentication_classes = []

    queryset = models.Car.objects.all()
    serializer_class = CarModelSerializer

    # Per-view filter backends (DEFAULT_FILTER_BACKENDS is the global setting).
    filter_backends = [SearchFilter, OrderingFilter]
    # Fields used by SearchFilter => /cars/?search=...
    search_fields = ['name', 'price']
    # Fields OrderingFilter may sort by => /cars/?ordering=...
    # A plain name sorts ascending, a leading '-' sorts descending,
    # e.g. /cars/?ordering=-price,pk sorts by price descending, then pk ascending.
    ordering_fields = ['pk', 'price']

    # Project-defined page-number pagination.
    pagination_class = pagenations.MyPageNumberPagination
# Custom pagination class
from rest_framework.pagination import PageNumberPagination
class MyPageNumberPagination(PageNumberPagination):
    """Page-number pagination with a client-adjustable, capped page size.

    The class-body indentation was lost in the original listing and is
    restored here so the class parses.
    """

    # ?page=<page number>
    page_query_param = 'page'
    # Items per page when the client does not ask for a size.
    page_size = 3
    # ?page=<page number>&page_size=<count> lets the client choose the size...
    page_size_query_param = 'page_size'
    # ...capped at this maximum.
    max_page_size = 5
from .authentications import JWTAuthentication
class UserDetail1(APIView):
    """Return the username of the currently authenticated user.

    The class-body indentation was lost in the original listing and is
    restored here so the view parses.
    """

    # Only logged-in users may call this view.
    permission_classes = [IsAuthenticated]
    # Authenticate with the project's custom JWT rules.
    authentication_classes = [JWTAuthentication]

    def get(self, request, *args, **kwargs):
        """Respond with the requesting user's username.

        NOTE(review): ``APIResponse`` is a project helper not shown here;
        it is presumably a ``Response`` wrapper -- confirm.
        """
        return APIResponse(results={'username': request.user.username})
import jwt
from rest_framework_jwt.authentication import BaseJSONWebTokenAuthentication
from rest_framework_jwt.authentication import jwt_decode_handler
from rest_framework.exceptions import AuthenticationFailed
class JWTAuthentication(BaseJSONWebTokenAuthentication):
    """Authenticate requests whose Authorization header is ``auth <token> jwt``."""

    def authenticate(self, request):
        """Resolve the user for ``request`` from its Authorization header.

        Returns:
            ``None`` when the header is missing or does not follow the
            ``auth <token> jwt`` layout, otherwise a ``(user, token)`` tuple.

        Raises:
            AuthenticationFailed: the token has expired or cannot be decoded.
        """
        # The key must be exactly 'HTTP_AUTHORIZATION'; the previous version
        # carried a trailing space and therefore never found the header.
        jwt_token = request.META.get('HTTP_AUTHORIZATION')
        token = self.parse_jwt_token(jwt_token)
        # No usable token: this authenticator makes no authentication attempt.
        if token is None:
            return None
        try:
            # token => payload
            payload = jwt_decode_handler(token)
        except jwt.ExpiredSignature:
            raise AuthenticationFailed('Token has expired')
        except Exception:
            # Any other decode failure is reported as an invalid token;
            # ``Exception`` (not a bare except) lets SystemExit and
            # KeyboardInterrupt propagate.
            raise AuthenticationFailed('Illegal user')
        # payload => user (handled by the drf-jwt base class)
        user = self.authenticate_credentials(payload)
        return (user, token)

    def parse_jwt_token(self, jw_token):
        """Extract the token from an ``auth <token> jwt`` header value.

        Args:
            jw_token: raw header value, or ``None`` when the header is absent.

        Returns:
            The middle element (the token itself), or ``None`` when the value
            is empty or does not match the three-part layout. The ``auth``
            prefix and ``jwt`` suffix are compared case-insensitively.
        """
        # A missing header arrives as None; treat it like a malformed header
        # instead of raising AttributeError on ``.split()``.
        if not jw_token:
            return None
        tokens = jw_token.split()
        # ``split()`` strips whitespace, so the suffix is compared against
        # 'jwt' (the previous ' jwt' with a leading space could never match).
        if len(tokens) != 3 or tokens[0].lower() != 'auth' or tokens[2].lower() != 'jwt':
            return None
        return tokens[1]
import datetime
# drf-jwt settings. Keys must match the setting names exactly; the previous
# version had a trailing space in every key, so none of them was recognised.
JWT_AUTH = {
    # user => payload
    'JWT_PAYLOAD_HANDLER':
        'rest_framework_jwt.utils.jwt_payload_handler',
    # payload => token
    'JWT_ENCODE_HANDLER':
        'rest_framework_jwt.utils.jwt_encode_handler',
    # token => payload
    'JWT_DECODE_HANDLER':
        'rest_framework_jwt.utils.jwt_decode_handler',
    # Token lifetime
    'JWT_EXPIRATION_DELTA': datetime.timedelta(days=7),
    # Window during which a token may still be refreshed
    'JWT_REFRESH_EXPIRATION_DELTA': datetime.timedelta(days=7),
    # Authorization header prefix (a small anti-scraping measure)
    'JWT_AUTH_HEADER_PREFIX': 'JWT',
}
class LoginAPIView(APIView):
    """Log a user in and return a freshly issued token.

    The class-body indentation was lost in the original listing and is
    restored here so the view parses.
    """

    # Login must be reachable without a token, so both checks are disabled.
    authentication_classes = []
    permission_classes = []

    def post(self, request, *args, **kwargs):
        """Validate the submitted credentials and respond with token and user data.

        Raises a validation error (via ``raise_exception=True``) when the
        serializer rejects the input.
        """
        # Deserialize and validate the submitted login data.
        user_ser = UserModelSerializer(data=request.data)
        user_ser.is_valid(raise_exception=True)
        # Successful login: return the generated token to the front end.
        return APIResponse(
            token=user_ser.token,
            results=UserModelSerializer(user_ser.user).data,
        )
from rest_framework import serializers
from . import models
import re
from rest_framework_jwt.serializers import jwt_payload_handler, jwt_encode_handler
class UserModelSerializer(serializers.ModelSerializer):
    """Validate login credentials and issue a JWT for the matching user.

    ``usr`` may be an email address, a mobile number or a username. After a
    successful ``is_valid()`` the serializer object exposes ``token`` and
    ``user`` attributes.

    The original listing used typographic quotes as string delimiters and had
    lost its indentation; both are repaired here so the class parses.
    """

    # Write-only input fields: they take part in deserialization only and are
    # not mapped to model fields.
    usr = serializers.CharField(write_only=True)
    pwd = serializers.CharField(write_only=True)

    class Meta:
        model = models.User
        fields = ['usr', 'pwd', 'username', 'mobile', 'email']
        # Model fields are output-only for this serializer.
        extra_kwargs = {
            'username': {'read_only': True},
            'mobile': {'read_only': True},
            'email': {'read_only': True},
        }

    def validate(self, attrs):
        """Object-level hook: look the user up, check the password, issue a token.

        Args:
            attrs: field values that already passed per-field validation.

        Returns:
            ``attrs`` unchanged when the credentials are correct.

        Raises:
            serializers.ValidationError: no matching user or wrong password.
        """
        usr = attrs.get('usr')
        pwd = attrs.get('pwd')
        # Pick the lookup column from the shape of the submitted identifier.
        # The email pattern is restored from the obfuscated '[email protected]+'.
        if re.match(r'.+@.+', usr):
            user_query = models.User.objects.filter(email=usr)
        # fullmatch: exactly 11 digits, so a username that merely starts with
        # a phone-like prefix is not treated as a mobile number.
        elif re.fullmatch(r'1[3-9][0-9]{9}', usr):
            user_query = models.User.objects.filter(mobile=usr)
        else:
            user_query = models.User.objects.filter(username=usr)
        user_obj = user_query.first()
        if user_obj and user_obj.check_password(pwd):
            # user => payload => token; keep both on the serializer for the view.
            payload = jwt_payload_handler(user_obj)
            self.token = jwt_encode_handler(payload)
            self.user = user_obj
            # The token is deliberately not printed: it is a credential.
            return attrs
        raise serializers.ValidationError({'data': '数据提供有误'})
from . import models
from .serializers import CarModelSerializer
# Car list (bulk query) endpoint
from rest_framework.generics import ListAPIView
# 1. DRF's SearchFilter - search filtering
from rest_framework.filters import SearchFilter
# 2. DRF's OrderingFilter - ordering
from rest_framework.filters import OrderingFilter
# 3. DRF pagination class - custom
from . import pagenations
class CarListAPIView(ListAPIView):
    """List endpoint for cars with search, ordering and pagination.

    The original listing used typographic quotes as string delimiters and had
    lost its indentation; both are repaired here so the view parses.
    """

    # Open endpoint: no permission or authentication checks.
    permission_classes = []
    authentication_classes = []

    queryset = models.Car.objects.all()
    serializer_class = CarModelSerializer

    # Per-view filter backends (DEFAULT_FILTER_BACKENDS is the global setting).
    filter_backends = [SearchFilter, OrderingFilter]
    # Fields used by SearchFilter => /cars/?search=...
    search_fields = ['name', 'price']
    # Fields OrderingFilter may sort by => /cars/?ordering=...
    # A plain name sorts ascending, a leading '-' sorts descending,
    # e.g. /cars/?ordering=-price,pk sorts by price descending, then pk ascending.
    ordering_fields = ['pk', 'price']

    # Project-defined page-number pagination.
    pagination_class = pagenations.MyPageNumberPagination
from rest_framework.pagination import PageNumberPagination
class MyPageNumberPagination(PageNumberPagination):
    """Page-number pagination with a client-adjustable, capped page size.

    The original listing used typographic quotes as string delimiters and had
    lost its indentation; both are repaired here so the class parses.
    """

    # ?page=<page number>
    page_query_param = 'page'
    # Items per page when the client does not ask for a size.
    page_size = 3
    # ?page=<page number>&page_size=<count> lets the client choose the size...
    page_size_query_param = 'page_size'
    # ...capped at this maximum.
    max_page_size = 5