본문으로 건너뛰기

© 2026 Molayo

Dev.to · 헤드라인 · 2026. 05. 26. 16:06

RAG 시스템 실전 구축 (v49)

요약

RAG(Retrieval-Augmented Generation) 시스템의 기초 개념부터 실전 구축을 위한 핵심 단계들을 다룹니다. 검색-증강-생성 루프의 구현 방식과 문맥 기반 청킹(Semantic Chunking) 전략을 포함한 파이프라인 구성 방법을 설명합니다.

핵심 포인트

  • RAG의 핵심인 검색-증강-생성 루프 구현 방법
  • 임베딩 모델과 벡터 DB를 활용한 검색 프로세스
  • 문맥을 유지하기 위한 Semantic Chunking 전략
  • 효율적인 RAG 파이프라인 구축을 위한 단계별 가이드

RAG 시스템 실전 구축 (v49)

목차

  1. RAG 기초 개념
  2. 청킹 전략
  3. 임베딩 모델 선택
  4. 벡터 DB 비교
  5. 전체 RAG 파이프라인 구현
  6. 고급 기술
  7. 평가 및 개선
  8. 운영 고려사항

1. RAG 기초 개념

RAG(Retrieval-Augmented Generation)는 검색 증강 생성 기법으로, 질문과 관련된 외부 문서를 먼저 검색한 뒤 그 내용을 컨텍스트로 LLM에 전달하여 답변을 생성하는 방식입니다.

검색-증강-생성 루프

# Minimal retrieve-augment-generate loop
class SimpleRAG:
    """Chain an embedding model, a vector store and an LLM into one RAG loop.

    The collaborators are duck-typed; this class only calls
    ``embedding_model.encode(query)``, ``vector_db.search(embedding, k)``
    and ``llm.generate(prompt)``.
    """

    def __init__(self, embedding_model, vector_db, llm):
        self.llm = llm
        self.vector_db = vector_db
        self.embedding_model = embedding_model

    def retrieve(self, query, k=5):
        """Embed ``query`` and return whatever the vector store yields for the top ``k``."""
        return self.vector_db.search(self.embedding_model.encode(query), k)

    def generate(self, query, context):
        """Build the context-augmented prompt and return the LLM's response."""
        return self.llm.generate(f"Context: {context}\n\nQuestion: {query}")

    def process(self, query):
        """Run the full loop: retrieve with the default ``k``, then generate."""
        return self.generate(query, self.retrieve(query))

2. 청킹 전략

1. 문맥 기반 청킹 (Semantic Chunking)

import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.cluster import KMeans

class SemanticChunker:
    """Split a document into chunks whose boundaries follow topic shifts.

    Sentences are embedded with a SentenceTransformer model and grouped with
    KMeans. They are then packed, in document order, into chunks; a chunk is
    closed when the next sentence falls in a different cluster (once the
    chunk has reached the minimum size) or when adding it would exceed the
    maximum size.
    """

    def __init__(self, model_name="all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)

    def chunk_document(self, text, min_chunk_size=100, max_chunk_size=500):
        """Return the list of chunks for ``text``.

        Args:
            text: The document to split.
            min_chunk_size: Chunks shorter than this many characters are
                dropped from the result.
            max_chunk_size: Soft upper bound, in characters, for a chunk. A
                single sentence longer than this still becomes its own chunk.

        Returns:
            A list of chunk strings; empty when ``text`` has no sentences.
        """
        sentences = self._split_sentences(text)
        if not sentences:
            # Nothing to embed, and KMeans cannot be fitted on zero samples.
            return []
        embeddings = self.model.encode(sentences)

        # Chunk along cluster boundaries
        return self._cluster_chunks(sentences, embeddings,
                                    min_chunk_size, max_chunk_size)

    def _split_sentences(self, text):
        """Split ``text`` after runs of ``.``, ``!`` or ``?``.

        The terminating punctuation stays attached to its sentence so the
        chunks remain readable; fragments made only of punctuation and
        whitespace are discarded.
        """
        import re
        pieces = (piece.strip() for piece in re.findall(r'[^.!?]+[.!?]*', text))
        return [piece for piece in pieces if piece.rstrip('.!?').strip()]

    def _cluster_chunks(self, sentences, embeddings, min_size, max_size):
        """Pack sentences into chunks, breaking on cluster changes and size.

        Returns only the chunks that are at least ``min_size`` characters.
        """
        if not sentences:
            return []

        # Roughly one cluster per five sentences, but always at least one.
        n_clusters = max(1, len(sentences) // 5)
        kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        labels = kmeans.fit_predict(embeddings)

        chunks = []
        current_chunk = []
        current_length = 0
        previous_label = None

        for sentence, label in zip(sentences, labels):
            # Account for the space that ' '.join() inserts between sentences.
            added = len(sentence) + (1 if current_chunk else 0)
            too_long = current_length + added > max_size
            # Only honor a topic shift once the chunk is big enough to keep.
            topic_shift = (
                bool(current_chunk)
                and current_length >= min_size
                and label != previous_label
            )

            if current_chunk and (too_long or topic_shift):
                chunks.append(' '.join(current_chunk))
                current_chunk = []
                current_length = 0
                added = len(sentence)

            current_chunk.append(sentence)
            current_length += added
            previous_label = label

        if current_chunk:
            chunks.append(' '.join(current_chunk))

        # NOTE: chunks below min_size are dropped, as in the original contract.
        return [chunk for chunk in chunks if len(chunk) >= min_size]

2. 재귀적 청킹 (Recursive Chunking)

class RecursiveChunker:
    """Split text into fixed-size character windows that overlap.

    Each chunk is at most ``chunk_size`` characters and starts
    ``chunk_size - overlap`` characters after the previous one, so
    consecutive chunks share ``overlap`` characters.
    """

    def __init__(self, chunk_size=500, overlap=50):
        self.chunk_size = chunk_size
        self.overlap = overlap

    def chunk_text(self, text):
        """Return the overlapping chunks of ``text`` in document order.

        Raises:
            ValueError: If ``chunk_size`` is not positive or ``overlap`` is
                not in ``[0, chunk_size)``; such settings could never make
                forward progress.
        """
        if self.chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if not 0 <= self.overlap < self.chunk_size:
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

        chunks = []
        total = len(text)
        step = self.chunk_size - self.overlap
        start = 0

        while start < total:
            end = min(start + self.chunk_size, total)
            chunks.append(text[start:end])
            if end == total:
                # The last window reached the end of the text. Stepping back
                # by the overlap here would re-emit the tail forever.
                break
            start += step

        return chunks

3. 에이전트 기반 청킹

class AgentChunker:
    """Pack whole sentences into chunks of bounded length.

    ``model`` is stored for callers, but ``smart_chunk`` does not consult it;
    chunk boundaries are decided by sentence length alone.
    """

    def __init__(self, model):
        self.model = model

    def smart_chunk(self, text, context_length=1000):
        """Split ``text`` into chunks of at most ``context_length`` characters.

        Sentences are never split: a single sentence longer than
        ``context_length`` becomes its own, oversized chunk.

        Returns:
            A list of chunk strings; empty when ``text`` has no sentences.
        """
        sentences = self._split_sentences(text)
        chunks = []
        current_chunk = []
        current_length = 0

        for sentence in sentences:
            # Account for the space that ' '.join() inserts between sentences.
            added = len(sentence) + (1 if current_chunk else 0)
            if current_chunk and current_length + added > context_length:
                chunks.append(' '.join(current_chunk))
                current_chunk = []
                current_length = 0
                added = len(sentence)
            current_chunk.append(sentence)
            current_length += added

        if current_chunk:
            chunks.append(' '.join(current_chunk))

        return chunks

    def _split_sentences(self, text):
        """Split ``text`` after runs of ``.``, ``!`` or ``?``.

        The terminating punctuation stays attached to its sentence; fragments
        made only of punctuation and whitespace are discarded.
        """
        import re
        pieces = (piece.strip() for piece in re.findall(r'[^.!?]+[.!?]*', text))
        return [piece for piece in pieces if piece.rstrip('.!?').strip()]

3. 임베딩 모델 선택 및 비교

모델 비교 클래스

import torch
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModel
import numpy as np

class EmbeddingComparison:
    """Load several sentence-embedding models and compare them on sample texts."""

    def __init__(self):
        # Every model is loaded eagerly, keyed by the short name that callers
        # pass to compare_models().
        self.models = {
            'all-MiniLM-L6-v2': SentenceTransformer('all-MiniLM-L6-v2'),
            'all-mpnet-base-v2': SentenceTransformer('all-mpnet-base-v2'),
            'sentence-t5-base': SentenceTransformer('sentence-t5-base'),
            # Load the actual GTE checkpoint; this key used to load
            # sentence-t5-base a second time.
            'gte-small': SentenceTransformer('thenlper/gte-small'),
        }

    def compare_models(self, texts, model_names):
        """Encode ``texts`` with each named model and collect basic metrics.

        Args:
            texts: Sample texts; they are scored as consecutive,
                non-overlapping pairs (0-1, 2-3, ...).
            model_names: Keys of ``self.models`` to evaluate.

        Returns:
            A dict mapping each model name to ``mean_similarity`` (mean dot
            product over the pairs, NaN when there is no complete pair),
            ``embedding_dim`` and ``latency`` in seconds.

        Raises:
            KeyError: If a name is not a key of ``self.models``.
        """
        results = {}
        for name in model_names:
            model = self.models[name]
            embeddings = model.encode(texts)
            # NOTE: a raw dot product equals cosine similarity only when the
            # model returns unit-length embeddings.
            pair_scores = [
                np.dot(embeddings[i], embeddings[i + 1])
                for i in range(0, len(embeddings) - 1, 2)
            ]
            results[name] = {
                # Avoid NumPy's empty-mean warning when there is no pair.
                'mean_similarity': np.mean(pair_scores) if pair_scores else float('nan'),
                'embedding_dim': embeddings.shape[1],
                'latency': self._measure_latency(model, texts),
            }
        return results

    def _measure_latency(self, model, texts):
        """Return the seconds one ``model.encode(texts)`` call takes."""
        import time
        # perf_counter is monotonic and better suited to short intervals
        # than the wall clock.
        start = time.perf_counter()
        model.encode(texts)
        return time.perf_counter() - start

# Usage example: compare two of the loaded models on a pair of sample sentences
texts = ["This is a sample text.", "Another example text."]
comparator = EmbeddingComparison()
results = comparator.compare_models(
    texts,
    ['all-MiniLM-L6-v2', 'all-mpnet-base-v2'],
)
print(results)

성능 기준

# Pick the best-scoring embedding model
class OptimalEmbeddingSelector:
    """Rank embedding models by a fixed weighted score.

    ``models_config`` maps a model name to a dict with ``accuracy``,
    ``latency`` and ``memory`` entries.
    """

    def __init__(self, models_config):
        self.models_config = models_config

    def select_best_model(self, benchmark_data):
        """Return the name of the model with the highest weighted score.

        The score is 50% accuracy, 30% inverse latency and 20% inverse
        memory. ``benchmark_data`` is accepted but not read.
        """
        def weighted_score(cfg):
            # Lower latency and memory are better, hence the reciprocals.
            return (
                cfg['accuracy'] * 0.5 +
                (1/cfg['latency']) * 0.3 +
                (1/cfg['memory']) * 0.2
            )

        ranking = {
            model_name: weighted_score(cfg)
            for model_name, cfg in self.models_config.items()
        }
        return max(ranking, key=ranking.get)

4. 벡터 DB 비교

Chroma vs Qdrant vs pgvector


python
# Chroma 구현
import chromadb
from chromadb.config import Settings

class ChromaVectorDB:
    """Thin wrapper around a single Chroma collection."""

    def __init__(self, collection_name="rag_collection"):
        client = chromadb.Client()
        self.client = client
        self.collection = client.get_or_create_collection(collection_name)

    def add_documents(self, documents, embeddings, ids):
        """Add documents, their embeddings and their ids to the collection."""
        payload = {
            "documents": documents,
            "embeddings": embeddings,
            "ids": ids,
        }
        self.collection.add(**payload)

    def search(self, query_embedding, k=5):
        """Query with one embedding and return the documents for that query."""
        response = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=k,
        )
        # One query was sent, so take the first (only) list of documents.
        documents_per_query = response['documents']
        return documents_per_query[0]

# Qdrant 구현
from qdrant_client import QdrantClient
from qdrant_client.models import VectorParams, Filter, FieldCondition

class QdrantVectorDB:
    def __init__(self, host="localhost", port=6333, collection_name="rag_collection"):
        self.client = QdrantClient(host=host, port=port)
        self.collection_name = collection_name

        # 컬렉션 생성
        if not self.client.collection_exists(collection_name):
            self.client.recreate_collection(
                collection_name=collection_name,
                vectors_config=VectorParams(size=384, distance="Cosine")
            )

    def add_documents(self, documents, embeddings, ids):
        points = [
            {
                "id": id,
                "vector": embedding.tolist(),
                "payload": {"text": doc}
            }
            for id, doc, embedding in zip(ids, documents, embeddings)
        ]
        self.client.upsert(
            collection_name=self.collection_name,
            points=points
        )

    def search(self, query_embedding, k=5):
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_embedding.tolist(),
            limit=k
        )
        return [hit

---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($7)

AI 자동 생성 콘텐츠

본 콘텐츠는 Dev.to AI tag의 원문을 AI가 자동으로 요약·번역·분석한 것입니다. 원 저작권은 원저작자에게 있으며, 정확한 내용은 반드시 원문을 확인해 주세요.

원문 바로가기
0

댓글

0