[FEAT] (사용자 로직): 인증 서비스 로직 구현 중

v0.1.1 (2025-11-13)
- 사용자 관련 인증 서비스 로직 구현 중
This commit is contained in:
2025-11-13 17:29:22 +09:00
parent 422c0638fd
commit ae2766cff5
16 changed files with 553 additions and 3 deletions

View File

@@ -1,6 +1,14 @@
# API INFO
API_VERSION = "v0.1"
API_DATE = 2025-11-13
API_VERSION = "v0.1.1"
API_DATE = "2025-11-13"
# BASE DATE FORMAT
DATE_FORMAT = "%Y-%m-%d %H:%M:%S %z"
DATE_FORMAT = "%Y-%m-%d %H:%M:%S %z"
# 사용여부
IS_N = 0
IS_Y = 1
# 사용자 권한
AUTH_LEVEL_ADMIN = '10'
AUTH_LEVEL_USER = '20'

View File

@@ -0,0 +1,13 @@
"""
Models 레이어
"""
# User
from .user_model import users, user_ips, user_dto
__all__ = [
# User
users,
user_ips,
user_dto
]

View File

@@ -0,0 +1,63 @@
from pydantic import BaseModel, field_validator
from typing import Optional
class CreateUserDTO(BaseModel):
"""회원 가입 DTO"""
# 필수
id: str
password: str
# 선택
user_name: Optional[str] = None
@field_validator('id')
@classmethod
def validate_id(cls, v):
if not v or len(v.strip()) == 0:
raise ValueError('ID는 필수입니다')
if len(v) < 3 or len(v) > 50:
raise ValueError('ID는 3-50자 사이여야 합니다')
return v.strip()
@field_validator('password')
@classmethod
def validate_password(cls, v):
if not v or len(v) < 6:
raise ValueError('비밀번호는 최소 6자 이상이어야 합니다')
return v
# 자동 형변환 방지
class Config:
strict = True
class CheckUserDTO(BaseModel):
"""로그인 확인 DTO"""
# 필수
id: str
password: str
@field_validator('id', 'password')
@classmethod
def validate_required(cls, v):
if not v or len(v.strip()) == 0:
raise ValueError('필수 항목입니다')
return v.strip()
# 자동 형변환 방지
class Config:
strict = True
class CreateUserIpsDTO(BaseModel):
"""사용자 IP 저장 DTO"""
# 필수
user_uuid: str
user_ip: str
user_agent: str
# 자동 형변환 방지
class Config:
strict = True

View File

@@ -0,0 +1,50 @@
"""
사용자 IP 모델
"""
from extensions import db
from typing import Dict, Any
from utils.func import get_timestamp_ms
class UserIps(db.Model):
"""사용자 IP 모델"""
__tablename__ = "user_ips"
__table_args__ = (
db.PrimaryKeyConstraint("user_uuid", "user_ip", name="pk_user_ips")
)
# 기본 필드
user_uuid = db.Column(db.String(255), nullable=False, index=True, comment="유저 UUID")
user_ip = db.Column(db.String(255), nullable=False, index=True, comment="유저 IP")
user_agent = db.Column(db.String(255), nullable=False, comment="유저 에이전트")
# 타임스탬프 (BIGINT - Unix Timestamp in milliseconds)
created_at = db.Column(db.BigInteger, nullable=True, comment="생성일 (unix timestamp, ms)")
lasted_at = db.Column(db.BigInteger, nullable=True, comment="마지막 접속일 (unix timestamp, ms)")
def __init__(self, user_uuid: str, user_ip: str, user_agent: str) -> None:
"""사용자 IP 초기화"""
self.user_uuid = user_uuid
self.user_ip = user_ip
self.user_agent = user_agent
# 타임스탬프
current_time = int(get_timestamp_ms())
self.created_at = current_time
self.lasted_at = current_time
def to_dict(self) -> Dict[str, Any]:
"""딕셔너리 변환"""
return {
'user_uuid': self.user_uuid,
'user_ip': self.user_ip,
'user_agent': self.user_agent,
'created_at': self.created_at,
'lasted_at': self.lasted_at
}
def __repr__(self) -> str:
"""디버깅, 로깅용"""
return f"<UserIps user={self.user_uuid[:8]}, ip={self.user_ip}"

View File

@@ -0,0 +1,70 @@
"""
사용자 모델
"""
import uuid
from typing import Dict, Any
from extensions import db
from utils.func import get_timestamp_ms
import constants
class Users(db.Model):
"""사용자 모델"""
__tablename__ = "users"
# 기본 필드
user_uuid = db.Column(db.String(255), primary_key=True, nullable=False, comment="유저 UUID")
id = db.Column(db.String(255), unique=True, nullable=False, index=True, comment="유저 ID")
user_name = db.Column(db.String(50), nullable=True, comment="유저명")
password = db.Column(db.String(255), nullable=False, comment="비밀번호")
auth_level = db.Column(db.String(255), nullable=True, default=constants.AUTH_LEVEL_USER, comment="권한 레벨 (10: 관리자, 20: 사용자)")
count = db.Column(db.SmallInteger, default=0, nullable=True, comment="로그인 싫패 횟수")
# 타임스탬프 (BIGINT - Unix Timestamp in milliseconds)
created_at = db.Column(db.BigInteger, nullable=True, comment="생성일 (unix timestamp, ms)")
updated_at = db.Column(db.BigInteger, nullable=True, comment="수정일 (unix timestamp, ms)")
def __init__(self, id: str, password_hash: str, **kwargs: Any) -> None:
"""사용자 초기화"""
self.user_uuid = str(uuid.uuid4())
self.id = id
self.password = password_hash
# 선택적 필드
self.user_name = kwargs.get('user_name')
# 타임스탬프
current_time = int(get_timestamp_ms())
self.created_at = current_time
self.updated_at = current_time
def increment_failed_login(self) -> None:
"""로그인 실패 횟수 증가"""
self.count = (self.count or 0) + 1
self.updated_at = int(get_timestamp_ms())
def reset_failed_login(self) -> None:
"""로그인 실패 횟수 초기화"""
self.count = 0
self.updated_at = int(get_timestamp_ms())
def update_timestamp(self) -> None:
"""타임스탬프 업데이트"""
self.updated_at = int(get_timestamp_ms())
def to_dict(self) -> Dict[str, Any]:
"""딕셔너리로 변환"""
return {
'user_uuid': self.user_uuid,
'id': self.id,
'user_name': self.user_name,
'auth_level': self.auth_level,
'count': self.count,
'created_at': self.created_at,
'updated_at': self.updated_at
}
def __repr__(self) -> str:
"""디버깅, 로깅용"""
return f"<User {self.id}>"

View File

@@ -28,6 +28,11 @@ class RedisFacadeOpr():
Returns:
bool: 연결 성공 시 True, 실패 시 False
"""
# 이미 연결되어 있다면 재연결하지 않음
if self.is_connected():
logging.debug("Redis already connected, skipping reconnection")
return True
try:
# Redis Client 연결
self.redis_client = redis.StrictRedis(**self.redis_config)

View File

@@ -0,0 +1,17 @@
"""
Repository 레이어
"""
# Base
from .base_repository import BaseRepository
# User
from .user.user_repository import UserRepository
from .user.user_ips_repository import UserIpsRepository
__all__ = [
# Base
BaseRepository,
# User
UserRepository, UserIpsRepository
]

View File

@@ -0,0 +1,31 @@
"""
기본 레파지토리 추상 클래스
"""
from abc import ABC, abstractmethod
from typing import TypeVar, Generic, Optional, List, Type
from sqlalchemy.exc import SQLAlchemyError
from extensions import db, custom_logger
import settings
# Generic type for model entitles
T = TypeVar('T')
logger = custom_logger(f"{settings.LOG_PREFIX}_repository")
class BaseRepository(ABC, Generic[T]):
"""
기본직인 CRUD 제공
"""
@property
@abstractmethod
def model(self) -> Type[T]:
"""레파지토리 반환"""
pass
@property
def session(self):
"""현재 DB 세션 가져오기"""
return db.session

View File

@@ -0,0 +1,19 @@
"""
사용자 IPs Repository
"""
from typing import Optional, Type
from base_repository import BaseRepository
from models.user_model import users, user_ips
from utils.func import get_timestamp_ms
class UserIpsRepository(BaseRepository[user_ips.UserIps]):
"""
사용자 IPs Repository
"""
@property
def model(self) -> Type[user_ips.UserIps]:
"""모델 클래스 반환"""
return user_ips.UserIps

View File

@@ -0,0 +1,19 @@
"""
사용자 Repository
"""
from typing import Optional, Type
from base_repository import BaseRepository
from models.user_model import users
from utils.func import get_timestamp_ms
class UserRepository(BaseRepository[users.Users]):
"""
사용자 Repository
"""
@property
def model(self) -> Type[users.Users]:
"""모델 클래스 반환"""
return users.Users

View File

@@ -0,0 +1,138 @@
"""
인증 서비스
"""
from typing import Dict, Any, Tuple, Optional
from flask_jwt_extended import create_access_token, create_refresh_token
from models.user_model.users import Users
from models.user_model.user_ips import UserIps
from models.user_model import user_dto
from repositories import UserRepository, UserIpsRepository
import settings
from utils import func
from utils.account_lock_policy import AccountLockPolicy
from utils.db_decorators import transactional
from extensions import custom_logger
logger = custom_logger(f"{settings.LOG_PREFIX}_auth_service")
class AuthService:
"""
인증 서비스 - 회원가입, 로그인, 로그아웃 비즈니스 로직
"""
@staticmethod
@transactional
def register(user_data: Dict[str, Any]) -> Users:
"""회원 가입"""
# Pydantic 검증
create_user_dto = user_dto.CreateUserDTO(**user_data)
user_id = create_user_dto.id
password = create_user_dto.password
user_name = create_user_dto.user_name
user_repo = UserRepository()
# 중복 검사
if user_repo.exists_by_id(user_id):
...
# 비밀번호 생성
hash_password = func.hash_password(password)
# 사용자 생성
user = Users(
id=user_id,
password=hash_password,
user_name=user_name
)
user = user_repo.create(user)
logger.info(f"유저가 생성되었습니다: {user_id}")
return user
@staticmethod
@transactional
def login(credentials: Dict[str, Any]) -> Tuple[Users, str, str]:
# Pydantic 검증
check_user_dto = user_dto.CheckUserDTO(**credentials)
user_repo = UserRepository()
# 사용자 조회
user = user_repo.find_by_id(check_user_dto.id)
if not user:
...
# 계정 잠금 정책 체크
lock_policy = AccountLockPolicy(
max_attempts=settings.MAX_LOGIN_ATTEMPTS,
lockout_duration_minutes=settings.ACCOUNT_LOCKOUT_DURATION_MINUTES
)
if lock_policy.is_locked(user.count):
...
# 비밀번호 검증
if not func.verify_password(check_user_dto.password, user.password):
user.increment_failed_login()
user_repo.update(user)
# commit되어 실패 횟수 기록
...
# 로그인 성공 - 실패 횟수 초기화
user.reset_failed_login()
user_repo.update(user)
# 토큰 생성
access_token = create_access_token(identity=user.user_uuid)
refresh_token = create_refresh_token(identity=user.user_uuid)
# Redis 사용자 정보 캐싱 로직
logger.info(f"사용자 로그인: {user.id}")
return user, access_token, refresh_token
@staticmethod
def refresh_access_token(user_uuid: str) -> str:
"""엑세스 토큰 갱신"""
access_token = create_access_token(identity=user_uuid)
logger.debug(f"Access Token 재발급: {user_uuid}")
return access_token
@staticmethod
def logout(jti: str) -> bool:
"""로그아웃 (JWT 블랙리스트 추가)"""
...
@staticmethod
@transactional
def save_user_ips(request, user: Users) -> None:
"""사용자 IPs 저장"""
user_ips_repo = UserIpsRepository()
# IP 및 User-Agent 추출
ip_address = request.headers.get('X-Real-IP', '') if request.headers else ''
user_agent = request.headers.get('User-Agent', '') if request.headers else ''
# 기존 IP 기록 조회 로직
if True:
...
else:
user_ips = UserIps(
user_uuid=user.user_uuid,
user_ip=ip_address,
user_agent=user_agent
)
user_ips_repo.create(user_ips)
logger.debug(f"유저 IP 저장: {user.id} - {ip_address}")

View File

@@ -0,0 +1,4 @@
"""
캐시 서비스
Redis 기반 캐싱 로직 관리하는 서비스 레이어
"""

View File

@@ -0,0 +1,4 @@
"""
사용자 서비스
사용자 관리 관련 비즈니스 로직을 관리하는 서비스 레이어
"""

View File

@@ -0,0 +1,32 @@
"""
계정 잠금 정책 비즈니스 로직
"""
from typing import Optional
from utils import func
class AccountLockPolicy:
"""계정 잠금 정책 비즈니스 로직"""
def __init__(self, max_attempts: int = 5,
lockout_duration_minutes: int = 30):
self.max_attempts = max_attempts
self.lockout_duration_minutes = lockout_duration_minutes
def is_locked(self, failed_count: int, locked_at: Optional[int] = None) -> bool:
"""계정 잠금 여부 확인"""
if failed_count < self.max_attempts:
return False
# 잠금 해제 시간 체크
if locked_at:
current_time = func.get_timestamp_ms()
lockout_duration_ms = self.lockout_duration_minutes * 60 * 1000
if current_time - locked_at > lockout_duration_ms:
return False
return True
def should_reset(self, failed_count: int, locked_at: Optional[int] = None) -> bool:
return not self.is_locked(failed_count, locked_at)

View File

@@ -0,0 +1,47 @@
"""
DB 트랜잭션 데코레이더
"""
import functools
from typing import Callable
from extensions import custom_logger
import settings
logger = custom_logger(f"{settings.LOG_PREFIX})_db_decorator")
_db = None
def get_db():
"""DB Lazy 로딩"""
global _db
if _db is None:
from extensions import db
_db = db
return db
def transactional(f: Callable) -> Callable:
"""
데이터 베이스 트랜잭션 관리하는 데코레이터
"""
@functools.wraps(f)
def decorated_function(*args, **kwargs):
# DB 가져오기
db = get_db()
try:
# 함수 실행
result = f(*args, **kwargs)
# 성공 시 커밋
db.session.commit()
logger.debug(f"트랜잭션 커밋 완료 - {f.__name__}")
return result
except Exception as e:
# 실패 시 롤백
db.session.rollback()
logger.error(f"트랜잭션 롤백 - {f.__name__}")
raise
return decorated_function()

30
src/utils/func.py Normal file
View File

@@ -0,0 +1,30 @@
def get_timestamp_ms():
"""타임스탬프 생성"""
import time
return round(time.time() * 1000)
def hash_password(password: str) -> str:
"""비밀번호 암호화"""
import bcrypt
if not password:
raise ValueError("비밀번호는 비어있을 수 없습니다.")
hashed = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
return hashed.decode("utf-8")
def verify_password(password: str, hashed_password: str) -> bool:
import bcrypt
"""비밀번호 검증"""
if not password or not hashed_password:
return False
try:
return bcrypt.checkpw(
password.encode("utf-8"),
hashed_password.encode("utf-8")
)
except (ValueError, AttributeError):
return False