나의 RAG 파이프라인에 스트리밍 추가하기 — 세 가지 SDK, 세 가지 서로 다른 API
요약
RAG 파이프라인에 SSE(Server-Sent Events)를 활용한 응답 스트리밍을 구현하는 방법을 다룹니다. OpenAI, Groq, Anthropic 등 각 SDK별 스트리밍 API의 차이점과 사용자 경험 개선을 위한 설계 패턴을 설명합니다.
핵심 포인트
- 스트리밍을 통해 첫 토큰 도착 시간을 단축하여 사용자 경험(UX)을 개선함
- RAG 구조상 토큰은 실시간 스트리밍하고, 출처(sources)는 생성이 완료된 후 마지막에 전송함
- OpenAI/Groq는 stream=True 파라미터를 사용하며, Anthropic은 전용 컨텍스트 매니저를 사용함
- LLMClient 추상화 클래스에 generate와 stream 메서드를 분리하여 설계함
v3에서 저는 cross-encoder reranker를 추가했습니다. 이번 기능은 더 단순했지만 모든 계층을 건드렸습니다: Server-Sent Events (SSE)를 통한 응답 스트리밍 (streaming responses).
목표는 다음과 같습니다: 전체 답변을 위해 3~5초를 기다리는 대신, LLM이 토큰을 생성하는 즉시 보여주기 시작하는 것입니다. 출처(sources)는 여전히 마지막에 도착합니다.
RAG에서 스트리밍이 중요한 이유
스트리밍이 없으면 사용자 경험은 다음과 같습니다: 클릭 → 대기 → 텍스트의 벽. 스트리밍이 있으면 첫 번째 토큰이 약 200ms 내에 도착합니다. 사용자는 모델이 여전히 생성 중인 동안 읽기 시작합니다. 답변은 동일하지만, 즉각적인 느낌을 줍니다.
특히 RAG 파이프라인의 경우, 설계상의 질문이 있습니다: 출처를 언제 보낼 것인가? 출처를 인라인(inline)으로 스트리밍할 수는 없습니다. LLM은 생성 과정에서 구조화된 출처 메타데이터를 생성하지 않기 때문입니다. 따라서 다음과 같은 패턴이 됩니다:
SSE event 1: {"token": "The"}
SSE event 2: {"token": " list"}
SSE event 3: {"token": " price"}
...
토큰은 실시간으로 스트리밍됩니다. 출처는 LLM 생성이 완료되면 마지막 이벤트로 전송됩니다. 클라이언트는 sources 이벤트를 받았을 때 스트림이 완료되었음을 알게 됩니다.
추상화 (The abstraction)
v3에서 LLMClient는 하나의 메서드를 가졌습니다:
class LLMClient(ABC):
@abstractmethod
def generate(self, system: str, user: str) -> str: ...
이제 두 개를 가집니다:
class LLMClient(ABC):
@abstractmethod
def generate(self, system: str, user: str) -> str: ...
...
입력은 동일하지만 출력 형태가 다릅니다. generate는 문자열(string)을 반환합니다. stream은 문자열 청크(string chunks)를 생성(yield)합니다. 엔드포인트가 어떤 것을 호출할지 결정합니다 — /query는 generate를 호출하고, /query/stream은 stream을 호출합니다.
여기서 흥미로운 점이 발생했습니다: 각 SDK마다 스트리밍 방식이 다릅니다.
세 가지 SDK, 세 가지 스트리밍 API
Groq 및 OpenAI (유사함)
둘 다 OpenAI와 호환되는 stream=True 파라미터를 사용합니다:
def stream(self, system: str, user: str) -> Iterator[str]:
resp = self.client.chat.completions.create(
model=self.model,
...
generate와의 유일한 차이점은 stream=True를 사용하는 것과 .choices[0].message.content를 읽는 대신 청크 (chunks)를 반복(iterating)하는 것입니다. Groq는 OpenAI와 호환되므로 동일한 API 형태를 사용합니다.
Anthropic (다름)
Anthropic의 SDK는 전용 스트리밍 컨텍스트 매니저 (streaming context manager)를 가지고 있습니다:
def stream(self, system: str, user: str) -> Iterator[str]:
with self.client.messages.stream(
model=self.model,
...
client.messages.create(..., stream=True) 대신 client.messages.stream(...)을 사용하며, 이는 완전히 다른 메서드입니다. 또한 chunk.choices[0].delta.content를 파싱하는 대신, 깨끗한 텍스트를 직접 생성하는 resp.text_stream을 반복합니다. with 블록이 연결 정리 (connection cleanup)를 처리합니다.
솔직히 말해서 더 깔끔한 API입니다. 델타 (deltas)에 대한 null 체크도 필요 없고, 중첩된 객체를 파고들 필요도 없습니다. 하지만 이는 하나의 스트리밍 구현을 작성하여 여러 제공업체(providers)와 공유할 수 없음을 의미합니다.
엔드포인트 (The endpoint)
FastAPI의 StreamingResponse가 SSE 전송 (SSE transport)을 처리합니다:
@app.post("/query/stream")
def query_stream(req: QueryRequest) -> StreamingResponse:
# ... /query와 동일한 검색(retrieval) + 재순위화(reranking) ...
...
검색 파이프라인 (임베딩 (embed) → 하이브리드 검색 (hybrid search) → 재순위화 (rerank))은 스트리밍이 시작되기 전에 실행되며, 이는 모두 동기적 (synchronous) 작업입니다. 오직 LLM 생성 단계만 스트리밍됩니다. 즉, 클라이언트는 짧은 일시 정지(검색 + 재순위화)를 경험한 후 토큰 (tokens)이 흐르기 시작하는 것을 보게 됩니다.
출처 (sources) 목록은 스트림이 시작되기 전에 검색된 청크들로부터 구축되므로, 추가 처리 없이 최종 이벤트로 보낼 준비가 되어 있습니다.
테스트하기
curl -N -X POST http://localhost:8000/query/stream \
-H "Content-Type: application/json" \
-d '{"question": "What is the list price of the Magpie-7?", "top_k": 3}'
출력:
data: {"token": "The"}
data: {"token": " list"}
...
-N 플래그는 curl의 출력 버퍼링 (output buffering)을 비활성화하여 토큰이 도착하는 대로 볼 수 있게 합니다.
현재의 파이프라인
PDF ─► 텍스트 추출 (extract text) ─► 청크 (chunk) ─► 임베딩 (embed) (MiniLM-L6-v2)
│
▼
...
동일한 검색 (retrieval) 파이프라인이며, 두 가지 출력 모드가 있습니다. 클라이언트는 어떤 엔드포인트 (endpoint)를 호출할지 선택합니다.
배운 점
-
스트리밍 (Streaming)은 정확도 (accuracy) 기능이 아니라 UX 기능입니다. 답변은 동일합니다. 스트리밍은 단지 사용자가 답변을 언제 보게 되는지를 바꿀 뿐입니다. 하지만 체감되는 지연 시간 (latency)의 차이는 극적입니다.
-
SDK의 분산은 실재합니다. Groq와 OpenAI는 동일한 스트리밍 인터페이스 (OpenAI-compatible)를 공유합니다. 반면 Anthropic은 근본적으로 다른 패턴을 사용합니다. 만약 멀티 프로바이더 (multi-provider) 추상화 계층을 구축하고 있다면, 스트리밍은 매우 복잡해지는 지점입니다. 여기서
LLMClient추상 클래스의 진가가 발휘됩니다. -
출처 (Sources)와 토큰 (tokens)은 별개의 관심사입니다. RAG 파이프라인에서 출처는 LLM이 생성을 시작하기 전에 이미 알고 있는 정보입니다. 이를 최종 SSE 이벤트로 스트리밍하는 것은 깔끔한 분리입니다. 클라이언트는 토큰을 즉시 렌더링하고, 스트림이 종료될 때 출처 인용 (source citations)을 추가할 수 있습니다.
-
FastAPI는 SSE를 매우 간단하게 만듭니다. 제너레이터 함수 (generator function)와
text/event-stream미디어 타입 (media type)을 사용하는StreamingResponse— 그것이 전부입니다. WebSocket 설정도, 특별한 미들웨어 (middleware)도 필요 없습니다.
다음 단계
- 대화 메모리 (Conversation memory) (멀티 턴 후속 질문)
- Streamlit UI 도입 가능성
직접 시도해 보세요
- v4 (streaming): github.com/santanu2908/chat-with-pdf-rag
- v3 (reranker): github.com/santanu2908/chat-with-pdf-rag/tree/v3
- v2 (hybrid retrieval): github.com/santanu2908/chat-with-pdf-rag/tree/v2
- v1 (pure FAISS): github.com/santanu2908/chat-with-pdf-rag/tree/v1
uv sync
cp .env.example .env # API 키를 설정하세요
uv run uvicorn app.main:app --reload
http://localhost:8000/docs를 열고, 샘플 PDF를 업로드한 뒤 /query/stream을 시도해 보세요. 토큰이 하나씩 도착하는 것을 확인할 수 있습니다.
만약 멀티 프로바이더 (multi-provider) 스트리밍을 구축하고 계신다면, SDK 차이점을 어떻게 처리하셨는지 꼭 듣고 싶습니다.
저는 Santanu Mohanta입니다. LinkedIn에서 저와 연결되거나 GitHub에서 제 프로젝트들을 확인해 보세요.
AI 자동 생성 콘텐츠
본 콘텐츠는 Dev.to AI tag의 원문을 AI가 자동으로 요약·번역·분석한 것입니다. 원 저작권은 원저작자에게 있으며, 정확한 내용은 반드시 원문을 확인해 주세요.
원문 바로가기