AI-Agent Driven Revenue Investigation: LangGraph Orchestration Meets Medallion Architecture
A European e-commerce company watched EUR 2.1M in revenue vanish over 47 days. Finance blamed seasonality. Marketing blamed ad spend. Neither team had the granularity to see what was really happening.
This is the complete technical walkthrough of how a fleet of AI agents -- coordinated by a LangGraph-based Sentinel orchestrator -- autonomously investigated the anomaly through a medallion data lakehouse. Three LangGraph iterations. Two agent delegations. One root cause. The agents found it in under 15 minutes. No human wrote a single SQL query.
The Silent Bleed
EuroShop GmbH operates across seven European markets -- Germany, France, Netherlands, Austria, Belgium, Switzerland, and Italy. Their e-commerce platform processes roughly 12,000 daily sessions, with an average order value of EUR 87. On a good day, the funnel converts at 3.8%.
In late September 2025, the payments team migrated from Stripe to Adyen as part of a broader cost optimization initiative. The migration included mandatory 3D Secure 2.0 authentication for EU Strong Customer Authentication (SCA) compliance. The rollout was staged by market: Germany and France went live October 14, with remaining markets scheduled for November.
By November, Finance flagged a 23% revenue decline in Q4 actuals versus forecast. Their attribution: weak consumer sentiment in core markets. Marketing countered that ROAS was actually improving. Neither team looked at the funnel at the device level. Nobody connected the dots between the payment migration date and the revenue curve.
The core problem: Revenue attribution arguments consume enormous organizational energy when teams work from aggregated dashboards. Traditional BI requires a human analyst to formulate hypotheses, write queries, and iterate. With AI agents, the entire investigation loop -- hypothesis generation, data profiling, segment drill-down, root cause synthesis -- runs autonomously. The data engineering function becomes an investigative capability, not a reporting layer.
Multi-Agent Investigation Architecture
The investigation is driven by five specialized AI agents, each running as an independent FastAPI service with standardized endpoints. At the center sits the Sentinel Agent -- a LangGraph StateGraph that receives alerts, reasons about investigation strategy, and delegates tasks to specialist agents through tool calls.
Each agent exposes a consistent API surface: /health, /query, /task, /alert, /delegate, and /capabilities. This standardized interface -- derived from the agent_server_template -- allows the Sentinel to discover and invoke any agent without hard-coded integration logic.
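Because every agent publishes the same /capabilities endpoint, the Sentinel can assemble its routing table at startup rather than hard-coding integrations. A minimal sketch of that discovery step (the `build_registry` helper and the response fields are illustrative assumptions, not the project's actual code; the HTTP call is abstracted behind `fetch` so the logic is testable without a network):

```python
from typing import Callable

def build_registry(agents: dict[str, str],
                   fetch: Callable[[str], dict]) -> dict[str, dict]:
    """Build a capability registry by querying each agent's
    /capabilities endpoint. `fetch` stands in for an HTTP GET
    (e.g. httpx.get(url).json() in a real deployment)."""
    registry = {}
    for name, base_url in agents.items():
        caps = fetch(f"{base_url}/capabilities")
        registry[name] = {
            "url": base_url,
            "tools": caps.get("tools", []),
            "capabilities": caps.get("capabilities", []),
        }
    return registry

# Stubbed fetch returning a plausible /capabilities payload
def fake_fetch(url: str) -> dict:
    return {"name": "data_quality_agent",
            "tools": ["data_quality_check"],
            "capabilities": ["anomaly_detection"]}

registry = build_registry({"data_quality_agent": "http://dq-agent:8016"},
                          fake_fetch)
```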
- Sentinel Agent -- LangGraph orchestrator: central reasoning node that receives alerts, formulates investigation strategies, delegates to specialist agents, and synthesizes findings into actionable reports.
- Data Quality Agent -- statistical anomaly detection engine: profiles table columns using z-score detection; monitors null rates, schema drift, and data freshness. Connected to PostgreSQL for checkpoint persistence.
- Data Pipeline Agent -- ETL orchestration specialist: manages Bronze/Silver/Gold transformations, tracks data lineage, handles backup automation and pipeline health monitoring.
- BI Analytics Agent -- segment analysis and reporting: drills into conversion funnels by dimension (country, device, time), generates heatmaps, computes revenue impact by segment.
- Infrastructure Agent -- platform health monitoring: tracks Spark cluster status, MinIO capacity, Airflow DAG health, and Docker container lifecycle across the data engineering stack.
In the architecture diagram, data flows left to right through the Bronze, Silver, and Gold stages, with AI agents monitoring and transforming at every checkpoint.
Agent-Pipeline Topology
The complete system topology shows how AI agents integrate with the medallion data pipeline and underlying infrastructure. The Sentinel sits at the center, issuing commands to specialist agents (inner ring) that operate on pipeline layers and infrastructure services (outer ring). Four link types encode the communication patterns: command delegation, data flow, anomaly alerts, and feedback loops.
LangGraph StateGraph: The Investigation Engine
The Sentinel Agent is built on LangGraph -- a stateful, graph-based orchestration framework. The core workflow is a StateGraph with two primary nodes: _call_model (LLM reasoning) and _call_tools (tool execution). A conditional edge should_continue routes between them based on whether the AgentState contains pending tool calls.
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]


class SentinelAgent:
    def __init__(self):
        self.tools = [
            RAGTool, MonitoringTool, InfrastructureTool,
            delegate_to_agent, list_available_agents,
            route_task_to_specialist,
        ]
        self.tools_by_name = {t.name: t for t in self.tools}

        # Build the LangGraph StateGraph
        workflow = StateGraph(AgentState)
        workflow.add_node("agent", self._call_model)
        workflow.add_node("tools", self._call_tools)
        workflow.set_entry_point("agent")

        # Conditional edge: if tool_calls exist -> tools, else -> END
        workflow.add_conditional_edges(
            "agent",
            self._should_continue,
            {"continue": "tools", "end": END},
        )
        workflow.add_edge("tools", "agent")
        self.graph = workflow.compile()

Each iteration through the graph produces a new AgentState snapshot. The _call_model node runs the LLM with the current message history, producing either a final response or tool calls. The _call_tools node executes those tool calls -- which may include delegating to other agents via HTTP -- and appends results back to the message history.
def _should_continue(self, state: AgentState):
    """LangGraph conditional edge function.
    If the last message has tool_calls -> continue to tools node.
    Otherwise -> end the graph execution."""
    messages = state["messages"]
    last_message = messages[-1]
    if last_message.tool_calls:
        return "continue"
    return "end"

async def _call_tools(self, state: AgentState):
    """Execute all pending tool calls from the last model response.
    Supports: delegate_to_agent, list_available_agents,
    route_task_to_specialist, RAGTool, MonitoringTool."""
    messages = state["messages"]
    last_message = messages[-1]
    tool_results = []
    for tool_call in last_message.tool_calls:
        tool = self.tools_by_name[tool_call["name"]]
        result = await tool.ainvoke(tool_call["args"])
        tool_results.append(
            ToolMessage(content=str(result), tool_call_id=tool_call["id"])
        )
    return {"messages": tool_results}

The four swimlanes in the task-flow diagram show how tasks move through Sentinel classification, LangGraph scheduling, agent execution, and validation with checkpoint persistence.
Autonomous Investigation Sequence
When the Data Quality Agent detects an anomaly in the Gold layer (z-score exceeding threshold), it fires an alert to the Sentinel. The Sentinel then autonomously orchestrates a multi-phase investigation -- no human intervention required. The entire sequence below ran in under 15 minutes wall-clock time.
1. Anomaly detected by Data Quality Agent -- a z-score of -3.7 on conversion_rate triggers an automated alert to the Sentinel orchestrator.
2. Sentinel delegates quality profiling -- the LangGraph _call_model node determines the strategy and delegates to the Data Quality Agent for full table profiling.
3. 12-column anomaly profile complete -- payment_token null rate at 34% (baseline 2%); 3 critical anomalies detected via z-score.
4. BI Agent segment drill-down initiated -- the Sentinel delegates segment analysis; the BI Agent queries Silver funnel data by country x device.
5. Blast radius quantified -- DE mobile: -42%; AT mobile: -28%; 21 unaffected segments; payment gateway timeout pattern isolated.
6. Root cause synthesized -- Sentinel final iteration: no more tool calls needed; report generated with 3 remediation actions.
7. Pipeline Agent deploys retry queue -- automated remediation: pipeline reconfigured with payment gateway failover and retry logic.
8. Recovery confirmed -- the Data Quality Agent confirms the conversion_rate z-score returning to its normal range across all segments.
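The alert that kicks off this sequence can be sketched as a minimal payload (the field names are illustrative assumptions -- the actual /alert schema is not shown in this post; the values mirror the incident described above):

```python
import json

# Hypothetical body of the Data Quality Agent's POST to the
# Sentinel's /alert endpoint. Field names are assumptions.
alert = {
    "source": "data_quality_agent",
    "table": "gold/daily_conversion_metrics",
    "column": "conversion_rate",
    "z_score": -3.7,
    "severity": "critical",  # |z| >= 3.0 per the quality engine's rule
    "baseline_mean": 3.8,
    "current_value": 0.5,
    "detected_at": "2025-11-30T06:00:00Z",
}

payload = json.dumps(alert)  # serialized POST body for /alert
```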
Bronze: Immutable Event Ingestion
The Data Pipeline Agent triggers Bronze ingestion via its etl_pipeline tool. Raw session and event data from seven markets arrives as JSON. The agent orchestrates Spark to ingest it into a Delta Lake table on MinIO -- immutable, append-only, with full schema preservation.
# Data Pipeline Agent delegates to Spark for Bronze ingestion
# Tool: etl_pipeline(stage="bronze", source="raw_events")
raw_events = spark.read.json("s3a://raw-events/euroshop/2025-10-*.json.gz")

# Preserve ALL original fields -- Bronze is immutable
bronze_df = raw_events.select(
    col("session_id"),
    col("event_type"),     # page_view | add_to_cart | checkout | purchase
    col("timestamp"),
    col("country"),
    col("device_type"),    # desktop | mobile | tablet
    col("payment_method"),
    col("payment_token"),  # null when gateway fails
    col("order_value"),
    col("page_url"),
    col("user_agent"),
    current_timestamp().alias("_ingested_at"),
    lit("v1").alias("_schema_version"),
)

bronze_df.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("s3a://lakehouse/bronze/events")

Agent integration note: The Data Pipeline Agent does not just run ETL. After the Bronze write completes, it automatically invokes its data_quality_check tool to verify row counts, null rates, and schema conformance. If thresholds are breached, it alerts the Sentinel before proceeding to Silver.
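The post-write check described in that note reduces to a small pure function. A sketch under stated assumptions (the real data_quality_check tool runs against the Delta table via Spark; `check_bronze_batch` and its thresholds are illustrative):

```python
def check_bronze_batch(rows: list[dict],
                       required_cols: set[str],
                       max_null_rate: float = 0.05,
                       min_rows: int = 1) -> list[str]:
    """Return descriptions of breached thresholds for one batch.
    An empty list means the batch passes and Silver can proceed."""
    breaches = []
    if len(rows) < min_rows:
        breaches.append(f"row count {len(rows)} below minimum {min_rows}")
        return breaches
    for col in required_cols:
        nulls = sum(1 for r in rows if r.get(col) is None)
        rate = nulls / len(rows)
        if rate > max_null_rate:
            breaches.append(
                f"{col} null rate {rate:.0%} exceeds {max_null_rate:.0%}")
    return breaches

# Toy batch: two of three sessions are missing payment_token
batch = [{"session_id": "s1", "payment_token": None},
         {"session_id": "s2", "payment_token": None},
         {"session_id": "s3", "payment_token": "tok_abc"}]
issues = check_bronze_batch(batch, {"session_id", "payment_token"})
```

A non-empty `issues` list is what would trigger the alert to the Sentinel before the Silver stage runs.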
Silver: Enrichment and Funnel Construction
Silver transforms raw events into analysis-ready sessions. The Pipeline Agent assigns numeric funnel stages, deduplicates events, and computes the maximum stage reached per session. This is where the investigation starts to reveal the pattern -- the BI Agent will later query Silver for segment-level drill-downs.
# Funnel stage assignment (deterministic, idempotent)
FUNNEL_STAGES = {
    "page_view": 1,
    "add_to_cart": 2,
    "begin_checkout": 3,
    "add_payment_info": 4,
    "purchase": 5,
}

silver_df = bronze_df \
    .withColumn("funnel_stage",
        coalesce(
            *[when(col("event_type") == evt, lit(stage))
              for evt, stage in FUNNEL_STAGES.items()],
            lit(0)
        )
    ) \
    .withColumn("has_payment_token",
        when(col("payment_token").isNotNull(), lit(True))
        .otherwise(lit(False))
    ) \
    .withColumn("date", to_date(col("timestamp")))  # session date for grouping

# Session-level aggregation: max funnel stage reached
session_funnel = silver_df \
    .groupBy("session_id", "country", "device_type", "date") \
    .agg(
        max("funnel_stage").alias("max_stage"),
        count("*").alias("event_count"),
        max("has_payment_token").alias("payment_completed"),
        first("payment_method").alias("payment_method"),
        sum("order_value").alias("order_value"),  # carried forward for Gold AOV
    )

session_funnel.write \
    .format("delta") \
    .mode("overwrite") \
    .save("s3a://lakehouse/silver/session_funnels")

Gold: Daily Metrics and Conversion Aggregates
The Gold layer is where the Data Quality Agent does its primary work. Daily conversion metrics are aggregated by country and device type. The agent then runs z-score anomaly detection on every column -- any value exceeding 2.5 standard deviations from the rolling 30-day mean triggers an alert. This is how the EUR 2.1M gap was first detected.
# Daily conversion metrics by segment
gold_df = session_funnel \
    .groupBy("date", "country", "device_type") \
    .agg(
        count("*").alias("total_sessions"),
        sum(when(col("max_stage") >= 5, 1).otherwise(0))
            .alias("purchases"),
        sum(when(col("max_stage") >= 3, 1).otherwise(0))
            .alias("checkout_starts"),
        avg(when(col("max_stage") >= 5, col("order_value")))
            .alias("avg_order_value"),
        avg(col("payment_completed").cast("int"))
            .alias("payment_success_rate"),
    ) \
    .withColumn("conversion_rate",
        (col("purchases") / col("total_sessions") * 100)
    )

gold_df.write \
    .format("delta") \
    .mode("overwrite") \
    .save("s3a://lakehouse/gold/daily_conversion_metrics")

# >> Data Quality Agent automatically profiled this table
# >> Z-score on conversion_rate: -3.7 (CRITICAL)
# >> Z-score on revenue_total: -3.2 (CRITICAL)
# >> payment_token null rate: 34% vs 2% baseline (ANOMALY)

Z-Score Anomaly Detection at Scale
The Data Quality Agent runs a DataQualityEngine that profiles every column in the Gold table. For numeric columns, it computes the z-score against a rolling baseline. For categorical columns, it tracks distribution drift. The engine is connected to PostgreSQL for persisting quality checkpoints and historical profiles.
class DataQualityEngine:
    def detect_anomalies(self, df, threshold=2.5):
        """Z-score anomaly detection across all numeric columns.
        Returns list of {column, z_score, severity, value, baseline}."""
        anomalies = []
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col_name in numeric_cols:
            values = df[col_name].dropna()
            if len(values) < 10:
                continue
            mean = values.rolling(30).mean().iloc[-1]
            std = values.rolling(30).std().iloc[-1]
            if std == 0:
                continue
            current = values.iloc[-1]
            z_score = (current - mean) / std
            if abs(z_score) >= threshold:
                anomalies.append({
                    "column": col_name,
                    "z_score": round(z_score, 2),
                    "severity": "critical" if abs(z_score) >= 3.0 else "warning",
                    "current_value": current,
                    "baseline_mean": round(mean, 4),
                    "baseline_std": round(std, 4),
                })
        return anomalies

In the resulting quality heatmap, red cells mark detected anomalies. The day of the failure lights up on conversion_rate, revenue_total, payment_success, and payment_token_null_rate -- exactly the columns affected by the payment gateway failure.
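The distribution-drift tracking for categorical columns, mentioned above but not shown in the engine code, can be sketched with total variation distance (an illustrative choice of drift metric; the engine's actual metric is not shown in this post):

```python
from collections import Counter

def total_variation_distance(baseline: list[str], current: list[str]) -> float:
    """Drift score for a categorical column: 0.0 means identical
    category distributions, 1.0 means completely disjoint."""
    base_counts = Counter(baseline)
    curr_counts = Counter(current)
    categories = set(base_counts) | set(curr_counts)
    n_base, n_curr = len(baseline), len(current)
    return 0.5 * sum(
        abs(base_counts[c] / n_base - curr_counts[c] / n_curr)
        for c in categories
    )

# Toy payment_method distributions before and after a gateway change
baseline = ["visa"] * 70 + ["paypal"] * 25 + ["sepa"] * 5
current = ["visa"] * 40 + ["paypal"] * 20 + ["sepa"] * 40
drift = total_variation_distance(baseline, current)
```

A drift score above some configured threshold would be reported alongside the z-score anomalies.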
@app.post("/analyze/quality")
async def analyze_quality(request: QualityRequest):
    """Full quality profile: null rates, z-scores, distribution stats.
    Called by Sentinel via delegate_to_agent tool."""
    engine = DataQualityEngine()

    # Connect to PostgreSQL for historical baselines
    async with get_db_connection() as conn:
        df = await conn.fetch_dataframe(request.table_name)
        profile = engine.profile_dataframe(df)
        anomalies = engine.detect_anomalies(df, threshold=request.threshold)

        # Persist checkpoint for audit trail
        await conn.execute(
            "INSERT INTO quality_checkpoints (table_name, profile, anomalies, ts) "
            "VALUES ($1, $2, $3, NOW())",
            request.table_name, json.dumps(profile), json.dumps(anomalies),
        )

    return {
        "table": request.table_name,
        "columns_profiled": len(profile),
        "anomalies": anomalies,
        "profile_summary": profile,
    }

Full Investigation Trace: Step by Step
The following trace replays the exact LangGraph execution that investigated the revenue anomaly. Each step shows the current graph node, the AgentState snapshot, tool calls, and -- for _call_model nodes -- the agent's internal reasoning chain. Three iterations through the model-tools loop. Two agent delegations. Graph execution time: 6.3 seconds (the full detection-to-remediation sequence took under 15 minutes of wall-clock time).
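The control flow of that trace can be simulated in a few lines of plain Python -- a toy stand-in for the compiled StateGraph, not LangGraph itself, matching the three iterations and two delegations described above:

```python
def run_loop(model_responses: list[dict], execute_tools) -> tuple[dict, int]:
    """Toy stand-in for the compiled graph: call the model, follow the
    conditional edge to the tools node while tool_calls exist, and stop
    when the model returns a response with no tool calls."""
    iterations = 0
    for response in model_responses:        # each item = one _call_model output
        iterations += 1
        if not response.get("tool_calls"):  # should_continue -> "end"
            return response, iterations
        execute_tools(response["tool_calls"])  # the _call_tools node
    raise RuntimeError("model never produced a final answer")

executed = []
final, n = run_loop(
    [
        {"tool_calls": [{"name": "delegate_to_agent",
                         "args": {"agent_name": "data_quality_agent"}}]},
        {"tool_calls": [{"name": "delegate_to_agent",
                         "args": {"agent_name": "bi_analytics_agent"}}]},
        {"content": "Root cause: 3DS2 timeout on mobile WebView"},
    ],
    executed.extend,
)
```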
Root Cause: Payment Gateway Timeout
The BI Analytics Agent's segment drill-down conclusively identified the pattern: mobile-only, and confined to markets running the new payment gateway. Desktop was essentially unaffected (roughly a 2% relative decline, within normal variance), which ruled out a general platform issue. The payment_token null rate of 34% (versus a 2% baseline) confirmed that the Adyen 3DS2 redirect was timing out specifically in mobile WebView implementations.
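At its core, the drill-down is a grouped before/after comparison of conversion rates per (country, device) segment. A minimal pure-Python sketch over toy session rows (the real query runs against the Silver Delta table; the helper name and data are illustrative):

```python
from collections import defaultdict

def conversion_by_segment(sessions: list[dict]) -> dict[tuple, float]:
    """Conversion rate (%) per (country, device_type) segment.
    A session converts when it reaches funnel stage 5 (purchase)."""
    totals = defaultdict(int)
    purchases = defaultdict(int)
    for s in sessions:
        key = (s["country"], s["device_type"])
        totals[key] += 1
        if s["max_stage"] >= 5:
            purchases[key] += 1
    return {k: round(100 * purchases[k] / totals[k], 1) for k in totals}

# Toy data: DE mobile collapses while DE desktop converts normally
after = conversion_by_segment(
    [{"country": "DE", "device_type": "mobile", "max_stage": 4}] * 199
    + [{"country": "DE", "device_type": "mobile", "max_stage": 5}]
    + [{"country": "DE", "device_type": "desktop", "max_stage": 5}] * 4
    + [{"country": "DE", "device_type": "desktop", "max_stage": 1}] * 96
)
```

Comparing such a mapping against a pre-migration baseline yields the before/after table below.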
| Market | Desktop before | Desktop after | Mobile before | Mobile after | Tablet before | Tablet after |
|---|---|---|---|---|---|---|
| DE | 4.2% | 4.1% | 3.8% | 0.5% (-3.3 pp) | 3.5% | 3.4% |
| FR | 3.9% | 3.8% | 3.6% | 0.4% (-3.2 pp) | 3.3% | 3.2% |
| NL | 4.5% | 4.4% | 4.1% | 4.0% | 3.8% | 3.7% |
| AT | 3.7% | 3.6% | 3.4% | 0.8% (-2.6 pp) | 3.1% | 3.0% |
| BE | 4.0% | 3.9% | 3.7% | 3.6% | 3.4% | 3.3% |
| CH | 4.8% | 4.7% | 4.4% | 4.3% | 4.0% | 3.9% |
| IT | 3.5% | 3.4% | 3.2% | 3.1% | 2.9% | 2.8% |
Agent-Synthesized Root Cause Report
What: Adyen 3DS2 authentication redirect times out on mobile WebView implementations. The payment gateway returns empty payment_token values, causing transactions to silently fail at funnel stage 4 (add_payment_info).
Where: Germany and Austria mobile (42% and 28% conversion drops respectively). France mobile affected but not in the current data window. Desktop and tablet unaffected across all markets.
When: October 14 (payment migration date) through November 30 (detection date). 47 days undetected because aggregated dashboards masked the device-level signal.
Impact: EUR 2.1M in lost revenue. 84,293 affected sessions in DE alone. 97,173 total affected sessions across DACH markets.
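The per-segment revenue figures in such a report boil down to lost orders times average order value. An illustrative calculation using the DE-mobile numbers above (the helper is a sketch, not the agent's actual code, and the result covers one segment only, not the EUR 2.1M total):

```python
def segment_revenue_impact(sessions: int,
                           conv_before_pct: float,
                           conv_after_pct: float,
                           avg_order_value: float) -> float:
    """EUR lost in one segment: orders that would have happened at the
    baseline conversion rate, minus those that actually did."""
    lost_orders = sessions * (conv_before_pct - conv_after_pct) / 100
    return round(lost_orders * avg_order_value, 2)

# 84,293 DE-mobile sessions, conversion 3.8% -> 0.5%, EUR 87 AOV
impact = segment_revenue_impact(84_293, 3.8, 0.5, 87.0)
```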
Agent-Recommended Remediation
The Sentinel Agent's final iteration produced three remediation actions, which the Data Pipeline Agent then coordinated for deployment. This closed the loop from detection through resolution -- fully agent-driven.
1. Payment Gateway Retry Queue (Deployed, owner: Data Pipeline Agent)
Implement exponential backoff retry with 3 attempts for 3DS2 authentication. On final failure, fall back to a non-3DS flow where a PSD2 exemption applies (low-value transactions under EUR 30).

2. Gateway Failover Configuration (Deployed, owner: Infrastructure Agent)
Configure the Adyen SDK with a secondary authentication endpoint. If the primary 3DS2 redirect exceeds an 8 s timeout, route to the fallback endpoint with extended timeout parameters for mobile WebView.

3. Alert Threshold Adjustment (Active, owner: Data Quality Agent)
The Data Quality Agent now monitors the payment_token null rate with a 5% threshold (previously only z-score based). Any segment exceeding a 5% null rate triggers an immediate Sentinel investigation.

Agent Implementation: Production Code
All agents follow a common framework pattern from _agent_framework/agent_server_template.py. The template provides standardized FastAPI server creation with health checks, metrics middleware, and agent coordination integration. Below is the actual production code for the agent server factory.
@dataclass
class AgentServerConfig:
    name: str
    version: str = "1.0.0"
    description: str = ""
    default_model: str = "qwen2.5:7b"
    capabilities: list = field(default_factory=list)
    tools: list = field(default_factory=list)
    coordinator_url: str = "http://localhost:8080"


def create_agent_server(config: AgentServerConfig) -> FastAPI:
    """Factory function that creates a standardized agent server.
    Every agent in the fleet uses this template."""
    app = FastAPI(
        title=f"{config.name} Agent",
        version=config.version,
        description=config.description,
    )

    # Standard endpoints
    @app.get("/health")
    async def health():
        return {"status": "healthy", "agent": config.name,
                "version": config.version, "uptime": get_uptime()}

    @app.post("/query")
    async def query(request: QueryRequest):
        return await process_query(request, config)

    @app.post("/task")
    async def handle_task(request: TaskRequest):
        return await execute_task(request, config)

    @app.post("/delegate")
    async def delegate(request: DelegateRequest):
        coordinator = AgentCoordinator(config.coordinator_url)
        return await coordinator.delegate(request)

    @app.get("/capabilities")
    async def capabilities():
        return {"name": config.name, "tools": config.tools,
                "capabilities": config.capabilities}

    return app


class DataPipelineAgent(BaseAgent):
    """Extends BaseAgent with ETL-specific tools.
    Registered with Sentinel for delegation via /capabilities."""

    def __init__(self):
        super().__init__(
            name="data_pipeline_agent",
            description="ETL orchestration, data lineage, pipeline health",
            tools=[
                etl_pipeline,          # Trigger Bronze/Silver/Gold transforms
                data_quality_check,    # Pre/post transform validation
                backup_data,           # Delta Lake snapshot backup to MinIO
                data_lineage,          # Track column-level lineage graph
                query_knowledge_base,  # RAG over pipeline documentation
            ],
            model="qwen2.5:7b",
        )

    async def handle_investigation(self, task):
        """Called by Sentinel via delegate_to_agent tool.
        Executes ETL replay for specific date range and segments."""
        result = await self.run_tool("etl_pipeline", {
            "stage": task.stage,
            "date_range": task.date_range,
            "segments": task.segments,
        })
        quality = await self.run_tool("data_quality_check", {
            "table": f"lakehouse/{task.stage}/*",
        })
        return {"etl_result": result, "quality": quality}


@tool
async def delegate_to_agent(agent_name: str, task: str, params: dict = {}):
    """Delegate a task to a specialist agent via HTTP.
    The Sentinel uses this tool to coordinate the investigation.

    Args:
        agent_name: Target agent (e.g., 'data_quality_agent')
        task: Task description for the agent
        params: Additional parameters for the task
    """
    agent_registry = {
        "data_quality_agent": "http://dq-agent:8016",
        "data_pipeline_agent": "http://pipeline-agent:8017",
        "bi_analytics_agent": "http://bi-agent:8084",
        "infrastructure_agent": "http://infra-agent:8083",
    }
    url = agent_registry.get(agent_name)
    if not url:
        return {"error": f"Unknown agent: {agent_name}"}

    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.post(
            f"{url}/task",
            json={"task": task, "params": params,
                  "source": "sentinel_agent"},
        )
        return response.json()

Production Infrastructure
The entire stack runs on a single Proxmox VM (10.0.0.159) with 24 GB RAM and 158 GB disk. All services are containerized with Docker Compose. The AI agents run as additional containers in the same stack, communicating over the internal Docker network.
services:
  spark-master:
    image: bitnami/spark:3.5.3
    ports: ["8080:8080", "7077:7077"]
    environment:
      SPARK_MODE: master

  spark-worker-1:
    image: bitnami/spark:3.5.3
    environment:
      SPARK_MODE: worker
      SPARK_MASTER_URL: spark://spark-master:7077

  minio:
    image: minio/minio
    ports: ["9000:9000", "9001:9001"]
    command: server /data --console-address ":9001"

  airflow:
    image: apache/airflow:2.9.3
    ports: ["8081:8080"]

  jupyter:
    image: jupyter/pyspark-notebook
    ports: ["8888:8888"]

  postgres:
    image: postgres:16
    ports: ["5432:5432"]

  # AI Agent containers
  sentinel-agent:
    build: ./agents/sentinel
    ports: ["8085:8080"]  # host 8085 avoids clashing with the Spark master UI
    depends_on: [postgres]

  data-quality-agent:
    build: ./agents/data-quality
    ports: ["8016:8016"]
    depends_on: [postgres]

  data-pipeline-agent:
    build: ./agents/data-pipeline
    ports: ["8017:8017"]

- Spark cluster: 1 master, 2 workers, 4 cores, 8 GB RAM
- Spark History Server: event log browser for completed Spark jobs
- Jupyter: PySpark notebooks with Delta Lake integration
- Airflow: DAG orchestrator for scheduled ETL pipelines
- MinIO: S3-compatible object storage for Delta tables
- PostgreSQL: agent state, quality checkpoints, execution logs
- Sentinel Agent: LangGraph orchestrator -- investigation coordinator
- Data Quality Agent: z-score anomaly detection, schema validation
- Data Pipeline Agent: ETL automation, lineage tracking, backup
End-to-End Data Flow
The complete data flow from event sources through agent classification, planning, execution, validation, and output sinks. This Sankey diagram shows how every event is processed through the multi-agent pipeline, with the width of each flow encoding relative data volume.
Results and Reflection
Without AI Agents (traditional approach)
- Analyst receives aggregated dashboard alert (hours-to-days delay)
- Manual hypothesis formation: "is it seasonal? marketing? competition?"
- SQL queries written by hand to investigate each hypothesis (hours of work)
- Manual cross-referencing of segments, time windows, and device types
- Estimated time to root cause: 2-5 business days
With AI Agent Orchestration
- Data Quality Agent detects the anomaly in real time (z-score threshold)
- Sentinel automatically formulates an investigation strategy via LangGraph
- Specialist agents execute profiling and drill-down in parallel
- Root cause synthesized with an evidence chain and remediation plan
- Total time: 15 minutes. Zero human-written queries.
Key insight: The value of AI agents in data engineering is not just speed -- it is the elimination of the hypothesis-query-iterate cycle that makes human investigation slow. The LangGraph orchestrator can reason about what to investigate next, delegate to specialist agents, and synthesize findings across multiple data sources in a single automated workflow. The medallion architecture provides the structured, high-quality data layers that make agent-driven investigation reliable: Bronze for audit trails, Silver for enriched analysis, Gold for anomaly detection.
Interactive Lab Demo
Explore the full Revenue Leak Investigation case study