Issue token
Source code entry
"""
Prerequisite: Send user information to a view class with all authentication and permissions partially disabled to get the token, which is actually the login interface
1) The post method of rest_framework_jwt.views.ObtainJSONWebToken parent class JSONWebTokenAPIView
Accept post requests with username and password
2) The post method passes the request data to rest_framework_jwt.serializer.JSONWebTokenSerializer for processing
To complete the verification of the data, the global hook verification rules of the serialization class will be followed, and the login user will be obtained and the token will be issued and stored in the serialization object.
"""
def validate(self, attrs):
# Account password dictionary
credentials = {
self.username_field: attrs.get(self.username_field),
'password': attrs.get('password')
}
if all(credentials.values()):
# Issuing token Step 1: Use account password to get user object< /span>
user = authenticate(**credentials)
if user:
if not user.is_active:
msg = _('User account is disabled. ')
raise serializers.ValidationError(msg)
# The second step of issuing token: get the payload through the user, the payload contains User information and expiration time
payload = jwt_payload_handler(user)
# In the view class, you can serialize the object .object. get('user' or'token') get user and token
return {
# Token issuance Step 3: Token issuance via payload span>
'token ': jwt_encode_handler(payload),
'user': user
}
else:
msg = _('Unable to log in with provided credentials.< /span>')
raise serializers.ValidationError(msg)
else:
msg = _('Must include "{username_field}" and " password".')
msg = msg.format(username_field=self.username_field)
raise serializers.ValidationError(msg)
"""
1) Get the user object by username and password
2) Generate payload through user object: jwt_payload_handler(user) => payload
from rest_framework_jwt.serializers import jwt_payload_handler
3) Issue token through payload: jwt_encode_handler(payload) => token
from rest_framework_jwt.serializers import jwt_encode_handler
"""
#< /span> Prerequisite: To access a view class configured with jwt authentication rules, you need to submit the authentication string token, and complete the token verification in the authentication class
# 1) The authenticate method of the parent class of rest_framework_jwt.authentication.JSONWebTokenAuthentication, BaseJSONWebTokenAuthentication
# The request header takes the authentication information jwt-token => Use the anti-crawl rule to determine the useful token => payload => user
def authenticate(self, request):
"""
Returns a two-tuple of `User` and token if a valid signature has been
supplied using JWT-based authentication. Otherwise returns `None`.
"""
# Obtain token with anti-climbing rules: the front desk must press "jwt token" "String" to submit
# Verify user Step 1: Take token from request header HTTP_AUTHORIZATION and extract
jwt_value = self.get_jwt_value(request)
# Tourist
if jwt_value is None:
return None
# Verification
try:
# Verify user step 2: token => payload span>
payload = jwt_decode_handler(jwt_value)
except jwt.ExpiredSignature:
msg = _('Signature has expired.< span style="color: #800000;">')
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _('Error decoding signature.< span style="color: #800000;">')
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
raise exceptions.AuthenticationFailed()
# Verify user step 3: token => payload span>
user = self.authenticate_credentials(payload)
return (user, jwt_value)
# 1) Get the token from the request header
# 2) Analyze the payload according to the token: jwt_decode_handler(token) => payloay
# from rest_framework_jwt.authentication import jwt_decode_handler
# 3) Analyze the user according to the payload: self.authenticate_credentials(payload) => user
# Inherit the BaseJSONWebTokenAuthentication of drf-jwt, and get the parent’s authenticate_credentials method
from django.db import models
from django.contrib.auth.models import< span style="color: #000000;"> AbstractUser
class User(AbstractUser):
mobile = models.CharField(max_length=11, unique=True)
class Meta:
db_table = 'api_user'
verbose_name = 'User table'
verbose_name_plural = verbose_name
def __str__(self):
return self.username
from rest_framework import serializers
from. import models
import re
# Two functions to get the front-end token: user => payload => token
# from rest_framework_jwt.settings import api_settings
# jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
# jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
from rest_framework_jwt.serializers import jwt_payload_handler
from rest_framework_jwt.serializers import jwt_encode_handler
# 1) The front desk submits multiple login information using one key. Therefore, you can customize the deserialization field in the background to correspond.
# 2) The serialization class needs to handle serialization and deserialization, and set all the used Model classes bound to the model in the fields Field
# 3) Distinguish between serialized and deserialized fields read_only | write_only
# 4) In the custom verification rules (local hooks, global hooks) to verify whether the data is legal, determine the logged-in user, according to the user Issue token
# 5) Save the logged-in user and the issued token in the serialized object
class UserModelSerializer(serializers.ModelSerializer):
# Custom deserialization field: write_only must be set, only participate Deserialization, will not be mapped with the model class field
usr = serializers.CharField(write_only=True)
pwd = serializers.CharField(write_only=True)
class Meta:
model = models.User
fields = ['usr', 'pwd', 'username< span style="color: #800000;">', 'mobile', 'email< /span>']
# System verification rules
extra_kwargs = {
'username': {
'read_only': True
},
'mobile': {
'read_only': True
},
'email': {
'read_only': True
},
}
def validate(self, attrs):
usr = attrs.get('usr')
pwd = attrs.get('pwd')
# Multi-method login: each branch processes the corresponding User
if re.match(r'[email protected]+', usr):
user_query = models.User.objects.filter(email=usr)
elif re.match(r'1[3-9][0-9]{9}', usr):
user_query = models.User.objects.filter(mobile=usr)
else:
user_query = models.User.objects.filter(username=usr)
user_obj = user_query.first()
# Issuance: Get the logged-in user, issue the token and store it in the instantiation Object
if user_obj and user_obj.check_password(pwd):
# Sign and issue a token, and store the token in the token of the instantiated class object In the name
payload = jwt_payload_handler(user_obj)
token = jwt_encode_handler(payload)
# save the current user and the issued token in the serialized object Medium
self.user = user_obj
self.token = token
return attrs
raise serializers.ValidationError({'data': '< span style="color: #800000;">The data is incorrect'})
#Realize multi-method login and issuance token: login account, mobile phone number, email, etc.
# 1) Disable authentication and authorization components
# 2) Get the front desk login information and hand it to the serialization class
# 3) The serialization class verifies that the logged-in user and token are stored in the serialization object
# 4) Take out the logged-in user and token and return to the front desk
import re
from. import serializers, models
from utils.response import APIResponse
from rest_framework_jwt.serializers import jwt_payload_handler
from rest_framework_jwt.serializers import jwt_encode_handler
class LoginAPIView(APIView):
# 1) Disable authentication and authorization components
authentication_classes = []
permission_classes = []
def post(self, request, *args, **kwargs) :
# 2) Get the front desk login information and hand it to the serialization class , Rules: use usr for account and pwd for password
user_ser = serializers.UserModelSerializer(data=request.data)
# 3) Serialized verification to get the login user and token storage In the serialized object
user_ser.is_valid(raise_exception=True)
# 4) Take out the logged-in user and token and return to the front desk
return APIResponse(token=user_ser.token, results=serializers.UserModelSerializer(user_ser.user ).data)
# "a rib" way of thinking: all logic is in view Handling in the class
def my_post(self, request, *args, **kwargs):
usr = request.data.get('usr ')
pwd = request.data.get('pwd ')
if re.match(r'[emailprotected]+', usr) :
user_query = models.User.objects.filter(email=usr)
elif re.match(r'1[3-9][0-9]{9}', usr):
user_query = models.User.objects.filter(mobile=usr)
else:
user_query = models.User.objects.filter(username=usr)
user_obj = user_query.first()
if user_obj and user_obj.check_password(pwd):
payload = jwt_payload_handler(user_obj)
token = jwt_encode_handler(payload)
return APIResponse(results={'username': user_obj.username}, token=token)
return APIResponse(data_msg='Uncontrollable error')
import jwt
from rest_framework_jwt.authentication import BaseJSONWebTokenAuthentication
from rest_framework_jwt.authentication import jwt_decode_handler
from rest_framework.exceptions import AuthenticationFailed
class JWTAuthentication(BaseJSONWebTokenAuthentication):
def authenticate(self, request):
jwt_token = request.META.get('HTTP_AUTHORIZATION ')
# Custom verification rule: auth token jwt
token = self.parse_jwt_token(jwt_token)
if token is None:
return None
try:
# token => payload
payload = jwt_decode_handler(token)
except jwt.ExpiredSignature:
raise AuthenticationFailed('token has expired')
except:
raise AuthenticationFailed('Illegal user')
# payload => user
user = self.authenticate_credentials(payload)
return (user, token)
# Custom verification rules: auth token jwt, auth is the previous Salt, jwt is post-salt
def parse_jwt_token(self, jwt_token ):
tokens = jwt_token.split()
if len(tokens) != 3 or tokens [0].lower() != 'auth' or tokens[2].lower() != ' jwt' :
return None
return tokens[1]
from rest_framework.views import APIView
from utils.response import APIResponse
# You must log in before you can access-passed the authentication authority component span>
from rest_framework.permissions import IsAuthenticated
# Custom jwt verification rules
from .authentications import JWTAuthentication
class UserDetail(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
def get(self, request, *args, **kwargs) :
return APIResponse(results={‘username‘: request.user.username})
class Car(models.Model):
name = models.CharField(max_length=16, unique=True, verbose_name=‘车名‘)
price = models.DecimalField(max_digits=10, decimal_places=2, verbose_name=‘价格‘)
brand = models.CharField(max_length=16, verbose_name=‘品牌‘)
class Meta:
db_table = ‘api_car‘
verbose_name = ‘汽车表‘
verbose_name_plural = verbose_name
def __str__(self):
return self.name
admin.site.register(models.Car)
class CarModelSerializer(serializers.ModelSerializer):
class Meta:
model = models.Car
fields = [‘name‘, ‘price‘, ‘brand‘]
# Car的群查接口
from rest_framework.generics import ListAPIView
class CarListAPIView(ListAPIView):
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
url(r‘^cars/$‘, views.CarListAPIView.as_view()),
from rest_framework.generics import ListAPIView
# 第一步:drf的SearchFilter - 搜索过滤
from rest_framework.filters import SearchFilter
class CarListAPIView(ListAPIView):
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
# 第二步:局部配置 过滤类 们(全局配置用DEFAULT_FILTER_BACKENDS)
filter_backends = [SearchFilter]
# 第三步:SearchFilter过滤类依赖的过滤条件 => 接口:/cars/?search=...
search_fields = [‘name‘, ‘price‘]
# eg:/cars/?search=1,name和price中包含1的数据都会被查询出
from rest_framework.generics import ListAPIView
# 第一步:drf的OrderingFilter - 排序过滤
from rest_framework.filters import OrderingFilter
class CarListAPIView(ListAPIView):
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
# 第二步:局部配置 过滤类 们(全局配置用DEFAULT_FILTER_BACKENDS)
filter_backends = [OrderingFilter]
# 第三步:OrderingFilter过滤类依赖的过滤条件 => 接口:/cars/?ordering=...
ordering_fields = [‘pk‘, ‘price‘]
# eg:/cars/?ordering=-price,pk,先按price降序,如果出现price相同,再按pk升序
from rest_framework.pagination import PageNumberPagination
class MyPageNumberPagination(PageNumberPagination):
# ?page=页码
page_query_param = ‘page‘
# ?page=页面 下默认一页显示的条数
page_size = 3
# ?page=页面&page_size=条数 用户自定义一页显示的条数
page_size_query_param = ‘page_size‘
# 用户自定义一页显示的条数最大限制:数值超过5也只显示5条
max_page_size = 5
from rest_framework.generics import ListAPIView
class CarListAPIView(ListAPIView):
# 如果queryset没有过滤条件,就必须 .all(),不然分页会出问题
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
# 分页组件 - 给视图类配置分页类即可 - 分页类需要自定义,继承drf提供的分页类即可
pagination_class = pagenations.MyPageNumberPagination
"""
前提: 给一个局部禁用了所有 认证与权限 的视图类发送用户信息得到token, 其实就是登陆接口
1) rest_framework_jwt.views.ObtainJSONWebToken 的父类 JSONWebTokenAPIView 的 post方法
接受有username, password的post请求
2) post方法将请求数据交给 rest_framework_jwt.serializer.JSONWebTokenSerializer 处理
完成数据的校验, 会走序列化类的 全局钩子校验规则, 校验得到登录用户并签发token存储在序列化对象中
"""
def validate(self, attrs):
# 账号密码字典
credentials = {
self.username_field: attrs.get(self.username_field),
‘password‘: attrs.get(‘password‘)
}
if all(credentials.values()):
# 签发token第1步:用账号密码得到user对象
user = authenticate(**credentials)
if user:
if not user.is_active:
msg = _(‘User account is disabled.‘)
raise serializers.ValidationError(msg)
# 签发token第2步:通过user得到payload,payload包含着用户信息与过期时间
payload = jwt_payload_handler(user)
# 在视图类中,可以通过 序列化对象.object.get(‘user‘或者‘token‘) 拿到user和token
return {
# 签发token第3步:通过payload签发出token
‘token‘: jwt_encode_handler(payload),
‘user‘: user
}
else:
msg = _(‘Unable to log in with provided credentials.‘)
raise serializers.ValidationError(msg)
else:
msg = _(‘Must include "{username_field}" and "password".‘)
msg = msg.format(username_field=self.username_field)
raise serializers.ValidationError(msg)
"""
1) 通过username, password得到user对象
2) 通过user对象生成payload: jwt_payload_handler(user) => payload
from rest_framework_jwt.serializers import jwt_payload_handler
3) 通过payload签发token: jwt_encode_handler(payload) => token
from rest_framework_jwt.serializers import jwt_encode_handler
"""
# 前提:访问一个配置了jwt认证规则的视图类,就需要提交认证字符串token,在认证类中完成token的校验
# 1)rest_framework_jwt.authentication.JSONWebTokenAuthentication 的 父类 BaseJSONWebTokenAuthentication 的 authenticate 方法
# 请求头拿认证信息jwt-token => 通过反爬小规则确定有用的token => payload => user
def authenticate(self, request):
"""
Returns a two-tuple of `User` and token if a valid signature has been
supplied using JWT-based authentication. Otherwise returns `None`.
"""
# 带有反爬小规则的获取token:前台必须按 "jwt token字符串" 方式提交
# 校验user第1步:从请求头 HTTP_AUTHORIZATION 中拿token,并提取
jwt_value = self.get_jwt_value(request)
# 游客
if jwt_value is None:
return None
# 校验
try:
# 校验user第2步:token => payload
payload = jwt_decode_handler(jwt_value)
except jwt.ExpiredSignature:
msg = _(‘Signature has expired.‘)
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _(‘Error decoding signature.‘)
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
raise exceptions.AuthenticationFailed()
# 校验user第3步:token => payload
user = self.authenticate_credentials(payload)
return (user, jwt_value)
# 1)从请求头中获取token
# 2)根据token解析出payload:jwt_decode_handler(token) => payloay
# from rest_framework_jwt.authentication import jwt_decode_handler
# 3)根据payload解析出user:self.authenticate_credentials(payload) => user
# 继承drf-jwt的BaseJSONWebTokenAuthentication,拿到父级的authenticate_credentials方法
from django.db import models
from django.contrib.auth.models import AbstractUser
class User(AbstractUser):
mobile = models.CharField(max_length=11, unique=True)
class Meta:
db_table = ‘api_user‘
verbose_name = ‘用户表‘
verbose_name_plural = verbose_name
def __str__(self):
return self.username
from rest_framework import serializers
from . import models
import re
# 拿到前台token的两个函数: user => payload => token
# from rest_framework_jwt.settings import api_settings
# jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
# jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
from rest_framework_jwt.serializers import jwt_payload_handler
from rest_framework_jwt.serializers import jwt_encode_handler
# 1) 前台提交多种登录信息都采用一个key,所以后台可以自定义反序列化字段进行对应
# 2) 序列化类要处理序列化与反序列化,要在fields中设置model绑定的Model类所有使用到的字段
# 3) 区分序列化字段与反序列化字段 read_only | write_only
# 4) 在自定义校验规则中(局部钩子、全局钩子)校验数据是否合法、确定登录的用户、根据用户签发token
# 5) 将登录的用户与签发的token保存在序列化类对象中
class UserModelSerializer(serializers.ModelSerializer):
# 自定义反序列字段:一定要设置write_only,只参与反序列化,不会与model类字段映射
usr = serializers.CharField(write_only=True)
pwd = serializers.CharField(write_only=True)
class Meta:
model = models.User
fields = [‘usr‘, ‘pwd‘, ‘username‘, ‘mobile‘, ‘email‘]
# 系统校验规则
extra_kwargs = {
‘username‘: {
‘read_only‘: True
},
‘mobile‘: {
‘read_only‘: True
},
‘email‘: {
‘read_only‘: True
},
}
def validate(self, attrs):
usr = attrs.get(‘usr‘)
pwd = attrs.get(‘pwd‘)
# 多方式登录:各分支处理得到该方式下对应的用户
if re.match(r‘[email protected]+‘, usr):
user_query = models.User.objects.filter(email=usr)
elif re.match(r‘1[3-9][0-9]{9}‘, usr):
user_query = models.User.objects.filter(mobile=usr)
else:
user_query = models.User.objects.filter(username=usr)
user_obj = user_query.first()
# 签发:得到登录用户,签发token并存储在实例化对象中
if user_obj and user_obj.check_password(pwd):
# 签发token,将token存放到 实例化类对象的token 名字中
payload = jwt_payload_handler(user_obj)
token = jwt_encode_handler(payload)
# 将当前用户与签发的token都保存在序列化对象中
self.user = user_obj
self.token = token
return attrs
raise serializers.ValidationError({‘data‘: ‘数据有误‘})
#实现多方式登陆签发token:账号、手机号、邮箱等登陆
# 1) 禁用认证与权限组件
# 2) 拿到前台登录信息,交给序列化类
# 3) 序列化类校验得到登录用户与token存放在序列化对象中
# 4) 取出登录用户与token返回给前台
import re
from . import serializers, models
from utils.response import APIResponse
from rest_framework_jwt.serializers import jwt_payload_handler
from rest_framework_jwt.serializers import jwt_encode_handler
class LoginAPIView(APIView):
# 1) 禁用认证与权限组件
authentication_classes = []
permission_classes = []
def post(self, request, *args, **kwargs):
# 2) 拿到前台登录信息,交给序列化类,规则:账号用usr传,密码用pwd传
user_ser = serializers.UserModelSerializer(data=request.data)
# 3) 序列化类校验得到登录用户与token存放在序列化对象中
user_ser.is_valid(raise_exception=True)
# 4) 取出登录用户与token返回给前台
return APIResponse(token=user_ser.token, results=serializers.UserModelSerializer(user_ser.user).data)
# "一根筋" 思考方式:所有逻辑都在视图类中处理
def my_post(self, request, *args, **kwargs):
usr = request.data.get(‘usr‘)
pwd = request.data.get(‘pwd‘)
if re.match(r‘[email protected]+‘, usr):
user_query = models.User.objects.filter(email=usr)
elif re.match(r‘1[3-9][0-9]{9}‘, usr):
user_query = models.User.objects.filter(mobile=usr)
else:
user_query = models.User.objects.filter(username=usr)
user_obj = user_query.first()
if user_obj and user_obj.check_password(pwd):
payload = jwt_payload_handler(user_obj)
token = jwt_encode_handler(payload)
return APIResponse(results={‘username‘: user_obj.username}, token=token)
return APIResponse(data_msg=‘不可控错误‘)
import jwt
from rest_framework_jwt.authentication import BaseJSONWebTokenAuthentication
from rest_framework_jwt.authentication import jwt_decode_handler
from rest_framework.exceptions import AuthenticationFailed
class JWTAuthentication(BaseJSONWebTokenAuthentication):
def authenticate(self, request):
jwt_token = request.META.get(‘HTTP_AUTHORIZATION‘)
# 自定义校验规则:auth token jwt
token = self.parse_jwt_token(jwt_token)
if token is None:
return None
try:
# token => payload
payload = jwt_decode_handler(token)
except jwt.ExpiredSignature:
raise AuthenticationFailed(‘token已过期‘)
except:
raise AuthenticationFailed(‘非法用户‘)
# payload => user
user = self.authenticate_credentials(payload)
return (user, token)
# 自定义校验规则:auth token jwt,auth为前盐,jwt为后盐
def parse_jwt_token(self, jwt_token):
tokens = jwt_token.split()
if len(tokens) != 3 or tokens[0].lower() != ‘auth‘ or tokens[2].lower() != ‘jwt‘:
return None
return tokens[1]
from rest_framework.views import APIView
from utils.response import APIResponse
# 必须登录后才能访问 - 通过了认证权限组件
from rest_framework.permissions import IsAuthenticated
# 自定义jwt校验规则
from .authentications import JWTAuthentication
class UserDetail(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
def get(self, request, *args, **kwargs):
return APIResponse(results={‘username‘: request.user.username})
class Car(models.Model):
name = models.CharField(max_length=16, unique=True, verbose_name=‘车名‘)
price = models.DecimalField(max_digits=10, decimal_places=2, verbose_name=‘价格‘)
brand = models.CharField(max_length=16, verbose_name=‘品牌‘)
class Meta:
db_table = ‘api_car‘
verbose_name = ‘汽车表‘
verbose_name_plural = verbose_name
def __str__(self):
return self.name
admin.site.register(models.Car)
class CarModelSerializer(serializers.ModelSerializer):
class Meta:
model = models.Car
fields = [‘name‘, ‘price‘, ‘brand‘]
# Car的群查接口
from rest_framework.generics import ListAPIView
class CarListAPIView(ListAPIView):
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
url(r‘^cars/$‘, views.CarListAPIView.as_view()),
from rest_framework.generics import ListAPIView
# 第一步:drf的SearchFilter - 搜索过滤
from rest_framework.filters import SearchFilter
class CarListAPIView(ListAPIView):
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
# 第二步:局部配置 过滤类 们(全局配置用DEFAULT_FILTER_BACKENDS)
filter_backends = [SearchFilter]
# 第三步:SearchFilter过滤类依赖的过滤条件 => 接口:/cars/?search=...
search_fields = [‘name‘, ‘price‘]
# eg:/cars/?search=1,name和price中包含1的数据都会被查询出
from rest_framework.generics import ListAPIView
# 第一步:drf的OrderingFilter - 排序过滤
from rest_framework.filters import OrderingFilter
class CarListAPIView(ListAPIView):
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
# 第二步:局部配置 过滤类 们(全局配置用DEFAULT_FILTER_BACKENDS)
filter_backends = [OrderingFilter]
# 第三步:OrderingFilter过滤类依赖的过滤条件 => 接口:/cars/?ordering=...
ordering_fields = [‘pk‘, ‘price‘]
# eg:/cars/?ordering=-price,pk,先按price降序,如果出现price相同,再按pk升序
from rest_framework.pagination import PageNumberPagination
class MyPageNumberPagination(PageNumberPagination):
# ?page=页码
page_query_param = ‘page‘
# ?page=页面 下默认一页显示的条数
page_size = 3
# ?page=页面&page_size=条数 用户自定义一页显示的条数
page_size_query_param = ‘page_size‘
# 用户自定义一页显示的条数最大限制:数值超过5也只显示5条
max_page_size = 5
from rest_framework.generics import ListAPIView
class CarListAPIView(ListAPIView):
# 如果queryset没有过滤条件,就必须 .all(),不然分页会出问题
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
# 分页组件 - 给视图类配置分页类即可 - 分页类需要自定义,继承drf提供的分页类即可
pagination_class = pagenations.MyPageNumberPagination