마이크로서비스 아키텍처에서 정확히 한 번 전달(Exactly-once delivery)을 보장하는 견고한 서버 측 이벤트 버스 구축하기
요약
마이크로서비스 아키텍처에서 이벤트의 정확히 한 번 전달(Exactly-once delivery)을 보장하는 서버 측 이벤트 버스 구축 방법을 다룹니다. PostgreSQL을 내구 저장소로, Redis를 작업 큐로 활용하여 멱등성과 신뢰성을 확보하는 설계 패턴을 제시합니다.
핵심 포인트
- PostgreSQL을 활용한 내구성이 있는 이벤트 로그 설계
- Redis 기반의 작업 큐를 통한 발행과 전달의 분리
- 멱등성 키를 이용한 정확히 한 번 전달(Exactly-once) 구현
- 재생 및 복구를 지원하는 소비자 프로토콜 및 재시도 메커니즘
마이크로서비스 아키텍처에서 정확히 한 번 전달(Exactly-once delivery)을 보장하는 견고한 서버 측 이벤트 버스 구축하기
마이크로서비스 아키텍처에서 정확히 한 번 전달(Exactly-once delivery)을 보장하는 견고한 서버 측 이벤트 버스 구축하기
현대적인 분산 시스템(Distributed systems)에서 서비스 간 도메인 이벤트(Domain events)를 신뢰할 수 있고 정확히 한 번(Exactly-once) 전달되도록 보장하는 것은 실질적인 도전 과제입니다. 이 튜토리얼에서는 멱등성 핸들러(Idempotent handlers), 내구성이 있는 저장소(Durable storage), 그리고 세심한 조정(Coordination)을 사용하여 이벤트 전달에 대해 정확히 한 번 의미론(Exactly-once semantics)을 제공하는 경량 서버 측 이벤트 버스를 설계하고 구현하는 과정을 살펴봅니다. 우리는 아키텍처 선택, 데이터 모델링, 멱등성 키(Idempotency keys), 그리고 내구 저장소로 PostgreSQL을, 빠른 인메모리 큐(In-memory queue)로 Redis를 사용하는 Python 기반의 구체적인 코드 예제를 다룰 것입니다. 여러분은 실제 프로젝트에 적용할 수 있는 실용적인 청사진을 얻게 될 것입니다.
사전 요구 사항
- 마이크로서비스 아키텍처(Microservice architecture) 개념에 대한 숙련도
- 기본적인 Python 지식 (비동기 I/O (Async I/O)는 선택 사항이지만 권장됨)
- PostgreSQL 및 Redis 설치 또는 접근 가능 상태
- Docker (선택 사항이지만 로컬 설정에 유용함)
접근 방식 개요
- 고유한 이벤트 ID와 메타데이터를 포함하여 발행된 이벤트를 저장하기 위해 PostgreSQL의 내구성이 있는 이벤트 로그(Durable event log)를 사용합니다.
- 소비자(Consumer)당 멱등성 키(Idempotency key), 지속되는 상태(Persisted state), 그리고 전달 시도 추적 메커니즘을 사용하여 정확히 한 번 전달(Exactly-once delivery)을 구현합니다.
- Redis를 작업 큐(Work queue)로 사용하여 발행(Publishing)과 전달(Delivery)을 분리하는 동시에, 적절한 경우 소비자별 순서 보장(Ordering guarantees)을 유지합니다.
- 재생(Replay) 및 복구(Recovery)를 지원하는 소비자 프로토콜과, 제한적이고 멱등성이 보장되는 최소 한 번(At-least-once) 재시도 메커니즘을 제공합니다.
- 테스트 전략 및 운영 고려 사항(지표(Metrics), 모니터링(Monitoring), 배압(Backpressure), 장애 모드(Failure modes))을 포함합니다.
시스템 아키텍처 다이어그램 (개념적)
- 이벤트 발행자 (Event Publisher) -> 이벤트 버스 (Event Bus) (PostgreSQL 기반 로그, 내구성이 있는 저장소)
- 이벤트 버스 (Event Bus) -> 전달 워커 (Delivery Workers) (Redis 큐) -> 소비자 서비스 (Consumer services)
- 각 소비자는 처리된 이벤트를 추적하기 위해 멱등성 테이블 (Idempotency Table)을 유지함 (consumer_id, event_id, status)
- 이벤트 버스 (Event Bus)는 발행자를 위한 REST/gRPC API와 소비자를 위한 전달 엔드포인트(delivery endpoint)를 노출함 (또는 재시도가 포함된 웹훅(webhook) 방식의 콜백 사용)
- 운영 고려 사항: 데드 레터 큐 (dead-letter queue), 재시도 정책 (retry policy), 백오프 (backoff), 메트릭 (metrics) 및 알림 (alerting)
데이터 모델 설계 (Data model design)
- events 테이블: 발행된 모든 이벤트를 저장
- event_id (UUID, 기본 키)
- publisher_id (string)
- topic (string)
- payload (jsonb)
- created_at (timestamp)
- status (enum: 'published', 'failed' 등)
- version (int, 이벤트 스키마 버전 관리를 위함)
- delivery_attempts 테이블: 각 소비자에게 전달되는 시도를 추적
- id (serial)
- event_id (UUID, 외래 키)
- consumer_id (string)
- attempt_id (UUID, 시도당 고유값)
- status (enum: 'in_progress', 'delivered', 'failed', 'skipped')
- next_redeliver_at (timestamp)
- created_at (timestamp)
- updated_at (timestamp)
- consumer_idempotency 테이블: 소비자별 멱등성 레지스트리
- consumer_id (string)
- event_id (UUID)
- status (enum: 'processed', 'failed', 'in_progress')
- processed_at (timestamp)
- 기본 키 (consumer_id, event_id)
엔드 투 엔드 흐름 (End-to-end flow)
-
발행자가 이벤트 버스에 이벤트를 생성함: status를 'published'로 하여 events 테이블에 삽입하고 event_id를 생성함.
-
컨슈머(Consumer)가 고유한 delivery_attempt_id와 event_id를 포함한 이벤트를 수신함 (HTTP POST 또는 웹훅(webhook)을 통해).
-
컨슈머가 멱등성(idempotently)을 유지하며 이벤트를 처리함:
- consumer_idempotency 테이블에서 (consumer_id, event_id)를 확인. 이미 'processed' 상태라면 성공 응답을 보내고 재처리를 건너뜀.
- 아직 처리되지 않았다면, 컨슈머의 상태(state)에 이벤트를 결정론적(deterministically)으로 적용함.
- 성공 시, consumer_idempotency에 기록(status 'processed', processed_at)하고 delivery_attempts의 상태를 'delivered'로 업데이트함.
- 일시적인 실패(transient failure) 발생 시, delivery_attempts의 상태를 'failed'로 업데이트하고 지수 백오프(exponential backoff)를 적용하여 next_redeliver_at을 설정함.
-
모니터링 및 보상(compensation):
- 컨슈머가 임계값을 초과하여 응답하지 않는 경우, 이벤트를 데드 레터 큐(dead-letter queue)로 이동하거나 별도의 알림을 생성함.
- 운영자가 중복을 방지하기 위한 멱등성 체크를 유지하면서, 특정 컨슈머를 위해 이벤트를 다시 인큐(re-enqueuing)하여 이벤트를 재실행(replay)할 수 있도록 허용함.
실제 코드 예시
- 기술 스택: Python 3.11, PostgreSQL을 위한 async SQLAlchemy, Redis를 위한 aioredis, HTTP 엔드포인트를 위한 FastAPI.
- 참고: 이 예시는 핵심 패턴에 집중합니다. 실제 배포 시에는 적절한 마이그레이션(migrations), 커넥션 풀링(connection pooling), 그리고 운영 환경 수준의 에러 핸들링(error handling)이 필요합니다.
- 데이터베이스 스키마 (SQL)
events 테이블
CREATE TABLE events (
event_id UUID PRIMARY KEY,
publisher_id TEXT NOT NULL,
topic TEXT NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
status TEXT NOT NULL DEFAULT 'published',
version INT NOT NULL DEFAULT 1
);
delivery_attempts 테이블
CREATE TABLE delivery_attempts (
id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
event_id UUID NOT NULL REFERENCES events(event_id),
consumer_id TEXT NOT NULL,
attempt_id UUID NOT NULL,
status TEXT NOT NULL,
next_redeliver_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE(event_id, consumer_id, attempt_id)
);
consumer idempotency 테이블
CREATE TABLE consumer_idempotency (
consumer_id TEXT NOT NULL,
event_id UUID NOT NULL,
status TEXT NOT NULL,
processed_at TIMESTAMPTZ,
PRIMARY KEY (consumer_id, event_id)
);
- Publisher client (publish_event.py)
import uuid
import json
import asyncio
from datetime import datetime
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy import text
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/events"
engine = create_async_engine(DATABASE_URL)
async def publish_event(publisher_id: str, topic: str, payload: dict, version: int = 1):
event_id = uuid.uuid4()
async with AsyncSession(engine) as session:
await session.execute(
text("""
INSERT INTO events (event_id, publisher_id, topic, payload, created_at, status, version)
VALUES (:event_id, :publisher_id, :topic, :payload, NOW(), 'published', :version)
""""),
{
"event_id": event_id,
"publisher_id": publisher_id,
"topic": topic,
"payload": json.dumps(payload),
"version": version,
},
)
await session.commit()
return event_id
async def main():
event_id = await publish_event("order-service", "order.created", {"order_id": 1234, "amount": 99.99})
print(f"Published event {event_id}")
if name == "main":
asyncio.run(main())
- Delivery worker (deliveries.py)
import asyncio
import uuid
from datetime import datetime, timedelta
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy import text
import aioredis
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/events"
REDIS_URL = "redis://localhost"
engine = create_async_engine(DATABASE_URL)
redis = aioredis.from_url(REDIS_URL, decode_responses=True)
async def enqueue_delivery(event_id: str, consumer_id: str, attempt_id: str):
await redis.lpush(f"deliveries:{consumer_id}", json.dumps({
"event_id": event_id,
"attempt_id": attempt_id
}))
async def schedule_deliveries():
async with AsyncSession(engine) as session:
result = await session.execute(text("""
SELECT event_id, topic FROM events
WHERE status = 'published'
""""))
for row in result:
event_id = str(row.event_id)
for consumer_id in ["warehouse-service", "billing-service", "analytics-service"]:
attempt_id = str(uuid.uuid4())
await session.execute(text("""
INSERT INTO delivery_attempts (event_id, consumer_id, attempt_id, status)
VALUES (:event_id, :consumer_id, :attempt_id, 'in_progress')
ON CONFLICT DO NOTHING
""""), {
"event_id": event_id,
"consumer_id": consumer_id,
"attempt_id": attempt_id
})
await enqueue_delivery(event_id, consumer_id, attempt_id)
await session.commit()
async def delivery_worker_loop():
while True:
전달 태스크를 꺼냅니다 (Pop a delivery task)
raw = await redis.brpop("deliveries:warehouse-service", timeout=5)
if raw:
payload = json.loads(raw)
event_id = payload["event_id"]
attempt_id = payload["attempt_id"]
여기에서 컨슈머 리스너 엔드포인트(consumer listener endpoint) 또는 웹훅 호출기(webhook invoker)를 호출합니다
데모를 위해, 전달을 시뮬레이션합니다
success = await simulate_delivery(event_id, "warehouse-service", attempt_id)
성공 여부에 따라 delivery_attempts 및 멱등성(idempotency)을 업데이트합니다
async with AsyncSession(engine) as session:
status = 'delivered' if success else 'failed'
await session.execute(text("""
UPDATE delivery_attempts
SET status = :status, updated_at = NOW(), next_redeliver_at = CASE WHEN :status = 'failed' THEN NOW() + INTERVAL '5 minutes' ELSE NULL END
WHERE event_id = :event_id AND consumer_id = :consumer_id AND attempt_id = :attempt_id
"""), {
"status": status,
"event_id": event_id,
"consumer_id": "warehouse-service",
"attempt_id": attempt_id
})
if success:
await session.execute(text("""
INSERT INTO consumer_idempotency (consumer_id, event_id, status, processed_at)
VALUES (:consumer_id, :event_id, 'processed', NOW())
ON CONFLICT (consumer_id, event_id) DO UPDATE SET status = 'processed', processed_at = NOW()
"""), {
"consumer_id": "warehouse-service",
"event_id": event_id
})
await session.commit()
await asyncio.sleep(0.1)
async def simulate_delivery(event_id, consumer_id, attempt_id):
컨슈머로의 실제 HTTP 호출을 위한 플레이스홀더 (Placeholder)
await asyncio.sleep(0.05)
데모를 위해 성공 여부를 무작위로 결정합니다
import random
return random.random() > 0.2
if name == "main":
import asyncio
asyncio.run(schedule_deliveries())
asyncio.run(delivery_worker_loop())
- Idempotent 처리를 위한 Consumer API (FastAPI)
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import asyncio
import uuid
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy import text
from fastapi.responses import JSONResponse
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/events"
engine = create_async_engine(DATABASE_URL)
app = FastAPI(title="Event Consumer Endpoint")
class Envelope(BaseModel):
event_id: str
consumer_id: str
payload: dict
attempt_id: str
async def is_idempotent(consumer_id: str, event_id: str):
async with AsyncSession(engine) as session:
result = await session.execute(text("""
SELECT status FROM consumer_idempotency WHERE consumer_id = :consumer_id AND event_id = :event_id
""""), {"consumer_id": consumer_id, "event_id": event_id})
row = result.fetchone()
return row is not None and row == 'processed'
async def mark_processed(consumer_id: str, event_id: str):
async with AsyncSession(engine) as session:
await session.execute(text("""
INSERT INTO consumer_idempotency (consumer_id, event_id, status, processed_at)
VALUES (:consumer_id, :event_id, 'processed', NOW())
ON CONFLICT (consumer_id, event_id) DO UPDATE SET status = 'processed', processed_at = NOW()
""""), {"consumer_id": consumer_id, "event_id": event_id})
await session.commit()
@app.post("/consume")
async def consume(envelope: Envelope):
if await is_idempotent(envelope.consumer_id, envelope.event_id):
return JSONResponse(content={"status": "skipped", "reason": "already processed"}, status_code=200)
이벤트 페이로드 처리 (도메인별 로직은 여기에 위치해야 함)
try:
# 예시: 로컬 상태에 적용 (가상)
...
If you deploy this service, ensure proper authentication and validation for incoming events
운영 고려 사항
AI 자동 생성 콘텐츠
본 콘텐츠는 Dev.to AI tag의 원문을 AI가 자동으로 요약·번역·분석한 것입니다. 원 저작권은 원저작자에게 있으며, 정확한 내용은 반드시 원문을 확인해 주세요.
원문 바로가기