Data Engineer Agent
One autonomous agent. 12 integrated services. 48 API tools. 6 end-to-end workflows. A unified control plane for the entire data platform — from DAG orchestration to ML model promotion.
Data platforms are fragmented
Modern data platforms are an archipelago of services — each with its own API, its own dashboard, its own authentication model. Airflow manages DAGs. Spark runs compute. dbt handles transforms. PostgreSQL stores state. MLflow tracks experiments. MinIO holds artifacts. Prometheus scrapes metrics. Grafana renders dashboards. Jaeger traces requests. Langfuse watches LLM costs.
A data engineer context-switches between 10+ browser tabs, 3 CLIs, and 2 terminal sessions just to answer: “Is the pipeline healthy?”
The answer requires correlating information from Airflow (DAG status), Spark (executor health), PostgreSQL (connection pool), Prometheus (error rate), and Grafana (dashboard annotations) — manually. Every. Single. Time.
The Data Engineer Agent v3.0 eliminates this fragmentation. It wraps every service behind a unified FastAPI control plane with 48 API endpoints, then composes them into 6 end-to-end workflows that answer platform questions in a single call.
Single agent, twelve services
The agent is a FastAPI application running in a Docker container. It connects to each service via its native API and exposes a normalized REST interface. Auth is handled by a shared JWT middleware. Health checks run in parallel via asyncio.
┌───────────────────────────────────────────────────────────────┐
│                   DATA ENGINEER AGENT v3.0                    │
│                      FastAPI · Port 8060                      │
│                                                               │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────────────┐  │
│  │ /dags/*  │ │ /spark/* │ │ /dbt/*   │ │ /db/*            │  │
│  │ Airflow  │ │ Spark    │ │ dbt      │ │ PostgreSQL       │  │
│  │ 7 tools  │ │ 2 tools  │ │ 2 tools  │ │ 7 tools          │  │
│  └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────────────┘  │
│       │            │            │            │                │
│  ┌────┴─────┐ ┌────┴─────┐ ┌────┴─────┐ ┌────┴─────────────┐  │
│  │ /mlflow/*│ │ /minio/* │ │ /prom/*  │ │ /grafana/*       │  │
│  │ MLflow   │ │ MinIO    │ │ Prometh. │ │ Grafana          │  │
│  │ 5 tools  │ │ 3 tools  │ │ 4 tools  │ │ 4 tools          │  │
│  └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────────────┘  │
│       │            │            │            │                │
│  ┌────┴─────┐ ┌────┴─────┐ ┌────┴─────┐ ┌────┴─────────────┐  │
│  │ /jaeger/*│ │/langfuse │ │ /celery/*│ │ /workflows/*     │  │
│  │ Jaeger   │ │ Langfuse │ │ Celery   │ │ 6 E2E + 1 Report │  │
│  │ 3 tools  │ │ 2 tools  │ │ 2 tools  │ │ 7 tools          │  │
│  └──────────┘ └──────────┘ └──────────┘ └──────────────────┘  │
│                                                               │
│  ┌─ /health ─────────────────────────────────────────────┐    │
│  │ Parallel async checks: 9 services · timeout 5s each   │    │
│  └───────────────────────────────────────────────────────┘    │
│  ┌─ /execute ────────────────────────────────────────────┐    │
│  │ NLP task router: pattern-matches 48+ task descriptions│    │
│  └───────────────────────────────────────────────────────┘    │
└───────────────────────────────────────────────────────────────┘
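The /execute endpoint in the diagram routes free-text task descriptions to concrete endpoints by pattern matching. A minimal sketch of that idea — the patterns and route table here are an illustrative subset I've invented, not the agent's actual 48-entry table:

```python
import re

# Hypothetical subset of the /execute router's pattern table.
ROUTES = [
    (re.compile(r"\b(trigger|run)\b.*\bdag\b", re.I), "POST /dags/{dag_id}/trigger"),
    (re.compile(r"\bsla\b", re.I), "GET /workflows/pipeline-sla-report"),
    (re.compile(r"\b(platform|overall)\b.*\bstatus\b", re.I), "GET /workflows/data-platform-status"),
]

def route(task: str) -> str:
    """First matching pattern wins; fall back to a safe read-only endpoint."""
    for pattern, endpoint in ROUTES:
        if pattern.search(task):
            return endpoint
    return "GET /health"

print(route("trigger the ingest dag"))  # POST /dags/{dag_id}/trigger
print(route("show me the SLA report"))  # GET /workflows/pipeline-sla-report
```

Ordered first-match routing keeps the table auditable: adding a new task type is one line, and ambiguous phrasing resolves deterministically.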
48 tools across 12 categories
Every tool is a REST endpoint, grouped into the 12 service categories described below. Tool counts vary by category with the depth of each service integration.
12 services, one interface
Each category wraps a dedicated service API behind normalized FastAPI endpoints with consistent error handling, timeout management, and structured JSON responses.
Full DAG lifecycle management — list, inspect, trigger, pause/unpause DAGs. Monitor run history and drill into individual task instances with execution state, retry count, and duration.
Spark application inventory and cluster health. Lists running/completed applications with core allocation, memory, and duration. Cluster status shows executor count, available cores, and master state.
dbt project introspection — parse and analyze the dbt manifest for model lineage, source-to-model mapping, and test coverage. Project-level metadata including version, profile, and materialization strategies.
Deep PostgreSQL analytics — table profiling with row counts and bloat, index usage ratios, slow query identification (>100ms), connection pool monitoring, cross-schema DDL diff, data freshness tracking, and foreign-key dependency graphing.
Full MLflow integration — list experiments, compare runs with metrics/params, browse the model registry, inspect version lineage, and promote models between stages (Staging → Production) with audit trail.
S3-compatible object storage management — list buckets, browse objects with prefix filtering, and compute storage usage with object counts across the entire MinIO deployment.
Prometheus PromQL integration — scrape target health overview, ad-hoc PromQL queries, pre-built pipeline throughput/latency/error metrics, and active alert enumeration with severity and firing duration.
Grafana dashboard management — list all dashboards with tags and UIDs, retrieve full dashboard JSON for panel analysis, push deployment/incident annotations to timeline, and audit connected datasources.
Distributed tracing via Jaeger — discover instrumented services, search traces by service with time-range and tag filters, and deep-dive into individual trace spans with timing waterfall.
LLM cost and quality tracking — retrieve recent LLM traces with token counts, latency, and model identification. Cost summary aggregates total spend, average tokens per call, and cost trending.
Celery distributed task monitoring — list active workers with concurrency and pool type, inspect active/reserved/scheduled tasks with status, runtime, and routing information.
Six end-to-end workflows that compose multiple service calls into unified reports: full platform status, SLA compliance, data quality audit, ML feature pipeline health, pipeline observability summary, and on-demand ingestion triggering. Plus a comprehensive pipeline health report.
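The "consistent error handling, timeout management, and structured JSON responses" promised above reduce to one wrapper applied to every upstream call. A minimal sketch, assuming a 5-second default timeout and a {status, service, data|error} envelope (both assumptions, not the agent's actual schema):

```python
import asyncio
from typing import Any, Awaitable, Callable

async def call_service(name: str, fn: Callable[[], Awaitable[Any]],
                       timeout: float = 5.0) -> dict:
    """Wrap any upstream call in a normalized envelope so every
    endpoint returns the same shape regardless of which service failed."""
    try:
        data = await asyncio.wait_for(fn(), timeout=timeout)
        return {"status": "ok", "service": name, "data": data}
    except asyncio.TimeoutError:
        return {"status": "error", "service": name,
                "error": f"timeout after {timeout}s"}
    except Exception as exc:
        return {"status": "error", "service": name, "error": str(exc)}

async def demo() -> None:
    async def fast() -> dict:
        return {"dags": 12}
    async def slow() -> None:
        await asyncio.sleep(10)  # simulates an unresponsive service
    ok = await call_service("airflow", fast)
    bad = await call_service("spark", slow, timeout=0.01)
    print(ok["status"], bad["status"])  # prints: ok error

asyncio.run(demo())
```

Because failures come back as data rather than exceptions, a workflow that fans out to five services can still return a partial report when one of them is down.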
Six workflows that compose everything
The real power of the agent is not the 48 individual tools — it is how they combine. Each E2E workflow orchestrates calls across multiple services to answer a complex platform question in a single API call.
Data Platform Status
GET /workflows/data-platform-status
Composes DAG inventory + Spark cluster state + database connection pool + dbt project metadata into a unified platform health view.
Pipeline SLA Report
GET /workflows/pipeline-sla-report
Correlates DAG run durations against configured SLA thresholds, flags breaches, and calculates compliance percentages with trend analysis.
Data Quality Audit
GET /workflows/data-quality-audit
Runs table freshness checks, null-rate analysis, schema drift detection, and dbt test coverage — outputs a scored quality report per table.
ML Feature Pipeline
GET /workflows/ml-feature-pipeline-status
Tracks feature engineering DAGs, correlates with MLflow experiment runs, verifies feature store artifacts in MinIO, and checks Spark job completion.
Pipeline Observability
GET /workflows/pipeline-observability
Full observability stack query — pulls metrics, active dashboards, recent traces, and LLM cost data into a single observability summary.
Trigger Ingest Pipeline
GET /workflows/trigger-ingest-pipeline
On-demand pipeline trigger — submits a DAG run, monitors Celery task execution, and pushes Grafana annotation + Prometheus metrics on completion.
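The arithmetic behind the SLA report is simple enough to sketch. This is a hypothetical helper, not the agent's implementation; it mirrors the output shape shown later in this article:

```python
def sla_compliance(runs: dict[str, tuple[float, float]]) -> dict:
    """runs maps pipeline name -> (duration_min, sla_min).
    Flags breaches and computes a compliance percentage."""
    breaches = [f"{name}: {dur:.0f}min > {sla:.0f}min SLA"
                for name, (dur, sla) in runs.items() if dur > sla]
    total = len(runs)
    within = total - len(breaches)
    return {
        "total_pipelines": total,
        "within_sla": within,
        "breached": len(breaches),
        "compliance_pct": round(100 * within / total, 1) if total else 100.0,
        "breach_details": breaches,
    }

# 11 of 12 pipelines within SLA -> 91.7% compliance
runs = {f"dag_{i}": (10.0, 30.0) for i in range(11)}
runs["etl_customer_sync"] = (45.0, 30.0)
print(sla_compliance(runs)["compliance_pct"])  # 91.7
```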
How a workflow composes services
The “Data Platform Status” workflow demonstrates how the agent composes parallel API calls across four services into a unified response. Each node runs concurrently via asyncio.gather.
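The fan-out pattern can be sketched directly. The four coroutines below are stubs with illustrative values standing in for the real Airflow, Spark, PostgreSQL, and dbt calls:

```python
import asyncio

# Stubbed service calls; in the agent these hit the upstream APIs.
async def dag_inventory() -> dict:
    return {"airflow_dags": 12}

async def spark_state() -> dict:
    return {"spark_executors": 4}

async def pg_pool() -> dict:
    return {"pg_active_connections": 23}

async def dbt_metadata() -> dict:
    return {"dbt_models": 47}

async def data_platform_status() -> dict:
    """Run all four probes concurrently and merge into one report."""
    parts = await asyncio.gather(
        dag_inventory(), spark_state(), pg_pool(), dbt_metadata())
    report: dict = {}
    for part in parts:
        report.update(part)
    report["overall_health"] = "operational"
    return report

print(asyncio.run(data_platform_status()))
```

Because the slowest probe bounds the total latency, the composed workflow costs roughly one service round-trip rather than four.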
9 service health probes
The /health endpoint runs 9 parallel connectivity checks using asyncio.gather with a 5-second timeout per service. This gives instant visibility into the entire platform stack before executing any workflow.
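A minimal sketch of that probe fan-out, with a stubbed connectivity ping standing in for the real per-service HTTP check:

```python
import asyncio
import time

SERVICES = ["airflow", "spark", "postgresql", "mlflow", "minio",
            "prometheus", "grafana", "jaeger", "langfuse"]

async def probe(service: str, timeout: float = 5.0) -> tuple[str, dict]:
    """One connectivity check; the stub sleep stands in for an HTTP ping."""
    start = time.perf_counter()
    try:
        await asyncio.wait_for(asyncio.sleep(0), timeout=timeout)
        latency_ms = round((time.perf_counter() - start) * 1000)
        return service, {"status": "reachable", "latency_ms": latency_ms}
    except asyncio.TimeoutError:
        return service, {"status": "unreachable",
                         "error": f"timeout after {timeout}s"}

async def health() -> dict:
    """All nine probes run concurrently; one slow service cannot stall the rest."""
    results = await asyncio.gather(*(probe(s) for s in SERVICES))
    return {"status": "healthy", "services": dict(results)}
```

The live endpoint additionally reports agent metadata and the tool count, as the sample response below shows.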
{
"status": "healthy",
"agent": "data-engineer-agent",
"version": "3.0.0",
"tools_count": 48,
"services": {
"airflow": { "status": "reachable", "latency_ms": 12 },
"spark": { "status": "reachable", "latency_ms": 8 },
"postgresql": { "status": "reachable", "latency_ms": 3 },
"mlflow": { "status": "reachable", "latency_ms": 15 },
"minio": { "status": "reachable", "latency_ms": 6 },
"prometheus": { "status": "reachable", "latency_ms": 4 },
"grafana": { "status": "reachable", "latency_ms": 9 },
"jaeger": { "status": "reachable", "latency_ms": 11 },
"langfuse": { "status": "reachable", "latency_ms": 7 }
}
}

Real endpoints, real responses
The agent is running in production on port 8060. Here are real responses from the live system captured via the Sentinel Fleet Command Terminal.
{
"airflow_dags": 12,
"spark_executors": 4,
"pg_active_connections": 23,
"dbt_models": 47,
"overall_health": "operational"
}

{
"total_pipelines": 12,
"within_sla": 11,
"breached": 1,
"compliance_pct": 91.7,
"breach_details": ["etl_customer_sync: 45min > 30min SLA"]
}

Part of a 28-agent fleet
The Data Engineer Agent is one of 5 agents in the Data Engineering domain. It operates alongside the Data Pipeline Agent, Data Quality Agent, Database Agent, and RAG Knowledge Agent — all coordinated by the Sentinel orchestrator.
Data Pipeline Agent: ETL orchestration, data transformation workflows, pipeline health monitoring with automatic retry and dead-letter routing
Data Engineer Agent (this agent): 48-tool data platform control plane — Airflow, Spark, dbt, PostgreSQL, MLflow, MinIO, Prometheus, Grafana, Jaeger, Langfuse, Celery
Data Quality Agent: schema validation, anomaly detection in data streams, quality scoring with threshold alerting and drift monitoring
Database Agent: PostgreSQL operations — connection pool management, query performance analysis, backup verification, index optimization
RAG Knowledge Agent: dual-mode retrieval — Milvus vector search for episodic memory plus a Knowledge Library indexing 43 technical books via PyMuPDF
Measured operational results
48 API tools across 12 integrated service categories
12 services unified under a single FastAPI control plane
6 end-to-end workflows plus a pipeline health report — cross-service orchestrated reports
9 parallel health probes for connectivity validation
Pipeline throughput, latency, and error rates measured via Prometheus telemetry
Self-healing Docker deployment with automatic restart policy
Explore the full fleet
This deep dive covers one of 28 agents in the SMAIS fleet. Read the main architecture article for the complete system — theory, production mapping, and interactive visualizations.