Skip to content
Snippets Groups Projects
Commit c92adf29 authored by jan.bednarik's avatar jan.bednarik
Browse files

Custom middleware for JWT token auth.

parent c8097c2e
No related branches found
No related tags found
No related merge requests found
from django.http import JsonResponse
def graphql_error_response(message, status_code=400):
error = {'message': message}
return JsonResponse({'errors': [error]}, status=status_code)
from django.conf import settings
import json
import jwt
import time
......@@ -18,8 +17,3 @@ def create_access_token(username, expiration=None):
def parse_access_token(token):
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.JWT_ALGORITHM])
return payload['sub']
def graphql_error_response(message, code=400):
error = {'message': message}
return json.dumps({'errors': [error]}), code, {'Content-Type': 'application/json'}
import re
from .api.utils import graphql_error_response
from .auth import parse_access_token
from .models import User
class TokenAuthMiddleware:
"""Custom authentication middleware which using JWT token."""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
auth_header = request.META.get('HTTP_AUTHORIZATION')
if auth_header is not None:
m = re.match(r'Bearer (?P<token>.+)', auth_header)
if m:
token = m.group('token')
else:
return graphql_error_response('Wrong Authorization header. Expected: "Bearer <token>"')
try:
username = parse_access_token(token)
except Exception:
return graphql_error_response('Invalid Token.', 401)
try:
request.user = User.objects.get(username=username)
except User.DoesNotExist:
pass
return self.get_response(request)
......@@ -37,6 +37,7 @@ MIDDLEWARE = [
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'openlobby.core.middleware.TokenAuthMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
]
......
# -*- coding: utf-8 -*-
# snapshottest: v1 - https://goo.gl/zC4yUc
from __future__ import unicode_literals
from snapshottest import Snapshot
snapshots = Snapshot()
snapshots['test_wrong_header 1'] = {
'errors': [
{
'message': 'Wrong Authorization header. Expected: "Bearer <token>"'
}
]
}
snapshots['test_invalid_token 1'] = {
'errors': [
{
'message': 'Invalid Token.'
}
]
}
import pytest
import json
from unittest.mock import Mock
from openlobby.core.auth import create_access_token
from openlobby.core.middleware import TokenAuthMiddleware
from openlobby.core.models import User
pytestmark = pytest.mark.django_db
def test_no_auth_header():
request = Mock()
request.user = None
request.META.get.return_value = None
middleware = TokenAuthMiddleware(lambda r: r)
response = middleware(request)
request.META.get.assert_called_once_with('HTTP_AUTHORIZATION')
assert response == request
assert response.user is None
def test_authorized_user():
user = User.objects.create(username='wolfe', first_name='Winston',
last_name='Wolfe', email='winston@wolfe.com')
request = Mock()
request.user = None
request.META.get.return_value = 'Bearer {}'.format(create_access_token('wolfe'))
middleware = TokenAuthMiddleware(lambda r: r)
response = middleware(request)
request.META.get.assert_called_once_with('HTTP_AUTHORIZATION')
assert response == request
assert response.user == user
def test_wrong_header(snapshot):
request = Mock()
request.user = None
request.META.get.return_value = 'WRONG {}'.format(create_access_token('unknown'))
middleware = TokenAuthMiddleware(lambda r: r)
response = middleware(request)
request.META.get.assert_called_once_with('HTTP_AUTHORIZATION')
assert response.status_code == 400
snapshot.assert_match(json.loads(response.content))
def test_invalid_token(snapshot):
request = Mock()
request.user = None
request.META.get.return_value = 'Bearer XXX{}'.format(create_access_token('unknown'))
middleware = TokenAuthMiddleware(lambda r: r)
response = middleware(request)
request.META.get.assert_called_once_with('HTTP_AUTHORIZATION')
assert response.status_code == 401
snapshot.assert_match(json.loads(response.content))
def test_unknown_user(snapshot):
request = Mock()
request.user = None
request.META.get.return_value = 'Bearer {}'.format(create_access_token('unknown'))
middleware = TokenAuthMiddleware(lambda r: r)
response = middleware(request)
request.META.get.assert_called_once_with('HTTP_AUTHORIZATION')
assert response == request
assert response.user is None
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment