LangChain Production Guide: Building Enterprise RAG Systems & AI Applications
Master LangChain for production-scale AI applications. Learn to build sophisticated RAG systems, agent workflows, and enterprise AI pipelines with advanced vector databases, custom chains, and deployment strategies.
What You'll Master
- Production-ready RAG systems with advanced retrieval strategies
- Custom LangChain agents and complex workflow orchestration
- Vector database optimization and hybrid search implementations
- Enterprise deployment patterns and monitoring frameworks
LangChain in Production: Beyond the Basics
LangChain has become the de facto framework for building AI applications, but moving from prototype to production requires deep understanding of its advanced capabilities. Enterprise LangChain deployments must handle complex document processing, sophisticated retrieval strategies, and multi-step reasoning workflows at scale.
At Ayulogy, we've built LangChain-powered systems processing 10TB+ of enterprise documents daily, with sub-second retrieval times across vector databases containing 500M+ embeddings. Our production systems serve 100,000+ daily users with complex question-answering, document analysis, and automated workflow capabilities.
Enterprise LangChain Performance Metrics
📚 Document Types Processed
- PDF reports, contracts, manuals (2M+ docs)
- Code repositories and documentation (500k+ files)
- Email archives and communications (10M+ messages)
- Video transcripts and meeting notes (100k+ hours)
🎯 Use Cases Deployed
- Intelligent document Q&A and summarization
- Code analysis and automated documentation
- Legal contract analysis and compliance checking
- Customer support automation and ticket routing
Advanced RAG Systems
Multi-modal retrieval with hybrid search and reranking for superior accuracy.
Intelligent Agents
Custom agent workflows with tool usage, memory, and complex reasoning chains.
Production Scale
Enterprise deployment with monitoring, caching, and cost optimization.
Advanced RAG Architecture
Multi-Modal RAG with Hybrid Search
Production RAG systems require sophisticated retrieval strategies that go beyond simple vector similarity. Our advanced RAG implementation combines dense vector retrieval, sparse BM25 search, and neural reranking to achieve superior accuracy across diverse document types and query patterns.
# Advanced RAG System with Hybrid Search and Reranking
from langchain.vectorstores import Chroma, FAISS
from langchain.embeddings import OpenAIEmbeddings
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain.document_loaders import PyPDFLoader, DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from sentence_transformers import CrossEncoder
import numpy as np
from typing import List, Dict, Any, Optional, Callable
import asyncio
import time
from datetime import datetime
from dataclasses import dataclass
@dataclass
class Document:
content: str
metadata: Dict[str, Any]
embedding: Optional[np.ndarray] = None
class AdvancedRAGSystem:
"""Enterprise-grade RAG system with hybrid search and reranking"""
def __init__(self,
vector_store_path: str,
embedding_model: str = "text-embedding-ada-002",
reranker_model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
# Initialize embeddings
self.embeddings = OpenAIEmbeddings(model=embedding_model)
# Initialize vector store (using Chroma for production)
self.vector_store = Chroma(
persist_directory=vector_store_path,
embedding_function=self.embeddings,
collection_metadata={"hnsw:space": "cosine"} # Optimize for similarity search
)
# Initialize reranking model for improved accuracy
self.reranker = CrossEncoder(reranker_model)
# Document processing
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )
# Sparse retriever for keyword matching
self.bm25_retriever = None
self.documents = []
# Performance tracking
self.query_cache = {}
self.performance_metrics = {
'total_queries': 0,
'cache_hits': 0,
'avg_retrieval_time': 0.0,
'avg_rerank_time': 0.0
}
async def ingest_documents(self, document_paths: List[str],
metadata_extractor: Optional[Callable] = None) -> int:
"""Ingest and process documents for RAG system"""
all_documents = []
# Load documents from various sources
for path in document_paths:
if path.endswith('.pdf'):
loader = PyPDFLoader(path)
else:
loader = DirectoryLoader(path, glob="**/*", show_progress=True)
            docs = await self.load_documents_async(loader)  # async wrapper around loader.load() (not shown here)
# Extract metadata if extractor provided
if metadata_extractor:
docs = [self.enhance_document_metadata(doc, metadata_extractor)
for doc in docs]
all_documents.extend(docs)
# Split documents into chunks
chunked_documents = []
for doc in all_documents:
chunks = self.text_splitter.split_text(doc.page_content)
for i, chunk in enumerate(chunks):
chunk_doc = Document(
content=chunk,
metadata={
**doc.metadata,
'chunk_id': i,
'source_doc_id': doc.metadata.get('source', ''),
'chunk_size': len(chunk)
}
)
chunked_documents.append(chunk_doc)
# Store documents in vector database
texts = [doc.content for doc in chunked_documents]
metadatas = [doc.metadata for doc in chunked_documents]
# Batch processing for efficient embedding
batch_size = 100
for i in range(0, len(texts), batch_size):
batch_texts = texts[i:i + batch_size]
batch_metadatas = metadatas[i:i + batch_size]
await self.vector_store.aadd_texts(
texts=batch_texts,
metadatas=batch_metadatas
)
# Initialize BM25 retriever for sparse search
self.documents = chunked_documents
self.bm25_retriever = BM25Retriever.from_texts(
texts=[doc.content for doc in chunked_documents],
metadatas=metadatas
)
return len(chunked_documents)
async def hybrid_retrieve(self, query: str, k: int = 10,
rerank_top_k: int = 20) -> List[Dict[str, Any]]:
"""Hybrid retrieval combining vector search, BM25, and reranking"""
start_time = time.time()
# Check cache first
cache_key = f"{query}:{k}:{rerank_top_k}"
if cache_key in self.query_cache:
self.performance_metrics['cache_hits'] += 1
return self.query_cache[cache_key]
# 1. Vector similarity search
vector_results = await self.vector_store.asimilarity_search_with_score(
query, k=rerank_top_k
)
# 2. BM25 sparse retrieval
bm25_results = []
if self.bm25_retriever:
bm25_docs = self.bm25_retriever.get_relevant_documents(query)[:rerank_top_k]
bm25_results = [(doc, 0.0) for doc in bm25_docs] # BM25 doesn't return scores in this format
# 3. Combine results and remove duplicates
combined_results = self.combine_retrieval_results(
vector_results, bm25_results, alpha=0.7 # Weight toward vector search
)
retrieval_time = time.time() - start_time
# 4. Rerank using cross-encoder
rerank_start = time.time()
reranked_results = await self.rerank_results(query, combined_results, k)
rerank_time = time.time() - rerank_start
# Update performance metrics
self.performance_metrics['total_queries'] += 1
self.performance_metrics['avg_retrieval_time'] = (
(self.performance_metrics['avg_retrieval_time'] *
(self.performance_metrics['total_queries'] - 1) + retrieval_time) /
self.performance_metrics['total_queries']
)
self.performance_metrics['avg_rerank_time'] = (
(self.performance_metrics['avg_rerank_time'] *
(self.performance_metrics['total_queries'] - 1) + rerank_time) /
self.performance_metrics['total_queries']
)
# Cache results
self.query_cache[cache_key] = reranked_results
# Limit cache size
if len(self.query_cache) > 1000:
# Remove oldest entries (simple FIFO)
oldest_key = next(iter(self.query_cache))
del self.query_cache[oldest_key]
return reranked_results
def combine_retrieval_results(self, vector_results: List, bm25_results: List,
alpha: float = 0.7) -> List:
"""Combine vector and BM25 results with weighted scoring"""
# Normalize vector scores (cosine similarity -> 0-1 range)
if vector_results:
max_vector_score = max(score for _, score in vector_results)
vector_normalized = [
(doc, 1.0 - (score / max_vector_score))
for doc, score in vector_results
]
else:
vector_normalized = []
# Create combined results dictionary
combined = {}
# Add vector results
for doc, score in vector_normalized:
doc_id = doc.metadata.get('source', '') + str(doc.metadata.get('chunk_id', 0))
combined[doc_id] = {
'document': doc,
'vector_score': score,
'bm25_score': 0.0,
'combined_score': alpha * score
}
# Add BM25 results (simplified scoring)
for doc, _ in bm25_results:
doc_id = doc.metadata.get('source', '') + str(doc.metadata.get('chunk_id', 0))
bm25_score = 0.5 # Simplified - would calculate actual BM25 score
if doc_id in combined:
# Update existing
combined[doc_id]['bm25_score'] = bm25_score
combined[doc_id]['combined_score'] = (
alpha * combined[doc_id]['vector_score'] +
(1 - alpha) * bm25_score
)
else:
# Add new
combined[doc_id] = {
'document': doc,
'vector_score': 0.0,
'bm25_score': bm25_score,
'combined_score': (1 - alpha) * bm25_score
}
# Sort by combined score
sorted_results = sorted(
combined.values(),
key=lambda x: x['combined_score'],
reverse=True
)
return [(item['document'], item['combined_score']) for item in sorted_results]
async def rerank_results(self, query: str, results: List, k: int) -> List[Dict[str, Any]]:
"""Rerank results using cross-encoder for improved relevance"""
if not results:
return []
# Prepare query-document pairs for reranking
query_doc_pairs = []
documents = []
for doc, _ in results:
query_doc_pairs.append([query, doc.page_content])
documents.append(doc)
# Get reranking scores
rerank_scores = self.reranker.predict(query_doc_pairs)
# Combine with documents and sort
reranked = [
{
'document': doc,
'content': doc.page_content,
'metadata': doc.metadata,
'relevance_score': float(score),
'source': doc.metadata.get('source', 'unknown')
}
for doc, score in zip(documents, rerank_scores)
]
# Sort by rerank score and return top k
reranked.sort(key=lambda x: x['relevance_score'], reverse=True)
return reranked[:k]
async def generate_answer(self, query: str, context_docs: List[Dict[str, Any]],
model: str = "gpt-4-1106-preview") -> Dict[str, Any]:
"""Generate answer using retrieved context"""
# Prepare context from retrieved documents
        context = "\n\n".join([
            f"[Source: {doc['source']}]\n{doc['content']}"
            for doc in context_docs
        ])
# Enhanced prompt template
prompt_template = PromptTemplate(
input_variables=["context", "question"],
template="""You are an expert assistant providing accurate answers based on the given context.
Context Information:
{context}
Question: {question}
Instructions:
1. Answer based strictly on the provided context
2. If the context doesn't contain sufficient information, clearly state this
3. Cite specific sources when making claims
4. Provide a confidence level (1-10) for your answer
5. If multiple perspectives exist in the context, present them fairly
Answer:"""
)
# Initialize LLM
llm = OpenAI(model_name=model, temperature=0.1)
# Generate answer
prompt = prompt_template.format(context=context, question=query)
try:
response = await llm.agenerate([prompt])
answer = response.generations[0][0].text.strip()
return {
'answer': answer,
'sources': [doc['source'] for doc in context_docs],
'context_length': len(context),
'num_sources': len(context_docs),
'query': query
}
except Exception as e:
return {
'error': str(e),
'answer': "I apologize, but I encountered an error generating the answer.",
'sources': [],
'context_length': 0,
'num_sources': 0,
'query': query
}
# Production RAG Pipeline
class ProductionRAGPipeline:
"""Complete RAG pipeline for production deployment"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.rag_system = AdvancedRAGSystem(
vector_store_path=config['vector_store_path'],
embedding_model=config.get('embedding_model', 'text-embedding-ada-002'),
reranker_model=config.get('reranker_model', 'cross-encoder/ms-marco-MiniLM-L-6-v2')
)
# Monitoring and logging
self.query_log = []
        self.performance_tracker = PerformanceTracker()  # application-specific metrics helper (not defined in this guide)
async def query(self, question: str, user_id: str = None,
session_id: str = None) -> Dict[str, Any]:
"""Main query interface with logging and monitoring"""
start_time = time.time()
try:
# 1. Retrieve relevant documents
retrieved_docs = await self.rag_system.hybrid_retrieve(
query=question,
k=self.config.get('retrieval_k', 5),
rerank_top_k=self.config.get('rerank_k', 20)
)
# 2. Generate answer
result = await self.rag_system.generate_answer(
query=question,
context_docs=retrieved_docs,
model=self.config.get('llm_model', 'gpt-4-1106-preview')
)
# 3. Add metadata
result.update({
'retrieval_time_ms': (time.time() - start_time) * 1000,
'user_id': user_id,
'session_id': session_id,
'timestamp': datetime.utcnow().isoformat()
})
# 4. Log query
await self.log_query(question, result, user_id, session_id)
return result
except Exception as e:
error_result = {
'error': str(e),
'answer': "I apologize, but I encountered an error processing your question.",
'sources': [],
'user_id': user_id,
'session_id': session_id,
'timestamp': datetime.utcnow().isoformat()
}
await self.log_query(question, error_result, user_id, session_id)
return error_result
async def log_query(self, question: str, result: Dict[str, Any],
user_id: str = None, session_id: str = None):
"""Log query for monitoring and improvement"""
log_entry = {
'question': question,
'answer': result.get('answer', ''),
'sources': result.get('sources', []),
'retrieval_time_ms': result.get('retrieval_time_ms', 0),
'num_sources': result.get('num_sources', 0),
'user_id': user_id,
'session_id': session_id,
'timestamp': result.get('timestamp'),
'error': result.get('error')
}
self.query_log.append(log_entry)
# Keep log size manageable
if len(self.query_log) > 10000:
self.query_log = self.query_log[-8000:]
# Send to external logging system (e.g., ELK stack)
await self.send_to_logging_system(log_entry)
async def send_to_logging_system(self, log_entry: Dict[str, Any]):
"""Send log entry to external logging/monitoring system"""
# Implementation would send to ELK, CloudWatch, etc.
pass
def get_performance_metrics(self) -> Dict[str, Any]:
"""Get system performance metrics"""
return {
**self.rag_system.performance_metrics,
'total_logged_queries': len(self.query_log),
            'recent_error_rate': self.calculate_recent_error_rate(),          # helper method not shown
            'avg_sources_per_query': self.calculate_avg_sources_per_query()   # helper method not shown
}
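The pipeline above can be exercised with a few lines of asyncio glue. The following is a minimal usage sketch; the config values, document path, and user/session IDs are illustrative placeholders rather than part of the framework.
# Example usage of ProductionRAGPipeline (paths, models, and IDs are illustrative)
import asyncio

async def main():
    config = {
        'vector_store_path': './chroma_store',           # hypothetical local directory
        'embedding_model': 'text-embedding-ada-002',
        'llm_model': 'gpt-4-1106-preview',
        'retrieval_k': 5,
        'rerank_k': 20
    }
    pipeline = ProductionRAGPipeline(config)
    # Ingest documents once before serving queries
    num_chunks = await pipeline.rag_system.ingest_documents(['./docs/annual_report.pdf'])
    print(f"Indexed {num_chunks} chunks")
    result = await pipeline.query(
        "What were the key findings in the annual report?",
        user_id="user-123",
        session_id="sess-456"
    )
    print(result['answer'])
    print("Sources:", result['sources'])

if __name__ == "__main__":
    asyncio.run(main())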
Custom Agent Workflows
LangChain agents enable complex multi-step reasoning workflows. Our production agent framework combines tool usage, memory systems, and custom reasoning chains to handle sophisticated enterprise tasks that require multiple API calls, data analysis, and decision-making steps.
# Advanced LangChain Agent with Custom Tools and Memory
from langchain.agents import Tool, AgentExecutor, BaseMultiActionAgent
from langchain.tools import BaseTool
from langchain.memory import ConversationBufferWindowMemory, VectorStoreRetrieverMemory
from langchain.schema import AgentAction, AgentFinish
from langchain.prompts import PromptTemplate
from langchain.llms import OpenAI
import json
import requests
from typing import List, Union, Dict, Any, Optional
from datetime import datetime, timedelta
import asyncio
class DatabaseQueryTool(BaseTool):
    """Custom tool for querying enterprise databases"""
    name = "database_query"
    description = "Query enterprise databases using SQL. Input should be a valid SQL query."
    connection_string: str = ""  # declared as a field so the pydantic-based BaseTool accepts it

    def __init__(self, connection_string: str):
        super().__init__(connection_string=connection_string)
        # Database connection/engine initialization would go here
def _run(self, query: str) -> str:
"""Execute SQL query and return results"""
try:
# Validate query (prevent dangerous operations)
if any(keyword in query.upper() for keyword in ['DROP', 'DELETE', 'UPDATE', 'INSERT']):
return "Error: Only SELECT queries are allowed for security reasons."
# Execute query (simplified - use actual database connection)
results = self.execute_safe_query(query)
return f"Query executed successfully. Results: {results}"
except Exception as e:
return f"Database query failed: {str(e)}"
async def _arun(self, query: str) -> str:
"""Async version of the tool"""
return self._run(query)
def execute_safe_query(self, query: str) -> List[Dict]:
"""Execute query with safety checks and connection pooling"""
# Implementation would use actual database connection
return [{"sample": "data"}]
class APIRequestTool(BaseTool):
"""Tool for making external API requests"""
name = "api_request"
description = "Make HTTP requests to external APIs. Input should be JSON with 'url', 'method', 'headers', and 'data'."
def _run(self, request_params: str) -> str:
"""Make API request"""
try:
params = json.loads(request_params)
response = requests.request(
method=params.get('method', 'GET'),
url=params['url'],
headers=params.get('headers', {}),
json=params.get('data'),
timeout=30
)
return f"API request successful. Status: {response.status_code}, Data: {response.text[:500]}"
except Exception as e:
return f"API request failed: {str(e)}"
class DataAnalysisTool(BaseTool):
"""Tool for data analysis and calculations"""
name = "data_analysis"
description = "Perform data analysis, calculations, and generate insights. Input should describe the analysis needed."
def _run(self, analysis_request: str) -> str:
"""Perform data analysis"""
try:
# This would integrate with pandas, numpy, or other analysis libraries
analysis_result = self.perform_analysis(analysis_request)
return f"Analysis completed: {analysis_result}"
except Exception as e:
return f"Analysis failed: {str(e)}"
def perform_analysis(self, request: str) -> str:
"""Actual analysis implementation"""
# Simplified implementation
return "Sample analysis result with insights and recommendations"
class WorkflowMemory:
"""Advanced memory system for agent workflows"""
def __init__(self, vector_store_memory: VectorStoreRetrieverMemory):
self.conversation_memory = ConversationBufferWindowMemory(k=10)
self.vector_memory = vector_store_memory
self.task_memory = {} # Store task-specific information
self.facts_memory = {} # Store extracted facts and insights
def add_task_context(self, task_id: str, context: Dict[str, Any]):
"""Add task-specific context to memory"""
self.task_memory[task_id] = {
'context': context,
'timestamp': datetime.utcnow(),
'status': 'active'
}
def extract_and_store_facts(self, conversation: str, facts: List[str]):
"""Extract and store important facts from conversation"""
for fact in facts:
fact_id = f"fact_{len(self.facts_memory)}"
self.facts_memory[fact_id] = {
'content': fact,
'source_conversation': conversation[:200],
'timestamp': datetime.utcnow(),
'confidence': 0.8 # Could be calculated based on source reliability
}
def get_relevant_context(self, current_query: str) -> Dict[str, Any]:
"""Retrieve relevant context for current query"""
# Get recent conversation context
conversation_context = self.conversation_memory.load_memory_variables({})
# Get semantically similar past interactions
vector_context = self.vector_memory.load_memory_variables({'prompt': current_query})
# Get relevant facts
relevant_facts = [
fact for fact in self.facts_memory.values()
if self.is_fact_relevant(fact['content'], current_query)
]
        return {
            'conversation_history': conversation_context,
            'similar_interactions': vector_context,
            'relevant_facts': relevant_facts,
            'active_tasks': self.get_active_tasks()
        }

    def get_active_tasks(self) -> Dict[str, Any]:
        """Return tasks currently marked as active"""
        return {task_id: task for task_id, task in self.task_memory.items()
                if task.get('status') == 'active'}

    def is_fact_relevant(self, fact_content: str, query: str) -> bool:
        """Naive keyword-overlap check; swap in embedding similarity for production use"""
        return bool(set(fact_content.lower().split()) & set(query.lower().split()))
class EnterpriseAgent(BaseMultiActionAgent):
"""Advanced agent for enterprise workflows"""
def __init__(self, tools: List[BaseTool], llm, memory: WorkflowMemory):
self.tools = tools
self.llm = llm
self.memory = memory
self.tool_names = [tool.name for tool in tools]
# Agent prompt template
self.prompt_template = PromptTemplate(
            input_variables=["input", "intermediate_steps", "context", "tools"],
template="""You are an advanced AI assistant helping with enterprise tasks. You have access to specialized tools and memory systems.
Available tools:
{tools}
Current context:
{context}
Your task: {input}
Previous actions and observations:
{intermediate_steps}
Think step by step:
1. Understand the request and break it down if complex
2. Determine what information or actions are needed
3. Use available tools systematically
4. Synthesize results and provide comprehensive answers
If you need to use a tool, format your response as:
Action: [tool_name]
Action Input: [tool_input]
If you have enough information to provide a final answer:
Final Answer: [your comprehensive answer]
Begin:"""
)
def plan(self, intermediate_steps: List, **kwargs) -> Union[List[AgentAction], AgentFinish]:
"""Plan next actions based on current state"""
# Get relevant context from memory
context = self.memory.get_relevant_context(kwargs.get('input', ''))
# Format tools description
        tools_desc = "\n".join([f"- {tool.name}: {tool.description}" for tool in self.tools])
        # Format intermediate steps
        steps_desc = ""
        for action, observation in intermediate_steps:
            steps_desc += (f"Action: {action.tool}\n"
                           f"Action Input: {action.tool_input}\n"
                           f"Observation: {observation}\n")
# Generate prompt
prompt = self.prompt_template.format(
tools=tools_desc,
context=json.dumps(context, indent=2, default=str),
input=kwargs.get('input', ''),
intermediate_steps=steps_desc
)
# Get LLM response
response = self.llm(prompt)
# Parse response
if "Final Answer:" in response:
# Agent is done
final_answer = response.split("Final Answer:")[-1].strip()
return AgentFinish(return_values={"output": final_answer}, log=response)
elif "Action:" in response and "Action Input:" in response:
# Agent wants to use a tool
try:
action_text = response.split("Action:")[1].split("Action Input:")[0].strip()
action_input = response.split("Action Input:")[1].strip()
return [AgentAction(tool=action_text, tool_input=action_input, log=response)]
            except Exception:
# Fallback if parsing fails
return AgentFinish(
return_values={"output": "I apologize, but I couldn't parse my response properly. Please try again."},
log=response
)
else:
# Unexpected format
return AgentFinish(
return_values={"output": response},
log=response
)
@property
def input_keys(self):
return ["input"]
@property
def return_values(self):
return ["output"]
class WorkflowOrchestrator:
"""Orchestrates complex multi-step workflows using LangChain agents"""
def __init__(self, database_connection: str, api_keys: Dict[str, str]):
# Initialize tools
self.tools = [
DatabaseQueryTool(database_connection),
APIRequestTool(),
DataAnalysisTool(),
]
# Initialize LLM
self.llm = OpenAI(
model_name="gpt-4-1106-preview",
temperature=0.1,
openai_api_key=api_keys.get('openai')
)
# Initialize memory system
vector_memory = VectorStoreRetrieverMemory(
retriever=None, # Would initialize with actual vector store
memory_key="chat_history",
input_key="input"
)
self.memory = WorkflowMemory(vector_memory)
# Initialize agent
self.agent = EnterpriseAgent(self.tools, self.llm, self.memory)
# Create agent executor
self.agent_executor = AgentExecutor.from_agent_and_tools(
agent=self.agent,
tools=self.tools,
verbose=True,
memory=self.memory.conversation_memory,
max_iterations=10,
max_execution_time=300 # 5 minute timeout
)
async def execute_workflow(self, workflow_request: str,
context: Dict[str, Any] = None) -> Dict[str, Any]:
"""Execute a complex workflow using the agent"""
start_time = datetime.utcnow()
# Add context to memory if provided
if context:
task_id = context.get('task_id', f"task_{int(datetime.utcnow().timestamp())}")
self.memory.add_task_context(task_id, context)
try:
# Execute workflow
result = await self.agent_executor.arun(input=workflow_request)
execution_time = (datetime.utcnow() - start_time).total_seconds()
return {
'success': True,
'result': result,
'execution_time_seconds': execution_time,
'timestamp': start_time.isoformat(),
'workflow_request': workflow_request
}
except Exception as e:
execution_time = (datetime.utcnow() - start_time).total_seconds()
return {
'success': False,
'error': str(e),
'execution_time_seconds': execution_time,
'timestamp': start_time.isoformat(),
'workflow_request': workflow_request
}
def get_workflow_history(self, limit: int = 50) -> List[Dict[str, Any]]:
"""Get history of workflow executions"""
# Implementation would retrieve from persistent storage
return []
def analyze_workflow_performance(self) -> Dict[str, Any]:
"""Analyze workflow performance metrics"""
return {
'total_workflows_executed': 0, # Would get from actual storage
'average_execution_time': 0.0,
'success_rate': 0.0,
'most_used_tools': [],
'common_failure_patterns': []
}
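A minimal invocation sketch for the orchestrator is shown below; the connection string, API key, and workflow request are placeholders, not working values.
# Example: running a workflow end to end (credentials and request are placeholders)
import asyncio

async def run_example():
    orchestrator = WorkflowOrchestrator(
        database_connection="postgresql://user:pass@localhost:5432/analytics",  # placeholder DSN
        api_keys={"openai": "sk-..."}                                            # placeholder key
    )
    outcome = await orchestrator.execute_workflow(
        "Summarize last quarter's support tickets and flag the top three recurring issues.",
        context={"task_id": "demo-001", "department": "support"}
    )
    if outcome['success']:
        print(outcome['result'])
    else:
        print("Workflow failed:", outcome['error'])

if __name__ == "__main__":
    asyncio.run(run_example())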
Vector Database Optimization
Production Vector Store Configuration
Optimizing vector databases for production LangChain deployments requires careful consideration of index types, similarity metrics, and query performance. Our configuration supports millions of embeddings with sub-second query times through advanced indexing and caching strategies.
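As a concrete illustration of this kind of tuning, the sketch below builds a FAISS HNSW index over normalized embeddings and adjusts the two parameters that most directly trade recall against latency: efConstruction at build time and efSearch at query time. The dimension, corpus size, and parameter values are illustrative assumptions, not recommendations for a specific workload.
# Sketch: tuning a FAISS HNSW index (sizes and parameters are illustrative)
import numpy as np
import faiss

dim = 1536                                    # e.g. text-embedding-ada-002 output size
embeddings = np.random.rand(10_000, dim).astype("float32")
faiss.normalize_L2(embeddings)                # with unit vectors, inner product == cosine similarity

# M controls graph connectivity; efConstruction controls build-time quality
index = faiss.IndexHNSWFlat(dim, 32, faiss.METRIC_INNER_PRODUCT)
index.hnsw.efConstruction = 200
index.add(embeddings)

# efSearch trades query latency for recall at search time
index.hnsw.efSearch = 64
query = np.random.rand(1, dim).astype("float32")
faiss.normalize_L2(query)
scores, ids = index.search(query, 10)
print(ids[0], scores[0])
Chroma exposes comparable HNSW settings through collection metadata, in the same way the earlier RAG example set "hnsw:space" to cosine.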
Vector Database Performance Comparison
Production Deployment & Monitoring
Deploying LangChain applications in production requires comprehensive monitoring, error handling, and performance optimization. Our deployment framework includes request tracing, cost monitoring, and automated failover mechanisms to ensure reliable service delivery.
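One lightweight pattern for the monitoring and failover pieces is sketched below: a wrapper that times each request, attaches a rough cost estimate, and retries against a secondary model when the primary call fails. The price table, model names, and the assumption that the query callable accepts a model name are all illustrative, not a prescribed setup.
# Sketch: latency tracking, cost estimation, and model failover (prices and models are illustrative)
import time
import logging
from typing import Any, Awaitable, Callable, Dict

logger = logging.getLogger("langchain-monitoring")

# Rough per-1K-token prices, for illustration only
COST_PER_1K_TOKENS = {"gpt-4-1106-preview": 0.01, "gpt-3.5-turbo": 0.0005}

async def monitored_query(run_query: Callable[[str, str], Awaitable[Dict[str, Any]]],
                          question: str,
                          primary_model: str = "gpt-4-1106-preview",
                          fallback_model: str = "gpt-3.5-turbo") -> Dict[str, Any]:
    """Run a query callable with timing, a cost estimate, and one fallback attempt."""
    for model in (primary_model, fallback_model):
        start = time.perf_counter()
        try:
            result = await run_query(question, model)
            latency_ms = (time.perf_counter() - start) * 1000
            est_tokens = result.get('context_length', 0) / 4   # crude: ~4 characters per token
            result['monitoring'] = {
                'model': model,
                'latency_ms': round(latency_ms, 1),
                'estimated_cost_usd': round(est_tokens / 1000 * COST_PER_1K_TOKENS.get(model, 0.0), 5),
            }
            logger.info("query ok model=%s latency_ms=%.1f", model, latency_ms)
            return result
        except Exception as exc:
            logger.warning("model %s failed (%s); trying fallback", model, exc)
    return {'error': 'all models failed', 'answer': '', 'sources': []}
Tracing request IDs through these log lines makes it straightforward to correlate slow or failed queries with the retrieval metrics collected by the RAG pipeline above.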
Ready to Build Production AI Applications?
Ayulogy specializes in building production-scale LangChain applications that handle enterprise workloads with advanced RAG systems, intelligent agents, and sophisticated workflow orchestration. From document analysis to automated decision-making, we deliver AI solutions that transform business operations.