
Flink SQL로 구현하는 AI 에이전트 파이프라인
요약
Confluent Cloud의 Flink SQL을 활용하여 복잡한 AI 에이전트 파이프라인을 통합 구축하는 방법을 소개합니다. 기존의 다중 컴포넌트 아키텍처 대신 SQL 함수만으로 이상 탐지, 벡터 검색, 에이전트 실행을 구현할 수 있습니다.
핵심 포인트
- Flink SQL 내에서 ML 및 AI 함수를 통한 파이프라인 통합 가능
- 기존의 복잡한 다중 언어/컴포넌트 관리 문제 해결
- ML_DETECT_ANOMALIES, AI_RUN_AGENT 등 전용 함수 제공
- 자동 스케일링 및 매니지드 서비스를 통한 운영 효율성 증대
서론
실시간 데이터 처리 파이프라인을 구축할 때, 기존에는 여러 컴포넌트를 조합해야 했습니다. 예를 들어, Kafka로 데이터를 가져오고, Spark로 집계하며, Python으로 머신러닝 (ML) 모델을 실행하는 방식입니다.
하지만 Confluent Cloud가 제공하는 Confluent Intelligence 기능군을 사용하면, 이러한 복잡한 아키텍처를 Flink SQL 내에서 구현할 수 있습니다.
본 기사에서는 Confluent Cloud가 제공하는 공식 데모[1]를 통해 다음 두 가지 사항을 소개합니다.
Confluent Intelligence의 편리한 함수군에 대하여 (ML_DETECT_ANOMALIES, VECTOR_SEARCH_AGG, ML_PREDICT, AI_RUN_AGENT 등)
이 함수들을 조합함으로써, Flink SQL 내에서 AI 에이전트를 포함하는 파이프라인을 생성할 수 있다는 점
기존 아키텍처의 과제
먼저, 기존의 접근 방식을 되돌아봅시다.
실시간으로 수요의 급증을 감지하고, 원인을 분석하여, 자동으로 대응하는 시스템을 구축할 경우, 예를 들어 다음과 같은 구성이 필요했습니다.
Kafka → Spark → Python ML → MongoDB API → LLM API → 배선(配船) API → DB
각 컴포넌트의 역할:
Kafka Consumer: 데이터 취득
Spark Streaming: 윈도우 집계 (Window Aggregation)
Python ML Service: 이상 탐지 (Anomaly Detection) 모델
Python API Client: 벡터 검색 (Vector Search)
Python LLM Service: 원인 분석
Python Orchestrator: 배선 API 호출
Database: 결과 저장
이러한 기존 구성에는 다음과 같은 어려움이 있었습니다.
- 7개의 컴포넌트를 개별적으로 관리
- 여러 언어 (Java/Scala/Python/SQL) 사용
- 복잡한 운영 및 스케일링 (Scaling)
Confluent Intelligence를 통한 통합
Confluent Cloud 상에서 Flink SQL을 작성하는 것만으로 이러한 어려움을 해결할 수 있습니다.
Confluent Cloud의 주요 이점:
통합된 파이프라인: 여러 컴포넌트를 Flink SQL 내에서 완결
자동 스케일링 (Auto-scaling): 트래픽 양에 따라 리소스를 자동으로 조정 가능
매니지드 서비스 (Managed Service): 인프라 관리가 불필요하며 개발에 집중 가능
그럼 실제로 어떻게 동작하는지 살펴보겠습니다.
데모 시스템 개요
유스케이스: River Robotaxi의 선박 관리
이 데모에서는 선박을 수배하는 가상의 서비스를 소재로 하여, 실시간 선박 관리 시스템을 구축합니다.
3가지 처리 흐름
시스템은 다음 3가지 단계로 동작합니다:
1. 수요의 이상 급증 감지
ML_DETECT_ANOMALIES 함수를 사용하여 머신러닝 (ML)을 수행하고, 승선 요청의 급증을 실시간으로 감지합니다.
2. 원인 분석
VECTOR_SEARCH_AGG와 ML_PREDICT를 조합하여, 수요의 이상 급증 원인(이벤트, 콘서트, 페스티벌 등)을 외부 데이터베이스에서 검색하고, LLM을 사용하여 컨텍스트를 부여합니다.
3. 자동 배선
AI_RUN_AGENT를 사용하여, AI 에이전트가 자율적으로 선박을 선택하고 수요가 높은 곳으로 자동으로 배선을 지시합니다.
아키텍처
벡터 데이터베이스 (MongoDB Atlas)와 MCP 서버 (Zapier)는 외부 플랫폼을 사용하고 있지만, 그 외의 모든 처리는 Confluent Cloud for Apache Flink 위에서 실행되며, 외부 오케스트레이션 (Orchestration)은 필요하지 않습니다.
3가지 처리 단계의 상세 내용
이 데모에서는 수요의 이상 급증 감지부터 자동 배선까지, 3가지 단계로 처리를 실행합니다. 여기서는 각 단계에서 사용하는 편리한 Confluent Intelligence 함수를 중심으로 소개하겠습니다.
※ 실제로 데모에서 사용되는 쿼리를 간략화하여 설명하고 있습니다. 실제 쿼리를 확인하고 싶으신 분은 공식 데모 리포지토리를 확인해 주시기 바랍니다.
단계 1: 수요의 이상 급증 감지
처리 흐름
실시간으로 유입되는 승차 요청을 5분 단위 윈도우(Window)로 집계하여 각 지역의 수요 패턴을 모니터링합니다. 과거 트렌드와 비교하여 수요의 이상 급증을 자동으로 감지합니다.
ML_DETECT_ANOMALIES
사용하는 함수: ML_DETECT_ANOMALIES
은(는) 머신러닝 (ML) 기반의 이상 탐지 함수입니다. 기존에는 Python 등을 사용하여 사용자가 사전에 모델을 훈련 및 배포해야 했으나, 이 함수를 사용하면 분석하고자 하는 변수를 지정하는 것만으로 내부에서 자동으로 모델을 훈련하여 간편하게 이상치를 탐지할 수 있습니다[2].
ML_DETECT_ANOMALIES를 사용한 쿼리 예시
SELECT
pickup_zone,
window_time,
...
ML_DETECT_ANOMALIES 함수의 인자
종속 변수 (Dependent Variable): request_count
-
이상 탐지의 대상이 되는 승차 요청 수 -
타임스탬프 (Timestamp):window_time -
시계열 데이터의 기준이 되는 타임스탬프 -
옵션 설정: 신뢰 구간 99.9% 지정 (JSON 객체 형식) - 여기서는 신뢰 구간을 99.9%로 설정했습니다. -
공식 데모에서는 훈련 크기(Training Size) 지정 등을 수행합니다.
OVER 절 설정
PARTITION BY: 승차 요청이 발생한 지역별로 모델 구축 -
ORDER BY: 데이터를 시계열 순으로 정렬 -
RANGE BETWEEN: 과거의 모든 데이터를 사용하여 누적적으로 학습
실행 결과
French Quarter 지역에서 수요의 이상 급증이 감지되었습니다.

ML_DETECT_ANOMALIES 결과 상세
| 필드 | 값 | 해석 |
|---|---|---|
| timestamp | 2026-06-14 22:49:59 | 분석 시각 |
| actual_value | 61.0 | 실제로는 61건 (예측치의 5배) |
| forecast_value | 12.0 | 통상적인 예측치는 12건 |
| lower_bound | 5.0 | 정상 범위의 하한선 |
| upper_bound | 18.0 | 정상 범위의 상한선 |
| is_anomaly | TRUE | 이상 탐지! (수요 급증) |
| rmse | 1.10 | 예측 오차는 작음 |
| aic | 17.10 | 모델 품질은 양호 |
단계 2: 수요의 이상 급증 원인 분석
처리 흐름
이 단계에서는 벡터 검색 (Vector Search)과 LLM 추론을 결합하여, 이벤트 정보 데이터베이스(각 지역에서 개최되는 라이브 공연이나 콘서트 등의 개최 정보가 등록된 데이터베이스)로부터 관련 정보를 가져와 수요의 이상 급증이 발생한 컨텍스트(원인, 배경 정보)를 확보합니다.
ML_PREDICT
와(과) VECTOR_SEARCH_AGG
사용하는 함수: ML_PREDICT
은(는) LLM 모델을 호출하여 텍스트 생성 및 임베딩 (Embedding) 생성을 수행하는 함수입니다. 모델명과 프롬프트 (Prompt)를 지정하는 것만으로 간편하게 LLM 추론을 실행할 수 있습니다[3].
VECTOR_SEARCH_AGG
은(는) 벡터 데이터베이스에 대해 유사도 검색을 실행하는 함수입니다. 임베딩 벡터를 사용하여 가장 관련성이 높은 문서를 가져올 수 있습니다[4].
ML_PREDICT와 VECTOR_SEARCH_AGG를 사용한 쿼리 예시
※ 아래는 공식 데모 쿼리의 개념을 보여주기 위해 간략화한 것입니다. 실제 실행 결과는 공식 데모의 완전한 버전 쿼리에 의한 것입니다.
-- 단계 2-1: 수요의 이상 급증을 감지한 테이블로부터 벡터 검색을 통해 컨텍스트를 가져옴
SELECT
pickup_zone,
...
공식 데모 쿼리
CREATE TABLE anomalies_enriched
WITH ('changelog.mode' = 'append')
AS SELECT
...
ML_PREDICT 함수의 인자 (임베딩 생성)
모델명 (Model Name): llm_embedding_model
-
임베딩 생성용 LLM 모델 -
입력 텍스트 (Input Text):query_text -
벡터화할 검색 쿼리
VECTOR_SEARCH_AGG 함수의 인자
테이블명 (Table Name): documents_vectordb_lab3
-
MongoDB의 이벤트 정보 테이블 -
임베딩 컬럼 (Embedding Column):DESCRIPTOR(embedding) -
벡터가 저장되어 있는 컬럼 -
검색 벡터 (Search Vector):emb.embedding -
검색 쿼리의 임베딩 벡터 -
취득 건수 (Limit):3 -
상위 3개의 관련 이벤트 취득
ML_PREDICT 함수의 인자 (텍스트 생성)
모델명 (Model Name): llm_textgen_model
- 텍스트 생성용 LLM 모델 -
프롬프트 (Prompt): 검색 쿼리와 취득한 이벤트 정보를 조합한 프롬프트
실행 결과
단계 2-1의 결과: 벡터 검색으로 취득된 이벤트 정보
결과의 top_chunk_1
~ top_chunk_3
컬럼을 보면, 벡터 검색을 통해 French Quarter 지역의 수요 이상 급증과 관련된 3개의 이벤트 정보가 취득되었습니다.
top_chunk_1: French Quarter Nightlife Peak Hours (밤의 엔터테인먼트 지구 피크 시간) -
top_chunk_2: French Quarter Bar Closing Rush (바 폐점 시의 혼잡) -
top_chunk_3: French Quarter Happy Hour Rush (해피아워 혼잡)
단계 2-2의 결과: LLM에 의한 원인 분석
anomaly_reason
컬럼을 보면, 취득한 3개의 이벤트 정보를 바탕으로 LLM이 수요의 이상 급증 이유를 추론하고 있음을 확인할 수 있습니다.

※ 이미지 결과는 위의 쿼리를 직접 실행한 결과가 아니라, 공식 데모의 쿼리를 실행한 결과를 게재하였습니다.
취득된 이벤트 정보의 상세 (단계 2-1)
top_chunk_1: French Quarter Nightlife Peak Hours
# French Quarter Nightlife Peak Hours
**Frequency:** Nightly entertainment district
**Time:** 9:00 PM - 12:00 AM
...
top_chunk_2: French Quarter Bar Closing Rush
# French Quarter Bar Closing Rush
**Frequency:** Nightly closing pattern
**Time:** 11:00 PM - 2:00 AM
...
top_chunk_3: French Quarter Happy Hour Rush
# French Quarter Happy Hour Rush
**Frequency:** Daily weekday and weekend
**Time:** 3:00 PM - 6:00 PM
...
나아가, 이러한 정보를 바탕으로 LLM이 수요의 이상 급증 원인을 자연어로 설명합니다.
LLM으로 원인을 요약한 결과 (단계 2-2)
The surge is caused by **French Quarter Happy Hour Rush** (3:00 PM - 6:00 PM), which draws 1,200-1,800 patrons to bars and restaurants for drink specials and after-work socializing. Your 3:34 PM surge (+400%) aligns precisely with the documented peak demand period as CBD professionals and tourists converge on French Quarter establishments.
단계 3: 자동 배선 실행
처리 흐름
수요의 이상 급증 원인을 파악하면, AI 에이전트가 자동으로 대응합니다.
구체적으로는, 이용 가능한 선박을 확인하고, 최적의 선박을 선택하여, 고수요 지역으로 배선 요청을 전송합니다.
CREATE TOOL
, CREATE AGENT
, AI_RUN_AGENT
사용하는 함수: CREATE TOOL
는 에이전트가 사용하는 외부 도구(API, MCP Server 등)를 정의하는 함수입니다[5].
CREATE AGENT는 AI 에이전트를 정의하는 함수입니다. 사용할 LLM 모델, 동작을 지시하는 프롬프트(Prompt), 이용 가능한 도구(Tool)를 지정합니다[6].
AI_RUN_AGENT는 정의한 에이전트를 실행하는 함수입니다. 에이전트는 자율적으로 도구를 호출하여 태스크(Task)를 완료합니다[7].
CREATE TOOL, CREATE AGENT, AI_RUN_AGENT를 사용한 쿼리 예시
-- 단계 3-1: 도구 정의
CREATE TOOL lab3_remote_mcp
USING CONNECTION `zapier-mcp-connection` -- 1. 연결 정보
...
CREATE TOOL 함수의 인자
연결 정보:zapier-mcp-connection
-
외부 서비스로의 연결 설정 (사전 설정됨) -
도구 타입:mcp -
Model Context Protocol을 통해 도구를 호출 -
허용할 도구: GET/POST 요청을 실행할 수 있는 Webhook 도구를 지정
CREATE AGENT 함수의 인자
LLM 모델:zapier_mcp_model
-
에이전트의 의사결정에 사용할 LLM -
프롬프트: 에이전트의 역할과 실행해야 할 워크플로(Workflow)를 명확히 지시 -
사용 도구:lab3_remote_mcp -
방금 정의한 도구를 지정 -
최대 반복 횟수:10 -
에이전트가 도구를 호출할 수 있는 최대 횟수
AI_RUN_AGENT 함수의 인자
에이전트 이름:boat_dispatch_agent
-
실행할 에이전트 -
프롬프트:anomaly_reason -
수요의 이상 급증 원인 분석 결과 (에이전트에 대한 태스크 지시) -
요청 ID:pickup_zone -
각 요청을 식별하기 위한 ID (배선이 필요한 지역명 사용)
AI_RUN_AGENT의 실행 결과
출력을 보면, 에이전트가 자율적으로 다음을 실행했음을 알 수 있습니다.

1. 수요의 이상 급증 원인 분석 (anomaly_reason)
에이전트는 벡터 검색(Vector Search)으로 취득한 컨텍스트로부터, 'French Quarter Nightlife'라는 이벤트로 인해 시내 호텔에서 엔터테인먼트 지구로 이동하는 것이 수요의 이상 급증의 주요 원인이라고 분석했습니다.
실제 출력 결과 상세
**French Quarter Nightlife Peak Hours** (9:00 PM - 12:00 AM) is the primary cause,
with 3,000-4,000 revelers filling bars, clubs, and live music venues at maximum capacity
during your surge window. The 400% demand spike at 10:49 PM aligns perfectly with sustained
...
2. 배선 실행 프로세스 (dispatch_json)
다음으로, 에이전트는 아래의 워크플로를 자동 실행했습니다.
단계 1: 선박 카탈로그 API를 호출하여 (GET), 이용 가능한 선박을 취득
단계 2: LLM이 최적의 선박 8척을 선택
- Warehouse District, Garden District, CBD에서 선택
- French Quarter에 가까운 지역을 우선
단계 3: 배선 API에 요청을 전송 (POST)
실제 출력 결과 상세 (dispatch_json)
{
"action": "dispatch_boats",
"zone": "French Quarter",
...
실제 출력 결과 상세 (dispatch_summary)
프렌치 쿼터(French Quarter)의 유흥 시간대 피크로 인해 3,000~4,000명의 인파가 엔터테인먼트 시설을 채우면서 수요가 급증함에 따라, 웨어하우스 디스트릭트(Warehouse District), 가든 디스트릭트(Garden District), 그리고 CBD 구역에서 8대의 보트를 추가로 파견했습니다.
3. API로부터의 성공 응답 (api_response)
api_response를 보면 요청이 정상적으로 처리되었음을 확인할 수 있습니다.
실제 출력 결과 상세
{
"statusCode": 200,
"body": {
...
4. 에이전트의 동작 로그 (raw_response)
에이전트가 어떻게 사고하고 도구(tool)를 호출했는지에 대한 상세 로그는 raw_response에 기록되어 있습니다.
실제 출력 결과 상세
I'll analyze this surge and dispatch appropriate boats to handle the French Quarter nightlife demand.
Let me first check the available vessel catalog:
<function_calls>
...
데모 요약
이와 같이, Confluent Cloud가 제공하는 Confluent Intelligence 기능군을 사용하면, 기존에는 Python이나 Spark 등 여러 컴포넌트가 필요했던 처리를 Flink SQL을 작성하는 것만으로 구현할 수 있습니다.
본 기사에서는 다음 사항들에 대해 설명했습니다.
1. Confluent Intelligence가 제공하는 유용한 함수군
ML_DETECT_ANOMALIES
: 모델이 필요 없는 이상 탐지 (Anomaly Detection) -
VECTOR_SEARCH_AGG
: 벡터 검색 (Vector Search)의 네이티브 통합 -
ML_PREDICT
: LLM 추론 (임베딩 및 텍스트 생성) -
AI_RUN_AGENT
: 자율적인 AI 에이전트 (AI Agent) 실행
2. Flink SQL만으로 AI 에이전트 파이프라인을 생성할 수 있다는 점
Confluent Intelligence의 유용한 함수들을 사용함으로써, 기존에는 여러 컴포넌트와 언어가 필요했던 처리를 Flink SQL만으로 완결 지을 수 있게 되었습니다.
마치며
최근 Agentic AI가 주목받고 있는데, IBM의 조사에 따르면 조사 대상 CEO들은 2030년까지 업무상 의사결정의 약 48%가 AI에 의해 이루어질 것이라고 예측하고 있습니다 [8].
이 예측을 실현하기 위해서는 실시간으로 상황을 파악하고, 적절한 컨텍스트 (Context)를 획득하며, 자율적으로 액션 (Action)을 실행하는 메커니즘이 필요합니다.
본 기사에서 소개한 바와 같이, Confluent Cloud는 이러한 메커니즘을 구현할 수 있는 AI 에이전트 구축 관련 기능 추가도 진행되고 있어, 향후 발전에 따른 더욱 활발한 활용이 기대됩니다.
여러분도 꼭 공식 데모를 시도하여 Confluent Intelligence를 체험해 보시기 바랍니다.
데모 시 주의사항
Confluent Cloud는 환경(environment)이 남아 있는 한 계속 과금됩니다. 따라서 매일 작업 후에는 환경을 삭제하는 것을 권장합니다.
uv run destroy
참고 링크
데모에 대하여: Confluent가 제공하는 공식 핸즈온 데모(Lab3: Agentic Fleet Management Using Confluent Intelligence)입니다. GitHub 리포지토리(quickstart-streaming-agents)에서 무료로 체험할 수 있습니다. 본 기사에서 소개하는 기능은 모두 이 데모 환경에서 실제로 동작을 확인할 수 있습니다. ↩︎ -
IBM Institute for Business Value, "2026 CEO Study: Rewiring the C-suite: The fast track to 2030". 조사에 따르면, 현재 조사 대상이 된 CEO들은 운영상의 의사결정 중 25%가 AI에 의해 이루어지고 있다고 답변했으며, 2030년까지 그 비율이 48%로 두 배 증가할 것이라고 예측하고 있습니다. 출처: IBM C-suite Study - CEO ↩︎
Discussion

AI 자동 생성 콘텐츠
본 콘텐츠는 Zenn AI의 원문을 AI가 자동으로 요약·번역·분석한 것입니다. 원 저작권은 원저작자에게 있으며, 정확한 내용은 반드시 원문을 확인해 주세요.
원문 바로가기