authentication.py
3.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
from django.contrib.auth import get_user_model
from oauth2_provider.contrib.rest_framework import OAuth2Authentication
from oauth2_provider.oauth2_backends import get_oauthlib_core
from rest_framework_jwt.authentication import JSONWebTokenAuthentication
from rest_framework import exceptions
from rest_framework_jwt.settings import api_settings
import jwt
from django.utils.translation import ugettext as _
from common.redis_cache import redis_handler as rh
jwt_decode_handler = api_settings.JWT_DECODE_HANDLER
class OAuth2AuthenticationWithUser(OAuth2Authentication):
"""
OAuth 2 authentication backend using `django-oauth-toolkit`
"""
www_authenticate_realm = "api"
user = get_user_model().objects.first()
def authenticate(self, request):
"""
Returns two-tuple of (user, token) if authentication succeeds,
or None otherwise.
"""
oauthlib_core = get_oauthlib_core()
valid, r = oauthlib_core.verify_request(request, scopes=[])
if valid:
return self.user, r.access_token
request.oauth2_error = getattr(r, "oauth2_error", {})
return None
class MyJSONWebTokenAuthentication(JSONWebTokenAuthentication):
def authenticate_credentials(self, username):
"""
Returns an active user that matches the payload's user id and email.
"""
User = get_user_model()
# username = jwt_get_username_from_payload(payload)
if not username:
msg = _('Invalid payload.')
raise exceptions.AuthenticationFailed(msg)
try:
user = User.objects.get_by_natural_key(username)
except User.DoesNotExist:
msg = _('Invalid signature.')
raise exceptions.AuthenticationFailed(msg)
if not user.is_active:
msg = _('User account is disabled.')
raise exceptions.AuthenticationFailed(msg)
return user
def authenticate(self, request):
"""
Returns a two-tuple of `User` and token if a valid signature has been
supplied using JWT-based authentication. Otherwise returns `None`.
"""
jwt_value = self.get_jwt_value(request)
if jwt_value is None:
return None
jwt_str = str(jwt_value, 'UTF-8')[-10:]
username = rh.get_token(jwt_str)
if isinstance(username, str):
rh.set_token(jwt_str, username)
else:
msg = _('Signature has expired.')
raise exceptions.AuthenticationFailed(msg)
# try:
# payload = jwt_decode_handler(jwt_value)
# except jwt.ExpiredSignature:
# msg = _('Signature has expired.')
# raise exceptions.AuthenticationFailed(msg)
# except jwt.DecodeError:
# msg = _('Error decoding signature.')
# raise exceptions.AuthenticationFailed(msg)
# except jwt.InvalidTokenError:
# raise exceptions.AuthenticationFailed()
user = self.authenticate_credentials(username)
return (user, jwt_value)