Compare commits

...

10 Commits

Author SHA1 Message Date
73ebdf9712 Merge pull request 'dev' (#6) from dev into main
Reviewed-on: #6
2025-11-17 11:02:14 +00:00
0d4cc83a97 Merge pull request '[FEAT] (카레고리 로직): 카테고리 서비스 구현 완료' (#5) from dev_stage into dev
Reviewed-on: #5
2025-11-17 11:02:00 +00:00
c9bfa81305 [FEAT] (카레고리 로직): 카테고리 서비스 구현 완료
v0.1.4 (2025-11-27)
- 카테고리 관련 API 구현 완료.
2025-11-17 20:01:08 +09:00
6daf842642 Merge pull request 'dev' (#4) from dev into main
Reviewed-on: #4
2025-11-16 09:04:41 +00:00
fc67076e80 Merge pull request '[FEAT] (사용자 로직): 프로필 서비스 구현 완료' (#3) from dev_stage into dev
Reviewed-on: #3
2025-11-16 09:03:15 +00:00
abf405f8ae [FEAT] (사용자 로직): 프로필 서비스 구현 완료
v0.1.3 (2025-11-16)
- 프로필 조회, 프로필 업데이트, 탈퇴 구현 완료.
2025-11-16 18:02:27 +09:00
782d858acb Merge pull request 'dev' (#2) from dev into main
Reviewed-on: #2
2025-11-16 07:24:50 +00:00
141931a374 Merge pull request 'dev_stage' (#1) from dev_stage into dev
Reviewed-on: #1
2025-11-16 07:22:41 +00:00
e20c7d58b1 [FEAT] (사용자 로직) : 인증 서비스 구현 완료
v0.1.2 (2025-11-16)
- 로그인, 로그아웃, 토큰 갱신, 회원가입 API 구현 완료
- 로그 포맷 통일화
2025-11-16 16:20:45 +09:00
ae2766cff5 [FEAT] (사용자 로직): 인증 서비스 로직 구현 중
v0.1.1 (2025-11-13)
- 사용자 관련 인증 서비스 로직 구현 중
2025-11-13 17:29:22 +09:00
36 changed files with 2148 additions and 81 deletions

View File

@@ -25,14 +25,28 @@ import constants
def create_app(config: Optional[Dict[str, Any]] = None) -> Flask:
"""Flask 애플리케이션 팩토리"""
app = Flask(__name__)
app.config.from_object(settings)
if config:
app.config.update(config)
# Werkzeug 로거 커스터마이징 (타임스탬프 제거)
import logging
from werkzeug.serving import WSGIRequestHandler
# Werkzeug의 기본 로그 포맷 오버라이드
class CustomRequestHandler(WSGIRequestHandler):
def log(self, type, message, *args):
# 기본 타임스탬프를 제거하고 메시지만 로깅
logger = logging.getLogger('werkzeug')
logger.info(message % args if args else message)
# 커스텀 핸들러를 Flask 앱에 설정
app.request_handler_class = CustomRequestHandler
# 로깅 설정
logger = custom_logger(f"{settings.LOG_PREFIX}main_app")
@@ -67,14 +81,24 @@ def create_app(config: Optional[Dict[str, Any]] = None) -> Flask:
def main() -> None:
"""애플리케이션 실행"""
app = create_app()
# 로그 정보
from werkzeug.serving import WSGIRequestHandler
# Werkzeug의 기본 로그 포맷 오버라이드
class CustomRequestHandler(WSGIRequestHandler):
def log(self, type, message, *args):
import logging
# 기본 타임스탬프를 제거하고 메시지만 로깅
logger = logging.getLogger('werkzeug')
logger.info(message % args if args else message)
app.run(
host=settings.HOST,
port=settings.PORT,
debug=settings.DEBUG,
use_reloader=False
use_reloader=False,
request_handler=CustomRequestHandler
)
# Gunicorn이 import할 app 인스턴스

View File

@@ -6,9 +6,12 @@ Flask Blueprint 관리
from flask import Flask
def register_blueprints(app: Flask) -> None:
from utils.swagger_config import api_blueprint
app.register_blueprint(api_blueprint, url_prefix='/api/v1')
__all__ = ['register_blueprints']
# Swagger API Blueprint 등록
from utils.swagger_config import api_blueprint
app.register_blueprint(api_blueprint, url_prefix="/api/v1")
# Flask-RESTX namespace에 리소스 등록
from . import bp_auth, bp_users, bp_category
__all__ = ["register_blueprints"]

163
src/blueprints/bp_auth.py Normal file
View File

@@ -0,0 +1,163 @@
"""
인증 관련 API 엔드포인트
가입, 로그인, 로그아웃 등
"""
from flask import request
from flask_restx import Resource
from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt
from utils.response import *
from utils.swagger_config import auth_ns
from models.user_model import auth_swagger
from services.auth.auth_service import AuthService
@auth_ns.route("/register")
class Register(Resource):
@auth_ns.doc(
id="register",
summary="회원가입",
description="새로운 사용자를 등록합니다.",
tags=["Auth"]
)
@auth_ns.expect(auth_swagger.register_model)
@auth_ns.response(201, "Success - 회원가입 완료")
@auth_ns.response(400, "요청 변수가 올바르지 않습니다.")
@auth_ns.response(409, "이미 존재하는 ID")
@auth_ns.response(500, "서버 내부 오류")
def post(self):
"""회원가입"""
try:
# Content-Type 검증
if not request.is_json:
return bad_request_response(message="요청 변수가 올바르지 않습니다.")
request_data = request.get_json()
user = AuthService.register(request_data)
if user is None:
return conflict_response(message="이미 존재하는 ID입니다.")
return success_response(
data=user.to_dict(),
message="회원가입이 완료되었습니다.",
status_code=201
)
except Exception as e:
return internal_error_response(
message=str(e)
)
@auth_ns.route("/login")
class Login(Resource):
@auth_ns.doc(
id="login",
summary="로그인",
description="사용자 인증 후 JWT 토큰 발급",
tags=["Auth"]
)
@auth_ns.expect(auth_swagger.login_model)
@auth_ns.response(200, "Success - 로그인 성공")
@auth_ns.response(400, "요청 변수가 올바르지 않습니다.")
@auth_ns.response(409, "로그인 할 수 없는 사용자입니다.")
@auth_ns.response(500, "서버 내부 오류")
def post(self):
"""로그인"""
try:
# Content-Type 검증
if not request.is_json:
return bad_request_response(message="요청 변수가 올바르지 않습니다.")
request_data = request.get_json()
# 로그인 처리
data = AuthService.login(request_data)
if data is None:
return conflict_response(message="로그인 할 수 없는 사용자입니다.")
user = data[0]
access_token = data[1]
refresh_token = data[2]
# UserIP 저장
AuthService.save_user_ips(request, user)
return success_response(
data={
"user": user.to_dict(),
"access_token": access_token,
"refresh_token": refresh_token
},
message="로그인 성공"
)
except Exception as e:
return internal_error_response(
message=str(e)
)
@auth_ns.route("/refresh")
class RefreshToken(Resource):
@auth_ns.doc(
id="refresh_token",
summary="액세스 토큰 갱신",
description="Refresh Token으로 새로운 Access Token을 발급하고, 사용된 Refresh Token을 무효화합니다.",
tags=["Auth"],
security=[{"Bearer": []}]
)
@auth_ns.response(200, "Success - 토큰 갱신 성공")
@auth_ns.response(409, "토큰 갱신에 실패하였습니다.")
@auth_ns.response(500, "서버 내부 오류")
@jwt_required(refresh=True)
def post(self):
"""토큰 갱신"""
try:
# 현재 리프레시 토큰 정보 추출
jwt_payload = get_jwt()
current_user_uuid = get_jwt_identity()
refresh_jti = jwt_payload["jti"]
# 새로운 액세스 토큰 생성
access_token = AuthService.refresh_access_token(current_user_uuid)
if access_token is None:
return conflict_response(message="토큰 갱신에 실패하였습니다.")
# 사용된 리프레시 토큰을 블랙리스트에 추가 (재사용 방지)
AuthService.logout(refresh_jti)
return success_response(
data={"access_token": access_token},
message="토큰 갱신 성공"
)
except Exception as e:
return internal_error_response(
message=str(e)
)
@auth_ns.route("/logout")
class Logout(Resource):
@auth_ns.doc(
id="logout",
summary="로그아웃",
description="로그아웃하고 토큰을 블랙리스트에 추가하여 재사용을 방지합니다.",
tags=["Auth"],
security=[{"Bearer": []}]
)
@auth_ns.response(200, "Success")
@auth_ns.response(401, "Unauthorized")
@auth_ns.response(500, "서버 내부 오류")
@jwt_required()
def post(self):
"""로그아웃"""
# 현재 토큰의 JTI 추출
jwt_payload = get_jwt()
jti = jwt_payload["jti"]
# 토큰을 블랙리스트에 추가
AuthService.logout(jti)
return success_response(message="로그아웃 성공")

View File

@@ -0,0 +1,142 @@
"""
카테고리 관련 API 엔드포인트
"""
from flask import request, g
from flask_restx import Resource
from flask_jwt_extended import jwt_required
from utils.response import *
from utils.swagger_config import category_ns
from utils.decorators import load_current_user
from models.category_model import category_swagger
from services.category.category_service import CategoryService
@category_ns.route("/")
class Categories(Resource):
@category_ns.doc(
id="get_categories",
summary="카테고리 목록 조회",
description="카테고리 목록을 조회합니다.",
tags=["Category"],
security=[{"Bearer": []}]
)
@category_ns.response(200, "Success - 카테고리 목록 조회 완료")
@category_ns.response(401, "인증에 실패하였습니다.")
@category_ns.response(500, "서버 내부 오류")
@jwt_required()
def get(self):
"""카테고리 목록 조회"""
limit = request.args.get("limit", type=int)
offset = request.args.get("offset", type=int, default=0)
categories = CategoryService.get_all_categories(limit=limit, offset=offset)
return success_response(
data=[category.to_dict() for category in categories],
message="카테고리 목록 조회 성공"
)
@category_ns.doc(
id="create_category",
summary="카테고리 생성",
description="카테고리를 생성합니다.",
tags=["Category"],
security=[{"Bearer": []}]
)
@category_ns.expect(category_swagger.create_category)
@category_ns.response(201, "Success - 카테고리 생성 완료")
@category_ns.response(401, "인증에 실패하였습니다.")
@category_ns.response(409, "이미 존재하는 카테고리명")
@category_ns.response(500, "서버 내부 오류")
@jwt_required()
@load_current_user
def post(self):
"""카테고리 생성"""
request_data = request.get_json()
# 토큰에서 가져온 user_uuid를 request_data에 추가
user_uuid = g.current_user.user_uuid
category = CategoryService.create_category(request_data, user_uuid)
return success_response(
data=category.to_dict(),
message="카테고리 생성이 완료되었습니다",
status_code=201
)
@category_ns.route("/<string:category_uuid>")
class CategoryDetail(Resource):
@category_ns.doc(
id="get_category",
summary="카테고리 조회",
description="UUID로 특정 카테고리를 조회합니다.",
tags=["Category"],
security=[{"Bearer": []}],
params={"category_uuid": {"description": "카테고리 UUID", "type": "string"}}
)
@category_ns.response(201, "Success - 카테고리 조회 성공")
@category_ns.response(401, "인증에 실패하였습니다.")
@category_ns.response(409, "조회에 실패하였습니다.")
@category_ns.response(500, "서버 내부 오류")
@jwt_required()
def get(self, category_uuid):
"""카테고리 조회"""
category = CategoryService.get_category_by_uuid(category_uuid)
if not category:
return not_found_response("Category")
return success_response(
data=category.to_dict(),
message="카테고리 조회 성공"
)
@category_ns.doc(
id="update_category",
summary="카테고리 업데이트",
description="UUID로 특정 카테고리를 업데이트합니다.",
tags=["Category"],
security=[{"Bearer": []}],
params={"category_uuid": {"description": "카테고리 UUID", "type": "string"}}
)
@category_ns.expect(category_swagger.update_category)
@category_ns.response(200, "Success - 카테고리 업데이트 성공")
@category_ns.response(401, "인증에 실패하였습니다.")
@category_ns.response(409, "카테고리 업데이트에 실패하였습니다.")
@category_ns.response(500, "서버 내부 오류")
@jwt_required()
def put(self, category_uuid):
"""카테고리 업데이트"""
request_data = request.get_json()
category = CategoryService.update_category(category_uuid, request_data)
return success_response(
data=category.to_dict(),
message="카테고리 업데이트가 완료되었습니다",
status_code=200
)
@category_ns.doc(
id="delete_category",
summary="카테고리 삭제",
description="UUID로 특정 카테고리를 삭제합니다.",
tags=["Category"],
security=[{"Bearer": []}],
params={"category_uuid": {"description": "카테고리 UUID", "type": "string"}}
)
@category_ns.response(200, "Success - 카테고리 삭제 성공")
@category_ns.response(401, "인증에 실패하였습니다.")
@category_ns.response(409, "카테고리 삭제에 실패하였습니다.")
@category_ns.response(500, "서버 내부 오류")
@jwt_required()
def delete(self, category_uuid):
"""카테고리 삭제"""
CategoryService.delete_category(category_uuid)
return success_response(
message="카테고리 삭제가 완료되었습니다",
status_code=200
)

View File

@@ -0,0 +1,95 @@
"""
유저 관련 API 엔드포인트
"""
from flask import request, g
from flask_restx import Resource
from flask_jwt_extended import jwt_required, get_jwt
from utils.response import *
from utils.swagger_config import user_ns
from utils.decorators import load_current_user
from models.user_model import user_swagger
from services.user.user_service import UserService
@user_ns.route("/me")
class MyProfile(Resource):
@user_ns.doc(
id="get_my_profile",
summary="내 프로필 조회",
description="현재 로그인한 사용의 프로필을 조회합니다. (토큰 기반)",
tags=["Users"],
security=[{"Bearer": []}]
)
@user_ns.response(200, "Success - 프로필 조회 완료")
@user_ns.response(401, "인증 실패에 실패하였습니다.")
@user_ns.response(500, "서버 내부 오류")
@jwt_required()
@load_current_user
def get(self):
"""내 프로필 조회"""
return success_response(
data=g.current_user.to_dict(),
message="프로필 조회 성공"
)
@user_ns.doc(
id="update_my_profile",
summary="내 프로필 업데이트",
description="현재 로그인한 사용자의 프로필을 업데이트 합니다. (토큰 기반)",
tags=["Users"],
security=[{"Bearer": []}]
)
@user_ns.expect(user_swagger.update_profile_model)
@user_ns.response(200, "Success - 프로필 업데이트 완료")
@user_ns.response(400, "잘못된 요청 데이터")
@user_ns.response(401, "인증 실패에 실패하였습니다.")
@user_ns.response(500, "서버 내부 오류")
@jwt_required()
@load_current_user
def put(self):
"""내 프로필 업데이트"""
user = g.current_user
data = request.get_json()
updated_user = UserService.update_user_profile(user.user_uuid, data)
return success_response(
data=updated_user.to_dict(),
message="프로필 업데이트 성공"
)
@user_ns.doc(
id="delete_my_account",
summary="회원 탈퇴",
description="현재 로그인한 사용자의 계정을 삭제합니다. (토큰 기반). 탈퇴 후 모든 데이터가 삭제되며 복구할 수 없습니다.",
tags=["Users"],
security=[{"Bearer": []}]
)
@user_ns.response(200, "Success - 회원 탈퇴 완료")
@user_ns.response(401, "인증에 실패하였습니다.")
@user_ns.response(404, "사용자를 찾을 수 없습니다.")
@user_ns.response(500, "서버 내부 오류")
@jwt_required()
@load_current_user
def delete(self):
"""회원 탈퇴"""
user = g.current_user
# JWT ID 추출 (로그아웃 처리용)
jwt_data = get_jwt()
jti = jwt_data.get("jti")
# 사용자 삭제 (캐시 무효화 + JWT 블랙리스트 추가 포함)
success = UserService.delete_user(user.user_uuid, jti=jti)
if not success:
return error_response(
message="회원 탈퇴에 실패했습니다.",
status_code=404
)
return success_response(
message="회원 탈퇴가 완료되었습니다."
)

View File

@@ -1,6 +1,14 @@
# API INFO
API_VERSION = "v0.1"
API_DATE = 2025-11-13
API_VERSION = "v0.1.4"
API_DATE = "2025-11-17"
# BASE DATE FORMAT
DATE_FORMAT = "%Y-%m-%d %H:%M:%S %z"
DATE_FORMAT = "%Y-%m-%d %H:%M:%S %z"
# 사용여부
IS_N = 0
IS_Y = 1
# 사용자 권한
AUTH_LEVEL_ADMIN = '10'
AUTH_LEVEL_USER = '20'

View File

@@ -12,7 +12,7 @@ from utils.logger_manager import *
from modules.redis.redis_facade import redis_cli
logger = custom_logger(f"{settings.LOG_PREFIX})extenstions")
logger = custom_logger(f"{settings.LOG_PREFIX}_extensions")
try:

View File

@@ -33,6 +33,55 @@ accesslog = settings.GUNICORN_ACCESSLOG
errorlog = settings.GUNICORN_ERRORLOG
loglevel = settings.GUNICORN_LOGLEVEL
# 로그 포맷 통일 (logger_manager.py와 동일한 포맷)
# %(s)s: 상태코드, %(m)s: HTTP메서드, %(U)s: URL경로, %(q)s: 쿼리스트링
# %(h)s: 클라이언트IP, %({User-Agent}i)s: User-Agent
access_log_format = '%(s)s %(m)s %(U)s%(q)s - IP:%(h)s Agent:"%({User-Agent}i)s"'
# Gunicorn error/worker 로그 포맷 설정
logconfig_dict = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'default': {
'format': '[%(asctime)s] [%(levelname)s] (gunicorn) %(message)s',
'datefmt': '%Y-%m-%d %H:%M:%S %z'
},
'access': {
'format': '[%(asctime)s] [%(levelname)s] (gunicorn.access) %(message)s',
'datefmt': '%Y-%m-%d %H:%M:%S %z'
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'default',
'stream': 'ext://sys.stdout'
},
'access_console': {
'class': 'logging.StreamHandler',
'formatter': 'access',
'stream': 'ext://sys.stdout'
},
},
'root': {
'level': settings.GUNICORN_LOGLEVEL.upper(),
'handlers': ['console']
},
'loggers': {
'gunicorn.error': {
'level': settings.GUNICORN_LOGLEVEL.upper(),
'handlers': ['console'],
'propagate': False,
},
'gunicorn.access': {
'level': settings.GUNICORN_LOGLEVEL.upper(),
'handlers': ['access_console'],
'propagate': False,
},
}
}
# 프로세스 네임
proc_name = "nuriq_server"

View File

@@ -0,0 +1,16 @@
"""
Models 레이어
"""
# User
from .user_model import users, user_ips, user_dto, auth_swagger
# Category
from .category_model import category, category_dto, category_swagger
__all__ = [
# User
users, user_ips, user_dto, auth_swagger,
# Category
category, category_dto, category_swagger,
]

View File

@@ -0,0 +1,63 @@
"""
카테고리 모델
"""
import uuid
from typing import Dict, Any
from extensions import db
from utils.func import get_timestamp_ms
import constants
class Category(db.Model):
"""카테고리 모델"""
__tablename__ = "category"
# 기본 필드
category_uuid = db.Column(db.String(255), primary_key=True, nullable=False, comment="카타게로 UUID")
category_name = db.Column(db.String(255), unique=True, nullable=False, index=True, comment="카테고리명")
description = db.Column(db.String(500), nullable=True, comment="카테고리 설명")
order_no = db.Column(db.Integer, nullable=True, comment="표시 순서")
is_used = db.Column(db.SmallInteger, nullable=False, default=constants.IS_Y, comment="사용 여부 (0:미사용, 1:사용)")
# 편집자
created_by = db.Column(db.String(255), nullable=True, comment="생성자 UUID")
updated_by = db.Column(db.String(255), nullable=True, comment="수정자 UUID")
deleted_by = db.Column(db.String(255), nullable=True, comment="삭제자 UUID")
# 타임스탬프 (BIGINT - Unix Timestamp in milliseconds)
created_at = db.Column(db.BigInteger, nullable=True, comment="생성일")
updated_at = db.Column(db.BigInteger, nullable=True, comment="수정일")
deleted_at = db.Column(db.BigInteger, nullable=True, comment="삭제일")
def __init__(self, category_name: str, description: str = None, created_by: str = None):
"""카테고리 초기화"""
self.category_uuid = str(uuid.uuid4())
self.category_name = category_name
self.description = description
self.is_used = constants.IS_Y
self.created_by = created_by
self.created_at = int(get_timestamp_ms())
def update_timestamp(self) -> None:
"""타임스탬프 업데이트"""
self.updated_at = int(get_timestamp_ms())
def to_dict(self) -> Dict[str, Any]:
"""딕셔너리로 변환"""
return {
"category_uuid": self.category_uuid,
"category_name": self.category_name,
"description": self.description,
"order_no": self.order_no,
"is_used": self.is_used,
"created_by": self.created_by,
"updated_by": self.updated_by,
"deleted_by": self.deleted_by,
"created_at": self.created_at,
"updated_at": self.updated_at,
"deleted_at": self.deleted_at,
}
def __repr__(self) -> str:
return f'<Category uuid={self.category_uuid}, category_name={self.category_name}, is_used={self.is_used}'

View File

@@ -0,0 +1,23 @@
from pydantic import BaseModel, field_validator
from typing import Optional
class CreateCategoryDTO(BaseModel):
"""카테고리 생성 DTO"""
# 필수
category_name: str
description: Optional[str] = None
class Config:
strict = True
class UpdateCategoryDTO(BaseModel):
"""카테고리 업데이트 DTO"""
category_name: Optional[str] = None
description: Optional[str] = None
class Config:
strict = True

View File

@@ -0,0 +1,16 @@
"""
카테고리 Swagger 모델 정의
"""
from utils.swagger_config import category_ns
from flask_restx import fields
create_category = category_ns.model("CreateCategory", {
"category_name": fields.String(required=True, description="카테고리명", example="AI"),
"description": fields.String(required=False, description="카테고리 설명", example="AI 관련 주제입니다.")
})
update_category = category_ns.model("UpdateCategory", {
"category_name": fields.String(required=False, description="카테고리명", example="AI"),
"description": fields.String(required=False, description="카테고리 설명", example="AI 관련 주제입니다.")
})

View File

@@ -0,0 +1,20 @@
"""
인증관련 Swagger 모델
"""
from utils.swagger_config import auth_ns
from flask_restx import fields
# Swagger 모델 정의
register_model = auth_ns.model("Register", {
"id": fields.String(required=True, description="사용자 ID", example="user123"),
"password": fields.String(required=True, description="비밀번호 (최소 6자)", example="password123"),
"user_name": fields.String(required=False, description="사용자 이름", example="홍길동")
})
login_model = auth_ns.model("Login", {
"id": fields.String(required=True, description="사용자 ID", example="user123"),
"password": fields.String(required=True, description="비밀번호", example="password123")
})
__all__ = ["register_model", "login_model"]

View File

@@ -0,0 +1,63 @@
from pydantic import BaseModel, field_validator
from typing import Optional
class CreateUserDTO(BaseModel):
"""회원 가입 DTO"""
# 필수
id: str
password: str
# 선택
user_name: Optional[str] = None
@field_validator("id")
@classmethod
def validate_id(cls, v):
if not v or len(v.strip()) == 0:
raise ValueError("ID는 필수입니다")
if len(v) < 3 or len(v) > 50:
raise ValueError("ID는 3-50자 사이여야 합니다")
return v.strip()
@field_validator("password")
@classmethod
def validate_password(cls, v):
if not v or len(v) < 6:
raise ValueError("비밀번호는 최소 6자 이상이어야 합니다")
return v
# 자동 형변환 방지
class Config:
strict = True
class CheckUserDTO(BaseModel):
"""로그인 확인 DTO"""
# 필수
id: str
password: str
@field_validator("id", "password")
@classmethod
def validate_required(cls, v):
if not v or len(v.strip()) == 0:
raise ValueError("필수 항목입니다")
return v.strip()
# 자동 형변환 방지
class Config:
strict = True
class CreateUserIpsDTO(BaseModel):
"""사용자 IP 저장 DTO"""
# 필수
user_uuid: str
user_ip: str
user_agent: str
# 자동 형변환 방지
class Config:
strict = True

View File

@@ -0,0 +1,50 @@
"""
사용자 IP 모델
"""
from extensions import db
from typing import Dict, Any
from utils.func import get_timestamp_ms
class UserIps(db.Model):
"""사용자 IP 모델"""
__tablename__ = "user_ips"
__table_args__ = (
db.PrimaryKeyConstraint("user_uuid", "user_ip", name="pk_user_ips"),
)
# 기본 필드
user_uuid = db.Column(db.String(255), nullable=False, index=True, comment="유저 UUID")
user_ip = db.Column(db.String(255), nullable=False, index=True, comment="유저 IP")
user_agent = db.Column(db.String(255), nullable=False, comment="유저 에이전트")
# 타임스탬프 (BIGINT - Unix Timestamp in milliseconds)
created_at = db.Column(db.BigInteger, nullable=True, comment="생성일 (unix timestamp, ms)")
lasted_at = db.Column(db.BigInteger, nullable=True, comment="마지막 접속일 (unix timestamp, ms)")
def __init__(self, user_uuid: str, user_ip: str, user_agent: str) -> None:
"""사용자 IP 초기화"""
self.user_uuid = user_uuid
self.user_ip = user_ip
self.user_agent = user_agent
# 타임스탬프
current_time = int(get_timestamp_ms())
self.created_at = current_time
self.lasted_at = current_time
def to_dict(self) -> Dict[str, Any]:
"""딕셔너리 변환"""
return {
"user_uuid": self.user_uuid,
"user_ip": self.user_ip,
"user_agent": self.user_agent,
"created_at": self.created_at,
"lasted_at": self.lasted_at
}
def __repr__(self) -> str:
"""디버깅, 로깅용"""
return f"<UserIps user={self.user_uuid[:8]}, ip={self.user_ip}"

View File

@@ -0,0 +1,11 @@
"""
유저 관련 Swagger 모델
"""
from utils.swagger_config import user_ns
from flask_restx import fields
# Swagger 모델 정의
update_profile_model = user_ns.model("UpdateProfile", {
"user_name": fields.String(required=False, description="사용자 이름", example="홍길동")
})

View File

@@ -0,0 +1,70 @@
"""
사용자 모델
"""
import uuid
from typing import Dict, Any
from extensions import db
from utils.func import get_timestamp_ms
import constants
class Users(db.Model):
"""사용자 모델"""
__tablename__ = "users"
# 기본 필드
user_uuid = db.Column(db.String(255), primary_key=True, nullable=False, comment="유저 UUID")
id = db.Column(db.String(255), unique=True, nullable=False, index=True, comment="유저 ID")
user_name = db.Column(db.String(50), nullable=True, comment="유저명")
password = db.Column(db.String(255), nullable=False, comment="비밀번호")
auth_level = db.Column(db.String(255), nullable=True, default=constants.AUTH_LEVEL_USER, comment="권한 레벨 (10: 관리자, 20: 사용자)")
count = db.Column(db.SmallInteger, default=0, nullable=True, comment="로그인 싫패 횟수")
# 타임스탬프 (BIGINT - Unix Timestamp in milliseconds)
created_at = db.Column(db.BigInteger, nullable=True, comment="생성일 (unix timestamp, ms)")
updated_at = db.Column(db.BigInteger, nullable=True, comment="수정일 (unix timestamp, ms)")
def __init__(self, id: str, hash_password: str, **kwargs: Any) -> None:
"""사용자 초기화"""
self.user_uuid = str(uuid.uuid4())
self.id = id
self.password = hash_password
# 선택적 필드
self.user_name = kwargs.get("user_name")
# 타임스탬프
current_time = int(get_timestamp_ms())
self.created_at = current_time
self.updated_at = current_time
def increment_failed_login(self) -> None:
"""로그인 실패 횟수 증가"""
self.count = (self.count or 0) + 1
self.updated_at = int(get_timestamp_ms())
def reset_failed_login(self) -> None:
"""로그인 실패 횟수 초기화"""
self.count = 0
self.updated_at = int(get_timestamp_ms())
def update_timestamp(self) -> None:
"""타임스탬프 업데이트"""
self.updated_at = int(get_timestamp_ms())
def to_dict(self) -> Dict[str, Any]:
"""딕셔너리로 변환"""
return {
"user_uuid": self.user_uuid,
"id": self.id,
"user_name": self.user_name,
"auth_level": self.auth_level,
"count": self.count,
"created_at": self.created_at,
"updated_at": self.updated_at
}
def __repr__(self) -> str:
"""디버깅, 로깅용"""
return f"<User {self.id}>"

View File

@@ -13,11 +13,11 @@ class RedisFacadeOpr():
def __init__(self) -> None:
"""Redis 설정 초기화"""
self.redis_config = {
'host': settings.REDIS_HOST,
'port': settings.REDIS_PORT,
'password': settings.REDIS_PASSWORD,
'decode_responses': True, # auto-decode response
'db': settings.REDIS_DB
"host": settings.REDIS_HOST,
"port": settings.REDIS_PORT,
"password": settings.REDIS_PASSWORD,
"decode_responses": True, # auto-decode response
"db": settings.REDIS_DB
}
self.redis_client: Optional[redis.StrictRedis] = None
@@ -28,6 +28,11 @@ class RedisFacadeOpr():
Returns:
bool: 연결 성공 시 True, 실패 시 False
"""
# 이미 연결되어 있다면 재연결하지 않음
if self.is_connected():
logging.debug("Redis already connected, skipping reconnection")
return True
try:
# Redis Client 연결
self.redis_client = redis.StrictRedis(**self.redis_config)

View File

@@ -0,0 +1,22 @@
"""
Repository 레이어
"""
# Base
from .base_repository import BaseRepository
# User
from .user.user_repository import UserRepository
from .user.user_ips_repository import UserIpsRepository
# Category
from .category.category_repository import CategoryRepository
__all__ = [
# Base
BaseRepository,
# User
UserRepository, UserIpsRepository,
# Category
CategoryRepository,
]

View File

@@ -0,0 +1,88 @@
"""
기본 레파지토리 추상 클래스
"""
from abc import ABC, abstractmethod
from typing import TypeVar, Generic, Optional, List, Type
from sqlalchemy.exc import SQLAlchemyError
from extensions import db, custom_logger
import settings
# Generic type for model entitles
T = TypeVar("T")
logger = custom_logger(f"{settings.LOG_PREFIX}_repository")
class BaseRepository(ABC, Generic[T]):
"""
기본직인 CRUD 제공
"""
@property
@abstractmethod
def model(self) -> Type[T]:
"""레파지토리 반환"""
pass
@property
def session(self):
"""현재 DB 세션 가져오기"""
return db.session
def create(self, entity: T) -> T:
"""엔티티 생성"""
try:
self.session.add(entity)
self.session.flush()
logger.debug(f"{self.model.__name__} 생성완료: {getattr(entity, 'id', 'unknown')}")
return entity
except SQLAlchemyError as e:
logger.error(f"{self.model.__name__} 생성 실패: {str(e)}")
raise
def update(self, entity) -> T:
"""엔티티 수정"""
try:
self.session.add(entity)
self.session.flush()
logger.debug(f"{self.model.__name__} 수정완료: {getattr(entity, 'id', 'unknown')}")
return entity
except SQLAlchemyError as e:
logger.error(f"{self.model.__name__} 수정 실패: {str(e)}")
raise
def delete(self, entity: T) -> bool:
"""엔티티 삭제"""
try:
self.session.delete(entity)
self.session.flush()
logger.debug(f"{self.model.__name__} 삭제완료: {getattr(entity, 'id', 'unknown')}")
return True
except SQLAlchemyError as e:
logger.error(f"{self.model.__name__} 삭제 실패: {str(e)}")
raise
def count(self) -> int:
"""엔티티 개수 반환"""
try:
return self.session.query(self.model).count()
except SQLAlchemyError as e:
logger.error(f"{self.model.__name__} 개수 조회 실패: {str(e)}")
raise
def find_all(self, limit: Optional[int] = None, offset: int = 0) -> List[T]:
"""Find all entities with optional pagination."""
try:
query = self.session.query(self.model)
if offset > 0:
query = query.offset(offset)
if limit:
query = query.limit(limit)
return query.all()
except SQLAlchemyError as e:
logger.error(f"Error finding all {self.model.__name__}: {e}")
raise

View File

@@ -0,0 +1,36 @@
"""
카테고리 Repository
"""
from typing import Optional, Type
from repositories.base_repository import BaseRepository
from models.category_model.category import Category
class CategoryRepository(BaseRepository[Category]):
"""카테고리 Repository"""
@property
def model(self) -> Type[Category]:
"""모델 반환"""
return Category
def find_by_uuid(self, category_uuid: str) -> Optional[Category]:
"""UUID로 조회"""
return self.session.query(Category).filter_by(category_uuid=category_uuid).one_or_none()
def find_by_name(self, category_name: str) -> Optional[Category]:
"""카테고리명으로 조회"""
return self.session.query(Category).filter_by(category_name=category_name).one_or_none()
def exists_by_name(self, category_name: str) -> bool:
"""카테고리명으로 카테고리 존재 여부 확인"""
return self.session.query(
self.session.query(Category).filter_by(category_name=category_name).exists()
).scalar()
def exists_by_uuid(self, category_uuid: str) -> bool:
"""UUID로 카테고리 존재 여부 확인"""
return self.session.query(
self.session.query(Category).filter_by(category_uuid=category_uuid).exists()
).scalar()

View File

@@ -0,0 +1,41 @@
"""
사용자 IPs Repository
"""
from typing import Optional, Type
from..base_repository import BaseRepository
from models.user_model.users import Users
from models.user_model.user_ips import UserIps
from utils.func import get_timestamp_ms
class UserIpsRepository(BaseRepository[UserIps]):
"""
사용자 IPs Repository
"""
@property
def model(self) -> Type[UserIps]:
"""모델 클래스 반환"""
return UserIps
def find_by_user_and_ip(self, user: Users, user_ip: str) -> Optional[UserIps]:
"""사용자 UUID와 IP로 조회"""
return self.session.query(UserIps).filter(
UserIps.user_uuid == user.user_uuid,
UserIps.user_ip == user_ip
).one_or_none()
def update_last_access(self, user_uuid: str, user_ip: str) -> UserIps:
"""사용자 IP 접속 시간 업데이트"""
user_ip = self.session.query(UserIps).filter_by(
user_uuid=user_uuid,
user_ip=user_ip
).one_or_none()
if user_ip:
user_ip.lasted_at = get_timestamp_ms()
self.session.flush()
return user_ip

View File

@@ -0,0 +1,33 @@
"""
사용자 Repository
"""
from typing import Optional, Type
from ..base_repository import BaseRepository
from models.user_model import users
from utils.func import get_timestamp_ms
class UserRepository(BaseRepository[users.Users]):
"""
사용자 Repository
"""
@property
def model(self) -> Type[users.Users]:
"""모델 클래스 반환"""
return users.Users
def find_by_uuid(self, user_uuid: str) -> Optional[users.Users]:
"""UUID로 사용자 조회"""
return self.session.query(users.Users).filter_by(user_uuid=user_uuid).one_or_none()
def find_by_id(self, user_id: str) -> Optional[users.Users]:
"""ID로 사용자 조회"""
return self.session.query(users.Users).filter_by(id=user_id).one_or_none()
def exists_by_id(self, user_id: str) -> bool:
"""ID로 사용자 존재 여부 확인"""
return self.session.query(
self.session.query(users.Users).filter_by(id=user_id).exists()
).scalar()

View File

@@ -0,0 +1,20 @@
"""
서비스 레이어 패키지
"""
from services.auth.auth_service import AuthService
from services.user.user_service import UserService
from services.auth.cache_service import (
CacheService,
UserCacheService,
JWTBlacklistService
)
__all__ = [
"AuthService",
"UserService",
"CacheService",
"UserCacheService",
"JWTBlacklistService",
"AccountLockPolicy"
]

View File

@@ -0,0 +1,151 @@
"""
인증 서비스
"""
from typing import Dict, Any, Tuple, Optional
from flask_jwt_extended import create_access_token, create_refresh_token
from models.user_model.users import Users
from models.user_model.user_ips import UserIps
from models.user_model import user_dto
from repositories import UserRepository, UserIpsRepository
import settings
from utils import func
from utils.response import *
from utils.account_lock_policy import AccountLockPolicy
from utils.db_decorators import transactional
from services.auth.cache_service import JWTBlacklistService
from extensions import custom_logger
logger = custom_logger(f"{settings.LOG_PREFIX}_auth_service")
class AuthService:
"""
인증 서비스 - 회원가입, 로그인, 로그아웃 비즈니스 로직
"""
@staticmethod
@transactional
def register(user_data: Dict[str, Any]) -> Optional[Users]:
"""회원 가입"""
# Pydantic 검증
create_user_dto = user_dto.CreateUserDTO(**user_data)
user_id = create_user_dto.id
password = create_user_dto.password
user_name = create_user_dto.user_name
user_repo = UserRepository()
# 중복 검사
if user_repo.exists_by_id(user_id):
return None
# 비밀번호 생성
hash_password = func.hash_password(password)
# 사용자 생성
user = Users(
id=user_id,
hash_password=hash_password,
user_name=user_name
)
user = user_repo.create(user)
logger.info(f"유저가 생성되었습니다: {user_id}")
return user
@staticmethod
@transactional
def login(credentials: Dict[str, Any]) -> Optional[Tuple[Users, str, str]]:
# Pydantic 검증
check_user_dto = user_dto.CheckUserDTO(**credentials)
user_repo = UserRepository()
# 사용자 조회
user = user_repo.find_by_id(check_user_dto.id)
if not user:
return None
# 계정 잠금 정책 체크
lock_policy = AccountLockPolicy(
max_attempts=settings.MAX_LOGIN_ATTEMPTS,
lockout_duration_minutes=settings.ACCOUNT_LOCKOUT_DURATION_MINUTES
)
if lock_policy.is_locked(user.count):
return None
# 비밀번호 검증
if not func.verify_password(check_user_dto.password, user.password):
user.increment_failed_login()
user_repo.update(user)
return None
# 로그인 성공 - 실패 횟수 초기화
user.reset_failed_login()
user_repo.update(user)
# 토큰 생성
access_token = create_access_token(identity=user.user_uuid)
refresh_token = create_refresh_token(identity=user.user_uuid)
# Redis 사용자 정보 캐싱 로직
logger.info(f"사용자 로그인: {user.id}")
return (user, access_token, refresh_token)
@staticmethod
def refresh_access_token(user_uuid: str) -> str:
try:
"""엑세스 토큰 갱신"""
access_token = create_access_token(identity=user_uuid)
logger.debug(f"Access Token 재발급: {user_uuid}")
return access_token
except Exception as e:
logger.exception(f"유저 ({user_uuid}) 엑세스 토큰 갱신에 실패하였습니다. error={str(e)}")
return None
@staticmethod
def logout(jti: str) -> bool:
"""로그아웃 (JWT 블랙리스트 추가)"""
success = JWTBlacklistService.add(jti)
if success:
logger.info(f"로그아웃 처리 완료 - JTI 블랙리스트 추가: {jti}")
return success
@staticmethod
@transactional
def save_user_ips(request, user: Users) -> None:
"""사용자 IPs 저장"""
user_ips_repo = UserIpsRepository()
# IP 및 User-Agent 추출
ip_address = request.headers.get("X-Real-IP", "") if request.headers else ""
user_agent = request.headers.get("User-Agent", "") if request.headers else ""
# 기존 IP 기록 조회 로직
existing_ip = user_ips_repo.find_by_user_and_ip(user, ip_address)
if existing_ip:
# 존재하면 마지막 접속 시간 업데이트
user_ips_repo.update_last_access(user.user_uuid, ip_address)
else:
# 없으면 새로 생성
user_ips = UserIps(
user_uuid=user.user_uuid,
user_ip=ip_address,
user_agent=user_agent
)
user_ips_repo.create(user_ips)
logger.debug(f"유저 IP 저장: {user.id} - {ip_address}")

View File

@@ -0,0 +1,210 @@
"""
캐시 서비스
Redis 기반 캐싱 로직 관리하는 서비스 레이어
"""
import json
from typing import Optional, Dict, Any
from extensions import redis_cli
import settings
from extensions import custom_logger
logger = custom_logger(f"{settings.LOG_PREFIX}_auth_service")
class CacheService:
@staticmethod
def _is_redis_available() -> bool:
"""Redis 연결 상태 확인"""
return redis_cli.is_connected()
@staticmethod
def set(key: str, value: Any, ttl: int) -> bool:
"""Redis에 데이터 저장"""
if not CacheService._is_redis_available():
logger.warning("레디스가 연결되어 있지 않습니다. 등록 로직을 ㄴ생략합니다.")
return False
try:
if isinstance(value, dict):
value = json.dumps(value, ensure_ascii=False)
redis_cli.redis_client.setex(key, ttl, value)
logger.debug(f"캐시 저장: {key} (TTL: {ttl}s)")
return True
except Exception as e:
logger.error(f"캐시 저장 실패: {key} - {str(e)}")
return False
@staticmethod
def get(key: str, parse_json: bool = True) -> Optional[Any]:
"""Redis에서 데이터 조회"""
if not CacheService._is_redis_available():
logger.warning("레디스가 연결되어 있지 않습니다. 조회 로직을 생략합니다.")
return None
try:
value = redis_cli.redis_client.get(key)
if value:
logger.debug(f"Cache hit: {key}")
if parse_json:
return json.loads(value)
return value
logger.debug(f"Cache miss: {key}")
return None
except Exception as e:
logger.error(f"Failed to get cache {key}: {str(e)}")
return None
@staticmethod
def delete(key: str) -> bool:
"""Redis에서 데이터 삭제
Args:
key: Redis 키
Returns:
bool: 성공 여부
"""
if not CacheService._is_redis_available():
logger.warning("레디스가 연결되어 있지 않습니다. 삭제가 불가능 합니다.")
return False
try:
redis_cli.redis_client.delete(key)
logger.debug(f"Cache deleted: {key}")
return True
except Exception as e:
logger.error(f"Failed to delete cache {key}: {str(e)}")
return False
@staticmethod
def exists(key: str) -> bool:
"""Redis에 키가 존재하는지 확인"""
if not CacheService._is_redis_available():
logger.warning("레디스가 연결되어 있지 않습니다. 조회가 불가능 합니다.")
return False
try:
return redis_cli.redis_client.exists(key) > 0
except Exception as e:
logger.error(f"Failed to check cache existence {key}: {str(e)}")
return False
class UserCacheService:
"""사용자 캐시 관리 서비스"""
PREFIX = "user:cache:"
TTL = settings.REDIS_USER_CACHE_TTL
@classmethod
def _get_key(cls, user_uuid: str) -> str:
"""캐시 키 생성"""
return f"{cls.PREFIX}{user_uuid}"
@classmethod
def set(cls, user_uuid: str, user_data: Dict[str, Any], ttl: Optional[int] = None) -> bool:
"""사용자 정보를 Redis에 캐싱"""
key = cls._get_key(user_uuid)
ttl = ttl or cls.TTL
return CacheService.set(key, user_data, ttl)
@classmethod
def get(cls, user_uuid: str) -> Optional[Dict[str, Any]]:
"""Redis에서 사용자 정보 조회"""
key = cls._get_key(user_uuid)
return CacheService.get(key, parse_json=True)
@classmethod
def delete(cls, user_uuid: str) -> bool:
"""캐시에서 사용자 정보 삭제"""
key = cls._get_key(user_uuid)
return CacheService.delete(key)
@classmethod
def refresh(cls, user_uuid: str, user_data: Dict[str, Any]) -> bool:
"""캐시 갱신 (삭제 후 재설정)"""
cls.delete(user_uuid)
return cls.set(user_uuid, user_data)
class JWTBlacklistService:
"""JWT 블랙리스트 관리 서비스 (로그아웃된 토큰 추적)"""
PREFIX = "jwt:blacklist:"
TTL = settings.REDIS_JWT_BLACKLIST_TTL
@classmethod
def _get_key(cls, jti: str) -> str:
"""블랙리스트 키 생성"""
return f"{cls.PREFIX}{jti}"
@classmethod
def add(cls, jti: str, ttl: Optional[int] = None) -> bool:
"""블랙리스트에 토큰 추가"""
key = cls._get_key(jti)
ttl = ttl or cls.TTL
if not CacheService._is_redis_available():
logger.warning("레디스가 연결되어 있지 않습니다. 블랙리스트에 추가가 블가능합니다.")
return False
success = CacheService.set(key, "1", ttl)
if success:
logger.info(f"JWT 블랙리스트 추가완료: {jti}")
return success
@classmethod
def is_blacklisted(cls, jti: str) -> bool:
"""토큰이 블랙리스트에 있는지 확인"""
key = cls._get_key(jti)
return CacheService.exists(key)
class AccountLockService:
"""계정 잠금 관리 서비스 (Redis 기반)"""
PREFIX = "account:lock:"
TTL = settings.ACCOUNT_LOCKOUT_DURATION_MINUTES * 60 # 분을 초로 변환
@classmethod
def _get_key(cls, user_uuid: str) -> str:
"""잠금 키 생성"""
return f"{cls.PREFIX}{user_uuid}"
@classmethod
def lock(cls, user_uuid: str, ttl: Optional[int] = None) -> bool:
"""계정 잠금 (Redis에 TTL과 함께 저장)"""
key = cls._get_key(user_uuid)
ttl = ttl or cls.TTL
if not CacheService._is_redis_available():
logger.warning("레디스가 연결되어 있지 않습니다. 계정 잠금이 불가능합니다.")
return False
success = CacheService.set(key, "1", ttl)
if success:
logger.info(f"계정 잠금 완료: {user_uuid} (TTL: {ttl}s)")
return success
@classmethod
def is_locked(cls, user_uuid: str) -> bool:
"""계정이 잠겨있는지 확인"""
key = cls._get_key(user_uuid)
return CacheService.exists(key)
@classmethod
def unlock(cls, user_uuid: str) -> bool:
"""계정 잠금 해제"""
key = cls._get_key(user_uuid)
success = CacheService.delete(key)
if success:
logger.info(f"계정 잠금 해제 완료: {user_uuid}")
return success

View File

@@ -0,0 +1,122 @@
"""
카테고리 서비스
"""
from typing import Dict, Any, Optional, List
from models.category_model.category import Category
from models.category_model.category_dto import CreateCategoryDTO
from repositories import CategoryRepository
from utils.db_decorators import transactional
from extensions import custom_logger
import settings
import constants
logger = custom_logger(f"{settings.LOG_PREFIX}_category_service")
class CategoryService:
"""
카테고리 서비스
"""
@transactional
def create_category(category_data: Dict[str, Any], user_uuid: str) -> Optional[Category]:
"""카테고리 생성"""
create_category_dto = CreateCategoryDTO(**category_data)
category_repo = CategoryRepository()
# 중복 검사
if category_repo.exists_by_name(create_category_dto.category_name):
return None
# 카테고리 생성
category = Category(
category_name=create_category_dto.category_name,
description=create_category_dto.description,
created_by=user_uuid
)
category = category_repo.create(category)
logger.info(f"카테고리 생성 완료: {category.category_uuid}")
return category
def get_category_by_uuid(category_uuid: str) -> Optional[Category]:
"""UUID로 카테고리 조회"""
category_repo = CategoryRepository()
return category_repo.find_by_uuid(category_uuid)
def get_category_by_name(category_name: str) -> Optional[Category]:
"""카테고리명으로 카테고리 조회"""
category_repo = CategoryRepository()
return category_repo.find_by_name(category_name)
def get_all_categories(limit: Optional[int] = None, offset: int = 0) -> List[Category]:
"""모든 카테고리 조회"""
category_repo = CategoryRepository()
return category_repo.find_all(limit=limit, offset=offset)
@transactional
def update_category(category_uuid: str, update_data: Dict[str, Any]) -> Category:
"""카테고리 업데이트"""
category_repo = CategoryRepository()
# 카테고리 조회
category = category_repo.find_by_uuid(category_uuid)
if not category:
return None
# 이름 변경 시 중복 검사
if 'category_name' in update_data and update_data['category_name'] != category.category_name:
if category_repo.exists_by_name(update_data['category_name']):
return None
category.category_name = update_data['category_name']
# 설명 업데이트
if 'description' in update_data:
category.description = update_data['description']
# 타임스탬프 업데이트
if hasattr(category, 'update_timestamp'):
category.update_timestamp()
category = category_repo.update(category)
logger.info(f"Category updated: {category.category_name} ({category_uuid})")
return category
@transactional
def delete_category(category_uuid: str) -> bool:
"""카테고리 삭제"""
category_repo = CategoryRepository()
# 카테고리 조회
category = category_repo.find_by_uuid(category_uuid)
if not category:
return None
category.is_used = constants.IS_N
# 삭제
success = category_repo.update(category)
if success:
logger.info(f"Category deleted: {category.category_name} ({category_uuid})")
return success
def count_categories() -> int:
"""카테고리 총 개수 조회"""
category_repo = CategoryRepository()
return category_repo.count()
def exists_by_name(category_name: str) -> bool:
"""카테고리명 존재 여부 확인"""
category_repo = CategoryRepository()
return category_repo.exists_by_name(category_name)

View File

@@ -0,0 +1,111 @@
"""
사용자 서비스
사용자 관리 관련 비즈니스 로직을 관리하는 서비스 레이어
"""
from typing import Dict, Any, Optional
from types import SimpleNamespace
from models.user_model.users import Users
from repositories import UserRepository
from services.auth.cache_service import UserCacheService, JWTBlacklistService
from utils.db_decorators import transactional
from extensions import custom_logger
import settings
logger = custom_logger(f"{settings.LOG_PREFIX}_user_service")
class UserService:
"""
사용자 서비스
"""
@staticmethod
def get_user_by_uuid(user_uuid: str, use_cache: bool = True) -> Optional[Users]:
"""UUID로 사용자 조회 (캐시 우선)"""
if use_cache:
# 캐시 조회
cached_user_data = UserCacheService.get(user_uuid)
if cached_user_data:
user = SimpleNamespace(**cached_user_data)
user.to_dict = lambda: cached_user_data
logger.debug(f"캐시에서 사용자 조회 성공: {user_uuid}")
return user
# DB 조회
user_repo = UserRepository()
user = user_repo.find_by_uuid(user_uuid)
if user and use_cache:
# 캐시 저장
UserCacheService.set(user_uuid, user.to_dict())
logger.debug(f"DB에서 사용자 조회 후 캐시에 저장: {user_uuid}")
return user
@staticmethod
def get_user_by_id(user_ud: str) -> Optional[Users]:
"""ID로 사용자 조회"""
user_repo = UserRepository()
return user_repo.get_by_id(user_ud)
@staticmethod
@transactional
def update_user_profile(user_uuid: str, update_data: Dict[str, Any]) -> Users:
"""사용자 프로필 업데이트"""
user_repo = UserRepository()
# 사용자 조회
user = user_repo.find_by_uuid(user_uuid)
if not user:
return None
# 업데이트 가능한 필드만 처리
if "user_name" in update_data:
user.user_name = update_data["user_name"]
user = user_repo.update(user)
# 캐시 갱신
UserCacheService.refresh(user_uuid, user.to_dict())
logger.info(f"사용자 프로필이 업데이트되었습니다: {user_uuid}")
return user
@staticmethod
def invalidate_user_cache(user_uuid: str) -> bool:
"""사용자 캐시 무효화"""
success = UserCacheService.delete(user_uuid)
if success:
logger.debug(f"사용자 캐시가 무효화되었습니다: {user_uuid}")
return success
@staticmethod
@transactional
def delete_user(user_uuid: str, jti: Optional[str] = None) -> bool:
"""사용자 삭제 (회원 탈퇴)"""
user_repo = UserRepository()
user = user_repo.find_by_uuid(user_uuid)
if not user:
logger.warning(f"삭제할 사용자를 DB에서 찾을 수 없음: {user_uuid}")
UserCacheService.delete(user_uuid)
return False
# 삭제
success = user_repo.delete(user)
if success:
# 캐시 무효화
UserCacheService.delete(user_uuid)
# JWT 블랙리스트 추가 (현재 토큰 무효화)
if jti:
JWTBlacklistService.add(jti)
logger.debug(f"JWT 블랙리스트 추가: {jti}")
logger.info(f"사용자가 삭제되었습니다: {user_uuid}")
return success

View File

@@ -13,68 +13,76 @@ import multiprocessing
load_dotenv()
# 환경설정
DEBUG = os.getenv('DEBUG', False)
DEBUG = os.getenv("DEBUG", False)
# 서버
HOST = os.getenv('HOST', '0.0.0.0')
PORT = int(os.getenv('PORT', '5000'))
HOST = os.getenv("HOST", "0.0.0.0")
PORT = int(os.getenv("PORT", "5000"))
# 데이터베이스 설정
DB_HOST = os.getenv('DB_HOST', 'localhost')
DB_PORT = int(os.getenv('DB_PORT', 5000))
DB_USER = os.getenv('DB_USER', 'root')
DB_PASSWORD = os.getenv('DB_PASSWORD', '')
DB_NAME = os.getenv('DB_NAME', 'test')
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = int(os.getenv("DB_PORT", 5000))
DB_USER = os.getenv("DB_USER", "root")
DB_PASSWORD = os.getenv("DB_PASSWORD", "")
DB_NAME = os.getenv("DB_NAME", "test")
SQLALCHEMY_DATABASE_URI = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}?charset=utf8mb4"
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_ECHO = DEBUG
SQLALCHEMY_POOL_SIZE = int(os.getenv('DB_POOL_SIZE', 10))
SQLALCHEMY_MAX_OVERFLOW = int(os.getenv('DB_MAX_OVERFLOW', 20))
SQLALCHEMY_POOL_RECYCLE = int(os.getenv('DB_POOL_RECYCLE', 3600))
SQLALCHEMY_ECHO = False
SQLALCHEMY_POOL_SIZE = int(os.getenv("DB_POOL_SIZE", 10))
SQLALCHEMY_MAX_OVERFLOW = int(os.getenv("DB_MAX_OVERFLOW", 20))
SQLALCHEMY_POOL_RECYCLE = int(os.getenv("DB_POOL_RECYCLE", 3600))
JWT_SECRET_KEY = os.getenv('JWT_SECRET_KEY', 'your-secret-key-change-this-in-production')
JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=int(os.getenv('JWT_ACCESS_TOKEN_HOURS', 24)))
JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=int(os.getenv('JWT_REFRESH_TOKEN_DAYS', 30)))
JWT_TOKEN_LOCATION = ['headers']
JWT_HEADER_NAME = 'Authorization'
JWT_HEADER_TYPE = 'Bearer'
JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-this-in-production")
JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=int(os.getenv("JWT_ACCESS_TOKEN_HOURS", 24)))
JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=int(os.getenv("JWT_REFRESH_TOKEN_DAYS", 30)))
JWT_TOKEN_LOCATION = ["headers"]
JWT_HEADER_NAME = "Authorization"
JWT_HEADER_TYPE = "Bearer"
JWT_ENCODE_JTI = True # JWT ID 포함 (블랙리스트 추적용)
# CORS 설정
CORS_ORIGINS = os.getenv('CORS_ORIGINS', '*').split(',')
CORS_ALLOW_HEADERS = ['Content-Type', 'Authorization']
CORS_METHODS = ['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS']
CORS_ORIGINS = os.getenv("CORS_ORIGINS", "*").split(",")
CORS_ALLOW_HEADERS = ["Content-Type", "Authorization"]
CORS_METHODS = ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"]
# Redis 설정
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT', 6379))
REDIS_DB = int(os.getenv('REDIS_DB', 0))
REDIS_PASSWORD = os.getenv('REDIS_PASSWORD', None)
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_DB = int(os.getenv("REDIS_DB", 0))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", None)
# 로깅 설정
LOG_DIR = os.getenv('LOG_DIR', 'logs')
LOG_PREFIX = os.getenv('LOG_PREFIX', 'default.')
LOG_LEVEL_TEXT = os.getenv('LOG_LEVEL_TEXT', 'INFO' if not DEBUG else 'DEBUG')
MAX_LOG_SIZE = int(os.getenv('MAX_LOG_SIZE', 10 * 1024 * 1024)) # 10MB
LOG_BACKUP_COUNT = int(os.getenv('LOG_BACKUP_COUNT', 5))
LOG_DIR = os.getenv("LOG_DIR", "logs")
LOG_PREFIX = os.getenv("LOG_PREFIX", "default.")
LOG_LEVEL_TEXT = os.getenv("LOG_LEVEL_TEXT", "INFO" if not DEBUG else "DEBUG")
MAX_LOG_SIZE = int(os.getenv("MAX_LOG_SIZE", 10 * 1024 * 1024)) # 10MB
LOG_BACKUP_COUNT = int(os.getenv("LOG_BACKUP_COUNT", 5))
# Worker 설정
GUNICORN_WORKERS = int(os.getenv('GUNICORN_WORKERS', multiprocessing.cpu_count() * 2 + 1))
GUNICORN_WORKER_CLASS = os.getenv('GUNICORN_WORKER_CLASS', 'gevent') # gevent for async I/O
GUNICORN_WORKER_CONNECTIONS = int(os.getenv('GUNICORN_WORKER_CONNECTIONS', 1000))
GUNICORN_THREADS = int(os.getenv('GUNICORN_THREADS', 1)) # gevent 사용 시 무시됨
GUNICORN_WORKERS = int(os.getenv("GUNICORN_WORKERS", multiprocessing.cpu_count() * 2 + 1))
GUNICORN_WORKER_CLASS = os.getenv("GUNICORN_WORKER_CLASS", "gevent") # gevent for async I/O
GUNICORN_WORKER_CONNECTIONS = int(os.getenv("GUNICORN_WORKER_CONNECTIONS", 1000))
GUNICORN_THREADS = int(os.getenv("GUNICORN_THREADS", 1)) # gevent 사용 시 무시됨
# 타임아웃 설정
GUNICORN_TIMEOUT = int(os.getenv('GUNICORN_TIMEOUT', 30)) # 요청 타임아웃 (초)
GUNICORN_KEEPALIVE = int(os.getenv('GUNICORN_KEEPALIVE', 5)) # Keep-Alive 연결 유지 시간
GUNICORN_GRACEFUL_TIMEOUT = int(os.getenv('GUNICORN_GRACEFUL_TIMEOUT', 30)) # Graceful shutdown 대기 시간
GUNICORN_TIMEOUT = int(os.getenv("GUNICORN_TIMEOUT", 30)) # 요청 타임아웃 (초)
GUNICORN_KEEPALIVE = int(os.getenv("GUNICORN_KEEPALIVE", 5)) # Keep-Alive 연결 유지 시간
GUNICORN_GRACEFUL_TIMEOUT = int(os.getenv("GUNICORN_GRACEFUL_TIMEOUT", 30)) # Graceful shutdown 대기 시간
# 재시작 설정 (메모리 누수 방지)
GUNICORN_MAX_REQUESTS = int(os.getenv('GUNICORN_MAX_REQUESTS', 1000))
GUNICORN_MAX_REQUESTS_JITTER = int(os.getenv('GUNICORN_MAX_REQUESTS_JITTER', 50))
GUNICORN_MAX_REQUESTS = int(os.getenv("GUNICORN_MAX_REQUESTS", 1000))
GUNICORN_MAX_REQUESTS_JITTER = int(os.getenv("GUNICORN_MAX_REQUESTS_JITTER", 50))
# 로깅 설정
GUNICORN_ACCESSLOG = os.getenv('GUNICORN_ACCESSLOG', '-') # '-' = stdout
GUNICORN_ERRORLOG = os.getenv('GUNICORN_ERRORLOG', '-')
GUNICORN_LOGLEVEL = os.getenv('GUNICORN_LOGLEVEL', 'info')
GUNICORN_ACCESSLOG = os.getenv("GUNICORN_ACCESSLOG", "-") # "-" = stdout
GUNICORN_ERRORLOG = os.getenv("GUNICORN_ERRORLOG", "-")
GUNICORN_LOGLEVEL = os.getenv("GUNICORN_LOGLEVEL", "info")
# 보안 설정 - 로그인 실패
MAX_LOGIN_ATTEMPTS = int(os.getenv("MAX_LOGIN_ATTEMPTS", 5))
ACCOUNT_LOCKOUT_DURATION_MINUTES = int(os.getenv("ACCOUNT_LOCKOUT_DURATION_MINUTES", 30))
# Redis 캐시 TTL 설정 (초 단위)
REDIS_USER_CACHE_TTL = int(os.getenv("REDIS_USER_CACHE_TTL", 300)) # 5분
REDIS_JWT_BLACKLIST_TTL = int(os.getenv("REDIS_JWT_BLACKLIST_TTL", 86400)) # 24시간

View File

@@ -0,0 +1,32 @@
"""
계정 잠금 정책 비즈니스 로직
"""
from typing import Optional
from utils import func
class AccountLockPolicy:
"""계정 잠금 정책 비즈니스 로직"""
def __init__(self, max_attempts: int = 5,
lockout_duration_minutes: int = 30):
self.max_attempts = max_attempts
self.lockout_duration_minutes = lockout_duration_minutes
def is_locked(self, failed_count: int, locked_at: Optional[int] = None) -> bool:
"""계정 잠금 여부 확인"""
if failed_count < self.max_attempts:
return False
# 잠금 해제 시간 체크
if locked_at:
current_time = func.get_timestamp_ms()
lockout_duration_ms = self.lockout_duration_minutes * 60 * 1000
if current_time - locked_at > lockout_duration_ms:
return False
return True
def should_reset(self, failed_count: int, locked_at: Optional[int] = None) -> bool:
return not self.is_locked(failed_count, locked_at)

View File

@@ -0,0 +1,47 @@
"""
DB 트랜잭션 데코레이더
"""
import functools
from typing import Callable
from extensions import custom_logger
import settings
logger = custom_logger(f"{settings.LOG_PREFIX}_db_decorator")
_db = None
def get_db():
"""DB Lazy 로딩"""
global _db
if _db is None:
from extensions import db
_db = db
return _db
def transactional(f: Callable) -> Callable:
"""
데이터 베이스 트랜잭션 관리하는 데코레이터
"""
@functools.wraps(f)
def decorated_function(*args, **kwargs):
# DB 가져오기
db = get_db()
try:
# 함수 실행
result = f(*args, **kwargs)
# 성공 시 커밋
db.session.commit()
logger.debug(f"트랜잭션 커밋 완료 - {f.__name__}")
return result
except Exception as e:
# 실패 시 롤백
db.session.rollback()
logger.error(f"트랜잭션 롤백 - {f.__name__}: {str(e)}")
raise
return decorated_function

82
src/utils/decorators.py Normal file
View File

@@ -0,0 +1,82 @@
"""
데코레이터 유틸리티 모듈
"""
from functools import wraps
from typing import Callable
from flask import request, g
from flask_jwt_extended import get_jwt_identity, get_jwt
from utils.response import *
import settings
from extensions import custom_logger
logger = custom_logger(f"{settings.LOG_PREFIX}_decorators")
# Lazy 로딩
def _get_user_service():
from services.user.user_service import UserService
return UserService()
def _get_account_lock_service():
from services.auth.cache_service import AccountLockService
return AccountLockService
def _get_jwt_blacklist_service():
from services.auth.cache_service import JWTBlacklistService
return JWTBlacklistService
def load_current_user(f: Callable) -> Callable:
"""
JWT identity(user_uuid)를 기반으로 User 객체를 로드함.
g.current_user에 저장
Redis 캐시를 활용
JWT 블랙리스트 체크 (로그아웃된 토큰 방지)
"""
@wraps(f)
def decorated_function(*args, **kwargs):
try:
# JWT 페이로드 및 user_uuid 추출
jwt_payload = get_jwt()
user_uuid = get_jwt_identity()
jti = jwt_payload.get("jti")
if not user_uuid:
logger.warning("JWT에서 user_uuid를 찾을 수 없습니다.")
return unauthorized_response(message="토큰에서 사용자 정보를 찾을 수 없습니다.")
# JWT 블랙리스트 체크 (사용이 불가능한 토큰)
JWTBlacklistService = _get_jwt_blacklist_service()
if jti and JWTBlacklistService.is_blacklisted(jti):
logger.warning(f"블랙리스트된 토큰 접근 시도: jti={jti}, user_uuid={user_uuid}")
return unauthorized_response(message="사용이 불가능한 토큰입니다. 다시 로그인해주세요.")
# UserService를 통한 조회
UserService = _get_user_service()
user = UserService.get_user_by_uuid(user_uuid, use_cache=True)
if not user:
logger.warning(f"user_uuid에 해당하는 사용자를 찾을 수 없습니다: {user_uuid}")
return unauthorized_response(message="사용자를 찾을 수 없습니다.")
# 계정 잠금 체크 (Redis 기반)
AccountLockService = _get_account_lock_service()
if AccountLockService.is_locked(user_uuid):
logger.warning(f"잠긴 계정 접근 시도: {user_uuid}")
return unauthorized_response(message="계정이 잠겼습니다. 관리자에게 문의하세요.")
# DB의 로그인 실패 횟수가 최대 시도 횟수를 초과하면 Redis에 잠금 설정
if (user.count or 0) >= settings.MAX_LOGIN_ATTEMPTS:
AccountLockService.lock(user_uuid)
logger.warning(f"로그인 실패 횟수 초과로 계정 잠금: {user_uuid}")
return unauthorized_response(message="계정이 잠겼습니다. 관리자에게 문의하세요.")
# g에 현재 사용자 저장
g.current_user = user
return f(*args, **kwargs)
except Exception as e:
logger.exception(f"현재 사용자 로드에 실패하였습니다. error={str(e)}")
return error_response(message="현재 사용자 정보를 불러오는 데 실패하였습니다.")
return decorated_function

30
src/utils/func.py Normal file
View File

@@ -0,0 +1,30 @@
def get_timestamp_ms():
"""타임스탬프 생성"""
import time
return round(time.time() * 1000)
def hash_password(password: str) -> str:
"""비밀번호 암호화"""
import bcrypt
if not password:
raise ValueError("비밀번호는 비어있을 수 없습니다.")
hashed = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
return hashed.decode("utf-8")
def verify_password(password: str, hashed_password: str) -> bool:
import bcrypt
"""비밀번호 검증"""
if not password or not hashed_password:
return False
try:
return bcrypt.checkpw(
password.encode("utf-8"),
hashed_password.encode("utf-8")
)
except (ValueError, AttributeError):
return False

View File

@@ -40,7 +40,7 @@ class StdLogRedirect:
s = s.decode(errors="replace")
self._buf += s
while True:
idx = self._bug.find("\n")
idx = self._buf.find("\n")
if idx == -1:
break
line = self._buf[:idx+1:]
@@ -80,18 +80,39 @@ logging.basicConfig(
level=settings.LOG_LEVEL_TEXT,
format=log_format,
datefmt=date_format,
stream=sys.stdout
stream=sys.stdout,
force=True
)
# create own logger class to prevent init custom loggers by other libs
class GatekeeperLogger(logging.Logger):
def assHandler(self, h):
# only 'root', '__main__' ans own loggers will be accepted
if self.name == "root" or self.name.startswith(('LOG_PREFIX'), "__main__"):
def addHandler(self, h):
# only "root", "__main__" and own loggers will be accepted
if self.name == "root" or self.name.startswith((settings.LOG_PREFIX, "__main__")):
return super().addHandler(h)
logging.setLoggerClass(GatekeeperLogger)
# Werkzeug 로거 설정 (Flask 개발 서버용)
# 기존 werkzeug 로거의 핸들러를 모두 제거하고 커스텀 포맷 적용
import logging as _logging
werkzeug_logger = _logging.getLogger("werkzeug")
werkzeug_logger.handlers = [] # 기존 핸들러 제거
werkzeug_logger.setLevel(settings.LOG_LEVEL_TEXT)
handler = _logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter) # 통일된 포맷 사용
werkzeug_logger.addHandler(handler)
werkzeug_logger.propagate = False
# Flask의 기본 로거도 동일하게 설정
flask_logger = _logging.getLogger("flask.app")
flask_logger.handlers = []
flask_logger.setLevel(settings.LOG_LEVEL_TEXT)
flask_handler = _logging.StreamHandler(sys.stdout)
flask_handler.setFormatter(formatter)
flask_logger.addHandler(flask_handler)
flask_logger.propagate = False
def custom_logger(log_prefix: str):
import logging

185
src/utils/response.py Normal file
View File

@@ -0,0 +1,185 @@
""""
공통 응답 유틸리티 모듈
API 응답을 일관서 있게 처리하기 위한 헬퍼 함수들
"""
from typing import Any, Dict, List, Optional, Union
from flask import jsonify
from werkzeug.http import HTTP_STATUS_CODES
def _make_response_body(status: str, message: str, data: Any = None,
errors: Any = None, meta: Any = None, error_code: Optional[str] = None) -> Dict[str, Any]:
"""응답 구조 공통"""
response = {
"status": status,
"message": message
}
if data is not None:
response["data"] = data
if errors is not None:
response["error"] = errors
if meta is not None:
response["meta"] = meta
if error_code is not None:
response["error_code"] = error_code
return response
def _is_restx_context() -> bool:
"""
Flask-RestX는 has_request_context()가 True이지만,
jsonify()된 Response를 재직려화함으로 RESTX 사용 시에는 jsonify를 피해야함.
"""
return True
def success_response(data: Any = None, message: str = "Success",
status_code: int = 200, meta: Optional[Dict[str, Any]] = None) -> tuple:
"""
성공 응답을 생성
Args:
data: 응답 데이터
message: 응답 메시지
status_code: HTTP 상태 코드
meta: 추가 메타 데이터
Returns:
tuple: (response, status_code)
"""
body = _make_response_body(message, message, data=data, meta=meta)
return (jsonify(body), status_code) if not _is_restx_context() else (body, status_code)
def error_response(message: str = "An error occurred", status_code: int = 400,
errors: Optional[Union[Dict[str, Any], List[str]]] = None,
error_code: Optional[str] = None) -> tuple:
"""
에러 응답을 반환
Args:
message: 에러 메시지
status_code: HTTP 상태 코드
errors: 상세 에러 정보
error_code: 에러 코드
Returns:
tuple: (response, status_code)
"""
body = _make_response_body(
message,
message,
errors=errors,
error_code=error_code or HTTP_STATUS_CODES.get(status_code, "UNKNOWN_ERROR")
)
return (jsonify(body), status_code) if not _is_restx_context() else (body, status_code)
def bad_request_response(message: str = "Bad request", errors: Optional[Union[Dict[str, Any], List[str]]] = None) -> tuple:
"""
400 Bad Request 응답
Args:
message: 잘못된 요청 메시지
errors: 상세 에러 정보
Returns:
tuple: (response, status_code)
"""
return error_response(
message=message,
status_code=400,
errors=errors,
error_code="BAD_REQUEST"
)
def unauthorized_response(message: str = "Authentication required") -> tuple:
"""
401 Unauthorized 응답
Args:
message: 인증 실패 메시지
Returns:
tuple: (response, status_code)
"""
return error_response(
message=message,
status_code=401,
error_code="UNAUTHORIZED"
)
def forbidden_response(message: str = "Access denied") -> tuple:
"""
403 Forbidden 응답
Args:
message: 접근 거부 메시지
Returns:
tuple: (response, status_code)
"""
return error_response(
message=message,
status_code=403,
error_code="FORBIDDEN"
)
def not_found_response(message: str = "Resource not found") -> tuple:
"""
404 Not Found 응답
Args:
message: 리소스를 찾을 수 없음 메시지
Returns:
tuple: (response, status_code)
"""
return error_response(
message=message,
status_code=404,
error_code="NOT_FOUND"
)
def conflict_response(message: str = "Resource already exists") -> tuple:
"""
409 Conflict 응답
Args:
message: 리소스 충돌 메시지
Returns:
tuple: (response, status_code)
"""
return error_response(
message=message,
status_code=409,
error_code="CONFLICT"
)
def unsupported_media_type_response(message: str = "Content-Type must be 'application/json'") -> tuple:
"""
415 Unsupported Media Type 응답
Args:
message: 지원하지 않는 미디어 타입 메시지
Returns:
tuple: (response, status_code)
"""
return error_response(
message=message,
status_code=415,
error_code="UNSUPPORTED_MEDIA_TYPE"
)
def internal_error_response(message: str = "Internal server error") -> tuple:
"""
500 Internal Server Error 응답
Args:
message: 서버 에러 메시지
Returns:
tuple: (response, status_code)
"""
return error_response(
message=message,
status_code=500,
error_code="INTERNAL_ERROR"
)

View File

@@ -3,32 +3,39 @@ Swagger 설정
"""
# Third-Party Library Imports
from flask import Blueprint, jsonify
from flask import Blueprint
from flask_restx import Api
from flask_jwt_extended.exceptions import JWTExtendedException, RevokedTokenError
import constants
# Swagger API Blueprint 및 설정
api_blueprint = Blueprint('api', __name__)
api_blueprint = Blueprint("api", __name__)
api = Api(
api_blueprint,
title='NuriQ API',
title="NuriQ API",
version=constants.API_VERSION,
description='NuriQ REST API 문서',
doc='/docs',
description="NuriQ REST API 문서",
doc="/docs",
authorizations={
'Bearer': {
'type': 'apiKey',
'in': 'header',
'name': 'Authorization',
'description': 'JWT 토큰 입력 (예: Bearer <token>)'
"Bearer": {
"type": "apiKey",
"in": "header",
"name": "Authorization",
"description": "JWT 토큰 입력 (예: Bearer <token>)"
}
},
security='Bearer',
security="Bearer",
validate=True
)
# 네임 스페이스 정의 및 API 추가
auth_ns = api.namespace("auth", description="인증 API", path="/auth")
user_ns = api.namespace("users", description="유저 API", path="/users")
category_ns = api.namespace("categories", description="카테고리 API", path="/categories")
# 네임 스페이스 정의 및 API에 추가
__all__ = ['api_blueprint', 'api']
__all__ = [
"api_blueprint", "api",
"auth_ns", "user_ns"
]