ML Adaptive Limiting

Overview

ML Adaptive Rate Limiting is a revolutionary feature that automatically learns user behavior and adjusts rate limits dynamically.

Key Features: * Pattern learning with Exponential Moving Average * Z-score based anomaly detection * Trust scoring (0.0 to 1.0) * Automatic limit adjustment (0.5x to 3x base) * Model persistence * Violation callbacks

Installation

# Basic installation (no extra dependencies needed!)
pip install ratethrottle

Note: The AdaptiveRateLimiter class uses only statistical methods and has no heavy ML dependencies!

Quick Start

Basic Usage

from ratethrottle import AdaptiveRateLimiter

# Create adaptive limiter
limiter = AdaptiveRateLimiter(
    base_limit=100,          # Starting rate limit
    learning_rate=0.1,       # How fast to adapt (0.0-1.0)
    anomaly_threshold=3.0    # Z-score threshold for anomalies
)

# Check rate limit (learns automatically!)
result = limiter.check_adaptive('user_123')

if result['allowed']:
    # Process request
    print(f"✅ Allowed")
    print(f"   Adjusted limit: {result['adjusted_limit']}")
    print(f"   Trust score: {result['trust_score']:.2f}")
    print(f"   Anomaly score: {result['anomaly_score']:.2f}")
else:
    # Reject request
    print(f"❌ Blocked: {result['reason']}")
    print(f"   Retry after: {result['retry_after']}s")

How It Works

1. Pattern Learning

Learns normal behavior using Exponential Moving Average (EMA):

mean_rate = (1 - learning_rate) × old_mean + learning_rate × current_rate

Example:

# User makes steady requests
# Request 1: 100 req/min → mean = 100
# Request 2: 105 req/min → mean = 100.5
# Request 3: 98 req/min  → mean = 100.35
# ...
# Gradually learns the user's normal pattern

2. Anomaly Detection

Uses Z-score to detect unusual patterns:

z_score = |current_rate - mean_rate| / std_rate
Interpretation:
  • Z-score < 2: Normal behavior

  • Z-score 2-3: Slightly unusual

  • Z-score > 3: Anomalous (default threshold)

Example:

# User normally does 100 req/min (std: 10)
# Current: 250 req/min
# Z-score = |250 - 100| / 10 = 15.0 (highly anomalous!)

3. Trust Scoring

Multi-factor trust score (0.0 to 1.0):

trust = (
    age_score * 0.3 +           # Account age (ramps over 30 days)
    consistency_score * 0.4 +   # Low variance = consistent = trusted
    violation_score * 0.3 +     # Penalties for violations
    good_behavior_bonus         # Bonus for clean record
)
Trust Levels:
  • 0.0-0.3: Untrusted (new or problematic)

  • 0.3-0.6: Neutral (average user)

  • 0.6-0.8: Trusted (good behavior)

  • 0.8-1.0: Highly trusted (excellent behavior)

4. Limit Adjustment

Dynamic limit calculation:

adjusted_limit = base_limit * trust_multiplier * anomaly_multiplier * age_multiplier
Multipliers:
  • Trust: 0.5x to 2.0x (based on trust score)

  • Anomaly: 0.3x if anomalous, 1.0x if normal

  • Age: 0.5x to 1.0x (ramps over 30 days)

Example:

# Base: 100 req/min
# Trust: 0.9 (highly trusted) → 1.95x multiplier
# Anomaly: Normal → 1.0x multiplier
# Age: 45 days → 1.0x multiplier
# Adjusted: 100 × 1.95 × 1.0 × 1.0 = 195 req/min

Configuration

Basic Configuration

limiter = AdaptiveRateLimiter(
    base_limit=100,              # Base rate limit
    window=60,                   # Time window (seconds)
    learning_rate=0.1,           # Adaptation speed (0.0-1.0)
    anomaly_threshold=3.0,       # Z-score threshold
    trust_enabled=True,          # Enable trust scoring
    min_multiplier=0.5,          # Minimum: 50% of base
    max_multiplier=3.0           # Maximum: 300% of base
)

Advanced Configuration

from ratethrottle import RedisStorage

limiter = AdaptiveRateLimiter(
    base_limit=100,
    learning_rate=0.1,

    # Storage backend
    storage=RedisStorage('redis://localhost:6379/0'),

    # Callbacks
    on_anomaly=lambda info: alert_ops_team(info),
    on_trust_change=lambda user, score: log_trust(user, score)
)

Configuration Guidelines

Learning Rate:
  • 0.01-0.05: Very slow (conservative, stable)

  • 0.1: Default (balanced)

  • 0.3-0.5: Fast (responsive, volatile)

Anomaly Threshold:
  • 2.0: Very strict (5% false positive rate)

  • 3.0: Default (0.3% false positive rate)

  • 4.0: Lenient (0.01% false positive rate)

Multipliers:
  • Conservative: min=0.7, max=1.5

  • Balanced: min=0.5, max=3.0 (default)

  • Aggressive: min=0.3, max=5.0

Use Cases

1. Public API Protection

Scenario: Protect public API from scrapers and abuse

limiter = AdaptiveRateLimiter(
    base_limit=60,           # 1 req/second baseline
    learning_rate=0.05,      # Slow learning (be cautious)
    anomaly_threshold=2.5    # Strict detection
)

@app.route('/api/data')
def get_data():
    result = limiter.check_adaptive(get_client_ip(request))

    if not result['allowed']:
        return {'error': 'Rate limit exceeded'}, 429

    return {'data': 'protected'}
Outcomes:
  • New users: 30-40 req/min (cautious)

  • Good users: 90-120 req/min (generous)

  • Scrapers: Detected and limited to 5-10 req/min

2. SaaS Application

Scenario: Different user tiers with adaptive limits

def get_base_limit(user):
    """Get base limit by tier"""
    if user.tier == 'enterprise':
        return 1000
    elif user.tier == 'pro':
        return 500
    else:  # free
        return 100

@app.route('/api/resource')
@login_required
def get_resource():
    user = get_current_user()

    limiter = AdaptiveRateLimiter(
        base_limit=get_base_limit(user)
    )

    result = limiter.check_adaptive(f'user_{user.id}')

    if result['allowed']:
        return process_request()
    else:
        return {
            'error': 'Rate limit exceeded',
            'limit': result['adjusted_limit'],
            'retry_after': result['retry_after']
        }, 429
Outcomes:
  • Each tier gets personalized limits

  • Good behavior rewarded with 1.5-2x higher limits

  • Abuse detected and throttled automatically

3. E-commerce Checkout

Scenario: Prevent fraud while allowing legitimate purchases

checkout_limiter = AdaptiveRateLimiter(
    base_limit=10,           # Only 10 checkouts/hour
    learning_rate=0.1,
    anomaly_threshold=2.0    # Very strict
)

@app.route('/checkout', methods=['POST'])
def checkout():
    session_id = request.session.get('id')

    result = checkout_limiter.check_adaptive(session_id)

    if not result['allowed']:
        logger.warning(f"Checkout blocked: {session_id}")
        return {'error': 'Too many checkout attempts'}, 429

    # Process checkout
    return process_checkout()
Outcomes:
  • Trusted customers get higher limits

  • Unusual patterns (many failed payments) detected

  • Fraud attempts automatically blocked

4. Internal API Gateway

Scenario: Protect backend services from internal abuse

gateway_limiter = AdaptiveRateLimiter(
    base_limit=500,
    learning_rate=0.2,       # Fast adaptation (internal trust)
    anomaly_threshold=4.0    # Lenient (avoid false positives)
)

@app.route('/internal/service/<service_name>')
@require_internal_auth
def internal_api(service_name):
    caller = request.headers.get('X-Service-Name')

    result = gateway_limiter.check_adaptive(f'service_{caller}')

    if result['allowed']:
        return proxy_to_service(service_name)
    else:
        # Alert on internal rate limiting
        alert_ops(f"Service {caller} hitting rate limits")
        return {'error': 'Rate limited'}, 429
Outcomes:
  • Microservices get personalized limits

  • Unusual patterns detected (e.g., infinite loops)

  • Self-healing under load

Monitoring & Insights

Get User Profile

profile = limiter.get_user_profile('user_123')

print(f"Identifier: {profile['identifier']}")
print(f"Age: {profile['age_days']:.1f} days")
print(f"Total requests: {profile['request_count']}")
print(f"Average rate: {profile['mean_rate']:.1f} req/min")
print(f"Trust score: {profile['trust_score']:.2f}")
print(f"Violations: {profile['violation_count']}")
print(f"Current limit: {profile['current_limit']}")

Get Statistics

stats = limiter.get_statistics()

print(f"Total requests: {stats['total_requests']}")
print(f"Anomalies detected: {stats['anomalies_detected']}")
print(f"Limits adjusted: {stats['limits_adjusted']}")
print(f"Users tracked: {stats['users_tracked']}")

Callbacks

Anomaly Detection:

def on_anomaly(info):
    """Called when anomaly detected"""
    logger.warning(
        f"Anomaly detected for {info['identifier']}: "
        f"score={info['anomaly_score']:.2f}, "
        f"rate={info['current_rate']:.0f} (expected: {info['expected_rate']:.0f})"
    )

    # Send alert if severe
    if info['anomaly_score'] > 5.0:
        send_alert(f"Severe anomaly: {info['identifier']}")

limiter = AdaptiveRateLimiter(on_anomaly=on_anomaly)

Trust Changes:

def on_trust_change(identifier, new_trust):
    """Called when trust score changes significantly"""
    logger.info(f"Trust updated for {identifier}: {new_trust:.2f}")

    # Reward highly trusted users
    if new_trust > 0.9:
        send_reward_email(identifier)

limiter = AdaptiveRateLimiter(on_trust_change=on_trust_change)

Model Persistence

Save Model

# After learning from production traffic
limiter.export_model('adaptive_model.json')

Load Model

# Start with learned behavior
limiter = AdaptiveRateLimiter(base_limit=100)
limiter.load_model('adaptive_model.json')

# Model includes all user profiles and learned patterns

Use Case: Graceful Restarts

import atexit

limiter = AdaptiveRateLimiter(base_limit=100)

# Load existing model on startup
try:
    limiter.load_model('adaptive_model.json')
    logger.info("Loaded existing model")
except FileNotFoundError:
    logger.info("Starting with fresh model")

# Save on shutdown
def save_model():
    limiter.export_model('adaptive_model.json')
    logger.info("Model saved")

atexit.register(save_model)

Integration Examples

Flask

from flask import Flask, request, jsonify
from ratethrottle import AdaptiveRateLimiter

app = Flask(__name__)
limiter = AdaptiveRateLimiter(base_limit=100)

@app.before_request
def check_rate_limit():
    """Check rate limit before each request"""
    identifier = request.headers.get('X-User-ID') or request.remote_addr

    result = limiter.check_adaptive(identifier)

    if not result['allowed']:
        return jsonify({
            'error': 'Rate limit exceeded',
            'limit': result['adjusted_limit'],
            'retry_after': result['retry_after'],
            'trust_score': result['trust_score']
        }), 429

FastAPI

from fastapi import FastAPI, Request, HTTPException
from ratethrottle import AdaptiveRateLimiter

app = FastAPI()
limiter = AdaptiveRateLimiter(base_limit=100)

@app.middleware("http")
async def adaptive_rate_limit(request: Request, call_next):
    """Apply adaptive rate limiting"""
    # Extract user identifier
    user_id = request.headers.get('X-User-ID', request.client.host)

    # Check limit
    result = limiter.check_adaptive(user_id)

    if not result['allowed']:
        raise HTTPException(
            status_code=429,
            detail={
                'error': 'Rate limit exceeded',
                'limit': result['adjusted_limit'],
                'trust_score': result['trust_score']
            }
        )

    # Add rate limit headers
    response = await call_next(request)
    response.headers['X-RateLimit-Limit'] = str(result['adjusted_limit'])
    response.headers['X-RateLimit-Remaining'] = str(result['remaining'])

    return response

Django

from django.core.cache import cache
from django.http import JsonResponse
from ratethrottle import AdaptiveRateLimiter

# Global limiter
limiter = AdaptiveRateLimiter(base_limit=100)

def adaptive_rate_limit(view_func):
    """Decorator for adaptive rate limiting"""
    def wrapper(request, *args, **kwargs):
        # Get user identifier
        if request.user.is_authenticated:
            identifier = f'user_{request.user.id}'
        else:
            identifier = request.META.get('REMOTE_ADDR')

        # Check limit
        result = limiter.check_adaptive(identifier)

        if not result['allowed']:
            return JsonResponse({
                'error': 'Rate limit exceeded',
                'limit': result['adjusted_limit'],
                'retry_after': result['retry_after']
            }, status=429)

        return view_func(request, *args, **kwargs)

    return wrapper

# Use on views
@adaptive_rate_limit
def api_view(request):
    return JsonResponse({'data': 'protected'})

Best Practices

1. Start Conservative

# Start with strict limits
limiter = AdaptiveRateLimiter(
    base_limit=60,           # Conservative
    learning_rate=0.05,      # Slow learning
    anomaly_threshold=2.5    # Strict detection
)

# Monitor for a week, then adjust

2. Use Appropriate Identifiers

# For authenticated APIs
identifier = f'user_{user.id}'

# For public APIs
identifier = get_client_ip(request)

# For internal services
identifier = f'service_{service_name}'

# For anonymous sessions
identifier = request.session.get('id')

3. Monitor and Alert

def on_anomaly(info):
    """Alert on severe anomalies"""
    if info['anomaly_score'] > 5.0:
        send_alert(
            f"Severe anomaly: {info['identifier']} "
            f"(score: {info['anomaly_score']:.1f})"
        )

limiter = AdaptiveRateLimiter(on_anomaly=on_anomaly)

4. Combine with Traditional Limits

# Use adaptive for most endpoints
adaptive = AdaptiveRateLimiter(base_limit=100)

# Use strict limits for critical endpoints
from ratethrottle import FlaskRateLimiter
strict = FlaskRateLimiter(app)

@app.route('/api/data')
def get_data():
    # Adaptive limiting
    result = adaptive.check_adaptive(get_user_id())
    if not result['allowed']:
        abort(429)
    return {'data': 'value'}

@app.route('/api/admin/delete')
@strict.limit("5/hour")
def delete_resource():
    # Strict limiting for critical operations
    return {'status': 'deleted'}

5. Persist Models

# Save periodically
def save_model_periodically():
    while True:
        time.sleep(3600)  # Every hour
        limiter.export_model('adaptive_model.json')

threading.Thread(target=save_model_periodically, daemon=True).start()

# Load on startup
limiter.load_model('adaptive_model.json')

See ML Adaptive - Reference for more examples