Data Engineering + AI Agents · Feb 11, 2026 · 35 min read

AI-Agent Driven Revenue Investigation: LangGraph Orchestration Meets Medallion Architecture

A European e-commerce company watched EUR 2.1M in revenue vanish over 47 days. Finance blamed seasonality. Marketing blamed ad spend. Neither team had the granularity to see what was really happening.

This is the complete technical walkthrough of how a fleet of AI agents -- coordinated by a LangGraph-based Sentinel orchestrator -- autonomously investigated the anomaly through a medallion data lakehouse. Three LangGraph iterations. Two agent delegations. One root cause. The agents found it in under 15 minutes. No human wrote a single SQL query.

• EUR 2.1M -- Revenue Lost
• 47 days -- Undetected
• < 15 min -- Agent Investigation
• 5 agents -- Coordinated
• 0 SQL queries -- Written by Humans
Apache Spark 3.5 · Delta Lake 3.1 · PySpark · MinIO S3 · Airflow 2.9 · PostgreSQL 16 · Docker · Proxmox VE · LangGraph · AI Agents
02
Business Context

The Silent Bleed

EuroShop GmbH operates across seven European markets -- Germany, France, Netherlands, Austria, Belgium, Switzerland, and Italy. Their e-commerce platform processes roughly 12,000 daily sessions, with an average order value of EUR 87. On a good day, the funnel converts at 3.8%.

In late September 2025, the payments team migrated from Stripe to Adyen as part of a broader cost optimization initiative. The migration included mandatory 3D Secure 2.0 authentication for EU Strong Customer Authentication (SCA) compliance. The rollout was staged by market: Germany and France went live October 14, with remaining markets scheduled for November.

By November, Finance flagged a 23% revenue decline in Q4 actuals versus forecast. Their attribution: weak consumer sentiment in core markets. Marketing countered that ROAS was actually improving. Neither team looked at the funnel at the device level. Nobody connected the dots between the payment migration date and the revenue curve.

The core problem: Revenue attribution arguments consume enormous organizational energy when teams work from aggregated dashboards. Traditional BI requires a human analyst to formulate hypotheses, write queries, and iterate. With AI agents, the entire investigation loop -- hypothesis generation, data profiling, segment drill-down, root cause synthesis -- runs autonomously. The data engineering function becomes an investigative capability, not a reporting layer.

03
Agent Architecture

Multi-Agent Investigation Architecture

The investigation is driven by five specialized AI agents, each running as an independent FastAPI service with standardized endpoints. At the center sits the Sentinel Agent -- a LangGraph StateGraph that receives alerts, reasons about investigation strategy, and delegates tasks to specialist agents through tool calls.

Each agent exposes a consistent API surface: /health, /query, /task, /alert, /delegate, and /capabilities. This standardized interface -- derived from the agent_server_template -- allows the Sentinel to discover and invoke any agent without hard-coded integration logic.
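As an illustration of what that discovery step might look like, here is a minimal, dependency-free sketch that folds stubbed /capabilities responses into a tool-to-agent routing table. The response shape and function name are assumptions for illustration, not the production code:

```python
# Stubbed /capabilities responses -- in production these would come from
# HTTP GETs against each agent's /capabilities endpoint.
STUB_RESPONSES = {
    "data_quality_agent": {"tools": ["profile_dataframe", "detect_anomalies"]},
    "bi_analytics_agent": {"tools": ["segment_drilldown", "conversion_funnel"]},
}

def build_routing_table(capability_responses: dict) -> dict:
    """Map each advertised tool name to the agent that serves it."""
    routes = {}
    for agent, caps in capability_responses.items():
        for tool in caps.get("tools", []):
            routes[tool] = agent
    return routes

routes = build_routing_table(STUB_RESPONSES)
# routes["segment_drilldown"] -> "bi_analytics_agent"
```

Because routing is derived from what each agent advertises, adding a new specialist to the fleet requires no change to the orchestrator.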

Sentinel Agent (port 8080)

LangGraph orchestrator -- central reasoning node that receives alerts, formulates investigation strategies, delegates to specialist agents, and synthesizes findings into actionable reports.

tools: delegate_to_agent, list_available_agents, route_task_to_specialist, RAGTool, MonitoringTool
framework: LangGraph StateGraph
Data Quality Agent (port 8016)

Statistical anomaly detection engine -- profiles table columns using z-score detection, monitors null rates, schema drift, and data freshness. Connected to PostgreSQL for checkpoint persistence.

tools: profile_dataframe, detect_anomalies, profile_column, quality_checkpoint
framework: FastAPI + DataQualityEngine
Data Pipeline Agent (port 8017)

ETL orchestration specialist -- manages Bronze/Silver/Gold transformations, tracks data lineage, handles backup automation and pipeline health monitoring.

tools: etl_pipeline, data_quality_check, backup_data, data_lineage, query_knowledge_base
framework: BaseAgent + LangChain
BI Analytics Agent (port 8084)

Segment analysis and reporting -- drills into conversion funnels by dimension (country, device, time), generates heatmaps, computes revenue impact by segment.

tools: segment_drilldown, conversion_funnel, heatmap_generator, revenue_attribution
framework: FastAPI + PySpark
Infrastructure Agent (port 8083)

Platform health monitoring -- tracks Spark cluster status, MinIO capacity, Airflow DAG health, Docker container lifecycle across the data engineering stack.

tools: spark_health, minio_capacity, airflow_dag_status, container_health
framework: BaseAgent + Docker Socket
Interactive: Medallion Architecture with Agent Integration

Hover over any node to see agent responsibilities at each medallion layer. Data flows left-to-right through Bronze, Silver, and Gold stages, with AI agents monitoring and transforming at every checkpoint.

04
Topology

Agent-Pipeline Topology

The complete system topology shows how AI agents integrate with the medallion data pipeline and underlying infrastructure. The Sentinel sits at the center, issuing commands to specialist agents (inner ring) that operate on pipeline layers and infrastructure services (outer ring). Four link types encode the communication patterns: command delegation, data flow, anomaly alerts, and feedback loops.


Hover over any node to see its role, port assignment, and description. The green dot on each node indicates healthy service status.

05
LangGraph Orchestration

LangGraph StateGraph: The Investigation Engine

The Sentinel Agent is built on LangGraph -- a stateful, graph-based orchestration framework. The core workflow is a StateGraph with two primary nodes: _call_model (LLM reasoning) and _call_tools (tool execution). A conditional edge should_continue routes between them based on whether the AgentState contains pending tool calls.

sentinel_agent/src/agent.py -- LangGraph StateGraph
python
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]

class SentinelAgent:
    def __init__(self):
        self.tools = [
            # tool instances and @tool functions both expose a .name attribute
            RAGTool(), MonitoringTool(), InfrastructureTool(),
            delegate_to_agent, list_available_agents,
            route_task_to_specialist,
        ]
        self.tools_by_name = {t.name: t for t in self.tools}

        # Build the LangGraph StateGraph
        workflow = StateGraph(AgentState)
        workflow.add_node("agent", self._call_model)
        workflow.add_node("tools", self._call_tools)
        workflow.set_entry_point("agent")

        # Conditional edge: if tool_calls exist -> tools, else -> END
        workflow.add_conditional_edges(
            "agent",
            self._should_continue,
            {"continue": "tools", "end": END},
        )
        workflow.add_edge("tools", "agent")
        self.graph = workflow.compile()

Each iteration through the graph produces a new AgentState snapshot. The _call_model node runs the LLM with the current message history, producing either a final response or tool calls. The _call_tools node executes those tool calls -- which may include delegating to other agents via HTTP -- and appends results back to the message history.
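Stripped of LangGraph specifics, the compiled graph behaves like the following dependency-free loop. The function names and message dicts here are illustrative stand-ins, not the production types:

```python
def run_loop(call_model, call_tools, state, max_iters=10):
    """Drive the model/tools cycle until the model emits no tool calls."""
    for _ in range(max_iters):
        state = call_model(state)
        if not state["messages"][-1].get("tool_calls"):
            return state                     # conditional edge -> END
        state = call_tools(state)            # edge from tools back to agent
    return state

def fake_model(state):
    # First assistant turn requests a tool; the second returns the answer.
    turns = sum(1 for m in state["messages"] if m["role"] == "assistant")
    if turns == 0:
        msg = {"role": "assistant",
               "tool_calls": [{"name": "profile_dataframe", "args": {}}]}
    else:
        msg = {"role": "assistant", "content": "root cause found"}
    return {"messages": state["messages"] + [msg]}

def fake_tools(state):
    results = [{"role": "tool", "content": f"ran {c['name']}"}
               for c in state["messages"][-1]["tool_calls"]]
    return {"messages": state["messages"] + results}

final = run_loop(fake_model, fake_tools,
                 {"messages": [{"role": "user", "content": "investigate"}]})
# final["messages"][-1]["content"] == "root cause found"
```

The `max_iters` cap mirrors why the investigation terminates after a bounded number of iterations even if the model keeps requesting tools.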

sentinel_agent/src/agent.py -- Conditional routing
python
def _should_continue(self, state: AgentState):
    """LangGraph conditional edge function.
    If the last message has tool_calls -> continue to tools node.
    Otherwise -> end the graph execution."""
    messages = state["messages"]
    last_message = messages[-1]
    if last_message.tool_calls:
        return "continue"
    return "end"

async def _call_tools(self, state: AgentState):
    """Execute all pending tool calls from the last model response.
    Supports: delegate_to_agent, list_available_agents,
    route_task_to_specialist, RAGTool, MonitoringTool."""
    messages = state["messages"]
    last_message = messages[-1]
    tool_results = []
    for tool_call in last_message.tool_calls:
        tool = self.tools_by_name[tool_call["name"]]
        result = await tool.ainvoke(tool_call["args"])
        tool_results.append(
            ToolMessage(content=str(result), tool_call_id=tool_call["id"])
        )
    return {"messages": tool_results}
Interactive: LangGraph Swimlane State Machine

Hover over any node to see its function. The four swimlanes show how tasks flow through Sentinel classification, LangGraph scheduling, agent execution, and validation with checkpoint persistence.

06
Investigation

Autonomous Investigation Sequence

When the Data Quality Agent detects an anomaly in the Gold layer (z-score exceeding threshold), it fires an alert to the Sentinel. The Sentinel then autonomously orchestrates a multi-phase investigation -- no human intervention required. The entire sequence below ran in under 15 minutes wall-clock time.
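The alert itself can be as simple as a JSON document. A hypothetical shape, with field names mirroring the metrics in this post but not the production schema, might be:

```python
import json

# Hypothetical alert payload POSTed to the Sentinel's /alert endpoint;
# field names are assumptions, not the production schema.
alert = {
    "source": "data_quality_agent",
    "severity": "critical",
    "table": "gold/daily_conversion_metrics",
    "anomalies": [
        {"column": "conversion_rate", "z_score": -3.7},
        {"column": "payment_token_null_rate",
         "current": 0.34, "baseline": 0.02},
    ],
}
payload = json.dumps(alert)  # request body for the POST
```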

T+0:00 -- Data Quality Agent

Anomaly detected by Data Quality Agent

Z-score of -3.7 on conversion_rate triggers automated alert to Sentinel orchestrator.

T+0:02 -- Sentinel Agent

Sentinel delegates quality profiling

LangGraph _call_model node determines strategy. Delegates to DataQualityAgent for full table profiling.

T+0:04 -- Data Quality Agent

12-column anomaly profile complete

payment_token null rate at 34% (baseline 2%). 3 critical anomalies detected via z-score.

T+0:06 -- BI Analytics Agent

BI Agent segment drill-down initiated

Sentinel delegates segment analysis. BI Agent queries Silver funnel data by country x device.

T+0:09 -- BI Analytics Agent

Blast radius quantified

DE mobile: -42%. AT mobile: -28%. 21 unaffected segments. Payment gateway timeout pattern isolated.

T+0:11 -- Sentinel Agent

Root cause synthesized

Sentinel final iteration: no more tool calls needed. Report generated with 3 remediation actions.

T+0:15 -- Data Pipeline Agent

Pipeline Agent deploys retry queue

Automated remediation: pipeline reconfigured with payment gateway failover and retry logic.

T+1:00 -- Data Quality Agent

Recovery confirmed

Data Quality Agent confirms conversion_rate z-score returning to normal range across all segments.

Interactive: 4-Phase Investigation Sequence (auto-plays)
07
Bronze Layer

Bronze: Immutable Event Ingestion

The Data Pipeline Agent triggers Bronze ingestion via its etl_pipeline tool. Raw session and event data from seven markets arrives as JSON. The agent orchestrates Spark to ingest it into a Delta Lake table on MinIO -- immutable, append-only, with full schema preservation.

• 300K -- Sessions
• 750K+ -- Events
• 1.7 MiB -- Delta Size
• 7 -- Markets
pipeline_agent -- Bronze ingestion via PySpark
python
# Data Pipeline Agent delegates to Spark for Bronze ingestion
# Tool: etl_pipeline(stage="bronze", source="raw_events")

raw_events = spark.read.json("s3a://raw-events/euroshop/2025-10-*.json.gz")

# Preserve ALL original fields -- Bronze is immutable
bronze_df = raw_events.select(
    col("session_id"),
    col("event_type"),        # page_view | add_to_cart | checkout | purchase
    col("timestamp"),
    col("country"),
    col("device_type"),       # desktop | mobile | tablet
    col("payment_method"),
    col("payment_token"),     # null when gateway fails
    col("order_value"),
    col("page_url"),
    col("user_agent"),
    current_timestamp().alias("_ingested_at"),
    lit("v1").alias("_schema_version"),
)

bronze_df.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("s3a://lakehouse/bronze/events")

Agent integration note: The Data Pipeline Agent does not just run ETL. After Bronze write completes, it automatically invokes its data_quality_check tool to verify row counts, null rates, and schema conformance. If thresholds are breached, it alerts the Sentinel before proceeding to Silver.
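A minimal sketch of such a post-write gate, assuming the profiling job hands back row counts and per-column null rates. The function name and thresholds are illustrative, not the production values:

```python
def bronze_quality_gate(stats, min_rows=100_000, max_null_rate=0.05):
    """Return (ok, reasons) for a post-write check.

    `stats` is assumed to come from a profiling job:
    {"row_count": int, "null_rates": {column: fraction}}.
    """
    reasons = []
    if stats["row_count"] < min_rows:
        reasons.append(f"row_count {stats['row_count']} below {min_rows}")
    for column, rate in stats["null_rates"].items():
        if rate > max_null_rate:
            reasons.append(
                f"{column} null rate {rate:.0%} above {max_null_rate:.0%}")
    return (not reasons), reasons

ok, reasons = bronze_quality_gate(
    {"row_count": 300_000, "null_rates": {"payment_token": 0.34}})
# ok is False: a 34% payment_token null rate breaches the gate
```

Returning reasons rather than a bare boolean is what lets the agent attach evidence when it alerts the Sentinel.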

08
Silver Layer

Silver: Enrichment and Funnel Construction

Silver transforms raw events into analysis-ready sessions. The Pipeline Agent assigns numeric funnel stages, deduplicates events, and computes the maximum stage reached per session. This is where the investigation starts to reveal the pattern -- the BI Agent will later query Silver for segment-level drill-downs.

pipeline_agent -- Silver transformation
python
# Funnel stage assignment (deterministic, idempotent)
FUNNEL_STAGES = {
    "page_view": 1,
    "add_to_cart": 2,
    "begin_checkout": 3,
    "add_payment_info": 4,
    "purchase": 5,
}

silver_df = bronze_df \
    .dropDuplicates(["session_id", "event_type", "timestamp"]) \
    .withColumn("date", to_date(col("timestamp"))) \
    .withColumn("funnel_stage",
        coalesce(
            *[when(col("event_type") == evt, lit(stage))
              for evt, stage in FUNNEL_STAGES.items()],
            lit(0)
        )
    ) \
    .withColumn("has_payment_token",
        when(col("payment_token").isNotNull(), lit(True))
        .otherwise(lit(False))
    )

# Session-level aggregation: max funnel stage reached
session_funnel = silver_df \
    .groupBy("session_id", "country", "device_type", "date") \
    .agg(
        max("funnel_stage").alias("max_stage"),
        count("*").alias("event_count"),
        max("has_payment_token").alias("payment_completed"),
        first("payment_method").alias("payment_method"),
        max("order_value").alias("order_value"),  # carried forward for Gold revenue metrics
    )

session_funnel.write \
    .format("delta") \
    .mode("overwrite") \
    .save("s3a://lakehouse/silver/session_funnels")
• 2.6 MiB -- Silver Size
• 5 -- Funnel Stages
• 0.3% -- Dedup Rate
09
Gold Layer

Gold: Daily Metrics and Conversion Aggregates

The Gold layer is where the Data Quality Agent does its primary work. Daily conversion metrics are aggregated by country and device type. The agent then runs z-score anomaly detection on every column -- any value exceeding 2.5 standard deviations from the rolling 30-day mean triggers an alert. This is how the EUR 2.1M gap was first detected.

pipeline_agent -- Gold aggregation
python
# Daily conversion metrics by segment
gold_df = session_funnel \
    .groupBy("date", "country", "device_type") \
    .agg(
        count("*").alias("total_sessions"),
        sum(when(col("max_stage") >= 5, 1).otherwise(0))
            .alias("purchases"),
        sum(when(col("max_stage") >= 3, 1).otherwise(0))
            .alias("checkout_starts"),
        avg(when(col("max_stage") >= 5, col("order_value")))
            .alias("avg_order_value"),
        sum(when(col("max_stage") >= 5, col("order_value")))
            .alias("revenue_total"),
        avg(col("payment_completed").cast("int"))
            .alias("payment_success_rate"),
    ) \
    .withColumn("conversion_rate",
        (col("purchases") / col("total_sessions") * 100)
    )

gold_df.write \
    .format("delta") \
    .mode("overwrite") \
    .save("s3a://lakehouse/gold/daily_conversion_metrics")

# >> Data Quality Agent automatically profiled this table
# >> Z-score on conversion_rate: -3.7 (CRITICAL)
# >> Z-score on revenue_total: -3.2 (CRITICAL)
# >> payment_token null rate: 34% vs 2% baseline (ANOMALY)
• 36 KiB -- Gold Size
• -3.7 -- Z-Score Alert
• 34% -- Token Null Rate
10
Data Quality Agent

Z-Score Anomaly Detection at Scale

The Data Quality Agent runs a DataQualityEngine that profiles every column in the Gold table. For numeric columns, it computes the z-score against a rolling baseline. For categorical columns, it tracks distribution drift. The engine is connected to PostgreSQL for persisting quality checkpoints and historical profiles.

data_quality_agent/src/api.py -- Z-score detection
python
class DataQualityEngine:
    def detect_anomalies(self, df, threshold=2.5):
        """Z-score anomaly detection across all numeric columns.
        Returns list of {column, z_score, severity, value, baseline}."""
        anomalies = []
        numeric_cols = df.select_dtypes(include=[np.number]).columns

        for col_name in numeric_cols:
            values = df[col_name].dropna()
            if len(values) < 10:
                continue

            mean = values.rolling(30).mean().iloc[-1]
            std = values.rolling(30).std().iloc[-1]

            if std == 0:
                continue

            current = values.iloc[-1]
            z_score = (current - mean) / std

            if abs(z_score) >= threshold:
                anomalies.append({
                    "column": col_name,
                    "z_score": round(z_score, 2),
                    "severity": "critical" if abs(z_score) >= 3.0 else "warning",
                    "current_value": current,
                    "baseline_mean": round(mean, 4),
                    "baseline_std": round(std, 4),
                })

        return anomalies
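To see the arithmetic on a toy series: 29 days of conversion_rate at the ~3.8% baseline followed by one anomalous day yields a strongly negative z-score, computed with the same pandas rolling window the engine uses (the values here are illustrative, not the production data):

```python
import pandas as pd

# 29 days at the baseline, then the anomalous day the gateway failure produced.
values = pd.Series([3.8] * 29 + [0.5])

mean = values.rolling(30).mean().iloc[-1]
std = values.rolling(30).std().iloc[-1]
z_score = (values.iloc[-1] - mean) / std
# z_score lands near -5.3, far past the 2.5-sigma alert threshold,
# even though the anomalous value itself drags the rolling mean down
# and inflates the rolling std.
```

Note that the current value sits inside its own baseline window, which dampens the z-score slightly; a sharp enough drop still clears the threshold easily.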
Interactive: Data Quality Heatmap -- Z-Score by Column x Day

Hover over any cell to see the z-score, current value, and baseline. Red cells with z-score labels indicate detected anomalies. Note how Jan 15 lights up on conversion_rate, revenue_total, payment_success, and payment_token_null_rate -- the exact columns affected by the payment gateway failure.

data_quality_agent/src/api.py -- FastAPI endpoints
python
@app.post("/analyze/quality")
async def analyze_quality(request: QualityRequest):
    """Full quality profile: null rates, z-scores, distribution stats.
    Called by Sentinel via delegate_to_agent tool."""
    engine = DataQualityEngine()

    # Connect to PostgreSQL for historical baselines
    async with get_db_connection() as conn:
        df = await conn.fetch_dataframe(request.table_name)

        profile = engine.profile_dataframe(df)
        anomalies = engine.detect_anomalies(df, threshold=request.threshold)

        # Persist checkpoint for audit trail -- still inside the connection scope
        await conn.execute(
            "INSERT INTO quality_checkpoints (table_name, profile, anomalies, ts) "
            "VALUES ($1, $2, $3, NOW())",
            request.table_name, json.dumps(profile), json.dumps(anomalies),
        )

    return {
        "table": request.table_name,
        "columns_profiled": len(profile),
        "anomalies": anomalies,
        "profile_summary": profile,
    }
11
Agent Trace

Full Investigation Trace: Step by Step

The following trace replays the exact LangGraph execution that investigated the revenue anomaly. Each step shows the current graph node, the AgentState snapshot, tool calls, and -- for _call_model nodes -- the agent's internal reasoning chain. Three iterations through the model-tools loop. Two agent delegations. Total execution time: 6.3 seconds.

Interactive: LangGraph Investigation Trace (click steps or replay)

Click any step in the timeline to jump to that state, or use Replay to watch the full investigation sequence. Expand the AgentState Snapshot to see the raw state at each node transition.

12
Root Cause

Root Cause: Payment Gateway Timeout

The BI Analytics Agent's segment drill-down conclusively identified the pattern: mobile-only, DACH-region-only, confined to a 12-hour window. Desktop was unaffected (only -2% delta), which ruled out a general platform issue. The payment_token null rate of 34% (vs 2% baseline) confirmed that the Adyen 3DS2 redirect was timing out specifically on mobile WebView implementations.

Market | Desktop (before -> after) | Mobile (before -> after) | Tablet (before -> after)
DE     | 4.2% -> 4.1%              | 3.8% -> 0.5% (-3.3 pp)   | 3.5% -> 3.4%
FR     | 3.9% -> 3.8%              | 3.6% -> 0.4% (-3.2 pp)   | 3.3% -> 3.2%
NL     | 4.5% -> 4.4%              | 4.1% -> 4.0%             | 3.8% -> 3.7%
AT     | 3.7% -> 3.6%              | 3.4% -> 0.8% (-2.6 pp)   | 3.1% -> 3.0%
BE     | 4.0% -> 3.9%              | 3.7% -> 3.6%             | 3.4% -> 3.3%
CH     | 4.8% -> 4.7%              | 4.4% -> 4.3%             | 4.0% -> 3.9%
IT     | 3.5% -> 3.4%              | 3.2% -> 3.1%             | 2.9% -> 2.8%

Agent-Synthesized Root Cause Report

What: Adyen 3DS2 authentication redirect times out on mobile WebView implementations. The payment gateway returns empty payment_token values, causing transactions to silently fail at funnel stage 4 (add_payment_info).

Where: Germany and Austria mobile (42% and 28% conversion drops respectively). France mobile affected but not in the current data window. Desktop and tablet unaffected across all markets.

When: October 14 (payment migration date) through November 30 (detection date). 47 days undetected because aggregated dashboards masked the device-level signal.

Impact: EUR 2.1M in lost revenue. 84,293 affected sessions in DE alone. 97,173 total affected sessions across DACH markets.

13
Remediation

Agent-Recommended Remediation

The Sentinel Agent's final iteration produced three remediation actions, which the Data Pipeline Agent then coordinated for deployment. This closed the loop from detection through resolution -- fully agent-driven.

1. Payment Gateway Retry Queue

Deployed

Implement exponential backoff retry with 3 attempts for 3DS2 authentication. On final failure, fall back to non-3DS flow where PSD2 exemption applies (low-value transactions under EUR 30).

Data Pipeline Agent

2. Gateway Failover Configuration

Deployed

Configure Adyen SDK with secondary authentication endpoint. If primary 3DS2 redirect exceeds 8s timeout, route to fallback endpoint with extended timeout parameters for mobile WebView.

Infrastructure Agent

3. Alert Threshold Adjustment

Active

Data Quality Agent now monitors payment_token null rate with a 5% threshold (previously only z-score based). Any segment exceeding 5% null rate triggers immediate Sentinel investigation.

Data Quality Agent
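A sketch of the new check, assuming events arrive as dicts with the fields shown in the Bronze schema. The function name is hypothetical:

```python
def payment_token_null_rate(events, stage="add_payment_info"):
    """Fraction of payment-stage events with a missing payment_token."""
    staged = [e for e in events if e["event_type"] == stage]
    if not staged:
        return 0.0
    missing = sum(1 for e in staged if e.get("payment_token") is None)
    return missing / len(staged)

events = [
    {"event_type": "add_payment_info", "payment_token": "tok_1"},
    {"event_type": "add_payment_info", "payment_token": None},
    {"event_type": "page_view"},
]
rate = payment_token_null_rate(events)  # 0.5 here -- well past a 5% threshold
```

A fixed-rate threshold like this complements the z-score check: it fires even when the baseline history is too short or too noisy for a reliable standard deviation.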
14
Agent Code

Agent Implementation: Production Code

All agents follow a common framework pattern from _agent_framework/agent_server_template.py. The template provides standardized FastAPI server creation with health checks, metrics middleware, and agent coordination integration. Below is the actual production code for the agent server factory.

_agent_framework/agent_server_template.py -- Server factory
python
@dataclass
class AgentServerConfig:
    name: str
    version: str = "1.0.0"
    description: str = ""
    default_model: str = "qwen2.5:7b"
    capabilities: list = field(default_factory=list)
    tools: list = field(default_factory=list)
    coordinator_url: str = "http://localhost:8080"

def create_agent_server(config: AgentServerConfig) -> FastAPI:
    """Factory function that creates a standardized agent server.
    Every agent in the fleet uses this template."""
    app = FastAPI(
        title=f"{config.name} Agent",
        version=config.version,
        description=config.description,
    )

    # Standard endpoints
    @app.get("/health")
    async def health():
        return {"status": "healthy", "agent": config.name,
                "version": config.version, "uptime": get_uptime()}

    @app.post("/query")
    async def query(request: QueryRequest):
        return await process_query(request, config)

    @app.post("/task")
    async def handle_task(request: TaskRequest):
        return await execute_task(request, config)

    @app.post("/delegate")
    async def delegate(request: DelegateRequest):
        coordinator = AgentCoordinator(config.coordinator_url)
        return await coordinator.delegate(request)

    @app.get("/capabilities")
    async def capabilities():
        return {"name": config.name, "tools": config.tools,
                "capabilities": config.capabilities}

    return app
data_pipeline_agent/src/agent.py -- BaseAgent pattern
python
class DataPipelineAgent(BaseAgent):
    """Extends BaseAgent with ETL-specific tools.
    Registered with Sentinel for delegation via /capabilities."""

    def __init__(self):
        super().__init__(
            name="data_pipeline_agent",
            description="ETL orchestration, data lineage, pipeline health",
            tools=[
                etl_pipeline,        # Trigger Bronze/Silver/Gold transforms
                data_quality_check,  # Pre/post transform validation
                backup_data,         # Delta Lake snapshot backup to MinIO
                data_lineage,        # Track column-level lineage graph
                query_knowledge_base # RAG over pipeline documentation
            ],
            model="qwen2.5:7b",
        )

    async def handle_investigation(self, task):
        """Called by Sentinel via delegate_to_agent tool.
        Executes ETL replay for specific date range and segments."""
        result = await self.run_tool("etl_pipeline", {
            "stage": task.stage,
            "date_range": task.date_range,
            "segments": task.segments,
        })
        quality = await self.run_tool("data_quality_check", {
            "table": f"lakehouse/{task.stage}/*",
        })
        return {"etl_result": result, "quality": quality}
sentinel_agent/src/agent.py -- delegate_to_agent tool
python
@tool
async def delegate_to_agent(agent_name: str, task: str, params: dict | None = None):
    """Delegate a task to a specialist agent via HTTP.
    The Sentinel uses this tool to coordinate the investigation.

    Args:
        agent_name: Target agent (e.g., 'data_quality_agent')
        task: Task description for the agent
        params: Additional parameters for the task
    """
    params = params or {}  # avoid the shared-mutable-default pitfall
    agent_registry = {
        "data_quality_agent": "http://dq-agent:8016",
        "data_pipeline_agent": "http://pipeline-agent:8017",
        "bi_analytics_agent": "http://bi-agent:8084",
        "infrastructure_agent": "http://infra-agent:8083",
    }

    url = agent_registry.get(agent_name)
    if not url:
        return {"error": f"Unknown agent: {agent_name}"}

    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.post(
            f"{url}/task",
            json={"task": task, "params": params,
                   "source": "sentinel_agent"},
        )
        return response.json()
15
Infrastructure

Production Infrastructure

The entire stack runs on a single Proxmox VM (10.0.0.159) with 24 GB RAM and 158 GB disk. All services are containerized with Docker Compose. The AI agents run as additional containers in the same stack, communicating over the internal Docker network.

docker-compose.yml -- Service topology (abbreviated)
yaml
services:
  spark-master:
    image: bitnami/spark:3.5.3
    # Spark UI on host port 8090 -- host port 8080 is taken by the Sentinel Agent
    ports: ["8090:8080", "7077:7077"]
    environment:
      SPARK_MODE: master

  spark-worker-1:
    image: bitnami/spark:3.5.3
    environment:
      SPARK_MODE: worker
      SPARK_MASTER_URL: spark://spark-master:7077

  minio:
    image: minio/minio
    ports: ["9000:9000", "9001:9001"]
    command: server /data --console-address ":9001"

  airflow:
    image: apache/airflow:2.9.3
    ports: ["8081:8080"]

  jupyter:
    image: jupyter/pyspark-notebook
    ports: ["8888:8888"]

  postgres:
    image: postgres:16
    ports: ["5432:5432"]

  # AI Agent containers
  sentinel-agent:
    build: ./agents/sentinel
    ports: ["8080:8080"]
    depends_on: [postgres]

  data-quality-agent:
    build: ./agents/data-quality
    ports: ["8016:8016"]
    depends_on: [postgres]

  data-pipeline-agent:
    build: ./agents/data-pipeline
    ports: ["8017:8017"]
Spark Master
spark.neurodatalab.ai

1 master, 2 workers, 4 cores, 8 GB RAM

Spark History
spark-history.neurodatalab.ai

Event log browser for completed Spark jobs

Jupyter Lab
jupyter.neurodatalab.ai

PySpark notebooks with Delta Lake integration

Airflow
airflow.neurodatalab.ai

DAG orchestrator for scheduled ETL pipelines

MinIO Console
minio.neurodatalab.ai

S3-compatible object storage for Delta tables

PostgreSQL
internal :5432

Agent state, quality checkpoints, execution logs

Sentinel Agent
internal :8080

LangGraph orchestrator -- investigation coordinator

Data Quality Agent
internal :8016

Z-score anomaly detection, schema validation

Data Pipeline Agent
internal :8017

ETL automation, lineage tracking, backup

16
Live Platform

End-to-End Data Flow

The complete data flow from event sources through agent classification, planning, execution, validation, and output sinks. This Sankey diagram shows how every event is processed through the multi-agent pipeline, with the width of each flow encoding relative data volume.

17
Outcomes

Results and Reflection

• < 15 min -- Detection to Root Cause
• 0 -- SQL Queries Written
• 3 -- LangGraph Iterations
• 2 -- Agent Delegations

Without AI Agents (traditional approach)

  • Analyst receives aggregated dashboard alert (hours to days delay)
  • Manual hypothesis formation: "is it seasonal? marketing? competition?"
  • Write SQL queries to investigate each hypothesis (hours of work)
  • Manually cross-reference segments, time windows, device types
  • Estimated time to root cause: 2-5 business days

With AI Agent Orchestration

  • Data Quality Agent detects anomaly in real time (z-score threshold)
  • Sentinel automatically formulates investigation strategy via LangGraph
  • Specialist agents execute profiling and drill-down in parallel
  • Root cause synthesized with evidence chain and remediation plan
  • Total time: 15 minutes. Zero human-written queries.

Key insight: The value of AI agents in data engineering is not just speed -- it is the elimination of the hypothesis-query-iterate cycle that makes human investigation slow. The LangGraph orchestrator can reason about what to investigate next, delegate to specialist agents, and synthesize findings across multiple data sources in a single automated workflow. The medallion architecture provides the structured, high-quality data layers that make agent-driven investigation reliable: Bronze for audit trails, Silver for enriched analysis, Gold for anomaly detection.

Interactive Lab Demo

Explore the full Revenue Leak Investigation case study