""" Redis 클라이언트 전체 관리 모듈 """ from typing import Optional import redis import settings import logging class RedisFacadeOpr(): """Redis 연결 및 작업을 관리하는 Facade 클래스""" def __init__(self) -> None: """Redis 설정 초기화""" self.redis_config = { 'host': settings.REDIS_HOST, 'port': settings.REDIS_PORT, 'password': settings.REDIS_PASSWORD, 'decode_responses': True, # auto-decode response 'db': settings.REDIS_DB } self.redis_client: Optional[redis.StrictRedis] = None def start(self) -> bool: """Redis 연결 시작 Returns: bool: 연결 성공 시 True, 실패 시 False """ # 이미 연결되어 있다면 재연결하지 않음 if self.is_connected(): logging.debug("Redis already connected, skipping reconnection") return True try: # Redis Client 연결 self.redis_client = redis.StrictRedis(**self.redis_config) # 연결 테스트 self.redis_client.ping() logging.info("Redis connection established successfully") return True except Exception as e: logging.error(f"Failed to connect to Redis: {str(e)}") self.redis_client = None return False def reset_database(self) -> None: """Redis 데이터베이스 초기화""" if self.redis_client: self.redis_client.flushdb() logging.info("Redis database flushed") else: logging.warning("Cannot reset database: Redis client not connected") def refresh_init_database(self) -> None: """개별 데이터 초기화""" pass def is_connected(self) -> bool: """Redis 연결 상태 확인""" if not self.redis_client: return False try: self.redis_client.ping() return True except Exception: return False redis_cli = RedisFacadeOpr()