[FEAT] (사용자 로직) : 인증 서비스 구현 완료

v0.1.2 (2025-11-16)
- 로그인, 로그아웃, 토큰 갱신, 회원가입 API 구현 완료
- 로그 포맷 통일화
This commit is contained in:
2025-11-16 16:20:45 +09:00
parent ae2766cff5
commit e20c7d58b1
21 changed files with 792 additions and 56 deletions

View File

@@ -33,6 +33,20 @@ def create_app(config: Optional[Dict[str, Any]] = None) -> Flask:
if config: if config:
app.config.update(config) app.config.update(config)
# Werkzeug 로거 커스터마이징 (타임스탬프 제거)
import logging
from werkzeug.serving import WSGIRequestHandler
# Werkzeug의 기본 로그 포맷 오버라이드
class CustomRequestHandler(WSGIRequestHandler):
def log(self, type, message, *args):
# 기본 타임스탬프를 제거하고 메시지만 로깅
logger = logging.getLogger('werkzeug')
logger.info(message % args if args else message)
# 커스텀 핸들러를 Flask 앱에 설정
app.request_handler_class = CustomRequestHandler
# 로깅 설정 # 로깅 설정
logger = custom_logger(f"{settings.LOG_PREFIX}main_app") logger = custom_logger(f"{settings.LOG_PREFIX}main_app")
@@ -69,12 +83,22 @@ def main() -> None:
app = create_app() app = create_app()
# 로그 정보 # 로그 정보
from werkzeug.serving import WSGIRequestHandler
# Werkzeug의 기본 로그 포맷 오버라이드
class CustomRequestHandler(WSGIRequestHandler):
def log(self, type, message, *args):
import logging
# 기본 타임스탬프를 제거하고 메시지만 로깅
logger = logging.getLogger('werkzeug')
logger.info(message % args if args else message)
app.run( app.run(
host=settings.HOST, host=settings.HOST,
port=settings.PORT, port=settings.PORT,
debug=settings.DEBUG, debug=settings.DEBUG,
use_reloader=False use_reloader=False,
request_handler=CustomRequestHandler
) )
# Gunicorn이 import할 app 인스턴스 # Gunicorn이 import할 app 인스턴스

View File

@@ -7,8 +7,11 @@ from flask import Flask
def register_blueprints(app: Flask) -> None: def register_blueprints(app: Flask) -> None:
# Swagger API Blueprint 등록
from utils.swagger_config import api_blueprint from utils.swagger_config import api_blueprint
app.register_blueprint(api_blueprint, url_prefix='/api/v1') app.register_blueprint(api_blueprint, url_prefix='/api/v1')
# Flask-RESTX namespace에 리소스 등록
from . import bp_auth
__all__ = ['register_blueprints'] __all__ = ['register_blueprints']

163
src/blueprints/bp_auth.py Normal file
View File

@@ -0,0 +1,163 @@
"""
인증 관련 API 엔드포인트
가입, 로그인, 로그아웃 등
"""
from flask import request
from flask_restx import Resource
from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt
from utils.response import *
from utils.swagger_config import auth_ns
from models.user_model import auth_swagger
from services.auth.auth_service import AuthService
@auth_ns.route("/register")
class Register(Resource):
@auth_ns.doc(
id="register",
summary="회원가입",
description="새로운 사용자를 등록합니다.",
tags=["Auth"]
)
@auth_ns.expect(auth_swagger.register_model)
@auth_ns.response(201, "Success - 회원가입 완료")
@auth_ns.response(400, "요청 변수가 올바르지 않습니다.")
@auth_ns.response(409, "이미 존재하는 ID")
@auth_ns.response(500, "서버 내부 오류")
def post(self):
"""회원가입"""
try:
# Content-Type 검증
if not request.is_json:
return bad_request_response(message="요청 변수가 올바르지 않습니다.")
request_data = request.get_json()
user = AuthService.register(request_data)
if user is None:
return conflict_response(message="이미 존재하는 ID입니다.")
return success_response(
data=user.to_dict(),
message="회원가입이 완료되었습니다.",
status_code=201
)
except Exception as e:
return internal_error_response(
message=str(e)
)
@auth_ns.route("/login")
class Login(Resource):
@auth_ns.doc(
id="login",
summary="로그인",
description="사용자 인증 후 JWT 토큰 발급",
tags=["Auth"]
)
@auth_ns.expect(auth_swagger.login_model)
@auth_ns.response(200, 'Success - 로그인 성공')
@auth_ns.response(400, '요청 변수가 올바르지 않습니다.')
@auth_ns.response(409, '로그인 할 수 없는 사용자입니다.')
@auth_ns.response(500, "서버 내부 오류")
def post(self):
"""로그인"""
try:
# Content-Type 검증
if not request.is_json:
return bad_request_response(message="요청 변수가 올바르지 않습니다.")
request_data = request.get_json()
# 로그인 처리
data = AuthService.login(request_data)
if data is None:
return conflict_response(message="로그인 할 수 없는 사용자입니다.")
user = data[0]
access_token = data[1]
refresh_token = data[2]
# UserIP 저장
AuthService.save_user_ips(request, user)
return success_response(
data={
"user": user.to_dict(),
"access_token": access_token,
"refresh_token": refresh_token
},
message="로그인 성공"
)
except Exception as e:
return internal_error_response(
message=str(e)
)
@auth_ns.route("/refresh")
class RefreshToken(Resource):
@auth_ns.doc(
id="refresh_token",
summary="액세스 토큰 갱신",
description="Refresh Token으로 새로운 Access Token을 발급하고, 사용된 Refresh Token을 무효화합니다.",
tags=["Auth"],
security=[{"Bearer": []}]
)
@auth_ns.response(200, "Success - 토큰 갱신 성공")
@auth_ns.response(409, "토큰 갱신에 실패하였습니다.")
@auth_ns.response(500, "서버 내부 오류")
@jwt_required(refresh=True)
def post(self):
"""토큰 갱신"""
try:
# 현재 리프레시 토큰 정보 추출
jwt_payload = get_jwt()
current_user_uuid = get_jwt_identity()
refresh_jti = jwt_payload["jti"]
# 새로운 액세스 토큰 생성
access_token = AuthService.refresh_access_token(current_user_uuid)
if access_token is None:
return conflict_response(message="토큰 갱신에 실패하였습니다.")
# 사용된 리프레시 토큰을 블랙리스트에 추가 (재사용 방지)
AuthService.logout(refresh_jti)
return success_response(
data={"access_token": access_token},
message="토큰 갱신 성공"
)
except Exception as e:
return internal_error_response(
message=str(e)
)
@auth_ns.route("/logout")
class Logout(Resource):
@auth_ns.doc(
id="logout",
summary="로그아웃",
description="로그아웃하고 토큰을 블랙리스트에 추가하여 재사용을 방지합니다.",
tags=["Auth"],
security=[{"Bearer": []}]
)
@auth_ns.response(200, "Success")
@auth_ns.response(401, "Unauthorized")
@auth_ns.response(500, "서버 내부 오류")
@jwt_required()
def post(self):
"""로그아웃"""
# 현재 토큰의 JTI 추출
jwt_payload = get_jwt()
jti = jwt_payload["jti"]
# 토큰을 블랙리스트에 추가
AuthService.logout(jti)
return success_response(message="로그아웃 성공")

View File

@@ -0,0 +1,3 @@
"""
유저 관련 API 엔드포인트
"""

View File

@@ -1,6 +1,6 @@
# API INFO # API INFO
API_VERSION = "v0.1.1" API_VERSION = "v0.1.2"
API_DATE = "2025-11-13" API_DATE = "2025-11-16"
# BASE DATE FORMAT # BASE DATE FORMAT
DATE_FORMAT = "%Y-%m-%d %H:%M:%S %z" DATE_FORMAT = "%Y-%m-%d %H:%M:%S %z"

View File

@@ -12,7 +12,7 @@ from utils.logger_manager import *
from modules.redis.redis_facade import redis_cli from modules.redis.redis_facade import redis_cli
logger = custom_logger(f"{settings.LOG_PREFIX})extenstions") logger = custom_logger(f"{settings.LOG_PREFIX}_extensions")
try: try:

View File

@@ -33,6 +33,55 @@ accesslog = settings.GUNICORN_ACCESSLOG
errorlog = settings.GUNICORN_ERRORLOG errorlog = settings.GUNICORN_ERRORLOG
loglevel = settings.GUNICORN_LOGLEVEL loglevel = settings.GUNICORN_LOGLEVEL
# 로그 포맷 통일 (logger_manager.py와 동일한 포맷)
# %(s)s: 상태코드, %(m)s: HTTP메서드, %(U)s: URL경로, %(q)s: 쿼리스트링
# %(h)s: 클라이언트IP, %({User-Agent}i)s: User-Agent
access_log_format = '%(s)s %(m)s %(U)s%(q)s - IP:%(h)s Agent:"%({User-Agent}i)s"'
# Gunicorn error/worker 로그 포맷 설정
logconfig_dict = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'default': {
'format': '[%(asctime)s] [%(levelname)s] (gunicorn) %(message)s',
'datefmt': '%Y-%m-%d %H:%M:%S %z'
},
'access': {
'format': '[%(asctime)s] [%(levelname)s] (gunicorn.access) %(message)s',
'datefmt': '%Y-%m-%d %H:%M:%S %z'
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'default',
'stream': 'ext://sys.stdout'
},
'access_console': {
'class': 'logging.StreamHandler',
'formatter': 'access',
'stream': 'ext://sys.stdout'
},
},
'root': {
'level': settings.GUNICORN_LOGLEVEL.upper(),
'handlers': ['console']
},
'loggers': {
'gunicorn.error': {
'level': settings.GUNICORN_LOGLEVEL.upper(),
'handlers': ['console'],
'propagate': False,
},
'gunicorn.access': {
'level': settings.GUNICORN_LOGLEVEL.upper(),
'handlers': ['access_console'],
'propagate': False,
},
}
}
# 프로세스 네임 # 프로세스 네임
proc_name = "nuriq_server" proc_name = "nuriq_server"

View File

@@ -3,11 +3,12 @@ Models 레이어
""" """
# User # User
from .user_model import users, user_ips, user_dto from .user_model import users, user_ips, user_dto, auth_swagger
__all__ = [ __all__ = [
# User # User
users, users,
user_ips, user_ips,
user_dto user_dto,
auth_swagger
] ]

View File

@@ -0,0 +1,20 @@
"""
인증관련 Swagger 모델
"""
from utils.swagger_config import auth_ns
from flask_restx import fields
# Swagger 모델 정의
register_model = auth_ns.model('Register', {
'id': fields.String(required=True, description='사용자 ID', example='user123'),
'password': fields.String(required=True, description='비밀번호 (최소 6자)', example='password123'),
'name': fields.String(required=False, description='사용자 이름', example='홍길동')
})
login_model = auth_ns.model('Login', {
'id': fields.String(required=True, description='사용자 ID', example='user123'),
'password': fields.String(required=True, description='비밀번호', example='password123')
})
__all__ = ['register_model', 'login_model']

View File

@@ -11,7 +11,7 @@ class UserIps(db.Model):
__tablename__ = "user_ips" __tablename__ = "user_ips"
__table_args__ = ( __table_args__ = (
db.PrimaryKeyConstraint("user_uuid", "user_ip", name="pk_user_ips") db.PrimaryKeyConstraint("user_uuid", "user_ip", name="pk_user_ips"),
) )
# 기본 필드 # 기본 필드

View File

@@ -25,11 +25,11 @@ class Users(db.Model):
created_at = db.Column(db.BigInteger, nullable=True, comment="생성일 (unix timestamp, ms)") created_at = db.Column(db.BigInteger, nullable=True, comment="생성일 (unix timestamp, ms)")
updated_at = db.Column(db.BigInteger, nullable=True, comment="수정일 (unix timestamp, ms)") updated_at = db.Column(db.BigInteger, nullable=True, comment="수정일 (unix timestamp, ms)")
def __init__(self, id: str, password_hash: str, **kwargs: Any) -> None: def __init__(self, id: str, hash_password: str, **kwargs: Any) -> None:
"""사용자 초기화""" """사용자 초기화"""
self.user_uuid = str(uuid.uuid4()) self.user_uuid = str(uuid.uuid4())
self.id = id self.id = id
self.password = password_hash self.password = hash_password
# 선택적 필드 # 선택적 필드
self.user_name = kwargs.get('user_name') self.user_name = kwargs.get('user_name')

View File

@@ -29,3 +29,43 @@ class BaseRepository(ABC, Generic[T]):
def session(self): def session(self):
"""현재 DB 세션 가져오기""" """현재 DB 세션 가져오기"""
return db.session return db.session
def create(self, entity: T) -> T:
"""엔티티 생성"""
try:
self.session.add(entity)
self.session.flush()
logger.debug(f"{self.model.__name__} 생성완료: {getattr(entity, 'id', 'unknown')}")
return entity
except SQLAlchemyError as e:
logger.error(f"{self.model.__name__} 생성 실패: {str(e)}")
raise
def update(self, entity) -> T:
"""엔티티 수정"""
try:
self.session.add(entity)
self.session.flush()
logger.debug(f"{self.model.__name__} 수정완료: {getattr(entity, 'id', 'unknown')}")
return entity
except SQLAlchemyError as e:
logger.error(f"{self.model.__name__} 수정 실패: {str(e)}")
raise
def delete(self, entity: T) -> None:
"""엔티티 삭제"""
try:
self.session.delete(entity)
self.session.flush()
logger.debug(f"{self.model.__name__} 삭제완료: {getattr(entity, 'id', 'unknown')}")
except SQLAlchemyError as e:
logger.error(f"{self.model.__name__} 삭제 실패: {str(e)}")
raise
def count(self) -> int:
"""엔티티 개수 반환"""
try:
return self.session.query(self.model).count()
except SQLAlchemyError as e:
logger.error(f"{self.model.__name__} 개수 조회 실패: {str(e)}")
raise

View File

@@ -4,16 +4,38 @@
from typing import Optional, Type from typing import Optional, Type
from base_repository import BaseRepository from..base_repository import BaseRepository
from models.user_model import users, user_ips from models.user_model.users import Users
from models.user_model.user_ips import UserIps
from utils.func import get_timestamp_ms from utils.func import get_timestamp_ms
class UserIpsRepository(BaseRepository[user_ips.UserIps]): class UserIpsRepository(BaseRepository[UserIps]):
""" """
사용자 IPs Repository 사용자 IPs Repository
""" """
@property @property
def model(self) -> Type[user_ips.UserIps]: def model(self) -> Type[UserIps]:
"""모델 클래스 반환""" """모델 클래스 반환"""
return user_ips.UserIps return UserIps
def find_by_user_and_ip(self, user: Users, user_ip: str) -> Optional[UserIps]:
"""사용자 UUID와 IP로 조회"""
return self.session.query(UserIps).filter(
UserIps.user_uuid == user.user_uuid,
UserIps.user_ip == user_ip
).one_or_none()
def update_last_access(self, user_uuid: str, user_ip: str) -> UserIps:
"""사용자 IP 접속 시간 업데이트"""
user_ip = self.session.query(UserIps).filter_by(
user_uuid=user_uuid,
user_ip=user_ip
).one_or_none()
if user_ip:
user_ip.lasted_at = get_timestamp_ms()
self.session.flush()
return user_ip

View File

@@ -4,7 +4,7 @@
from typing import Optional, Type from typing import Optional, Type
from base_repository import BaseRepository from ..base_repository import BaseRepository
from models.user_model import users from models.user_model import users
from utils.func import get_timestamp_ms from utils.func import get_timestamp_ms
@@ -17,3 +17,17 @@ class UserRepository(BaseRepository[users.Users]):
def model(self) -> Type[users.Users]: def model(self) -> Type[users.Users]:
"""모델 클래스 반환""" """모델 클래스 반환"""
return users.Users return users.Users
def find_by_uuid(self, user_uuid: str) -> Optional[users.Users]:
"""UUID로 사용자 조회"""
return self.session.query(users.Users).filter_by(user_uuid=user_uuid).one_or_none()
def find_by_id(self, user_id: str) -> Optional[users.Users]:
"""ID로 사용자 조회"""
return self.session.query(users.Users).filter_by(id=user_id).one_or_none()
def exists_by_id(self, user_id: str) -> bool:
"""ID로 사용자 존재 여부 확인"""
return self.session.query(
self.session.query(users.Users).filter_by(id=user_id).exists()
).scalar()

View File

@@ -12,9 +12,12 @@ from repositories import UserRepository, UserIpsRepository
import settings import settings
from utils import func from utils import func
from utils.response import *
from utils.account_lock_policy import AccountLockPolicy from utils.account_lock_policy import AccountLockPolicy
from utils.db_decorators import transactional from utils.db_decorators import transactional
from services.auth.cache_service import JWTBlacklistService
from extensions import custom_logger from extensions import custom_logger
logger = custom_logger(f"{settings.LOG_PREFIX}_auth_service") logger = custom_logger(f"{settings.LOG_PREFIX}_auth_service")
@@ -26,7 +29,7 @@ class AuthService:
@staticmethod @staticmethod
@transactional @transactional
def register(user_data: Dict[str, Any]) -> Users: def register(user_data: Dict[str, Any]) -> Optional[Users]:
"""회원 가입""" """회원 가입"""
# Pydantic 검증 # Pydantic 검증
@@ -40,7 +43,7 @@ class AuthService:
# 중복 검사 # 중복 검사
if user_repo.exists_by_id(user_id): if user_repo.exists_by_id(user_id):
... return None
# 비밀번호 생성 # 비밀번호 생성
hash_password = func.hash_password(password) hash_password = func.hash_password(password)
@@ -48,7 +51,7 @@ class AuthService:
# 사용자 생성 # 사용자 생성
user = Users( user = Users(
id=user_id, id=user_id,
password=hash_password, hash_password=hash_password,
user_name=user_name user_name=user_name
) )
@@ -59,7 +62,7 @@ class AuthService:
@staticmethod @staticmethod
@transactional @transactional
def login(credentials: Dict[str, Any]) -> Tuple[Users, str, str]: def login(credentials: Dict[str, Any]) -> Optional[Tuple[Users, str, str]]:
# Pydantic 검증 # Pydantic 검증
check_user_dto = user_dto.CheckUserDTO(**credentials) check_user_dto = user_dto.CheckUserDTO(**credentials)
@@ -69,7 +72,7 @@ class AuthService:
# 사용자 조회 # 사용자 조회
user = user_repo.find_by_id(check_user_dto.id) user = user_repo.find_by_id(check_user_dto.id)
if not user: if not user:
... return None
# 계정 잠금 정책 체크 # 계정 잠금 정책 체크
lock_policy = AccountLockPolicy( lock_policy = AccountLockPolicy(
@@ -78,14 +81,13 @@ class AuthService:
) )
if lock_policy.is_locked(user.count): if lock_policy.is_locked(user.count):
... return None
# 비밀번호 검증 # 비밀번호 검증
if not func.verify_password(check_user_dto.password, user.password): if not func.verify_password(check_user_dto.password, user.password):
user.increment_failed_login() user.increment_failed_login()
user_repo.update(user) user_repo.update(user)
# commit되어 실패 횟수 기록 return None
...
# 로그인 성공 - 실패 횟수 초기화 # 로그인 성공 - 실패 횟수 초기화
user.reset_failed_login() user.reset_failed_login()
@@ -99,19 +101,26 @@ class AuthService:
logger.info(f"사용자 로그인: {user.id}") logger.info(f"사용자 로그인: {user.id}")
return user, access_token, refresh_token return (user, access_token, refresh_token)
@staticmethod @staticmethod
def refresh_access_token(user_uuid: str) -> str: def refresh_access_token(user_uuid: str) -> str:
"""엑세스 토큰 갱신""" try:
access_token = create_access_token(identity=user_uuid) """엑세스 토큰 갱신"""
logger.debug(f"Access Token 재발급: {user_uuid}") access_token = create_access_token(identity=user_uuid)
return access_token logger.debug(f"Access Token 재발급: {user_uuid}")
return access_token
except Exception as e:
logger.exception(f"유저 ({user_uuid}) 엑세스 토큰 갱신에 실패하였습니다. error={str(e)}")
return None
@staticmethod @staticmethod
def logout(jti: str) -> bool: def logout(jti: str) -> bool:
"""로그아웃 (JWT 블랙리스트 추가)""" """로그아웃 (JWT 블랙리스트 추가)"""
... success = JWTBlacklistService.add(jti)
if success:
logger.info(f"로그아웃 처리 완료 - JTI 블랙리스트 추가: {jti}")
return success
@staticmethod @staticmethod
@transactional @transactional
@@ -125,9 +134,13 @@ class AuthService:
user_agent = request.headers.get('User-Agent', '') if request.headers else '' user_agent = request.headers.get('User-Agent', '') if request.headers else ''
# 기존 IP 기록 조회 로직 # 기존 IP 기록 조회 로직
if True: existing_ip = user_ips_repo.find_by_user_and_ip(user, ip_address)
...
if existing_ip:
# 존재하면 마지막 접속 시간 업데이트
user_ips_repo.update_last_access(user.user_uuid, ip_address)
else: else:
# 없으면 새로 생성
user_ips = UserIps( user_ips = UserIps(
user_uuid=user.user_uuid, user_uuid=user.user_uuid,
user_ip=ip_address, user_ip=ip_address,

View File

@@ -2,3 +2,167 @@
캐시 서비스 캐시 서비스
Redis 기반 캐싱 로직 관리하는 서비스 레이어 Redis 기반 캐싱 로직 관리하는 서비스 레이어
""" """
import json
from typing import Optional, Dict, Any
from extensions import redis_cli
import settings
from extensions import custom_logger
logger = custom_logger(f"{settings.LOG_PREFIX}_auth_service")
class CacheService:
@staticmethod
def _is_redis_available() -> bool:
"""Redis 연결 상태 확인"""
return redis_cli.is_connected()
@staticmethod
def set(key: str, value: Any, ttl: int) -> bool:
"""Redis에 데이터 저장"""
if not CacheService._is_redis_available():
logger.warning("레디스가 연결되어 있지 않습니다. 등록 로직을 ㄴ생략합니다.")
return False
try:
if isinstance(value, dict):
value = json.dumps(value, ensure_ascii=False)
redis_cli.redis_client.setex(key, ttl, value)
logger.debug(f"캐시 저장: {key} (TTL: {ttl}s)")
return True
except Exception as e:
logger.error(f"캐시 저장 실패: {key} - {str(e)}")
return False
@staticmethod
def get(key: str, parse_json: bool = True) -> Optional[Any]:
"""Redis에서 데이터 조회"""
if not CacheService._is_redis_available():
logger.warning("레디스가 연결되어 있지 않습니다. 조회 로직을 생략합니다.")
return None
try:
value = redis_cli.redis_client.get(key)
if value:
logger.debug(f"Cache hit: {key}")
if parse_json:
return json.loads(value)
return value
logger.debug(f"Cache miss: {key}")
return None
except Exception as e:
logger.error(f"Failed to get cache {key}: {str(e)}")
return None
@staticmethod
def delete(key: str) -> bool:
"""Redis에서 데이터 삭제
Args:
key: Redis 키
Returns:
bool: 성공 여부
"""
if not CacheService._is_redis_available():
logger.warning("레디스가 연결되어 있지 않습니다. 삭제가 불가능 합니다.")
return False
try:
redis_cli.redis_client.delete(key)
logger.debug(f"Cache deleted: {key}")
return True
except Exception as e:
logger.error(f"Failed to delete cache {key}: {str(e)}")
return False
@staticmethod
def exists(key: str) -> bool:
"""Redis에 키가 존재하는지 확인"""
if not CacheService._is_redis_available():
logger.warning("레디스가 연결되어 있지 않습니다. 조회가 불가능 합니다.")
return False
try:
return redis_cli.redis_client.exists(key) > 0
except Exception as e:
logger.error(f"Failed to check cache existence {key}: {str(e)}")
return False
class UserCacheService:
"""사용자 캐시 관리 서비스"""
PREFIX = "user:cache:"
TTL = settings.REDIS_USER_CACHE_TTL
@classmethod
def _get_key(cls, user_uuid: str) -> str:
"""캐시 키 생성"""
return f"{cls.PREFIX}{user_uuid}"
@classmethod
def set(cls, user_uuid: str, user_data: Dict[str, Any], ttl: Optional[int] = None) -> bool:
"""사용자 정보를 Redis에 캐싱"""
key = cls._get_key(user_uuid)
ttl = ttl or cls.TTL
return CacheService.set(key, user_data, ttl)
@classmethod
def get(cls, user_uuid: str) -> Optional[Dict[str, Any]]:
"""Redis에서 사용자 정보 조회"""
key = cls._get_key(user_uuid)
return CacheService.get(key, parse_json=True)
@classmethod
def delete(cls, user_uuid: str) -> bool:
"""캐시에서 사용자 정보 삭제"""
key = cls._get_key(user_uuid)
return CacheService.delete(key)
@classmethod
def refresh(cls, user_uuid: str, user_data: Dict[str, Any]) -> bool:
"""캐시 갱신 (삭제 후 재설정)"""
cls.delete(user_uuid)
return cls.set(user_uuid, user_data)
class JWTBlacklistService:
"""JWT 블랙리스트 관리 서비스 (로그아웃된 토큰 추적)"""
PREFIX = "jwt:blacklist:"
TTL = settings.REDIS_JWT_BLACKLIST_TTL
@classmethod
def _get_key(cls, jti: str) -> str:
"""블랙리스트 키 생성"""
return f"{cls.PREFIX}{jti}"
@classmethod
def add(cls, jti: str, ttl: Optional[int] = None) -> bool:
"""블랙리스트에 토큰 추가"""
key = cls._get_key(jti)
ttl = ttl or cls.TTL
if not CacheService._is_redis_available():
logger.warning("레디스가 연결되어 있지 않습니다. 블랙리스트에 추가가 블가능합니다.")
return False
success = CacheService.set(key, "1", ttl)
if success:
logger.info(f"JWT 블랙리스트 추가완료: {jti}")
return success
@classmethod
def is_blacklisted(cls, jti: str) -> bool:
"""토큰이 블랙리스트에 있는지 확인"""
key = cls._get_key(jti)
return CacheService.exists(key)

View File

@@ -28,7 +28,7 @@ DB_NAME = os.getenv('DB_NAME', 'test')
SQLALCHEMY_DATABASE_URI = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}?charset=utf8mb4" SQLALCHEMY_DATABASE_URI = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}?charset=utf8mb4"
SQLALCHEMY_TRACK_MODIFICATIONS = False SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_ECHO = DEBUG SQLALCHEMY_ECHO = False
SQLALCHEMY_POOL_SIZE = int(os.getenv('DB_POOL_SIZE', 10)) SQLALCHEMY_POOL_SIZE = int(os.getenv('DB_POOL_SIZE', 10))
SQLALCHEMY_MAX_OVERFLOW = int(os.getenv('DB_MAX_OVERFLOW', 20)) SQLALCHEMY_MAX_OVERFLOW = int(os.getenv('DB_MAX_OVERFLOW', 20))
SQLALCHEMY_POOL_RECYCLE = int(os.getenv('DB_POOL_RECYCLE', 3600)) SQLALCHEMY_POOL_RECYCLE = int(os.getenv('DB_POOL_RECYCLE', 3600))
@@ -78,3 +78,11 @@ GUNICORN_MAX_REQUESTS_JITTER = int(os.getenv('GUNICORN_MAX_REQUESTS_JITTER', 50)
GUNICORN_ACCESSLOG = os.getenv('GUNICORN_ACCESSLOG', '-') # '-' = stdout GUNICORN_ACCESSLOG = os.getenv('GUNICORN_ACCESSLOG', '-') # '-' = stdout
GUNICORN_ERRORLOG = os.getenv('GUNICORN_ERRORLOG', '-') GUNICORN_ERRORLOG = os.getenv('GUNICORN_ERRORLOG', '-')
GUNICORN_LOGLEVEL = os.getenv('GUNICORN_LOGLEVEL', 'info') GUNICORN_LOGLEVEL = os.getenv('GUNICORN_LOGLEVEL', 'info')
# 보안 설정 - 로그인 실패
MAX_LOGIN_ATTEMPTS = int(os.getenv('MAX_LOGIN_ATTEMPTS', 5))
ACCOUNT_LOCKOUT_DURATION_MINUTES = int(os.getenv('ACCOUNT_LOCKOUT_DURATION_MINUTES', 30))
# Redis 캐시 TTL 설정 (초 단위)
REDIS_USER_CACHE_TTL = int(os.getenv('REDIS_USER_CACHE_TTL', 300)) # 5분
REDIS_JWT_BLACKLIST_TTL = int(os.getenv('REDIS_JWT_BLACKLIST_TTL', 86400)) # 24시간

View File

@@ -7,7 +7,7 @@ from typing import Callable
from extensions import custom_logger from extensions import custom_logger
import settings import settings
logger = custom_logger(f"{settings.LOG_PREFIX})_db_decorator") logger = custom_logger(f"{settings.LOG_PREFIX}_db_decorator")
_db = None _db = None
@@ -17,7 +17,7 @@ def get_db():
if _db is None: if _db is None:
from extensions import db from extensions import db
_db = db _db = db
return db return _db
def transactional(f: Callable) -> Callable: def transactional(f: Callable) -> Callable:
""" """
@@ -41,7 +41,7 @@ def transactional(f: Callable) -> Callable:
except Exception as e: except Exception as e:
# 실패 시 롤백 # 실패 시 롤백
db.session.rollback() db.session.rollback()
logger.error(f"트랜잭션 롤백 - {f.__name__}") logger.error(f"트랜잭션 롤백 - {f.__name__}: {str(e)}")
raise raise
return decorated_function() return decorated_function

View File

@@ -40,7 +40,7 @@ class StdLogRedirect:
s = s.decode(errors="replace") s = s.decode(errors="replace")
self._buf += s self._buf += s
while True: while True:
idx = self._bug.find("\n") idx = self._buf.find("\n")
if idx == -1: if idx == -1:
break break
line = self._buf[:idx+1:] line = self._buf[:idx+1:]
@@ -80,18 +80,39 @@ logging.basicConfig(
level=settings.LOG_LEVEL_TEXT, level=settings.LOG_LEVEL_TEXT,
format=log_format, format=log_format,
datefmt=date_format, datefmt=date_format,
stream=sys.stdout stream=sys.stdout,
force=True
) )
# create own logger class to prevent init custom loggers by other libs # create own logger class to prevent init custom loggers by other libs
class GatekeeperLogger(logging.Logger): class GatekeeperLogger(logging.Logger):
def assHandler(self, h): def addHandler(self, h):
# only 'root', '__main__' ans own loggers will be accepted # only 'root', '__main__' and own loggers will be accepted
if self.name == "root" or self.name.startswith(('LOG_PREFIX'), "__main__"): if self.name == "root" or self.name.startswith((settings.LOG_PREFIX, "__main__")):
return super().addHandler(h) return super().addHandler(h)
logging.setLoggerClass(GatekeeperLogger) logging.setLoggerClass(GatekeeperLogger)
# Werkzeug 로거 설정 (Flask 개발 서버용)
# 기존 werkzeug 로거의 핸들러를 모두 제거하고 커스텀 포맷 적용
import logging as _logging
werkzeug_logger = _logging.getLogger('werkzeug')
werkzeug_logger.handlers = [] # 기존 핸들러 제거
werkzeug_logger.setLevel(settings.LOG_LEVEL_TEXT)
handler = _logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter) # 통일된 포맷 사용
werkzeug_logger.addHandler(handler)
werkzeug_logger.propagate = False
# Flask의 기본 로거도 동일하게 설정
flask_logger = _logging.getLogger('flask.app')
flask_logger.handlers = []
flask_logger.setLevel(settings.LOG_LEVEL_TEXT)
flask_handler = _logging.StreamHandler(sys.stdout)
flask_handler.setFormatter(formatter)
flask_logger.addHandler(flask_handler)
flask_logger.propagate = False
def custom_logger(log_prefix: str): def custom_logger(log_prefix: str):
import logging import logging

185
src/utils/response.py Normal file
View File

@@ -0,0 +1,185 @@
""""
공통 응답 유틸리티 모듈
API 응답을 일관서 있게 처리하기 위한 헬퍼 함수들
"""
from typing import Any, Dict, List, Optional, Union
from flask import jsonify
from werkzeug.http import HTTP_STATUS_CODES
def _make_response_body(status: str, message: str, data: Any = None,
errors: Any = None, meta: Any = None, error_code: Optional[str] = None) -> Dict[str, Any]:
"""응답 구조 공통"""
response = {
"status": status,
"message": message
}
if data is not None:
response["data"] = data
if errors is not None:
response["error"] = errors
if meta is not None:
response["meta"] = meta
if error_code is not None:
response["error_code"] = error_code
return response
def _is_restx_context() -> bool:
"""
Flask-RestX는 has_request_context()가 True이지만,
jsonify()된 Response를 재직려화함으로 RESTX 사용 시에는 jsonify를 피해야함.
"""
return True
def success_response(data: Any = None, message: str = "Success",
status_code: int = 200, meta: Optional[Dict[str, Any]] = None) -> tuple:
"""
성공 응답을 생성
Args:
data: 응답 데이터
message: 응답 메시지
status_code: HTTP 상태 코드
meta: 추가 메타 데이터
Returns:
tuple: (response, status_code)
"""
body = _make_response_body(message, message, data=data, meta=meta)
return (jsonify(body), status_code) if not _is_restx_context() else (body, status_code)
def error_response(message: str = "An error occurred", status_code: int = 400,
errors: Optional[Union[Dict[str, Any], List[str]]] = None,
error_code: Optional[str] = None) -> tuple:
"""
에러 응답을 반환
Args:
message: 에러 메시지
status_code: HTTP 상태 코드
errors: 상세 에러 정보
error_code: 에러 코드
Returns:
tuple: (response, status_code)
"""
body = _make_response_body(
message,
message,
errors=errors,
error_code=error_code or HTTP_STATUS_CODES.get(status_code, 'UNKNOWN_ERROR')
)
return (jsonify(body), status_code) if not _is_restx_context() else (body, status_code)
def bad_request_response(message: str = "Bad request", errors: Optional[Union[Dict[str, Any], List[str]]] = None) -> tuple:
"""
400 Bad Request 응답
Args:
message: 잘못된 요청 메시지
errors: 상세 에러 정보
Returns:
tuple: (response, status_code)
"""
return error_response(
message=message,
status_code=400,
errors=errors,
error_code="BAD_REQUEST"
)
def unauthorized_response(message: str = "Authentication required") -> tuple:
"""
401 Unauthorized 응답
Args:
message: 인증 실패 메시지
Returns:
tuple: (response, status_code)
"""
return error_response(
message=message,
status_code=401,
error_code="UNAUTHORIZED"
)
def forbidden_response(message: str = "Access denied") -> tuple:
"""
403 Forbidden 응답
Args:
message: 접근 거부 메시지
Returns:
tuple: (response, status_code)
"""
return error_response(
message=message,
status_code=403,
error_code="FORBIDDEN"
)
def not_found_response(message: str = "Resource not found") -> tuple:
"""
404 Not Found 응답
Args:
message: 리소스를 찾을 수 없음 메시지
Returns:
tuple: (response, status_code)
"""
return error_response(
message=message,
status_code=404,
error_code="NOT_FOUND"
)
def conflict_response(message: str = "Resource already exists") -> tuple:
"""
409 Conflict 응답
Args:
message: 리소스 충돌 메시지
Returns:
tuple: (response, status_code)
"""
return error_response(
message=message,
status_code=409,
error_code="CONFLICT"
)
def unsupported_media_type_response(message: str = "Content-Type must be 'application/json'") -> tuple:
"""
415 Unsupported Media Type 응답
Args:
message: 지원하지 않는 미디어 타입 메시지
Returns:
tuple: (response, status_code)
"""
return error_response(
message=message,
status_code=415,
error_code="UNSUPPORTED_MEDIA_TYPE"
)
def internal_error_response(message: str = "Internal server error") -> tuple:
"""
500 Internal Server Error 응답
Args:
message: 서버 에러 메시지
Returns:
tuple: (response, status_code)
"""
return error_response(
message=message,
status_code=500,
error_code="INTERNAL_ERROR"
)

View File

@@ -3,9 +3,8 @@ Swagger 설정
""" """
# Third-Party Library Imports # Third-Party Library Imports
from flask import Blueprint, jsonify from flask import Blueprint
from flask_restx import Api from flask_restx import Api
from flask_jwt_extended.exceptions import JWTExtendedException, RevokedTokenError
import constants import constants
@@ -30,5 +29,12 @@ api = Api(
validate=True validate=True
) )
# 네임 스페이스 정의 및 API 추가
auth_ns = api.namespace('auth', description='인증 API', path='/auth')
# 네임 스페이스 정의 및 API에 추가 # 네임 스페이스 정의 및 API에 추가
__all__ = ['api_blueprint', 'api'] __all__ = [
'api_blueprint', 'api',
'auth_ns'
]