GraphFlow: 멀티 에이전트 오케스트레이션을 위한 경량 Rust 프레임워크
요약
graph-flow는 Rust를 사용하여 고성능, 타입 안전성을 갖춘 멀티 에이전트 워크플로우 오케스트레이션 프레임워크입니다. 이 프레임워크는 복잡하고 상태 유지(Stateful)가 가능한 AI 에이전트 워크플로우를 구축하는 데 초점을 맞추었으며, LangGraph의 패턴을 따르면서도 Rust의 성능과 타입 안전성을 결합했습니다. 이를 통해 프로덕션 환경에 적합한 다양한 기능(세션 관리, 조건부 라우팅, 인간 참여형 루프 등)을 제공합니다.
핵심 포인트
- graph-flow는 Rust 기반으로 구축되어 높은 성능과 타입 안전성을 보장합니다.
- 복잡하고 상태 유지(Stateful)가 가능한 멀티 에이전트 워크플로우 오케스트레이션을 지원하는 핵심 그래프 실행 엔진입니다.
- LangGraph의 설계 패턴을 따르면서도, 플러그형 스토리지 백엔드와 컨텍스트 시스템 등 프로덕션 기능을 강화했습니다.
- 조건부 라우팅(Conditional routing) 및 인간 참여형 루프(Human-in-the-loop)를 포함한 유연하고 완성도 높은 실행 모델을 제공합니다.
graph-flow
고성능 그래프 워크플로우 프레임워크
AI 에이전트 등을 위해 Rust로 복잡하고 상호작용 가능한 워크플로우를 구축하기 위한 타입 안전(Type-safe)하고 유연한 프레임워크
</div>
graph-flow는 Rust를 사용하여 멀티 에이전트 워크플로우 시스템을 구축하기 위한 고성능, 타입 안전(Type-safe) 프레임워크입니다. 이 저장소는 graph-flow가 프로덕션 환경을 위해 어떻게 복잡하고 상태 유지(Stateful)가 가능한 AI 에이전트 오케스트레이션 (Orchestration)을 가능하게 하는지 보여주는 실제 구현 사례와 예시를 소개합니다.
왜 graph-flow인가? 왜 또 다른 프레임워크인가?
LangGraph의 아름다움은 두 가지 강력한 개념의 우아한 결합에 있습니다:
- 그래프 실행 라이브러리 (Graph execution library) - 복잡하고 상태 유지(Stateful)가 가능한 워크플로우를 오케스트레이션하기 위함
- LLM 생태계 통합 (LLM ecosystem integration) (LangChain을 통해) - 원활한 AI 에이전트 기능을 제공하기 위함
이 프레임워크는 동일한 철학을 따르지만, Rust로 처음부터 구축되었습니다:
graph-flow- 상태 유지(Stateful) 작업 오케스트레이션을 위한 핵심 그래프 실행 라이브러리- Rig crate - Rust 네이티브 LLM 통합 및 에이전트 기능
그 결과, LangGraph의 워크플로우 설계 패턴을 Rust의 성능 (Performance) 및 **타입 안전성 (Type safety)**과 결합하고, 추가로 깔끔한 데이터베이스 스키마 (Database schema), 그리고 직관적인 인간 참여형 (Human-in-the-loop) 기능을 갖춘 유연한 실행 모델 (단계별, 배치, 또는 혼합 방식)을 제공하는 프로덕션 준비 완료(Production-ready) 프레임워크를 지향합니다.
이 저장소에 포함된 내용
이 저장소에는 핵심 프레임워크인 graph-flow와 함께, 실제 응용 사례를 보여주는 포괄적인 예시 및 프로덕션 준비 완료된 서비스들이 포함되어 있습니다:
Core Framework (핵심 프레임워크)
graph-flow/- 다음 기능을 제공하는 완전한 프레임워크 라이브러리:- 상태 유지형 태스크 오케스트레이션 (Stateful task orchestration) 기능을 갖춘 그래프 실행 엔진 (Graph execution engine)
- 플러그형 스토리지 백엔드 (Pluggable storage backends)를 지원하는 세션 관리 (Session management)
- 스레드 안전한 상태 공유 (Thread-safe state sharing)를 위한 컨텍스트 시스템 (Context system)
- 조건부 라우팅 (Conditional routing) 및 워크플로우 제어 (Workflow control)
- 내장된 채팅 히스토리 (Chat history) 및 LLM 통합 지원
Examples: real use cases (예시: 실제 사용 사례)
프로덕션 시스템을 구축하는 방법을 보여주는 실제 구현 사례:
-
insurance-claims-service/- 다음을 포함하는 완전한 보험 워크플로우:- 조건부 라우팅 (Conditional routing)을 포함한 다단계 청구 처리
- LLM 기반의 자연어 상호작용 (Natural language interactions)
- 고액 청구에 대한 Human-in-the-loop 승인 프로세스
- 비즈니스 규칙 검증 (Business rule validation) 및 자동 의사결정
-
recommendation-service/- 다음을 특징으로 하는 RAG 기반 추천 시스템:- 의미론적 매칭 (Semantic matching)을 위한 벡터 검색 (Vector search) 통합
- 다단계 추론 (Multi-step reasoning) 및 컨텍스트 축적 (Context accumulation)
- 비정형 입력 (Unstructured input)으로부터의 구조화된 데이터 추출
Learning Examples (학습용 예시)
examples/- 기초부터 고급까지 단계별 예시:simple_example.rs- 기본적인 워크플로우 개념complex_example.rs- 조건부 라우팅 (Conditional routing) 및 분기 (Branching)recommendation_flow.rs- 완전한 RAG 워크플로우 시연
Getting Started (시작하기): 핵심 개념을 이해하려면
examples/simple_example.rs부터 시작하세요. 그 다음 프로덕션 서비스를 탐색하며 실제 패턴과 베스트 프랙티스 (Best practices)를 확인해 보세요.
Quick Start Guide (빠른 시작 가이드)
cargo add graph-flow
examples/simple_example.rs를 사용하여 기초부터 시작해 보겠습니다:
1. Define Tasks (태스크 정의)
태스크 (Tasks)는 워크플로우의 구성 요소입니다. 각 태스크는 Task 트레이트 (Trait)를 구현합니다:
use async_trait::async_trait;
use graph_flow::{Context, Task, TaskResult, NextAction};
...
2. Build the Graph (그래프 구축)
GraphBuilder를 사용하여 워크플로우를 구축합니다:
use graph_flow::{GraphBuilder, InMemorySessionStorage, FlowRunner};
use std::sync::Arc;
...
3. 워크플로우 실행 (Execute the Workflow)
이 프레임워크는 **상태 유지 실행 (stateful execution)**을 제공합니다. 즉, 워크플로우를 일시 중지하고, 재개하며, 여러 상호작용에 걸쳐 관리할 수 있습니다:
// 스토리지 및 러너 생성
let session_storage = Arc::new(InMemorySessionStorage::new());
let flow_runner = FlowRunner::new(graph.clone(), session_storage.clone());
...
또는, 더 세밀한 제어를 위해 더 낮은 수준의 API (lower-level API)를 사용할 수도 있습니다:
// 수동 세션 관리
loop {
let mut session = session_storage.get("session_001").await?.unwrap();
...
실행 제어 (Execution Control)
핵심 개념 (Critical Concept): 태스크에서 적절한 NextAction을 선택함으로써 그래프가 어떻게 실행될지를 결정해야 합니다.
단계별 실행 (Step-by-Step Execution)
워크플로우 진행을 수동으로 제어하려면 NextAction::Continue 또는 NextAction::WaitForInput을 사용하십시오:
// 태스크가 Continue를 반환 - 호출자에게 제어권을 돌려줌
Ok(TaskResult::new(Some("Done".to_string()), NextAction::Continue))
수동 루프 관리가 필요합니다:
let flow_runner = FlowRunner::new(graph, session_storage);
loop {
...
연속 실행 (Continuous Execution)
자동 태스크 실행을 위해서는 NextAction::ContinueAndExecute를 사용하십시오:
// 태스크가 ContinueAndExecute를 반환 - 자동으로 계속 진행
Ok(TaskResult::new(Some("Done".to_string()), NextAction::ContinueAndExecute))
단일 호출로 완료 시까지 실행됩니다:
// End, WaitForInput 또는 에러가 발생할 때까지 자동으로 실행
let result = flow_runner.run(&session_id).await?;
NextAction 옵션
Continue: 다음 태스크로 이동하고 호출자에게 제어권을 반환 (단계별)ContinueAndExecute: 다음 태스크로 이동하고 즉시 실행 (연속적)WaitForInput: 워크플로우를 일시 중지하고 사용자 입력을 대기End: 워크플로우 완료GoTo(task_id): ID를 통해 특정 태스크로 점프GoBack: 이전 태스크로 돌아가기
ExecutionStatus
그래프 실행 엔진은 워크플로 실행 상태에 대한 풍부한 컨텍스트를 제공하는 ExecutionStatus를 반환합니다:
#[derive(Debug, Clone)]
pub enum ExecutionStatus {
/// 일시 중지됨, 지정된 다음 태스크로 자동 계속 진행
...
ExecutionStatus 변체 (Variants)
Paused { next_task_id }: 워크플로가 일시 중지되었으나, 다음 실행 시 지정된 태스크로 자동 계속 진행됩니다. 태스크가NextAction::Continue또는NextAction::GoTo(task_id)를 사용할 때 반환됩니다.WaitingForInput: 워크플로가 계속 진행하기 전 사용자 입력을 기다리는 상태입니다. 태스크가NextAction::WaitForInput을 사용할 때 반환됩니다.Completed: 워크플로가 성공적으로 완료되었습니다. 태스크가NextAction::End를 사용할 때 반환됩니다.Error(String): 제공된 에러 메시지와 함께 워크플로가 실패했습니다.
고급 기능
조건부 엣지 (Conditional Edges)
complex_example.rs 데모는 Context에 저장된 데이터를 기반으로 **런타임(runtime)**에 어떻게 분기할 수 있는지 보여줍니다. 이 예제는 사용자의 입력을 긍정(positive) 또는 _부정(negative)_으로 분류한 다음, 일치하는 분기를 따릅니다.
SentimentAnalysisTask (최소 구현)
struct SentimentAnalysisTask;
#[async_trait]
...
그래프 구축하기
let graph = GraphBuilder::new("sentiment_flow")
.add_task(sentiment_task) // 감정 감지
.add_task(positive_task) // 행복한 기분에 대한 응답
...
graph TD
SA["SentimentAnalysisTask"] --> S{Sentiment?}
S -->|"positive"| P["PositiveResponseTask"]
...
Rig를 이용한 LLM 통합
이 프레임워크는 Rig 크레이트(crate)를 사용하여 LLM 에이전트와 원활하게 통합됩니다:
use rig::{agent::Agent, providers::openrouter};
async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
...
채팅 히스토리 관리
전체 직렬화(serialization)를 지원하는 내장 대화 관리 기능:
// 대화에 메시지 추가
context.add_user_message("What's my claim status?".to_string()).await;
context.add_assistant_message("Your claim is being processed".to_string()).await;
...
병렬 작업 실행 (Parallel Task Execution, FanOut)
이 프레임워크는 FanOutTask를 통해 병렬 작업 실행(parallel task execution)에 대한 내장 지원을 제공합니다. 이 복합 작업(composite task)은 여러 개의 자식 작업(child tasks)을 동시에 실행하고, 완료될 때까지 기다린 후 그 결과들을 집계(aggregate)합니다.
주요 특징
- 동시 실행 (Concurrent Execution): Tokio을 사용하여 자식 작업들을 병렬로 실행합니다.
- 결과 집계 (Result Aggregation): 출력값은 접두사(prefix)가 붙은 키와 함께 컨텍스트(context)에 저장됩니다.
- 에러 처리 (Error Handling): 보수적인 에러 정책을 따릅니다. 자식 작업 중 하나라도 실패하면 FanOut 전체가 실패합니다.
- 간편한 통합 (Simple Integration): 기존의 그래프 구조와 원활하게 작동합니다.
중요한 제한 사항
- 제어 흐름 없음 (No Control Flow): 자식 작업의
NextAction은 무시됩니다. 즉, 자식 작업은WaitForInput을 반환하거나 분기(branch)를 생성할 수 없습니다. - 공유 컨텍스트 (Shared Context): 모든 자식 작업은 동일한 컨텍스트를 공유합니다 (충돌을 피하기 위해 쓰기 작업을 조정해야 합니다).
- 선형적 지속 (Linear Continuation): FanOut은 기본적으로 항상
NextAction::Continue를 반환합니다.
기본 사용법
examples/fanout_basic.rs에서 전체 예제를 확인할 수 있습니다:
use graph_flow::{FanOutTask, GraphBuilder, Task, TaskResult, NextAction, Context};
// 병렬로 실행될 자식 작업들을 정의
...
결과 집계 (Result Aggregation)
FanOut은 자식 작업의 결과들을 컨텍스트에 자동으로 저장합니다:
// 기본 집계 키 (접두사 없음)
// fanout.child_a.response - 자식의 응답 메시지
// fanout.child_a.status - 자식의 상태 메시지
...
결과 소비 (Consuming Results)
후속 작업(Downstream tasks)에서 집계된 결과에 접근할 수 있습니다:
struct ConsumeResults;
#[async_trait]
...
전체 예제 실행:
cargo run --bin fanout_basic
스토리지 추상화 (Storage Abstraction)
프로덕션 배포를 위한 플러그인 방식의 스토리지 백엔드(storage backends):
// 인메모리 스토리지 (개발용)
let session_storage = Arc::new(InMemorySessionStorage::new());
...
실제 활용 사례: 보험 청구 처리 (Insurance Claims Processing)
insurance-claims-service는 보험 청구를 처리하기 위한 완전한 에이전트 워크플로우 (agentic workflow)를 보여줍니다. 이는 복잡하고 다단계인 AI 기반 프로세스를 구축하는 데 있어 이 프레임워크가 가진 강력한 성능을 입증합니다.
보험 청구 워크플로우 (Insurance Claims Workflow)
graph TD
A["초기 청구 쿼리 (Initial Claim Query)<br/>• 사용자 환영<br/>• 기본 정보 수집<br/>• LLM 대화"] --> B["보험 유형 분류기 (Insurance Type Classifier)<br/>• 청구 내용 분석<br/>• 보험 유형 추출<br/>"]
...
시연된 주요 특징:
- LLM 기반 상호작용 (LLM-Driven Interactions): 각 작업은 자연어 처리 (NLP) 및 이해를 위해 AI를 사용합니다.
- 조건부 라우팅 (Conditional Routing): 보험 유형 및 청구 금액에 따른 동적 분기.
- 인간 참여형 (Human-in-the-Loop): 고액 청구에 대한 수동 승인 프로세스.
- 상태 유지 대기 (Stateful Waiting): 사용자 입력에 따라 워크플로우가 일시 중지 및 재개됩니다.
- 비즈니스 로직 (Business Logic): 자동 승인과 수동 승인을 구분하는 $1,000 임계값.
- 포괄적 컨텍스트 (Comprehensive Context): 전체 워크플로우 동안 상태 (State)가 유지됩니다.
작업 세부 분석 (Task Breakdown)
1. 초기 청구 쿼리 (initial_claim_query.rs)
- 사용자를 환영하고 기본적인 청구 정보를 수집합니다.
- 자연스러운 대화를 위해 LLM을 사용합니다.
- 자유 형식의 입력에서 구조화된 데이터 (structured data)를 추출합니다.
2. 보험 유형 분류기 (insurance_type_classifier.rs)
- 청구 내용을 분석하여 보험 유형을 결정합니다.
- 조건부 엣지 (conditional edges)를 사용하여 적절한 상세 정보 수집기로 라우팅합니다.
- 콘텐츠 기반의 지능형 라우팅을 보여줍니다.
3. 상세 정보 수집기 (Detail Collectors)
- 자동차 보험 상세 정보 (Car Insurance Details) (
car_insurance_details.rs) - 아파트 보험 상세 정보 (Apartment Insurance Details) (
apartment_insurance_details.rs) - 각 수집기는 해당 도메인에 특화되어 있습니다.
- AI 대화를 통해 포괄적인 정보를 수집합니다.
4. 스마트 청구 검증기 (Smart Claim Validator) (smart_claim_validator.rs)
- 지능형 처리 (Intelligent Processing): 1,000달러 미만의 청구 건은 자동으로 승인합니다.
- 인간 참여형 (Human-in-the-Loop): 더 큰 금액의 청구 건에 대해서는 수동 승인을 요청합니다.
- 상태 유지 대기 (Stateful Waiting): 인간의 결정을 기다리는 동안 워크플로 (workflow)를 일시 중지할 수 있습니다.
- 상태 메시징 (Status Messaging): 포괄적인 로깅 (logging) 및 상태 추적을 제공합니다.
5. 최종 요약 (Final Summary) (final_summary.rs)
- 포괄적인 청구 요약을 생성합니다.
- 승인 및 거절된 결과 모두를 처리합니다.
- 사용자에게 명확한 다음 단계를 제공합니다.
핵심 아키텍처 패턴 (Key Architectural Patterns)
1. LLM 우선 설계 (LLM-First Design)
모든 대화형 작업은 자연어 처리 (natural language processing)를 위해 LLM 에이전트 (agents)를 사용합니다:
AI 자동 생성 콘텐츠
본 콘텐츠는 HN Claude Code Search의 원문을 AI가 자동으로 요약·번역·분석한 것입니다. 원 저작권은 원저작자에게 있으며, 정확한 내용은 반드시 원문을 확인해 주세요.
원문 바로가기