RAG Architecture at Scale on AWS


Use Case: Email Marketing Platform Customer Support

Imagine you’re building an email marketing platform that serves thousands of customers sending millions of emails daily. Your support team is overwhelmed with questions about:

  • Platform Features: “How do I set up A/B testing for my campaigns?”
  • API Documentation: “What’s the correct endpoint for bulk email sending?”
  • Best Practices: “What’s the optimal send time for my target audience?”
  • Troubleshooting: “Why are my emails going to spam?”
  • Billing Questions: “How does pricing work for high-volume senders?”

Your knowledge base contains hundreds of documentation pages, API guides, blog posts, and support articles. Traditional keyword search often returns irrelevant results, and your support team can’t scale fast enough to handle the growing volume of customer inquiries.

The Challenge: You need an intelligent system that can:

  • Answer customer questions instantly using your entire knowledge base
  • Scale to handle thousands of concurrent queries
  • Provide accurate, context-aware responses
  • Reduce support ticket volume by 60-80%
  • Maintain low latency (<2 seconds) for real-time chat support

The Solution: A Retrieval-Augmented Generation (RAG) system that combines your knowledge base with large language models to provide intelligent, accurate answers at scale.

Building a RAG system that can handle millions of queries while maintaining low latency and cost efficiency requires careful architecture design. This guide walks through a production-ready RAG system on AWS that scales horizontally and can handle enterprise workloads.

Prerequisites

Before getting started, ensure you have:

  1. AWS Account Setup:
    # Configure AWS CLI
    aws configure
    # Enter your AWS Access Key ID
    # Enter your AWS Secret Access Key
    # Enter your default region (e.g., us-east-1)
    # Enter your output format (json)
    
  2. Required AWS Services:
    • Amazon Bedrock (for LLM inference and embeddings)
    • Amazon OpenSearch Serverless (for vector storage)
    • Amazon S3 (for document storage)
    • AWS Lambda (for processing)
    • Amazon API Gateway (for API endpoints)
    • Amazon ElastiCache (for caching)
  3. Python Environment:
    # Create virtual environment
    python3 -m venv rag-env
    source rag-env/bin/activate
    
    # Install required packages
    pip install boto3 langchain langchain-aws opensearch-py redis numpy
    

Core Architecture Components

Architecture Overview

┌─────────────────────────────────────────────────────────────┐
│                    Client Applications                      │
└─────────────────────┬───────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────┐
│                  API Gateway                                │
└─────────────────────┬───────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────┐
│              Lambda (RAG Query Handler)                     │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐    │
│  │   Embedding  │  │   Vector     │  │   LLM        │    │
│  │   Generation │─▶│   Search     │─▶│   Inference  │    │
│  └──────────────┘  └──────────────┘  └──────────────┘    │
└─────────────────────┬───────────────────────────────────────┘
                      │
        ┌─────────────┼─────────────┐
        │             │             │
        ▼             ▼             ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│   Bedrock    │ │  OpenSearch  │ │ ElastiCache  │
│  (Embeddings)│ │  (Vectors)   │ │   (Cache)    │
└──────────────┘ └──────────────┘ └──────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────┐
│              Document Ingestion Pipeline                    │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐    │
│  │      S3      │─▶│   Lambda     │─▶│   Textract   │    │
│  │  (Documents) │  │  (Processing)│  │  (Extraction)│    │
│  └──────────────┘  └──────────────┘  └──────────────┘    │
└─────────────────────────────────────────────────────────────┘

Initial Setup

1. Amazon Bedrock Configuration

First, enable the required models in Amazon Bedrock:

# bedrock_config.py
import boto3
from botocore.exceptions import ClientError

class BedrockConfig:
    def __init__(self):
        self.bedrock_runtime = boto3.client('bedrock-runtime', region_name='us-east-1')
        self.bedrock = boto3.client('bedrock', region_name='us-east-1')
        
    def check_model_access(self, model_id):
        """Check if model is accessible"""
        try:
            response = self.bedrock.get_foundation_model(
                modelIdentifier=model_id
            )
            # GetFoundationModel reports lifecycle status; access must still be granted in the Bedrock console
            return response['modelDetails']['modelLifecycle']['status'] == 'ACTIVE'
        except ClientError as e:
            print(f"Error checking model access: {e}")
            return False
    
    def enable_models(self):
        """Enable required models"""
        models = [
            'amazon.titan-embed-text-v1',
            'anthropic.claude-3-sonnet-20240229-v1:0'
        ]
        
        for model_id in models:
            if self.check_model_access(model_id):
                print(f"Model {model_id} is accessible")
            else:
                print(f"Model {model_id} needs to be enabled in Bedrock console")

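To confirm the runtime client and model access are working end to end, here is a minimal smoke test that invokes the Titan embedding model directly (the input text is arbitrary):

# verify_bedrock.py
import boto3
import json

bedrock_runtime = boto3.client('bedrock-runtime', region_name='us-east-1')

# Titan Text Embeddings expects {"inputText": ...} and returns {"embedding": [...]}
response = bedrock_runtime.invoke_model(
    modelId='amazon.titan-embed-text-v1',
    body=json.dumps({"inputText": "How do I set up A/B testing for my campaigns?"}),
    contentType='application/json',
    accept='application/json'
)
embedding = json.loads(response['body'].read())['embedding']
print(f"Embedding dimension: {len(embedding)}")  # 1536 for titan-embed-text-v1
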
2. OpenSearch Serverless Setup

Create an OpenSearch Serverless collection for vector storage:

# opensearch_config.py
import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth

class OpenSearchConfig:
    def __init__(self, collection_endpoint, region='us-east-1'):
        self.collection_endpoint = collection_endpoint
        self.region = region
        self.client = self._create_client()
    
    def _create_client(self):
        """Create OpenSearch client with SigV4 auth (service name 'aoss' for OpenSearch Serverless)"""
        credentials = boto3.Session().get_credentials()
        awsauth = AWSV4SignerAuth(credentials, self.region, 'aoss')
        
        client = OpenSearch(
            hosts=[{'host': self.collection_endpoint.replace('https://', ''), 'port': 443}],
            http_auth=awsauth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection
        )
        return client
    
    def create_vector_index(self, index_name, dimension=1536):
        """Create vector index for embeddings"""
        index_body = {
            "settings": {
                "index": {
                    "knn": True,
                    "knn.algo_param.ef_search": 100
                }
            },
            "mappings": {
                "properties": {
                    "embedding": {
                        "type": "knn_vector",
                        "dimension": dimension,
                        "method": {
                            "name": "hnsw",
                            "space_type": "cosinesimil",
                            "engine": "nmslib",
                            "parameters": {
                                "ef_construction": 128,
                                "m": 24
                            }
                        }
                    },
                    "text": {
                        "type": "text"
                    },
                    "metadata": {
                        "type": "object"
                    }
                }
            }
        }
        
        try:
            if not self.client.indices.exists(index=index_name):
                self.client.indices.create(index=index_name, body=index_body)
                print(f"Index {index_name} created successfully")
            else:
                print(f"Index {index_name} already exists")
        except Exception as e:
            print(f"Error creating index: {e}")

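The class above assumes the Serverless collection already exists. A minimal sketch for creating it with boto3's `opensearchserverless` client follows; the collection and policy names are assumptions, and you would also need a data access policy granting your IAM role permissions on the collection:

# create_collection.py
import boto3
import json
import time

aoss = boto3.client('opensearchserverless', region_name='us-east-1')
collection_name = 'rag-documents-collection'  # assumed name

# An encryption policy is required before the collection can be created
aoss.create_security_policy(
    name='rag-encryption-policy',
    type='encryption',
    policy=json.dumps({
        "Rules": [{"ResourceType": "collection", "Resource": [f"collection/{collection_name}"]}],
        "AWSOwnedKey": True
    })
)

# Network policy (public access for simplicity; restrict to a VPC endpoint in production)
aoss.create_security_policy(
    name='rag-network-policy',
    type='network',
    policy=json.dumps([{
        "Rules": [{"ResourceType": "collection", "Resource": [f"collection/{collection_name}"]}],
        "AllowFromPublic": True
    }])
)

# Create the vector search collection and wait until it is ACTIVE
aoss.create_collection(name=collection_name, type='VECTORSEARCH')
while True:
    details = aoss.batch_get_collection(names=[collection_name])['collectionDetails'][0]
    if details['status'] == 'ACTIVE':
        print(f"Collection endpoint: {details['collectionEndpoint']}")
        break
    time.sleep(10)
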
Document Ingestion Pipeline

1. Document Processing

# document_processor.py
import boto3
import json
import time
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_aws import BedrockEmbeddings

class DocumentProcessor:
    def __init__(self):
        self.s3 = boto3.client('s3')
        self.textract = boto3.client('textract')
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len
        )
        self.embeddings = BedrockEmbeddings(
            model_id='amazon.titan-embed-text-v1',
            region_name='us-east-1'
        )
    
    def extract_text_from_pdf(self, s3_bucket, s3_key):
        """Extract text from PDF using Textract"""
        try:
            response = self.textract.start_document_text_detection(
                DocumentLocation={
                    'S3Object': {
                        'Bucket': s3_bucket,
                        'Name': s3_key
                    }
                }
            )
            
            job_id = response['JobId']
            
            # Poll until the asynchronous Textract job completes
            while True:
                status = self.textract.get_document_text_detection(JobId=job_id)
                if status['JobStatus'] == 'SUCCEEDED':
                    break
                elif status['JobStatus'] == 'FAILED':
                    raise Exception("Textract job failed")
                time.sleep(2)
            
            # Extract text blocks
            text_blocks = []
            next_token = None
            while True:
                if next_token:
                    response = self.textract.get_document_text_detection(
                        JobId=job_id,
                        NextToken=next_token
                    )
                else:
                    response = self.textract.get_document_text_detection(JobId=job_id)
                
                for block in response.get('Blocks', []):
                    if block['BlockType'] == 'LINE':
                        text_blocks.append(block['Text'])
                
                next_token = response.get('NextToken')
                if not next_token:
                    break
            
            return '\n'.join(text_blocks)
        except Exception as e:
            print(f"Error extracting text: {e}")
            return None
    
    def process_document(self, s3_bucket, s3_key, document_id):
        """Process document and create chunks with embeddings"""
        try:
            # Extract text
            text = self.extract_text_from_pdf(s3_bucket, s3_key)
            if not text:
                return None
            
            # Split into chunks
            chunks = self.text_splitter.split_text(text)
            
            # Generate embeddings
            documents = []
            for i, chunk in enumerate(chunks):
                embedding = self.embeddings.embed_query(chunk)
                documents.append({
                    'id': f"{document_id}_chunk_{i}",
                    'text': chunk,
                    'embedding': embedding,
                    'metadata': {
                        'document_id': document_id,
                        'chunk_index': i,
                        's3_bucket': s3_bucket,
                        's3_key': s3_key
                    }
                })
            
            return documents
        except Exception as e:
            print(f"Error processing document: {e}")
            return None

2. Vector Store Integration

# vector_store.py
from opensearch_config import OpenSearchConfig
import json

class VectorStore:
    def __init__(self, collection_endpoint):
        self.opensearch = OpenSearchConfig(collection_endpoint)
        self.index_name = 'rag-documents'
        self.opensearch.create_vector_index(self.index_name)
    
    def index_documents(self, documents):
        """Index documents with embeddings"""
        try:
            for doc in documents:
                self.opensearch.client.index(
                    index=self.index_name,
                    id=doc['id'],
                    body={
                        'text': doc['text'],
                        'embedding': doc['embedding'],
                        'metadata': doc['metadata']
                    }
                )
            self.opensearch.client.indices.refresh(index=self.index_name)
            print(f"Indexed {len(documents)} documents")
        except Exception as e:
            print(f"Error indexing documents: {e}")
    
    def search_similar(self, query_embedding, top_k=5):
        """Search for similar documents"""
        try:
            query = {
                "size": top_k,
                "query": {
                    "knn": {
                        "embedding": {
                            "vector": query_embedding,
                            "k": top_k
                        }
                    }
                },
                "_source": ["text", "metadata"]
            }
            
            response = self.opensearch.client.search(
                index=self.index_name,
                body=query
            )
            
            results = []
            for hit in response['hits']['hits']:
                results.append({
                    'text': hit['_source']['text'],
                    'metadata': hit['_source']['metadata'],
                    'score': hit['_score']
                })
            
            return results
        except Exception as e:
            print(f"Error searching: {e}")
            return []

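Tying the two pieces together, a short usage sketch (the bucket, key, and endpoint are placeholders) that processes a single PDF and indexes its chunks:

# ingest_example.py
from document_processor import DocumentProcessor
from vector_store import VectorStore

processor = DocumentProcessor()
vector_store = VectorStore('your-collection-id.us-east-1.aoss.amazonaws.com')  # placeholder endpoint

# Extract text with Textract, chunk it, embed each chunk, and index into OpenSearch
documents = processor.process_document(
    s3_bucket='your-docs-bucket',           # placeholder bucket
    s3_key='guides/ab-testing-guide.pdf',   # placeholder key
    document_id='ab-testing-guide'
)
if documents:
    vector_store.index_documents(documents)
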
RAG Query Handler

1. Retrieval and Generation

# rag_handler.py
import boto3
import json
from langchain_aws import ChatBedrock
from vector_store import VectorStore
from langchain_aws import BedrockEmbeddings

class RAGHandler:
    def __init__(self, opensearch_endpoint):
        self.vector_store = VectorStore(opensearch_endpoint)
        self.embeddings = BedrockEmbeddings(
            model_id='amazon.titan-embed-text-v1',
            region_name='us-east-1'
        )
        # Claude 3 models use the Messages API, so use ChatBedrock rather than the text-completion BedrockLLM class
        self.llm = ChatBedrock(
            model_id='anthropic.claude-3-sonnet-20240229-v1:0',
            region_name='us-east-1',
            model_kwargs={
                "max_tokens": 2048,
                "temperature": 0.7
            }
        )
        self.cache = {}  # Simple in-memory cache (use ElastiCache in production)
    
    def retrieve_context(self, query, top_k=5):
        """Retrieve relevant context for query"""
        try:
            # Generate query embedding
            query_embedding = self.embeddings.embed_query(query)
            
            # Search for similar documents
            results = self.vector_store.search_similar(query_embedding, top_k=top_k)
            
            # Combine context
            context = "\n\n".join([r['text'] for r in results])
            return context, results
        except Exception as e:
            print(f"Error retrieving context: {e}")
            return None, []
    
    def generate_response(self, query, context):
        """Generate response using LLM with context"""
        try:
            prompt = f"""Use the following context to answer the question. If the context doesn't contain enough information, say so.

Context:
{context}

Question: {query}

Answer:"""
            
            response = self.llm.invoke(prompt)
            # ChatBedrock returns an AIMessage; the answer text is in .content
            return response.content
        except Exception as e:
            print(f"Error generating response: {e}")
            return None
    
    def query(self, user_query, use_cache=True):
        """Main query handler with caching"""
        try:
            # Check cache
            if use_cache and user_query in self.cache:
                return self.cache[user_query]
            
            # Retrieve context
            context, sources = self.retrieve_context(user_query)
            
            if not context:
                return {
                    'answer': "I couldn't find relevant information to answer your question.",
                    'sources': []
                }
            
            # Generate response
            answer = self.generate_response(user_query, context)
            
            result = {
                'answer': answer,
                'sources': [s['metadata'] for s in sources]
            }
            
            # Cache result
            if use_cache:
                self.cache[user_query] = result
            
            return result
        except Exception as e:
            print(f"Error processing query: {e}")
            return {
                'answer': "An error occurred while processing your query.",
                'sources': []
            }

2. Lambda Function Implementation

# lambda_function.py
import json
import os
from rag_handler import RAGHandler

# Initialize once at module load so warm invocations reuse the OpenSearch and Bedrock clients
rag_handler = RAGHandler(os.environ['OPENSEARCH_ENDPOINT'])

def lambda_handler(event, context):
    """Lambda handler for RAG queries"""
    try:
        
        # Parse request
        body = json.loads(event['body'])
        query = body.get('query')
        
        if not query:
            return {
                'statusCode': 400,
                'body': json.dumps({'error': 'Query parameter is required'})
            }
        
        # Process query
        result = rag_handler.query(query)
        
        return {
            'statusCode': 200,
            'body': json.dumps(result),
            'headers': {
                'Content-Type': 'application/json'
            }
        }
    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

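The Lambda is exposed through API Gateway. A minimal sketch that quick-creates an HTTP API with a Lambda proxy integration using boto3 (the function name, account ID, and region are assumptions):

# api_gateway_setup.py
import boto3

region = 'us-east-1'
account_id = '123456789012'          # placeholder account ID
function_name = 'rag-query-handler'  # assumed Lambda function name

apigw = boto3.client('apigatewayv2', region_name=region)
lambda_client = boto3.client('lambda', region_name=region)

lambda_arn = f"arn:aws:lambda:{region}:{account_id}:function:{function_name}"

# Quick-create an HTTP API with a Lambda proxy integration on POST /query
api = apigw.create_api(
    Name='rag-api',
    ProtocolType='HTTP',
    Target=lambda_arn,
    RouteKey='POST /query'
)

# Allow API Gateway to invoke the function
lambda_client.add_permission(
    FunctionName=function_name,
    StatementId='apigateway-invoke',
    Action='lambda:InvokeFunction',
    Principal='apigateway.amazonaws.com',
    SourceArn=f"arn:aws:execute-api:{region}:{account_id}:{api['ApiId']}/*/*"
)

print(f"Invoke URL: {api['ApiEndpoint']}/query")
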
Scaling Strategies

1. Horizontal Scaling with Lambda

# scaling_config.py
import boto3

class ScalingConfig:
    def __init__(self):
        self.lambda_client = boto3.client('lambda')
    
    def configure_concurrency(self, function_name, reserved_concurrent_executions=100):
        """Configure Lambda concurrency limits"""
        try:
            self.lambda_client.put_function_concurrency(
                FunctionName=function_name,
                ReservedConcurrentExecutions=reserved_concurrent_executions
            )
            print(f"Set concurrency limit to {reserved_concurrent_executions}")
        except Exception as e:
            print(f"Error configuring concurrency: {e}")
    
    def configure_provisioned_concurrency(self, function_name, alias_name, provisioned_count=10):
        """Configure provisioned concurrency for predictable performance"""
        try:
            self.lambda_client.put_provisioned_concurrency_config(
                FunctionName=function_name,
                Qualifier=alias_name,
                ProvisionedConcurrentExecutions=provisioned_count
            )
            print(f"Set provisioned concurrency to {provisioned_count}")
        except Exception as e:
            print(f"Error configuring provisioned concurrency: {e}")

2. Caching with ElastiCache

# cache_manager.py
import boto3
import json
import hashlib
import redis
from redis.cluster import RedisCluster

class CacheManager:
    def __init__(self, redis_endpoint, use_cluster=False):
        if use_cluster:
            # redis-py's RedisCluster discovers cluster topology from the configuration endpoint
            self.client = RedisCluster(
                host=redis_endpoint,
                port=6379,
                decode_responses=True
            )
        else:
            self.client = redis.Redis(
                host=redis_endpoint,
                port=6379,
                decode_responses=True
            )
        self.ttl = 3600  # 1 hour
    
    def get_cache_key(self, query):
        """Generate cache key from query"""
        return hashlib.md5(query.encode()).hexdigest()
    
    def get(self, query):
        """Get cached result"""
        try:
            key = self.get_cache_key(query)
            cached = self.client.get(key)
            if cached:
                return json.loads(cached)
            return None
        except Exception as e:
            print(f"Error getting from cache: {e}")
            return None
    
    def set(self, query, result):
        """Cache result"""
        try:
            key = self.get_cache_key(query)
            self.client.setex(
                key,
                self.ttl,
                json.dumps(result)
            )
        except Exception as e:
            print(f"Error setting cache: {e}")

3. Async Processing with SQS

# async_processor.py
import boto3
import json
from rag_handler import RAGHandler

class AsyncProcessor:
    def __init__(self, queue_url, opensearch_endpoint):
        self.sqs = boto3.client('sqs')
        self.queue_url = queue_url
        self.rag_handler = RAGHandler(opensearch_endpoint)
    
    def process_queue(self):
        """Process messages from SQS queue"""
        while True:
            try:
                response = self.sqs.receive_message(
                    QueueUrl=self.queue_url,
                    MaxNumberOfMessages=10,
                    WaitTimeSeconds=20
                )
                
                messages = response.get('Messages', [])
                if not messages:
                    continue
                
                for message in messages:
                    try:
                        body = json.loads(message['Body'])
                        query = body['query']
                        callback_url = body.get('callback_url')
                        
                        # Process query
                        result = self.rag_handler.query(query)
                        
                        # Send result to callback URL if provided
                        if callback_url:
                            self._send_callback(callback_url, result)
                        
                        # Delete message
                        self.sqs.delete_message(
                            QueueUrl=self.queue_url,
                            ReceiptHandle=message['ReceiptHandle']
                        )
                    except Exception as e:
                        print(f"Error processing message: {e}")
                        # Move to DLQ or retry
            except Exception as e:
                print(f"Error receiving messages: {e}")
    
    def _send_callback(self, callback_url, result):
        """POST the result to the caller-provided callback URL"""
        import urllib.request
        req = urllib.request.Request(
            callback_url,
            data=json.dumps(result).encode('utf-8'),
            headers={'Content-Type': 'application/json'},
            method='POST'
        )
        urllib.request.urlopen(req, timeout=10)

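On the producer side, clients (or the API layer) enqueue work instead of waiting for a synchronous answer. A minimal sketch (the queue URL and callback URL are placeholders):

# enqueue_query.py
import boto3
import json

sqs = boto3.client('sqs')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/rag-queries'  # placeholder queue URL

# The AsyncProcessor worker picks this up, runs the RAG query, and POSTs the result to callback_url
sqs.send_message(
    QueueUrl=queue_url,
    MessageBody=json.dumps({
        'query': 'Why are my emails going to spam?',
        'callback_url': 'https://example.com/webhooks/rag-answer'  # placeholder webhook
    })
)
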
Cost Optimization

1. Embedding Caching

# embedding_cache.py
from cache_manager import CacheManager
from langchain_aws import BedrockEmbeddings

class EmbeddingCache:
    def __init__(self, redis_endpoint):
        self.cache = CacheManager(redis_endpoint)
        self.embeddings = BedrockEmbeddings(
            model_id='amazon.titan-embed-text-v1',
            region_name='us-east-1'
        )
    
    def get_embedding(self, text):
        """Get embedding with caching"""
        cached = self.cache.get(f"embedding:{text}")
        if cached:
            return cached
        
        # Generate embedding
        embedding = self.embeddings.embed_query(text)
        
        # Cache it
        self.cache.set(f"embedding:{text}", embedding)
        
        return embedding

2. Batch Processing

# batch_processor.py
from langchain_aws import BedrockEmbeddings

class BatchProcessor:
    def __init__(self):
        self.embeddings = BedrockEmbeddings(
            model_id='amazon.titan-embed-text-v1',
            region_name='us-east-1'
        )
    
    def batch_embed(self, texts, batch_size=25):
        """Generate embeddings in batches"""
        all_embeddings = []
        
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            batch_embeddings = self.embeddings.embed_documents(batch)
            all_embeddings.extend(batch_embeddings)
        
        return all_embeddings

3. Cost Monitoring

# cost_monitor.py
import boto3
from datetime import datetime, timedelta

class CostMonitor:
    def __init__(self):
        self.ce = boto3.client('ce')
    
    def get_bedrock_costs(self, start_date, end_date):
        """Get Bedrock costs for date range"""
        try:
            response = self.ce.get_cost_and_usage(
                TimePeriod={
                    'Start': start_date.strftime('%Y-%m-%d'),
                    'End': end_date.strftime('%Y-%m-%d')
                },
                Granularity='DAILY',
                Metrics=['UnblendedCost'],
                Filter={
                    'Dimensions': {
                        'Key': 'SERVICE',
                        'Values': ['Amazon Bedrock']
                    }
                }
            )
            
            total_cost = sum(
                float(day['Total']['UnblendedCost']['Amount'])
                for day in response['ResultsByTime']
            )
            
            return total_cost
        except Exception as e:
            print(f"Error getting costs: {e}")
            return None

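For example, a short usage sketch that reports the last 30 days of Bedrock spend:

# Usage sketch: report the last 30 days of Bedrock spend
from datetime import datetime, timedelta
from cost_monitor import CostMonitor

monitor = CostMonitor()
cost = monitor.get_bedrock_costs(datetime.utcnow() - timedelta(days=30), datetime.utcnow())
if cost is not None:
    print(f"Bedrock spend over the last 30 days: ${cost:.2f}")
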
Monitoring & Observability

1. CloudWatch Metrics

# metrics.py
import boto3
from datetime import datetime

class MetricsCollector:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        self.namespace = 'RAG/System'
    
    def record_latency(self, operation, latency_ms):
        """Record operation latency"""
        try:
            self.cloudwatch.put_metric_data(
                Namespace=self.namespace,
                MetricData=[
                    {
                        'MetricName': f'{operation}Latency',
                        'Value': latency_ms,
                        'Unit': 'Milliseconds',
                        'Timestamp': datetime.utcnow()
                    }
                ]
            )
        except Exception as e:
            print(f"Error recording metric: {e}")
    
    def record_query_count(self, status='success'):
        """Record query count"""
        try:
            self.cloudwatch.put_metric_data(
                Namespace=self.namespace,
                MetricData=[
                    {
                        'MetricName': 'QueryCount',
                        'Value': 1,
                        'Unit': 'Count',
                        'Dimensions': [
                            {
                                'Name': 'Status',
                                'Value': status
                            }
                        ],
                        'Timestamp': datetime.utcnow()
                    }
                ]
            )
        except Exception as e:
            print(f"Error recording metric: {e}")

2. Performance Monitoring

# performance_monitor.py
import time
from functools import wraps
from metrics import MetricsCollector

class PerformanceMonitor:
    def __init__(self):
        self.metrics = MetricsCollector()
    
    def monitor(self, operation_name):
        """Decorator to monitor function performance"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                start_time = time.time()
                try:
                    result = func(*args, **kwargs)
                    status = 'success'
                    return result
                except Exception as e:
                    status = 'error'
                    raise
                finally:
                    latency_ms = (time.time() - start_time) * 1000
                    self.metrics.record_latency(operation_name, latency_ms)
                    self.metrics.record_query_count(status)
            return wrapper
        return decorator

Best Practices

1. Chunking Strategy

# chunking_strategy.py
from langchain.text_splitter import RecursiveCharacterTextSplitter

class ChunkingStrategy:
    def __init__(self):
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            separators=["\n\n", "\n", ". ", " ", ""]
        )
    
    def chunk_document(self, text, metadata=None):
        """Chunk document with metadata preservation"""
        chunks = self.text_splitter.split_text(text)
        
        chunked_documents = []
        for i, chunk in enumerate(chunks):
            chunk_metadata = metadata.copy() if metadata else {}
            chunk_metadata['chunk_index'] = i
            chunk_metadata['total_chunks'] = len(chunks)
            
            chunked_documents.append({
                'text': chunk,
                'metadata': chunk_metadata
            })
        
        return chunked_documents

2. Query Optimization

# query_optimizer.py
class QueryOptimizer:
    def __init__(self):
        self.min_score_threshold = 0.7
    
    def optimize_query(self, query):
        """Optimize query for better retrieval"""
        # Remove stop words (simplified)
        stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for'}
        words = query.lower().split()
        optimized = [w for w in words if w not in stop_words]
        
        return ' '.join(optimized) if optimized else query
    
    def filter_results(self, results, min_score=None):
        """Filter results by relevance score"""
        threshold = min_score or self.min_score_threshold
        return [r for r in results if r.get('score', 0) >= threshold]

3. Error Handling

# error_handler.py
import logging
import time
from functools import wraps

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ErrorHandler:
    @staticmethod
    def retry_on_failure(max_retries=3):
        """Decorator for retrying failed operations"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                for attempt in range(max_retries):
                    try:
                        return func(*args, **kwargs)
                    except Exception as e:
                        if attempt == max_retries - 1:
                            logger.error(f"Failed after {max_retries} attempts: {e}")
                            raise
                        logger.warning(f"Attempt {attempt + 1} failed: {e}, retrying...")
                        time.sleep(2 ** attempt)  # Exponential backoff
            return wrapper
        return decorator
    
    @staticmethod
    def handle_errors(func):
        """Decorator for error handling"""
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                logger.error(f"Error in {func.__name__}: {e}")
                return {
                    'error': str(e),
                    'status': 'error'
                }
        return wrapper

Complete Example

# complete_rag_system.py
from rag_handler import RAGHandler
from cache_manager import CacheManager
from performance_monitor import PerformanceMonitor
from error_handler import ErrorHandler

# Module-level monitor instance so its decorator can be applied at class-definition time
monitor = PerformanceMonitor()

class CompleteRAGSystem:
    def __init__(self, opensearch_endpoint, redis_endpoint):
        self.rag_handler = RAGHandler(opensearch_endpoint)
        self.cache = CacheManager(redis_endpoint)
    
    @monitor.monitor('query')
    @ErrorHandler.handle_errors
    def process_query(self, query):
        """Process query with caching and monitoring"""
        # Check cache first
        cached_result = self.cache.get(query)
        if cached_result:
            return cached_result
        
        # Process query
        result = self.rag_handler.query(query, use_cache=False)
        
        # Cache result
        self.cache.set(query, result)
        
        return result

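A usage sketch for the assembled system (both endpoints are placeholders):

# Usage sketch (placeholder endpoints)
system = CompleteRAGSystem(
    opensearch_endpoint='your-collection-id.us-east-1.aoss.amazonaws.com',
    redis_endpoint='your-cache-cluster.xxxxxx.use1.cache.amazonaws.com'
)

result = system.process_query("How do I set up A/B testing for my campaigns?")
print(result['answer'])
print(result['sources'])
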
Conclusion

Building a scalable RAG architecture on AWS requires careful consideration of multiple components working together:

  1. Document Processing: Efficient ingestion and chunking of documents
  2. Vector Storage: Fast and scalable vector search with OpenSearch
  3. Embedding Generation: Cost-effective embedding generation with caching
  4. LLM Inference: Reliable and performant LLM access through Bedrock
  5. Scaling: Horizontal scaling with Lambda and async processing
  6. Caching: Multi-layer caching to reduce costs and latency
  7. Monitoring: Comprehensive observability for performance tracking

Key takeaways:

  • Use OpenSearch Serverless for scalable vector storage
  • Implement multi-layer caching (ElastiCache + embedding cache)
  • Leverage Lambda for serverless scaling
  • Monitor costs and optimize embedding generation
  • Implement proper error handling and retry logic

By following these patterns and best practices, you can build a RAG system that scales to handle millions of queries while maintaining low latency and cost efficiency.

Resources