GadaaLabs
RAG Engineering — Production Retrieval-Augmented Generation
Lesson 11

Production RAG — Latency, Caching, Scaling & Cost

The Production Latency Budget

A responsive RAG API should complete in under 3 seconds end-to-end at the 99th percentile. The budget breaks down as follows:

| Stage | Target p50 | Target p99 | Notes |
|-------|-----------|-----------|-------|
| Embed query | 15 ms | 25 ms | Local model on CPU |
| ANN search | 10 ms | 30 ms | HNSW, 1M vectors |
| Reranking | 150 ms | 250 ms | MiniLM-L-6 CPU |
| LLM generation | 800 ms | 1,800 ms | Groq, 500 token output |
| Overhead (serialisation, I/O) | 20 ms | 50 ms | |
| Total | ~1 s | ~2.2 s | |

Hitting these numbers requires async retrieval, an intelligent cache, and a fast LLM. Missing them usually means retrieval or reranking is running synchronously when it could be parallelised.
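One way to keep the budget honest is to encode it and compare it with the 3-second target. The sketch below sums the per-stage figures from the table; the names `BUDGET_MS` and `SLO_P99_MS` are illustrative, and adding per-stage p99 values is a simplification, since the stages rarely all hit their tail latency on the same request.

```python
# Per-stage latency targets from the table above, in milliseconds: (p50, p99)
BUDGET_MS = {
    "embed_query": (15, 25),
    "ann_search": (10, 30),
    "reranking": (150, 250),
    "llm_generation": (800, 1800),
    "overhead": (20, 50),
}
SLO_P99_MS = 3000  # end-to-end p99 target stated above

total_p50 = sum(p50 for p50, _ in BUDGET_MS.values())
total_p99 = sum(p99 for _, p99 in BUDGET_MS.values())
headroom_ms = SLO_P99_MS - total_p99

print(f"p50 total: {total_p50} ms")
print(f"p99 total: {total_p99} ms")
print(f"headroom under SLO: {headroom_ms} ms")

# Share of the p99 budget consumed by each stage
for stage, (_, p99) in BUDGET_MS.items():
    print(f"{stage:>15}: {p99 / total_p99:.0%}")
```

On these targets LLM generation accounts for more than 80% of the p99 total, which is why the choice of LLM provider and the cache hit rate matter more than shaving milliseconds off the ANN search.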

Async Parallel Retrieval

Dense embedding search and BM25 (sparse) search are independent. Run them concurrently:

python
import asyncio
import time
from sentence_transformers import SentenceTransformer
from qdrant_client import QdrantClient

embed_model = SentenceTransformer("BAAI/bge-large-en-v1.5")
qdrant = QdrantClient(url="http://localhost:6333")


async def embed_query_async(query: str) -> list[float]:
    """Run embedding in a thread pool so it doesn't block the event loop."""
    loop = asyncio.get_running_loop()
    vec = await loop.run_in_executor(
        None,
        lambda: embed_model.encode([query], normalize_embeddings=True)[0].tolist()
    )
    return vec


async def dense_search_async(query_vec: list[float], collection: str, k: int) -> list[dict]:
    """Async wrapper for Qdrant search."""
    loop = asyncio.get_running_loop()
    results = await loop.run_in_executor(
        None,
        lambda: qdrant.search(collection_name=collection, query_vector=query_vec, limit=k, with_payload=True)
    )
    return [{"id": str(r.id), "text": r.payload["text"], "score": r.score} for r in results]


async def bm25_search_async(query: str, k: int) -> list[dict]:
    """Async wrapper for BM25 search (using a BM25 library like rank_bm25)."""
    loop = asyncio.get_running_loop()
    # Placeholder — replace with your BM25 index lookup
    results = await loop.run_in_executor(None, lambda: bm25_index.search(query, k))
    return results


async def parallel_retrieve(
    query: str,
    collection: str,
    k: int = 20,
) -> list[dict]:
    """
    Run dense embedding + BM25 retrieval in parallel.
    Merge with RRF and return top-k results.
    """
    t0 = time.perf_counter()

    # Embed first (the dense search needs the vector), then run both searches concurrently
    query_vec = await embed_query_async(query)
    dense_results, sparse_results = await asyncio.gather(
        dense_search_async(query_vec, collection, k),
        bm25_search_async(query, k),
    )

    latency_ms = (time.perf_counter() - t0) * 1000
    print(f"Parallel retrieval: {latency_ms:.1f} ms")

    # RRF merge: each list contributes 1 / (60 + rank) for every document it contains
    dense_ids = [r["id"] for r in dense_results]
    sparse_ids = [r["id"] for r in sparse_results]
    scores: dict[str, float] = {}
    for source_ids in (dense_ids, sparse_ids):
        for rank, doc_id in enumerate(source_ids, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (60 + rank)

    result_map = {r["id"]: r for r in dense_results + sparse_results}
    merged_ids = sorted(scores, key=lambda x: scores[x], reverse=True)[:k]
    return [result_map[d] for d in merged_ids if d in result_map]
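
The RRF merge step is easier to follow in isolation. The sketch below applies the same 1 / (60 + rank) scoring to two toy rankings; `rrf_merge` and the document IDs are hypothetical names used only for illustration.

```python
def rrf_merge(rankings: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Fuse ranked ID lists with Reciprocal Rank Fusion: score = sum of 1 / (k + rank)."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


# Hypothetical document IDs, best match first in each list
dense_ids = ["a", "b", "c"]
sparse_ids = ["b", "d"]

fused = rrf_merge([dense_ids, sparse_ids])
for doc_id, score in fused:
    print(f"{doc_id}: {score:.4f}")
```

Document "b" ends up first because it appears in both lists, even though "a" ranked higher in the dense results. RRF uses only ranks, so the dense and BM25 scores never need to be put on a common scale.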

Semantic Cache

A semantic cache stores (query_embedding, cached_answer) pairs. When a new query arrives:

  1. Embed it.
  2. Search the cache for any query with cosine similarity > 0.95.
  3. If found, return the cached answer immediately — skipping retrieval and generation.
  4. If not found, run the full pipeline and store the result in the cache.

This works because many users ask semantically identical questions with different wording. The cache hit rate in a production customer-facing RAG system is typically 20–40%.

python
import hashlib
import chromadb
from chromadb.config import Settings

# Use a lightweight ChromaDB instance as the cache store
cache_db = chromadb.Client(Settings(anonymized_telemetry=False))
cache_collection = cache_db.get_or_create_collection(
    name="semantic_cache",
    metadata={"hnsw:space": "cosine"},
)


def semantic_cache_lookup(
    query: str,
    query_vec: list[float],
    threshold: float = 0.95,
) -> str | None:
    """
    Look up a query in the semantic cache.
    Returns cached answer string if a sufficiently similar query exists,
    otherwise returns None.
    """
    if cache_collection.count() == 0:
        return None

    results = cache_collection.query(
        query_embeddings=[query_vec],
        n_results=1,
        include=["metadatas", "distances"],
    )

    if not results["distances"][0]:
        return None

    distance = results["distances"][0][0]
    similarity = 1.0 - distance  # ChromaDB cosine distance → similarity

    if similarity >= threshold:
        cached_answer = results["metadatas"][0][0]["answer"]
        print(f"Cache HIT (similarity={similarity:.3f})")
        return cached_answer

    return None


def semantic_cache_store(query: str, query_vec: list[float], answer: str) -> None:
    """Store a query-answer pair in the semantic cache."""
    cache_id = hashlib.sha256(query.encode()).hexdigest()[:16]
    cache_collection.upsert(
        ids=[cache_id],
        embeddings=[query_vec],
        metadatas=[{"query": query, "answer": answer}],
    )
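
To see what the 0.95 threshold means in practice, the following sketch computes cosine similarity by hand on toy two-dimensional vectors. The vectors and the helper `is_cache_hit` are made up for illustration; real embeddings have hundreds of dimensions, and a suitable threshold depends on the embedding model.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def is_cache_hit(query_vec: list[float], cached_vec: list[float], threshold: float = 0.95) -> bool:
    # Equivalent to checking (1 - cosine distance) >= threshold
    return cosine_similarity(query_vec, cached_vec) >= threshold


cached = [1.0, 0.0]        # toy embedding of a previously answered query
paraphrase = [0.99, 0.14]  # toy embedding pointing in nearly the same direction
unrelated = [0.8, 0.6]     # toy embedding of a different question

print(f"paraphrase: {cosine_similarity(paraphrase, cached):.3f}", is_cache_hit(paraphrase, cached))
print(f"unrelated:  {cosine_similarity(unrelated, cached):.3f}", is_cache_hit(unrelated, cached))
```

A threshold that is too low returns answers to questions the user did not ask, so it is worth checking a sample of cache hits by hand before lowering it.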

Exact Match Cache with Redis

For truly identical queries (same string, lowercased and stripped), an exact match cache with Redis is faster and cheaper than a semantic lookup:

python
import redis
import hashlib
import json

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

CACHE_TTL_SECONDS = 3600  # 1 hour


def exact_cache_get(query: str) -> str | None:
    """Look up an exact query string in Redis."""
    key = "rag:" + hashlib.sha256(query.lower().strip().encode()).hexdigest()
    cached = redis_client.get(key)
    if cached:
        return json.loads(cached)["answer"]
    return None


def exact_cache_set(query: str, answer: str) -> None:
    """Store a query-answer pair in Redis with TTL."""
    key = "rag:" + hashlib.sha256(query.lower().strip().encode()).hexdigest()
    redis_client.setex(key, CACHE_TTL_SECONDS, json.dumps({"answer": answer}))
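
The keys above are shared by every caller. The request model later in this lesson carries a `tenant_id`, so a multi-tenant deployment may want that in the key as well. The sketch below is one possible key scheme, assuming answers must not be shared across tenants; `cache_key` is a hypothetical helper, and it also collapses repeated whitespace so that trivially different strings map to the same entry.

```python
import hashlib


def cache_key(query: str, tenant_id: str) -> str:
    """Build a Redis key from the tenant and a normalised query string."""
    # Lowercase, trim, and collapse runs of whitespace to a single space
    normalised = " ".join(query.lower().split())
    digest = hashlib.sha256(normalised.encode("utf-8")).hexdigest()
    return f"rag:{tenant_id}:{digest}"


k1 = cache_key("  What is   RAG? ", tenant_id="acme")
k2 = cache_key("what is rag?", tenant_id="acme")
k3 = cache_key("what is rag?", tenant_id="globex")

print(k1 == k2)  # same tenant, same normalised query
print(k2 == k3)  # same query, different tenant
```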

Tiered Retrieval

For very large corpora, add a fast BM25 first pass to reduce the ANN search space:

python
def tiered_retrieve(
    query: str,
    bm25_top_n: int = 200,
    ann_top_k: int = 10,
) -> list[dict]:
    """
    Two-stage retrieval:
    Stage 1: BM25 filters the corpus to top-200 candidates (fast, keyword-based)
    Stage 2: ANN search only within those 200 IDs (avoids full 1M-vector scan)
    """
    # Stage 1: BM25 candidate filtering
    bm25_candidates = bm25_index.search(query, top_n=bm25_top_n)
    candidate_ids = [c["id"] for c in bm25_candidates]

    # Stage 2: ANN search with ID filter
    query_vec = embed_model.encode([query], normalize_embeddings=True)[0].tolist()
    from qdrant_client.models import Filter, HasIdCondition
    results = qdrant.search(
        collection_name="rag_chunks",
        query_vector=query_vec,
        query_filter=Filter(must=[HasIdCondition(has_id=candidate_ids)]),
        limit=ann_top_k,
        with_payload=True,
    )
    return [{"id": str(r.id), "text": r.payload["text"], "score": r.score} for r in results]
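
The two-stage idea can be tried without any infrastructure. The sketch below is a toy version under loud assumptions: a term-overlap count stands in for BM25, a dot product stands in for the ANN search, and the three documents and their vectors are invented.

```python
def keyword_score(query: str, text: str) -> int:
    """Crude stand-in for BM25: count query terms that appear in the text."""
    terms = set(query.lower().split())
    return len(terms & set(text.lower().split()))


def dot(a: list[float], b: list[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def tiered_search(query, query_vec, docs, first_pass_n=2, top_k=1):
    # Stage 1: keep only the best keyword matches
    candidates = sorted(docs, key=lambda d: keyword_score(query, d["text"]), reverse=True)[:first_pass_n]
    # Stage 2: rank the survivors by vector similarity
    return sorted(candidates, key=lambda d: dot(query_vec, d["vec"]), reverse=True)[:top_k]


docs = [
    {"id": 1, "text": "redis cache ttl settings", "vec": [0.9, 0.1]},
    {"id": 2, "text": "semantic cache with embeddings", "vec": [0.2, 0.98]},
    {"id": 3, "text": "storing answers for repeated questions", "vec": [0.1, 0.99]},
]

top = tiered_search("semantic cache", [0.0, 1.0], docs)
print([d["id"] for d in top])
```

In this toy data, document 3 is the closest vector match but shares no terms with the query, so the first pass drops it. That is the trade-off of tiered retrieval: the keyword stage bounds the cost of the vector stage, but it also bounds recall, so `bm25_top_n` should be generous.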

Index Update Strategies

| Strategy | Latency to Reflect Updates | Complexity | Best For |
|----------|---------------------------|------------|----------|
| Real-time | <1 second | Low | Low-volume, time-sensitive |
| Micro-batch | 1–5 minutes | Medium | Standard production |
| Nightly rebuild | Up to 24 hours | Low | Stable corpora |

python
import asyncio
from collections import deque
import datetime

# Micro-batch update queue
update_queue: deque = deque()

async def queue_document_update(doc_id: str, text: str, metadata: dict) -> None:
    """Add a document update to the pending queue."""
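    update_queue.append({
        "doc_id": doc_id,
        "text": text,
        "metadata": metadata,
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
        "queued_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
    })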

async def flush_update_queue(batch_size: int = 100) -> int:
    """Process pending updates in batches. Call every 5 minutes from a background task."""
    if not update_queue:
        return 0

    batch = []
    while update_queue and len(batch) < batch_size:
        batch.append(update_queue.popleft())

    # Embed all texts in the batch; encode() is blocking, so run it in a worker thread
    texts = [item["text"] for item in batch]
    embeddings = await asyncio.to_thread(
        lambda: embed_model.encode(texts, normalize_embeddings=True, batch_size=32).tolist()
    )

    # Upsert into Qdrant (point IDs must be unsigned integers or UUIDs)
    from qdrant_client.models import PointStruct
    points = [
        PointStruct(id=item["doc_id"], vector=emb, payload={**item["metadata"], "text": item["text"]})
        for item, emb in zip(batch, embeddings)
    ]
    await asyncio.to_thread(qdrant.upsert, collection_name="rag_chunks", points=points)
    return len(batch)
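
The flush function still needs something to call it on a schedule. The sketch below is one minimal way to do that with plain asyncio; `run_periodically` is a hypothetical helper, and `fake_flush` stands in for `flush_update_queue` so the example runs on its own. The interval is shortened to 10 ms for the demo.

```python
import asyncio


async def run_periodically(flush, interval_s: float, stop: asyncio.Event) -> None:
    """Call `flush()` every `interval_s` seconds until `stop` is set."""
    while not stop.is_set():
        try:
            await flush()
        except Exception as exc:  # keep the loop alive if one flush fails
            print(f"flush failed: {exc!r}")
        try:
            # Sleep for the interval, but wake immediately on shutdown
            await asyncio.wait_for(stop.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            pass


async def demo() -> int:
    calls = 0

    async def fake_flush() -> int:  # stand-in for flush_update_queue
        nonlocal calls
        calls += 1
        return 0

    stop = asyncio.Event()
    task = asyncio.create_task(run_periodically(fake_flush, interval_s=0.01, stop=stop))
    await asyncio.sleep(0.05)
    stop.set()
    await task
    return calls


flush_calls = asyncio.run(demo())
print(f"flushes in 50 ms: {flush_calls}")
```

For the five-minute cadence described above, the same loop would run with `interval_s=300`, started as a background task when the application boots and stopped by setting the event on shutdown.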

Cost Breakdown

| Component | Free / Self-Hosted | Cloud Managed |
|-----------|-------------------|---------------|
| Embedding model | $0 (BGE-large local) | $0.13/1M tokens (ada-002) |
| Vector DB (1M vectors) | $0 (Qdrant Docker) | ~$70/month (Pinecone starter) |
| Reranker | $0 (MiniLM local) | ~$1/1M calls (Cohere) |
| LLM | $0 (Groq free tier) | $5/1M tokens (GPT-4o) |
| Redis cache | $0 (self-hosted) | ~$15/month (Redis Cloud) |

With the Groq free tier, local models, and self-hosted Qdrant, the stack costs effectively $0 for development and low-volume production.
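
Once traffic moves to a paid LLM, cost scales with the queries that miss the cache. The sketch below estimates monthly LLM spend under stated assumptions: the workload (100,000 queries per month, 2,000 tokens each) is invented, the $5 per million tokens figure is the GPT-4o entry from the table, and a single blended token price is a simplification because providers usually price input and output tokens differently.

```python
def monthly_llm_cost(
    queries_per_month: int,
    tokens_per_query: int,
    usd_per_million_tokens: float,
    cache_hit_rate: float = 0.0,
) -> float:
    """LLM spend for the queries that miss the cache and reach the model."""
    llm_queries = queries_per_month * (1.0 - cache_hit_rate)
    return llm_queries * tokens_per_query * usd_per_million_tokens / 1_000_000


# Assumed workload: 100,000 queries/month at 2,000 tokens each (prompt + answer)
no_cache = monthly_llm_cost(100_000, 2_000, usd_per_million_tokens=5.0)
with_cache = monthly_llm_cost(100_000, 2_000, usd_per_million_tokens=5.0, cache_hit_rate=0.30)

print(f"no cache:  ${no_cache:,.2f}")
print(f"30% cache: ${with_cache:,.2f}")
```

Under these assumptions a 30% hit rate cuts the LLM bill by the same 30%, which is usually a larger saving than the monthly cost of the Redis instance that enables it.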

Production FastAPI Server

python
import asyncio
import time

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI(title="Production RAG API")


class QueryRequest(BaseModel):
    query: str
    tenant_id: str
    k: int = 5
    use_cache: bool = True


class QueryResponse(BaseModel):
    answer: str
    sources: list[dict]
    cache_hit: bool
    latency_ms: float


@app.post("/query", response_model=QueryResponse)
async def query_endpoint(request: QueryRequest) -> QueryResponse:
    """
    Main RAG query endpoint.
    Order: exact cache → semantic cache → parallel retrieval → rerank → generate.
    """
    t_start = time.perf_counter()

    # 1. Exact cache check (fastest)
    if request.use_cache:
        cached = exact_cache_get(request.query)
        if cached:
            return QueryResponse(
                answer=cached,
                sources=[],
                cache_hit=True,
                latency_ms=(time.perf_counter() - t_start) * 1000,
            )

    # 2. Embed query (needed for semantic cache + retrieval)
    query_vec = await embed_query_async(request.query)

    # 3. Semantic cache check
    if request.use_cache:
        cached = semantic_cache_lookup(request.query, query_vec)
        if cached:
            return QueryResponse(
                answer=cached,
                sources=[],
                cache_hit=True,
                latency_ms=(time.perf_counter() - t_start) * 1000,
            )

    # 4. Parallel retrieval
    candidates = await parallel_retrieve(request.query, collection="rag_chunks", k=50)

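    # 5. Rerank. The model is cached on the function object so it loads once rather
    # than on every request; predict() is blocking, so it runs in a worker thread.
    from sentence_transformers import CrossEncoder
    if not hasattr(query_endpoint, "reranker"):
        query_endpoint.reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
    candidates = candidates[:50]
    pairs = [(request.query, c["text"]) for c in candidates]
    scores = await asyncio.to_thread(query_endpoint.reranker.predict, pairs)
    for c, s in zip(candidates, scores):
        c["rerank_score"] = float(s)
    top_docs = sorted(candidates, key=lambda x: x["rerank_score"], reverse=True)[:request.k]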

    # 6. Generate
    context = "\n\n".join(f"[{i+1}] {d['text']}" for i, d in enumerate(top_docs))
    # Use the async client so generation does not block the event loop
    from groq import AsyncGroq
    groq_client = AsyncGroq()  # reads GROQ_API_KEY from the environment
    gen_response = await groq_client.chat.completions.create(
        model="llama-3.3-70b-versatile",
        messages=[
            {"role": "system", "content": "Answer using only the provided context. Cite sources by number."},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {request.query}"},
        ],
        temperature=0.1,
        max_tokens=500,
    )
    answer = gen_response.choices[0].message.content

    # 7. Store in cache
    if request.use_cache:
        exact_cache_set(request.query, answer)
        semantic_cache_store(request.query, query_vec, answer)

    sources = [{"text": d["text"][:200], "score": d["rerank_score"]} for d in top_docs]
    return QueryResponse(
        answer=answer,
        sources=sources,
        cache_hit=False,
        latency_ms=(time.perf_counter() - t_start) * 1000,
    )


@app.get("/health")
async def health() -> dict:
    return {"status": "ok"}

Monitoring

Track these metrics in production:

python
import sqlite3
import datetime


class RAGMetrics:
    """Lightweight SQLite-based metrics store for production RAG."""

    def __init__(self, db_path: str = "rag_metrics.db"):
        self.db_path = db_path
        with sqlite3.connect(db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS query_metrics (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT,
                    latency_ms REAL,
                    cache_hit INTEGER,
                    embed_ms REAL,
                    retrieve_ms REAL,
                    rerank_ms REAL,
                    llm_ms REAL,
                    cost_usd REAL
                )
            """)

    def record(self, **kwargs) -> None:
        kwargs["timestamp"] = datetime.datetime.utcnow().isoformat()
        with sqlite3.connect(self.db_path) as conn:
            cols = ", ".join(kwargs.keys())
            placeholders = ", ".join("?" * len(kwargs))
            conn.execute(f"INSERT INTO query_metrics ({cols}) VALUES ({placeholders})", list(kwargs.values()))

    def get_p99_latency(self, window_hours: int = 1) -> float:
        since = (datetime.datetime.utcnow() - datetime.timedelta(hours=window_hours)).isoformat()
        with sqlite3.connect(self.db_path) as conn:
            rows = conn.execute(
                "SELECT latency_ms FROM query_metrics WHERE timestamp > ? ORDER BY latency_ms",
                (since,)
            ).fetchall()
        if not rows:
            return 0.0
        p99_idx = int(len(rows) * 0.99)
        return rows[p99_idx][0]

    def get_cache_hit_rate(self, window_hours: int = 1) -> float:
        since = (datetime.datetime.utcnow() - datetime.timedelta(hours=window_hours)).isoformat()
        with sqlite3.connect(self.db_path) as conn:
            total, hits = conn.execute(
                "SELECT COUNT(*), SUM(cache_hit) FROM query_metrics WHERE timestamp > ?", (since,)
            ).fetchone()
        # SUM() returns NULL (None) when every cache_hit value in the window is NULL
        return (hits or 0) / total if total else 0.0
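
The `query_metrics` table has per-stage columns, but something has to measure them. The sketch below is one lightweight way to collect those timings with a context manager; `StageTimer` is a hypothetical helper, and the `time.sleep` calls stand in for the real embedding and retrieval work.

```python
import time
from contextlib import contextmanager


class StageTimer:
    """Collects per-stage wall-clock timings in milliseconds."""

    def __init__(self) -> None:
        self.timings: dict[str, float] = {}

    @contextmanager
    def stage(self, name: str):
        start = time.perf_counter()
        try:
            yield
        finally:
            # Record the timing even if the stage raised
            self.timings[name] = (time.perf_counter() - start) * 1000


timer = StageTimer()
with timer.stage("embed_ms"):
    time.sleep(0.01)  # stand-in for embedding the query
with timer.stage("retrieve_ms"):
    time.sleep(0.02)  # stand-in for retrieval

print(timer.timings)
```

Because the stage names match the column names, the collected dict could be passed on as `metrics.record(latency_ms=..., cache_hit=0, **timer.timings)`, assuming `metrics` is a `RAGMetrics` instance.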

Alert when p99 latency exceeds 3 s, the cache hit rate drops below 20%, or rerank latency exceeds 500 ms (if the reranker runs on a GPU, the device may have gone idle).
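
These thresholds can be expressed as a small check that runs on a schedule. In the sketch below, `check_alerts` is a hypothetical function; the first two inputs could come from `get_p99_latency` and `get_cache_hit_rate`, while the rerank latency figure is assumed to be computed separately from the `rerank_ms` column.

```python
def check_alerts(p99_latency_ms: float, cache_hit_rate: float, rerank_ms: float) -> list[str]:
    """Return one message per breached threshold; an empty list means all clear."""
    alerts: list[str] = []
    if p99_latency_ms > 3000:
        alerts.append(f"p99 latency {p99_latency_ms:.0f} ms exceeds 3000 ms")
    if cache_hit_rate < 0.20:
        alerts.append(f"cache hit rate {cache_hit_rate:.0%} is below 20%")
    if rerank_ms > 500:
        alerts.append(f"rerank latency {rerank_ms:.0f} ms exceeds 500 ms")
    return alerts


healthy = check_alerts(p99_latency_ms=2200, cache_hit_rate=0.30, rerank_ms=240)
degraded = check_alerts(p99_latency_ms=3500, cache_hit_rate=0.10, rerank_ms=600)

print(healthy)
for message in degraded:
    print(message)
```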

Key Takeaways

  • The production RAG latency budget is p99 < 3 s; the dominant costs are reranking (~200 ms) and LLM generation (~1–2 s).
  • Run dense and sparse retrieval in parallel with asyncio.gather, so the critical path pays only for the slower of the two searches rather than their sum.
  • A semantic cache (cosine similarity > 0.95) achieves 20–40% hit rates on real traffic; always check the exact Redis cache first (O(1)) before the semantic lookup.
  • Tiered retrieval (BM25 first pass → ANN on filtered set) is the right approach for corpora larger than 10M vectors.
  • Use micro-batch index updates (queue + flush every 5 minutes) as the default; reserve real-time upserts for high-urgency pipelines.
  • At zero cloud cost: BGE-large (free, local) + Qdrant self-hosted + Groq free tier delivers production-quality RAG for development and low-volume deployments.
  • Monitor p99 latency, cache hit rate, reranker latency, and LLM latency separately — each has a different fix when it degrades.
  • Run the quality sampler (LLM-as-judge on 10% of queries) continuously; latency and error rate metrics alone do not catch answer quality degradation.