Compare commits

...

3 Commits

Author SHA1 Message Date
6daf842642 Merge pull request 'dev' (#4) from dev into main
Reviewed-on: #4
2025-11-16 09:04:41 +00:00
fc67076e80 Merge pull request '[FEAT] (사용자 로직): 프로필 서비스 구현 완료' (#3) from dev_stage into dev
Reviewed-on: #3
2025-11-16 09:03:15 +00:00
abf405f8ae [FEAT] (사용자 로직): 프로필 서비스 구현 완료
v0.1.3 (2025-11-16)
- 프로필 조회, 프로필 업데이트, 탈퇴 구현 완료.
2025-11-16 18:02:27 +09:00
20 changed files with 464 additions and 109 deletions

View File

@@ -9,9 +9,9 @@ def register_blueprints(app: Flask) -> None:
# Swagger API Blueprint 등록
from utils.swagger_config import api_blueprint
app.register_blueprint(api_blueprint, url_prefix='/api/v1')
app.register_blueprint(api_blueprint, url_prefix="/api/v1")
# Flask-RESTX namespace에 리소스 등록
from . import bp_auth
from . import bp_auth, bp_users
__all__ = ['register_blueprints']
__all__ = ["register_blueprints"]

View File

@@ -61,9 +61,9 @@ class Login(Resource):
tags=["Auth"]
)
@auth_ns.expect(auth_swagger.login_model)
@auth_ns.response(200, 'Success - 로그인 성공')
@auth_ns.response(400, '요청 변수가 올바르지 않습니다.')
@auth_ns.response(409, '로그인 할 수 없는 사용자입니다.')
@auth_ns.response(200, "Success - 로그인 성공")
@auth_ns.response(400, "요청 변수가 올바르지 않습니다.")
@auth_ns.response(409, "로그인 할 수 없는 사용자입니다.")
@auth_ns.response(500, "서버 내부 오류")
def post(self):
"""로그인"""

View File

@@ -1,3 +1,95 @@
"""
유저 관련 API 엔드포인트
"""
from flask import request, g
from flask_restx import Resource
from flask_jwt_extended import jwt_required, get_jwt
from utils.response import *
from utils.swagger_config import user_ns
from utils.decorators import load_current_user
from models.user_model import user_swagger
from services.user.user_service import UserService
@user_ns.route("/me")
class MyProfile(Resource):
@user_ns.doc(
id="get_my_profile",
summary="내 프로필 조회",
description="현재 로그인한 사용의 프로필을 조회합니다. (토큰 기반)",
tags=["Users"],
security=[{"Bearer": []}]
)
@user_ns.response(200, "Success - 프로필 조회 완료")
@user_ns.response(401, "인증 실패에 실패하였습니다.")
@user_ns.response(500, "서버 내부 오류")
@jwt_required()
@load_current_user
def get(self):
"""내 프로필 조회"""
return success_response(
data=g.current_user.to_dict(),
message="프로필 조회 성공"
)
@user_ns.doc(
id="update_my_profile",
summary="내 프로필 업데이트",
description="현재 로그인한 사용자의 프로필을 업데이트 합니다. (토큰 기반)",
tags=["Users"],
security=[{"Bearer": []}]
)
@user_ns.expect(user_swagger.update_profile_model)
@user_ns.response(200, "Success - 프로필 업데이트 완료")
@user_ns.response(400, "잘못된 요청 데이터")
@user_ns.response(401, "인증 실패에 실패하였습니다.")
@user_ns.response(500, "서버 내부 오류")
@jwt_required()
@load_current_user
def put(self):
"""내 프로필 업데이트"""
user = g.current_user
data = request.get_json()
updated_user = UserService.update_user_profile(user.user_uuid, data)
return success_response(
data=updated_user.to_dict(),
message="프로필 업데이트 성공"
)
@user_ns.doc(
id="delete_my_account",
summary="회원 탈퇴",
description="현재 로그인한 사용자의 계정을 삭제합니다. (토큰 기반). 탈퇴 후 모든 데이터가 삭제되며 복구할 수 없습니다.",
tags=["Users"],
security=[{"Bearer": []}]
)
@user_ns.response(200, "Success - 회원 탈퇴 완료")
@user_ns.response(401, "인증에 실패하였습니다.")
@user_ns.response(404, "사용자를 찾을 수 없습니다.")
@user_ns.response(500, "서버 내부 오류")
@jwt_required()
@load_current_user
def delete(self):
"""회원 탈퇴"""
user = g.current_user
# JWT ID 추출 (로그아웃 처리용)
jwt_data = get_jwt()
jti = jwt_data.get("jti")
# 사용자 삭제 (캐시 무효화 + JWT 블랙리스트 추가 포함)
success = UserService.delete_user(user.user_uuid, jti=jti)
if not success:
return error_response(
message="회원 탈퇴에 실패했습니다.",
status_code=404
)
return success_response(
message="회원 탈퇴가 완료되었습니다."
)

View File

@@ -1,5 +1,5 @@
# API INFO
API_VERSION = "v0.1.2"
API_VERSION = "v0.1.3"
API_DATE = "2025-11-16"
# BASE DATE FORMAT

View File

@@ -6,15 +6,15 @@ from utils.swagger_config import auth_ns
from flask_restx import fields
# Swagger 모델 정의
register_model = auth_ns.model('Register', {
'id': fields.String(required=True, description='사용자 ID', example='user123'),
'password': fields.String(required=True, description='비밀번호 (최소 6자)', example='password123'),
'name': fields.String(required=False, description='사용자 이름', example='홍길동')
register_model = auth_ns.model("Register", {
"id": fields.String(required=True, description="사용자 ID", example="user123"),
"password": fields.String(required=True, description="비밀번호 (최소 6자)", example="password123"),
"user_name": fields.String(required=False, description="사용자 이름", example="홍길동")
})
login_model = auth_ns.model('Login', {
'id': fields.String(required=True, description='사용자 ID', example='user123'),
'password': fields.String(required=True, description='비밀번호', example='password123')
login_model = auth_ns.model("Login", {
"id": fields.String(required=True, description="사용자 ID", example="user123"),
"password": fields.String(required=True, description="비밀번호", example="password123")
})
__all__ = ['register_model', 'login_model']
__all__ = ["register_model", "login_model"]

View File

@@ -11,20 +11,20 @@ class CreateUserDTO(BaseModel):
# 선택
user_name: Optional[str] = None
@field_validator('id')
@field_validator("id")
@classmethod
def validate_id(cls, v):
if not v or len(v.strip()) == 0:
raise ValueError('ID는 필수입니다')
raise ValueError("ID는 필수입니다")
if len(v) < 3 or len(v) > 50:
raise ValueError('ID는 3-50자 사이여야 합니다')
raise ValueError("ID는 3-50자 사이여야 합니다")
return v.strip()
@field_validator('password')
@field_validator("password")
@classmethod
def validate_password(cls, v):
if not v or len(v) < 6:
raise ValueError('비밀번호는 최소 6자 이상이어야 합니다')
raise ValueError("비밀번호는 최소 6자 이상이어야 합니다")
return v
# 자동 형변환 방지
@@ -39,11 +39,11 @@ class CheckUserDTO(BaseModel):
id: str
password: str
@field_validator('id', 'password')
@field_validator("id", "password")
@classmethod
def validate_required(cls, v):
if not v or len(v.strip()) == 0:
raise ValueError('필수 항목입니다')
raise ValueError("필수 항목입니다")
return v.strip()
# 자동 형변환 방지

View File

@@ -38,11 +38,11 @@ class UserIps(db.Model):
def to_dict(self) -> Dict[str, Any]:
"""딕셔너리 변환"""
return {
'user_uuid': self.user_uuid,
'user_ip': self.user_ip,
'user_agent': self.user_agent,
'created_at': self.created_at,
'lasted_at': self.lasted_at
"user_uuid": self.user_uuid,
"user_ip": self.user_ip,
"user_agent": self.user_agent,
"created_at": self.created_at,
"lasted_at": self.lasted_at
}
def __repr__(self) -> str:

View File

@@ -0,0 +1,11 @@
"""
유저 관련 Swagger 모델
"""
from utils.swagger_config import user_ns
from flask_restx import fields
# Swagger 모델 정의
update_profile_model = user_ns.model("UpdateProfile", {
"user_name": fields.String(required=False, description="사용자 이름", example="홍길동")
})

View File

@@ -32,7 +32,7 @@ class Users(db.Model):
self.password = hash_password
# 선택적 필드
self.user_name = kwargs.get('user_name')
self.user_name = kwargs.get("user_name")
# 타임스탬프
current_time = int(get_timestamp_ms())
@@ -56,13 +56,13 @@ class Users(db.Model):
def to_dict(self) -> Dict[str, Any]:
"""딕셔너리로 변환"""
return {
'user_uuid': self.user_uuid,
'id': self.id,
'user_name': self.user_name,
'auth_level': self.auth_level,
'count': self.count,
'created_at': self.created_at,
'updated_at': self.updated_at
"user_uuid": self.user_uuid,
"id": self.id,
"user_name": self.user_name,
"auth_level": self.auth_level,
"count": self.count,
"created_at": self.created_at,
"updated_at": self.updated_at
}
def __repr__(self) -> str:

View File

@@ -13,11 +13,11 @@ class RedisFacadeOpr():
def __init__(self) -> None:
"""Redis 설정 초기화"""
self.redis_config = {
'host': settings.REDIS_HOST,
'port': settings.REDIS_PORT,
'password': settings.REDIS_PASSWORD,
'decode_responses': True, # auto-decode response
'db': settings.REDIS_DB
"host": settings.REDIS_HOST,
"port": settings.REDIS_PORT,
"password": settings.REDIS_PASSWORD,
"decode_responses": True, # auto-decode response
"db": settings.REDIS_DB
}
self.redis_client: Optional[redis.StrictRedis] = None

View File

@@ -10,7 +10,7 @@ from extensions import db, custom_logger
import settings
# Generic type for model entitles
T = TypeVar('T')
T = TypeVar("T")
logger = custom_logger(f"{settings.LOG_PREFIX}_repository")
@@ -52,12 +52,13 @@ class BaseRepository(ABC, Generic[T]):
logger.error(f"{self.model.__name__} 수정 실패: {str(e)}")
raise
def delete(self, entity: T) -> None:
def delete(self, entity: T) -> bool:
"""엔티티 삭제"""
try:
self.session.delete(entity)
self.session.flush()
logger.debug(f"{self.model.__name__} 삭제완료: {getattr(entity, 'id', 'unknown')}")
return True
except SQLAlchemyError as e:
logger.error(f"{self.model.__name__} 삭제 실패: {str(e)}")
raise

View File

@@ -0,0 +1,20 @@
"""
서비스 레이어 패키지
"""
from services.auth.auth_service import AuthService
from services.user.user_service import UserService
from services.auth.cache_service import (
CacheService,
UserCacheService,
JWTBlacklistService
)
__all__ = [
"AuthService",
"UserService",
"CacheService",
"UserCacheService",
"JWTBlacklistService",
"AccountLockPolicy"
]

View File

@@ -130,8 +130,8 @@ class AuthService:
user_ips_repo = UserIpsRepository()
# IP 및 User-Agent 추출
ip_address = request.headers.get('X-Real-IP', '') if request.headers else ''
user_agent = request.headers.get('User-Agent', '') if request.headers else ''
ip_address = request.headers.get("X-Real-IP", "") if request.headers else ""
user_agent = request.headers.get("User-Agent", "") if request.headers else ""
# 기존 IP 기록 조회 로직
existing_ip = user_ips_repo.find_by_user_and_ip(user, ip_address)

View File

@@ -165,4 +165,46 @@ class JWTBlacklistService:
def is_blacklisted(cls, jti: str) -> bool:
"""토큰이 블랙리스트에 있는지 확인"""
key = cls._get_key(jti)
return CacheService.exists(key)
return CacheService.exists(key)
class AccountLockService:
"""계정 잠금 관리 서비스 (Redis 기반)"""
PREFIX = "account:lock:"
TTL = settings.ACCOUNT_LOCKOUT_DURATION_MINUTES * 60 # 분을 초로 변환
@classmethod
def _get_key(cls, user_uuid: str) -> str:
"""잠금 키 생성"""
return f"{cls.PREFIX}{user_uuid}"
@classmethod
def lock(cls, user_uuid: str, ttl: Optional[int] = None) -> bool:
"""계정 잠금 (Redis에 TTL과 함께 저장)"""
key = cls._get_key(user_uuid)
ttl = ttl or cls.TTL
if not CacheService._is_redis_available():
logger.warning("레디스가 연결되어 있지 않습니다. 계정 잠금이 불가능합니다.")
return False
success = CacheService.set(key, "1", ttl)
if success:
logger.info(f"계정 잠금 완료: {user_uuid} (TTL: {ttl}s)")
return success
@classmethod
def is_locked(cls, user_uuid: str) -> bool:
"""계정이 잠겨있는지 확인"""
key = cls._get_key(user_uuid)
return CacheService.exists(key)
@classmethod
def unlock(cls, user_uuid: str) -> bool:
"""계정 잠금 해제"""
key = cls._get_key(user_uuid)
success = CacheService.delete(key)
if success:
logger.info(f"계정 잠금 해제 완료: {user_uuid}")
return success

View File

@@ -1,4 +1,111 @@
"""
사용자 서비스
사용자 관리 관련 비즈니스 로직을 관리하는 서비스 레이어
"""
"""
from typing import Dict, Any, Optional
from types import SimpleNamespace
from models.user_model.users import Users
from repositories import UserRepository
from services.auth.cache_service import UserCacheService, JWTBlacklistService
from utils.db_decorators import transactional
from extensions import custom_logger
import settings
logger = custom_logger(f"{settings.LOG_PREFIX}_user_service")
class UserService:
"""
사용자 서비스
"""
@staticmethod
def get_user_by_uuid(user_uuid: str, use_cache: bool = True) -> Optional[Users]:
"""UUID로 사용자 조회 (캐시 우선)"""
if use_cache:
# 캐시 조회
cached_user_data = UserCacheService.get(user_uuid)
if cached_user_data:
user = SimpleNamespace(**cached_user_data)
user.to_dict = lambda: cached_user_data
logger.debug(f"캐시에서 사용자 조회 성공: {user_uuid}")
return user
# DB 조회
user_repo = UserRepository()
user = user_repo.find_by_uuid(user_uuid)
if user and use_cache:
# 캐시 저장
UserCacheService.set(user_uuid, user.to_dict())
logger.debug(f"DB에서 사용자 조회 후 캐시에 저장: {user_uuid}")
return user
@staticmethod
def get_user_by_id(user_ud: str) -> Optional[Users]:
"""ID로 사용자 조회"""
user_repo = UserRepository()
return user_repo.get_by_id(user_ud)
@staticmethod
@transactional
def update_user_profile(user_uuid: str, update_data: Dict[str, Any]) -> Users:
"""사용자 프로필 업데이트"""
user_repo = UserRepository()
# 사용자 조회
user = user_repo.find_by_uuid(user_uuid)
if not user:
return None
# 업데이트 가능한 필드만 처리
if "user_name" in update_data:
user.user_name = update_data["user_name"]
user = user_repo.update(user)
# 캐시 갱신
UserCacheService.refresh(user_uuid, user.to_dict())
logger.info(f"사용자 프로필이 업데이트되었습니다: {user_uuid}")
return user
@staticmethod
def invalidate_user_cache(user_uuid: str) -> bool:
"""사용자 캐시 무효화"""
success = UserCacheService.delete(user_uuid)
if success:
logger.debug(f"사용자 캐시가 무효화되었습니다: {user_uuid}")
return success
@staticmethod
@transactional
def delete_user(user_uuid: str, jti: Optional[str] = None) -> bool:
"""사용자 삭제 (회원 탈퇴)"""
user_repo = UserRepository()
user = user_repo.find_by_uuid(user_uuid)
if not user:
logger.warning(f"삭제할 사용자를 DB에서 찾을 수 없음: {user_uuid}")
UserCacheService.delete(user_uuid)
return False
# 삭제
success = user_repo.delete(user)
if success:
# 캐시 무효화
UserCacheService.delete(user_uuid)
# JWT 블랙리스트 추가 (현재 토큰 무효화)
if jti:
JWTBlacklistService.add(jti)
logger.debug(f"JWT 블랙리스트 추가: {jti}")
logger.info(f"사용자가 삭제되었습니다: {user_uuid}")
return success

View File

@@ -13,76 +13,76 @@ import multiprocessing
load_dotenv()
# 환경설정
DEBUG = os.getenv('DEBUG', False)
DEBUG = os.getenv("DEBUG", False)
# 서버
HOST = os.getenv('HOST', '0.0.0.0')
PORT = int(os.getenv('PORT', '5000'))
HOST = os.getenv("HOST", "0.0.0.0")
PORT = int(os.getenv("PORT", "5000"))
# 데이터베이스 설정
DB_HOST = os.getenv('DB_HOST', 'localhost')
DB_PORT = int(os.getenv('DB_PORT', 5000))
DB_USER = os.getenv('DB_USER', 'root')
DB_PASSWORD = os.getenv('DB_PASSWORD', '')
DB_NAME = os.getenv('DB_NAME', 'test')
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = int(os.getenv("DB_PORT", 5000))
DB_USER = os.getenv("DB_USER", "root")
DB_PASSWORD = os.getenv("DB_PASSWORD", "")
DB_NAME = os.getenv("DB_NAME", "test")
SQLALCHEMY_DATABASE_URI = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}?charset=utf8mb4"
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_ECHO = False
SQLALCHEMY_POOL_SIZE = int(os.getenv('DB_POOL_SIZE', 10))
SQLALCHEMY_MAX_OVERFLOW = int(os.getenv('DB_MAX_OVERFLOW', 20))
SQLALCHEMY_POOL_RECYCLE = int(os.getenv('DB_POOL_RECYCLE', 3600))
SQLALCHEMY_POOL_SIZE = int(os.getenv("DB_POOL_SIZE", 10))
SQLALCHEMY_MAX_OVERFLOW = int(os.getenv("DB_MAX_OVERFLOW", 20))
SQLALCHEMY_POOL_RECYCLE = int(os.getenv("DB_POOL_RECYCLE", 3600))
JWT_SECRET_KEY = os.getenv('JWT_SECRET_KEY', 'your-secret-key-change-this-in-production')
JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=int(os.getenv('JWT_ACCESS_TOKEN_HOURS', 24)))
JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=int(os.getenv('JWT_REFRESH_TOKEN_DAYS', 30)))
JWT_TOKEN_LOCATION = ['headers']
JWT_HEADER_NAME = 'Authorization'
JWT_HEADER_TYPE = 'Bearer'
JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-this-in-production")
JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=int(os.getenv("JWT_ACCESS_TOKEN_HOURS", 24)))
JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=int(os.getenv("JWT_REFRESH_TOKEN_DAYS", 30)))
JWT_TOKEN_LOCATION = ["headers"]
JWT_HEADER_NAME = "Authorization"
JWT_HEADER_TYPE = "Bearer"
JWT_ENCODE_JTI = True # JWT ID 포함 (블랙리스트 추적용)
# CORS 설정
CORS_ORIGINS = os.getenv('CORS_ORIGINS', '*').split(',')
CORS_ALLOW_HEADERS = ['Content-Type', 'Authorization']
CORS_METHODS = ['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS']
CORS_ORIGINS = os.getenv("CORS_ORIGINS", "*").split(",")
CORS_ALLOW_HEADERS = ["Content-Type", "Authorization"]
CORS_METHODS = ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"]
# Redis 설정
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT', 6379))
REDIS_DB = int(os.getenv('REDIS_DB', 0))
REDIS_PASSWORD = os.getenv('REDIS_PASSWORD', None)
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_DB = int(os.getenv("REDIS_DB", 0))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", None)
# 로깅 설정
LOG_DIR = os.getenv('LOG_DIR', 'logs')
LOG_PREFIX = os.getenv('LOG_PREFIX', 'default.')
LOG_LEVEL_TEXT = os.getenv('LOG_LEVEL_TEXT', 'INFO' if not DEBUG else 'DEBUG')
MAX_LOG_SIZE = int(os.getenv('MAX_LOG_SIZE', 10 * 1024 * 1024)) # 10MB
LOG_BACKUP_COUNT = int(os.getenv('LOG_BACKUP_COUNT', 5))
LOG_DIR = os.getenv("LOG_DIR", "logs")
LOG_PREFIX = os.getenv("LOG_PREFIX", "default.")
LOG_LEVEL_TEXT = os.getenv("LOG_LEVEL_TEXT", "INFO" if not DEBUG else "DEBUG")
MAX_LOG_SIZE = int(os.getenv("MAX_LOG_SIZE", 10 * 1024 * 1024)) # 10MB
LOG_BACKUP_COUNT = int(os.getenv("LOG_BACKUP_COUNT", 5))
# Worker 설정
GUNICORN_WORKERS = int(os.getenv('GUNICORN_WORKERS', multiprocessing.cpu_count() * 2 + 1))
GUNICORN_WORKER_CLASS = os.getenv('GUNICORN_WORKER_CLASS', 'gevent') # gevent for async I/O
GUNICORN_WORKER_CONNECTIONS = int(os.getenv('GUNICORN_WORKER_CONNECTIONS', 1000))
GUNICORN_THREADS = int(os.getenv('GUNICORN_THREADS', 1)) # gevent 사용 시 무시됨
GUNICORN_WORKERS = int(os.getenv("GUNICORN_WORKERS", multiprocessing.cpu_count() * 2 + 1))
GUNICORN_WORKER_CLASS = os.getenv("GUNICORN_WORKER_CLASS", "gevent") # gevent for async I/O
GUNICORN_WORKER_CONNECTIONS = int(os.getenv("GUNICORN_WORKER_CONNECTIONS", 1000))
GUNICORN_THREADS = int(os.getenv("GUNICORN_THREADS", 1)) # gevent 사용 시 무시됨
# 타임아웃 설정
GUNICORN_TIMEOUT = int(os.getenv('GUNICORN_TIMEOUT', 30)) # 요청 타임아웃 (초)
GUNICORN_KEEPALIVE = int(os.getenv('GUNICORN_KEEPALIVE', 5)) # Keep-Alive 연결 유지 시간
GUNICORN_GRACEFUL_TIMEOUT = int(os.getenv('GUNICORN_GRACEFUL_TIMEOUT', 30)) # Graceful shutdown 대기 시간
GUNICORN_TIMEOUT = int(os.getenv("GUNICORN_TIMEOUT", 30)) # 요청 타임아웃 (초)
GUNICORN_KEEPALIVE = int(os.getenv("GUNICORN_KEEPALIVE", 5)) # Keep-Alive 연결 유지 시간
GUNICORN_GRACEFUL_TIMEOUT = int(os.getenv("GUNICORN_GRACEFUL_TIMEOUT", 30)) # Graceful shutdown 대기 시간
# 재시작 설정 (메모리 누수 방지)
GUNICORN_MAX_REQUESTS = int(os.getenv('GUNICORN_MAX_REQUESTS', 1000))
GUNICORN_MAX_REQUESTS_JITTER = int(os.getenv('GUNICORN_MAX_REQUESTS_JITTER', 50))
GUNICORN_MAX_REQUESTS = int(os.getenv("GUNICORN_MAX_REQUESTS", 1000))
GUNICORN_MAX_REQUESTS_JITTER = int(os.getenv("GUNICORN_MAX_REQUESTS_JITTER", 50))
# 로깅 설정
GUNICORN_ACCESSLOG = os.getenv('GUNICORN_ACCESSLOG', '-') # '-' = stdout
GUNICORN_ERRORLOG = os.getenv('GUNICORN_ERRORLOG', '-')
GUNICORN_LOGLEVEL = os.getenv('GUNICORN_LOGLEVEL', 'info')
GUNICORN_ACCESSLOG = os.getenv("GUNICORN_ACCESSLOG", "-") # "-" = stdout
GUNICORN_ERRORLOG = os.getenv("GUNICORN_ERRORLOG", "-")
GUNICORN_LOGLEVEL = os.getenv("GUNICORN_LOGLEVEL", "info")
# 보안 설정 - 로그인 실패
MAX_LOGIN_ATTEMPTS = int(os.getenv('MAX_LOGIN_ATTEMPTS', 5))
ACCOUNT_LOCKOUT_DURATION_MINUTES = int(os.getenv('ACCOUNT_LOCKOUT_DURATION_MINUTES', 30))
MAX_LOGIN_ATTEMPTS = int(os.getenv("MAX_LOGIN_ATTEMPTS", 5))
ACCOUNT_LOCKOUT_DURATION_MINUTES = int(os.getenv("ACCOUNT_LOCKOUT_DURATION_MINUTES", 30))
# Redis 캐시 TTL 설정 (초 단위)
REDIS_USER_CACHE_TTL = int(os.getenv('REDIS_USER_CACHE_TTL', 300)) # 5분
REDIS_JWT_BLACKLIST_TTL = int(os.getenv('REDIS_JWT_BLACKLIST_TTL', 86400)) # 24시간
REDIS_USER_CACHE_TTL = int(os.getenv("REDIS_USER_CACHE_TTL", 300)) # 5분
REDIS_JWT_BLACKLIST_TTL = int(os.getenv("REDIS_JWT_BLACKLIST_TTL", 86400)) # 24시간

82
src/utils/decorators.py Normal file
View File

@@ -0,0 +1,82 @@
"""
데코레이터 유틸리티 모듈
"""
from functools import wraps
from typing import Callable
from flask import request, g
from flask_jwt_extended import get_jwt_identity, get_jwt
from utils.response import *
import settings
from extensions import custom_logger
logger = custom_logger(f"{settings.LOG_PREFIX}_decorators")
# Lazy 로딩
def _get_user_service():
from services.user.user_service import UserService
return UserService()
def _get_account_lock_service():
from services.auth.cache_service import AccountLockService
return AccountLockService
def _get_jwt_blacklist_service():
from services.auth.cache_service import JWTBlacklistService
return JWTBlacklistService
def load_current_user(f: Callable) -> Callable:
"""
JWT identity(user_uuid)를 기반으로 User 객체를 로드함.
g.current_user에 저장
Redis 캐시를 활용
JWT 블랙리스트 체크 (로그아웃된 토큰 방지)
"""
@wraps(f)
def decorated_function(*args, **kwargs):
try:
# JWT 페이로드 및 user_uuid 추출
jwt_payload = get_jwt()
user_uuid = get_jwt_identity()
jti = jwt_payload.get("jti")
if not user_uuid:
logger.warning("JWT에서 user_uuid를 찾을 수 없습니다.")
return unauthorized_response(message="토큰에서 사용자 정보를 찾을 수 없습니다.")
# JWT 블랙리스트 체크 (사용이 불가능한 토큰)
JWTBlacklistService = _get_jwt_blacklist_service()
if jti and JWTBlacklistService.is_blacklisted(jti):
logger.warning(f"블랙리스트된 토큰 접근 시도: jti={jti}, user_uuid={user_uuid}")
return unauthorized_response(message="사용이 불가능한 토큰입니다. 다시 로그인해주세요.")
# UserService를 통한 조회
UserService = _get_user_service()
user = UserService.get_user_by_uuid(user_uuid, use_cache=True)
if not user:
logger.warning(f"user_uuid에 해당하는 사용자를 찾을 수 없습니다: {user_uuid}")
return unauthorized_response(message="사용자를 찾을 수 없습니다.")
# 계정 잠금 체크 (Redis 기반)
AccountLockService = _get_account_lock_service()
if AccountLockService.is_locked(user_uuid):
logger.warning(f"잠긴 계정 접근 시도: {user_uuid}")
return unauthorized_response(message="계정이 잠겼습니다. 관리자에게 문의하세요.")
# DB의 로그인 실패 횟수가 최대 시도 횟수를 초과하면 Redis에 잠금 설정
if (user.count or 0) >= settings.MAX_LOGIN_ATTEMPTS:
AccountLockService.lock(user_uuid)
logger.warning(f"로그인 실패 횟수 초과로 계정 잠금: {user_uuid}")
return unauthorized_response(message="계정이 잠겼습니다. 관리자에게 문의하세요.")
# g에 현재 사용자 저장
g.current_user = user
return f(*args, **kwargs)
except Exception as e:
logger.exception(f"현재 사용자 로드에 실패하였습니다. error={str(e)}")
return error_response(message="현재 사용자 정보를 불러오는 데 실패하였습니다.")
return decorated_function

View File

@@ -87,7 +87,7 @@ logging.basicConfig(
# create own logger class to prevent init custom loggers by other libs
class GatekeeperLogger(logging.Logger):
def addHandler(self, h):
# only 'root', '__main__' and own loggers will be accepted
# only "root", "__main__" and own loggers will be accepted
if self.name == "root" or self.name.startswith((settings.LOG_PREFIX, "__main__")):
return super().addHandler(h)
@@ -96,7 +96,7 @@ logging.setLoggerClass(GatekeeperLogger)
# Werkzeug 로거 설정 (Flask 개발 서버용)
# 기존 werkzeug 로거의 핸들러를 모두 제거하고 커스텀 포맷 적용
import logging as _logging
werkzeug_logger = _logging.getLogger('werkzeug')
werkzeug_logger = _logging.getLogger("werkzeug")
werkzeug_logger.handlers = [] # 기존 핸들러 제거
werkzeug_logger.setLevel(settings.LOG_LEVEL_TEXT)
handler = _logging.StreamHandler(sys.stdout)
@@ -105,7 +105,7 @@ werkzeug_logger.addHandler(handler)
werkzeug_logger.propagate = False
# Flask의 기본 로거도 동일하게 설정
flask_logger = _logging.getLogger('flask.app')
flask_logger = _logging.getLogger("flask.app")
flask_logger.handlers = []
flask_logger.setLevel(settings.LOG_LEVEL_TEXT)
flask_handler = _logging.StreamHandler(sys.stdout)

View File

@@ -66,7 +66,7 @@ def error_response(message: str = "An error occurred", status_code: int = 400,
message,
message,
errors=errors,
error_code=error_code or HTTP_STATUS_CODES.get(status_code, 'UNKNOWN_ERROR')
error_code=error_code or HTTP_STATUS_CODES.get(status_code, "UNKNOWN_ERROR")
)
return (jsonify(body), status_code) if not _is_restx_context() else (body, status_code)

View File

@@ -9,32 +9,32 @@ from flask_restx import Api
import constants
# Swagger API Blueprint 및 설정
api_blueprint = Blueprint('api', __name__)
api_blueprint = Blueprint("api", __name__)
api = Api(
api_blueprint,
title='NuriQ API',
title="NuriQ API",
version=constants.API_VERSION,
description='NuriQ REST API 문서',
doc='/docs',
description="NuriQ REST API 문서",
doc="/docs",
authorizations={
'Bearer': {
'type': 'apiKey',
'in': 'header',
'name': 'Authorization',
'description': 'JWT 토큰 입력 (예: Bearer <token>)'
"Bearer": {
"type": "apiKey",
"in": "header",
"name": "Authorization",
"description": "JWT 토큰 입력 (예: Bearer <token>)"
}
},
security='Bearer',
security="Bearer",
validate=True
)
# 네임 스페이스 정의 및 API 추가
auth_ns = api.namespace('auth', description='인증 API', path='/auth')
auth_ns = api.namespace("auth", description="인증 API", path="/auth")
user_ns = api.namespace("users", description="유저 API", path="/users")
# 네임 스페이스 정의 및 API에 추가
__all__ = [
'api_blueprint', 'api',
'auth_ns'
"api_blueprint", "api",
"auth_ns", "user_ns"
]