How to Build LLM Observability Without Adding Another Paid Tool: Cost Tracking, Quality Monitoring, and Failure Alerts in Python

Every LLM observability guide tells you to install Langfuse or add Helicone as a proxy. Langfuse needs a database and Docker setup. Helicone routes every API call through their infrastructure, so if they're down, your AI features are down. For a production pipeline running 161 sequential GPT-4o calls across 5 workers, I needed cost visibility, quality alerts, and failure notifications without routing calls through a third party. This guide walks through the roughly 60-line Python class that saves traces to my existing MongoDB and fires Slack alerts before the invoice arrives.


This is not a takedown of paid tools. Langfuse is good. Helicone is good. This post is for developers who have a specific reason not to add another dependency: compliance, proxy risk, existing infrastructure, or simply not wanting to sign up for another service.

What LLM Observability Actually Requires — And What Paid Tools Add on Top

LLM observability requires four things: cost per call, latency per call, quality signal per output, and an alert channel — all achievable with 60 lines of Python and your existing database. Paid tools like Langfuse and Helicone add dashboards, prompt management, and integrations on top of this foundation.

Before comparing vendors, define what you actually need. These four components cover every production incident I've debugged across two anonymised systems — an AI report generation SaaS and a 10-tool Gemini content platform:

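  1. Cost tracking: prompt_tokens × input price + completion_tokens × output price = cost per call (input and output tokens are billed at different rates)
  2. Latency tracking: time from request sent to first token (TTFB) and total wall-clock time (the observer in this guide records total wall-clock time only)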
  3. Quality monitoring: output length checks, format validation, score thresholds
  4. Failure alerts: when calls fail, when quality drops below threshold, when cost spikes
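
To make the cost formula in item 1 concrete, here is a minimal worked example. The prices are the GPT-4o and GPT-4o-mini figures quoted in this guide; the function name `call_cost_usd`, the table name `PRICE_PER_MILLION`, and the token counts are illustrative assumptions, not part of the observer class.

```python
# Prices in USD per 1M tokens, taken from the figures quoted in this guide.
PRICE_PER_MILLION = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
}


def call_cost_usd(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    """Cost of one call: input and output tokens are billed at different rates."""
    price = PRICE_PER_MILLION[model]
    return (
        prompt_tokens / 1_000_000 * price["input"]
        + completion_tokens / 1_000_000 * price["output"]
    )


# Hypothetical call: 1,200 prompt tokens and 800 completion tokens on GPT-4o.
# Input share is about $0.003, output share about $0.008.
example = call_cost_usd("gpt-4o", prompt_tokens=1_200, completion_tokens=800)
print(f"${example:.4f}")
```

Note that the output tokens dominate the bill here even though there are fewer of them, which is why a single blended price per token gives misleading totals.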

Langfuse (MIT, self-hostable) adds tracing dashboards and prompt management. Helicone (Apache-2.0, free up to 10K requests/month) adds a proxy gateway. Arize Phoenix integrates OpenTelemetry for experiments. LangSmith offers 5,000 traces/month free for LangChain-native workflows. Braintrust provides 1M trace spans/month on its free tier. OpenLLMetry, maintained by Traceloop, is open-source. Portkey is an MIT-licensed unified API gateway. All are legitimate choices. The question is whether you need what they add, or only the four foundations above.

| Feature | Langfuse (self-hosted) | Helicone (proxy) | DIY (this guide) |
|---|---|---|---|
| Setup time | 2–4 hours (Docker) | 5 minutes | 30 minutes |
| External dependency | Self-hosted DB | Their proxy infra | None |
| LLM calls route through | Direct | Helicone's servers | Direct |
| If tool is down | Your app unaffected | Your AI features fail | Your app unaffected |
| Cost per month (10K+ req) | Hosting cost | Paid tier | $0 |
| Prompt management | Yes | No | No (not needed here) |
| Visual dashboard | Yes | Yes | No (DB queries instead) |
| Cost tracking | Yes | Yes | Yes (60 lines) |
| Quality monitoring | Yes | Basic | Yes (custom thresholds) |
| Works inside Celery workers | Caution | Adds latency | Yes (no HTTP overhead) |
| Compliance-friendly (no data leaving infra) | Yes | No | Yes |

Helicone and Portkey work as a proxy between your application and the LLM. Every API call routes through their infrastructure. If their proxy has an incident, your AI features stop working — regardless of whether OpenAI or Google are up. This is the structural tradeoff that proxy-based observability introduces. It's a legitimate tradeoff for many teams. For a pipeline running 161 sequential calls per job across 5 workers, I couldn't accept that dependency.

DIY observability makes sense when traces contain regulated user data you cannot send to a third party — healthcare, finance, and other regulated industries cannot route prompts and model outputs through a third-party proxy. When you already run MongoDB or PostgreSQL for application data, adding Langfuse means running another database service dedicated to traces. When Celery workers execute 161 calls in a tight loop, Helicone's HTTP proxy adds latency on every iteration. When Helicone's 10K free request limit is exceeded monthly at production scale, the paid tier becomes another line item. For a solo developer or small team shipping AI features, 60 lines of Python attached to infrastructure you already operate is often enough to ship with confidence.

The LLMObserver Class: Cost Tracking and Quality Monitoring in 60 Lines

The LLMObserver class wraps any async LLM call, records an LLMTrace with token counts, cost, latency, and output quality, saves it to your existing database, and fires configurable alerts — in approximately 60 lines, with no dependencies beyond your existing database driver.

The LLMTrace dataclass

Each API call produces one LLMTrace record: model name, prompt and completion token counts, latency in milliseconds, output character length, success flag, optional error string, and arbitrary metadata (job ID, section index, tool name, user ID). The cost_usd property calculates cost from a PRICING dictionary: GPT-4o at $2.50/1M input and $10.00/1M output, GPT-4o-mini at $0.15/$0.60, Gemini 2.5 Flash at $0.30/$2.50, Gemini 3.5 Flash at $1.50/$9.00. to_dict() writes that computed cost next to the raw token counts, so each record reflects the price in effect when the call ran, and historical costs can be recomputed from the stored tokens if a provider changes rates. A model missing from PRICING is costed at $0, so add new models to the dictionary before using them.

The observe() wrapper

Wrap the API call in observe(). It times the call with time.monotonic(), catches exceptions, extracts token usage from the response object, builds the trace in a finally block (so failed calls still get logged), saves the trace, accumulates session cost, and checks the quality and cost thresholds. A failed database write is logged as a warning rather than raised, so it never interrupts the pipeline. The wrapper accepts any async callable (OpenAI's chat.completions.create or a custom retry wrapper) as long as the response exposes OpenAI-style fields: a usage attribute with prompt_tokens and completion_tokens, and choices[0].message.content for the text. Responses with a different shape, such as those from Gemini's native generate_content, need a thin adapter first; otherwise token counts and output length are recorded as 0 and every successful call trips the low-quality alert. Failed calls record success=False and the exception message, which is how you catch worker crashes and rate-limit storms before users report empty outputs.

from __future__ import annotations
import time
import logging
from dataclasses import dataclass, field
from datetime import datetime, UTC
from typing import Any, Callable, Optional

logger = logging.getLogger(__name__)

# ── PRICING: update when providers change rates ─────────────────────────
PRICING: dict[str, dict[str, float]] = {
    "gpt-4o":           {"input": 2.50,  "output": 10.00},
    "gpt-4o-mini":      {"input": 0.15,  "output": 0.60},
    "gemini-2.5-flash": {"input": 0.30,  "output": 2.50},
    "gemini-3.5-flash": {"input": 1.50,  "output": 9.00},
}

@dataclass
class LLMTrace:
    model: str
    prompt_tokens: int
    completion_tokens: int
    latency_ms: float
    output_length: int
    success: bool
    error: Optional[str] = None
    metadata: dict = field(default_factory=dict)
    timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))

    @property
    def cost_usd(self) -> float:
        p = PRICING.get(self.model, {"input": 0.0, "output": 0.0})
        return (
            (self.prompt_tokens / 1_000_000) * p["input"] +
            (self.completion_tokens / 1_000_000) * p["output"]
        )

    def to_dict(self) -> dict:
        return {
            "model": self.model,
            "prompt_tokens": self.prompt_tokens,
            "completion_tokens": self.completion_tokens,
            "cost_usd": round(self.cost_usd, 8),
            "latency_ms": self.latency_ms,
            "output_length": self.output_length,
            "success": self.success,
            "error": self.error,
            "metadata": self.metadata,
            "timestamp": self.timestamp.isoformat(),
        }


class LLMObserver:
    """
    60-line production LLM observability.
    Zero external dependencies — plugs into your existing DB and alert channel.

    Usage:
        observer = LLMObserver(save_trace=mongo_save, alert=slack_alert)
        response = await observer.observe(
            lambda: client.chat.completions.create(...),
            model="gpt-4o", metadata={"job_id": "abc", "section": 3}
        )
    """

    def __init__(
        self,
        save_trace: Callable[[dict], None],
        alert: Optional[Callable[[str], None]] = None,
        cost_alert_usd: float = 5.0,
        min_output_chars: int = 50,
    ):
        self.save_trace = save_trace
        self.alert = alert
        self.cost_alert_usd = cost_alert_usd
        self.min_output_chars = min_output_chars
        self._session_cost: float = 0.0

    async def observe(
        self,
        func: Callable,
        model: str,
        metadata: dict | None = None,
    ) -> Any:
        """Wrap any async LLM call with full observability."""
        start = time.monotonic()
        response = error_msg = None

        try:
            response = await func()
            return response
        except Exception as exc:
            error_msg = str(exc)
            raise
        finally:
            latency_ms = round((time.monotonic() - start) * 1000, 2)

            usage = getattr(response, "usage", None)
            prompt_tokens = getattr(usage, "prompt_tokens", 0) if usage else 0
            completion_tokens = getattr(usage, "completion_tokens", 0) if usage else 0

            content = ""
            if response and error_msg is None:
                try:
                    content = response.choices[0].message.content or ""
                except (AttributeError, IndexError):
                    pass

            trace = LLMTrace(
                model=model,
                prompt_tokens=prompt_tokens,
                completion_tokens=completion_tokens,
                latency_ms=latency_ms,
                output_length=len(content),
                success=error_msg is None,
                error=error_msg,
                metadata=metadata or {},
            )

            try:
                self.save_trace(trace.to_dict())
            except Exception as db_err:
                logger.warning("Trace save failed: %s", db_err)

            self._session_cost += trace.cost_usd

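            # Collect alert messages first, then deliver them defensively below.
            alerts: list[str] = []

            if trace.success and trace.output_length < self.min_output_chars:
                alerts.append(
                    f"Low-quality output from {model}: "
                    f"{trace.output_length} chars (min {self.min_output_chars}). "
                    f"Section: {trace.metadata.get('section', '?')}"
                )

            if self._session_cost > self.cost_alert_usd:
                alerts.append(
                    f"Cost alert: ${self._session_cost:.4f} spent "
                    f"(threshold ${self.cost_alert_usd})"
                )
                # Reset so the alert fires once per threshold crossing, not on every call.
                self._session_cost = 0.0

            for msg in alerts:
                logger.warning(msg)
                if self.alert:
                    try:
                        self.alert(msg)
                    except Exception as alert_err:
                        # An exception raised inside `finally` would replace the LLM
                        # response or error, so a failed alert is logged, not raised.
                        logger.warning("Alert delivery failed: %s", alert_err)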

    @property
    def session_cost(self) -> float:
        return round(self._session_cost, 6)
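
The observe() method above reads `response.usage.prompt_tokens`, `response.usage.completion_tokens`, and `response.choices[0].message.content`, which matches OpenAI's response shape. For a provider with a different shape, one option is a small adapter. The sketch below assumes a Gemini-style response that exposes `text` and a `usage_metadata` object with `prompt_token_count` and `candidates_token_count`; check those names against the SDK version you use. The helper name `to_openai_shape` is hypothetical.

```python
from types import SimpleNamespace
from typing import Any


def to_openai_shape(gemini_response: Any) -> SimpleNamespace:
    """Expose Gemini-style usage and text under the attribute names observe() reads."""
    # Assumed field names; missing attributes fall back to 0 or an empty string.
    meta = getattr(gemini_response, "usage_metadata", None)
    usage = SimpleNamespace(
        prompt_tokens=getattr(meta, "prompt_token_count", 0) or 0,
        completion_tokens=getattr(meta, "candidates_token_count", 0) or 0,
    )
    text = getattr(gemini_response, "text", "") or ""
    return SimpleNamespace(
        usage=usage,
        choices=[SimpleNamespace(message=SimpleNamespace(content=text))],
        raw=gemini_response,  # keep the original response for callers that need it
    )


# Stand-in object used only to show the mapping; not a real SDK response.
fake = SimpleNamespace(
    text="Three caption ideas for the spring launch.",
    usage_metadata=SimpleNamespace(prompt_token_count=120, candidates_token_count=45),
)
shaped = to_openai_shape(fake)
print(shaped.usage.prompt_tokens, shaped.usage.completion_tokens)
```

In practice the adapter would sit inside the callable passed to observe(): an async function that awaits the provider call and returns `to_openai_shape(result)`, so the trace records real token counts and output length.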

Saving Traces to Your Existing Database and Firing Alerts Without a Dashboard

Plug the LLMObserver into your existing MongoDB or PostgreSQL database with a single save function, and route alerts to Slack via webhook or email via SendGrid — no new infrastructure, no new accounts.

MongoDB save and aggregate queries

The save function is three lines: get a collection reference, call insert_one. The value is in the aggregate pipeline — total cost per job, average latency per model, error rate per section type, and low-quality call counts. Run these at job completion for billing reconciliation and at month-end for per-user quota enforcement. PostgreSQL works identically: insert traces into an llm_traces table with a JSONB metadata column, then query with GROUP BY metadata->>'job_id' and SUM(cost_usd). The observer class does not care which database you use — only that save_trace accepts a dict and returns nothing. Index metadata.job_id and metadata.user_id for fast aggregates at scale.

import os
import httpx
from pymongo import MongoClient
from openai import AsyncOpenAI

# ── MongoDB save (3 lines) ──────────────────────────────────────────────
client_db = MongoClient(os.environ["MONGODB_URI"])
traces = client_db["myapp"]["llm_traces"]

def save_to_mongo(trace: dict) -> None:
    traces.insert_one(trace)

# ── Slack alert via webhook ────────────────────────────────────────────
def slack_alert(message: str) -> None:
    """Blocking webhook POST with a 3-second timeout. Use a task queue for retries in production."""
    httpx.post(
        os.environ["SLACK_WEBHOOK_URL"],
        json={"text": message},
        timeout=3.0,
    )

# ── Plug it all together ───────────────────────────────────────────────
observer = LLMObserver(
    save_trace=save_to_mongo,
    alert=slack_alert,
    cost_alert_usd=5.0,
    min_output_chars=100,
)

openai_client = AsyncOpenAI()

async def generate_section(section_prompt: str, section_index: int, job_id: str) -> str:
    # job_id is passed in explicitly so every trace can be grouped per job later.
    response = await observer.observe(
        func=lambda: openai_client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": section_prompt}],
            max_tokens=12_000,
        ),
        model="gpt-4o",
        metadata={"section": section_index, "job_id": job_id},
    )
    return response.choices[0].message.content or ""

# ── Useful MongoDB aggregate: cost + quality per job ──────────────────
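def get_job_summary(job_id: str) -> dict:
    pipeline = [
        {"$match": {"metadata.job_id": job_id}},
        {"$group": {
            "_id": "$metadata.job_id",
            "total_cost_usd":    {"$sum": "$cost_usd"},
            "avg_latency_ms":    {"$avg": "$latency_ms"},
            "total_calls":       {"$sum": 1},
            "failed_calls":      {"$sum": {"$cond": [{"$eq": ["$success", False]}, 1, 0]}},
            # Count only successful calls here; failed calls have output_length 0
            # and are already counted in failed_calls.
            "low_quality_calls": {"$sum": {"$cond": [
                {"$and": ["$success", {"$lt": ["$output_length", 100]}]}, 1, 0]}},
        }}
    ]
    # The cursor is empty when no traces match; return {} instead of raising StopIteration.
    return next(traces.aggregate(pipeline), {})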
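
The PostgreSQL variant described above can be sketched along the same lines. This is a minimal sketch, not a tested schema: the column layout, the names `CREATE_TABLE_SQL`, `INSERT_SQL`, `JOB_COST_SQL`, `trace_params`, and `make_pg_saver` are assumptions, and `conn` is assumed to be a DB-API connection from a driver that uses `%s` placeholders (psycopg, for example).

```python
import json
from typing import Any, Callable

CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS llm_traces (
    id                BIGSERIAL PRIMARY KEY,
    model             TEXT NOT NULL,
    prompt_tokens     INTEGER NOT NULL,
    completion_tokens INTEGER NOT NULL,
    cost_usd          NUMERIC(12, 8) NOT NULL,
    latency_ms        DOUBLE PRECISION NOT NULL,
    output_length     INTEGER NOT NULL,
    success           BOOLEAN NOT NULL,
    error             TEXT,
    metadata          JSONB NOT NULL DEFAULT '{}'::jsonb,
    created_at        TIMESTAMPTZ NOT NULL
)
"""

INSERT_SQL = """
INSERT INTO llm_traces
    (model, prompt_tokens, completion_tokens, cost_usd, latency_ms,
     output_length, success, error, metadata, created_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, %s::timestamptz)
"""

# Per-job cost, grouped on a key inside the JSONB metadata column.
JOB_COST_SQL = """
SELECT metadata->>'job_id' AS job_id,
       SUM(cost_usd)       AS total_cost_usd,
       COUNT(*)            AS total_calls
FROM llm_traces
GROUP BY metadata->>'job_id'
"""


def trace_params(trace: dict) -> tuple:
    """Map the dict produced by LLMTrace.to_dict() onto the INSERT placeholders."""
    return (
        trace["model"], trace["prompt_tokens"], trace["completion_tokens"],
        trace["cost_usd"], trace["latency_ms"], trace["output_length"],
        trace["success"], trace["error"],
        json.dumps(trace["metadata"]),  # serialised here, cast to JSONB in SQL
        trace["timestamp"],             # ISO-8601 string, cast to TIMESTAMPTZ in SQL
    )


def make_pg_saver(conn: Any) -> Callable[[dict], None]:
    """Return a save_trace callable bound to an open connection."""
    def save(trace: dict) -> None:
        with conn.cursor() as cur:
            cur.execute(INSERT_SQL, trace_params(trace))
        conn.commit()
    return save
```

The returned function has the same signature as save_to_mongo, so it can be passed as save_trace without touching the observer.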

Two Production Patterns: The 161-Call Report Pipeline vs the 10-Tool SaaS

A sequential AI report pipeline needs per-call cost accumulation with a per-job ceiling alert; a multi-tool SaaS needs per-tool quality baselines and per-user cost quota enforcement — both patterns work with the same LLMObserver class, configured differently per use case.

Pattern A — Sequential pipeline (the AI report SaaS)

An AI report generation SaaS runs 161 sequential GPT-4o calls per large report over 2–4 hours inside Celery workers — any of 5 worker processes can crash mid-job. Before observability, the $203 first run had no cost visibility until the invoice arrived. After: every call logs tokens and cost, cumulative spend tracks per job, and a $25 cost ceiling fires a Slack alert before runaway spend repeats.

Each of the 161 calls wraps with observe(). Metadata includes {"job_id": job_id, "section_index": i, "report_type": "full"}. Worker crash detection layers on top: if a section's trace never appears in MongoDB within the expected window, alert separately. The async AI pipeline guide for long-running Python jobs covers how the 161 calls are structured inside a Celery worker — the observer wraps directly into that pattern without any architectural change.
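
The missing-trace check mentioned above can be sketched as a pure function over the section indexes already seen in the trace store. The function name `find_missing_sections` and the 90-second default (roughly 4 hours divided by 161 calls) are assumptions for illustration; tune the window to your own pipeline.

```python
from datetime import datetime, timedelta, timezone


def find_missing_sections(
    expected_sections: int,
    seen_section_indexes: set[int],
    job_started_at: datetime,
    now: datetime,
    expected_seconds_per_section: float = 90.0,
) -> list[int]:
    """Return section indexes that should have produced a trace by now but have not."""
    elapsed = (now - job_started_at).total_seconds()
    # Sections run sequentially, so one more section becomes due per elapsed time slot.
    due_count = min(expected_sections, int(elapsed // expected_seconds_per_section))
    return [i for i in range(due_count) if i not in seen_section_indexes]


# Hypothetical job: 10 minutes in, traces exist for sections 0, 1, 2 and 4.
started = datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc)
missing = find_missing_sections(
    expected_sections=161,
    seen_section_indexes={0, 1, 2, 4},
    job_started_at=started,
    now=started + timedelta(minutes=10),
)
print(missing)  # [3, 5]
```

The set of seen indexes would come from a query on metadata.job_id; a non-empty result is the signal to send the separate crash alert.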

The cost visibility that observability provides is what revealed the $203 naive first run. Once every call was logged, the optimization path was clear — the journey from $203 to $14 started with knowing where the tokens were actually going, as documented in the GPT-4o production cost optimization guide.

Pattern B — 10-tool content platform

A 10-tool AI content platform runs Gemini 2.5 Flash for ad copy, captions, scripts, and emails. As a baseline, each tool should produce at least 100 characters of structured output, with the minimum tuned per tool: 600 for a 150-word ad copy tool, 50 for a hashtag suggestion list. Because min_output_chars is a constructor argument, per-tool thresholds need either one LLMObserver instance per tool or a separate per-tool check. Pass metadata={"tool": "facebook_ad", "user_id": session.user.id} on every call, then aggregate by metadata.tool to catch prompt regressions when average output length drops across a rolling window.
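
Since LLMObserver takes a single min_output_chars, one way to apply per-tool minimums and the rolling-window check is a small monitor that runs alongside it. This is a sketch under assumptions: the class name `OutputLengthMonitor`, the tool keys, and the 50-call window are illustrative; only the 600, 50, and 100 character figures come from the text.

```python
from collections import defaultdict, deque

# Per-tool minimums in characters; tool names are hypothetical.
MIN_OUTPUT_CHARS = {"facebook_ad": 600, "hashtags": 50}
DEFAULT_MIN_CHARS = 100


class OutputLengthMonitor:
    """Check each output against its tool's minimum and track a rolling average."""

    def __init__(self, window: int = 50):
        self.window = window
        self._lengths: dict[str, deque[int]] = defaultdict(
            lambda: deque(maxlen=window)
        )

    def record(self, tool: str, output_length: int) -> list[str]:
        """Record one output and return any warnings it triggers."""
        minimum = MIN_OUTPUT_CHARS.get(tool, DEFAULT_MIN_CHARS)
        history = self._lengths[tool]
        history.append(output_length)

        warnings: list[str] = []
        if output_length < minimum:
            warnings.append(f"{tool}: {output_length} chars, below minimum {minimum}")
        # Only judge the average once the window is full, to avoid noisy early alerts.
        if len(history) == self.window:
            average = sum(history) / self.window
            if average < minimum:
                warnings.append(
                    f"{tool}: rolling average {average:.0f} chars, below minimum {minimum}"
                )
        return warnings


monitor = OutputLengthMonitor(window=2)
monitor.record("facebook_ad", 700)
print(monitor.record("facebook_ad", 100))
```

A single short output produces one warning; a full window whose average sits below the minimum produces a second one, which is the stronger hint that a prompt change caused a regression.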

Tip

For per-user cost tracking in a multi-tenant SaaS, add the user ID to every trace's metadata: metadata={"tool": "facebook_ad", "user_id": session.user.id}. Then run the aggregate grouped by metadata.user_id to enforce per-user monthly quotas without a separate Redis-based tracking system. One MongoDB query gives you the full picture at billing time.
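
The per-user quota query from the tip can be sketched as a pipeline builder plus a small filter. It assumes traces are stored as produced by to_dict(), with timestamp as an ISO-8601 UTC string and user_id inside metadata; the function names `monthly_cost_pipeline` and `users_over_quota` and the quota value are hypothetical.

```python
from datetime import datetime, timezone


def monthly_cost_pipeline(year: int, month: int) -> list[dict]:
    """Build a MongoDB aggregation pipeline summing cost per user for one calendar month."""
    start = datetime(year, month, 1, tzinfo=timezone.utc)
    # December rolls over to January of the following year.
    end = datetime(year + (month == 12), month % 12 + 1, 1, tzinfo=timezone.utc)
    return [
        # ISO-8601 strings in the same timezone compare in chronological order.
        {"$match": {"timestamp": {"$gte": start.isoformat(), "$lt": end.isoformat()}}},
        {"$group": {
            "_id": "$metadata.user_id",
            "total_cost_usd": {"$sum": "$cost_usd"},
            "total_calls": {"$sum": 1},
        }},
        {"$sort": {"total_cost_usd": -1}},
    ]


def users_over_quota(rows: list[dict], quota_usd: float) -> list[str]:
    """Return the user IDs whose monthly spend exceeds the quota."""
    return [row["_id"] for row in rows if row["total_cost_usd"] > quota_usd]


pipeline = monthly_cost_pipeline(2026, 12)
print(pipeline[0]["$match"]["timestamp"])
```

The pipeline would be passed to the collection's aggregate() call, and the resulting rows to users_over_quota() with whatever monthly limit your plan defines.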

Building Quality Monitoring Without an LLM-as-Judge

Production quality monitoring without a paid tool starts with quantitative signals — output length, format validation, response time — that catch the most common failure modes without requiring another LLM call to score the output.

The quantitative quality signals

  • output_length: below threshold means incomplete generation — the most common failure across both production systems
  • latency_ms: a call taking 3× the section average usually indicates timeout, rate limiting, or model slowdown
  • error rate: track success=False over a rolling 50-call window per tool or section type
  • format validation: for structured JSON output, validate against your Zod or Pydantic schema and log validation failures in the trace metadata
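
Two of the signals above, the rolling error rate and the latency spike, can be tracked with a pair of bounded deques. This is a minimal sketch: the class name `RollingSignals` is hypothetical, and the 50-call window and 3× factor are the figures from the list, exposed as parameters.

```python
from collections import deque


class RollingSignals:
    """Rolling error rate and latency-spike detection over the most recent calls."""

    def __init__(self, window: int = 50, latency_factor: float = 3.0):
        self.latency_factor = latency_factor
        self._outcomes: deque[bool] = deque(maxlen=window)
        self._latencies: deque[float] = deque(maxlen=window)

    def record(self, success: bool, latency_ms: float) -> bool:
        """Record one call; return True if its latency spikes versus the prior average."""
        spike = False
        if self._latencies:
            # Compare against the average of earlier calls, before adding this one.
            average = sum(self._latencies) / len(self._latencies)
            spike = latency_ms > self.latency_factor * average
        self._outcomes.append(success)
        self._latencies.append(latency_ms)
        return spike

    @property
    def error_rate(self) -> float:
        """Share of failed calls in the current window (0.0 when empty)."""
        if not self._outcomes:
            return 0.0
        return self._outcomes.count(False) / len(self._outcomes)


signals = RollingSignals(window=50)
signals.record(True, 1000.0)
signals.record(True, 1200.0)
spiked = signals.record(False, 4000.0)  # prior average is 1100 ms, so 4000 ms is a spike
print(spiked, round(signals.error_rate, 3))
```

Keeping one instance per tool or section type matches the per-tool window described in the list.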

When to add LLM-as-judge scoring

For systems where output quality is business-critical — a $14 professional report must be accurate, not just long enough — a secondary LLM-as-judge call adds a numeric quality score (~$0.002 per judgment on GPT-4o-mini at $0.15/1M input). This catches semantic failures that length checks miss: hallucinated citations, wrong tone, or off-topic sections. Log the score in trace metadata and alert when the rolling average drops below 0.7. Add this only after quantitative signals are in place — length and latency catch 80% of production failures at zero extra API cost. The judge call itself can wrap in the same observe() method with metadata={"type": "quality_judge", "parent_section": 3} so judge costs appear separately in your per-job aggregate.
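
The scoring and rolling-average alert described above can be sketched without committing to a particular judge prompt or client. Everything here is an assumption for illustration: the prompt wording, the names `parse_judge_score` and `JudgeScoreTracker`, and the 20-score window; only the 0 to 1 scale and the 0.7 threshold come from the text.

```python
import re
from collections import deque
from typing import Optional

# Hypothetical judge prompt; adapt the criteria to your own report format.
JUDGE_PROMPT = (
    "Rate the following report section for accuracy, tone, and relevance. "
    "Reply with a single number between 0 and 1.\n\n{section}"
)


def parse_judge_score(reply: str) -> Optional[float]:
    """Extract the first number in the judge's reply and clamp it to [0, 1]."""
    match = re.search(r"\d*\.\d+|\d+", reply)
    if match is None:
        return None  # the judge did not return a usable score
    return max(0.0, min(1.0, float(match.group())))


class JudgeScoreTracker:
    """Rolling average of judge scores with a threshold check."""

    def __init__(self, window: int = 20, threshold: float = 0.7):
        self.threshold = threshold
        self._scores: deque[float] = deque(maxlen=window)

    def add(self, score: float) -> bool:
        """Record a score; return True when the rolling average is below the threshold."""
        self._scores.append(score)
        return sum(self._scores) / len(self._scores) < self.threshold


tracker = JudgeScoreTracker()
first = tracker.add(parse_judge_score("Score: 0.9") or 0.0)
second = tracker.add(parse_judge_score("0.4, the citations look invented") or 0.0)
print(first, second)
```

The parser is deliberately forgiving about surrounding text but does not handle percentages or negative numbers; constraining the judge to a structured reply would be more robust.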

When to Add a Paid Tool (The Honest Answer)

Add Langfuse when multiple developers need a shared dashboard for debugging, prompt management across team members, or annotation workflows. Add Helicone when you need multi-provider cost visibility out of the box and the proxy risk is acceptable for your use case.

The three signals that mean you've outgrown DIY

  • More than 3 developers working on the AI system — shared visibility needs a UI, not MongoDB aggregates
  • Prompt A/B testing at scale — Langfuse's prompt management is hard to replicate in a weekend
  • Non-engineers need to review AI outputs — dashboards beat ad-hoc database queries

The signals that mean DIY is correct

  • Solo or pair engineering the system
  • Compliance or data sovereignty requirements — traces stay in your infrastructure
  • Already using MongoDB or PostgreSQL for everything else
  • Running in Celery workers where proxy latency matters
  • Helicone's 10K free request limit exceeded monthly at production scale

I document production LLM observability patterns on hassanr.com because the gap between "install Langfuse" and "know your per-job cost before the invoice" is where most solo AI engineers get surprised. Hassan Raza built this observer for two live production systems — 161-call pipelines and 10-tool SaaS platforms — without adding a single paid observability account.

Frequently Asked Questions

How do I track LLM cost per call in Python?

Calculate cost per call with (prompt_tokens / 1,000,000) × input_price + (completion_tokens / 1,000,000) × output_price. For GPT-4o, that's $2.50 per million input tokens and $10.00 per million output tokens. Log this value on every response using an LLMTrace dataclass that computes cost_usd from a PRICING dictionary, and store the raw token counts next to it so historical costs can be recomputed if prices change. Save each trace to MongoDB or PostgreSQL and run aggregate queries for cost per job, per user, or per model. Set a per-session cost ceiling in your observer class that sends a Slack alert when accumulated spend crosses the threshold, so you hear about it before the invoice arrives.

What is LLM observability and why does it matter?

LLM observability is logging and monitoring the cost, latency, quality, and failure rate of every LLM API call in production. It matters because LLM behavior is non-deterministic: the same prompt can produce outputs with wildly different quality, length, and cost. Without observability, failures stay invisible until a user complains or an invoice lands. The four core components are cost tracking (tokens × price per call), latency tracking (time to first token and total duration), quality monitoring (output length checks and format validation), and alerting (Slack or email when thresholds break).

How do I monitor LLM output quality without a dashboard?

Track quality with quantitative proxies that need no visual UI: output_length below your minimum signals incomplete generation, latency_ms spikes above 3× the section average indicate slowdowns or timeouts, and error rate over a rolling window catches systemic failures. Save these fields alongside each API call in MongoDB or PostgreSQL. Fire a Slack webhook when output_length drops below the threshold for that tool or section type. For semantic correctness (whether the answer is factually right), add an optional LLM-as-judge call that scores output from 0 to 1 and logs the score with the trace.