The Problem with Manual Outbound Intelligence
Most growing businesses share the same bottleneck: outbound prospecting and candidate sourcing are overwhelmingly manual. Sales development representatives spend 65% of their time on non-selling activities -- researching leads, enriching records, qualifying prospects, and updating CRM fields. Recruiting teams face an identical problem: sourcing passive candidates, screening resumes, and scheduling outreach across dozens of channels.
The conventional approach layers point solutions on top of each other: a scraping tool here, an enrichment API there, a spreadsheet formula for scoring, and a CRM automation rule to nudge leads forward. The result is a fragile pipeline that breaks when any single integration drifts, produces inconsistent scoring across operators, and offers zero visibility into what actually drives conversion.
This article describes how I built an autonomous intelligence agent -- a single Python application backed by SQLite and tracked with MLflow -- that replaces the patchwork. It ingests raw signals from multiple data sources, applies ML-based scoring and prioritisation, manages the full outreach funnel, and continuously improves through experiment tracking. The system powers both sales prospecting and recruiting pipelines from the same codebase.
Key Pain Points Addressed
- Manual research consuming 20+ hours per week per SDR or recruiter
- Inconsistent lead/candidate scoring across team members
- No visibility into which signals predict conversion
- Brittle integrations between 6+ separate tools
- Zero A/B testing infrastructure for outreach sequences
Architecture Overview
The agent is structured as a modular Python application with a clear separation between data ingestion, intelligence processing, funnel management, and experiment tracking. Every component communicates through SQLite -- acting as both the operational data store and the event log -- while MLflow provides the experiment tracking and model registry layer.
+-----------------------------------------------------------------+
|                     Intelligence Agent Core                     |
+-----------------------------------------------------------------+
|                                                                 |
|   +-----------+      +-------------+      +-----------+         |
|   | Ingestor  |      | Scorer /    |      | Funnel    |         |
|   | Layer     |----->| Intelligence|----->| Manager   |         |
|   |           |      | Layer       |      |           |         |
|   +-----------+      +-------------+      +-----------+         |
|         |                   |                   |               |
|         v                   v                   v               |
|   +----------------------------------------------+              |
|   |              SQLite Data Store               |              |
|   |  (leads, candidates, events, scores, runs)   |              |
|   +----------------------------------------------+              |
|                          |                                      |
|                          v                                      |
|   +----------------------------------------------+              |
|   |            MLflow Tracking Server            |              |
|   |   (experiments, metrics, model versions)     |              |
|   +----------------------------------------------+              |
|                                                                 |
+-----------------------------------------------------------------+
The design follows three principles. First, everything is a record in SQLite -- leads, candidates, outreach events, scoring results, and funnel state transitions. This makes the system trivially debuggable: any pipeline outcome can be traced back through a chain of SQL queries. Second, the intelligence layer is stateless with respect to the application: it reads data from SQLite, computes scores, and writes results back. This means the scoring model can be swapped, retrained, or A/B tested without touching the funnel manager. Third, MLflow tracks every experiment end-to-end, from feature engineering choices to outreach template performance.
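To make the "chain of SQL queries" claim concrete, here is a minimal sketch of what such a debugging session looks like, using pared-down versions of the `scores` and `funnel_events` tables introduced later in this article. The contact ID and values are invented for the example.

```python
import sqlite3

# Illustrative debugging session: why did contact 'abc123' land in nurture?
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE scores (contact_id TEXT, score_type TEXT,
                         score_value REAL, scored_at TEXT);
    CREATE TABLE funnel_events (contact_id TEXT, from_stage TEXT,
                                to_stage TEXT, occurred_at TEXT);
    INSERT INTO scores VALUES ('abc123', 'lead_quality', 0.41, '2024-03-01');
    INSERT INTO funnel_events VALUES ('abc123', 'scored', 'nurture', '2024-03-01');
""")

# Step 1: reconstruct the contact's funnel history.
history = conn.execute(
    "SELECT from_stage, to_stage FROM funnel_events "
    "WHERE contact_id = ? ORDER BY occurred_at",
    ("abc123",),
).fetchall()

# Step 2: find the score that drove the last transition.
score = conn.execute(
    "SELECT score_value FROM scores WHERE contact_id = ? "
    "AND score_type = 'lead_quality' ORDER BY scored_at DESC LIMIT 1",
    ("abc123",),
).fetchone()[0]

print(history)  # [('scored', 'nurture')]
print(score)    # 0.41
```

Two queries explain the outcome end-to-end: the contact moved from `scored` to `nurture` because its lead quality score (0.41) fell below the qualification threshold.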
Technology Stack
Runtime
Python 3.11, asyncio event loop
Data Store
SQLite 3.45 with WAL mode
ML / Scoring
scikit-learn, LightGBM, sentence-transformers
Experiment Tracking
MLflow 2.x with SQLite backend
Scheduling
APScheduler, cron-based triggers
Deployment
Docker, systemd, Proxmox LXC
Data Pipeline Design
The ingestor layer is responsible for pulling raw data from multiple sources, normalising it into a common schema, deduplicating records, and staging them in SQLite for downstream processing. The agent supports both batch (scheduled) and event-driven (webhook) ingestion modes.
SQLite Schema Design
The core schema uses four primary tables: contacts for unified lead/candidate records, signals for raw ingested events, scores for computed intelligence outputs, and funnel_events for pipeline state transitions. All tables share a common contact_id foreign key.
-- Core contact record: unified across sales and recruiting funnels
CREATE TABLE IF NOT EXISTS contacts (
contact_id TEXT PRIMARY KEY,
source TEXT NOT NULL, -- 'linkedin', 'github', 'jobboard', 'referral'
funnel_type TEXT NOT NULL, -- 'sales' or 'recruiting'
full_name TEXT,
company TEXT,
title TEXT,
email TEXT,
location TEXT,
raw_payload TEXT, -- JSON blob of original source data
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now')),
UNIQUE(source, email)
);
-- Raw signals: every data point we ingest about a contact
CREATE TABLE IF NOT EXISTS signals (
signal_id INTEGER PRIMARY KEY AUTOINCREMENT,
contact_id TEXT NOT NULL REFERENCES contacts(contact_id),
signal_type TEXT NOT NULL, -- 'profile_view', 'repo_commit', 'job_change'
signal_source TEXT NOT NULL,
signal_value REAL,
metadata TEXT, -- JSON
observed_at TEXT DEFAULT (datetime('now'))
);
-- Computed scores from the intelligence layer
CREATE TABLE IF NOT EXISTS scores (
score_id INTEGER PRIMARY KEY AUTOINCREMENT,
contact_id TEXT NOT NULL REFERENCES contacts(contact_id),
model_version TEXT NOT NULL,
score_type TEXT NOT NULL, -- 'lead_quality', 'intent', 'fit', 'timing'
score_value REAL NOT NULL,
features_json TEXT, -- serialised feature vector for explainability
scored_at TEXT DEFAULT (datetime('now'))
);
-- Funnel state machine events
CREATE TABLE IF NOT EXISTS funnel_events (
event_id INTEGER PRIMARY KEY AUTOINCREMENT,
contact_id TEXT NOT NULL REFERENCES contacts(contact_id),
from_stage TEXT,
to_stage TEXT NOT NULL,
trigger TEXT, -- what caused the transition
metadata TEXT, -- JSON
occurred_at TEXT DEFAULT (datetime('now'))
);
-- Indexes for common query patterns
CREATE INDEX IF NOT EXISTS idx_signals_contact ON signals(contact_id, observed_at);
CREATE INDEX IF NOT EXISTS idx_scores_contact ON scores(contact_id, scored_at);
CREATE INDEX IF NOT EXISTS idx_funnel_contact ON funnel_events(contact_id, occurred_at);
Ingestion Pipeline
Each data source is wrapped in an adapter class that implements a common interface. The adapter handles authentication, rate limiting, pagination, and error recovery. Raw payloads are stored verbatim in the contacts.raw_payload column for auditability, while structured fields are extracted and normalised.
import abc
import sqlite3
import json
import hashlib
from dataclasses import dataclass, field
from typing import List, Optional
from datetime import datetime
@dataclass
class RawContact:
"""Normalised contact record from any source."""
source: str
funnel_type: str # 'sales' or 'recruiting'
full_name: str
company: Optional[str] = None
title: Optional[str] = None
email: Optional[str] = None
location: Optional[str] = None
raw_payload: dict = field(default_factory=dict)
signals: List[dict] = field(default_factory=list)
@property
def contact_id(self) -> str:
"""Deterministic ID from source + email for dedup."""
key = f"{self.source}:{self.email or self.full_name}"
return hashlib.sha256(key.encode()).hexdigest()[:16]
class BaseSourceAdapter(abc.ABC):
"""Every data source implements this interface."""
@abc.abstractmethod
async def fetch_batch(self, since: datetime) -> List[RawContact]:
"""Pull new contacts/signals since the given timestamp."""
...
@abc.abstractmethod
def source_name(self) -> str:
...
class GitHubAdapter(BaseSourceAdapter):
"""Ingests contributor and stargazer data from GitHub API."""
def __init__(self, token: str, target_repos: List[str]):
self._token = token
self._repos = target_repos
async def fetch_batch(self, since: datetime) -> List[RawContact]:
contacts = []
for repo in self._repos:
# Paginate through contributors API
contributors = await self._get_contributors(repo, since)
for c in contributors:
contacts.append(RawContact(
source="github",
funnel_type="recruiting",
full_name=c.get("name", c["login"]),
company=c.get("company"),
email=c.get("email"),
location=c.get("location"),
raw_payload=c,
signals=[{
"signal_type": "repo_commit",
"signal_source": repo,
"signal_value": c.get("contributions", 0),
}],
))
return contacts
def source_name(self) -> str:
return "github"
async def _get_contributors(self, repo, since):
# HTTP call implementation with rate-limit backoff
        ...
The ingestion orchestrator runs on a configurable schedule (default: every 4 hours) and coordinates all adapters in parallel using asyncio.gather. Deduplication is handled at the SQLite level via the UNIQUE(source, email) constraint, with an INSERT OR IGNORE strategy for contacts and an upsert pattern for signals.
import asyncio
import logging
logger = logging.getLogger("ingestor")
class IngestionOrchestrator:
"""Coordinates parallel ingestion from all source adapters."""
def __init__(self, db_path: str, adapters: List[BaseSourceAdapter]):
self._db_path = db_path
self._adapters = adapters
async def run_cycle(self, since: datetime) -> dict:
"""Execute one full ingestion cycle. Returns summary stats."""
results = await asyncio.gather(
*[a.fetch_batch(since) for a in self._adapters],
return_exceptions=True,
)
stats = {"ingested": 0, "duplicates": 0, "errors": 0}
conn = sqlite3.connect(self._db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=5000")
for adapter, batch in zip(self._adapters, results):
if isinstance(batch, Exception):
logger.error("Adapter %s failed: %s", adapter.source_name(), batch)
stats["errors"] += 1
continue
for contact in batch:
try:
self._upsert_contact(conn, contact)
self._insert_signals(conn, contact)
stats["ingested"] += 1
except sqlite3.IntegrityError:
stats["duplicates"] += 1
conn.commit()
conn.close()
logger.info("Ingestion cycle complete: %s", stats)
return stats
def _upsert_contact(self, conn: sqlite3.Connection, c: RawContact):
conn.execute(
"""INSERT INTO contacts
(contact_id, source, funnel_type, full_name,
company, title, email, location, raw_payload)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(source, email) DO UPDATE SET
updated_at = datetime('now'),
raw_payload = excluded.raw_payload""",
(c.contact_id, c.source, c.funnel_type, c.full_name,
c.company, c.title, c.email, c.location,
json.dumps(c.raw_payload)),
)
def _insert_signals(self, conn: sqlite3.Connection, c: RawContact):
for sig in c.signals:
conn.execute(
"""INSERT INTO signals
(contact_id, signal_type, signal_source, signal_value, metadata)
VALUES (?, ?, ?, ?, ?)""",
(c.contact_id, sig["signal_type"], sig["signal_source"],
sig.get("signal_value"), json.dumps(sig)),
            )
Intelligence Layer: ML-Powered Scoring and Prioritisation
The intelligence layer transforms raw signals into actionable scores. It operates in three stages: feature engineering, model inference, and priority ranking. Each stage is independently versioned and tracked via MLflow, allowing the team to iterate on any component without disrupting the rest of the pipeline.
Feature Engineering
Features are computed from the signals table using SQL aggregations and Python transformations. The feature set includes both numerical signals (commit counts, profile views, recency metrics) and text-based features (title similarity to ideal candidate profile, company domain relevance). Text features are encoded using sentence-transformers to produce dense embeddings that capture semantic similarity.
import numpy as np
from sentence_transformers import SentenceTransformer
from dataclasses import dataclass
from typing import Dict, Any
@dataclass
class FeatureVector:
contact_id: str
numerical: np.ndarray
text_embedding: np.ndarray
feature_names: list
raw_values: Dict[str, Any]
class FeatureEngineer:
"""Transforms raw signals into ML-ready feature vectors."""
def __init__(self, db_path: str, model_name: str = "all-MiniLM-L6-v2"):
self._db_path = db_path
self._encoder = SentenceTransformer(model_name)
self._ideal_profiles = {} # loaded from config
def compute_features(self, contact_id: str) -> FeatureVector:
conn = sqlite3.connect(self._db_path)
# Numerical features from signal aggregations
row = conn.execute("""
SELECT
COUNT(*) AS total_signals,
COUNT(DISTINCT signal_type) AS signal_diversity,
MAX(signal_value) AS peak_signal,
AVG(signal_value) AS avg_signal,
julianday('now') - julianday(MAX(observed_at)) AS days_since_last,
julianday('now') - julianday(MIN(observed_at)) AS days_since_first
FROM signals
WHERE contact_id = ?
""", (contact_id,)).fetchone()
numerical = np.array(row, dtype=np.float32)
# Text features: encode title + company for semantic similarity
contact = conn.execute(
"SELECT title, company FROM contacts WHERE contact_id = ?",
(contact_id,)
).fetchone()
text_input = f"{contact[0] or ''} at {contact[1] or ''}"
text_embedding = self._encoder.encode(text_input)
conn.close()
return FeatureVector(
contact_id=contact_id,
numerical=numerical,
text_embedding=text_embedding,
feature_names=[
"total_signals", "signal_diversity", "peak_signal",
"avg_signal", "days_since_last", "days_since_first",
],
raw_values=dict(zip(
["total_signals", "signal_diversity", "peak_signal",
"avg_signal", "days_since_last", "days_since_first"],
row,
)),
        )
Scoring Models
The agent produces four distinct scores per contact: lead quality (overall fit), intent (buying/job-seeking signals), timing (likelihood of conversion in the next 14 days), and fit (alignment with ideal customer/candidate profile). Each score uses a separate LightGBM model trained on historical conversion data.
import lightgbm as lgb
import mlflow
import mlflow.lightgbm
from typing import Dict
class ScoringEngine:
"""Multi-model scoring engine with MLflow tracking."""
SCORE_TYPES = ["lead_quality", "intent", "fit", "timing"]
def __init__(self, db_path: str, mlflow_uri: str):
self._db_path = db_path
mlflow.set_tracking_uri(mlflow_uri)
self._models: Dict[str, lgb.Booster] = {}
self._load_production_models()
def _load_production_models(self):
"""Load the current Production-stage model for each score type."""
for score_type in self.SCORE_TYPES:
model_uri = f"models:/{score_type}_model/Production"
try:
self._models[score_type] = mlflow.lightgbm.load_model(model_uri)
except mlflow.exceptions.MlflowException:
# Fall back to latest version if no Production alias
self._models[score_type] = mlflow.lightgbm.load_model(
f"models:/{score_type}_model/latest"
)
def score_contact(self, features: FeatureVector) -> Dict[str, float]:
"""Compute all score types for a single contact."""
combined = np.concatenate([features.numerical, features.text_embedding])
results = {}
for score_type, model in self._models.items():
prediction = model.predict(combined.reshape(1, -1))[0]
results[score_type] = float(np.clip(prediction, 0.0, 1.0))
return results
def batch_score(self, feature_vectors: list) -> list:
"""Score a batch of contacts. Returns list of (contact_id, scores)."""
results = []
for fv in feature_vectors:
scores = self.score_contact(fv)
self._persist_scores(fv.contact_id, scores, fv)
results.append((fv.contact_id, scores))
return results
def _persist_scores(self, contact_id, scores, features):
conn = sqlite3.connect(self._db_path)
model_version = mlflow.active_run().info.run_id if mlflow.active_run() else "batch"
for score_type, value in scores.items():
conn.execute(
"""INSERT INTO scores
(contact_id, model_version, score_type, score_value, features_json)
VALUES (?, ?, ?, ?, ?)""",
(contact_id, model_version, score_type, value,
json.dumps(features.raw_values)),
)
conn.commit()
        conn.close()
Pattern Matching and Priority Ranking
Beyond ML scores, the agent applies rule-based pattern matching for high-signal events that should trigger immediate action. Job changes, funding announcements, and open-source contributions to key repositories all generate priority overrides. The final ranking combines ML scores with rule-based boosts using a weighted formula that is itself tracked as an MLflow parameter.
class PriorityRanker:
"""Combines ML scores with rule-based signals for final ranking."""
# Weights tracked as MLflow params for experimentation
DEFAULT_WEIGHTS = {
"lead_quality": 0.30,
"intent": 0.25,
"fit": 0.25,
"timing": 0.20,
}
RULE_BOOSTS = {
"job_change": 0.15, # recently changed role
"funding_event": 0.12, # company raised funding
"key_repo_contrib": 0.10, # contributed to target repo
"profile_view": 0.05, # viewed our content/profile
}
def __init__(self, weights: dict = None):
self.weights = weights or self.DEFAULT_WEIGHTS
def rank(self, contact_id: str, scores: dict, signals: list) -> float:
"""Compute composite priority score in [0, 1]."""
# Weighted ML score
base = sum(
scores.get(k, 0) * w for k, w in self.weights.items()
)
# Rule-based boost from recent signals
boost = 0.0
for signal in signals:
signal_type = signal.get("signal_type", "")
if signal_type in self.RULE_BOOSTS:
recency = signal.get("days_ago", 30)
decay = max(0, 1.0 - recency / 30.0) # linear decay
boost += self.RULE_BOOSTS[signal_type] * decay
        return min(1.0, base + boost)
Funnel Automation: From Discovery to Conversion
The funnel manager is a state machine that governs the lifecycle of every contact from initial discovery through conversion or disqualification. Each stage has defined entry criteria, time-based rules, and exit conditions. The state machine is intentionally simple -- complexity lives in the intelligence layer, not in the funnel logic.
Funnel Stages
from enum import Enum
from typing import Optional, Callable
class Stage(str, Enum):
DISCOVERED = "discovered"
SCORED = "scored"
QUALIFIED = "qualified"
OUTREACH_1 = "outreach_1"
OUTREACH_2 = "outreach_2"
ENGAGED = "engaged"
CONVERTED = "converted"
NURTURE = "nurture"
DISQUALIFIED = "disqualified"
# Allowed transitions: from_stage -> list of valid to_stages
TRANSITIONS = {
Stage.DISCOVERED: [Stage.SCORED],
Stage.SCORED: [Stage.QUALIFIED, Stage.NURTURE, Stage.DISQUALIFIED],
Stage.QUALIFIED: [Stage.OUTREACH_1],
Stage.OUTREACH_1: [Stage.OUTREACH_2, Stage.ENGAGED],
Stage.OUTREACH_2: [Stage.ENGAGED, Stage.NURTURE],
Stage.ENGAGED: [Stage.CONVERTED, Stage.NURTURE],
Stage.NURTURE: [Stage.QUALIFIED, Stage.DISQUALIFIED],
}
class FunnelManager:
"""State machine managing contact lifecycle through the pipeline."""
def __init__(self, db_path: str, score_threshold: float = 0.65):
self._db_path = db_path
self._threshold = score_threshold
self._hooks: dict = {} # stage -> callback
def register_hook(self, stage: Stage, callback: Callable):
"""Register a callback for when a contact enters a stage."""
self._hooks.setdefault(stage, []).append(callback)
def transition(
self, contact_id: str, to_stage: Stage, trigger: str = "auto"
) -> bool:
"""Attempt a state transition. Returns True if successful."""
conn = sqlite3.connect(self._db_path)
current = self._current_stage(conn, contact_id)
if current and to_stage not in TRANSITIONS.get(current, []):
conn.close()
return False # invalid transition
conn.execute(
"""INSERT INTO funnel_events
(contact_id, from_stage, to_stage, trigger)
VALUES (?, ?, ?, ?)""",
(contact_id, current.value if current else None,
to_stage.value, trigger),
)
conn.commit()
conn.close()
# Fire registered hooks
for hook in self._hooks.get(to_stage, []):
hook(contact_id, to_stage, trigger)
return True
def _current_stage(self, conn, contact_id) -> Optional[Stage]:
row = conn.execute(
"""SELECT to_stage FROM funnel_events
WHERE contact_id = ?
ORDER BY occurred_at DESC LIMIT 1""",
(contact_id,),
).fetchone()
return Stage(row[0]) if row else None
def process_scored_contacts(self):
"""Move scored contacts to qualified or nurture based on threshold."""
conn = sqlite3.connect(self._db_path)
contacts = conn.execute("""
SELECT s.contact_id, s.score_value
FROM scores s
JOIN funnel_events fe ON fe.contact_id = s.contact_id
WHERE fe.to_stage = 'scored'
AND s.score_type = 'lead_quality'
AND s.scored_at = (
SELECT MAX(scored_at) FROM scores
WHERE contact_id = s.contact_id
AND score_type = 'lead_quality'
)
""").fetchall()
conn.close()
for contact_id, score in contacts:
if score >= self._threshold:
self.transition(contact_id, Stage.QUALIFIED, "score_threshold")
else:
                self.transition(contact_id, Stage.NURTURE, "below_threshold")
The outreach component generates personalised messages using template-based composition, with dynamic fields populated from the contact record and scoring rationale. Each outreach event is logged as a signal, creating a feedback loop: the intelligence layer can learn which message templates and timing patterns lead to higher engagement rates.
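A minimal sketch of that template-based composition follows. The template text and field names are illustrative, not the production templates; the point is that dynamic fields come from the contact record plus the scoring rationale.

```python
# Hypothetical outreach template; {first_name}, {company}, and {top_signal}
# are the dynamic fields filled from the contact record and scoring output.
TEMPLATE = (
    "Hi {first_name}, I noticed your work at {company} -- "
    "particularly your recent {top_signal}. Would you be open to a chat?"
)

def compose_outreach(contact: dict, top_signal: str) -> str:
    """Fill dynamic fields from the contact record and scoring rationale."""
    return TEMPLATE.format(
        first_name=contact["full_name"].split()[0],
        company=contact["company"] or "your company",
        top_signal=top_signal,
    )

msg = compose_outreach(
    {"full_name": "Ada Lovelace", "company": "Analytical Engines"},
    top_signal="contributions to the parser repo",
)
print(msg)
```

Each rendered message would then be logged to the signals table, so the bandit described in the A/B testing section can attribute replies back to the template variant that produced them.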
MLflow Integration: Experiment Tracking and Model Versioning
MLflow serves as the central nervous system for all machine learning operations in the agent. Every training run, scoring batch, and A/B test is tracked as an MLflow experiment with full parameter, metric, and artifact logging. The model registry manages promotion from staging to production, and the tracking UI provides a dashboard for comparing experiment outcomes.
Training Pipeline
Models are retrained weekly using a rolling window of historical conversion data. The training pipeline extracts labelled examples from the funnel events table (contacts that reached the "converted" stage are positive, "disqualified" are negative), computes feature vectors, trains a LightGBM model, and logs everything to MLflow.
import mlflow
import mlflow.lightgbm
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
roc_auc_score, precision_score, recall_score, f1_score
)
def train_scoring_model(
db_path: str,
score_type: str,
mlflow_uri: str,
experiment_name: str = "scoring_models",
):
"""Train a single scoring model and register it with MLflow."""
mlflow.set_tracking_uri(mlflow_uri)
mlflow.set_experiment(experiment_name)
# 1. Extract labelled dataset
X, y, feature_names = extract_training_data(db_path, score_type)
# 2. Split with stratification
X_train, X_val, y_train, y_val = train_test_split(
X, y, test_size=0.2, stratify=y, random_state=42
)
# 3. Define hyperparameters (tracked by MLflow)
params = {
"objective": "binary",
"metric": "auc",
"learning_rate": 0.05,
"num_leaves": 31,
"max_depth": 6,
"min_child_samples": 20,
"feature_fraction": 0.8,
"bagging_fraction": 0.8,
"bagging_freq": 5,
"verbose": -1,
}
with mlflow.start_run(run_name=f"{score_type}_training"):
# Log parameters
mlflow.log_params(params)
mlflow.log_param("score_type", score_type)
mlflow.log_param("training_samples", len(X_train))
mlflow.log_param("validation_samples", len(X_val))
mlflow.log_param("positive_rate", float(y.mean()))
# Train
train_set = lgb.Dataset(X_train, label=y_train)
val_set = lgb.Dataset(X_val, label=y_val, reference=train_set)
model = lgb.train(
params,
train_set,
num_boost_round=500,
valid_sets=[val_set],
callbacks=[lgb.early_stopping(50), lgb.log_evaluation(100)],
)
# Predict and compute metrics
y_pred = model.predict(X_val)
metrics = {
"auc": roc_auc_score(y_val, y_pred),
"precision": precision_score(y_val, (y_pred > 0.5).astype(int)),
"recall": recall_score(y_val, (y_pred > 0.5).astype(int)),
"f1": f1_score(y_val, (y_pred > 0.5).astype(int)),
}
mlflow.log_metrics(metrics)
# Log feature importance
importance = dict(zip(feature_names, model.feature_importance()))
mlflow.log_dict(importance, "feature_importance.json")
# Register model
mlflow.lightgbm.log_model(
model,
artifact_path="model",
registered_model_name=f"{score_type}_model",
)
    return metrics
A/B Testing Pipeline Performance
Beyond model training, MLflow tracks outreach experiment performance. Each outreach template variant is registered as an MLflow run, with metrics like open rate, reply rate, and conversion rate logged automatically as funnel events accumulate. The agent uses a multi-armed bandit approach (Thompson Sampling) to dynamically allocate traffic to winning variants.
import numpy as np
class OutreachExperiment:
"""Thompson Sampling for outreach template selection."""
def __init__(self, variants: list, mlflow_experiment: str):
self.variants = variants
self._experiment = mlflow_experiment
# Beta distribution priors: (alpha=successes+1, beta=failures+1)
self._alphas = {v: 1.0 for v in variants}
self._betas = {v: 1.0 for v in variants}
def select_variant(self) -> str:
"""Sample from posterior and select best variant."""
samples = {
v: np.random.beta(self._alphas[v], self._betas[v])
for v in self.variants
}
return max(samples, key=samples.get)
def record_outcome(self, variant: str, success: bool):
"""Update posterior with observed outcome."""
if success:
self._alphas[variant] += 1
else:
self._betas[variant] += 1
# Log to MLflow
with mlflow.start_run(run_name=f"outreach_{variant}", nested=True):
total = self._alphas[variant] + self._betas[variant] - 2
rate = (self._alphas[variant] - 1) / max(total, 1)
mlflow.log_metrics({
"total_sends": total,
"success_rate": rate,
"alpha": self._alphas[variant],
"beta": self._betas[variant],
            })
MLflow Experiment Structure
scoring_models/ -- One run per training iteration per score type. Tracks AUC, precision, recall, F1, feature importance, and model artifacts.
outreach_experiments/ -- One parent run per experiment (e.g., "Q1 Sales Outreach"), with nested runs per template variant. Tracks send volume, open rate, reply rate, conversion.
pipeline_runs/ -- One run per scheduled pipeline execution. Tracks ingestion volume, scoring latency, funnel transitions, and error rates.
feature_experiments/ -- Tracks feature engineering experiments: new signal types, embedding models, aggregation windows.
Results and Metrics
The agent has been running in production for six months across both sales and recruiting funnels. The results below compare the agent-driven pipeline against the previous manual workflow over a matched three-month period.
Pipeline Velocity
- 40% pipeline velocity increase
- 3.2x more qualified leads per week
- 58% reduction in manual research
- 2.1x reply rate improvement
(Charts: sales funnel performance and recruiting funnel performance.)
Model Performance Over Time
The scoring models show consistent improvement across retraining cycles as the conversion feedback loop accumulates more labelled data. The table below shows the lead quality model performance over the first six monthly retraining iterations.
| Iteration | AUC | Precision | Recall | F1 | Training Samples |
|---|---|---|---|---|---|
| v1 (Month 1) | 0.71 | 0.64 | 0.59 | 0.61 | 1,240 |
| v2 (Month 2) | 0.76 | 0.70 | 0.65 | 0.67 | 2,810 |
| v3 (Month 3) | 0.79 | 0.73 | 0.71 | 0.72 | 4,530 |
| v4 (Month 4) | 0.82 | 0.76 | 0.74 | 0.75 | 6,180 |
| v5 (Month 5) | 0.84 | 0.78 | 0.76 | 0.77 | 8,020 |
| v6 (Month 6) | 0.86 | 0.80 | 0.78 | 0.79 | 10,340 |
The AUC improvement from 0.71 to 0.86 over six months demonstrates the compounding value of the feedback loop: every conversion or disqualification event becomes a labelled training example for the next model iteration. The model saturated around Month 5, suggesting that additional gains require new feature sources rather than more training data.
Lessons Learned
Six months of production operation surfaced several insights that would reshape how I approach similar projects. The following distills the most significant technical and operational learnings.
What Worked
SQLite as the single source of truth
Using SQLite for everything -- contacts, signals, scores, funnel events, and even MLflow backend storage -- eliminated an entire class of data consistency bugs. Every pipeline outcome is reproducible with a single database file. WAL mode provided sufficient concurrency for the workload (peak: 50 writes/second during batch ingestion). The ability to simply copy the database file for backup, debugging, or local development was invaluable.
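The "copy the database file" workflow can be done safely even while the agent is writing, using the stdlib backup API. A minimal sketch, with in-memory databases standing in for agent.db and the snapshot file, and an illustrative table:

```python
import sqlite3

# Source database: stands in for the live agent.db under WAL mode.
src = sqlite3.connect(":memory:")
src.execute("CREATE TABLE contacts (contact_id TEXT PRIMARY KEY, full_name TEXT)")
src.execute("INSERT INTO contacts VALUES ('abc123', 'Ada Lovelace')")
src.commit()

# Take a consistent point-in-time copy via the sqlite3 backup API,
# rather than copying the file bytes out from under an active writer.
snapshot = sqlite3.connect(":memory:")
src.backup(snapshot)

count = snapshot.execute("SELECT COUNT(*) FROM contacts").fetchone()[0]
print(count)  # 1
```

In practice the snapshot target would be a file (`sqlite3.connect("snapshot.db")`), which can then be shipped to a laptop for local debugging.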
Unified codebase for sales and recruiting
The abstraction of contacts and funnels as generic concepts (rather than building separate sales and recruiting applications) reduced total code by roughly 60%. The only differences between funnels are configuration: source adapters, scoring model weights, outreach templates, and threshold values. This also meant that improvements to the intelligence layer -- better feature engineering, a new embedding model -- immediately benefited both funnels.
Thompson Sampling for outreach optimisation
The multi-armed bandit approach to template selection outperformed static A/B testing by 23% in cumulative reply rate. Traditional A/B testing requires waiting for statistical significance before switching to the winning variant. Thompson Sampling naturally shifts traffic toward better-performing variants while still exploring, reducing the cost of experimentation.
MLflow for pipeline observability
Using MLflow beyond model training -- tracking every pipeline run, every outreach experiment, every feature engineering decision -- transformed debugging from archaeology into data analysis. When conversion rates dipped in Month 3, the MLflow UI immediately showed that a feature engineering change had degraded the timing model, and the model registry made rollback trivial.
What Did Not Work
Overengineered initial feature set
The first version of the feature engineer computed 47 features per contact. After running feature importance analysis via MLflow, only 8 features carried meaningful predictive weight. The remaining 39 added computational overhead, increased training time by 3x, and introduced noise that actually degraded the timing model. Simplifying to the top 12 features improved both accuracy and inference latency.
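The pruning step itself is simple. A sketch of selecting the top-k features from the importance dict logged to MLflow as feature_importance.json; the feature names match the article, but the importance numbers are illustrative:

```python
# Illustrative importance values of the kind logged by the training run.
importance = {
    "days_since_last": 412, "signal_diversity": 388, "total_signals": 301,
    "avg_signal": 245, "peak_signal": 9, "days_since_first": 4,
}

def top_k_features(importance: dict, k: int) -> list:
    """Return feature names ranked by importance, highest first."""
    ranked = sorted(importance.items(), key=lambda kv: kv[1], reverse=True)
    return [name for name, _ in ranked[:k]]

keep = top_k_features(importance, k=4)
print(keep)  # ['days_since_last', 'signal_diversity', 'total_signals', 'avg_signal']
```

The feature engineer then computes only the surviving columns, which is where the accuracy and latency gains described above came from.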
Aggressive outreach timing
Early experiments with 24-hour follow-up intervals produced higher unsubscribe rates without improving conversion. The optimal cadence -- discovered through MLflow experiment tracking -- was 3 days for the first follow-up and 5 days for the second. Patience correlated with professionalism in the recipient's perception.
Text embeddings as sole similarity measure
Using sentence-transformer embeddings for title/company similarity worked well for recruiting (where job titles are relatively standardised) but poorly for sales (where titles vary wildly across industries). Adding structured skill tags and industry codes as categorical features alongside the embeddings improved the sales funnel fit score by 0.09 AUC points.
Production Considerations
Rate limiting and source rotation
API rate limits are the primary bottleneck for ingestion throughput. The agent implements exponential backoff with jitter per source adapter, maintains a token bucket rate limiter, and rotates across multiple API keys where supported. All rate-limit events are logged as signals so the intelligence layer can account for ingestion gaps.
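A token bucket of the kind described above can be sketched in a few lines; the rate and capacity values are illustrative, not the production settings:

```python
import time

class TokenBucket:
    """Minimal token-bucket limiter: tokens refill continuously at `rate`
    per second, up to `capacity`; each request spends one token."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last = time.monotonic()

    def allow(self) -> bool:
        # Refill based on elapsed time, then try to spend one token.
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False  # caller sleeps with exponential backoff + jitter

bucket = TokenBucket(rate=5.0, capacity=2.0)
results = [bucket.allow(), bucket.allow(), bucket.allow()]
print(results)  # [True, True, False] -- third call exceeds the burst capacity
```

In the agent, each source adapter would hold its own bucket sized to that API's documented limits, and a denied `allow()` is what triggers the backoff-with-jitter path.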
Data privacy and compliance
The agent only processes publicly available data and respects robots.txt directives. All contact records include a consent_status field that defaults to "pending". No outreach is sent until consent is verified. The SQLite database is encrypted at rest using SQLCipher, and all API credentials are managed through environment variables, never stored in code or configuration files.
Monitoring and alerting
The agent exposes Prometheus metrics for ingestion volume, scoring latency, funnel transition rates, and error counts. Grafana dashboards provide real-time visibility, and PagerDuty alerts fire when ingestion drops below expected thresholds or error rates exceed 5% of total pipeline volume. MLflow experiment tracking supplements this with weekly automated reports comparing model performance against baseline.
Scaling beyond SQLite
SQLite handles the current workload comfortably (10K contacts, 500K signals, 50K funnel events), but the architecture is designed for migration to PostgreSQL when needed. All database access goes through a repository pattern with SQL abstracted behind method calls. The migration would involve changing connection strings and adjusting a handful of SQLite-specific SQL expressions (datetime functions, JSON operators). The abstraction has already been validated: a PostgreSQL branch runs in CI to ensure compatibility.
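The repository pattern in question can be sketched as follows; the class and method names are illustrative, not the production interface. Callers never see SQL, so swapping SQLite for PostgreSQL means changing only this class:

```python
import sqlite3

class ContactRepository:
    """All SQL for the contacts table lives behind this interface."""

    def __init__(self, conn: sqlite3.Connection):
        self._conn = conn

    def add(self, contact_id: str, full_name: str) -> None:
        self._conn.execute(
            "INSERT OR IGNORE INTO contacts (contact_id, full_name) VALUES (?, ?)",
            (contact_id, full_name),
        )

    def get_name(self, contact_id: str):
        row = self._conn.execute(
            "SELECT full_name FROM contacts WHERE contact_id = ?",
            (contact_id,),
        ).fetchone()
        return row[0] if row else None

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE contacts (contact_id TEXT PRIMARY KEY, full_name TEXT)")
repo = ContactRepository(conn)
repo.add("abc123", "Ada Lovelace")
print(repo.get_name("abc123"))  # Ada Lovelace
```

A PostgreSQL-backed implementation would satisfy the same interface, which is what the CI branch mentioned above exercises.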
Summary
Building an autonomous intelligence agent for outbound prospecting and recruiting demonstrated that a well-structured Python application with SQLite and MLflow can replace a constellation of point tools while delivering measurably better outcomes. The key insight is that the intelligence layer and the funnel manager are separate concerns: the scoring model answers "who should we talk to and when," while the funnel manager handles "how do we move them through the pipeline." Keeping these responsibilities distinct enabled rapid iteration on both axes independently.
The 40% increase in qualified pipeline velocity came not from any single improvement but from the compound effect of faster ingestion, better scoring, personalised outreach, and continuous optimisation through experiment tracking. Each component contributed incrementally, and MLflow provided the visibility to understand which levers had the most impact at each stage of the system's evolution.
For teams considering a similar approach, the advice is to start simple: a single data source, one scoring model, a basic funnel with three stages, and MLflow tracking from day one. The feedback loop compounds quickly -- within two months, the system had enough labelled data to outperform manual scoring, and within four months the outreach optimisation was generating statistically significant improvements in reply rates.