Basic Usage
This guide covers the fundamental concepts and common usage patterns for RateThrottle.
Core Concepts
RateThrottle operates on three main components:
Rules: Define rate limiting policies (limit, window, strategy)
Storage: Persist rate limit state (in-memory or Redis)
Limiter: The core engine that enforces rules
Creating a Rate Limiter
Quick Creation
The simplest way to create a limiter:
from ratethrottle import create_limiter
# In-memory storage (single instance)
limiter = create_limiter()
# Redis storage (distributed)
limiter = create_limiter('redis', 'redis://localhost:6379/0')
Manual Creation
For more control:
from ratethrottle import RateThrottleCore, InMemoryStorage
# With in-memory storage
storage = InMemoryStorage()
limiter = RateThrottleCore(storage=storage)
# With Redis storage
from ratethrottle import RedisStorage
import redis
redis_client = redis.from_url('redis://localhost:6379/0')
storage = RedisStorage(redis_client)
limiter = RateThrottleCore(storage=storage)
Defining Rules
Basic Rule
from ratethrottle import RateThrottleRule
rule = RateThrottleRule(
name="api_limit",
limit=100, # 100 requests
window=60, # per 60 seconds
)
limiter.add_rule(rule)
Rule with Strategy
rule = RateThrottleRule(
name="api_burst",
limit=100,
window=60,
strategy="token_bucket", # token_bucket, leaky_bucket, fixed_window, sliding_window
burst=150 # allow bursts up to 150
)
Rule with Scope
# Limit by IP address (default)
ip_rule = RateThrottleRule(
name="by_ip",
limit=100,
window=60,
scope="ip"
)
# Limit by user ID
user_rule = RateThrottleRule(
name="by_user",
limit=1000,
window=60,
scope="user"
)
# Limit by endpoint
endpoint_rule = RateThrottleRule(
name="by_endpoint",
limit=500,
window=60,
scope="endpoint"
)
# Global limit (all requests)
global_rule = RateThrottleRule(
name="global",
limit=10000,
window=60,
scope="global"
)
Rule with Block Duration
rule = RateThrottleRule(
name="api_protected",
limit=10,
window=60,
block_duration=300 # Block for 5 minutes after exceeding limit
)
Checking Rate Limits
Basic Check
# Check if request is allowed
status = limiter.check_rate_limit("192.168.1.1", "api_limit")
if status.allowed:
# Process request
process_request()
else:
# Reject request
return error_response(status.retry_after)
Using Status Object
The status object contains useful information:
status = limiter.check_rate_limit("192.168.1.1", "api_limit")
print(f"Allowed: {status.allowed}")
print(f"Remaining: {status.remaining}")
print(f"Limit: {status.limit}")
print(f"Reset time: {status.reset_time}")
print(f"Retry after: {status.retry_after} seconds")
print(f"Rule applied: {status.rule_name}")
print(f"Blocked: {status.blocked}")
# Convert to dict for JSON responses
response_data = status.to_dict()
# Convert to HTTP headers
headers = status.to_headers()
# Headers: X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Reset, Retry-After
Adding Metadata
Track additional context with requests:
metadata = {
'user_id': 'user123',
'endpoint': '/api/data',
'method': 'GET'
}
status = limiter.check_rate_limit(
identifier="192.168.1.1",
rule_name="api_limit",
metadata=metadata
)
Managing Rules
Add Rules
rule1 = RateThrottleRule(name="rule1", limit=100, window=60)
rule2 = RateThrottleRule(name="rule2", limit=1000, window=3600)
limiter.add_rule(rule1)
limiter.add_rule(rule2)
Remove Rules
limiter.remove_rule("rule1")
List Rules
rules = limiter.list_rules()
for rule in rules:
print(f"{rule.name}: {rule.limit}/{rule.window}s")
Get Specific Rule
rule = limiter.get_rule("api_limit")
print(f"Strategy: {rule.strategy}")
print(f"Scope: {rule.scope}")
Whitelist and Blacklist
Whitelist Management
Whitelisted identifiers bypass all rate limits:
# Add to whitelist
limiter.add_to_whitelist("10.0.0.1")
limiter.add_to_whitelist("trusted_user_123")
# Check whitelist
if limiter.is_whitelisted("10.0.0.1"):
print("IP is whitelisted")
# Remove from whitelist
limiter.remove_from_whitelist("10.0.0.1")
Blacklist Management
Blacklisted identifiers are always blocked:
# Add to blacklist
limiter.add_to_blacklist("192.168.1.100")
limiter.add_to_blacklist("bad_user_456")
# Check blacklist
if limiter.is_blacklisted("192.168.1.100"):
print("IP is blacklisted")
# Remove from blacklist
limiter.remove_from_blacklist("192.168.1.100")
Violation Callbacks
Register callbacks to be notified of violations:
def log_violation(violation):
"""Log rate limit violations"""
print(f"Violation by {violation.identifier}")
print(f"Rule: {violation.rule_name}")
print(f"Time: {violation.timestamp}")
print(f"Requests: {violation.requests_made}/{violation.limit}")
def alert_admin(violation):
"""Send alert to administrators"""
if violation.requests_made > violation.limit * 2:
send_email_alert(violation)
# Register callbacks
limiter.register_violation_callback(log_violation)
limiter.register_violation_callback(alert_admin)
Metrics and Monitoring
Get Metrics
metrics = limiter.get_metrics()
print(f"Total requests: {metrics['total_requests']}")
print(f"Allowed requests: {metrics['allowed_requests']}")
print(f"Blocked requests: {metrics['blocked_requests']}")
print(f"Block rate: {metrics['block_rate']:.2f}%")
print(f"Total violations: {metrics['total_violations']}")
print(f"Active rules: {metrics['active_rules']}")
print(f"Whitelisted count: {metrics['whitelisted_count']}")
print(f"Blacklisted count: {metrics['blacklisted_count']}")
# Recent violations
for violation in metrics['recent_violations']:
print(f" {violation.identifier} - {violation.rule_name}")
Reset Metrics
limiter.reset_metrics()
System Status
status = limiter.get_status()
print(f"Rules: {status['rules']}")
print(f"Storage type: {status['storage_type']}")
print(f"Available strategies: {status['strategies_available']}")
Error Handling
RateThrottle provides specific exceptions for different error conditions:
from ratethrottle.exceptions import (
RateLimitExceeded,
RuleNotFoundError,
ConfigurationError,
StorageError
)
try:
status = limiter.check_rate_limit("user123", "api_limit")
except RuleNotFoundError as e:
print(f"Rule not found: {e}")
except StorageError as e:
print(f"Storage error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
Handling RateLimitExceeded
try:
status = limiter.check_rate_limit("user123", "api_limit")
if not status.allowed:
raise RateLimitExceeded(
message="Rate limit exceeded",
retry_after=status.retry_after,
limit=status.limit,
remaining=status.remaining,
reset_time=status.reset_time
)
except RateLimitExceeded as e:
# Return appropriate HTTP response
return {
'error': str(e),
'retry_after': e.retry_after,
'limit': e.limit,
'remaining': e.remaining
}, 429
Using with Context Managers
Clean up resources properly:
from ratethrottle import create_limiter, RateThrottleRule
import redis
# For Redis connections
redis_client = redis.from_url('redis://localhost:6379/0')
try:
limiter = create_limiter('redis', 'redis://localhost:6379/0')
rule = RateThrottleRule(name="api", limit=100, window=60)
limiter.add_rule(rule)
# Use limiter
status = limiter.check_rate_limit("user123", "api")
finally:
redis_client.close()
Parsing Rate Limit Strings
Use the helper function to parse rate limit strings:
from ratethrottle.helpers import parse_rate_limit
# Parse common formats
limit, window = parse_rate_limit("100/minute") # (100, 60)
limit, window = parse_rate_limit("5/second") # (5, 1)
limit, window = parse_rate_limit("1000/hour") # (1000, 3600)
limit, window = parse_rate_limit("10000/day") # (10000, 86400)
# Use in rule creation
limit, window = parse_rate_limit("50/minute")
rule = RateThrottleRule(
name="api_limit",
limit=limit,
window=window
)
Best Practices
Choose the Right Strategy - Use
token_bucketfor APIs that need burst support - Usesliding_windowfor smooth, consistent rate limiting - Usefixed_windowfor simple, high-performance scenarios - Useleaky_bucketfor constant-rate processingSet Appropriate Limits - Start conservative and adjust based on metrics - Consider different limits for authenticated vs anonymous users - Use shorter windows for sensitive endpoints
Monitor Metrics - Regularly check block rates and violations - Adjust limits based on actual usage patterns - Set up alerts for unusual activity
Use Redis for Production - Use in-memory storage only for development/testing - Redis ensures rate limits work across multiple servers - Configure Redis with appropriate persistence settings
Handle Errors Gracefully - Always catch and handle rate limit exceptions - Provide clear error messages to users - Include
Retry-Afterheaders in responses
Next Steps
Learn about Rate Limiting Strategies in detail
Set up Storage Backends backends
Explore Analytics and Reporting capabilities
Integrate with your framework: Flask Integration, FastAPI Integration, Django Integration
Checkout protocls reference WebSocket Rate Limiting, GraphQL Rate Limiting, GRPC Rate Limiting