본문으로 건너뛰기

© 2026 Molayo

Dev.to헤드라인2026. 05. 25. 15:01

LangGraph 워크플로우 템플릿 (v24)

요약

LangGraph의 핵심 아키텍처인 노드, 엣지, 상태, 체크포인팅을 설명하고 이를 활용한 워크플로우 템플릿을 제공합니다. RAG 에이전트와 다중 툴 에이전트 구현을 위한 구체적인 코드 예시를 포함하고 있습니다.

핵심 포인트

  • LangGraph의 상태 기반 워크플로우 구성 요소 이해
  • 검색, 생성, 검증 단계를 포함한 RAG 에이전트 템플릿
  • 계획 및 실행 중심의 다중 툴 에이전트 구조 설계

LangGraph 워크플로우 템플릿 (v24)

1. LangGraph 아키텍처 개요

LangGraph는 상태 기반 워크플로우 시스템으로, 다음과 같은 핵심 구성 요소로 이루어져 있습니다:

Nodes: 워크플로우의 각 단계
Edges: 노드 간의 전이 조건
State: 워크플로우 내에서 공유되는 상태
Checkpointing: 상태 저장 및 복구 기능

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator

class GraphState(TypedDict):
    messages: Annotated[list, operator.add]

workflow = StateGraph(GraphState)

2. 템플릿 1: 간단한 RAG 에이전트 (검색 → 생성 → 검증)

이 템플릿은 문서 검색 및 생성을 위한 기본적인 RAG 워크플로우입니다:

import json
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser

class RAGState(TypedDict):
    query: str
    context: str
    response: str
    validation: bool

def retrieve_node(state: RAGState):
    # 간단한 문서 검색 로직
    # 실제 구현에서는 vector store 검색 사용
    return {"context": "검색된 문서 내용"}

def generate_node(state: RAGState):
    prompt = PromptTemplate.from_template(
        "질문: {query}\n문맥: {context}\n답변:"
    )
    llm = ChatOpenAI(model="gpt-4")
    chain = prompt | llm | StrOutputParser()
    response = chain.invoke({
        "query": state["query"],
        "context": state["context"]
    })
    return {"response": response}

def validate_node(state: RAGState):
    # 답변 검증 로직
    # 예: 관련성 점수 계산
    return {"validation": True}

# 워크플로우 구성
workflow = StateGraph(RAGState)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("generate", generate_node)
workflow.add_node("validate", validate_node)

workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
workflow.set_entry_point("retrieve")
workflow.set_finish_point("validate")

3. 템플릿 2: 다중 툴 에이전트 (계획 → 실행 → 관찰 → 결정)

다중 도구를 활용한 에이전트 워크플로우:

from langchain.tools import Tool
from langchain_core.messages import ToolMessage
from typing import List

class ToolAgentState(TypedDict):
    task: str
    plan: List[str]
    execution: str
    observation: str
    decision: str

def plan_node(state: ToolAgentState):
    # 작업 계획 생성
    plan = ["분석 시작", "도구 선택", "실행", "결과 검토"]
    return {"plan": plan}

def execute_node(state: ToolAgentState):
    # 도구 실행
    # 실제 도구 호출 로직
    tools = [Tool(name="search", func=lambda x: f"검색 결과: {x}")]
    tool = tools[0]
    result = tool.func(state["task"])
    return {"execution": result}

def observe_node(state: ToolAgentState):
    # 실행 결과 관찰
    return {"observation": f"관찰 결과: {state['execution']}"}

def decide_node(state: ToolAgentState):
    # 결정 로직
    if "결과" in state["observation"]:
        return {"decision": "성공"}
    return {"decision": "실패"}

workflow = StateGraph(ToolAgentState)
workflow.add_node("plan", plan_node)
workflow.add_node("execute", execute_node)
workflow.add_node("observe", observe_node)
workflow.add_node("decide", decide_node)

workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_edge("observe", "decide")
workflow.set_entry_point("plan")
workflow.set_finish_point("decide")

4. 템플릿 3: 인간-중개 워크플로우 (일시정지 → 검토 → 계속)

사람의 개입이 필요한 경우 사용:

import asyncio
from datetime import datetime

class HumanInLoopState(TypedDict):
    task: str
    status: str
    human_review: str
    final_result: str

def process_task_node(state: HumanInLoopState):
    # 작업 처리
    return {"status": "processing"}

def pause_for_review_node(state: HumanInLoopState):
    # 인간 검토를 위한 일시정지
    print(f"작업 일시정지: {state['task']}")
    # 실제 구현에서는 API 호출 또는 메시지 대기
    return {"status": "awaiting_review"}

def review_node(state: HumanInLoopState):
    # 인간 검토
    # 이 부분은 외부 인터페이스를 통해 처리
    return {"human_review": "검토 완료"}

def continue_processing_node(state: HumanInLoopState):
    # 검토 후 처리 계속
    return {"status": "completed", "final_result": "최종 결과"}

workflow = StateGraph(HumanInLoopState)
workflow.add_node("process_task", process_task_node)
workflow.add_node("pause_for_review", pause_for_review_node)
workflow.add_node("review", review_node)
workflow.add_node("continue_processing", continue_processing_node)

workflow.add_edge("process_task", "pause_for_review")
workflow.add_edge("pause_for_review", "review")
workflow.add_edge("review", "continue_processing")
workflow.set_entry_point("process_task")
workflow.set_finish_point("continue_processing")

5. 템플릿 5: 병렬 실행 에이전트 (팬아웃 → 처리 → 집계)

병렬 작업 처리:

from concurrent.futures import ThreadPoolExecutor
import threading

class ParallelAgentState(TypedDict):
    tasks: List[str]
    results: List[str]
    aggregated_result: str

def fan_out_node(state: ParallelAgentState):
    # 작업 분할
    tasks = ["task_1", "task_2", "task_3"]
    return {"tasks": tasks, "results": []}

def process_parallel_node(state: ParallelAgentState):
    # 병렬 처리
    def process_task(task):
        # 각 작업 병렬 처리
        return f"{task}_결과"

    with ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(process_task, state["tasks"]))

    return {"results": results}

def aggregate_node(state: ParallelAgentState):
    # 결과 집계
    aggregated = ", ".join(state["results"])
    return {"aggregated_result": aggregated}

workflow = StateGraph(ParallelAgentState)
workflow.add_node("fan_out", fan_out_node)
workflow.add_node("process_parallel", process_parallel_node)
workflow.add_node("aggregate", aggregate_node)

workflow.add_edge("fan_out", "process_parallel")
workflow.add_edge("process_parallel", "aggregate")
workflow.set_entry_point("fan_out")
workflow.set_finish_point("aggregate")

6. 상태 관리 패턴

상태를 효율적으로 관리하는 방법:

from langgraph.checkpoint import Checkpoint
from langgraph.checkpoint.memory import MemorySaver

# 메모리 체크포인트 사용
memory = MemorySaver()

# 사용자별 상태 저장
class UserState(TypedDict):
    user_id: str
    conversation_history: List[dict]
    preferences: dict

def save_checkpoint(state: UserState):
    # 사용자별 체크포인트 저장
    checkpoint = {
        "user_id": state["user_id"],
        "conversation_history": state["conversation_history"],
        "preferences": state["preferences"],
        "timestamp": datetime.now().isoformat()
    }
    return checkpoint

7. 스트리밍 및 실시간 업데이트

실시간 처리 및 스트리밍:

from langchain_core.callbacks import BaseCallbackHandler

class StreamingCallback(BaseCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs):
        print(token, end="", flush=True)

def streaming_node(state: StreamingState):
    llm = ChatOpenAI(streaming=True, callbacks=[StreamingCallback()])
    # 스트리밍 처리 로직
    return {"streaming_output": "스트리밍 결과"}

# 스트리밍 워크플로우
workflow = StateGraph(StreamingState)
workflow.add_node("streaming", streaming_node)
workflow.add_edge("streaming", END)

8. 오류 복구 및 재시도 로직

안정적인 워크플로우를 위한 오류 처리:


python
import time
from functools import wraps

def retry_on_failure(max_retries=3, delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 

---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)

AI 자동 생성 콘텐츠

본 콘텐츠는 Dev.to AI tag의 원문을 AI가 자동으로 요약·번역·분석한 것입니다. 원 저작권은 원저작자에게 있으며, 정확한 내용은 반드시 원문을 확인해 주세요.

원문 바로가기
0

댓글

0