본문 바로가기

FastAPI 기본부터 극한까지 날먹하기 (Pydantic, Depends 심화)

Discriminated Union

만약 결제 수단(카드, 계좌이체, 간편결제)에 따라 요청 구조가 다르다면?

Spring에서는 @JsonTypeInfo + @JsonSubTypes를 사용해야 하고, 커스텀 디시리얼라이저를 작성해야 하는 경우도 많다.

 

Pydantic에서는 매우 간단하게 처리 가능하다. 다음 코드를 살펴보자.

from pydantic import BaseModel, Field
from typing import Literal, Annotated, Union
from enum import Enum

# 각 결제 수단별 요청 스키마
class CardPaymentRequest(BaseModel):
    method: Literal["CARD"] = "CARD"
    card_number: str = Field(..., pattern=r"^\d{16}$")
    expiry_month: int = Field(..., ge=1, le=12)
    expiry_year: int = Field(..., ge=2024, le=2035)
    cvv: str = Field(..., pattern=r"^\d{3,4}$")
    installment_months: int = Field(default=0, ge=0, le=12)

class BankTransferRequest(BaseModel):
    method: Literal["BANK_TRANSFER"] = "BANK_TRANSFER"
    bank_code: str = Field(..., pattern=r"^\d{3}$")
    account_number: str = Field(..., pattern=r"^\d{10,14}$")
    account_holder: str = Field(..., min_length=2, max_length=20)

class EasyPayRequest(BaseModel):
    method: Literal["EASY_PAY"] = "EASY_PAY"
    provider: str = Field(..., pattern="^(TOSS|KAKAO|NAVER)$")
    token: str

# Discriminated Union — method 필드를 기준으로 자동 판별
PaymentMethodRequest = Annotated[
    Union[CardPaymentRequest, BankTransferRequest, EasyPayRequest],
    Field(discriminator="method"),
]

class CreatePaymentRequest(BaseModel):
    amount: int = Field(..., gt=0, le=100_000_000)
    currency: str = Field(default="KRW", pattern="^(KRW|USD|JPY)$")
    order_id: str = Field(..., min_length=1, max_length=64)
    payment_method: PaymentMethodRequest  # 결제 수단에 따라 다른 스키마 적용
    description: str | None = Field(None, max_length=200)

# 요청 예시:
# {"amount": 50000, "order_id": "ORD-001",
#  "payment_method": {"method": "CARD", "card_number": "1234567890123456", ...}}
# → CardPaymentRequest로 자동 파싱

# {"amount": 50000, "order_id": "ORD-002",
#  "payment_method": {"method": "BANK_TRANSFER", "bank_code": "004", ...}}
# → BankTransferRequest로 자동 파싱

응답 모델

response에서 데이터를 가공해서 필요한 응답을 제공해보자.

간편하게 즉시 연산하여 반환할 수 있다.

from pydantic import BaseModel, ConfigDict, computed_field
from datetime import datetime

class PaymentResponse(BaseModel):
    """결제 응답 — 민감 정보 마스킹 포함"""
    model_config = ConfigDict(
        from_attributes=True,     # ORM 객체에서 자동 변환 (= ModelMapper)
        populate_by_name=True,    # 별칭과 원래 이름 모두 허용
    )

    payment_id: UUID
    status: PaymentStatus
    amount: int
    currency: str
    fee: int
    description: str | None
    created_at: datetime
    completed_at: datetime | None

    # Computed Field — 파생 값 자동 계산
    @computed_field
    @property
    def total_amount(self) -> int:
        """수수료 포함 총액"""
        return self.amount + self.fee

    @computed_field
    @property
    def masked_card_number(self) -> str | None:
        """카드번호 마스킹 (핀테크 필수)"""
        if hasattr(self, "_card_number") and self._card_number:
            return f"{'*' * 12}{self._card_number[-4:]}"
        return None

# 목록 조회용 간소화 응답
class PaymentSummary(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    payment_id: UUID
    status: PaymentStatus
    amount: int
    created_at: datetime

# 페이지네이션 래퍼 (제네릭)
T = TypeVar("T", bound=BaseModel)

class PaginatedResponse(BaseModel, Generic[T]):
    items: list[T]
    total_count: int
    page: int
    size: int

    @computed_field
    @property
    def total_pages(self) -> int:
        return (self.total_count + self.size - 1) // self.size

    @computed_field
    @property
    def has_next(self) -> bool:
        return self.page < self.total_pages

Custom Serializer

api 응답할 때 dto 수준에서 금액을 바로 포맷팅 해보자.

from pydantic import field_serializer, field_validator

class MoneyResponse(BaseModel):
    amount: int           # 내부: 정수 (원 단위)
    currency: str

    @field_serializer("amount")
    def serialize_amount(self, value: int, _info) -> str:
        """API 응답에서 금액을 포맷팅"""
        if self.currency == "KRW":
            return f"{value:,}원"
        return f"{value / 100:.2f}"  # USD 등은 센트 단위

    # 역직렬화 시 (요청 받을 때) 타입 변환
    @field_validator("amount", mode="before")
    @classmethod
    def parse_amount(cls, v: int | str) -> int:
        if isinstance(v, str):
            return int(v.replace(",", "").replace("원", ""))
        return v

의존성 주입(DI) 심화

Depends()는 "함수를 호출하고, 그 반환값을 파라미터로 주입"하는 것이 전부다.
즉, Spring의 IoC Container와는 근본적으로 다르다.

  • Spring: 컨테이너가 빈의 생명주기를 관리 (생성, 주입, 소멸)
  • FastAPI: Depends()가 호출된 시점에 팩토리 함수를 실행하여 값을 주입

Depends()의 동작 원리

실행 흐름은 다음과 같다.

  1. 요청 도착
  2. FastAPI가 엔드포인트 함수의 파라미터를 분석
  3. Depends()를 발견하면 해당 함수를 호출
  4. Depends() 체이닝이 있으면 재귀적으로 해결
  5. 모든 의존성이 해결되면 엔드포인트 함수 호출
  6. 요청 종료 시 generator 기반 의존성의 cleanup 실행

yield를 사용하면 cleanup 로직을 정의할 수 있다.

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    session = async_session_factory()
    try:
        yield session        # ← 여기서 값이 주입됨
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()  # ← 요청 종료 시 실행됨 (= @PreDestroy)

Depends() 보일러플레이트 해결

문제는 Depends() 팩토리 함수가 끝없이 늘어나는 경우이다.
10개 서비스에서 동일하게 아래 코드가 반복 된다고 가정해보자.

def get_payment_repo(db: AsyncSession = Depends(get_db)) -> PaymentRepository:
    return PaymentRepository(db)

def get_payment_service(
    repo: PaymentRepository = Depends(get_payment_repo),
) -> PaymentService:
    return PaymentService(repo)

이는 크게 두 가지 방식으로 해결한다.

  • Annotated 타입으로 재사용하거나 (Spring의 @Autowired처럼)
  • Class 기반 의존성을 활용하거나 (Spring의 @Component처럼)

Annotated 타입으로 재사용 하는 방법

from typing import Annotated

DBSession = Annotated[AsyncSession, Depends(get_db)]
AuthUser = Annotated[CurrentUser, Depends(get_current_user)]
PaymentSvc = Annotated[PaymentService, Depends(get_payment_service)]

@router.post("/payments")
async def create_payment(
    request: CreatePaymentRequest,
    db: DBSession,              # 깔끔
    user: AuthUser,             # 깔끔
    service: PaymentSvc,        # 깔끔
) -> PaymentResponse:
    ...

Class 기반 의존성으로 처리하는 방법 (Spring @Component 처럼)

class PaymentService:
    """
    __init__에 Depends()를 사용하면 클래스 자체를 Depends()에 넣을 수 있다.
    Spring의 생성자 주입과 거의 동일한 형태이다.
    """
    def __init__(
        self,
        repo: PaymentRepository = Depends(get_payment_repo),
        fee_calc: FeeCalculator = Depends(get_fee_calculator),
        event_bus: EventBus = Depends(get_event_bus),
    ):
        self.repo = repo
        self.fee_calc = fee_calc
        self.event_bus = event_bus

    async def create_payment(self, request: CreatePaymentRequest) -> Payment:
        ...

# 라우터에서 바로 클래스를 Depends에 전달
@router.post("/payments")
async def create_payment(
    request: CreatePaymentRequest,
    service: PaymentService = Depends(),  # ← Depends(PaymentService)와 동일
) -> PaymentResponse:
    return await service.create_payment(request)

 

싱글톤 의존성 (app.state와 lifespan)

FastAPI에서 싱글톤이 필요한 경우들이 있다. 가령, 다음과 같다.

(사실 이렇게 나열할 필요도 없이, 당연히 싱글톤으로 작성되어야 하는 설정 정보들이다.)

  • DB 엔진 (커넥션 풀)
  • Redis 클라이언트
  • HTTP 클라이언트 풀
  • 설정 객체
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 싱글톤 초기화
    app.state.db_engine = create_async_engine(settings.DATABASE_URL)
    app.state.redis = redis.from_url(settings.REDIS_URL)
    app.state.http_client = httpx.AsyncClient(timeout=30.0)

    yield

    # 싱글톤 정리
    await app.state.db_engine.dispose()
    await app.state.redis.close()
    await app.state.http_client.aclose()

app = FastAPI(lifespan=lifespan)

# 싱글톤 접근 패턴
from fastapi import Request

async def get_redis(request: Request) -> redis.Redis:
    return request.app.state.redis

RedisClient = Annotated[redis.Redis, Depends(get_redis)]

async/await

동시성 모델의 근본적 차이

Spring을 경험해보지 못한 사람들에게는 미안하지만, 원활한 이해를 위해 Spring과 비교해보자.

Spring MVC FastAPI
Thread Pool (200 스레드)
- Thread-1: 요청 A 처리 중 (DB 대기... 3초 동안 스레드 점유)
- Thread-2: 요청 B 처리 중 (API 호출 대기... 5초 동안 스레드 점유)
- Thread-3: 요청 C 처리 중
- ... 197개 남은 스레드

→ 동시 처리 한계 = 스레드 수 (200)
→ I/O 대기 중에도 스레드를 점유하므로 비효율적
Event Loop (1개) + 코루틴 (무제한)
- 코루틴 A: DB 쿼리 await → 이벤트 루프에 제어권 반환 → 다른 코루틴 실행
- 코루틴 B: API 호출 await → 이벤트 루프에 제어권 반환 → 다른 코루틴 실행
- 코루틴 C: 처리 중
- ... 수천 개의 코루틴이 하나의 스레드에서 교대 실행

→ 동시 처리 한계 = 메모리 (사실상 무제한)
→ I/O 대기 중 스레드를 해제하므로 매우 효율적

 

Event Loop 방식 덕분에 소규모 프로젝트에서는 얻는 장점이 매우 크다.

그러나 다음의 문제가 동반된다.

  • Event Loop가 막히면 전체 서비스가 막힌다는 점
  • 하나만 async를 써도 전체가 async를 강제로 써야한다는 점 (일부만 비동기 불가능)

비동기 동시 실행 패턴

import asyncio
import httpx

# === 순차 실행 (나쁜 예) ===
async def enrich_payment_sequential(payment_id: UUID) -> dict:
    """3개 API를 순서대로 호출 → 총 3초"""
    fraud = await fraud_client.check(payment_id)        # 1초
    balance = await account_client.get_balance(payment_id)  # 1초
    fee = await fee_client.calculate(payment_id)         # 1초
    return {"fraud": fraud, "balance": balance, "fee": fee}

# === 동시 실행 (좋은 예) ===
async def enrich_payment_concurrent(payment_id: UUID) -> dict:
    """3개 API를 동시에 호출 → 총 1초 (가장 느린 것 기준)"""
    fraud, balance, fee = await asyncio.gather(
        fraud_client.check(payment_id),
        account_client.get_balance(payment_id),
        fee_client.calculate(payment_id),
    )
    return {"fraud": fraud, "balance": balance, "fee": fee}

# === 동시 실행 + 개별 에러 처리 ===
async def enrich_payment_safe(payment_id: UUID) -> dict:
    """일부 실패해도 나머지는 사용"""
    results = await asyncio.gather(
        fraud_client.check(payment_id),
        account_client.get_balance(payment_id),
        fee_client.calculate(payment_id),
        return_exceptions=True,  # 예외를 반환값으로 전환
    )

    return {
        "fraud": results[0] if not isinstance(results[0], Exception) else None,
        "balance": results[1] if not isinstance(results[1], Exception) else None,
        "fee": results[2] if not isinstance(results[2], Exception) else None,
        "errors": [
            str(r) for r in results if isinstance(r, Exception)
        ],
    }

# === TaskGroup ===
async def enrich_payment_taskgroup(payment_id: UUID) -> dict:
    """하나라도 실패하면 나머지도 취소"""
    async with asyncio.TaskGroup() as tg:
        fraud_task = tg.create_task(fraud_client.check(payment_id))
        balance_task = tg.create_task(account_client.get_balance(payment_id))
        fee_task = tg.create_task(fee_client.calculate(payment_id))

    # TaskGroup 블록을 벗어나면 모든 Task 완료 보장
    return {
        "fraud": fraud_task.result(),
        "balance": balance_task.result(),
        "fee": fee_task.result(),
    }

 

사용자가 많아지면 Rate Limit를 강제해야하는 경우가 무조건 생긴다. 

이는 다음과 같이 처리 가능하다. (Semaphore를 통한 동시성 제한)

semaphore = asyncio.Semaphore(10)  # 동시 10개까지

async def call_with_limit(url: str) -> dict:
    async with semaphore:
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            return response.json()

# 100개 요청을 동시에 10개씩 처리
results = await asyncio.gather(
    *[call_with_limit(f"/api/item/{i}") for i in range(100)]
)

async def vs def

언제 async를 사용해야할까? 어떤 점에 유의해야할까?

 

async를 사용하면 async로 선언하면 된다. 너무 당연하다.

그럼 async 내에서는 딱히 유의할게 없을까?

 

다음을 살펴보자.

@router.get("/report")
async def generate_report():
    # time.sleep은 블로킹! 이벤트 루프 전체를 3초간 멈춤
    time.sleep(3)  # 절대 금지

    # requests.get은 블로킹! httpx.AsyncClient를 써야 함
    result = requests.get("https://api.example.com")  # 절대 금지

앞서 말한것처럼 FastAPI는 하나의 Event Loop를 공유한다.

위처럼 블로킹 함수를 async 안에서 선언하면 매우 위험하다.

 

그렇다면 어떻게 처리할 수 있을까? 방법은 다양할 것이다.

  • 블로킹 함수를 async로 처리한다.
  • 함수를 애당초 동기 함수로 만들어버린다.
  • 블로킹 함수를 별도 쓰레드로 넘긴다.
# async 함수 + async 라이브러리
@router.get("/report")
async def generate_report():
    await asyncio.sleep(3)  # 비동기 sleep
    async with httpx.AsyncClient() as client:
        result = await client.get("https://api.example.com")  # 비동기 HTTP
# 동기 함수 (블로킹 코드를 써야 할 때)
@router.get("/report")
def generate_report():  # async 아님!
    # FastAPI가 자동으로 스레드풀에서 실행
    time.sleep(3)  # 이건 괜찮음 (별도 스레드)
    result = requests.get("https://api.example.com")  # 이것도 괜찮음
# async 함수 안에서 동기 코드를 실행해야 할 때
@router.get("/report")
async def generate_report():
    # 동기 함수를 스레드풀에 위임
    result = await asyncio.to_thread(
        generate_heavy_pdf,  # CPU 바운드 동기 함수
        data=report_data,
    )
    return FileResponse(result)​

 

조금 헷갈릴 수 있어서,

자주 마주할 수 있는 상황을 기준으로 다음과 같이 정리해본다.

async 라이브러리 사용 (asyncpg, httpx, aioredis) async def 논블로킹 I/O 활용
동기 라이브러리 사용 (psycopg2, requests) def FastAPI가 스레드풀 처리
CPU 연산 (PDF 생성, 암호화, 데이터 변환) def CPU 바운드는 async 의미 없음
여러 외부 서비스 동시 호출 async def + gather 병렬 I/O
파일 읽기/쓰기 def 또는 aiofiles 디스크 I/O
WebSocket 처리 async def 필수
의심스러울 때 def 안전한 기본값

 

NORMAL j/k: 이동 · Enter: 열기 · /: 검색 · ?: 도움말