LLM Guardrails & Safety (Prompt Injection, Abuse)

8 min read · 6.1k views

Introduction

Designing distributed systems requires balancing multiple competing concerns. This article examines LLM Guardrails & Safety (prompt injection, abuse), exploring architecture patterns that power successful tech companies and high-scale applications.

System Requirements

Functional Requirements

Capacity Estimation

| Metric              | Value   | Calculation        |
|---------------------|---------|--------------------|
| Daily Active Users  | 10M     | Given              |
| Requests per second | 5,000   | 10M × 50 / 86,400  |
| Storage per day     | 500 GB  | 10M × 50 KB        |
| Bandwidth           | 200 Mbps| 500 GB × 8 / 86,400|
| Cache memory        | 100 GB  | 20% of hot data    |

High-Level Architecture

Detailed Component Design

API Gateway Pattern

typescript
// API Gateway: rate limiting + bearer-token auth in front of service proxies.
import express from 'express';
import { createProxyMiddleware } from 'http-proxy-middleware';
import rateLimit from 'express-rate-limit';

const app = express();

// Throttle each client IP to 100 requests per 15-minute window.
const WINDOW_MS = 15 * 60 * 1000;
const apiLimiter = rateLimit({
  windowMs: WINDOW_MS,
  max: 100,
  message: 'Too many requests from this IP',
});

// Reject any /api/ request lacking a valid "Authorization: Bearer <token>"
// header; on success the decoded user is attached to the request.
const authenticate = async (req, res, next) => {
  const token = req.headers.authorization?.split(' ')[1];

  if (!token) {
    return res.status(401).json({ error: 'No token provided' });
  }

  try {
    req.user = await verifyToken(token);
    next();
  } catch (error) {
    return res.status(401).json({ error: 'Invalid token' });
  }
};

app.use('/api/', apiLimiter);
app.use('/api/', authenticate);

// Fan requests out to the backing microservices.
const proxyTo = (target) =>
  createProxyMiddleware({ target, changeOrigin: true });

app.use('/api/users', proxyTo('http://user-service:3001'));
app.use('/api/orders', proxyTo('http://order-service:3002'));

app.listen(3000);

Data Flow

Database Design

Schema Implementation

sql
-- Users table
CREATE TABLE users (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    email VARCHAR(255) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- NOTE: the UNIQUE constraint on email already creates an index in
-- PostgreSQL, so this explicit index is redundant; kept for parity with
-- existing tooling that may reference it by name.
CREATE INDEX idx_users_email ON users(email);

-- Orders table with quarterly range partitioning.
-- In PostgreSQL the primary key of a partitioned table MUST include the
-- partition key, so the PK is (id, created_at) rather than (id) alone,
-- and created_at must be NOT NULL to participate in the PK.
CREATE TABLE orders (
    id UUID NOT NULL DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id),
    total_amount DECIMAL(10, 2) NOT NULL,
    status VARCHAR(50) NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

CREATE TABLE orders_2024_q1 PARTITION OF orders
    FOR VALUES FROM ('2024-01-01') TO ('2024-04-01');

CREATE TABLE orders_2024_q2 PARTITION OF orders
    FOR VALUES FROM ('2024-04-01') TO ('2024-07-01');

-- Order items.
-- order_id cannot carry a foreign key to the partitioned orders table:
-- PostgreSQL requires the referenced columns to have a unique constraint,
-- and orders has none on (id) alone. Enforce referential integrity for
-- order_id at the application layer or via triggers.
CREATE TABLE order_items (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    order_id UUID NOT NULL,
    product_id UUID NOT NULL REFERENCES products(id),
    quantity INTEGER NOT NULL CHECK (quantity > 0),
    price DECIMAL(10, 2) NOT NULL,
    UNIQUE(order_id, product_id)
);

CREATE INDEX idx_order_items_order_id ON order_items(order_id);
CREATE INDEX idx_order_items_product_id ON order_items(product_id);

Caching Strategy

Cache Implementation

python
import redis
import json
from functools import wraps
from typing import Optional, Any

class CacheManager:
    """Thin Redis wrapper with get/set/delete and a cache-aside decorator.

    Values are round-tripped through JSON, so only JSON-serializable data
    can be cached.
    """

    def __init__(self, host: str = 'localhost', port: int = 6379):
        # decode_responses=True makes redis return str instead of bytes,
        # so json.loads can consume stored values directly.
        self.redis_client = redis.Redis(
            host=host,
            port=port,
            decode_responses=True
        )
        self.default_ttl = 3600  # seconds (1 hour)

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key``, or None on a miss."""
        value = self.redis_client.get(key)
        # Compare against None rather than truthiness: an empty string
        # stored under the key would otherwise be misreported as a miss.
        if value is not None:
            return json.loads(value)
        return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """Store ``value`` (JSON-encoded) under ``key`` with a TTL.

        Falls back to ``default_ttl`` when ``ttl`` is None.
        """
        ttl = ttl or self.default_ttl
        return self.redis_client.setex(
            key,
            ttl,
            json.dumps(value)
        )

    def delete(self, key: str) -> bool:
        """Delete ``key``; return True if a key was actually removed."""
        return self.redis_client.delete(key) > 0

    def cache_aside(self, ttl: Optional[int] = None):
        """Decorator implementing the cache-aside (lazy-load) pattern.

        On a miss the wrapped function is called and its result cached for
        ``ttl`` seconds. NOTE: a result of None is never cached, so
        functions that legitimately return None are re-executed each call.
        """
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # Key derived from the function name plus the repr of its
                # arguments; callers must pass stably-repr'd arguments.
                cache_key = f"{func.__name__}:{str(args)}:{str(kwargs)}"

                cached_value = self.get(cache_key)
                if cached_value is not None:
                    return cached_value

                # Cache miss - call the function and store its result.
                result = func(*args, **kwargs)
                self.set(cache_key, result, ttl)

                return result
            return wrapper
        return decorator

# Usage example
cache = CacheManager()

@cache.cache_aside(ttl=600)
def get_user_profile(user_id: str):
    """Fetch one user's row; results are cached for 10 minutes.

    Uses a parameterized query: interpolating user_id into the SQL string
    (as the original f-string did) is a SQL-injection vector.
    """
    # NOTE(review): assumes `database.query` accepts DB-API style
    # parameters - confirm against the actual database helper.
    return database.query("SELECT * FROM users WHERE id = %s", (user_id,))

Scaling Strategy

Monitoring & Observability

Failure Modes & Recovery

Circuit Breaker Implementation

go
package circuitbreaker

import (
    "errors"
    "sync"
    "time"
)

// State is the circuit breaker's operating mode.
type State int

const (
    // StateClosed: normal operation — calls pass through while consecutive
    // failures are counted.
    StateClosed State = iota
    // StateOpen: the failure threshold was hit — calls fail fast until the
    // reset timeout elapses.
    StateOpen
    // StateHalfOpen: the reset timeout elapsed — the next call is a probe
    // whose outcome decides whether to close or re-open the breaker.
    StateHalfOpen
)

// CircuitBreaker trips after maxFailures consecutive errors and rejects
// calls until resetTimeout has passed since the last failure (see Call
// for the state machine).
type CircuitBreaker struct {
    // Configuration, set once by NewCircuitBreaker and never mutated.
    maxFailures  int
    resetTimeout time.Duration

    // mu guards the mutable fields below.
    // NOTE(review): Call reads lastFailTime without holding mu — data race
    // with the write under mu in the failure path; confirm and fix there.
    mu           sync.RWMutex
    state        State
    failures     int
    lastFailTime time.Time
}

// NewCircuitBreaker builds a breaker that opens after maxFailures
// consecutive failures and begins probing again once resetTimeout has
// elapsed since the most recent failure.
func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration) *CircuitBreaker {
    cb := &CircuitBreaker{}
    cb.maxFailures = maxFailures
    cb.resetTimeout = resetTimeout
    cb.state = StateClosed
    return cb
}

// Call executes fn under the breaker's policy and returns fn's error
// unchanged (or a "circuit breaker is open" error when rejecting).
//
// Closed: fn runs; maxFailures consecutive errors trip the breaker open.
// Open: calls fail fast until resetTimeout has elapsed since the last
// failure, after which the breaker goes half-open and the call proceeds
// as a probe. HalfOpen: a successful probe closes the breaker; a failed
// one trips it open again via the normal failure path.
func (cb *CircuitBreaker) Call(fn func() error) error {
    // Inspect (and possibly advance) the state under the write lock. The
    // original read lastFailTime outside any lock, racing with the write
    // in the failure path below.
    cb.mu.Lock()
    if cb.state == StateOpen {
        if time.Since(cb.lastFailTime) <= cb.resetTimeout {
            cb.mu.Unlock()
            return errors.New("circuit breaker is open")
        }
        cb.state = StateHalfOpen
    }
    cb.mu.Unlock()

    // fn runs outside the lock so slow calls don't serialize the breaker.
    err := fn()

    cb.mu.Lock()
    defer cb.mu.Unlock()

    if err != nil {
        cb.failures++
        cb.lastFailTime = time.Now()

        if cb.failures >= cb.maxFailures {
            cb.state = StateOpen
        }
        return err
    }

    // Success: a half-open probe closes the breaker; either way the
    // consecutive-failure count resets.
    if cb.state == StateHalfOpen {
        cb.state = StateClosed
    }
    cb.failures = 0

    return nil
}

Performance Benchmarks

Deployment Strategy

Conclusion

Building scalable systems requires careful consideration of trade-offs, continuous monitoring, and iterative improvements. The patterns discussed provide a foundation for designing robust architectures.

Key Takeaways

  • Design for failure from the start
  • Implement observability at every layer
  • Use caching strategically
  • Scale horizontally when possible
  • Monitor and optimize continuously

References

  1. Designing Data-Intensive Applications by Martin Kleppmann
  2. System Design Interview by Alex Xu
  3. AWS Architecture Blog - https://aws.amazon.com/blogs/architecture/