Building RAG from Scratch: A Complete Implementation Guide
Learn how to implement Retrieval-Augmented Generation (RAG) from the ground up, including document processing, vector embeddings, similarity search, and LLM integration.
Retrieval-Augmented Generation (RAG) has revolutionized how we build AI applications by combining the power of large language models with external knowledge sources. In this comprehensive guide, we'll implement RAG from scratch, understanding every component and building a working system you can extend toward production.
What is RAG?
RAG is an AI framework that enhances language models by retrieving relevant information from external sources before generating responses. This approach addresses several key limitations of traditional LLMs:
- Factual Accuracy: Access to up-to-date, verified information
- Domain Expertise: Specialized knowledge beyond training data
- Transparency: Traceable sources for generated content
- Cost Efficiency: Update knowledge by re-indexing documents instead of retraining or fine-tuning the model
RAG Architecture Overview
A typical RAG system consists of four main components (previewed end-to-end in the sketch after this list):
- Document Processor: Ingests and preprocesses documents
- Vector Database: Stores document embeddings for similarity search
- Retriever: Finds relevant documents based on queries
- Generator: Produces final responses using retrieved context
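The components are built one at a time in the steps below. As a preview, here is how a query flows through them once everything is in place (the file name, query, and llm_client are placeholders):
# Minimal end-to-end preview; "handbook.pdf", the query, and llm_client are placeholders
doc = DocumentProcessor().process_pdf("handbook.pdf")            # 1. ingest & chunk
store = VectorStore()
store.add_documents([doc])                                       # 2. embed & index
hits = Retriever(store).retrieve("What is the refund policy?")   # 3. retrieve
answer = RAGGenerator(llm_client).generate_response(
    "What is the refund policy?", hits)                          # 4. generate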
Step 1: Document Processing Pipeline
Let's start by building a robust document processing system:
from typing import List, Dict, Any
from dataclasses import dataclass
import fitz  # PyMuPDF
from bs4 import BeautifulSoup
import markdown

@dataclass
class Document:
    content: str
    metadata: Dict[str, Any]
    chunks: List[str]

class DocumentProcessor:
    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def process_pdf(self, file_path: str) -> Document:
        """Extract text from PDF files"""
        doc = fitz.open(file_path)
        text = ""
        for page in doc:
            text += page.get_text()
        metadata = {
            "source": file_path,
            "type": "pdf",
            "pages": len(doc)
        }
        doc.close()
        return Document(
            content=text,
            metadata=metadata,
            chunks=self._create_chunks(text)
        )

    def process_markdown(self, file_path: str) -> Document:
        """Process markdown files"""
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        # Convert markdown to HTML, then extract plain text
        html = markdown.markdown(content)
        soup = BeautifulSoup(html, 'html.parser')
        text = soup.get_text()
        metadata = {
            "source": file_path,
            "type": "markdown"
        }
        return Document(
            content=text,
            metadata=metadata,
            chunks=self._create_chunks(text)
        )

    def _create_chunks(self, text: str) -> List[str]:
        """Create overlapping text chunks for better retrieval"""
        chunks = []
        start = 0
        while start < len(text):
            end = start + self.chunk_size
            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)
            if end >= len(text):
                break  # avoid a trailing chunk that only repeats the overlap
            start = end - self.chunk_overlap
        return chunks
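A quick sanity check of the chunking behaviour (sample.md is just a placeholder for any local Markdown file):
# Hypothetical example: sample.md stands in for any local Markdown file
processor = DocumentProcessor(chunk_size=500, chunk_overlap=100)
doc = processor.process_markdown("sample.md")
print(f"{len(doc.chunks)} chunks, metadata: {doc.metadata}")
print(doc.chunks[0][:200])  # preview the first chunk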
Step 2: Vector Embeddings and Storage
Next, we'll implement the vector embedding system using sentence transformers:
import numpy as np
from sentence_transformers import SentenceTransformer
from typing import List, Tuple
import faiss
import pickle

class VectorStore:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.encoder = SentenceTransformer(model_name)
        self.index = None
        self.documents = []
        self.dimension = self.encoder.get_sentence_embedding_dimension()

    def add_documents(self, documents: List[Document]):
        """Add documents to the vector store"""
        all_chunks = []
        all_embeddings = []
        for doc in documents:
            for i, chunk in enumerate(doc.chunks):
                embedding = self.encoder.encode(chunk)
                all_embeddings.append(embedding)
                chunk_metadata = {
                    "document_source": doc.metadata["source"],
                    "chunk_index": i,
                    "chunk_size": len(chunk),
                    **doc.metadata
                }
                all_chunks.append({
                    "content": chunk,
                    "metadata": chunk_metadata
                })
        if not all_chunks:
            return

        # Convert to a float32 matrix for FAISS
        embeddings_array = np.array(all_embeddings).astype('float32')

        # Create the FAISS index on first use
        if self.index is None:
            self.index = faiss.IndexFlatIP(self.dimension)

        # Normalize embeddings so inner product equals cosine similarity
        faiss.normalize_L2(embeddings_array)
        self.index.add(embeddings_array)
        # Extend (not overwrite) so repeated calls keep index and metadata aligned
        self.documents.extend(all_chunks)

    def search(self, query: str, k: int = 5) -> List[Tuple[float, Dict]]:
        """Search for similar documents"""
        if self.index is None or self.index.ntotal == 0:
            return []
        query_embedding = self.encoder.encode(query)
        query_embedding = query_embedding.reshape(1, -1).astype('float32')
        faiss.normalize_L2(query_embedding)

        # Perform similarity search
        scores, indices = self.index.search(query_embedding, k)
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if 0 <= idx < len(self.documents):
                results.append((float(score), self.documents[idx]))
        return results

    def save(self, filepath: str):
        """Save the vector store to disk"""
        # FAISS indexes are not directly picklable, so serialize them to bytes first
        with open(filepath, 'wb') as f:
            pickle.dump({
                'index': faiss.serialize_index(self.index) if self.index is not None else None,
                'documents': self.documents,
                'dimension': self.dimension
            }, f)

    def load(self, filepath: str):
        """Load the vector store from disk"""
        with open(filepath, 'rb') as f:
            data = pickle.load(f)
        self.index = faiss.deserialize_index(data['index']) if data['index'] is not None else None
        self.documents = data['documents']
        self.dimension = data['dimension']
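With the store in place, indexing and querying looks roughly like this; scores are cosine similarities because embeddings are L2-normalized before being added to the inner-product index (documents here is a list of Document objects from Step 1):
# Assumes `documents` is a list of Document objects produced in Step 1
vector_store = VectorStore()
vector_store.add_documents(documents)
for score, chunk in vector_store.search("How does chunk overlap help retrieval?", k=3):
    print(f"{score:.3f}  {chunk['metadata']['source']}  {chunk['content'][:80]}")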
Step 3: Retrieval and Reranking
Implementing intelligent retrieval with reranking for better relevance:
from typing import List, Tuple, Dict

class Retriever:
    def __init__(self, vector_store: VectorStore, reranker=None):
        self.vector_store = vector_store
        self.reranker = reranker

    def retrieve(self, query: str, k: int = 10, rerank_k: int = 5) -> List[Dict]:
        """Retrieve k candidates, optionally rerank them and keep the top rerank_k"""
        # Initial retrieval over the full candidate pool
        initial_results = self.vector_store.search(query, k)

        if self.reranker:
            # Rerank all candidates for better relevance, then keep the best ones
            reranked_results = self._rerank(query, initial_results)
            return reranked_results[:rerank_k]

        return [{"content": doc["content"], "metadata": doc["metadata"], "score": score}
                for score, doc in initial_results]

    def _rerank(self, query: str, results: List[Tuple[float, Dict]]) -> List[Dict]:
        """Rerank results using a cross-encoder for better relevance"""
        if not self.reranker:
            return [{"content": doc["content"], "metadata": doc["metadata"], "score": score}
                    for score, doc in results]

        # Prepare (query, passage) pairs for the cross-encoder
        pairs = [(query, doc["content"]) for _, doc in results]

        # Get reranking scores
        rerank_scores = self.reranker.predict(pairs)

        # Combine original scores with rerank scores
        # (the 0.7/0.3 weighting is a heuristic; cross-encoder scores are on a different
        # scale than cosine similarity, so tune or normalize these weights for your data)
        combined_results = []
        for i, (original_score, doc) in enumerate(results):
            combined_score = 0.7 * original_score + 0.3 * rerank_scores[i]
            combined_results.append({
                "content": doc["content"],
                "metadata": doc["metadata"],
                "score": combined_score,
                "original_score": original_score,
                "rerank_score": float(rerank_scores[i])
            })

        # Sort by combined score
        combined_results.sort(key=lambda x: x["score"], reverse=True)
        return combined_results
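The reranker can be any object exposing a predict(pairs) method that scores (query, passage) pairs. A cross-encoder from sentence-transformers fits that interface; for example (the model named here is just a common lightweight choice):
from sentence_transformers import CrossEncoder

# A small, widely used reranking model
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
retriever = Retriever(vector_store, reranker=reranker)
results = retriever.retrieve("What are the key benefits of RAG?", k=10, rerank_k=5)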
Step 4: LLM Integration and Response Generation
Now let's integrate with language models for response generation:
import openai
from typing import List, Dict

class RAGGenerator:
    def __init__(self, llm_client, model_name: str = "gpt-3.5-turbo"):
        self.llm_client = llm_client
        self.model_name = model_name

    def generate_response(self, query: str, retrieved_docs: List[Dict]) -> str:
        """Generate response using retrieved context"""
        # Prepare context from retrieved documents
        context = self._prepare_context(retrieved_docs)

        # Create prompt with context
        prompt = self._create_prompt(query, context)

        # Generate response
        response = self._call_llm(prompt)
        return response

    def _prepare_context(self, retrieved_docs: List[Dict]) -> str:
        """Prepare context string from retrieved documents"""
        context_parts = []
        for i, doc in enumerate(retrieved_docs):
            context_parts.append(f"Document {i+1}:\n{doc['content']}\n")
        return "\n".join(context_parts)

    def _create_prompt(self, query: str, context: str) -> str:
        """Create prompt for the LLM"""
        return f"""You are a helpful AI assistant. Use the following context to answer the user's question.
If the context doesn't contain enough information to answer the question, say so.

Context:
{context}

Question: {query}

Answer:"""

    def _call_llm(self, prompt: str) -> str:
        """Call the language model"""
        try:
            response = self.llm_client.chat.completions.create(
                model=self.model_name,
                messages=[
                    {"role": "system", "content": "You are a helpful AI assistant."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=1000,
                temperature=0.7
            )
            return response.choices[0].message.content
        except Exception as e:
            return f"Error generating response: {str(e)}"
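The generator expects an OpenAI-style client (the openai>=1.0 interface whose chat.completions.create method is used above). Creating one is a one-liner, assuming OPENAI_API_KEY is set in your environment:
client = openai.OpenAI()  # picks up OPENAI_API_KEY from the environment
generator = RAGGenerator(client, model_name="gpt-3.5-turbo")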
Step 5: Complete RAG System Integration
Let's put it all together:
class RAGSystem:
    def __init__(self,
                 vector_store: VectorStore,
                 retriever: Retriever,
                 generator: RAGGenerator):
        self.vector_store = vector_store
        self.retriever = retriever
        self.generator = generator

    def add_documents(self, documents: List[Document]):
        """Add documents to the system"""
        self.vector_store.add_documents(documents)

    def query(self, question: str, k: int = 5) -> Dict:
        """Process a query through the complete RAG pipeline"""
        # Step 1: Retrieve relevant documents
        retrieved_docs = self.retriever.retrieve(question, k)

        # Step 2: Generate response
        response = self.generator.generate_response(question, retrieved_docs)

        # Step 3: Prepare result
        result = {
            "question": question,
            "response": response,
            "sources": [doc["metadata"] for doc in retrieved_docs],
            "retrieved_documents": retrieved_docs
        }
        return result

    def save_system(self, base_path: str):
        """Save the complete RAG system"""
        self.vector_store.save(f"{base_path}_vectors.pkl")
        # Save other components as needed

    def load_system(self, base_path: str):
        """Load the complete RAG system"""
        self.vector_store.load(f"{base_path}_vectors.pkl")
        # Load other components as needed
Usage Example
Here's how to use the complete RAG system:
# Initialize components
processor = DocumentProcessor()
vector_store = VectorStore()
retriever = Retriever(vector_store)
openai_client = openai.OpenAI()  # as in Step 4, reads OPENAI_API_KEY from the environment
generator = RAGGenerator(openai_client)

# Create RAG system
rag_system = RAGSystem(vector_store, retriever, generator)

# Process documents (document_files is a list of paths to your PDF/Markdown files)
documents = []
for file_path in document_files:
    if file_path.endswith('.pdf'):
        doc = processor.process_pdf(file_path)
    elif file_path.endswith('.md'):
        doc = processor.process_markdown(file_path)
    else:
        continue  # skip unsupported formats
    documents.append(doc)

# Add documents to system
rag_system.add_documents(documents)

# Query the system
result = rag_system.query("What are the key benefits of RAG?")
print(result["response"])
Advanced Features and Optimizations
1. Hybrid Search
Combine dense vector search with traditional keyword search:
class HybridRetriever:
    def __init__(self, vector_store: VectorStore, keyword_index):
        self.vector_store = vector_store
        self.keyword_index = keyword_index

    def hybrid_search(self, query: str, k: int = 10, alpha: float = 0.5):
        # Vector search
        vector_results = self.vector_store.search(query, k)

        # Keyword search
        keyword_results = self.keyword_index.search(query, k)

        # Combine results (see _combine_results below)
        combined = self._combine_results(vector_results, keyword_results, alpha)
        return combined
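The _combine_results helper is left open above. One straightforward option, sketched below under the assumption that the keyword index returns (score, chunk_dict) pairs in the same shape as VectorStore.search (a BM25 library such as rank_bm25 could back it), is to min-max normalize both score lists and blend them with alpha. Add this method to HybridRetriever:
    def _combine_results(self, vector_results, keyword_results, alpha):
        """Blend normalized scores: alpha * vector + (1 - alpha) * keyword"""
        def normalize(results):
            # Map chunk content -> (min-max normalized score, chunk dict)
            scores = [score for score, _ in results]
            lo, hi = (min(scores), max(scores)) if scores else (0.0, 0.0)
            span = (hi - lo) or 1.0
            return {doc["content"]: ((score - lo) / span, doc) for score, doc in results}

        vec = normalize(vector_results)
        kw = normalize(keyword_results)
        combined = []
        for content in set(vec) | set(kw):
            v_score, v_doc = vec.get(content, (0.0, None))
            k_score, k_doc = kw.get(content, (0.0, None))
            doc = v_doc or k_doc
            combined.append({
                "content": content,
                "metadata": doc["metadata"],
                "score": alpha * v_score + (1 - alpha) * k_score
            })
        combined.sort(key=lambda x: x["score"], reverse=True)
        return combined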
2. Context Window Optimization
Go beyond fixed-size chunks with adaptive, structure-aware chunking:
class AdaptiveChunker:
    def __init__(self, max_tokens: int = 4000):
        self.max_tokens = max_tokens

    def create_adaptive_chunks(self, text: str) -> List[str]:
        # Implement semantic chunking based on content structure
        # Use NLP techniques to find natural break points
        pass
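As a simple, dependency-free starting point, the method can pack paragraphs greedily up to the token budget. The sketch below approximates token counts with whitespace word counts, which is a rough heuristic rather than a real tokenizer:
    def create_adaptive_chunks(self, text: str) -> List[str]:
        """Pack paragraphs greedily up to max_tokens (word count as a rough token estimate)."""
        chunks, current, current_len = [], [], 0
        for para in (p.strip() for p in text.split("\n\n")):
            if not para:
                continue
            words = len(para.split())
            # Start a new chunk once adding this paragraph would exceed the budget
            if current and current_len + words > self.max_tokens:
                chunks.append("\n\n".join(current))
                current, current_len = [], 0
            current.append(para)
            current_len += words
        if current:
            chunks.append("\n\n".join(current))
        return chunks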
3. Response Quality Metrics
Track and improve response quality:
class RAGMetrics:
    def __init__(self):
        self.metrics = {}

    def calculate_relevance_score(self, query: str, retrieved_docs: List[Dict]) -> float:
        # Implement relevance scoring
        pass

    def calculate_faithfulness_score(self, response: str, retrieved_docs: List[Dict]) -> float:
        # Implement faithfulness scoring
        pass
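To get a baseline before adopting a full evaluation framework, these stubs can be filled with simple proxies: embedding similarity for relevance and token overlap for faithfulness. The sketch below is one such heuristic implementation, assuming the class is handed the same SentenceTransformer encoder the vector store uses:
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

class SimpleRAGMetrics(RAGMetrics):
    def __init__(self, encoder: SentenceTransformer):
        super().__init__()
        self.encoder = encoder

    def calculate_relevance_score(self, query: str, retrieved_docs: List[Dict]) -> float:
        # Mean cosine similarity between the query and each retrieved chunk
        if not retrieved_docs:
            return 0.0
        query_emb = self.encoder.encode([query])
        doc_embs = self.encoder.encode([doc["content"] for doc in retrieved_docs])
        return float(cosine_similarity(query_emb, doc_embs).mean())

    def calculate_faithfulness_score(self, response: str, retrieved_docs: List[Dict]) -> float:
        # Fraction of response tokens that also appear in the retrieved context (rough proxy)
        context_tokens = set(" ".join(d["content"] for d in retrieved_docs).lower().split())
        response_tokens = response.lower().split()
        if not response_tokens:
            return 0.0
        return sum(t in context_tokens for t in response_tokens) / len(response_tokens)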
Production Considerations
1. Scalability
- Use distributed vector databases like Pinecone or Weaviate
- Implement caching layers for frequently accessed embeddings (see the sketch after this list)
- Use async processing for document ingestion
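For the caching point above, even a small in-process cache around query encoding avoids re-embedding repeated queries. A minimal sketch (the model name matches the VectorStore default):
from functools import lru_cache
from sentence_transformers import SentenceTransformer

_encoder = SentenceTransformer("all-MiniLM-L6-v2")

@lru_cache(maxsize=10_000)
def cached_query_embedding(query: str):
    # Embeddings are deterministic for a fixed model, so caching by query text is safe
    return _encoder.encode(query)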
2. Monitoring and Observability
- Track retrieval quality metrics
- Monitor response generation times
- Implement A/B testing for different retrieval strategies
3. Security and Privacy
- Implement document access controls
- Prefer embedding models you can self-host or have vetted, so sensitive documents never leave your environment
- Audit document sources and usage
Conclusion
Building RAG from scratch gives you complete control over every aspect of the system. This implementation provides:
- Flexibility: Customize each component for your specific use case
- Transparency: Full visibility into the retrieval and generation process
- Scalability: Build upon this foundation for production systems
- Learning: Deep understanding of RAG principles and trade-offs
The key to successful RAG implementation is iterative improvement. Start with this basic framework, measure performance, and gradually add advanced features like reranking, hybrid search, and quality metrics.
Remember that RAG is not just about technology—it's about creating systems that provide accurate, helpful, and trustworthy information to users. The quality of your document processing, the relevance of your retrieval, and the coherence of your generation all contribute to the user experience.
Happy building!