
RAG, pgvector, AWS를 활용한 AI 문서 처리기 구축
요약
RAG, pgvector, AWS를 활용하여 PDF, 이미지 등 다양한 문서를 처리하고 자연어로 대화할 수 있는 풀스택 AI 플랫폼 구축 사례를 소개합니다. 비동기 파이프라인과 WebSocket을 통해 실시간 처리 상태를 제공하며, 답변의 근거를 명확히 하는 그라운딩 기술이 핵심입니다.
핵심 포인트
- S3, Textract, Claude, pgvector를 결합한 비동기 처리 파이프라인 구축
- Presigned URL을 활용한 효율적인 파일 업로드 방식 적용
- WebSocket을 통한 실시간 문서 처리 상태 모니터링 구현
- 출처 인용(Citation)을 포함하여 답변의 신뢰성을 높인 RAG 시스템
내가 만든 것
PDF, 송장, 계약서, 이미지 등 어떤 문서든 업로드하면 시스템이 자동으로 텍text를 추출하고, 문서 유형을 분류하며, 요약을 생성하고, 자연어로 대화할 수 있는 풀스택(Full-stack) AI 플랫폼입니다.
사용자 흐름:
- PDF 또는 이미지 업로드 — Presigned URL을 통해 S3로 직접 전송됩니다.
- Textract, Claude, 임베딩(Embedding) 파이프라인이 실행되는 실시간 처리 상태를 WebSocket을 통해 확인합니다.
- AI가 생성한 분류 및 요약을 읽습니다.
- 자연어로 질문합니다 — 인용된 페이지 번호와 함께 답변을 받습니다.
- 단 한 번의 쿼리로 전체 문서 라이브러리를 검색합니다.
단순한 챗봇과의 핵심적인 차이점은 **답변이 사용자의 문서에 근거한다(grounded)**는 점입니다. Claude는 검색된 청크(Chunks)만을 사용할 수 있으며, 모든 답변에는 페이지 번호가 포함된 출처 인용이 포함됩니다.
아키텍처 (Architecture)
Browser
│
▼
...
이 프로젝트는 제가 진행한 세 가지 프로젝트 중 가장 복잡합니다. 네트워킹, 스토리지, 인증(Auth), 데이터베이스, 비동기 처리(Async processing), WebSocket, 컴퓨팅, 모니터링을 아우르는 8개의 CDK 스택으로 구성되어 있습니다. 비동기 처리 파이프라인(S3 → Lambda → Textract → Lambda → Claude → pgvector)은 사용자 대상 API를 처리하는 FastAPI 백엔드와 분리된 아키텍처의 핵심입니다.
기술 스택 (Tech Stack)
| 계층 (Layer) | 기술 (Technology) |
|---|---|
| 프론트엔드 (Frontend) | React 18 + TypeScript + Vite + TailwindCSS + React Query |
| ... |
사용된 AWS 서비스
| 서비스 (Service) | 용도 (Purpose) |
|---|---|
| ECS Fargate | FastAPI를 위한 서버리스 컨테이너 호스팅 |
| ... |
처리 파이프라인 (The Processing Pipeline)
사용자가 문서를 업로드하면 FastAPI 요청 사이클 외부에서 완전히 실행되는 비동기 파이프라인이 트리거됩니다.
1단계 — Presigned URL을 통한 직접 S3 업로드
프론트엔드는 백엔드로부터 Presigned POST URL을 요청한 다음, S3로 파일을 직접 업로드합니다. 이 과정에서 바이너리 데이터는 FastAPI 서버를 거치지 않습니다.
# api/v1/upload.py
async def get_presigned_url(request: PresignedUrlRequest, user=Depends(get_current_user)):
s3_key = f"{user.id}/{doc_id}/{request.file_name}"
...
업로드가 완료되면 프론트엔드는 /api/v1/upload/complete/{doc_id}를 호출하며, 이는 비동기 파이프라인 (async pipeline)을 트리거합니다.
2단계 — Lambda Orchestrator: Textract
S3의 ObjectCreated 이벤트가 Orchestrator Lambda를 트리거하며, 이 Lambda는 비동기 Textract 작업을 시작하고 문서 상태를 processing으로 업데이트합니다.
Textract는 비동기적으로 실행됩니다. 20페이지 분량의 PDF의 경우 30~60초가 소요될 수 있습니다. Textract 작업이 완료되면 SNS 토픽으로 완료 이벤트를 발행하며, 이는 Processor Lambda를 트리거합니다.
3단계 — Lambda Processor: Claude + Voyage AI + pgvector
이 단계는 가장 복잡한 Lambda입니다. 다음 6단계를 순차적으로 실행합니다:
# lambdas/processor/handler.py
def handler(event, context):
for sns_record in event["Records"]:
...
**Claude 분류 프롬프트 (classification prompt)**는 구조화된 JSON을 반환합니다. 여기에는 문서 유형(송장, 계약서, 보고서 등)과 2~3문장의 요약이 포함됩니다. Pydantic은 저장하기 전에 JSON을 검증합니다.
청킹 전략 (Chunking strategy): 청크당 800자, 청크 간 100자의 중첩(overlap)을 적용합니다. 각 청크는 해당 페이지 번호를 저장하며, 이는 최종 답변에서 페이지 단위 인용 (page-level citations)을 가능하게 합니다.
Voyage AI 임베딩 (embeddings): 각 청크는 voyage-3를 사용하여 단일 배치 API 호출(batched API call)로 임베딩됩니다. 청크당 한 번씩 호출하는 것이 아니라, 문서 내의 모든 청크를 한 번의 호출로 처리합니다.
4단계 — RAG Chat: Embed → Retrieve → Stream
사용자가 질문을 하면, RAG 파이프라인이 FastAPI 내부에서 실행됩니다:
# services/rag_service.py
async def embed_question(question: str) -> list[float]:
client = voyageai.Client(api_key=settings.voyage_api_key)
...
가장 유사한 상위 5개의 청크 (chunks)가 컨텍스트 블록 (context block)으로 조립되어 Claude에게 전달됩니다. Claude는 제공된 발췌문만을 사용하도록 제한됩니다. 만약 답변이 해당 내용에 없다면, Claude는 없다고 답변합니다. 응답은 SSE (Server-Sent Events)를 통해 브라우저로 단어 단위로 스트리밍 (streaming) 됩니다.
WebSocket — 실시간 처리 상태
문서를 처리하는 데는 30~90초가 소요됩니다. 폴링 (polling) 방식 대신, 브라우저는 페이지 로드 시 API Gateway를 통해 WebSocket 연결을 엽니다. Processor Lambda가 작업을 완료하면 WebSocket Lambda를 호출하며, 이 Lambda는 브라우저로 상태 업데이트를 직접 푸시 (push) 합니다.
Browser ──WebSocket──► API Gateway ──► Lambda WS handler
│
DynamoDB ◄───────┘
...
WebSocket 연결 저장소 (DynamoDB)는 connection_id → user_id를 매핑합니다. Processor Lambda가 _notify_websocket(doc_id, status)를 호출하면, WS Lambda는 사용자의 활성 연결을 조회하여 모든 연결에 업데이트를 푸시합니다. 사용자가 여러 브라우저 탭을 열어두었더라도 모든 탭이 동시에 업데이트됩니다.
Cognito Auth — 사용자별 문서 격리
모든 API 요청에는 유효한 Cognito JWT가 필요합니다. FastAPI 의존성 (dependency)인 get_current_user는 Cognito의 JWKS 엔드포인트(endpoint)를 통해 토큰을 검증하고 sub 클레임 (claim)을 사용자 ID로 추출합니다.
# core/security.py
async def verify_cognito_token(token: str) -> dict:
jwks_url = f"https://cognito-idp.{settings.cognito_region}.amazonaws.com/{settings.cognito_user_pool_id}/.well-known/jwks.json"
...
모든 데이터베이스 쿼리는 owner_id = current_user.id로 필터링됩니다. 즉, 사용자는 자신의 문서만 볼 수 있습니다. 이는 라우트 (route) 계층이 아닌 리포지토리 (repository) 계층에서 강제됩니다.
API 엔드포인트 (Endpoints)
| Method | Endpoint | Description |
|---|---|---|
| GET | /health | 상태 확인 (Health check) |
| ... | ||
![]() |
AWS CDK를 활용한 인프라 구성 — 8개의 스택 (Stacks)
각 스택은 단일 관심사 (single concern)를 가지며 독립적으로 배포됩니다.
| Stack | 생성 항목 |
|---|---|
DocuAI-Network | VPC, 2개의 가용 영역 (AZs), NAT 게이트웨이 (NAT Gateway), S3 + SQS VPC 엔드포인트 |
| ... |
# infrastructure/stacks/processing_stack.py (간략화 버전)
orchestrator = lambda_.Function(
self, "Orchestrator",
...
데이터베이스 스택은 removal_policy=RETAIN을 사용합니다. 즉, cdk destroy를 실행하더라도 Aurora는 절대 삭제되지 않습니다. 이는 인프라 해체 과정에서 발생할 수 있는 예기치 않은 데이터 손실을 방지합니다.
Docker Compose를 활용한 로컬 개발
모든 AWS 서비스는 LocalStack을 사용하여 로컬에서 에뮬레이션되므로, AWS 자격 증명이나 비용 없이 개발할 수 있습니다:
# Postgres + Redis + LocalStack 시작
docker-compose up -d
...
USE_TEXTRACT_STUB=true 환경 변수를 사용하면 실제 Textract를 건너뛰고 스텁 (stub) 문서를 사용합니다. 따라서 전체 파이프라인 (pipeline)이 AWS 호출 없이 로컬에서 실행됩니다.
테스트 전략
cd backend
pytest tests/ -v --cov=app # 커버리지 (coverage)를 포함한 모든 테스트
pytest tests/api/test_upload.py # 업로드 엔드포인트 (upload endpoints)
...
모든 AWS 서비스는 moto를 통해 모킹 (mocked)됩니다. Claude와 Voyage AI는 서비스 수준에서 모킹되어 테스트 중에 실제 API 호출이 발생하지 않습니다.
배운 점 (Lessons Learned)
1. 무거운 AI 워크로드에는 비동기 처리 (Async processing) 패턴이 적합합니다
API 요청과 분리된 Lambda에서 Textract + Claude + 임베딩 (embedding)을 실행하면 HTTP 호출이 즉시 반환됩니다. 사용자는 타임아웃이 발생할 수 있는 60초 동안의 HTTP 요청을 기다리는 대신, WebSocket을 통해 진행 상황을 확인할 수 있습니다.
2. API Gateway + Lambda를 통한 WebSocket은 보기보다 간단합니다
연결 저장소(연결 ID를 사용자 ID에 매핑하는 DynamoDB)가 핵심 통찰입니다. 이것이 구축되면, 어떤 Lambda에서든 WebSocket 관리 엔드포인트로 한 줄의 API 호출만 하면 업데이트를 푸시할 수 있습니다.
3. 배치 임베딩 (Batch embeddings) — 청크(chunk)당 한 번이 아닌 문서당 한 번의 API 호출
청크당 한 번씩 호출하는 대신, 단일 배치 호출(모든 청크를 한 번에)로 임베딩을 생성하면 10배 더 빠르고 비용도 훨씬 저렴합니다. Voyage AI의 client.embed([text1, text2, ...])는 배치를 네이티브로 처리합니다.
4. Aurora Serverless v2의 pgvector는 프로덕션 환경에 적합한 RAG 저장소입니다
Aurora Serverless v2는 유휴 상태일 때 0으로 스케일링되며, ivfflat 인덱싱을 통해 pgvector 코사인 유사도 (cosine similarity) 쿼리를 처리합니다. 중간 정도의 트래픽을 가진 문서 Q&A 워크로드의 경우, 전용 벡터 데이터베이스 (vector database)보다 저렴하고 간단합니다.
5. 8개의 CDK 스택을 통해 인프라를 관리 가능한 수준으로 유지했습니다
스택당 하나의 관심사를 유지하면 cdk diff DocuAI-Processing-dev 명령 시 200개의 리소스 전체가 아닌 Lambda 변경 사항만 표시됩니다. 또한 스택 간에는 명확한 의존성 체인(Network → Storage/Auth → Database → Processing/WebSocket → Compute → Monitoring)이 존재합니다.
6. Cognito JWKS 캐싱은 필수적입니다
캐싱이 없으면 모든 API 요청마다 JWKS 엔드포인트를 가져와야 하므로, 200ms 이상의 지연 시간 (latency)이 추가되고 부하 발생 시 Cognito 속도 제한 (rate limits)에 걸리게 됩니다. PyJWKClient(jwks_url, cache_keys=True)를 사용하면 이 두 가지 문제를 모두 해결할 수 있습니다.
7. USE_TEXTRACT_STUB를 통한 로컬 개발 가능성 확보
실제 Textract를 사용하려면 실제 AWS 버킷에 S3 객체가 있어야 하며, 문서당 10~30초가 소요됩니다. 고정된 텍스트 문자열을 반환하는 스텁 (stub)을 사용하면 Claude 분류, 청킹 (chunking), 임베딩 (embedding), pgvector 저장에 이르는 전체 파이프라인을 로컬에서 1초 미만으로 실행할 수 있습니다.
GitHub
주요 코드 샘플 (RAG 서비스, Lambda 프로세서, CDK 스택)은 GitHub에서 확인할 수 있습니다:
👉 github.com/sanjaypatoliya/ai-document-processor-sample
저자 소개
저는 Sanjay Patoliya입니다. AWS에서 프로덕션 수준의 AI 시스템을 구축하는 AWS Certified Solutions Architect이자 DevOps Engineer입니다. 이와 유사한 시스템 구축을 원하시거나 AWS + AI 원격 엔지니어가 필요하시다면 언제든 연락해 주세요.
- 포트폴리오: sanjaypatoliya.com
- LinkedIn: linkedin.com/in/sanjaykumar-patoliya-b234a287
- GitHub: github.com/sanjaypatoliya
AI 자동 생성 콘텐츠
본 콘텐츠는 Dev.to AI tag의 원문을 AI가 자동으로 요약·번역·분석한 것입니다. 원 저작권은 원저작자에게 있으며, 정확한 내용은 반드시 원문을 확인해 주세요.
원문 바로가기


