Agent Deep Dive — v3.0.0

Data Engineer Agent

One autonomous agent. 12 integrated services. 48 API tools. 6 end-to-end workflows. A unified control plane for the entire data platform — from DAG orchestration to ML model promotion.

Services
12 integrations
Airflow · Spark · dbt · PostgreSQL · MLflow · MinIO · Prometheus · Grafana · Jaeger · Langfuse · Celery · Flower
Tools
48 API endpoints
Full CRUD across orchestration, compute, transform, database, ML, storage, metrics, dashboards, tracing, LLM obs, task queues, and workflows
Workflows
6 E2E pipelines
Cross-service orchestrated reports: platform status, SLA, quality audit, ML features, observability, and ingest trigger
The Problem

Data platforms are fragmented

Modern data platforms are an archipelago of services — each with its own API, its own dashboard, its own authentication model. Airflow manages DAGs. Spark runs compute. dbt handles transforms. PostgreSQL stores state. MLflow tracks experiments. MinIO holds artifacts. Prometheus scrapes metrics. Grafana renders dashboards. Jaeger traces requests. Langfuse watches LLM costs.

A data engineer context-switches between 10+ browser tabs, 3 CLIs, and 2 terminal sessions just to answer: “Is the pipeline healthy?”

The answer requires correlating information from Airflow (DAG status), Spark (executor health), PostgreSQL (connection pool), Prometheus (error rate), and Grafana (dashboard annotations) — manually. Every. Single. Time.

The Data Engineer Agent v3.0 eliminates this fragmentation. It wraps every service behind a unified FastAPI control plane with 48 API endpoints, then composes them into 6 end-to-end workflows that answer platform questions in a single call.

Architecture

Single agent, twelve services

The agent is a FastAPI application running in a Docker container. It connects to each service via its native API and exposes a normalized REST interface. Auth is handled by a shared JWT middleware. Health checks run in parallel via asyncio.
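The middleware code itself isn't published. As a rough sketch of what the HS256 verification step inside a JWT middleware involves — a real deployment would use PyJWT or python-jose, and the secret and claims here are placeholders only:

```python
import base64
import hashlib
import hmac
import json
import time

SECRET = b"demo-secret"  # placeholder only; not the agent's real signing key

def _b64url(data: bytes) -> str:
    # JWT uses unpadded URL-safe base64
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def sign(payload: dict) -> str:
    """Mint an HS256 token: header.payload.signature."""
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64url(json.dumps(payload).encode())
    sig = _b64url(hmac.new(SECRET, f"{header}.{body}".encode(), hashlib.sha256).digest())
    return f"{header}.{body}.{sig}"

def verify(token: str) -> dict:
    """Check the signature and expiry; raise on failure."""
    header, body, sig = token.split(".")
    expected = _b64url(hmac.new(SECRET, f"{header}.{body}".encode(), hashlib.sha256).digest())
    if not hmac.compare_digest(sig, expected):
        raise ValueError("invalid signature")
    payload = json.loads(base64.urlsafe_b64decode(body + "=" * (-len(body) % 4)))
    if payload.get("exp", 0) < time.time():
        raise ValueError("token expired")
    return payload
```

In FastAPI this check would typically live in a dependency or middleware that reads the `Authorization: Bearer` header before any route handler runs.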

┌─────────────────────────────────────────────────────────────────┐
│                    DATA ENGINEER AGENT v3.0                      │
│                    FastAPI · Port 8060                           │
│                                                                  │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────────────┐   │
│  │ /dags/*  │ │ /spark/* │ │ /dbt/*   │ │ /db/*            │   │
│  │ Airflow  │ │ Spark    │ │ dbt      │ │ PostgreSQL       │   │
│  │ 7 tools  │ │ 2 tools  │ │ 2 tools  │ │ 7 tools          │   │
│  └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────────────┘   │
│       │             │            │             │                  │
│  ┌────┴─────┐ ┌────┴─────┐ ┌────┴─────┐ ┌────┴─────────────┐   │
│  │ /mlflow/*│ │ /minio/* │ │ /prom/*  │ │ /grafana/*       │   │
│  │ MLflow   │ │ MinIO    │ │ Prometh. │ │ Grafana          │   │
│  │ 5 tools  │ │ 3 tools  │ │ 4 tools  │ │ 4 tools          │   │
│  └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────────────┘   │
│       │             │            │             │                  │
│  ┌────┴─────┐ ┌────┴─────┐ ┌────┴─────┐ ┌────┴─────────────┐   │
│  │ /jaeger/*│ │/langfuse │ │ /celery/*│ │ /workflows/*     │   │
│  │ Jaeger   │ │ Langfuse │ │ Celery   │ │ 6 E2E + 1 Report │   │
│  │ 3 tools  │ │ 2 tools  │ │ 2 tools  │ │ 7 tools          │   │
│  └──────────┘ └──────────┘ └──────────┘ └──────────────────┘   │
│                                                                  │
│  ┌─ /health ─────────────────────────────────────────────────┐  │
│  │  Parallel async checks: 9 services · timeout 5s each     │  │
│  └───────────────────────────────────────────────────────────┘  │
│  ┌─ /execute ────────────────────────────────────────────────┐  │
│  │  NLP task router: pattern-matches 48+ task descriptions   │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
FastAPI 0.115 · Python 3.12 · httpx (async) · psycopg2 · Docker · JWT Auth · asyncio.gather · Pydantic v2
Tool Inventory

48 tools across 12 categories

Every tool is a REST endpoint, distributed across twelve service domains. The category breakdown below lists the full inventory.

Service Categories

12 services, one interface

Each category wraps a dedicated service API behind normalized FastAPI endpoints with consistent error handling, timeout management, and structured JSON responses.
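The wrapper source isn't shown in the article, but the pattern it describes — per-call timeout management plus one consistent JSON error shape — can be sketched like this, with an in-process stub standing in for the real httpx call:

```python
import asyncio

async def fetch_stub(url: str) -> dict:
    """Stand-in for a real httpx.AsyncClient GET; raises like a failed connection."""
    await asyncio.sleep(0)  # simulate I/O
    if "unreachable" in url:
        raise ConnectionError("connection refused")
    return {"ok": True}

async def call_service(name: str, url: str, timeout_s: float = 5.0) -> dict:
    """Normalize success, timeout, and transport errors into one JSON shape."""
    try:
        data = await asyncio.wait_for(fetch_stub(url), timeout=timeout_s)
        return {"service": name, "status": "ok", "data": data}
    except asyncio.TimeoutError:
        return {"service": name, "status": "timeout", "error": f"no reply in {timeout_s}s"}
    except Exception as exc:
        return {"service": name, "status": "error", "error": str(exc)}
```

Because every category endpoint funnels through one shape like this, callers never have to special-case which upstream service failed.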

Orchestration · 7 tools
Apache Airflow

Full DAG lifecycle management — list, inspect, trigger, pause/unpause DAGs. Monitor run history and drill into individual task instances with execution state, retry count, and duration.

list_dags · get_dag · trigger_dag · pause_unpause_dag · list_dag_runs · get_dag_run · list_task_instances
Compute · 2 tools
Apache Spark

Spark application inventory and cluster health. Lists running/completed applications with core allocation, memory, and duration. Cluster status shows executor count, available cores, and master state.

spark_applications · spark_status
Transform · 2 tools
dbt

dbt project introspection — parse and analyze the dbt manifest for model lineage, source-to-model mapping, and test coverage. Project-level metadata including version, profile, and materialization strategies.

dbt_manifest · dbt_project
Database · 7 tools
PostgreSQL

Deep PostgreSQL analytics — table profiling with row counts and bloat, index usage ratios, slow query identification (>100ms), connection pool monitoring, cross-schema DDL diff, data freshness tracking, and foreign-key dependency graphing.

db_table_profile · db_index_usage · db_slow_queries · db_connections · db_schema_diff · db_freshness · db_dependencies
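The exact SQL behind db_slow_queries isn't published. A plausible sketch, assuming the standard pg_stat_statements extension (mean_exec_time is the PostgreSQL 13+ column name) and the article's 100 ms threshold:

```python
# Assumed query shape; the real agent's SQL is not published.
SLOW_QUERY_SQL = """
SELECT query, calls, mean_exec_time
FROM pg_stat_statements
WHERE mean_exec_time > %(threshold_ms)s
ORDER BY mean_exec_time DESC
LIMIT 20;
"""

def flag_slow(rows: list, threshold_ms: float = 100.0) -> list:
    """Client-side mirror of the WHERE/ORDER BY clauses, for rows already fetched."""
    return sorted(
        (r for r in rows if r["mean_exec_time"] > threshold_ms),
        key=lambda r: r["mean_exec_time"],
        reverse=True,
    )
```

In production the query would run through psycopg2 with the threshold passed as a bound parameter, never interpolated into the SQL string.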
ML Platform · 5 tools
MLflow

Full MLflow integration — list experiments, compare runs with metrics/params, browse the model registry, inspect version lineage, and promote models between stages (Staging → Production) with audit trail.

mlflow_experiments · mlflow_experiment_runs · mlflow_models · mlflow_model_versions · mlflow_promote_model
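The logic behind mlflow_promote_model isn't shown. One plausible shape — assuming MLflow's classic None → Staging → Production stage model, with an in-process registry standing in for the real MLflow registry API — including the audit trail the article mentions:

```python
# Allowed transitions in MLflow's classic stage model (assumption; the real
# tool would call the MLflow registry API rather than hold state in-process).
ALLOWED = {
    "None": {"Staging", "Archived"},
    "Staging": {"Production", "Archived"},
    "Production": {"Archived"},
}

def promote(registry: dict, name: str, version: int, target: str, audit: list) -> dict:
    """Move a model version to a new stage, recording the change for audit."""
    current = registry[(name, version)]
    if target not in ALLOWED.get(current, set()):
        raise ValueError(f"illegal transition {current} -> {target}")
    registry[(name, version)] = target
    audit.append({"model": name, "version": version, "from": current, "to": target})
    return {"model": name, "version": version, "stage": target}
```

Gating on an explicit transition table is what makes "Staging → Production with audit trail" safe: a model can never jump straight from None to Production.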
Data Lake · 3 tools
MinIO S3

S3-compatible object storage management — list buckets, browse objects with prefix filtering, and compute storage usage with object counts across the entire MinIO deployment.

minio_buckets · minio_objects · minio_usage
Metrics · 4 tools
Prometheus

Prometheus PromQL integration — scrape target health overview, ad-hoc PromQL queries, pre-built pipeline throughput/latency/error metrics, and active alert enumeration with severity and firing duration.

prometheus_targets · prometheus_query · prometheus_pipeline_metrics · prometheus_alerts
Dashboards · 4 tools
Grafana

Grafana dashboard management — list all dashboards with tags and UIDs, retrieve full dashboard JSON for panel analysis, push deployment/incident annotations to timeline, and audit connected datasources.

grafana_dashboards · grafana_dashboard_detail · grafana_create_annotation · grafana_datasources
Tracing · 3 tools
Jaeger

Distributed tracing via Jaeger — discover instrumented services, search traces by service with time-range and tag filters, and deep-dive into individual trace spans with timing waterfall.

jaeger_services · jaeger_traces · jaeger_trace_detail
LLM Observability · 2 tools
Langfuse

LLM cost and quality tracking — retrieve recent LLM traces with token counts, latency, and model identification. Cost summary aggregates total spend, average tokens per call, and cost trending.

langfuse_traces · langfuse_cost_summary
Task Queue · 2 tools
Celery + Flower

Celery distributed task monitoring — list active workers with concurrency and pool type, inspect active/reserved/scheduled tasks with status, runtime, and routing information.

celery_workers · celery_tasks
Workflows + Reports · 7 tools
E2E Orchestration

Six end-to-end workflows that compose multiple service calls into unified reports: full platform status, SLA compliance, data quality audit, ML feature pipeline health, pipeline observability summary, and on-demand ingestion triggering. Plus a comprehensive pipeline health report.

data_platform_status · pipeline_sla_report · data_quality_audit · ml_feature_pipeline_status · pipeline_observability · trigger_ingest_pipeline · pipeline_health_report
End-to-End Workflows

Six workflows that compose everything

The real power of the agent is not the 48 individual tools — it is how they combine. Each E2E workflow orchestrates calls across multiple services to answer a complex platform question in a single API call.

🏗️

Data Platform Status

GET /workflows/data-platform-status

Composes DAG inventory + Spark cluster state + database connection pool + dbt project metadata into a unified platform health view.

Airflow · Spark · PostgreSQL · dbt
📊

Pipeline SLA Report

GET /workflows/pipeline-sla-report

Correlates DAG run durations against configured SLA thresholds, flags breaches, and calculates compliance percentages with trend analysis.

Airflow · Prometheus · PostgreSQL
🔍

Data Quality Audit

GET /workflows/data-quality-audit

Runs table freshness checks, null-rate analysis, schema drift detection, and dbt test coverage — outputs a scored quality report per table.

PostgreSQL · dbt · Prometheus
🧠

ML Feature Pipeline

GET /workflows/ml-feature-pipeline-status

Tracks feature engineering DAGs, correlates with MLflow experiment runs, verifies feature store artifacts in MinIO, and checks Spark job completion.

Airflow · MLflow · MinIO · Spark
📡

Pipeline Observability

GET /workflows/pipeline-observability

Full observability stack query — pulls metrics, active dashboards, recent traces, and LLM cost data into a single observability summary.

Prometheus · Grafana · Jaeger · Langfuse
🚀

Trigger Ingest Pipeline

GET /workflows/trigger-ingest-pipeline

On-demand pipeline trigger — submits a DAG run, monitors Celery task execution, and pushes Grafana annotation + Prometheus metrics on completion.

Airflow · Celery · Prometheus
Pipeline Flow

How a workflow composes services

The “Data Platform Status” workflow demonstrates how the agent composes parallel API calls across four services into a unified response; each service call runs concurrently via asyncio.gather.
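A minimal sketch of that fan-out/merge pattern, with stub coroutines standing in for the real Airflow, Spark, PostgreSQL, and dbt HTTP calls (the return shapes and numbers are illustrative assumptions):

```python
import asyncio

# Stub coroutines standing in for the real service calls (assumptions).
async def airflow_dags() -> dict:   return {"dag_count": 12}
async def spark_status() -> dict:   return {"executors": 4}
async def db_connections() -> dict: return {"active": 23}
async def dbt_project() -> dict:    return {"models": 47}

async def data_platform_status() -> dict:
    """Fan out all four service calls concurrently, then merge one summary."""
    dags, spark, db, dbt = await asyncio.gather(
        airflow_dags(), spark_status(), db_connections(), dbt_project()
    )
    return {
        "airflow_dags": dags["dag_count"],
        "spark_executors": spark["executors"],
        "pg_active_connections": db["active"],
        "dbt_models": dbt["models"],
        "overall_health": "operational",
    }
```

Because the four calls run concurrently, the workflow's latency is roughly the slowest single service, not the sum of all four.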

Health System

9 service health probes

The /health endpoint runs 9 parallel connectivity checks using asyncio.gather with a 5-second timeout per service. This gives instant visibility into the entire platform stack before executing any workflow.

Airflow · :8080 · API health endpoint
Spark · :8081 · Master REST API
PostgreSQL · :5432 · SELECT 1 connection test
MLflow · :5000 · API version endpoint
MinIO · :9000 · Bucket list API
Prometheus · :9090 · /-/healthy probe
Grafana · :3000 · API health endpoint
Jaeger · :16686 · Services list query
Langfuse · :3100 · API health endpoint
GET /health — sample response
{
  "status": "healthy",
  "agent": "data-engineer-agent",
  "version": "3.0.0",
  "tools_count": 48,
  "services": {
    "airflow":    { "status": "reachable", "latency_ms": 12 },
    "spark":      { "status": "reachable", "latency_ms": 8 },
    "postgresql": { "status": "reachable", "latency_ms": 3 },
    "mlflow":     { "status": "reachable", "latency_ms": 15 },
    "minio":      { "status": "reachable", "latency_ms": 6 },
    "prometheus": { "status": "reachable", "latency_ms": 4 },
    "grafana":    { "status": "reachable", "latency_ms": 9 },
    "jaeger":     { "status": "reachable", "latency_ms": 11 },
    "langfuse":   { "status": "reachable", "latency_ms": 7 }
  }
}
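The check fan-out behind that response can be sketched as follows — the probe body is a stand-in for the real HTTP GET or `SELECT 1`, but the per-service timeout and latency measurement follow the article's description:

```python
import asyncio
import time

async def probe_stub(delay_s: float) -> None:
    # Stand-in for a real connectivity check (HTTP GET, SELECT 1, ...)
    await asyncio.sleep(delay_s)

async def check(name: str, delay_s: float, timeout_s: float = 5.0) -> tuple:
    """One probe: enforce the per-service timeout and measure latency."""
    start = time.monotonic()
    try:
        await asyncio.wait_for(probe_stub(delay_s), timeout=timeout_s)
        latency = round((time.monotonic() - start) * 1000)
        return name, {"status": "reachable", "latency_ms": latency}
    except asyncio.TimeoutError:
        return name, {"status": "unreachable", "latency_ms": None}

async def health(probes: dict) -> dict:
    """Run every probe concurrently; one slow service can't block the rest."""
    results = await asyncio.gather(*(check(n, d) for n, d in probes.items()))
    return dict(results)
```

Wrapping each probe in its own asyncio.wait_for is what guarantees the 5-second bound applies per service rather than to the whole endpoint.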
Live Demo

Real endpoints, real responses

The agent is running in production on port 8060. Here are real responses from the live system captured via the Sentinel Fleet Command Terminal.

sentinel-terminal
$ curl :8060/capabilities | jq '.tools_count, .tools | length'
48
48
sentinel-terminal
$ curl :8060/workflows/data-platform-status | jq '.summary'
{
  "airflow_dags": 12,
  "spark_executors": 4,
  "pg_active_connections": 23,
  "dbt_models": 47,
  "overall_health": "operational"
}
sentinel-terminal
$ curl :8060/workflows/pipeline-sla-report | jq '.sla_compliance'
{
  "total_pipelines": 12,
  "within_sla": 11,
  "breached": 1,
  "compliance_pct": 91.7,
  "breach_details": ["etl_customer_sync: 45min > 30min SLA"]
}
Fleet Context

Part of a 28-agent fleet

The Data Engineer Agent is one of 5 agents in the Data Engineering domain. It operates alongside the Data Pipeline Agent, Data Quality Agent, Database Agent, and RAG Knowledge Agent — all coordinated by the Sentinel orchestrator.

Data Pipeline Agent · :8057

ETL orchestration, data transformation workflows, pipeline health monitoring with automatic retry and dead-letter routing

Data Engineer Agent · :8060

48-tool data platform control plane — Airflow, Spark, dbt, PostgreSQL, MLflow, MinIO, Prometheus, Grafana, Jaeger, Langfuse, Celery

Data Quality Agent · :8061

Schema validation, anomaly detection in data streams, quality scoring with threshold alerting and drift monitoring

Database Agent · :8055

PostgreSQL operations — connection pool management, query performance analysis, backup verification, index optimization

RAG Knowledge Agent · :8062

Dual-mode retrieval: Milvus vector search for episodic memory + Knowledge Library indexing 43 technical books via PyMuPDF

Outcomes

Measured operational results

48
API Tools

Across 12 integrated service categories

12
Services

Unified under a single FastAPI control plane

6
E2E Workflows

Cross-service orchestrated reports

9
Health Checks

Parallel connectivity validation

< 50ms
Avg Latency

Measured via Prometheus telemetry

100%
Uptime

Self-healing Docker with restart policy

Technical Summary

Runtime: Python 3.12 · FastAPI 0.115 · uvicorn
Container: Docker multi-stage · python:3.12-slim · 384MB RAM
Port: 8060 (internal) · JWT auth middleware
Health: 9 parallel async service checks · 5s timeout
Services: Airflow, Spark, dbt, PostgreSQL, MLflow, MinIO, Prometheus, Grafana, Jaeger, Langfuse, Celery, Flower
Workflows: 6 E2E cross-service orchestrations
Router: /execute NLP pattern-matching for 48+ tasks
Observability: Prometheus metrics + Langfuse LLM tracing
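The /execute router's pattern table isn't published; a minimal regex-based sketch of the idea, with hypothetical phrasings mapped to real tool names from the inventory above:

```python
import re

# Hypothetical routing table; the real agent maps 48+ task phrasings to tools.
ROUTES = [
    (re.compile(r"\btrigger\b.*\bdag\b", re.I), "trigger_dag"),
    (re.compile(r"\bslow\s+quer", re.I), "db_slow_queries"),
    (re.compile(r"\bpromote\b.*\bmodel\b", re.I), "mlflow_promote_model"),
    (re.compile(r"\bplatform\s+status\b", re.I), "data_platform_status"),
]

def route(task: str) -> str:
    """Return the first tool whose pattern matches the task description."""
    for pattern, tool in ROUTES:
        if pattern.search(task):
            return tool
    return "unknown"
```

An ordered first-match table like this keeps routing deterministic and cheap; ambiguous phrasings fall through to "unknown" rather than guessing.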

Explore the full fleet

This deep dive covers one of 28 agents in the SMAIS fleet. Read the main architecture article for the complete system — theory, production mapping, and interactive visualizations.