Files
nuriq_back/src/services/auth/auth_service.py
윤영훈 abf405f8ae [FEAT] (사용자 로직): 프로필 서비스 구현 완료
v0.1.3 (2025-11-16)
- 프로필 조회, 프로필 업데이트, 탈퇴 구현 완료.
2025-11-16 18:02:27 +09:00

152 lines
4.8 KiB
Python

"""
인증 서비스
"""
from typing import Dict, Any, Tuple, Optional
from flask_jwt_extended import create_access_token, create_refresh_token
from models.user_model.users import Users
from models.user_model.user_ips import UserIps
from models.user_model import user_dto
from repositories import UserRepository, UserIpsRepository
import settings
from utils import func
from utils.response import *
from utils.account_lock_policy import AccountLockPolicy
from utils.db_decorators import transactional
from services.auth.cache_service import JWTBlacklistService
from extensions import custom_logger
logger = custom_logger(f"{settings.LOG_PREFIX}_auth_service")
class AuthService:
"""
인증 서비스 - 회원가입, 로그인, 로그아웃 비즈니스 로직
"""
@staticmethod
@transactional
def register(user_data: Dict[str, Any]) -> Optional[Users]:
"""회원 가입"""
# Pydantic 검증
create_user_dto = user_dto.CreateUserDTO(**user_data)
user_id = create_user_dto.id
password = create_user_dto.password
user_name = create_user_dto.user_name
user_repo = UserRepository()
# 중복 검사
if user_repo.exists_by_id(user_id):
return None
# 비밀번호 생성
hash_password = func.hash_password(password)
# 사용자 생성
user = Users(
id=user_id,
hash_password=hash_password,
user_name=user_name
)
user = user_repo.create(user)
logger.info(f"유저가 생성되었습니다: {user_id}")
return user
@staticmethod
@transactional
def login(credentials: Dict[str, Any]) -> Optional[Tuple[Users, str, str]]:
# Pydantic 검증
check_user_dto = user_dto.CheckUserDTO(**credentials)
user_repo = UserRepository()
# 사용자 조회
user = user_repo.find_by_id(check_user_dto.id)
if not user:
return None
# 계정 잠금 정책 체크
lock_policy = AccountLockPolicy(
max_attempts=settings.MAX_LOGIN_ATTEMPTS,
lockout_duration_minutes=settings.ACCOUNT_LOCKOUT_DURATION_MINUTES
)
if lock_policy.is_locked(user.count):
return None
# 비밀번호 검증
if not func.verify_password(check_user_dto.password, user.password):
user.increment_failed_login()
user_repo.update(user)
return None
# 로그인 성공 - 실패 횟수 초기화
user.reset_failed_login()
user_repo.update(user)
# 토큰 생성
access_token = create_access_token(identity=user.user_uuid)
refresh_token = create_refresh_token(identity=user.user_uuid)
# Redis 사용자 정보 캐싱 로직
logger.info(f"사용자 로그인: {user.id}")
return (user, access_token, refresh_token)
@staticmethod
def refresh_access_token(user_uuid: str) -> str:
try:
"""엑세스 토큰 갱신"""
access_token = create_access_token(identity=user_uuid)
logger.debug(f"Access Token 재발급: {user_uuid}")
return access_token
except Exception as e:
logger.exception(f"유저 ({user_uuid}) 엑세스 토큰 갱신에 실패하였습니다. error={str(e)}")
return None
@staticmethod
def logout(jti: str) -> bool:
"""로그아웃 (JWT 블랙리스트 추가)"""
success = JWTBlacklistService.add(jti)
if success:
logger.info(f"로그아웃 처리 완료 - JTI 블랙리스트 추가: {jti}")
return success
@staticmethod
@transactional
def save_user_ips(request, user: Users) -> None:
"""사용자 IPs 저장"""
user_ips_repo = UserIpsRepository()
# IP 및 User-Agent 추출
ip_address = request.headers.get("X-Real-IP", "") if request.headers else ""
user_agent = request.headers.get("User-Agent", "") if request.headers else ""
# 기존 IP 기록 조회 로직
existing_ip = user_ips_repo.find_by_user_and_ip(user, ip_address)
if existing_ip:
# 존재하면 마지막 접속 시간 업데이트
user_ips_repo.update_last_access(user.user_uuid, ip_address)
else:
# 없으면 새로 생성
user_ips = UserIps(
user_uuid=user.user_uuid,
user_ip=ip_address,
user_agent=user_agent
)
user_ips_repo.create(user_ips)
logger.debug(f"유저 IP 저장: {user.id} - {ip_address}")