Building RAG from Scratch: A Complete Implementation Guide

Learn how to implement Retrieval-Augmented Generation (RAG) from the ground up, including document processing, vector embeddings, similarity search, and LLM integration.


Building RAG from Scratch: A Complete Implementation Guide

Retrieval-Augmented Generation (RAG) has revolutionized how we build AI applications by combining the power of large language models with external knowledge sources. In this comprehensive guide, we'll implement RAG from scratch, understanding every component and building a production-ready system.

What is RAG?

RAG is an AI framework that enhances language models by retrieving relevant information from external sources before generating responses. This approach addresses several key limitations of traditional LLMs:

  • Factual Accuracy: Access to up-to-date, verified information
  • Domain Expertise: Specialized knowledge beyond training data
  • Transparency: Traceable sources for generated content
  • Cost Efficiency: Reduced hallucination and improved response quality

RAG Architecture Overview

A typical RAG system consists of four main components:

  1. Document Processor: Ingests and preprocesses documents
  2. Vector Database: Stores document embeddings for similarity search
  3. Retriever: Finds relevant documents based on queries
  4. Generator: Produces final responses using retrieved context

Step 1: Document Processing Pipeline

Let's start by building a robust document processing system:

import os
import re
from typing import List, Dict, Any
from dataclasses import dataclass
from pathlib import Path
import fitz  # PyMuPDF
from bs4 import BeautifulSoup
import markdown

@dataclass
class Document:
    content: str
    metadata: Dict[str, Any]
    chunks: List[str]

class DocumentProcessor:
    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
    
    def process_pdf(self, file_path: str) -> Document:
        """Extract text from PDF files"""
        doc = fitz.open(file_path)
        text = ""
        for page in doc:
            text += page.get_text()
        
        metadata = {
            "source": file_path,
            "type": "pdf",
            "pages": len(doc)
        }
        
        return Document(
            content=text,
            metadata=metadata,
            chunks=self._create_chunks(text)
        )
    
    def process_markdown(self, file_path: str) -> Document:
        """Process markdown files"""
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        
        # Convert markdown to HTML then extract text
        html = markdown.markdown(content)
        soup = BeautifulSoup(html, 'html.parser')
        text = soup.get_text()
        
        metadata = {
            "source": file_path,
            "type": "markdown"
        }
        
        return Document(
            content=text,
            metadata=metadata,
            chunks=self._create_chunks(text)
        )
    
    def _create_chunks(self, text: str) -> List[str]:
        """Create overlapping text chunks for better retrieval"""
        chunks = []
        start = 0
        
        while start < len(text):
            end = start + self.chunk_size
            chunk = text[start:end].strip()
            
            if chunk:
                chunks.append(chunk)
            
            start = end - self.chunk_overlap
            
            if start >= len(text):
                break
        
        return chunks

Step 2: Vector Embeddings and Storage

Next, we'll implement the vector embedding system using sentence transformers:

import numpy as np
from sentence_transformers import SentenceTransformer
from typing import List, Tuple
import faiss
import pickle

class VectorStore:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.encoder = SentenceTransformer(model_name)
        self.index = None
        self.documents = []
        self.dimension = self.encoder.get_sentence_embedding_dimension()
    
    def add_documents(self, documents: List[Document]):
        """Add documents to the vector store"""
        all_chunks = []
        all_embeddings = []
        
        for doc in documents:
            for i, chunk in enumerate(doc.chunks):
                embedding = self.encoder.encode(chunk)
                all_embeddings.append(embedding)
                
                chunk_metadata = {
                    "document_source": doc.metadata["source"],
                    "chunk_index": i,
                    "chunk_size": len(chunk),
                    **doc.metadata
                }
                
                all_chunks.append({
                    "content": chunk,
                    "metadata": chunk_metadata
                })
        
        # Convert to numpy arrays
        embeddings_array = np.array(all_embeddings).astype('float32')
        
        # Create FAISS index
        if self.index is None:
            self.index = faiss.IndexFlatIP(self.dimension)
        
        # Normalize embeddings for cosine similarity
        faiss.normalize_L2(embeddings_array)
        self.index.add(embeddings_array)
        
        self.documents = all_chunks
    
    def search(self, query: str, k: int = 5) -> List[Tuple[float, Dict]]:
        """Search for similar documents"""
        query_embedding = self.encoder.encode(query)
        query_embedding = query_embedding.reshape(1, -1).astype('float32')
        faiss.normalize_L2(query_embedding)
        
        # Perform similarity search
        scores, indices = self.index.search(query_embedding, k)
        
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx < len(self.documents):
                results.append((float(score), self.documents[idx]))
        
        return results
    
    def save(self, filepath: str):
        """Save the vector store to disk"""
        with open(filepath, 'wb') as f:
            pickle.dump({
                'index': self.index,
                'documents': self.documents,
                'dimension': self.dimension
            }, f)
    
    def load(self, filepath: str):
        """Load the vector store from disk"""
        with open(filepath, 'rb') as f:
            data = pickle.load(f)
            self.index = data['index']
            self.documents = data['documents']
            self.dimension = data['dimension']

Step 3: Retrieval and Reranking

Implementing intelligent retrieval with reranking for better relevance:

from typing import List, Tuple
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class Retriever:
    def __init__(self, vector_store: VectorStore, reranker=None):
        self.vector_store = vector_store
        self.reranker = reranker
    
    def retrieve(self, query: str, k: int = 10, rerank_k: int = 5) -> List[Dict]:
        """Retrieve and optionally rerank documents"""
        # Initial retrieval
        initial_results = self.vector_store.search(query, k)
        
        if self.reranker and len(initial_results) > rerank_k:
            # Rerank top results for better relevance
            reranked_results = self._rerank(query, initial_results[:rerank_k])
            return reranked_results
        
        return [{"content": doc["content"], "metadata": doc["metadata"], "score": score} 
                for score, doc in initial_results]
    
    def _rerank(self, query: str, results: List[Tuple[float, Dict]]) -> List[Dict]:
        """Rerank results using cross-encoder for better relevance"""
        if not self.reranker:
            return [{"content": doc["content"], "metadata": doc["metadata"], "score": score} 
                    for score, doc in results]
        
        # Prepare pairs for reranking
        pairs = [(query, result[1]["content"]) for result in results]
        
        # Get reranking scores
        rerank_scores = self.reranker.predict(pairs)
        
        # Combine original scores with rerank scores
        combined_results = []
        for i, (original_score, doc) in enumerate(results):
            combined_score = 0.7 * original_score + 0.3 * rerank_scores[i]
            combined_results.append({
                "content": doc["content"],
                "metadata": doc["metadata"],
                "score": combined_score,
                "original_score": original_score,
                "rerank_score": float(rerank_scores[i])
            })
        
        # Sort by combined score
        combined_results.sort(key=lambda x: x["score"], reverse=True)
        return combined_results

Step 4: LLM Integration and Response Generation

Now let's integrate with language models for response generation:

import openai
from typing import List, Dict
import json

class RAGGenerator:
    def __init__(self, llm_client, model_name: str = "gpt-3.5-turbo"):
        self.llm_client = llm_client
        self.model_name = model_name
    
    def generate_response(self, query: str, retrieved_docs: List[Dict]) -> str:
        """Generate response using retrieved context"""
        
        # Prepare context from retrieved documents
        context = self._prepare_context(retrieved_docs)
        
        # Create prompt with context
        prompt = self._create_prompt(query, context)
        
        # Generate response
        response = self._call_llm(prompt)
        
        return response
    
    def _prepare_context(self, retrieved_docs: List[Dict]) -> str:
        """Prepare context string from retrieved documents"""
        context_parts = []
        
        for i, doc in enumerate(retrieved_docs):
            context_parts.append(f"Document {i+1}:\n{doc['content']}\n")
        
        return "\n".join(context_parts)
    
    def _create_prompt(self, query: str, context: str) -> str:
        """Create prompt for the LLM"""
        return f"""You are a helpful AI assistant. Use the following context to answer the user's question. 
        If the context doesn't contain enough information to answer the question, say so.

        Context:
        {context}

        Question: {query}

        Answer:"""
    
    def _call_llm(self, prompt: str) -> str:
        """Call the language model"""
        try:
            response = self.llm_client.chat.completions.create(
                model=self.model_name,
                messages=[
                    {"role": "system", "content": "You are a helpful AI assistant."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=1000,
                temperature=0.7
            )
            return response.choices[0].message.content
        except Exception as e:
            return f"Error generating response: {str(e)}"

Step 5: Complete RAG System Integration

Let's put it all together:

class RAGSystem:
    def __init__(self, 
                 vector_store: VectorStore,
                 retriever: Retriever,
                 generator: RAGGenerator):
        self.vector_store = vector_store
        self.retriever = retriever
        self.generator = generator
    
    def add_documents(self, documents: List[Document]):
        """Add documents to the system"""
        self.vector_store.add_documents(documents)
    
    def query(self, question: str, k: int = 5) -> Dict:
        """Process a query through the complete RAG pipeline"""
        
        # Step 1: Retrieve relevant documents
        retrieved_docs = self.retriever.retrieve(question, k)
        
        # Step 2: Generate response
        response = self.generator.generate_response(question, retrieved_docs)
        
        # Step 3: Prepare result
        result = {
            "question": question,
            "response": response,
            "sources": [doc["metadata"] for doc in retrieved_docs],
            "retrieved_documents": retrieved_docs
        }
        
        return result
    
    def save_system(self, base_path: str):
        """Save the complete RAG system"""
        self.vector_store.save(f"{base_path}_vectors.pkl")
        # Save other components as needed
    
    def load_system(self, base_path: str):
        """Load the complete RAG system"""
        self.vector_store.load(f"{base_path}_vectors.pkl")
        # Load other components as needed

Usage Example

Here's how to use the complete RAG system:

# Initialize components
processor = DocumentProcessor()
vector_store = VectorStore()
retriever = Retriever(vector_store)
generator = RAGGenerator(openai_client)

# Create RAG system
rag_system = RAGSystem(vector_store, retriever, generator)

# Process documents
documents = []
for file_path in document_files:
    if file_path.endswith('.pdf'):
        doc = processor.process_pdf(file_path)
    elif file_path.endswith('.md'):
        doc = processor.process_markdown(file_path)
    documents.append(doc)

# Add documents to system
rag_system.add_documents(documents)

# Query the system
result = rag_system.query("What are the key benefits of RAG?")
print(result["response"])

Advanced Features and Optimizations

1. Hybrid Search

Combine dense vector search with traditional keyword search:

class HybridRetriever:
    def __init__(self, vector_store: VectorStore, keyword_index):
        self.vector_store = vector_store
        self.keyword_index = keyword_index
    
    def hybrid_search(self, query: str, k: int = 10, alpha: float = 0.5):
        # Vector search
        vector_results = self.vector_store.search(query, k)
        
        # Keyword search
        keyword_results = self.keyword_index.search(query, k)
        
        # Combine results
        combined = self._combine_results(vector_results, keyword_results, alpha)
        return combined

2. Context Window Optimization

Implement sliding window and hierarchical chunking:

class AdaptiveChunker:
    def __init__(self, max_tokens: int = 4000):
        self.max_tokens = max_tokens
    
    def create_adaptive_chunks(self, text: str) -> List[str]:
        # Implement semantic chunking based on content structure
        # Use NLP techniques to find natural break points
        pass

3. Response Quality Metrics

Track and improve response quality:

class RAGMetrics:
    def __init__(self):
        self.metrics = {}
    
    def calculate_relevance_score(self, query: str, retrieved_docs: List[Dict]) -> float:
        # Implement relevance scoring
        pass
    
    def calculate_faithfulness_score(self, response: str, retrieved_docs: List[Dict]) -> float:
        # Implement faithfulness scoring
        pass

Production Considerations

1. Scalability

  • Use distributed vector databases like Pinecone or Weaviate
  • Implement caching layers for frequently accessed embeddings
  • Use async processing for document ingestion

2. Monitoring and Observability

  • Track retrieval quality metrics
  • Monitor response generation times
  • Implement A/B testing for different retrieval strategies

3. Security and Privacy

  • Implement document access controls
  • Use secure embedding models
  • Audit document sources and usage

Conclusion

Building RAG from scratch gives you complete control over every aspect of the system. This implementation provides:

  • Flexibility: Customize each component for your specific use case
  • Transparency: Full visibility into the retrieval and generation process
  • Scalability: Build upon this foundation for production systems
  • Learning: Deep understanding of RAG principles and trade-offs

The key to successful RAG implementation is iterative improvement. Start with this basic framework, measure performance, and gradually add advanced features like reranking, hybrid search, and quality metrics.

Remember that RAG is not just about technology—it's about creating systems that provide accurate, helpful, and trustworthy information to users. The quality of your document processing, the relevance of your retrieval, and the coherence of your generation all contribute to the user experience.

Happy building!