Automation · Feb 2, 2026 · 14 min read

Scaling Business Growth: Building Autonomous Intelligence Agents

How a Python-based autonomous agent replaced manual prospecting and recruiting workflows, delivering a 40% increase in qualified pipeline velocity. A technical walkthrough of data ingestion, ML-powered scoring, funnel automation, and experiment tracking with MLflow.

The Problem with Manual Outbound Intelligence

Most growing businesses share the same bottleneck: outbound prospecting and candidate sourcing are overwhelmingly manual. Sales development representatives spend 65% of their time on non-selling activities -- researching leads, enriching records, qualifying prospects, and updating CRM fields. Recruiting teams face an identical problem: sourcing passive candidates, screening resumes, and scheduling outreach across dozens of channels.

The conventional approach layers point solutions on top of each other: a scraping tool here, an enrichment API there, a spreadsheet formula for scoring, and a CRM automation rule to nudge leads forward. The result is a fragile pipeline that breaks when any single integration drifts, produces inconsistent scoring across operators, and offers zero visibility into what actually drives conversion.

This article describes how I built an autonomous intelligence agent -- a single Python application backed by SQLite and tracked with MLflow -- that replaces the patchwork. It ingests raw signals from multiple data sources, applies ML-based scoring and prioritisation, manages the full outreach funnel, and continuously improves through experiment tracking. The system powers both sales prospecting and recruiting pipelines from the same codebase.

Key Pain Points Addressed

  • Manual research consuming 20+ hours per week per SDR or recruiter
  • Inconsistent lead/candidate scoring across team members
  • No visibility into which signals predict conversion
  • Brittle integrations between 6+ separate tools
  • Zero A/B testing infrastructure for outreach sequences

Architecture Overview

The agent is structured as a modular Python application with a clear separation between data ingestion, intelligence processing, funnel management, and experiment tracking. Every component communicates through SQLite -- acting as both the operational data store and the event log -- while MLflow provides the experiment tracking and model registry layer.


  +-----------------------------------------------------------------+
  |                     Intelligence Agent Core                     |
  +-----------------------------------------------------------------+
  |                                                                 |
  |  +-----------+    +--------------+    +-----------+             |
  |  |  Ingestor |    |   Scorer /   |    |  Funnel   |             |
  |  |   Layer   |--->| Intelligence |--->|  Manager  |             |
  |  |           |    |    Layer     |    |           |             |
  |  +-----------+    +--------------+    +-----------+             |
  |        |                 |                  |                   |
  |        v                 v                  v                   |
  |  +-----------------------------------------------+             |
  |  |               SQLite Data Store               |             |
  |  |  (leads, candidates, events, scores, runs)    |             |
  |  +-----------------------------------------------+             |
  |        |                                                       |
  |        v                                                       |
  |  +-----------------------------------------------+             |
  |  |            MLflow Tracking Server             |             |
  |  |   (experiments, metrics, model versions)      |             |
  |  +-----------------------------------------------+             |
  |                                                                 |
  +-----------------------------------------------------------------+

The design follows three principles. First, everything is a record in SQLite -- leads, candidates, outreach events, scoring results, and funnel state transitions. This makes the system trivially debuggable: any pipeline outcome can be traced back through a chain of SQL queries. Second, the intelligence layer is stateless with respect to the application: it reads data from SQLite, computes scores, and writes results back. This means the scoring model can be swapped, retrained, or A/B tested without touching the funnel manager. Third, MLflow tracks every experiment end-to-end, from feature engineering choices to outreach template performance.
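The first principle is easy to demonstrate: any pipeline outcome can be reconstructed with a handful of joins. A toy sketch against a pared-down, in-memory version of the schema (sample rows invented for illustration):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Pared-down versions of the four core tables, populated with one contact
conn.executescript("""
CREATE TABLE contacts (contact_id TEXT PRIMARY KEY, full_name TEXT);
CREATE TABLE signals (contact_id TEXT, signal_type TEXT, observed_at TEXT);
CREATE TABLE scores (contact_id TEXT, score_type TEXT, score_value REAL);
CREATE TABLE funnel_events (contact_id TEXT, from_stage TEXT, to_stage TEXT);

INSERT INTO contacts VALUES ('c1', 'Ada Example');
INSERT INTO signals VALUES ('c1', 'repo_commit', '2026-01-10');
INSERT INTO scores VALUES ('c1', 'lead_quality', 0.72);
INSERT INTO funnel_events VALUES ('c1', 'scored', 'qualified');
""")

# Trace one contact's full history -- signals seen, scores computed,
# funnel transitions taken -- all hanging off a single ID.
history = conn.execute("""
    SELECT c.full_name, s.signal_type, sc.score_value, fe.to_stage
    FROM contacts c
    LEFT JOIN signals s        ON s.contact_id  = c.contact_id
    LEFT JOIN scores sc        ON sc.contact_id = c.contact_id
    LEFT JOIN funnel_events fe ON fe.contact_id = c.contact_id
    WHERE c.contact_id = 'c1'
""").fetchall()
print(history)  # one row per (signal, score, event) combination
```

Debugging a misrouted contact becomes a matter of reading this result set, not replaying application logic.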

Technology Stack

  • Runtime: Python 3.11, asyncio event loop
  • Data Store: SQLite 3.45 with WAL mode
  • ML / Scoring: scikit-learn, LightGBM, sentence-transformers
  • Experiment Tracking: MLflow 2.x with SQLite backend
  • Scheduling: APScheduler, cron-based triggers
  • Deployment: Docker, systemd, Proxmox LXC

Data Pipeline Design

The ingestor layer is responsible for pulling raw data from multiple sources, normalising it into a common schema, deduplicating records, and staging them in SQLite for downstream processing. The agent supports both batch (scheduled) and event-driven (webhook) ingestion modes.

SQLite Schema Design

The core schema uses four primary tables: contacts for unified lead/candidate records, signals for raw ingested events, scores for computed intelligence outputs, and funnel_events for pipeline state transitions. All tables share a common contact_id foreign key.

-- Core contact record: unified across sales and recruiting funnels
CREATE TABLE IF NOT EXISTS contacts (
    contact_id    TEXT PRIMARY KEY,
    source        TEXT NOT NULL,          -- 'linkedin', 'github', 'jobboard', 'referral'
    funnel_type   TEXT NOT NULL,          -- 'sales' or 'recruiting'
    full_name     TEXT,
    company       TEXT,
    title         TEXT,
    email         TEXT,
    location      TEXT,
    raw_payload   TEXT,                   -- JSON blob of original source data
    created_at    TEXT DEFAULT (datetime('now')),
    updated_at    TEXT DEFAULT (datetime('now')),
    UNIQUE(source, email)
);

-- Raw signals: every data point we ingest about a contact
CREATE TABLE IF NOT EXISTS signals (
    signal_id     INTEGER PRIMARY KEY AUTOINCREMENT,
    contact_id    TEXT NOT NULL REFERENCES contacts(contact_id),
    signal_type   TEXT NOT NULL,          -- 'profile_view', 'repo_commit', 'job_change'
    signal_source TEXT NOT NULL,
    signal_value  REAL,
    metadata      TEXT,                   -- JSON
    observed_at   TEXT DEFAULT (datetime('now'))
);

-- Computed scores from the intelligence layer
CREATE TABLE IF NOT EXISTS scores (
    score_id      INTEGER PRIMARY KEY AUTOINCREMENT,
    contact_id    TEXT NOT NULL REFERENCES contacts(contact_id),
    model_version TEXT NOT NULL,
    score_type    TEXT NOT NULL,          -- 'lead_quality', 'intent', 'fit', 'timing'
    score_value   REAL NOT NULL,
    features_json TEXT,                   -- serialised feature vector for explainability
    scored_at     TEXT DEFAULT (datetime('now'))
);

-- Funnel state machine events
CREATE TABLE IF NOT EXISTS funnel_events (
    event_id      INTEGER PRIMARY KEY AUTOINCREMENT,
    contact_id    TEXT NOT NULL REFERENCES contacts(contact_id),
    from_stage    TEXT,
    to_stage      TEXT NOT NULL,
    trigger       TEXT,                   -- what caused the transition
    metadata      TEXT,                   -- JSON
    occurred_at   TEXT DEFAULT (datetime('now'))
);

-- Indexes for common query patterns
CREATE INDEX IF NOT EXISTS idx_signals_contact ON signals(contact_id, observed_at);
CREATE INDEX IF NOT EXISTS idx_scores_contact  ON scores(contact_id, scored_at);
CREATE INDEX IF NOT EXISTS idx_funnel_contact  ON funnel_events(contact_id, occurred_at);

Ingestion Pipeline

Each data source is wrapped in an adapter class that implements a common interface. The adapter handles authentication, rate limiting, pagination, and error recovery. Raw payloads are stored verbatim in the contacts.raw_payload column for auditability, while structured fields are extracted and normalised.

import abc
import sqlite3
import json
import hashlib
from dataclasses import dataclass, field
from typing import List, Optional
from datetime import datetime


@dataclass
class RawContact:
    """Normalised contact record from any source."""
    source: str
    funnel_type: str            # 'sales' or 'recruiting'
    full_name: str
    company: Optional[str] = None
    title: Optional[str] = None
    email: Optional[str] = None
    location: Optional[str] = None
    raw_payload: dict = field(default_factory=dict)
    signals: List[dict] = field(default_factory=list)

    @property
    def contact_id(self) -> str:
        """Deterministic ID from source + email for dedup."""
        key = f"{self.source}:{self.email or self.full_name}"
        return hashlib.sha256(key.encode()).hexdigest()[:16]


class BaseSourceAdapter(abc.ABC):
    """Every data source implements this interface."""

    @abc.abstractmethod
    async def fetch_batch(self, since: datetime) -> List[RawContact]:
        """Pull new contacts/signals since the given timestamp."""
        ...

    @abc.abstractmethod
    def source_name(self) -> str:
        ...


class GitHubAdapter(BaseSourceAdapter):
    """Ingests contributor and stargazer data from GitHub API."""

    def __init__(self, token: str, target_repos: List[str]):
        self._token = token
        self._repos = target_repos

    async def fetch_batch(self, since: datetime) -> List[RawContact]:
        contacts = []
        for repo in self._repos:
            # Paginate through contributors API
            contributors = await self._get_contributors(repo, since)
            for c in contributors:
                contacts.append(RawContact(
                    source="github",
                    funnel_type="recruiting",
                    full_name=c.get("name", c["login"]),
                    company=c.get("company"),
                    email=c.get("email"),
                    location=c.get("location"),
                    raw_payload=c,
                    signals=[{
                        "signal_type": "repo_commit",
                        "signal_source": repo,
                        "signal_value": c.get("contributions", 0),
                    }],
                ))
        return contacts

    def source_name(self) -> str:
        return "github"

    async def _get_contributors(self, repo, since):
        # HTTP call implementation with rate-limit backoff
        ...

The ingestion orchestrator runs on a configurable schedule (default: every 4 hours) and coordinates all adapters in parallel using asyncio.gather. Deduplication is handled at the SQLite level via the UNIQUE(source, email) constraint: contacts are upserted (ON CONFLICT ... DO UPDATE), while signals are appended as new rows.

import asyncio
import logging

logger = logging.getLogger("ingestor")


class IngestionOrchestrator:
    """Coordinates parallel ingestion from all source adapters."""

    def __init__(self, db_path: str, adapters: List[BaseSourceAdapter]):
        self._db_path = db_path
        self._adapters = adapters

    async def run_cycle(self, since: datetime) -> dict:
        """Execute one full ingestion cycle. Returns summary stats."""
        results = await asyncio.gather(
            *[a.fetch_batch(since) for a in self._adapters],
            return_exceptions=True,
        )

        stats = {"ingested": 0, "duplicates": 0, "errors": 0}
        conn = sqlite3.connect(self._db_path)
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA busy_timeout=5000")

        for adapter, batch in zip(self._adapters, results):
            if isinstance(batch, Exception):
                logger.error("Adapter %s failed: %s", adapter.source_name(), batch)
                stats["errors"] += 1
                continue

            for contact in batch:
                try:
                    self._upsert_contact(conn, contact)
                    self._insert_signals(conn, contact)
                    stats["ingested"] += 1
                except sqlite3.IntegrityError:
                    stats["duplicates"] += 1

        conn.commit()
        conn.close()
        logger.info("Ingestion cycle complete: %s", stats)
        return stats

    def _upsert_contact(self, conn: sqlite3.Connection, c: RawContact):
        conn.execute(
            """INSERT INTO contacts
               (contact_id, source, funnel_type, full_name,
                company, title, email, location, raw_payload)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
               ON CONFLICT(source, email) DO UPDATE SET
                 updated_at = datetime('now'),
                 raw_payload = excluded.raw_payload""",
            (c.contact_id, c.source, c.funnel_type, c.full_name,
             c.company, c.title, c.email, c.location,
             json.dumps(c.raw_payload)),
        )

    def _insert_signals(self, conn: sqlite3.Connection, c: RawContact):
        for sig in c.signals:
            conn.execute(
                """INSERT INTO signals
                   (contact_id, signal_type, signal_source, signal_value, metadata)
                   VALUES (?, ?, ?, ?, ?)""",
                (c.contact_id, sig["signal_type"], sig["signal_source"],
                 sig.get("signal_value"), json.dumps(sig)),
            )
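The dedup behaviour is worth sandboxing before wiring up real adapters. A minimal sketch (simplified contacts table, invented rows) of how the UNIQUE(source, email) constraint interacts with the upsert and with INSERT OR IGNORE:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified contacts table: only the columns the dedup constraint touches
conn.execute("""
    CREATE TABLE contacts (
        source    TEXT NOT NULL,
        email     TEXT,
        full_name TEXT,
        UNIQUE(source, email)
    )
""")

upsert = """INSERT INTO contacts (source, email, full_name) VALUES (?, ?, ?)
            ON CONFLICT(source, email) DO UPDATE SET full_name = excluded.full_name"""

conn.execute(upsert, ("github", "ada@example.com", "Ada"))
# Re-ingesting the same source+email updates in place instead of duplicating
conn.execute(upsert, ("github", "ada@example.com", "Ada Lovelace"))

# INSERT OR IGNORE is the gentler alternative: the conflicting row is dropped
conn.execute(
    "INSERT OR IGNORE INTO contacts VALUES ('github', 'ada@example.com', 'Stale')"
)

count, name = conn.execute("SELECT COUNT(*), full_name FROM contacts").fetchone()
print(count, name)  # 1 Ada Lovelace
```

The same contact arriving from a second source (a different `source` value) would insert a new row, which is intentional: cross-source merging happens downstream, not at ingestion.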

Intelligence Layer: ML-Powered Scoring and Prioritisation

The intelligence layer transforms raw signals into actionable scores. It operates in three stages: feature engineering, model inference, and priority ranking. Each stage is independently versioned and tracked via MLflow, allowing the team to iterate on any component without disrupting the rest of the pipeline.

Feature Engineering

Features are computed from the signals table using SQL aggregations and Python transformations. The feature set includes both numerical signals (commit counts, profile views, recency metrics) and text-based features (title similarity to ideal candidate profile, company domain relevance). Text features are encoded using sentence-transformers to produce dense embeddings that capture semantic similarity.

import numpy as np
from sentence_transformers import SentenceTransformer
from dataclasses import dataclass
from typing import Dict, Any


@dataclass
class FeatureVector:
    contact_id: str
    numerical: np.ndarray
    text_embedding: np.ndarray
    feature_names: list
    raw_values: Dict[str, Any]


class FeatureEngineer:
    """Transforms raw signals into ML-ready feature vectors."""

    def __init__(self, db_path: str, model_name: str = "all-MiniLM-L6-v2"):
        self._db_path = db_path
        self._encoder = SentenceTransformer(model_name)
        self._ideal_profiles = {}  # loaded from config

    def compute_features(self, contact_id: str) -> FeatureVector:
        conn = sqlite3.connect(self._db_path)

        # Numerical features from signal aggregations
        row = conn.execute("""
            SELECT
                COUNT(*)                                     AS total_signals,
                COUNT(DISTINCT signal_type)                  AS signal_diversity,
                MAX(signal_value)                            AS peak_signal,
                AVG(signal_value)                            AS avg_signal,
                julianday('now') - julianday(MAX(observed_at)) AS days_since_last,
                julianday('now') - julianday(MIN(observed_at)) AS days_since_first
            FROM signals
            WHERE contact_id = ?
        """, (contact_id,)).fetchone()

        # Aggregates over zero rows return NULL (None in Python); coerce to
        # 0.0 before vectorising, or np.array raises on the None values
        numerical = np.array(
            [v if v is not None else 0.0 for v in row], dtype=np.float32
        )

        # Text features: encode title + company for semantic similarity
        contact = conn.execute(
            "SELECT title, company FROM contacts WHERE contact_id = ?",
            (contact_id,)
        ).fetchone()

        text_input = f"{contact[0] or ''} at {contact[1] or ''}"
        text_embedding = self._encoder.encode(text_input)

        conn.close()

        return FeatureVector(
            contact_id=contact_id,
            numerical=numerical,
            text_embedding=text_embedding,
            feature_names=[
                "total_signals", "signal_diversity", "peak_signal",
                "avg_signal", "days_since_last", "days_since_first",
            ],
            raw_values=dict(zip(
                ["total_signals", "signal_diversity", "peak_signal",
                 "avg_signal", "days_since_last", "days_since_first"],
                row,
            )),
        )
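Stripped of the encoder, the title-similarity feature reduces to a cosine between two embeddings. A toy sketch with hand-made 4-dimensional vectors standing in for sentence-transformers output (the real all-MiniLM-L6-v2 model emits 384-dimensional vectors):

```python
import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity in [-1, 1]; 1.0 means identical direction."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Toy embeddings standing in for encoder output (values invented)
ideal_profile = np.array([0.8, 0.1, 0.5, 0.2])  # e.g. "senior backend engineer"
candidate_a   = np.array([0.7, 0.2, 0.6, 0.1])  # semantically close title
candidate_b   = np.array([0.1, 0.9, 0.0, 0.8])  # unrelated title

sim_a = cosine_similarity(ideal_profile, candidate_a)
sim_b = cosine_similarity(ideal_profile, candidate_b)
print(round(sim_a, 3), round(sim_b, 3))  # candidate_a scores much higher
```

Because the encoder maps semantically similar titles to nearby vectors, this one number captures "Staff Engineer" vs "Senior Software Engineer" equivalence that keyword matching would miss.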

Scoring Models

The agent produces four distinct scores per contact: lead quality (overall fit), intent (buying/job-seeking signals), timing (likelihood of conversion in the next 14 days), and fit (alignment with ideal customer/candidate profile). Each score uses a separate LightGBM model trained on historical conversion data.

import lightgbm as lgb
import mlflow
import mlflow.lightgbm
from typing import Dict


class ScoringEngine:
    """Multi-model scoring engine with MLflow tracking."""

    SCORE_TYPES = ["lead_quality", "intent", "fit", "timing"]

    def __init__(self, db_path: str, mlflow_uri: str):
        self._db_path = db_path
        mlflow.set_tracking_uri(mlflow_uri)
        self._models: Dict[str, lgb.Booster] = {}
        self._load_production_models()

    def _load_production_models(self):
        """Load the current Production-stage model for each score type."""
        for score_type in self.SCORE_TYPES:
            model_uri = f"models:/{score_type}_model/Production"
            try:
                self._models[score_type] = mlflow.lightgbm.load_model(model_uri)
            except mlflow.exceptions.MlflowException:
                # Fall back to the latest registered version if nothing
                # has been promoted to the Production stage yet
                self._models[score_type] = mlflow.lightgbm.load_model(
                    f"models:/{score_type}_model/latest"
                )

    def score_contact(self, features: FeatureVector) -> Dict[str, float]:
        """Compute all score types for a single contact."""
        combined = np.concatenate([features.numerical, features.text_embedding])
        results = {}

        for score_type, model in self._models.items():
            prediction = model.predict(combined.reshape(1, -1))[0]
            results[score_type] = float(np.clip(prediction, 0.0, 1.0))

        return results

    def batch_score(self, feature_vectors: list) -> list:
        """Score a batch of contacts. Returns list of (contact_id, scores)."""
        results = []
        for fv in feature_vectors:
            scores = self.score_contact(fv)
            self._persist_scores(fv.contact_id, scores, fv)
            results.append((fv.contact_id, scores))
        return results

    def _persist_scores(self, contact_id, scores, features):
        conn = sqlite3.connect(self._db_path)
        model_version = mlflow.active_run().info.run_id if mlflow.active_run() else "batch"
        for score_type, value in scores.items():
            conn.execute(
                """INSERT INTO scores
                   (contact_id, model_version, score_type, score_value, features_json)
                   VALUES (?, ?, ?, ?, ?)""",
                (contact_id, model_version, score_type, value,
                 json.dumps(features.raw_values)),
            )
        conn.commit()
        conn.close()

Pattern Matching and Priority Ranking

Beyond ML scores, the agent applies rule-based pattern matching for high-signal events that should trigger immediate action. Job changes, funding announcements, and open-source contributions to key repositories all generate priority overrides. The final ranking combines ML scores with rule-based boosts using a weighted formula that is itself tracked as an MLflow parameter.

class PriorityRanker:
    """Combines ML scores with rule-based signals for final ranking."""

    # Weights tracked as MLflow params for experimentation
    DEFAULT_WEIGHTS = {
        "lead_quality": 0.30,
        "intent":       0.25,
        "fit":          0.25,
        "timing":       0.20,
    }

    RULE_BOOSTS = {
        "job_change":        0.15,   # recently changed role
        "funding_event":     0.12,   # company raised funding
        "key_repo_contrib":  0.10,   # contributed to target repo
        "profile_view":      0.05,   # viewed our content/profile
    }

    def __init__(self, weights: dict = None):
        self.weights = weights or self.DEFAULT_WEIGHTS

    def rank(self, contact_id: str, scores: dict, signals: list) -> float:
        """Compute composite priority score in [0, 1]."""
        # Weighted ML score
        base = sum(
            scores.get(k, 0) * w for k, w in self.weights.items()
        )

        # Rule-based boost from recent signals
        boost = 0.0
        for signal in signals:
            signal_type = signal.get("signal_type", "")
            if signal_type in self.RULE_BOOSTS:
                recency = signal.get("days_ago", 30)
                decay = max(0, 1.0 - recency / 30.0)  # linear decay
                boost += self.RULE_BOOSTS[signal_type] * decay

        return min(1.0, base + boost)
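A worked example makes the interaction between the weighted base and the rule boost concrete. The numbers below are invented; the arithmetic mirrors rank() above:

```python
# Hypothetical contact: strong ML scores plus a job change seen 5 days ago
scores  = {"lead_quality": 0.8, "intent": 0.6, "fit": 0.7, "timing": 0.5}
weights = {"lead_quality": 0.30, "intent": 0.25, "fit": 0.25, "timing": 0.20}

# Weighted base: 0.8*0.30 + 0.6*0.25 + 0.7*0.25 + 0.5*0.20 = 0.665
base = sum(scores[k] * w for k, w in weights.items())

# job_change boost (0.15) decayed linearly over 30 days: 1 - 5/30 of it survives
boost = 0.15 * (1.0 - 5 / 30.0)     # 0.125

priority = min(1.0, base + boost)   # 0.79
print(round(priority, 3))
```

Note the asymmetry this creates: a mediocre contact (base near 0.4) cannot reach the qualification threshold on boosts alone, while a borderline one can be tipped over by a single fresh, high-signal event.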

Funnel Automation: From Discovery to Conversion

The funnel manager is a state machine that governs the lifecycle of every contact from initial discovery through conversion or disqualification. Each stage has defined entry criteria, time-based rules, and exit conditions. The state machine is intentionally simple -- complexity lives in the intelligence layer, not in the funnel logic.

Funnel Stages

  • discovered -- Contact ingested, not yet scored. Auto-transitions to scored after feature computation.
  • scored -- ML scoring complete. Contacts above threshold move to qualified; below, to nurture.
  • qualified -- High-priority contact. Outreach sequence initiated; sales/recruiter notified.
  • outreach_1 -- First personalised message sent. Waits 3 days for a response before escalation.
  • outreach_2 -- Follow-up with a different angle. Waits 5 days; if no response, moves to nurture.
  • engaged -- Contact responded. Handed off to a human operator with a full context package.
  • converted -- Deal closed or candidate hired. Terminal state; triggers retrospective scoring.
  • nurture -- Low-priority or unresponsive. Re-scored monthly; can re-enter qualified on signal change.
from enum import Enum
from typing import Optional, Callable


class Stage(str, Enum):
    DISCOVERED = "discovered"
    SCORED     = "scored"
    QUALIFIED  = "qualified"
    OUTREACH_1 = "outreach_1"
    OUTREACH_2 = "outreach_2"
    ENGAGED    = "engaged"
    CONVERTED  = "converted"
    NURTURE    = "nurture"
    DISQUALIFIED = "disqualified"


# Allowed transitions: from_stage -> list of valid to_stages
TRANSITIONS = {
    Stage.DISCOVERED: [Stage.SCORED],
    Stage.SCORED:     [Stage.QUALIFIED, Stage.NURTURE, Stage.DISQUALIFIED],
    Stage.QUALIFIED:  [Stage.OUTREACH_1],
    Stage.OUTREACH_1: [Stage.OUTREACH_2, Stage.ENGAGED],
    Stage.OUTREACH_2: [Stage.ENGAGED, Stage.NURTURE],
    Stage.ENGAGED:    [Stage.CONVERTED, Stage.NURTURE],
    Stage.NURTURE:    [Stage.QUALIFIED, Stage.DISQUALIFIED],
}


class FunnelManager:
    """State machine managing contact lifecycle through the pipeline."""

    def __init__(self, db_path: str, score_threshold: float = 0.65):
        self._db_path = db_path
        self._threshold = score_threshold
        self._hooks: dict = {}  # stage -> list of callbacks

    def register_hook(self, stage: Stage, callback: Callable):
        """Register a callback for when a contact enters a stage."""
        self._hooks.setdefault(stage, []).append(callback)

    def transition(
        self, contact_id: str, to_stage: Stage, trigger: str = "auto"
    ) -> bool:
        """Attempt a state transition. Returns True if successful."""
        conn = sqlite3.connect(self._db_path)
        current = self._current_stage(conn, contact_id)

        if current and to_stage not in TRANSITIONS.get(current, []):
            conn.close()
            return False  # invalid transition

        conn.execute(
            """INSERT INTO funnel_events
               (contact_id, from_stage, to_stage, trigger)
               VALUES (?, ?, ?, ?)""",
            (contact_id, current.value if current else None,
             to_stage.value, trigger),
        )
        conn.commit()
        conn.close()

        # Fire registered hooks
        for hook in self._hooks.get(to_stage, []):
            hook(contact_id, to_stage, trigger)

        return True

    def _current_stage(self, conn, contact_id) -> Optional[Stage]:
        row = conn.execute(
            """SELECT to_stage FROM funnel_events
               WHERE contact_id = ?
               ORDER BY occurred_at DESC LIMIT 1""",
            (contact_id,),
        ).fetchone()
        return Stage(row[0]) if row else None

    def process_scored_contacts(self):
        """Move scored contacts to qualified or nurture based on threshold."""
        conn = sqlite3.connect(self._db_path)
        contacts = conn.execute("""
            SELECT s.contact_id, s.score_value
            FROM scores s
            JOIN funnel_events fe ON fe.contact_id = s.contact_id
            WHERE fe.to_stage = 'scored'
              AND s.score_type = 'lead_quality'
              AND s.scored_at = (
                  SELECT MAX(scored_at) FROM scores
                  WHERE contact_id = s.contact_id
                    AND score_type = 'lead_quality'
              )
        """).fetchall()
        conn.close()

        for contact_id, score in contacts:
            if score >= self._threshold:
                self.transition(contact_id, Stage.QUALIFIED, "score_threshold")
            else:
                self.transition(contact_id, Stage.NURTURE, "below_threshold")

The outreach component generates personalised messages using template-based composition with dynamic fields populated from the contact record and scoring rationale. Each outreach event is logged as a signal, creating a feedback loop: the intelligence layer can learn which message templates and timing patterns lead to higher engagement rates.
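The article stops short of showing the template engine itself; a minimal sketch using Python's stdlib string.Template, with invented template text and field names, illustrates the composition pattern:

```python
from string import Template

# Hypothetical outreach template; the wording and fields are invented
TEMPLATE = Template(
    "Hi $first_name, I noticed your work on $signal_source -- "
    "your background at $company looks like a strong fit for what we're building."
)

def compose_message(contact: dict, top_signal: dict) -> str:
    """Fill template fields from the contact record and its strongest signal."""
    return TEMPLATE.substitute(
        first_name=contact["full_name"].split()[0],
        company=contact.get("company") or "your current team",
        signal_source=top_signal["signal_source"],
    )

msg = compose_message(
    {"full_name": "Ada Lovelace", "company": "Acme"},
    {"signal_type": "repo_commit", "signal_source": "acme/engine"},
)
print(msg)
```

Each rendered message is then logged back into the signals table, which is what lets the bandit described later attribute replies to specific template variants.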

MLflow Integration: Experiment Tracking and Model Versioning

MLflow serves as the central nervous system for all machine learning operations in the agent. Every training run, scoring batch, and A/B test is tracked as an MLflow experiment with full parameter, metric, and artifact logging. The model registry manages promotion from staging to production, and the tracking UI provides a dashboard for comparing experiment outcomes.

Training Pipeline

Models are retrained weekly using a rolling window of historical conversion data. The training pipeline extracts labelled examples from the funnel events table (contacts that reached the "converted" stage are positive, "disqualified" are negative), computes feature vectors, trains a LightGBM model, and logs everything to MLflow.

import mlflow
import mlflow.lightgbm
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    roc_auc_score, precision_score, recall_score, f1_score
)


def train_scoring_model(
    db_path: str,
    score_type: str,
    mlflow_uri: str,
    experiment_name: str = "scoring_models",
):
    """Train a single scoring model and register it with MLflow."""

    mlflow.set_tracking_uri(mlflow_uri)
    mlflow.set_experiment(experiment_name)

    # 1. Extract labelled dataset
    X, y, feature_names = extract_training_data(db_path, score_type)

    # 2. Split with stratification
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, stratify=y, random_state=42
    )

    # 3. Define hyperparameters (tracked by MLflow)
    params = {
        "objective": "binary",
        "metric": "auc",
        "learning_rate": 0.05,
        "num_leaves": 31,
        "max_depth": 6,
        "min_child_samples": 20,
        "feature_fraction": 0.8,
        "bagging_fraction": 0.8,
        "bagging_freq": 5,
        "verbose": -1,
    }

    with mlflow.start_run(run_name=f"{score_type}_training"):
        # Log parameters
        mlflow.log_params(params)
        mlflow.log_param("score_type", score_type)
        mlflow.log_param("training_samples", len(X_train))
        mlflow.log_param("validation_samples", len(X_val))
        mlflow.log_param("positive_rate", float(y.mean()))

        # Train
        train_set = lgb.Dataset(X_train, label=y_train)
        val_set = lgb.Dataset(X_val, label=y_val, reference=train_set)
        model = lgb.train(
            params,
            train_set,
            num_boost_round=500,
            valid_sets=[val_set],
            callbacks=[lgb.early_stopping(50), lgb.log_evaluation(100)],
        )

        # Predict and compute metrics
        y_pred = model.predict(X_val)
        metrics = {
            "auc":       roc_auc_score(y_val, y_pred),
            "precision": precision_score(y_val, (y_pred > 0.5).astype(int)),
            "recall":    recall_score(y_val, (y_pred > 0.5).astype(int)),
            "f1":        f1_score(y_val, (y_pred > 0.5).astype(int)),
        }
        mlflow.log_metrics(metrics)

        # Log feature importance
        importance = dict(zip(feature_names, model.feature_importance()))
        mlflow.log_dict(importance, "feature_importance.json")

        # Register model
        mlflow.lightgbm.log_model(
            model,
            artifact_path="model",
            registered_model_name=f"{score_type}_model",
        )

    return metrics
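train_scoring_model relies on extract_training_data, which isn't shown above. A minimal sketch under the labelling rule described earlier (converted is positive, disqualified is negative), reusing the feature engineer's numerical aggregations; a production version would also join text embeddings and branch on score_type:

```python
import sqlite3
import numpy as np

def extract_training_data(db_path: str, score_type: str):
    """Label contacts by terminal funnel stage, joined to signal aggregates.

    Sketch only: converted -> 1, disqualified -> 0. score_type is unused
    here; a real version would select the label definition per model.
    """
    feature_names = ["total_signals", "signal_diversity", "peak_signal",
                     "avg_signal", "days_since_last", "days_since_first"]
    conn = sqlite3.connect(db_path)
    rows = conn.execute("""
        SELECT
            CASE fe.to_stage WHEN 'converted' THEN 1 ELSE 0 END AS label,
            COUNT(s.signal_id),
            COUNT(DISTINCT s.signal_type),
            COALESCE(MAX(s.signal_value), 0),
            COALESCE(AVG(s.signal_value), 0),
            COALESCE(julianday('now') - julianday(MAX(s.observed_at)), 0),
            COALESCE(julianday('now') - julianday(MIN(s.observed_at)), 0)
        FROM funnel_events fe
        LEFT JOIN signals s ON s.contact_id = fe.contact_id
        WHERE fe.to_stage IN ('converted', 'disqualified')
        GROUP BY fe.contact_id, fe.to_stage
    """).fetchall()
    conn.close()
    y = np.array([r[0] for r in rows], dtype=np.int32)
    X = np.array([r[1:] for r in rows], dtype=np.float32)
    return X, y, feature_names
```

Because labels come from terminal funnel events rather than hand annotation, the training set grows automatically as the pipeline runs, which is the feedback loop the Results section measures.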

A/B Testing Pipeline Performance

Beyond model training, MLflow tracks outreach experiment performance. Each outreach template variant is registered as an MLflow run, with metrics like open rate, reply rate, and conversion rate logged automatically as funnel events accumulate. The agent uses a multi-armed bandit approach (Thompson Sampling) to dynamically allocate traffic to winning variants.

import numpy as np


class OutreachExperiment:
    """Thompson Sampling for outreach template selection."""

    def __init__(self, variants: list, mlflow_experiment: str):
        self.variants = variants
        self._experiment = mlflow_experiment
        # Beta distribution priors: (alpha=successes+1, beta=failures+1)
        self._alphas = {v: 1.0 for v in variants}
        self._betas  = {v: 1.0 for v in variants}

    def select_variant(self) -> str:
        """Sample from posterior and select best variant."""
        samples = {
            v: np.random.beta(self._alphas[v], self._betas[v])
            for v in self.variants
        }
        return max(samples, key=samples.get)

    def record_outcome(self, variant: str, success: bool):
        """Update posterior with observed outcome."""
        if success:
            self._alphas[variant] += 1
        else:
            self._betas[variant] += 1

        # Log to MLflow
        with mlflow.start_run(run_name=f"outreach_{variant}", nested=True):
            total = self._alphas[variant] + self._betas[variant] - 2
            rate = (self._alphas[variant] - 1) / max(total, 1)
            mlflow.log_metrics({
                "total_sends": total,
                "success_rate": rate,
                "alpha": self._alphas[variant],
                "beta": self._betas[variant],
            })
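The convergence behaviour is easy to simulate with MLflow out of the loop. A sketch that replays the select/record cycle against invented "true" reply rates, using the same Beta-posterior bookkeeping as OutreachExperiment:

```python
import numpy as np

rng = np.random.default_rng(42)
# Invented ground-truth reply rates per template variant
variants = {"direct_ask": 0.05, "value_first": 0.15, "mutual_contact": 0.10}

alphas = {v: 1.0 for v in variants}  # successes + 1
betas  = {v: 1.0 for v in variants}  # failures + 1

for _ in range(2000):
    # Thompson step: sample each posterior, send with the best-looking variant
    samples = {v: rng.beta(alphas[v], betas[v]) for v in variants}
    chosen = max(samples, key=samples.get)
    # Simulate the outcome and update the posterior
    if rng.random() < variants[chosen]:
        alphas[chosen] += 1
    else:
        betas[chosen] += 1

sends = {v: alphas[v] + betas[v] - 2 for v in variants}
print(max(sends, key=sends.get))  # variant that received the most sends
```

The appeal over a fixed 50/50 split is that exploration cost decays automatically: weak variants keep receiving a trickle of traffic (so a late improvement would still be detected) while the bulk of sends shifts to the apparent winner.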

MLflow Experiment Structure

scoring_models/ -- One run per training iteration per score type. Tracks AUC, precision, recall, F1, feature importance, and model artifacts.

outreach_experiments/ -- One parent run per experiment (e.g., "Q1 Sales Outreach"), with nested runs per template variant. Tracks send volume, open rate, reply rate, conversion.

pipeline_runs/ -- One run per scheduled pipeline execution. Tracks ingestion volume, scoring latency, funnel transitions, and error rates.

feature_experiments/ -- Tracks feature engineering experiments: new signal types, embedding models, aggregation windows.

Results and Metrics

The agent has been running in production for six months across both sales and recruiting funnels. The results below compare the agent-driven pipeline against the previous manual workflow over a matched three-month period.

- 40% increase in qualified pipeline velocity
- 3.2x more qualified leads per week
- 58% reduction in manual research time
- 2.1x improvement in outreach reply rate

Sales Funnel Performance

| Metric | Manual | Automated |
|---|---|---|
| Leads ingested per week | ~120 | ~850 |
| Qualified rate | 18% | 31% |
| Outreach reply rate | 8% | 17% |
| Time to first outreach | 3.5 days | 4.2 hours |
| Conversion rate (lead to deal) | 2.3% | 4.1% |

Recruiting Funnel Performance

| Metric | Manual | Automated |
|---|---|---|
| Candidates sourced per week | ~45 | ~320 |
| Fit score accuracy (AUC) | N/A (gut feel) | 0.84 |
| Recruiter screen pass rate | 35% | 62% |
| Time to hire (median) | 34 days | 21 days |

Model Performance Over Time

The scoring models show consistent improvement across retraining cycles as the conversion feedback loop accumulates more labelled data. The table below shows the lead quality model performance over the first six monthly retraining iterations.

| Iteration | AUC | Precision | Recall | F1 | Training Samples |
|---|---|---|---|---|---|
| v1 (Month 1) | 0.71 | 0.64 | 0.59 | 0.61 | 1,240 |
| v2 (Month 2) | 0.76 | 0.70 | 0.65 | 0.67 | 2,810 |
| v3 (Month 3) | 0.79 | 0.73 | 0.71 | 0.72 | 4,530 |
| v4 (Month 4) | 0.82 | 0.76 | 0.74 | 0.75 | 6,180 |
| v5 (Month 5) | 0.84 | 0.78 | 0.76 | 0.77 | 8,020 |
| v6 (Month 6) | 0.86 | 0.80 | 0.78 | 0.79 | 10,340 |

The AUC improvement from 0.71 to 0.86 over six months demonstrates the compounding value of the feedback loop: every conversion or disqualification event becomes a labelled training example for the next model iteration. Gains began to plateau around Month 5, suggesting that further improvement requires new feature sources rather than more training data.
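The feedback loop described above can be sketched as a small query over the funnel-events table: terminal outcomes become binary labels, intermediate events are held back until the contact reaches a terminal state. The table and column names here are illustrative, not the production schema.

```python
import sqlite3

# Hypothetical funnel-events table with a few sample outcomes
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE funnel_events (contact_id INTEGER, event TEXT, ts TEXT);
INSERT INTO funnel_events VALUES
  (1, 'converted',    '2026-01-10'),
  (2, 'disqualified', '2026-01-11'),
  (3, 'replied',      '2026-01-12');
""")

# Only terminal events map to training labels; 'replied' is not yet terminal
LABELS = {"converted": 1, "disqualified": 0}

rows = conn.execute("SELECT contact_id, event FROM funnel_events").fetchall()
training_labels = {
    contact_id: LABELS[event]
    for contact_id, event in rows
    if event in LABELS
}
print(training_labels)  # {1: 1, 2: 0}
```

Each retraining cycle joins these labels back onto the feature snapshot captured at scoring time, which is what makes every funnel outcome a new training example.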

Lessons Learned

Six months of production operation surfaced several insights that would reshape how I approach similar projects. The following distills the most significant technical and operational learnings.

What Worked

SQLite as the single source of truth

Using SQLite for everything -- contacts, signals, scores, funnel events, and even MLflow backend storage -- eliminated an entire class of data consistency bugs. Every pipeline outcome is reproducible with a single database file. WAL mode provided sufficient concurrency for the workload (peak: 50 writes/second during batch ingestion). The ability to simply copy the database file for backup, debugging, or local development was invaluable.
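Enabling WAL mode is a one-line pragma at connection time. A minimal sketch, with the path and the `synchronous` pairing as assumptions rather than the production configuration:

```python
import os
import sqlite3
import tempfile

# WAL requires a file-backed database (in-memory databases do not support it)
path = os.path.join(tempfile.mkdtemp(), "agent.db")
conn = sqlite3.connect(path)

# WAL lets concurrent readers proceed while a single writer appends to the log
mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
conn.execute("PRAGMA synchronous=NORMAL")  # common durability/speed trade-off with WAL

print(mode)  # "wal"
```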

Unified codebase for sales and recruiting

The abstraction of contacts and funnels as generic concepts (rather than building separate sales and recruiting applications) reduced total code by roughly 60%. The only differences between funnels are configuration: source adapters, scoring model weights, outreach templates, and threshold values. This also meant that improvements to the intelligence layer -- better feature engineering, a new embedding model -- immediately benefited both funnels.
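The configuration-only difference between funnels might look like the following sketch; the field names (`source_adapters`, `score_weights`, and so on) are illustrative, not the actual config schema.

```python
from dataclasses import dataclass, field


@dataclass
class FunnelConfig:
    """Everything that distinguishes one funnel from another."""
    name: str
    source_adapters: list = field(default_factory=list)
    score_weights: dict = field(default_factory=dict)
    outreach_templates: list = field(default_factory=list)
    qualify_threshold: float = 0.5


sales = FunnelConfig(
    name="sales",
    source_adapters=["company_registry", "news_signals"],
    score_weights={"fit": 0.6, "timing": 0.4},
    qualify_threshold=0.65,
)
recruiting = FunnelConfig(
    name="recruiting",
    source_adapters=["public_profiles", "job_boards"],
    score_weights={"fit": 0.8, "timing": 0.2},
    qualify_threshold=0.70,
)

# The same pipeline code runs both funnels; only the config object differs.
```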

Thompson Sampling for outreach optimisation

The multi-armed bandit approach to template selection outperformed static A/B testing by 23% in cumulative reply rate. Traditional A/B testing requires waiting for statistical significance before switching to the winning variant. Thompson Sampling naturally shifts traffic toward better-performing variants while still exploring, reducing the cost of experimentation.

MLflow for pipeline observability

Using MLflow beyond model training -- tracking every pipeline run, every outreach experiment, every feature engineering decision -- transformed debugging from archaeology into data analysis. When conversion rates dipped in Month 3, the MLflow UI immediately showed that a feature engineering change had degraded the timing model, and the model registry made rollback trivial.

What Did Not Work

Overengineered initial feature set

The first version of the feature engineer computed 47 features per contact. After running feature importance analysis via MLflow, only 8 features carried meaningful predictive weight. The remaining 39 added computational overhead, increased training time by 3x, and introduced noise that actually degraded the timing model. Simplifying to the top 12 features improved both accuracy and inference latency.

Aggressive outreach timing

Early experiments with 24-hour follow-up intervals produced higher unsubscribe rates without improving conversion. The optimal cadence -- discovered through MLflow experiment tracking -- was 3 days for the first follow-up and 5 days for the second. Longer intervals apparently read as more professional to recipients.

Text embeddings as sole similarity measure

Using sentence-transformer embeddings for title/company similarity worked well for recruiting (where job titles are relatively standardised) but poorly for sales (where titles vary wildly across industries). Adding structured skill tags and industry codes as categorical features alongside the embeddings improved the sales funnel fit score by 0.09 AUC points.
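Combining the embedding with structured categorical features amounts to concatenating feature blocks into one vector. A minimal sketch, with invented dimensions and vocabularies (the real system uses sentence-transformer embeddings and its own tag taxonomy):

```python
import numpy as np

EMBED_DIM = 8  # real sentence-transformer embeddings are much larger
INDUSTRIES = ["saas", "fintech", "healthcare"]
SKILLS = ["python", "sales", "ml"]


def one_hot(value: str, vocab: list) -> np.ndarray:
    """Encode a single categorical value; unknown values map to all zeros."""
    vec = np.zeros(len(vocab))
    if value in vocab:
        vec[vocab.index(value)] = 1.0
    return vec


def build_features(title_embedding: np.ndarray, industry: str, skills: list) -> np.ndarray:
    """Concatenate text embedding + industry one-hot + multi-hot skill tags."""
    skill_vec = sum((one_hot(s, SKILLS) for s in skills), np.zeros(len(SKILLS)))
    return np.concatenate([title_embedding, one_hot(industry, INDUSTRIES), skill_vec])


features = build_features(np.random.rand(EMBED_DIM), "fintech", ["python", "ml"])
print(features.shape)  # (14,)
```

The downstream model then learns how much weight the structured blocks deserve relative to the embedding, which is what recovered the 0.09 AUC on the sales funnel.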

Production Considerations

Rate limiting and source rotation

API rate limits are the primary bottleneck for ingestion throughput. The agent implements exponential backoff with jitter per source adapter, maintains a token bucket rate limiter, and rotates across multiple API keys where supported. All rate-limit events are logged as signals so the intelligence layer can account for ingestion gaps.
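The two throttling mechanisms can be sketched as follows: a token bucket for steady-state pacing, and exponential backoff with full jitter for rate-limit responses. Parameter values are illustrative, not the production tuning.

```python
import random
import time


class TokenBucket:
    """Allow bursts up to `capacity`, then throttle to `rate` requests/second."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last = time.monotonic()

    def try_acquire(self) -> bool:
        now = time.monotonic()
        # Replenish tokens proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Exponential backoff with full jitter: uniform over [0, min(cap, base * 2^n)]."""
    return random.uniform(0, min(cap, base * 2 ** attempt))


bucket = TokenBucket(rate=5, capacity=10)
allowed = sum(bucket.try_acquire() for _ in range(20))
print(allowed)  # 10: the burst capacity is consumed, the rest are throttled
```

Full jitter (rather than a deterministic delay) spreads retries out so that multiple adapters hitting the same API limit do not retry in lockstep.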

Data privacy and compliance

The agent only processes publicly available data and respects robots.txt directives. All contact records include a consent_status field that defaults to "pending". No outreach is sent until consent is verified. The SQLite database is encrypted at rest using SQLCipher, and all API credentials are managed through environment variables, never stored in code or configuration files.
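The consent gate can be enforced at the schema level so the outreach worker cannot accidentally bypass it. An illustrative sketch; the table and column names are assumptions about the production schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
CREATE TABLE contacts (
    id INTEGER PRIMARY KEY,
    email TEXT NOT NULL,
    consent_status TEXT NOT NULL DEFAULT 'pending'
        CHECK (consent_status IN ('pending', 'granted', 'revoked'))
)
""")
# New contacts default to 'pending'; consent must be set explicitly
conn.execute("INSERT INTO contacts (email) VALUES ('a@example.com')")
conn.execute(
    "INSERT INTO contacts (email, consent_status) VALUES ('b@example.com', 'granted')"
)

# The outreach worker only ever selects consented contacts
sendable = conn.execute(
    "SELECT email FROM contacts WHERE consent_status = 'granted'"
).fetchall()
print(sendable)  # [('b@example.com',)]
```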

Monitoring and alerting

The agent exposes Prometheus metrics for ingestion volume, scoring latency, funnel transition rates, and error counts. Grafana dashboards provide real-time visibility, and PagerDuty alerts fire when ingestion drops below expected thresholds or error rates exceed 5% of total pipeline volume. MLflow experiment tracking supplements this with weekly automated reports comparing model performance against baseline.

Scaling beyond SQLite

SQLite handles the current workload comfortably (10K contacts, 500K signals, 50K funnel events), but the architecture is designed for migration to PostgreSQL when needed. All database access goes through a repository pattern with SQL abstracted behind method calls. The migration would involve changing connection strings and adjusting a handful of SQLite-specific SQL expressions (datetime functions, JSON operators). The abstraction has already been validated: a PostgreSQL branch runs in CI to ensure compatibility.
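The repository pattern mentioned above might look like this sketch: callers never see SQL, so swapping in a PostgreSQL backend means writing a second repository implementation rather than touching pipeline code. Class and method names are illustrative.

```python
import sqlite3


class ContactRepository:
    """All contact persistence goes through this interface; no SQL leaks out."""

    def __init__(self, conn):
        self._conn = conn
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS contacts (id INTEGER PRIMARY KEY, name TEXT)"
        )

    def add(self, name: str) -> int:
        cur = self._conn.execute("INSERT INTO contacts (name) VALUES (?)", (name,))
        return cur.lastrowid

    def get(self, contact_id: int):
        return self._conn.execute(
            "SELECT id, name FROM contacts WHERE id = ?", (contact_id,)
        ).fetchone()


repo = ContactRepository(sqlite3.connect(":memory:"))
new_id = repo.add("Ada")
print(repo.get(new_id))  # (1, 'Ada')
```

A PostgreSQL variant would implement the same `add`/`get` surface over a different driver, which is exactly what the CI compatibility branch validates.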

Summary

Building an autonomous intelligence agent for outbound prospecting and recruiting demonstrated that a well-structured Python application with SQLite and MLflow can replace a constellation of point tools while delivering measurably better outcomes. The key insight is that the intelligence layer and the funnel manager are separate concerns: the scoring model answers "who should we talk to and when," while the funnel manager handles "how do we move them through the pipeline." Keeping these responsibilities distinct enabled rapid iteration on both axes independently.

The 40% increase in qualified pipeline velocity came not from any single improvement but from the compound effect of faster ingestion, better scoring, personalised outreach, and continuous optimisation through experiment tracking. Each component contributed incrementally, and MLflow provided the visibility to understand which levers had the most impact at each stage of the system's evolution.

For teams considering a similar approach, the advice is to start simple: a single data source, one scoring model, a basic funnel with three stages, and MLflow tracking from day one. The feedback loop compounds quickly -- within two months, the system had enough labelled data to outperform manual scoring, and within four months the outreach optimisation was generating statistically significant improvements in reply rates.