title: # 목차
style: nestedList # TOC style (nestedList|nestedOrderedList|inlineFirstLevel)
minLevel: 0 # Include headings from the specified level
maxLevel: 5 # Include headings up to the specified level
includeLinks: true # Make headings clickable
hideWhenEmpty: false # Hide TOC if no headings are found
debugInConsole: false # Print debug info in Obsidian console

대규모 시스템 설계 안티패턴

개념 설명

대규모 파이썬 시스템 설계는 단순한 스크립트 작성을 넘어 복잡한 아키텍처를 효과적으로 구성하는 능력이 필요하다. 실생활에서는 도시 계획과 유사하다. 도시(시스템)는 구역(모듈)을 명확히 구분하고, 교통 체계(의존성)를 효율적으로 설계하며, 미래 확장(스케일링)을 고려해야 한다.

기본 동작 방식

대규모 시스템 설계의 핵심 요소:

  • 모듈화: 코드를 논리적 단위로 조직화
  • 의존성 관리: 모듈 간 관계와 의존성 방향 제어
  • API 설계: 안정적이고 확장 가능한 인터페이스 정의
  • 계층화: 관심사 분리와 추상화 수준 구분
  • 확장성: 코드베이스와 시스템 확장을 고려한 설계

안티패턴 사례

모듈 경계 및 책임 불명확

# 잘못된 예시: 모호한 책임과 과도한 결합
class SuperClass:
    def __init__(self):
        self.db = Database()
        self.auth = Authenticator()
        self.logger = Logger()
        self.config = Config()
    
    def do_everything(self, user_id, data):
        # 데이터 검증
        if not self.validate_data(data):
            return False
        
        # 사용자 인증
        if not self.auth.authenticate(user_id):
            return False
        
        # 데이터베이스 처리
        self.db.save(data)
        
        # 로깅
        self.logger.log(f"User {user_id} saved data")
        return True
    
    def validate_data(self, data):
        # 데이터 검증 로직...
        pass
 
# 올바른 예시: 명확한 책임 분리
class DataValidator:
    def validate(self, data):
        # 데이터 검증 로직...
        pass
 
class DataService:
    def __init__(self, validator, db, logger):
        self.validator = validator
        self.db = db
        self.logger = logger
    
    def save_data(self, user_id, data):
        if not self.validator.validate(data):
            return False
        
        self.db.save(data)
        self.logger.log(f"User {user_id} saved data")
        return True
 
# 사용 예시 - 의존성 주입
validator = DataValidator()
db = Database()
logger = Logger()
service = DataService(validator, db, logger)

패키지 의존성 설계 실패

# 잘못된 예시: 순환 의존성
# module_a.py
from module_b import ModuleB
 
class ModuleA:
    def process(self):
        return ModuleB().process_b() + 1
 
# module_b.py
from module_a import ModuleA
 
class ModuleB:
    def process_b(self):
        return ModuleA().process() * 2
 
# 올바른 예시: 의존성 방향 정리
# common.py
class CommonInterface:
    def process(self):
        raise NotImplementedError()
 
# module_a.py
from common import CommonInterface
 
class ModuleA(CommonInterface):
    def process(self):
        return 10
 
# module_b.py
from common import CommonInterface
 
class ModuleB:
    def __init__(self, processor: CommonInterface):
        self.processor = processor
    
    def process_b(self):
        return self.processor.process() * 2

API 버전 관리 전략 부재

# 잘못된 예시: 하위 호환성을 깨는 변경
class UserAPI:
    def get_user(self, user_id):
        # API v1: 기본 정보만 반환
        return {'id': user_id, 'name': f'User {user_id}'}
    
    # 메서드 시그니처 변경 - 기존 클라이언트 코드 깨짐
    def get_user(self, user_id, include_details=False):
        # API v2: 선택적으로 상세 정보 포함
        user = {'id': user_id, 'name': f'User {user_id}'}
        if include_details:
            user['details'] = {'email': f'user{user_id}@example.com'}
        return user
 
# 올바른 예시: 명시적 버전 관리
class UserAPIv1:
    def get_user(self, user_id):
        # API v1: 기본 정보만 반환
        return {'id': user_id, 'name': f'User {user_id}'}
 
class UserAPIv2:
    def get_user(self, user_id, include_details=False):
        # API v2: 선택적으로 상세 정보 포함
        user = {'id': user_id, 'name': f'User {user_id}'}
        if include_details:
            user['details'] = {'email': f'user{user_id}@example.com'}
        return user
 
# 또는 데코레이터를 활용한 버전 관리
api = APIRouter()
 
@api.get('/users/{user_id}', version='1.0')
def get_user_v1(user_id: int):
    return {'id': user_id, 'name': f'User {user_id}'}
 
@api.get('/users/{user_id}', version='2.0')
def get_user_v2(user_id: int, include_details: bool = False):
    user = {'id': user_id, 'name': f'User {user_id}'}
    if include_details:
        user['details'] = {'email': f'user{user_id}@example.com'}
    return user

확장성과 유지보수성 간 균형 무시

# 잘못된 예시: 과도한 추상화로 인한 복잡성
class EntityFactoryBuilderManager:
    def __init__(self, config_provider_factory):
        self.config_provider_factory = config_provider_factory
    
    def create_entity_factory_builder(self, entity_type):
        config_provider = self.config_provider_factory.create_provider(entity_type)
        return EntityFactoryBuilder(config_provider)
 
class EntityFactoryBuilder:
    def __init__(self, config_provider):
        self.config_provider = config_provider
    
    def build(self):
        return EntityFactory(self.config_provider.get_config())
 
class EntityFactory:
    def __init__(self, config):
        self.config = config
    
    def create_entity(self, *args, **kwargs):
        # 복잡한 엔티티 생성 로직...
        pass
 
# 올바른 예시: 적절한 추상화 수준
class EntityFactory:
    def __init__(self, config_source):
        self.config = self._load_config(config_source)
    
    def _load_config(self, source):
        # 설정 로드 로직...
        return config
    
    def create_entity(self, entity_type, *args, **kwargs):
        # 엔티티 생성 로직...
        return entity

실제 사용 예시

효과적인 모듈 설계:

# 도메인 계층 - 비즈니스 로직 캡슐화
class User:
    def __init__(self, user_id, name, email):
        self.id = user_id
        self.name = name
        self.email = email
    
    def validate(self):
        """사용자 데이터 유효성 검사"""
        if not self.email or '@' not in self.email:
            return False
        return True
 
# 리포지토리 계층 - 데이터 접근 캡슐화
class UserRepository:
    def __init__(self, db_connection):
        self.db = db_connection
    
    def get(self, user_id):
        """사용자 ID로 조회"""
        data = self.db.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        if not data:
            return None
        return User(data['id'], data['name'], data['email'])
    
    def save(self, user):
        """사용자 저장/갱신"""
        if not user.validate():
            raise ValueError("Invalid user data")
        
        self.db.execute(
            "INSERT INTO users (id, name, email) VALUES (%s, %s, %s) "
            "ON DUPLICATE KEY UPDATE name = %s, email = %s",
            (user.id, user.name, user.email, user.name, user.email)
        )
 
# 서비스 계층 - 비즈니스 로직 오케스트레이션
class UserService:
    def __init__(self, user_repository, event_publisher):
        self.user_repository = user_repository
        self.event_publisher = event_publisher
    
    def create_user(self, name, email):
        """새 사용자 생성"""
        user_id = self._generate_id()
        user = User(user_id, name, email)
        
        if not user.validate():
            raise ValueError("Invalid user data")
        
        self.user_repository.save(user)
        self.event_publisher.publish("user_created", user_id)
        return user
    
    def _generate_id(self):
        """새 사용자 ID 생성"""
        # ID 생성 로직...
        return new_id

고급 활용법

레이어드 아키텍처와 의존성 역전 원칙 적용:

# interfaces.py - 추상 인터페이스 정의
from abc import ABC, abstractmethod
 
class UserRepository(ABC):
    @abstractmethod
    def get(self, user_id):
        pass
    
    @abstractmethod
    def save(self, user):
        pass
 
class EventPublisher(ABC):
    @abstractmethod
    def publish(self, event_name, data):
        pass
 
# domain.py - 도메인 모델과 서비스
class User:
    def __init__(self, user_id, name, email):
        self.id = user_id
        self.name = name
        self.email = email
    
    def validate(self):
        # 유효성 검사 로직...
        return True
 
class UserService:
    def __init__(self, user_repository, event_publisher):
        self.user_repository = user_repository
        self.event_publisher = event_publisher
    
    def create_user(self, name, email):
        # 비즈니스 로직...
        pass
 
# infrastructure.py - 구현체
class SqlUserRepository(UserRepository):
    def __init__(self, db_connection):
        self.db = db_connection
    
    def get(self, user_id):
        # 실제 SQL 구현...
        pass
    
    def save(self, user):
        # 실제 SQL 구현...
        pass
 
class RabbitMQEventPublisher(EventPublisher):
    def __init__(self, connection):
        self.connection = connection
    
    def publish(self, event_name, data):
        # 실제 메시지 큐 구현...
        pass
 
# 의존성 주입 및 애플리케이션 조립
def setup_application():
    db_connection = create_db_connection()
    mq_connection = create_mq_connection()
    
    user_repository = SqlUserRepository(db_connection)
    event_publisher = RabbitMQEventPublisher(mq_connection)
    
    user_service = UserService(user_repository, event_publisher)
    return user_service

주의사항

  • 조기 최적화와 과도한 추상화는 복잡성을 증가시킨다.
  • 명확한 모듈 경계와 책임 분리가 중요하다.
  • 의존성 방향은 항상 세부사항에서 추상화로 향해야 한다.
  • API 설계는 이전 버전과의 호환성을 고려해야 한다.
  • 확장성과 단순성 사이의 균형을 유지한다.

분산 시스템 및 병렬성 안티패턴

개념 설명

분산 시스템은 여러 컴퓨터에서 실행되는 구성 요소가 네트워크를 통해 통신하며 공동 목표를 달성하는 시스템이다. 실생활에서는 여러 지역에 분산된 회사의 사무실들이 하나의 조직으로 기능하는 것과 유사하다. 각 사무실(노드)은 독립적으로 운영되지만 전체 목표를 위해 조율되어야 한다.

기본 동작 방식

파이썬에서 분산 시스템 구현의 주요 접근법:

  • 프로세스 기반 병렬성: multiprocessing 모듈
  • 메시지 큐: RabbitMQ, Kafka, Redis 등
  • 원격 프로시저 호출(RPC): gRPC, XML-RPC 등
  • 분산 작업 큐: Celery, RQ 등
  • 마이크로서비스 아키텍처: Flask, FastAPI 기반 서비스

안티패턴 사례

병렬 작업 조정 실패

# 잘못된 예시: 조정 없는 동시 작업
import multiprocessing
 
def process_data_chunk(chunk_id, shared_result):
    # 데이터 처리...
    result = calculate_something(chunk_id)
    
    # 공유 데이터에 동기화 없이 접근
    shared_result[chunk_id] = result  # 경쟁 조건 가능성
 
def parallel_processing_unsafe():
    # 공유 메모리 생성
    manager = multiprocessing.Manager()
    shared_result = manager.dict()
    
    # 프로세스 생성 및 시작
    processes = []
    for i in range(10):
        p = multiprocessing.Process(
            target=process_data_chunk, 
            args=(i, shared_result)
        )
        processes.append(p)
        p.start()
    
    # 모든 프로세스 종료 대기
    for p in processes:
        p.join()
    
    return dict(shared_result)
 
# 올바른 예시: 락을 사용한 동기화
def process_data_chunk_safe(chunk_id, shared_result, lock):
    # 데이터 처리...
    result = calculate_something(chunk_id)
    
    # 락을 사용한 동기화된 접근
    with lock:
        shared_result[chunk_id] = result
 
def parallel_processing_safe():
    # 공유 메모리와 락 생성
    manager = multiprocessing.Manager()
    shared_result = manager.dict()
    lock = manager.Lock()
    
    # 프로세스 생성 및 시작
    processes = []
    for i in range(10):
        p = multiprocessing.Process(
            target=process_data_chunk_safe, 
            args=(i, shared_result, lock)
        )
        processes.append(p)
        p.start()
    
    # 모든 프로세스 종료 대기
    for p in processes:
        p.join()
    
    return dict(shared_result)

부적절한 작업 분할 전략

# 잘못된 예시: 비효율적인 작업 분할
def inefficient_chunking(large_data, num_processes):
    # 고정된 크기로 분할 (불균형 가능성)
    chunk_size = len(large_data) // num_processes
    chunks = []
    
    for i in range(num_processes):
        start = i * chunk_size
        end = start + chunk_size if i < num_processes - 1 else len(large_data)
        chunks.append(large_data[start:end])
    
    return chunks
 
# 올바른 예시: 지능적인 작업 분할
def efficient_chunking(large_data, num_processes):
    # 작업의 복잡성을 고려한 분할
    # 예: 각 항목의 예상 처리 시간을 기준으로 분할
    total_complexity = sum(estimate_complexity(item) for item in large_data)
    complexity_per_process = total_complexity / num_processes
    
    chunks = []
    current_chunk = []
    current_complexity = 0
    
    for item in large_data:
        item_complexity = estimate_complexity(item)
        
        if current_complexity + item_complexity > complexity_per_process and current_chunk:
            chunks.append(current_chunk)
            current_chunk = [item]
            current_complexity = item_complexity
        else:
            current_chunk.append(item)
            current_complexity += item_complexity
    
    if current_chunk:
        chunks.append(current_chunk)
    
    return chunks

통신 오버헤드 무시

# 잘못된 예시: 과도한 네트워크 통신
def chatty_distributed_processing(data_chunks):
    results = []
    
    for chunk in data_chunks:
        # 매우 작은 단위로 원격 작업 요청
        # 네트워크 오버헤드 > 실제 작업 비용
        for item in chunk:
            # 각 항목마다 별도의 네트워크 호출
            result = remote_service.process_item(item)
            results.append(result)
    
    return results
 
# 올바른 예시: 효율적인 배치 처리
def efficient_distributed_processing(data_chunks):
    results = []
    
    for chunk in data_chunks:
        # 청크 단위로 일괄 처리
        batch_results = remote_service.process_batch(chunk)
        results.extend(batch_results)
    
    return results

분산 상태 관리 문제

# 잘못된 예시: 취약한 상태 관리
class DistributedCounter:
    def __init__(self):
        self.count = 0  # 로컬 상태
    
    def increment(self):
        # 주의: 분산 시스템에서 이 방식은 신뢰할 수 없음
        self.count += 1
    
    def get_count(self):
        return self.count
 
# 올바른 예시: 분산 환경에 적합한 상태 관리
import redis
 
class RedisDistributedCounter:
    def __init__(self, redis_client, counter_key):
        self.redis = redis_client
        self.counter_key = counter_key
    
    def increment(self):
        # 원자적 증가 연산
        return self.redis.incr(self.counter_key)
    
    def get_count(self):
        count = self.redis.get(self.counter_key)
        return int(count) if count else 0

실제 사용 예시

Celery를 활용한 분산 작업 처리:

# Celery를 사용한 분산 작업
from celery import Celery, group
 
# Celery 앱 설정
app = Celery('tasks', broker='redis://localhost:6379/0')
 
@app.task
def process_chunk(chunk):
    """개별 데이터 청크 처리"""
    results = []
    for item in chunk:
        # 데이터 처리 로직...
        result = complex_computation(item)
        results.append(result)
    return results
 
def distribute_processing(large_dataset, chunk_size=100):
    """대규모 데이터셋을 분산 처리"""
    # 데이터를 청크로 분할
    chunks = [large_dataset[i:i+chunk_size] 
              for i in range(0, len(large_dataset), chunk_size)]
    
    # 병렬 작업 설정
    job = group(process_chunk.s(chunk) for chunk in chunks)
    
    # 작업 실행 및 결과 수집
    result = job.apply_async()
    
    # 모든 결과 대기
    chunk_results = result.get()
    
    # 결과 병합
    final_results = []
    for chunk_result in chunk_results:
        final_results.extend(chunk_result)
    
    return final_results

고급 활용법

gRPC를 활용한 마이크로서비스 통신:

# gRPC 정의 (proto 파일)
"""
syntax = "proto3";
 
service DataProcessor {
  rpc ProcessData (DataRequest) returns (DataResponse) {}
  rpc StreamData (stream DataChunk) returns (DataSummary) {}
}
 
message DataRequest {
  string data_id = 1;
  bytes content = 2;
}
 
message DataResponse {
  bool success = 1;
  string result = 2;
}
 
message DataChunk {
  string chunk_id = 1;
  bytes content = 2;
}
 
message DataSummary {
  int32 processed_chunks = 1;
  string result = 2;
}
"""
 
# 서버 구현
import grpc
import data_pb2
import data_pb2_grpc
from concurrent import futures
 
class DataProcessorServicer(data_pb2_grpc.DataProcessorServicer):
    def ProcessData(self, request, context):
        # 단일 요청 처리 로직
        result = process_data(request.content)
        return data_pb2.DataResponse(success=True, result=result)
    
    def StreamData(self, request_iterator, context):
        # 스트리밍 요청 처리 로직
        chunk_count = 0
        combined_result = ""
        
        for chunk in request_iterator:
            result = process_chunk(chunk.content)
            combined_result += result
            chunk_count += 1
        
        return data_pb2.DataSummary(
            processed_chunks=chunk_count,
            result=combined_result
        )
 
def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    data_pb2_grpc.add_DataProcessorServicer_to_server(
        DataProcessorServicer(), server
    )
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()
 
# 클라이언트 구현
def process_large_file(file_path):
    # gRPC 클라이언트 설정
    channel = grpc.insecure_channel('localhost:50051')
    stub = data_pb2_grpc.DataProcessorStub(channel)
    
    def read_chunks(file_path, chunk_size=1024*1024):
        with open(file_path, 'rb') as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                yield data_pb2.DataChunk(
                    chunk_id=str(f.tell()),
                    content=chunk
                )
    
    # 스트리밍 요청 전송
    response = stub.StreamData(read_chunks(file_path))
    return response.result

주의사항

  • 분산 시스템에서는 부분 실패를 항상 고려해야 한다.
  • 네트워크 지연과 대역폭 제한을 고려한 설계가 필요하다.
  • 작업 크기와 통신 오버헤드 사이의 균형을 찾아야 한다.
  • 상태 관리는 가능한 분산 데이터 저장소를 활용한다.
  • 일관성, 가용성, 분할 내성(CAP 이론)의 트레이드오프를 이해한다.

비동기 프로그래밍 함정

개념 설명

비동기 프로그래밍은 코드의 실행 흐름을 차단하지 않고 여러 작업을 동시에 처리하는 프로그래밍 패러다임이다. 실생활에서는 여러 작업을 동시에 진행하면서 결과가 도착할 때마다 처리하는 방식으로, 레스토랑에서 주문을 하고 기다리는 동안 다른 일을 하다가 음식이 준비되면 식사를 하는 것과 유사하다.

기본 동작 방식

파이썬의 비동기 프로그래밍 핵심 요소:

  • 이벤트 루프: 비동기 작업을 관리하는 중앙 메커니즘
  • 코루틴: async def로 정의되는 비동기 함수
  • await 표현식: 코루틴의 결과를 기다림
  • 태스크: 이벤트 루프에서 실행되는 코루틴의 인스턴스
  • 퓨처: 나중에 완료될 작업의 결과를 나타내는 객체

안티패턴 사례

이벤트 루프 차단 패턴

# 잘못된 예시: 이벤트 루프 차단
import asyncio
import time
 
async def blocking_coroutine():
    print("시작...")
    # CPU 집약적 또는 I/O 차단 작업
    time.sleep(5)  # asyncio.sleep() 대신 time.sleep() 사용
    print("완료!")
    return "결과"
 
async def main():
    # 전체 이벤트 루프가 5초 동안 차단됨
    result = await blocking_coroutine()
    print(f"결과: {result}")
 
# 올바른 예시: 이벤트 루프 비차단
import asyncio
from concurrent.futures import ThreadPoolExecutor
 
async def non_blocking_coroutine():
    print("시작...")
    # CPU 집약적 작업을 별도 스레드로 오프로드
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        # 스레드 풀에서 차단 함수 실행
        await loop.run_in_executor(pool, lambda: time.sleep(5))
    print("완료!")
    return "결과"
 
async def main_good():
    # 이벤트 루프가 차단되지 않음
    result = await non_blocking_coroutine()
    print(f"결과: {result}")

작업 조정 및 취소 전략 부재

# 잘못된 예시: 작업 취소 처리 부재
import asyncio
 
async def long_running_task():
    # 취소 처리 없음
    try:
        for i in range(100):
            # 여기서 작업 취소를 확인하지 않음
            print(f"작업 진행 중... {i}%")
            await asyncio.sleep(0.1)
    except Exception as e:
        # 구체적인 예외 처리 없음
        print(f"에러 발생: {e}")
 
async def main_without_cancellation():
    task = asyncio.create_task(long_running_task())
    
    # 5초 후 작업 취소
    await asyncio.sleep(5)
    task.cancel()
    
    try:
        await task
    except asyncio.CancelledError:
        print("작업이 취소되었습니다.")
 
# 올바른 예시: 적절한 작업 취소 처리
async def cancellable_task():
    try:
        for i in range(100):
            # 주기적으로 취소 여부 확인
            print(f"작업 진행 중... {i}%")
            await asyncio.sleep(0.1)  # 취소 신호를 받을 수 있는 지점
    except asyncio.CancelledError:
        # 정리 로직 수행
        print("작업 취소 중... 리소스 정리")
        # 취소 신호를 상위로 전파
        raise
    finally:
        # 항상 리소스 정리
        print("리소스 정리 완료")
 
async def main_with_cancellation():
    task = asyncio.create_task(cancellable_task())
    
    # 5초 후 작업 취소
    await asyncio.sleep(5)
    task.cancel()
    
    try:
        await task
    except asyncio.CancelledError:
        print("작업이 취소되었고 적절히 정리되었습니다.")

동기/비동기 코드 혼합 관련 문제

# 잘못된 예시: 동기/비동기 코드 혼합
import asyncio
import requests  # 동기식 HTTP 라이브러리
 
async def fetch_data_mixed():
    print("데이터 가져오기 시작...")
    # 이벤트 루프 차단 - 동기식 라이브러리 사용
    response = requests.get("https://api.example.com/data")  # 차단 호출
    data = response.json()
    
    # 비동기 함수와 혼합
    await asyncio.sleep(1)
    
    return data
 
# 올바른 예시: 일관된 비동기 접근법
import asyncio
import aiohttp  # 비동기식 HTTP 라이브러리
 
async def fetch_data_consistent():
    print("데이터 가져오기 시작...")
    # 비동기식 HTTP 클라이언트 사용
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com/data") as response:
            # 비차단 호출
            data = await response.json()
    
    await asyncio.sleep(1)
    
    return data

콜백 지옥과 잘못된 코루틴 패턴

# 잘못된 예시: 중첩된 콜백 (콜백 지옥)
def callback_hell():
    def on_data(data):
        parsed_data = parse_data(data)
        
        def on_processed(result):
            validated_result = validate_result(result)
            
            def on_validated(final_result):
                save_result(final_result)
                
                def on_saved(success):
                    if success:
                        print("저장 성공!")
                    else:
                        print("저장 실패!")
                
                save_callback(on_saved)
            
            validate_callback(on_validated)
        
        process_callback(on_processed)
    
    fetch_data(on_data)
 
# 올바른 예시: 코루틴을 사용한 간결한 비동기 흐름
async def clean_async_flow():
    try:
        # 선형적인 코드 흐름
        data = await fetch_data_async()
        parsed_data = await parse_data_async(data)
        result = await process_async(parsed_data)
        validated_result = await validate_async(result)
        success = await save_async(validated_result)
        
        if success:
            print("저장 성공!")
        else:
            print("저장 실패!")
    except Exception as e:
        print(f"오류 발생: {e}")

실제 사용 예시

비동기 웹 서비스 구현:

# FastAPI를 사용한 비동기 웹 서비스
from fastapi import FastAPI, HTTPException
import asyncio
import aiohttp
import asyncpg
 
app = FastAPI()
 
# 데이터베이스 풀 생성
async def create_db_pool():
    return await asyncpg.create_pool(
        user="username",
        password="password",
        database="dbname",
        host="localhost"
    )
 
# 애플리케이션 시작 시 실행할 이벤트
@app.on_event("startup")
async def startup():
    app.state.db_pool = await create_db_pool()
    app.state.http_session = aiohttp.ClientSession()
 
# 애플리케이션 종료 시 실행할 이벤트
@app.on_event("shutdown")
async def shutdown():
    await app.state.db_pool.close()
    await app.state.http_session.close()
 
# 비동기 엔드포인트
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    async with app.state.db_pool.acquire() as conn:
        # 비동기 데이터베이스 쿼리
        user = await conn.fetchrow(
            "SELECT * FROM users WHERE id = $1", user_id
        )
        
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        
        # 외부 API 비동기 호출
        async with app.state.http_session.get(
            f"https://api.example.com/user-status/{user_id}"
        ) as response:
            status = await response.json()
        
        # 결과 결합
        return {
            "user": dict(user),
            "status": status
        }

고급 활용법

비동기 태스크 관리와 제한:

import asyncio
from aiohttp import ClientSession
import time
 
async def fetch_with_semaphore(semaphore, url, session):
    """세마포어를 사용한 동시 요청 제한"""
    async with semaphore:  # 동시 접속 제한
        start_time = time.time()
        try:
            async with session.get(url) as response:
                data = await response.text()
                elapsed = time.time() - start_time
                print(f"{url} - {len(data)} bytes - {elapsed:.2f}s")
                return data
        except Exception as e:
            elapsed = time.time() - start_time
            print(f"{url} - Error: {e} - {elapsed:.2f}s")
            return None
 
async def fetch_all_urls(urls, max_concurrent=10):
    """여러 URL에서 데이터를 동시에 가져오기 (동시성 제한)"""
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async with ClientSession() as session:
        # 모든 URL에 대한 태스크 생성
        tasks = [
            fetch_with_semaphore(semaphore, url, session)
            for url in urls
        ]
        
        # 진행 상황 모니터링
        pending = tasks
        results = []
        
        while pending:
            # 완료된 태스크부터 처리
            done, pending = await asyncio.wait(
                pending, 
                return_when=asyncio.FIRST_COMPLETED
            )
            
            for task in done:
                result = task.result()
                if result:
                    results.append(result)
                    
            print(f"진행 상황: {len(results)}/{len(tasks)} 완료, "
                  f"{len(pending)} 남음")
        
        return results
 
# 작업 타임아웃 처리
async def fetch_with_timeout(url, timeout=10):
    """지정된 타임아웃으로 URL 가져오기"""
    try:
        # 태스크에 타임아웃 적용
        async with ClientSession() as session:
            result = await asyncio.wait_for(
                session.get(url), 
                timeout=timeout
            )
            return await result.text()
    except asyncio.TimeoutError:
        print(f"{url} - 제한 시간 {timeout}초 초과")
        return None
    except Exception as e:
        print(f"{url} - 오류: {e}")
        return None

주의사항

  • 비동기 코드를 위한 라이브러리 선택 시 비동기 지원 여부를 확인한다.
  • 코루틴을 실행하려면 항상 awaitasyncio.create_task()를 사용한다.
  • 동기 코드와 비동기 코드를 혼합할 때는 run_in_executor를 활용한다.
  • 긴 CPU 작업은 별도 스레드나 프로세스로 오프로드한다.
  • 적절한 에러 처리와 타임아웃을 설정하여 비동기 작업의 견고성을 확보한다.

결론

파이썬의 고급 아키텍처와 분산 시스템 설계는 단순한 스크립팅을 넘어 더 복잡한 애플리케이션 개발을 위한 필수적인 기술이다. 대규모 시스템 설계에서는 모듈화, 책임 분리, 의존성 관리가 핵심으로, 과도한 추상화보다는 명확한 경계와 인터페이스를 통해 코드의 유지보수성을 확보해야 한다.

분산 시스템과 병렬성은 파이썬의 제한(GIL)을 극복하고 확장성을 확보하는 중요한 방법이다. 그러나 동시성 제어, 리소스 관리, 작업 조정 등의 복잡성을 고려하지 않으면 오히려 성능이 저하되거나 예측할 수 없는 문제가 발생할 수 있다. 적절한 도구와 라이브러리(multiprocessing, asyncio 등)를 선택하고, 작업 특성에 맞는 분할 전략을 적용해야 한다.

비동기 프로그래밍은 I/O 바운드 작업의 성능을 크게 향상시킬 수 있지만, 제대로 활용하지 않으면 코드 복잡성만 증가시키는 결과를 낳는다. 이벤트 루프 차단을 피하고, 동기/비동기 코드의 명확한 구분, 적절한 에러 처리와 취소 전략이 필수적이다.

결국 이러한 고급 아키텍처와 패턴은 단순성과 복잡성 사이의 균형을 찾는 것이 핵심이다. 필요한 수준의 추상화와 구조를 적용하되, “Simple is better than complex”라는 파이썬의 철학을 항상 염두에 두고 과도한 엔지니어링은 피해야 한다. 모든 패턴은 문제 해결을 위한 도구일 뿐, 목적 그 자체가 되어선 안 된다.