LangGraph 워크플로우 템플릿 (v45)
요약
LangGraph를 활용하여 복잡한 AI 에이전트 시나리오를 구축하는 방법과 4가지 핵심 워크플로우 템플릿을 소개합니다. 노드, 엣지, 상태 관리 및 체크포인팅을 통한 체계적인 에이전트 설계 가이드를 제공합니다.
핵심 포인트
- LangGraph의 핵심 구성 요소인 노드, 엣지, 상태, 체크포인팅 이해
- 검색, 생성, 검증 단계로 구성된 RAG 에이전트 템플릿 구현
- 상태(State) 관리를 통한 워크플로우 내 데이터 공유 및 흐름 제어
- 오류 방지를 위한 답변 검증(Validation) 패턴 적용
LangGraph 워크플로우 템플릿 (v45)
Introduction
LangGraph는 LangChain과 함께 사용하는 AI 워크플로우 프레임워크로, 복잡한 에이전트 시나리오를 체계적으로 구축할 수 있게 해줍니다. 이 가이드는 실제 개발 문제를 해결하는 4가지 핵심 템플릿과 상태 관리, 오류 처리 패턴을 제공합니다.
1. LangGraph 아키텍처 개요
핵심 구성 요소:
- 노드(Node): 각각의 작업 단위 (예: 검색, 생성, 검증)
- 엣지(Edge): 노드 간의 전이 조건
- 상태(State): 워크플로우 내에서 공유되는 데이터 구조
- 체크포인팅(Checkpointing): 중단 지점을 저장하여 재시작 가능
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
class State(TypedDict):
messages: Annotated[list, operator.add]
graph = StateGraph(State)
2. 템플릿 1: 간단한 RAG 에이전트 (검색 → 생성 → 검증)
실제 문제: 문서 검색 후 생성된 답변을 검증하지 않으면 오류가 발생할 수 있습니다.
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
class RAGState(TypedDict):
query: str
retrieved_docs: list
generated_answer: str
validation: str
def retrieve_docs(state: RAGState):
# 검색 엔진 구현
docs = vector_store.similarity_search(state["query"], k=3)
return {"retrieved_docs": docs}
def generate_answer(state: RAGState):
# 문서 기반 답변 생성
template = PromptTemplate.from_template(
"문서 기반으로 질문에 답변하세요:\n{context}\n질문: {query}"
)
chain = template | ChatOpenAI(model="gpt-4") | StrOutputParser()
answer = chain.invoke({
"context": "\n".join([doc.page_content for doc in state["retrieved_docs"]]),
"query": state["query"]
})
return {"generated_answer": answer}
def validate_answer(state: RAGState):
# 답변 검증
validation_prompt = PromptTemplate.from_template(
"주어진 답변이 질문에 대해 정확한지 검증하세요. 정확하다면 'VALID'를 반환하고, "
"정확하지 않다면 'INVALID'와 함께 수정 제안을 제공하세요.\n질문: {query}\n답변: {answer}"
)
chain = validation_prompt | ChatOpenAI(model="gpt-4") | StrOutputParser()
validation = chain.invoke({
"query": state["query"],
"answer": state["generated_answer"]
})
return {"validation": validation}
workflow = StateGraph(RAGState)
workflow.add_node("retrieve", retrieve_docs)
workflow.add_node("generate", generate_answer)
workflow.add_node("validate", validate_answer)
# 조건 부여
def should_reject(state: RAGState):
return "INVALID" in state["validation"]
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
# 유효성 검사 결과에 따라 분기
workflow.add_conditional_edges(
"validate",
should_reject,
{
True: "retrieve", # 재검색
False: END
}
)
3. 템플릿 2: 멀티-도구 에이전트 (계획 → 실행 → 관찰 → 결정)
실제 문제: 단일 도구로 해결할 수 없는 복잡한 작업은 여러 도구의 조합이 필요
import json
from typing import List, Dict
class ToolAgentState(TypedDict):
plan: List[Dict]
execution_log: List[Dict]
current_step: int
observation: str
decision: str
def plan_task(state: ToolAgentState):
# 작업 계획 생성
plan_prompt = PromptTemplate.from_template(
"다음 작업을 수행하기 위한 단계별 계획을 생성하세요:\n{task}"
)
plan_chain = plan_prompt | ChatOpenAI(model="gpt-4") | StrOutputParser()
plan_response = plan_chain.invoke({"task": state["task"]})
plan = json.loads(plan_response)
return {"plan": plan, "current_step": 0}
def execute_step(state: ToolAgentState):
# 단계별 실행
current_plan = state["plan"][state["current_step"]]
tool_name = current_plan["tool"]
tool_args = current_plan["args"]
# 도구 실행 (예: 외부 API 호출)
result = execute_tool(tool_name, tool_args)
return {
"execution_log": [{"step": state["current_step"], "result": result}],
"observation": str(result)
}
def observe_and_decide(state: ToolAgentState):
# 관찰 및 결정
decision_prompt = PromptTemplate.from_template(
"다음 작업 결과를 분석하세요:\n{observation}\n결정: {decision}"
)
decision_chain = decision_prompt | ChatOpenAI(model="gpt-4") | StrOutputParser()
decision = decision_chain.invoke({
"observation": state["observation"],
"decision": state["decision"]
})
return {"decision": decision}
workflow = StateGraph(ToolAgentState)
workflow.add_node("plan", plan_task)
workflow.add_node("execute", execute_step)
workflow.add_node("observe", observe_and_decide)
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
def continue_workflow(state: ToolAgentState):
if state["current_step"] < len(state["plan"]) - 1:
return "execute"
else:
return END
workflow.add_conditional_edges(
"observe",
continue_workflow,
{
"execute": "execute",
END: END
}
)
4. 템플릿 3: 인간-중개 워크플로우 (일시정지 → 검토 → 계속)
실제 문제: 자동화 시스템에서 인간의 개입이 필요할 때, 작업을 일시정지하고 재개할 수 있는 기능이 필요
from enum import Enum
import time
class HumanWorkflowState(TypedDict):
task_description: str
intermediate_result: str
human_review: str
approval: bool
retry_count: int
class ApprovalStatus(str, Enum):
PENDING = "pending"
APPROVED = "approved"
REJECTED = "rejected"
def execute_task(state: HumanWorkflowState):
# 작업 수행
task_result = perform_complex_calculation(state["task_description"])
return {"intermediate_result": task_result}
def pause_for_human_review(state: HumanWorkflowState):
# 인간 검토를 위한 일시정지
# 실제 구현에서는 외부 API나 메시지 큐를 통해 처리
return {"human_review": "검토 필요"}
def human_review_and_approve(state: HumanWorkflowState):
# 인간 검토 (실제 구현에서는 외부 시스템 호출)
return {"approval": True} # 예시로 승인
def retry_logic(state: HumanWorkflowState):
if state["retry_count"] < 3:
return "execute_task" # 재시도
else:
return "end" # 최대 재시도 횟수 초과
workflow = StateGraph(HumanWorkflowState)
workflow.add_node("execute_task", execute_task)
workflow.add_node("pause_for_review", pause_for_human_review)
workflow.add_node("human_approve", human_review_and_approve)
workflow.add_edge("execute_task", "pause_for_review")
workflow.add_edge("pause_for_review", "human_approve")
def check_approval(state: HumanWorkflowState):
if state["approval"]:
return "end"
else:
return "execute_task"
workflow.add_conditional_edges(
"human_approve",
check_approval,
{
"end": END,
"execute_task": "execute_task"
}
)
5. 템플릿 5: 병렬 실행 에이전트 (분산 → 처리 → 집계)
실제 문제: 여러 소스에서 동시에 정보를 수집하고 병합해야 할 때, 병렬 처리가 필요
python
from concurrent.futures import ThreadPoolExecutor
import asyncio
class ParallelAgentState(TypedDict):
sources: List[str]
results: List[Dict]
aggregated_result: Dict
def fan_out_sources(state: ParallelAgentState):
# 소스 분산
return {"results": []}
def process_source_parallel(state: ParallelAgentState):
# 병렬 처리
def fetch_source(source):
# 각 소스에서 데이터 가져오기
data = fetch_from_source(source)
return {"source": source, "data": data}
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [
---
📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
AI 자동 생성 콘텐츠
본 콘텐츠는 Dev.to AI tag의 원문을 AI가 자동으로 요약·번역·분석한 것입니다. 원 저작권은 원저작자에게 있으며, 정확한 내용은 반드시 원문을 확인해 주세요.
원문 바로가기