본문으로 건너뛰기

© 2026 Molayo

Dev.to헤드라인2026. 06. 23. 22:49

나의 RAG 파이프라인에 스트리밍 추가하기 — 세 가지 SDK, 세 가지 서로 다른 API

요약

RAG 파이프라인에 SSE(Server-Sent Events)를 활용한 응답 스트리밍을 구현하는 방법을 다룹니다. OpenAI, Groq, Anthropic 등 각 SDK별 스트리밍 API의 차이점과 사용자 경험 개선을 위한 설계 패턴을 설명합니다.

핵심 포인트

  • 스트리밍을 통해 첫 토큰 도착 시간을 단축하여 사용자 경험(UX)을 개선함
  • RAG 구조상 토큰은 실시간 스트리밍하고, 출처(sources)는 생성이 완료된 후 마지막에 전송함
  • OpenAI/Groq는 stream=True 파라미터를 사용하며, Anthropic은 전용 컨텍스트 매니저를 사용함
  • LLMClient 추상화 클래스에 generate와 stream 메서드를 분리하여 설계함

v3에서 저는 cross-encoder reranker를 추가했습니다. 이번 기능은 더 단순했지만 모든 계층을 건드렸습니다: Server-Sent Events (SSE)를 통한 응답 스트리밍 (streaming responses).

목표는 다음과 같습니다: 전체 답변을 위해 3~5초를 기다리는 대신, LLM이 토큰을 생성하는 즉시 보여주기 시작하는 것입니다. 출처(sources)는 여전히 마지막에 도착합니다.

RAG에서 스트리밍이 중요한 이유

스트리밍이 없으면 사용자 경험은 다음과 같습니다: 클릭 → 대기 → 텍스트의 벽. 스트리밍이 있으면 첫 번째 토큰이 약 200ms 내에 도착합니다. 사용자는 모델이 여전히 생성 중인 동안 읽기 시작합니다. 답변은 동일하지만, 즉각적인 느낌을 줍니다.

특히 RAG 파이프라인의 경우, 설계상의 질문이 있습니다: 출처를 언제 보낼 것인가? 출처를 인라인(inline)으로 스트리밍할 수는 없습니다. LLM은 생성 과정에서 구조화된 출처 메타데이터를 생성하지 않기 때문입니다. 따라서 다음과 같은 패턴이 됩니다:

SSE event 1:  {"token": "The"}
SSE event 2:  {"token": " list"}
SSE event 3:  {"token": " price"}
...

토큰은 실시간으로 스트리밍됩니다. 출처는 LLM 생성이 완료되면 마지막 이벤트로 전송됩니다. 클라이언트는 sources 이벤트를 받았을 때 스트림이 완료되었음을 알게 됩니다.

추상화 (The abstraction)

v3에서 LLMClient는 하나의 메서드를 가졌습니다:

class LLMClient(ABC):
    @abstractmethod
    def generate(self, system: str, user: str) -> str: ...

이제 두 개를 가집니다:

class LLMClient(ABC):
    @abstractmethod
    def generate(self, system: str, user: str) -> str: ...
...

입력은 동일하지만 출력 형태가 다릅니다. generate는 문자열(string)을 반환합니다. stream은 문자열 청크(string chunks)를 생성(yield)합니다. 엔드포인트가 어떤 것을 호출할지 결정합니다 — /querygenerate를 호출하고, /query/streamstream을 호출합니다.

여기서 흥미로운 점이 발생했습니다: 각 SDK마다 스트리밍 방식이 다릅니다.

세 가지 SDK, 세 가지 스트리밍 API

Groq 및 OpenAI (유사함)

둘 다 OpenAI와 호환되는 stream=True 파라미터를 사용합니다:

def stream(self, system: str, user: str) -> Iterator[str]:
    resp = self.client.chat.completions.create(
        model=self.model,
...

generate와의 유일한 차이점은 stream=True를 사용하는 것과 .choices[0].message.content를 읽는 대신 청크 (chunks)를 반복(iterating)하는 것입니다. Groq는 OpenAI와 호환되므로 동일한 API 형태를 사용합니다.

Anthropic (다름)

Anthropic의 SDK는 전용 스트리밍 컨텍스트 매니저 (streaming context manager)를 가지고 있습니다:

def stream(self, system: str, user: str) -> Iterator[str]:
    with self.client.messages.stream(
        model=self.model,
...

client.messages.create(..., stream=True) 대신 client.messages.stream(...)을 사용하며, 이는 완전히 다른 메서드입니다. 또한 chunk.choices[0].delta.content를 파싱하는 대신, 깨끗한 텍스트를 직접 생성하는 resp.text_stream을 반복합니다. with 블록이 연결 정리 (connection cleanup)를 처리합니다.

솔직히 말해서 더 깔끔한 API입니다. 델타 (deltas)에 대한 null 체크도 필요 없고, 중첩된 객체를 파고들 필요도 없습니다. 하지만 이는 하나의 스트리밍 구현을 작성하여 여러 제공업체(providers)와 공유할 수 없음을 의미합니다.

엔드포인트 (The endpoint)

FastAPI의 StreamingResponse가 SSE 전송 (SSE transport)을 처리합니다:

@app.post("/query/stream")
def query_stream(req: QueryRequest) -> StreamingResponse:
    # ... /query와 동일한 검색(retrieval) + 재순위화(reranking) ...
...

검색 파이프라인 (임베딩 (embed) → 하이브리드 검색 (hybrid search) → 재순위화 (rerank))은 스트리밍이 시작되기 전에 실행되며, 이는 모두 동기적 (synchronous) 작업입니다. 오직 LLM 생성 단계만 스트리밍됩니다. 즉, 클라이언트는 짧은 일시 정지(검색 + 재순위화)를 경험한 후 토큰 (tokens)이 흐르기 시작하는 것을 보게 됩니다.

출처 (sources) 목록은 스트림이 시작되기 에 검색된 청크들로부터 구축되므로, 추가 처리 없이 최종 이벤트로 보낼 준비가 되어 있습니다.

테스트하기

curl -N -X POST http://localhost:8000/query/stream \
  -H "Content-Type: application/json" \
  -d '{"question": "What is the list price of the Magpie-7?", "top_k": 3}'

출력:

data: {"token": "The"}

data: {"token": " list"}
...

-N 플래그는 curl의 출력 버퍼링 (output buffering)을 비활성화하여 토큰이 도착하는 대로 볼 수 있게 합니다.

현재의 파이프라인

PDF ─► 텍스트 추출 (extract text) ─► 청크 (chunk) ─► 임베딩 (embed) (MiniLM-L6-v2)
                                        │
                                        ▼
...

동일한 검색 (retrieval) 파이프라인이며, 두 가지 출력 모드가 있습니다. 클라이언트는 어떤 엔드포인트 (endpoint)를 호출할지 선택합니다.

배운 점

  1. 스트리밍 (Streaming)은 정확도 (accuracy) 기능이 아니라 UX 기능입니다. 답변은 동일합니다. 스트리밍은 단지 사용자가 답변을 언제 보게 되는지를 바꿀 뿐입니다. 하지만 체감되는 지연 시간 (latency)의 차이는 극적입니다.

  2. SDK의 분산은 실재합니다. Groq와 OpenAI는 동일한 스트리밍 인터페이스 (OpenAI-compatible)를 공유합니다. 반면 Anthropic은 근본적으로 다른 패턴을 사용합니다. 만약 멀티 프로바이더 (multi-provider) 추상화 계층을 구축하고 있다면, 스트리밍은 매우 복잡해지는 지점입니다. 여기서 LLMClient 추상 클래스의 진가가 발휘됩니다.

  3. 출처 (Sources)와 토큰 (tokens)은 별개의 관심사입니다. RAG 파이프라인에서 출처는 LLM이 생성을 시작하기 전에 이미 알고 있는 정보입니다. 이를 최종 SSE 이벤트로 스트리밍하는 것은 깔끔한 분리입니다. 클라이언트는 토큰을 즉시 렌더링하고, 스트림이 종료될 때 출처 인용 (source citations)을 추가할 수 있습니다.

  4. FastAPI는 SSE를 매우 간단하게 만듭니다. 제너레이터 함수 (generator function)와 text/event-stream 미디어 타입 (media type)을 사용하는 StreamingResponse — 그것이 전부입니다. WebSocket 설정도, 특별한 미들웨어 (middleware)도 필요 없습니다.

다음 단계

  • 대화 메모리 (Conversation memory) (멀티 턴 후속 질문)
  • Streamlit UI 도입 가능성

직접 시도해 보세요

uv sync
cp .env.example .env   # API 키를 설정하세요
uv run uvicorn app.main:app --reload

http://localhost:8000/docs를 열고, 샘플 PDF를 업로드한 뒤 /query/stream을 시도해 보세요. 토큰이 하나씩 도착하는 것을 확인할 수 있습니다.

만약 멀티 프로바이더 (multi-provider) 스트리밍을 구축하고 계신다면, SDK 차이점을 어떻게 처리하셨는지 꼭 듣고 싶습니다.

저는 Santanu Mohanta입니다. LinkedIn에서 저와 연결되거나 GitHub에서 제 프로젝트들을 확인해 보세요.

AI 자동 생성 콘텐츠

본 콘텐츠는 Dev.to AI tag의 원문을 AI가 자동으로 요약·번역·분석한 것입니다. 원 저작권은 원저작자에게 있으며, 정확한 내용은 반드시 원문을 확인해 주세요.

원문 바로가기
0

댓글

0