Use Case: Email Marketing Platform Customer Support
Imagine you’re building an email marketing platform that serves thousands of customers sending millions of emails daily. Your support team is overwhelmed with questions about:
- Platform Features: “How do I set up A/B testing for my campaigns?”
- API Documentation: “What’s the correct endpoint for bulk email sending?”
- Best Practices: “What’s the optimal send time for my target audience?”
- Troubleshooting: “Why are my emails going to spam?”
- Billing Questions: “How does pricing work for high-volume senders?”
Your knowledge base contains hundreds of documentation pages, API guides, blog posts, and support articles. Traditional keyword search often returns irrelevant results, and your support team can’t scale fast enough to handle the growing volume of customer inquiries.
The Challenge: You need an intelligent system that can:
- Answer customer questions instantly using your entire knowledge base
- Scale to handle thousands of concurrent queries
- Provide accurate, context-aware responses
- Reduce support ticket volume by 60-80%
- Maintain low latency (<2 seconds) for real-time chat support
The Solution: A Retrieval-Augmented Generation (RAG) system that combines your knowledge base with large language models to provide intelligent, accurate answers at scale.
Retrieval-Augmented Generation (RAG) combines large language models with external knowledge bases so that answers are grounded in your own content rather than in the model's training data alone. Building a RAG system that handles millions of queries while keeping latency and cost under control requires careful architecture design. This guide walks through building a production-ready RAG system on AWS that scales horizontally and supports enterprise workloads.
Prerequisites
Before getting started, ensure you have:
- AWS Account Setup:
# Configure AWS CLI
aws configure
# Enter your AWS Access Key ID
# Enter your AWS Secret Access Key
# Enter your default region (e.g., us-east-1)
# Enter your output format (json)
- Required AWS Services:
- Amazon Bedrock (for LLM inference and embeddings)
- Amazon OpenSearch Serverless (for vector storage)
- Amazon S3 (for document storage)
- AWS Lambda (for processing)
- Amazon API Gateway (for API endpoints)
- Amazon ElastiCache (for caching)
- Python Environment:
# Create virtual environment
python3 -m venv rag-env
source rag-env/bin/activate
# Install required packages
pip install boto3 langchain langchain-aws opensearch-py requests numpy redis
Core Architecture Components
Architecture Overview
┌─────────────────────────────────────────────────────────────┐
│ Client Applications │
└─────────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ API Gateway │
└─────────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Lambda (RAG Query Handler) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Embedding │ │ Vector │ │ LLM │ │
│ │ Generation │─▶│ Search │─▶│ Inference │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────┬───────────────────────────────────────┘
│
┌─────────────┼─────────────┐
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Bedrock │ │ OpenSearch │ │ ElastiCache │
│ (Embeddings)│ │ (Vectors) │ │ (Cache) │
└──────────────┘ └──────────────┘ └──────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Document Ingestion Pipeline │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ S3 │─▶│ Lambda │─▶│ Textract │ │
│ │ (Documents) │ │ (Processing)│ │ (Extraction)│ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
Initial Setup
1. Amazon Bedrock Configuration
First, enable the required models in Amazon Bedrock:
# bedrock_config.py
import boto3
from botocore.exceptions import ClientError
class BedrockConfig:
def __init__(self):
self.bedrock_runtime = boto3.client('bedrock-runtime', region_name='us-east-1')
self.bedrock = boto3.client('bedrock', region_name='us-east-1')
def check_model_access(self, model_id):
"""Check if model is accessible"""
try:
response = self.bedrock.get_foundation_model(
modelIdentifier=model_id
)
            return response['modelDetails']['modelLifecycle']['status'] == 'ACTIVE'
except ClientError as e:
print(f"Error checking model access: {e}")
return False
def enable_models(self):
"""Enable required models"""
models = [
'amazon.titan-embed-text-v1',
'anthropic.claude-3-sonnet-20240229-v1:0'
]
for model_id in models:
if self.check_model_access(model_id):
print(f"Model {model_id} is accessible")
else:
print(f"Model {model_id} needs to be enabled in Bedrock console")
2. OpenSearch Serverless Setup
Create a vector index inside an OpenSearch Serverless collection. The code below assumes the collection already exists; a short sketch for provisioning one follows the code:
# opensearch_config.py
import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
class OpenSearchConfig:
def __init__(self, collection_endpoint, region='us-east-1'):
self.collection_endpoint = collection_endpoint
self.region = region
self.client = self._create_client()
def _create_client(self):
"""Create OpenSearch client with AWS authentication"""
        credentials = boto3.Session().get_credentials()
        # Sign requests with SigV4 for OpenSearch Serverless (service name 'aoss')
        awsauth = AWSV4SignerAuth(credentials, self.region, 'aoss')
client = OpenSearch(
hosts=[{'host': self.collection_endpoint.replace('https://', ''), 'port': 443}],
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection
)
return client
def create_vector_index(self, index_name, dimension=1536):
"""Create vector index for embeddings"""
index_body = {
"settings": {
"index": {
"knn": True,
"knn.algo_param.ef_search": 100
}
},
"mappings": {
"properties": {
"embedding": {
"type": "knn_vector",
"dimension": dimension,
"method": {
"name": "hnsw",
"space_type": "cosinesimil",
"engine": "nmslib",
"parameters": {
"ef_construction": 128,
"m": 24
}
}
},
"text": {
"type": "text"
},
"metadata": {
"type": "object"
}
}
}
}
try:
if not self.client.indices.exists(index=index_name):
self.client.indices.create(index=index_name, body=index_body)
print(f"Index {index_name} created successfully")
else:
print(f"Index {index_name} already exists")
except Exception as e:
print(f"Error creating index: {e}")
Document Ingestion Pipeline
1. Document Processing
# document_processor.py
import boto3
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_aws import BedrockEmbeddings
import json
class DocumentProcessor:
def __init__(self):
self.s3 = boto3.client('s3')
self.textract = boto3.client('textract')
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len
)
self.embeddings = BedrockEmbeddings(
model_id='amazon.titan-embed-text-v1',
region_name='us-east-1'
)
def extract_text_from_pdf(self, s3_bucket, s3_key):
"""Extract text from PDF using Textract"""
try:
response = self.textract.start_document_text_detection(
DocumentLocation={
'S3Object': {
'Bucket': s3_bucket,
'Name': s3_key
}
}
)
job_id = response['JobId']
# Wait for job to complete
import time
while True:
status = self.textract.get_document_text_detection(JobId=job_id)
if status['JobStatus'] == 'SUCCEEDED':
break
elif status['JobStatus'] == 'FAILED':
raise Exception("Textract job failed")
time.sleep(2)
# Extract text blocks
text_blocks = []
next_token = None
while True:
if next_token:
response = self.textract.get_document_text_detection(
JobId=job_id,
NextToken=next_token
)
else:
response = self.textract.get_document_text_detection(JobId=job_id)
for block in response.get('Blocks', []):
if block['BlockType'] == 'LINE':
text_blocks.append(block['Text'])
next_token = response.get('NextToken')
if not next_token:
break
return '\n'.join(text_blocks)
except Exception as e:
print(f"Error extracting text: {e}")
return None
def process_document(self, s3_bucket, s3_key, document_id):
"""Process document and create chunks with embeddings"""
try:
# Extract text
text = self.extract_text_from_pdf(s3_bucket, s3_key)
if not text:
return None
# Split into chunks
chunks = self.text_splitter.split_text(text)
# Generate embeddings
documents = []
for i, chunk in enumerate(chunks):
embedding = self.embeddings.embed_query(chunk)
documents.append({
'id': f"{document_id}_chunk_{i}",
'text': chunk,
'embedding': embedding,
'metadata': {
'document_id': document_id,
'chunk_index': i,
's3_bucket': s3_bucket,
's3_key': s3_key
}
})
return documents
except Exception as e:
print(f"Error processing document: {e}")
return None
2. Vector Store Integration
# vector_store.py
from opensearch_config import OpenSearchConfig
import json
class VectorStore:
def __init__(self, collection_endpoint):
self.opensearch = OpenSearchConfig(collection_endpoint)
self.index_name = 'rag-documents'
self.opensearch.create_vector_index(self.index_name)
def index_documents(self, documents):
"""Index documents with embeddings"""
try:
for doc in documents:
self.opensearch.client.index(
index=self.index_name,
id=doc['id'],
body={
'text': doc['text'],
'embedding': doc['embedding'],
'metadata': doc['metadata']
}
)
self.opensearch.client.indices.refresh(index=self.index_name)
print(f"Indexed {len(documents)} documents")
except Exception as e:
print(f"Error indexing documents: {e}")
def search_similar(self, query_embedding, top_k=5):
"""Search for similar documents"""
try:
query = {
"size": top_k,
"query": {
"knn": {
"embedding": {
"vector": query_embedding,
"k": top_k
}
}
},
"_source": ["text", "metadata"]
}
response = self.opensearch.client.search(
index=self.index_name,
body=query
)
results = []
for hit in response['hits']['hits']:
results.append({
'text': hit['_source']['text'],
'metadata': hit['_source']['metadata'],
'score': hit['_score']
})
return results
except Exception as e:
print(f"Error searching: {e}")
return []
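The ingestion side of the architecture diagram (S3 → Lambda → Textract) can be wired up with a small Lambda handler that calls the two classes above. This is a sketch assuming the function is triggered by S3 ObjectCreated events, the uploaded objects are PDFs, and OPENSEARCH_ENDPOINT is set in the function's environment:
# ingest_lambda.py -- sketch of an S3-triggered ingestion handler
import os
from urllib.parse import unquote_plus
from document_processor import DocumentProcessor
from vector_store import VectorStore

# Initialized once per execution environment so warm invocations reuse the clients
processor = DocumentProcessor()
vector_store = VectorStore(os.environ['OPENSEARCH_ENDPOINT'])

def lambda_handler(event, context):
    """Process newly uploaded documents and index their chunks."""
    for record in event.get('Records', []):
        bucket = record['s3']['bucket']['name']
        key = unquote_plus(record['s3']['object']['key'])
        # Use the object key as a simple document identifier
        documents = processor.process_document(bucket, key, document_id=key)
        if documents:
            vector_store.index_documents(documents)
    return {'status': 'done'}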
RAG Query Handler
1. Retrieval and Generation
# rag_handler.py
import boto3
import json
from langchain_aws import ChatBedrock, BedrockEmbeddings
from vector_store import VectorStore
class RAGHandler:
def __init__(self, opensearch_endpoint):
self.vector_store = VectorStore(opensearch_endpoint)
self.embeddings = BedrockEmbeddings(
model_id='amazon.titan-embed-text-v1',
region_name='us-east-1'
)
        # Claude 3 models use the Bedrock Messages API, so use the chat model class
        self.llm = ChatBedrock(
            model_id='anthropic.claude-3-sonnet-20240229-v1:0',
            region_name='us-east-1',
            model_kwargs={
                "max_tokens": 2048,
                "temperature": 0.7
            }
        )
self.cache = {} # Simple in-memory cache (use ElastiCache in production)
def retrieve_context(self, query, top_k=5):
"""Retrieve relevant context for query"""
try:
# Generate query embedding
query_embedding = self.embeddings.embed_query(query)
# Search for similar documents
results = self.vector_store.search_similar(query_embedding, top_k=top_k)
# Combine context
context = "\n\n".join([r['text'] for r in results])
return context, results
except Exception as e:
print(f"Error retrieving context: {e}")
return None, []
def generate_response(self, query, context):
"""Generate response using LLM with context"""
try:
prompt = f"""Use the following context to answer the question. If the context doesn't contain enough information, say so.
Context:
{context}
Question: {query}
Answer:"""
            response = self.llm.invoke(prompt)
            # ChatBedrock returns a message object; the answer text is in .content
            return response.content
except Exception as e:
print(f"Error generating response: {e}")
return None
def query(self, user_query, use_cache=True):
"""Main query handler with caching"""
try:
# Check cache
if use_cache and user_query in self.cache:
return self.cache[user_query]
# Retrieve context
context, sources = self.retrieve_context(user_query)
if not context:
return {
'answer': "I couldn't find relevant information to answer your question.",
'sources': []
}
# Generate response
answer = self.generate_response(user_query, context)
result = {
'answer': answer,
'sources': [s['metadata'] for s in sources]
}
# Cache result
if use_cache:
self.cache[user_query] = result
return result
except Exception as e:
print(f"Error processing query: {e}")
return {
'answer': "An error occurred while processing your query.",
'sources': []
}
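Standalone usage looks like this (the collection endpoint is a placeholder):
# Example: answering one of the support questions from the use case above
handler = RAGHandler('https://your-collection-id.us-east-1.aoss.amazonaws.com')
result = handler.query("How do I set up A/B testing for my campaigns?")
print(result['answer'])
print(result['sources'])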
2. Lambda Function Implementation
# lambda_function.py
import json
from rag_handler import RAGHandler
import os
def lambda_handler(event, context):
"""Lambda handler for RAG queries"""
try:
# Initialize RAG handler
opensearch_endpoint = os.environ['OPENSEARCH_ENDPOINT']
rag_handler = RAGHandler(opensearch_endpoint)
# Parse request
body = json.loads(event['body'])
query = body.get('query')
if not query:
return {
'statusCode': 400,
'body': json.dumps({'error': 'Query parameter is required'})
}
# Process query
result = rag_handler.query(query)
return {
'statusCode': 200,
'body': json.dumps(result),
'headers': {
'Content-Type': 'application/json'
}
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
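One refinement worth making before scaling this out: the handler above re-creates RAGHandler, and with it the Bedrock and OpenSearch clients, on every invocation. Moving initialization to module scope lets warm Lambda containers reuse the clients and noticeably reduces latency. A sketch of the same handler with that change:
# lambda_function_warm.py -- same handler, with clients reused across warm invocations
import json
import os
from rag_handler import RAGHandler

# Created once per execution environment, not once per request
rag_handler = RAGHandler(os.environ['OPENSEARCH_ENDPOINT'])

def lambda_handler(event, context):
    body = json.loads(event['body'])
    query = body.get('query')
    if not query:
        return {'statusCode': 400, 'body': json.dumps({'error': 'Query parameter is required'})}
    result = rag_handler.query(query)
    return {
        'statusCode': 200,
        'body': json.dumps(result),
        'headers': {'Content-Type': 'application/json'}
    }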
Scaling Strategies
1. Horizontal Scaling with Lambda
# scaling_config.py
import boto3
class ScalingConfig:
def __init__(self):
self.lambda_client = boto3.client('lambda')
def configure_concurrency(self, function_name, reserved_concurrent_executions=100):
"""Configure Lambda concurrency limits"""
try:
self.lambda_client.put_function_concurrency(
FunctionName=function_name,
ReservedConcurrentExecutions=reserved_concurrent_executions
)
print(f"Set concurrency limit to {reserved_concurrent_executions}")
except Exception as e:
print(f"Error configuring concurrency: {e}")
def configure_provisioned_concurrency(self, function_name, alias_name, provisioned_count=10):
"""Configure provisioned concurrency for predictable performance"""
try:
self.lambda_client.put_provisioned_concurrency_config(
FunctionName=function_name,
Qualifier=alias_name,
ProvisionedConcurrentExecutions=provisioned_count
)
print(f"Set provisioned concurrency to {provisioned_count}")
except Exception as e:
print(f"Error configuring provisioned concurrency: {e}")
2. Caching with ElastiCache
# cache_manager.py
import boto3
import json
import hashlib
import redis
from redis.cluster import RedisCluster
class CacheManager:
def __init__(self, redis_endpoint, use_cluster=False):
        if use_cluster:
            # Cluster-mode-enabled ElastiCache: the redis-py cluster client discovers nodes itself
            self.client = RedisCluster(
                host=redis_endpoint,
                port=6379,
                decode_responses=True
            )
else:
self.client = redis.Redis(
host=redis_endpoint,
port=6379,
decode_responses=True
)
self.ttl = 3600 # 1 hour
def get_cache_key(self, query):
"""Generate cache key from query"""
return hashlib.md5(query.encode()).hexdigest()
def get(self, query):
"""Get cached result"""
try:
key = self.get_cache_key(query)
cached = self.client.get(key)
if cached:
return json.loads(cached)
return None
except Exception as e:
print(f"Error getting from cache: {e}")
return None
def set(self, query, result):
"""Cache result"""
try:
key = self.get_cache_key(query)
self.client.setex(
key,
self.ttl,
json.dumps(result)
)
except Exception as e:
print(f"Error setting cache: {e}")
3. Async Processing with SQS
# async_processor.py
import boto3
import json
from rag_handler import RAGHandler
class AsyncProcessor:
def __init__(self, queue_url, opensearch_endpoint):
self.sqs = boto3.client('sqs')
self.queue_url = queue_url
self.rag_handler = RAGHandler(opensearch_endpoint)
def process_queue(self):
"""Process messages from SQS queue"""
while True:
try:
response = self.sqs.receive_message(
QueueUrl=self.queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=20
)
messages = response.get('Messages', [])
if not messages:
continue
for message in messages:
try:
body = json.loads(message['Body'])
query = body['query']
callback_url = body.get('callback_url')
# Process query
result = self.rag_handler.query(query)
# Send result to callback URL if provided
if callback_url:
self._send_callback(callback_url, result)
# Delete message
self.sqs.delete_message(
QueueUrl=self.queue_url,
ReceiptHandle=message['ReceiptHandle']
)
except Exception as e:
print(f"Error processing message: {e}")
# Move to DLQ or retry
except Exception as e:
print(f"Error receiving messages: {e}")
Cost Optimization
1. Embedding Caching
# embedding_cache.py
from cache_manager import CacheManager
from langchain_aws import BedrockEmbeddings
class EmbeddingCache:
    def __init__(self, redis_endpoint):
        self.cache = CacheManager(redis_endpoint)
        # Reuse a single Bedrock embeddings client for cache misses
        self.embeddings = BedrockEmbeddings(
            model_id='amazon.titan-embed-text-v1',
            region_name='us-east-1'
        )
def get_embedding(self, text):
"""Get embedding with caching"""
cached = self.cache.get(f"embedding:{text}")
if cached:
return cached
# Generate embedding
embedding = self.embeddings.embed_query(text)
# Cache it
self.cache.set(f"embedding:{text}", embedding)
return embedding
2. Batch Processing
# batch_processor.py
from langchain_aws import BedrockEmbeddings
class BatchProcessor:
def __init__(self):
self.embeddings = BedrockEmbeddings(
model_id='amazon.titan-embed-text-v1',
region_name='us-east-1'
)
def batch_embed(self, texts, batch_size=25):
"""Generate embeddings in batches"""
all_embeddings = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
batch_embeddings = self.embeddings.embed_documents(batch)
all_embeddings.extend(batch_embeddings)
return all_embeddings
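During ingestion this replaces per-chunk embed_query calls; a small illustrative example (the chunk strings are placeholders):
# Example: embed all chunks for a document in batches during ingestion
processor = BatchProcessor()
chunks = ["How to configure A/B testing...", "Bulk send API endpoint...", "Deliverability tips..."]
embeddings = processor.batch_embed(chunks, batch_size=25)
assert len(embeddings) == len(chunks)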
3. Cost Monitoring
# cost_monitor.py
import boto3
from datetime import datetime, timedelta
class CostMonitor:
def __init__(self):
self.ce = boto3.client('ce')
def get_bedrock_costs(self, start_date, end_date):
"""Get Bedrock costs for date range"""
try:
response = self.ce.get_cost_and_usage(
TimePeriod={
'Start': start_date.strftime('%Y-%m-%d'),
'End': end_date.strftime('%Y-%m-%d')
},
Granularity='DAILY',
Metrics=['UnblendedCost'],
Filter={
'Dimensions': {
'Key': 'SERVICE',
'Values': ['Amazon Bedrock']
}
}
)
total_cost = sum(
float(day['Total']['UnblendedCost']['Amount'])
for day in response['ResultsByTime']
)
return total_cost
except Exception as e:
print(f"Error getting costs: {e}")
return None
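For example, to report Bedrock spend for the last seven days:
# Example: print the last 7 days of Bedrock spend
from datetime import datetime, timedelta

monitor = CostMonitor()
end = datetime.utcnow().date()
start = end - timedelta(days=7)
cost = monitor.get_bedrock_costs(start, end)
if cost is not None:
    print(f"Bedrock cost for the last 7 days: ${cost:.2f}")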
Monitoring & Observability
1. CloudWatch Metrics
# metrics.py
import boto3
from datetime import datetime
class MetricsCollector:
def __init__(self):
self.cloudwatch = boto3.client('cloudwatch')
self.namespace = 'RAG/System'
def record_latency(self, operation, latency_ms):
"""Record operation latency"""
try:
self.cloudwatch.put_metric_data(
Namespace=self.namespace,
MetricData=[
{
'MetricName': f'{operation}Latency',
'Value': latency_ms,
'Unit': 'Milliseconds',
'Timestamp': datetime.utcnow()
}
]
)
except Exception as e:
print(f"Error recording metric: {e}")
def record_query_count(self, status='success'):
"""Record query count"""
try:
self.cloudwatch.put_metric_data(
Namespace=self.namespace,
MetricData=[
{
'MetricName': 'QueryCount',
'Value': 1,
'Unit': 'Count',
'Dimensions': [
{
'Name': 'Status',
'Value': status
}
],
'Timestamp': datetime.utcnow()
}
]
)
except Exception as e:
print(f"Error recording metric: {e}")
2. Performance Monitoring
# performance_monitor.py
import time
from functools import wraps
from metrics import MetricsCollector
class PerformanceMonitor:
def __init__(self):
self.metrics = MetricsCollector()
def monitor(self, operation_name):
"""Decorator to monitor function performance"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
status = 'success'
return result
except Exception as e:
status = 'error'
raise
finally:
latency_ms = (time.time() - start_time) * 1000
self.metrics.record_latency(operation_name, latency_ms)
self.metrics.record_query_count(status)
return wrapper
return decorator
Best Practices
1. Chunking Strategy
# chunking_strategy.py
from langchain.text_splitter import RecursiveCharacterTextSplitter
class ChunkingStrategy:
def __init__(self):
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", ". ", " ", ""]
)
def chunk_document(self, text, metadata=None):
"""Chunk document with metadata preservation"""
chunks = self.text_splitter.split_text(text)
chunked_documents = []
for i, chunk in enumerate(chunks):
chunk_metadata = metadata.copy() if metadata else {}
chunk_metadata['chunk_index'] = i
chunk_metadata['total_chunks'] = len(chunks)
chunked_documents.append({
'text': chunk,
'metadata': chunk_metadata
})
return chunked_documents
2. Query Optimization
# query_optimizer.py
class QueryOptimizer:
def __init__(self):
self.min_score_threshold = 0.7
def optimize_query(self, query):
"""Optimize query for better retrieval"""
# Remove stop words (simplified)
stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for'}
words = query.lower().split()
optimized = [w for w in words if w not in stop_words]
return ' '.join(optimized) if optimized else query
def filter_results(self, results, min_score=None):
"""Filter results by relevance score"""
threshold = min_score or self.min_score_threshold
return [r for r in results if r.get('score', 0) >= threshold]
3. Error Handling
# error_handler.py
import logging
import time
from functools import wraps
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ErrorHandler:
@staticmethod
def retry_on_failure(max_retries=3):
"""Decorator for retrying failed operations"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
logger.error(f"Failed after {max_retries} attempts: {e}")
raise
logger.warning(f"Attempt {attempt + 1} failed: {e}, retrying...")
time.sleep(2 ** attempt) # Exponential backoff
return wrapper
return decorator
@staticmethod
def handle_errors(func):
"""Decorator for error handling"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
logger.error(f"Error in {func.__name__}: {e}")
return {
'error': str(e),
'status': 'error'
}
return wrapper
Complete Example
# complete_rag_system.py
from rag_handler import RAGHandler
from cache_manager import CacheManager
from performance_monitor import PerformanceMonitor
from error_handler import ErrorHandler
# Module-level monitor instance so its decorator can be applied at class-definition time
monitor = PerformanceMonitor()
class CompleteRAGSystem:
    def __init__(self, opensearch_endpoint, redis_endpoint):
        self.rag_handler = RAGHandler(opensearch_endpoint)
        self.cache = CacheManager(redis_endpoint)
    @monitor.monitor('query')
    @ErrorHandler.handle_errors
    def process_query(self, query):
"""Process query with caching and monitoring"""
# Check cache first
cached_result = self.cache.get(query)
if cached_result:
return cached_result
# Process query
result = self.rag_handler.query(query, use_cache=False)
# Cache result
self.cache.set(query, result)
return result
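Wired together, handling one of the support questions from the opening use case looks like this (both endpoints are placeholders):
# Example end-to-end query against the assembled system
system = CompleteRAGSystem(
    opensearch_endpoint='https://your-collection-id.us-east-1.aoss.amazonaws.com',
    redis_endpoint='your-redis-endpoint.cache.amazonaws.com'
)
response = system.process_query("Why are my emails going to spam?")
print(response['answer'])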
Conclusion
Building a scalable RAG architecture on AWS requires careful consideration of multiple components working together:
- Document Processing: Efficient ingestion and chunking of documents
- Vector Storage: Fast and scalable vector search with OpenSearch
- Embedding Generation: Cost-effective embedding generation with caching
- LLM Inference: Reliable and performant LLM access through Bedrock
- Scaling: Horizontal scaling with Lambda and async processing
- Caching: Multi-layer caching to reduce costs and latency
- Monitoring: Comprehensive observability for performance tracking
Key takeaways:
- Use OpenSearch Serverless for scalable vector storage
- Implement multi-layer caching (ElastiCache + embedding cache)
- Leverage Lambda for serverless scaling
- Monitor costs and optimize embedding generation
- Implement proper error handling and retry logic
By following these patterns and best practices, you can build a RAG system that scales to handle millions of queries while maintaining low latency and cost efficiency.