웹 애플리케이션 개발에서 인증은 매우 중요한 부분이고, JWT는 웹과 API에서 인증 및 권한 부여를 효율적이고 안전하게 관리해주는 매우 유용한 인증 기법이다.
처음 JWT에 대해서 공부하고 적용시키는데 많은 검색과 고민을 했고 여러가지 레퍼런스들을 참고하면서 진행하였는데, 좋은 레퍼런스를 찾았으나 재대로 이해하기 어려워서 정리하면서 이해하고자 작성하게 되었다.
기본적인 설정은 아래의 블로그 글을 참고하여 적용시켰다.
Django-Rest-Framework(DRF)로 JWT 기반 Authentication 세팅하기(with simplejwt) — 초기 환경 세팅(1)
앞서 포스팅했던 소셜 로그인 구현에서 생각보다 많은 개발자 분들이 봐주신 덕분에 상위노출도 되어 기뻤지만, 이전 코드를 다시 보니 많이 부족하단 생각이 들었다. 특히 JWT 부분에서 이해력
medium.com
Custom 하기
Settings.py
우선 settings에서 인증백엔드를 추가해주었다.
AUTHENTICATION_BACKENDS = (
'core.backends.CustomJWTBackend',
)
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': (
'core.middlewares.CustomJWTAuthentication',
),
...
}
AUTHENTICATION_BACKENDS 설정은 Django에서 사용자 인증 시스템을 커스터마이징 할 때, 사용하는 중요한 설정이다. Django가 사용자를 인증하기 위해 어떤 백엔드를 사용할지 결정한다.
core폴더안에 backends라는 파일을 만들어 CustomJWTBackend 클래스를 만들어 정의해줄 것이고, 그것을 인증 백엔드로 사용할 것이기 때문에 위의 코드처럼 작성하였다.
REST_FRAMEWORK 설정은 DRF를 사용하여 RESTful API를 개발할 때 필요한 다양한 설정들을 포함하는 설정이다. 여기서 DEFAULT_AUTHENTICATION_CLASSES는 API에 대한 기본 인증 클래스를 정의하는데 사용된다.
역시 core폴더안에 middlewares파일을 만들어 CustomJWTAuthentication 클래스를 만들어 정의해줄 것이기 때문에 위 코드처럼 작성하였다.
SessionAuthentication은 세션인증을 의미하며, 기본 세션 메커니즘을 이용하여 인증합니다.
BasicAuthentication은 HTTP인증을 의미하며, 요청 헤더에 인증 정보를 전송하여 사용자를 인증합니다.
backends.py
앞서 설명했듯이 사용자를 인증하기 위한 백엔드를 구현하는 파일이다.
# core.backends
from django.conf import settings
from rest_framework.request import Request
from rest_framework_simplejwt.authentication import JWTAuthentication
from core.exceptions.service_exceptions import (
UserNotFound,
UserPasswordInvalid,
AccessTokenUnAuthorized
)
from api.models.users.models import User
class CustomJWTBackend(JWTAuthentication):
"""
:comment: TokenObtainPairSerializer using this Authentication Class
"""
def authenticate(self, request: Request, **credentials):
"""
:param request: request object from `/admin/login` or `/api/v1/users/login`
:param credentials: {"username": EMAIL_INFO, "password": "PASSWORD"} or
{"email": EMAIL_INFO, "password": "PASSWORD"}
:return: user object.
"""
_params = {
self.user_model().USERNAME_FIELD: (
credentials.get(self.user_model().USERNAME_FIELD) or
credentials.get("username")
)
}
user = User.objects.filter(**_params, is_active=True).first()
if not user:
raise UserNotFound
if user and not user.check_password(credentials.get("password")):
raise UserPasswordInvalid
return user
def get_user(self, _key):
"""
:param _key: This value maybe `user id` or `JWT in dict`.
:comment: if django admin session authentication, using `user id` to get user object.
if jwt(default) authentication, using `jwt` to get user object.
"""
try:
if isinstance(_key, int):
# if Django Admin Session, _key is `user's id`
user_id = _key
_filter = {"id": user_id}
else:
# else API JWT
_filter = {self.user_model().USERNAME_FIELD: _key[settings.SIMPLE_JWT_USER_ID_CLAIM]}
except KeyError:
raise AccessTokenUnAuthorized
try:
user = self.user_model.objects.get(**_filter)
except self.user_model.DoesNotExist as e:
raise UserNotFound
except Exception as e:
raise AccessTokenUnAuthorized
if not user.is_active:
raise AccessTokenUnAuthorized(message="서비스를 사용할 수 없는 유저입니다.")
return user
rest_framework_simplejwt.authentication.JWTAuthentication을 상속받아 구현했다.
**credentials는 사용자가 제공한 자격 증명 정보이다.
def authenticate(self, request: Request, **credentials):
_params 변수는 사용자 인증에 필요한 파라미터를 담기 위해서 credentials에서 user모델의 USERNAME_FIELD에 해당하는 값을 찾고 없을 경우 username의 키를 찾아 준다.
이후 User객체에서 _params 변수를 사용하여 사용자 데이터를 찾는다.
def get_user(self, _key):
사용자를 식별하기 위한 정보 _key를 매개변수로 받는다. _key를 이용해서 user 객체를 반환한다.
middlewares.py
미들웨어를 구성하는 파일이다. REST API의 엔드포인트에 접근할 때마다 JWT 토큰을 검증하고 검증에 실패하면 예외처리를 해주는 코드이다.
from rest_framework_simplejwt.settings import api_settings
from rest_framework_simplejwt.authentication import JWTAuthentication
from rest_framework_simplejwt.exceptions import (
AuthenticationFailed,
InvalidToken,
TokenError
)
from rest_framework_simplejwt.token_blacklist.models import (
OutstandingToken
)
from core.exceptions.service_exceptions import (
AccessTokenUnAuthorized,
JWTOutstandingNotFound
)
class CustomJWTAuthentication(JWTAuthentication):
"""
:comment: Validate access-token, refresh-token when client calling any APIs.
"""
def authenticate(self, request):
try:
return super().authenticate(request)
except (InvalidToken, AuthenticationFailed, TokenError):
raise AccessTokenUnAuthorized
def get_validated_token(self, raw_token):
for AuthToken in api_settings.AUTH_TOKEN_CLASSES:
try:
# Verify access-token if refresh_jti matched with outstanding refresh-token and blacklisted.
auth_token = AuthToken(raw_token)
refresh_jti = auth_token.get("refresh_jti", "")
refresh_token_instance = OutstandingToken.objects.filter(
jti=refresh_jti
).select_related(
'blacklistedtoken'
).filter(
blacklistedtoken__isnull=True
).first()
if not refresh_token_instance:
raise JWTOutstandingNotFound
return auth_token
except TokenError:
raise AccessTokenUnAuthorized
raise AccessTokenUnAuthorized
authenticate(self, request):
상위 클래스를 호출하여 JWT를 검증하고, 토큰이 유효하지 않거나 인증에 실패하면 예외를 발생시킨다.
get_validated_token(self, raw_token):
AUTH_TOKEN_CLASSES에서 인증 토큰 클래스를 확인하고, 토큰을 인스턴스화 해서 블랙리스트 토큰에 있는지 확인한다.
Mixin.py
TokenMixin: 토큰 및 사용자 관련 작업을 추상화하여 코드의 재사용성 및 가독성을 향상시키기위함
SimpleJWTMixin: view에서 header의 토큰을 추출하여 user 정보를 가지고 오기 위함
from rest_framework_simplejwt.tokens import (
Token,
AccessToken,
RefreshToken,
OutstandingToken
)
from core.exceptions.service_exceptions import (
UserNotFound,
InvalidRequest
)
from api.models.users.models import User
class TokenMixin:
_access_token_class = AccessToken
_refresh_token_class = RefreshToken
@staticmethod
def set_refresh_jti_into_access(refresh: RefreshToken) -> AccessToken:
access: AccessToken = refresh.access_token
access['refresh_jti'] = refresh.get("jti")
return access
@staticmethod
def get_refresh_token_instance(filters: dict):
return OutstandingToken.objects.filter(**filters).select_related(
'blacklistedtoken'
).filter(blacklistedtoken__isnull=True).first()
def is_active_user(self, _value: str = "", _value_type: str = "email"):
"""
:param _value: str(Token) | str(EMAIL)
:param _value_type: "access_token" | "refresh_token" | "email"
:comment: User can have only one activated account.
:return: True | False
"""
user = self._get_users_object(_value, _value_type)
if not user:
raise UserNotFound
return user.exists()
@staticmethod
def _get_users_object(_value: str | Token = "", _value_type: str = "email", token_verifying=False):
if not _value:
raise UserNotFound(message="유저를 특정할 정보를 찾을 수 없습니다.")
_filters = {
"id": None,
"is_active": True,
}
if _value_type == "email":
user = User.objects.filter(email=_value, is_active=True).first()
if not user:
raise UserNotFound
_filters['id'] = user.id
elif _value_type == "access_token":
_filters['id'] = AccessToken(_value, verify=token_verifying).get("user_id")
elif _value_type == "refresh_token":
_filters['id'] = RefreshToken(_value, verify=token_verifying).get("user_id")
else:
pass
return User.objects.filter(**_filters)
def invalid_user_handler(self, request, *args, **kwargs):
if request.user.id != kwargs.get(self.lookup_field):
raise InvalidRequest
class SimpleJWTMixin(CustomJWTAuthentication, TokenMixin):
def get_raw_token(self, header: bytes) -> bytes | None:
return super().get_raw_token(header)
def get_user_instance(self, request, _type="access_token", token_verifying=False):
access_token = self.get_raw_token(header=self.get_header(request))
user = self._get_users_object(_value=access_token.decode(), _value_type=_type, token_verifying=token_verifying)
if not user:
raise UserNotFound
return user.first()
_access_token_class, _refresh_token_class:
AccessToken 및 RefreshToken 클래스를 정의하는 속성, 기본값은 AccessToken 및 RefreshToken이다.
set_refresh_jti_into_access(refresh: RefreshToken) -> AccessToken:
RefreshToken에서 access 토큰으로 refresh_jti 값을 설정하는 메서드, 새로 발급된 access 토큰이 refresh 토큰의 jti 값을 포함하도록 한다.
get_refresh_token_instance(filters: dict):
특정 필터를 기반으로 OutstandingToken 모델에서 refresh 토큰 인스턴스를 가져오는 메서드로 refresh 토큰의 유효성을 검사하고, 해당 토큰이 블랙리스트에 있는지 확인하는 데 사용
is_active_user(self, _value: str = "", _value_type: str = "email"):
사용자가 활성 상태인지 확인하는 메서드, 주어진 값과 유형에 따라 사용자를 식별하고 활성 상태 여부를 확인합니다.
_get_users_object(_value: str | Token = "", _value_type: str = "email", token_verifying=False):
주어진 값 및 유형에 따라 사용자 객체를 가져오는 내부 메서드, 주어진 값에 따라 이메일, access 토큰 또는 refresh 토큰으로부터 사용자를 식별합니다.
invalid_user_handler(self, request, *args, **kwargs):
요청된 사용자가 올바르지 않은 경우를 처리하는 메서드입니다. 이는 요청된 사용자가 예상되는 사용자와 일치하지 않을 때 예외를 발생시킵니다.
serializers.py
토큰 생성 및 관리를 수행하는 시리얼라이저 클래스, 로그인시
from django.contrib.auth.models import update_last_login
from rest_framework_simplejwt.exceptions import *
from rest_framework_simplejwt.settings import api_settings
from rest_framework_simplejwt.serializers import (
TokenBlacklistSerializer,
TokenObtainPairSerializer
)
from core.exceptions.service_exceptions import (
UserIsNotAuthorized
)
from api.models.users.models import User
from .mixins import TokenMixin
class CustomTokenObtainPairSerializer(TokenObtainPairSerializer, TokenMixin):
def validate(self, attrs):
if not self.is_active_user(_value=attrs.get(self.username_field), _value_type="email"):
raise UserIsNotAuthorized
_ = super(TokenObtainPairSerializer, self).validate(attrs)
refresh = self.get_token(self.user)
access = self.set_refresh_jti_into_access(refresh)
if api_settings.UPDATE_LAST_LOGIN:
update_last_login(None, self.user)
return {
"id": self.user.id,
"access_token": str(access),
"refresh_token": str(refresh),
}
def get_token(self, user: User):
"""
:comment: Get refresh token & check has already outstanding refresh-token.
"""
refresh_token_instance = self.get_refresh_token_instance(filters={"user_id": user.id})
if refresh_token_instance:
"""
* If `refresh-token` is already outstanding, blacklisting it.
* Only single device can access this app.
"""
blacklist_serializer = TokenBlacklistSerializer(data={"refresh": refresh_token_instance.token})
"""
* If `refresh-token` exist and not blacklisted but expired(or invalid),
delete this token from database.
"""
try:
blacklist_serializer.is_valid()
except (TokenError, TokenBackendError, InvalidToken, AuthenticationFailed):
refresh_token_instance.delete()
return super().get_token(user)
class CustomTokenBlacklistSerializer(TokenBlacklistSerializer, TokenMixin):
def validate(self, attrs: Dict[str, Any]) -> Dict[Any, Any]:
return super().validate(attrs)
참고
GitHub - kkamikoon/retro-ctf-backend: Retro CTF(해킹대회) 서비스를 위한 백엔드 서버입니다. Django Rest Framewo
Retro CTF(해킹대회) 서비스를 위한 백엔드 서버입니다. Django Rest Framework를 이용하여 개발되었습니다. - kkamikoon/retro-ctf-backend
github.com
'Django > DRF' 카테고리의 다른 글
[회고록] 효율적인 Django 시작하기: initialize_django 프로젝트 (0) | 2024.05.27 |
---|---|
[Custom] Custom Logging Mixin With CRUD Mixin (0) | 2024.05.23 |
[Custom] Django와 DRF에서 커스텀 예외처리하기(custom_exception) (0) | 2024.05.16 |
[TEST] 테스트 DB 없이 Django에서 유닛 테스트 수행하기 (0) | 2024.05.14 |
[CORS] DRF에서 CORS 문제 해결 및 테스트 방법 (0) | 2024.05.14 |