LangGraph 워크플로우 템플릿 (v34)
요약
LangGraph를 활용하여 상태 기반의 워크플로우를 구축하는 방법과 Python 개발자를 위한 템플릿을 제공합니다. 검색, 생성, 검증 단계로 구성된 RAG 에이전트 구현 예시를 통해 복잡한 에이전트 아키텍처 설계법을 설명합니다.
핵심 포인트
- LangGraph의 핵심 요소인 노드, 엣지, 상태, 체크포인팅 이해
- 상태 기반 워크플로우를 통한 안정적인 에이전트 설계
- 검색-생성-검증 단계로 이어지는 RAG 에이전트 템플릿 제공
- TypedDict를 활용한 에이전트 상태 관리 방법
LangGraph 워크플로우 템플릿 (v34)
Python 개발자를 위한 LangChain/LangGraph 워크플로우 템플릿
1. LangGraph 아키텍처 개요
LangGraph는 상태 기반의 워크플로우 시스템으로, 다음과 같은 핵심 구성 요소로 작동합니다:
- 노드 (Nodes): 각 단계의 작업을 정의하는 함수
- 엣지 (Edges): 노드 간의 전이 조건
- 상태 (State): 모든 노드에서 공유되는 데이터 구조
- 체크포인팅 (Checkpointing): 상태를 저장하고 복구하는 기능
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator
class AgentState(TypedDict):
messages: Annotated[list, operator.add]
user_input: str
# 기본 워크플로우 구성
workflow = StateGraph(AgentState)
2. 템플릿 1: 간단한 RAG 에이전트 (검색 → 생성 → 검증)
실제 문제: 문서 검색 후 생성된 답변의 정확성 검증 필요
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
# RAG 워크플로우
def retrieve(state):
# 문서 검색 로직
vector_store = VectorStore()
query = state["user_input"]
retrieved_docs = vector_store.search(query, k=3)
return {"retrieved_docs": retrieved_docs}
def generate(state):
# 생성 로직
prompt = PromptTemplate.from_template(
"다음 문서를 기반으로 질문에 답하세요: {context}\n질문: {query}"
)
chain = prompt | ChatOpenAI(model="gpt-4") | StrOutputParser()
context = "\n".join([doc.page_content for doc in state["retrieved_docs"]])
answer = chain.invoke({"context": context, "query": state["user_input"]})
return {"generated_answer": answer}
def validate(state):
# 검증 로직
prompt = PromptTemplate.from_template(
"다음 답변이 질문에 답하고 있는지 검증하세요:\n질문: {query}\n답변: {answer}\n정답 여부: (예/아니오)"
)
chain = prompt | ChatOpenAI(model="gpt-4") | StrOutputParser()
verdict = chain.invoke({
"query": state["user_input"],
"answer": state["generated_answer"]
})
return {"validation": verdict, "valid": "예" in verdict}
# 워크플로우 정의
workflow = StateGraph(AgentState)
workflow.add_node("retrieve", retrieve)
workflow.add_node("generate", generate)
workflow.add_node("validate", validate)
# 엣지 정의
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
# 검증 결과에 따라 분기
def route_validation(state):
if state["valid"]:
return END
else:
return "retrieve" # 재검색
workflow.add_conditional_edges(
"validate",
route_validation,
{
"retrieve": "retrieve",
END: END
}
)
3. 템플릿 2: 다중 도구 에이전트 (계획 → 실행 → 관찰 → 결정)
실제 문제: 여러 도구를 사용해야 하는 복잡한 작업 처리
from langchain.tools import Tool
from langchain_openai import OpenAI
# 도구 정의
tools = [
Tool(
name="search",
func=lambda query: f"검색 결과: {query}",
description="웹 검색을 위한 도구"
),
Tool(
name="calculator",
func=lambda expression: f"계산 결과: {eval(expression)}",
description="수학 계산을 위한 도구"
)
]
class ToolAgentState(TypedDict):
messages: Annotated[list, operator.add]
plan: str
execution_result: str
observation: str
def plan(state):
# 작업 계획 생성
prompt = PromptTemplate.from_template(
"다음 작업을 위한 계획을 세우세요:\n{task}\n도구 사용 계획:"
)
chain = prompt | ChatOpenAI(model="gpt-4") | StrOutputParser()
plan = chain.invoke({"task": state["user_input"]})
return {"plan": plan}
def execute(state):
# 도구 실행
tools_dict = {tool.name: tool for tool in tools}
# 간단한 실행 로직 (실제 구현 시 도구 선택 로직 포함)
tool_result = tools_dict["search"].run(state["user_input"])
return {"execution_result": tool_result}
def observe(state):
# 실행 결과 관찰
return {"observation": f"실행 결과: {state['execution_result']}"}
def decide(state):
# 결정 로직
prompt = PromptTemplate.from_template(
"다음 작업을 고려하세요:\n{task}\n관찰 결과: {observation}\n결정:"
)
chain = prompt | ChatOpenAI(model="gpt-4") | StrOutputParser()
decision = chain.invoke({
"task": state["user_input"],
"observation": state["observation"]
})
return {"decision": decision}
# 워크플로우 구성
workflow = StateGraph(ToolAgentState)
workflow.add_node("plan", plan)
workflow.add_node("execute", execute)
workflow.add_node("observe", observe)
workflow.add_node("decide", decide)
workflow.set_entry_point("plan")
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_edge("observe", "decide")
workflow.add_edge("decide", END)
4. 템플릿 3: 인간-중개 워크플로우 (일시정지 → 검토 → 계속)
실제 문제: 인간의 판단이 필요한 작업에서 자동화와 인간 통제의 균형
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END
from datetime import datetime
class HumanInLoopState(TypedDict):
messages: Annotated[list, operator.add]
task: str
human_review: str
status: str # "pending", "approved", "rejected"
timestamp: str
def task_creation(state):
# 작업 생성
task = f"사용자 요청: {state['user_input']}"
return {"task": task, "status": "pending", "timestamp": datetime.now().isoformat()}
def review_request(state):
# 검토 요청
return {"messages": state["messages"] + [{"role": "assistant", "content":
f"작업을 검토해 주세요:\n{state['task']}\n[승인/거부]"}]}
def human_review(state):
# 인간 검토
review = state["human_review"] # 사용자 입력
status = "approved" if "승인" in review else "rejected"
return {"status": status, "messages": state["messages"] + [{"role": "user", "content": review}]}
def handle_rejection(state):
# 거부 시 재작업
return {"messages": state["messages"] + [{"role": "assistant", "content":
"작업이 거부되었습니다. 새로운 요청을 주세요"}]}
# 워크플로우 정의
workflow = StateGraph(HumanInLoopState)
workflow.add_node("create_task", task_creation)
workflow.add_node("request_review", review_request)
workflow.add_node("human_review", human_review)
workflow.add_node("handle_rejection", handle_rejection)
workflow.set_entry_point("create_task")
workflow.add_edge("create_task", "request_review")
workflow.add_edge("request_review", "human_review")
def route_review(state):
if state["status"] == "approved":
return "END"
else:
return "handle_rejection"
workflow.add_conditional_edges(
"human_review",
route_review,
{
"handle_rejection": "handle_rejection",
"END": END
}
)
5. 템플릿 5: 병렬 실행 에이전트 (팬아웃 → 처리 → 집계)
실제 문제: 여러 데이터 소스를 동시에 처리하고 결과를 통합하는 작업
python
import asyncio
from concurrent.futures import ThreadPoolExecutor
class ParallelProcessingState(TypedDict):
messages: Annotated[list, operator.add]
data_sources: list
processed_results: list
aggregated_result: str
def fan_out(state):
# 데이터 소스 분산
sources = state["data_sources"]
return {"processed_results": []}
async def process_async(data_source):
# 비동기 처리 로직
await asyncio.sleep(1) # 실제 API 호출
return f"처리된 {data_source}"
def process_parallel(state):
# 병렬 처리
with ThreadPoolExecutor(max
---
📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)
AI 자동 생성 콘텐츠
본 콘텐츠는 Dev.to AI tag의 원문을 AI가 자동으로 요약·번역·분석한 것입니다. 원 저작권은 원저작자에게 있으며, 정확한 내용은 반드시 원문을 확인해 주세요.
원문 바로가기