GRPC Rate Limiting

Overview

RateThrottle supports comprehensive gRPC rate limiting with:

  • Server interceptors - Global rate limiting for all RPCs

  • Method decorators - Per-method rate limits

  • Service-level limits - Different limits per service

  • Stream support - Unary, server streaming, client streaming, bidirectional

  • Concurrent request limiting - Prevent resource exhaustion

  • Custom metadata extraction - Use user IDs, API keys, etc.

Installation

# Install with gRPC support
pip install ratethrottle[grpc]

Quick Start

1. Basic Server with Rate Limiting

import grpc
from concurrent import futures
from ratethrottle import GRPCRateLimitInterceptor, GRPCLimits

# Import your generated protobuf files
import user_pb2
import user_pb2_grpc

# Create rate limit interceptor
interceptor = GRPCRateLimitInterceptor(
    GRPCLimits(
        requests_per_minute=100,        # Max 100 requests/min per client
        concurrent_requests=10,          # Max 10 concurrent requests per client
        stream_messages_per_minute=1000  # Max 1000 stream messages/min
    )
)

# Create gRPC server with rate limiting
server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=10),
    interceptors=[interceptor]  # Add interceptor here
)

# Add your service
user_pb2_grpc.add_UserServiceServicer_to_server(
    UserServiceImpl(),
    server
)

# Start server
server.add_insecure_port('[::]:50051')
server.start()
print("gRPC server with rate limiting running on port 50051")
server.wait_for_termination()

Complete Examples

Example 1: User Service with Global Rate Limiting

# user_service.py
import grpc
from concurrent import futures
from ratethrottle import GRPCRateLimitInterceptor, GRPCLimits

# Protobuf generated files
import user_pb2
import user_pb2_grpc

class UserServiceImpl(user_pb2_grpc.UserServiceServicer):
    """User service implementation"""

    def GetUser(self, request, context):
        """Get user by ID (unary RPC)"""
        print(f"GetUser called for user_id: {request.user_id}")

        # Your business logic here
        user = get_user_from_db(request.user_id)

        return user_pb2.User(
            id=user['id'],
            name=user['name'],
            email=user['email']
        )

    def ListUsers(self, request, context):
        """List users (server streaming RPC)"""
        print(f"ListUsers called with limit: {request.limit}")

        # Stream users
        users = get_users_from_db(limit=request.limit)
        for user in users:
            yield user_pb2.User(
                id=user['id'],
                name=user['name'],
                email=user['email']
            )

    def CreateUser(self, request, context):
        """Create new user (unary RPC)"""
        print(f"CreateUser called: {request.name}")

        user_id = create_user_in_db(request.name, request.email)

        return user_pb2.User(
            id=user_id,
            name=request.name,
            email=request.email
        )


def serve():
    """Start gRPC server with rate limiting"""

    # Create rate limiter
    interceptor = GRPCRateLimitInterceptor(
        GRPCLimits(
            requests_per_minute=100,
            concurrent_requests=10,
            stream_messages_per_minute=1000
        )
    )

    # Create server
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        interceptors=[interceptor]
    )

    # Add service
    user_pb2_grpc.add_UserServiceServicer_to_server(
        UserServiceImpl(),
        server
    )

    # Start
    server.add_insecure_port('[::]:50051')
    server.start()
    print("Server started on port 50051")

    try:
        server.wait_for_termination()
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    serve()

Example 2: Per-Method Rate Limiting

# Different limits for different methods
from ratethrottle import grpc_ratelimit
import user_pb2
import user_pb2_grpc

class UserServiceImpl(user_pb2_grpc.UserServiceServicer):
    """User service with method-specific rate limits"""

    @grpc_ratelimit(limit=10, window=60)
    def GetUser(self, request, context):
        """
        Get user - STRICT limit (10 req/min)
        Suitable for expensive operations
        """
        user = get_user_from_db(request.user_id)
        return user_pb2.User(
            id=user['id'],
            name=user['name'],
            email=user['email']
        )

    @grpc_ratelimit(limit=100, window=60)
    def ListUsers(self, request, context):
        """
        List users - MODERATE limit (100 req/min)
        Suitable for read operations
        """
        users = get_users_from_db(limit=request.limit)
        for user in users:
            yield user_pb2.User(
                id=user['id'],
                name=user['name'],
                email=user['email']
            )

    @grpc_ratelimit(limit=5, window=60)
    def CreateUser(self, request, context):
        """
        Create user - VERY STRICT limit (5 req/min)
        Suitable for write operations
        """
        user_id = create_user_in_db(request.name, request.email)
        return user_pb2.User(
            id=user_id,
            name=request.name,
            email=request.email
        )

    @grpc_ratelimit(limit=1000, window=60)
    def SearchUsers(self, request, context):
        """
        Search users - GENEROUS limit (1000 req/min)
        Suitable for lightweight operations
        """
        users = search_users(request.query)
        for user in users:
            yield user_pb2.User(
                id=user['id'],
                name=user['name'],
                email=user['email']
            )

# No need for interceptor when using decorators!
# But you can combine both for defense in depth
server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=10)
)

user_pb2_grpc.add_UserServiceServicer_to_server(
    UserServiceImpl(),
    server
)

server.add_insecure_port('[::]:50051')
server.start()

Example 3: Custom Client ID Extraction

# Extract user ID from metadata instead of IP
from ratethrottle import (
    GRPCRateLimitInterceptor,
    GRPCLimits,
    extract_user_id_from_metadata
)

# Option 1: Use built-in helper
extractor = extract_user_id_from_metadata('x-user-id')

interceptor = GRPCRateLimitInterceptor(
    GRPCLimits(requests_per_minute=100),
    extract_client_id=extractor
)


# Option 2: Custom extraction logic
def custom_extract_client_id(context):
    """Extract client ID from API key"""
    metadata = dict(context.invocation_metadata())

    # Try API key first
    api_key = metadata.get('x-api-key', '')
    if api_key:
        # Look up user from API key
        user_id = get_user_from_api_key(api_key)
        if user_id:
            return f"user_{user_id}"

    # Try user ID header
    user_id = metadata.get('x-user-id', '')
    if user_id:
        return f"user_{user_id}"

    # Fallback to IP address
    peer = context.peer()
    if peer and ':' in peer:
        parts = peer.split(':')
        if len(parts) >= 2:
            return f"ip_{parts[1]}"

    return 'anonymous'

interceptor = GRPCRateLimitInterceptor(
    GRPCLimits(requests_per_minute=100),
    extract_client_id=custom_extract_client_id
)

Example 4: Per-Service Rate Limits

# Different limits for different services
from ratethrottle import GRPCRateLimitInterceptor, GRPCLimits

# Define service-specific limits
method_limits = {
    'GetUser': GRPCLimits(requests_per_minute=100),
    'CreateUser': GRPCLimits(requests_per_minute=10),
    'DeleteUser': GRPCLimits(requests_per_minute=5),
    'ListUsers': GRPCLimits(requests_per_minute=50),
}

interceptor = GRPCRateLimitInterceptor(
    GRPCLimits(requests_per_minute=100),  # Default
    method_limits=method_limits            # Per-method overrides
)

server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=10),
    interceptors=[interceptor]
)

Example 5: Multiple Services with Different Limits

import grpc
from concurrent import futures
from ratethrottle import GRPCRateLimitInterceptor, GRPCLimits

# Import multiple services
import user_pb2_grpc
import product_pb2_grpc
import order_pb2_grpc

# Create interceptor with method-specific limits
method_limits = {
    # User service - moderate limits
    'GetUser': GRPCLimits(requests_per_minute=100),
    'CreateUser': GRPCLimits(requests_per_minute=10),

    # Product service - generous limits (read-heavy)
    'GetProduct': GRPCLimits(requests_per_minute=500),
    'ListProducts': GRPCLimits(requests_per_minute=200),

    # Order service - strict limits (critical operations)
    'CreateOrder': GRPCLimits(requests_per_minute=5),
    'CancelOrder': GRPCLimits(requests_per_minute=10),
}

interceptor = GRPCRateLimitInterceptor(
    GRPCLimits(requests_per_minute=100),  # Default
    method_limits=method_limits
)

# Create server
server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=10),
    interceptors=[interceptor]
)

# Add all services
user_pb2_grpc.add_UserServiceServicer_to_server(
    UserServiceImpl(), server
)
product_pb2_grpc.add_ProductServiceServicer_to_server(
    ProductServiceImpl(), server
)
order_pb2_grpc.add_OrderServiceServicer_to_server(
    OrderServiceImpl(), server
)

server.add_insecure_port('[::]:50051')
server.start()

Example 6: Streaming RPCs

class ChatServiceImpl(chat_pb2_grpc.ChatServiceServicer):
    """Chat service with streaming"""

    @grpc_ratelimit(limit=100, window=60)
    def SendMessage(self, request_iterator, context):
        """
        Client streaming - receive messages
        Rate limited: 100 requests/min
        """
        for message in request_iterator:
            print(f"Received: {message.text}")
            save_message(message)

        return chat_pb2.SendResponse(success=True)

    @grpc_ratelimit(limit=1000, window=60)
    def GetMessages(self, request, context):
        """
        Server streaming - send messages
        Rate limited: 1000 messages/min total
        """
        messages = get_messages_from_db(request.chat_id)
        for msg in messages:
            yield chat_pb2.Message(
                id=msg['id'],
                text=msg['text'],
                timestamp=msg['timestamp']
            )

    @grpc_ratelimit(limit=500, window=60)
    def Chat(self, request_iterator, context):
        """
        Bidirectional streaming - live chat
        Rate limited: 500 messages/min
        """
        for message in request_iterator:
            # Process incoming message
            response = process_message(message)

            # Send response
            yield chat_pb2.Message(
                id=response['id'],
                text=response['text'],
                timestamp=response['timestamp']
            )

Example 7: Violation Callbacks

def on_rate_limit_violation(violation_info):
    """Handle rate limit violations"""
    print(f"Rate limit violation: {violation_info}")

    violation_type = violation_info['type']
    client_id = violation_info['client_id']
    method = violation_info.get('method', 'unknown')

    # Log to database
    log_violation({
        'client_id': client_id,
        'method': method,
        'type': violation_type,
        'timestamp': time.time()
    })

    # Send alert for repeated violations
    violation_count = get_violation_count(client_id)
    if violation_count > 10:
        send_alert(f"Client {client_id} has {violation_count} violations")

    # Auto-ban abusive clients
    if violation_count > 50:
        ban_client(client_id, duration=3600)  # Ban for 1 hour

# Create interceptor with callback
interceptor = GRPCRateLimitInterceptor(
    GRPCLimits(requests_per_minute=100),
    on_violation=on_rate_limit_violation
)

Example 8: Redis Storage for Distributed Systems

from ratethrottle import GRPCRateLimitInterceptor, GRPCLimits
from ratethrottle import RedisStorage

# Use Redis for shared rate limits across multiple servers
storage = RedisStorage('redis://localhost:6379/0')

interceptor = GRPCRateLimitInterceptor(
    GRPCLimits(requests_per_minute=100),
    storage=storage
)

# Now rate limits are shared across all gRPC servers!
server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=10),
    interceptors=[interceptor]
)

Client-Side Usage

Handling Rate Limit Errors

# client.py
import grpc
import user_pb2
import user_pb2_grpc

def call_grpc_with_retry(stub, request):
    """Call gRPC method with automatic retry on rate limit"""
    max_retries = 3

    for attempt in range(max_retries):
        try:
            response = stub.GetUser(request)
            return response

        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.RESOURCE_EXHAUSTED:
                # Extract retry-after from metadata
                metadata = dict(e.trailing_metadata())
                retry_after = int(metadata.get('retry-after', 60))

                print(f"Rate limited. Retry after {retry_after}s")

                if attempt < max_retries - 1:
                    time.sleep(retry_after)
                    continue
                else:
                    raise
            else:
                raise

# Usage
channel = grpc.insecure_channel('localhost:50051')
stub = user_pb2_grpc.UserServiceStub(channel)

request = user_pb2.GetUserRequest(user_id=123)
response = call_grpc_with_retry(stub, request)
print(f"User: {response.name}")

Reading Rate Limit Headers

import grpc
import user_pb2
import user_pb2_grpc

def call_with_rate_limit_info(stub, request):
    """Call gRPC and show rate limit information"""

    # Set up call with metadata callback
    metadata_future = []

    def callback(metadata):
        metadata_future.append(metadata)

    # Make call
    try:
        response = stub.GetUser(request)

        # Check trailing metadata
        if metadata_future:
            metadata = dict(metadata_future[0])

            limit = metadata.get('x-ratelimit-limit', 'N/A')
            remaining = metadata.get('x-ratelimit-remaining', 'N/A')
            reset = metadata.get('x-ratelimit-reset', 'N/A')

            print(f"Rate Limit: {remaining}/{limit}")
            print(f"Resets at: {reset}")

        return response

    except grpc.RpcError as e:
        if e.code() == grpc.StatusCode.RESOURCE_EXHAUSTED:
            metadata = dict(e.trailing_metadata())
            retry_after = metadata.get('retry-after', 'unknown')
            print(f"❌ Rate limit exceeded. Retry after {retry_after}s")
        raise

Configuration Examples

Public API (Strict)

GRPCLimits(
    requests_per_minute=60,          # 1 req/second
    concurrent_requests=5,            # Max 5 concurrent
    stream_messages_per_minute=600    # Max 10 messages/sec in streams
)

Authenticated API (Moderate)

GRPCLimits(
    requests_per_minute=300,          # 5 req/second
    concurrent_requests=20,           # Max 20 concurrent
    stream_messages_per_minute=3000   # Max 50 messages/sec in streams
)

Internal Services (Generous)

GRPCLimits(
    requests_per_minute=6000,         # 100 req/second
    concurrent_requests=100,          # Max 100 concurrent
    stream_messages_per_minute=60000  # Max 1000 messages/sec in streams
)

Monitoring & Statistics

# Get current statistics
interceptor = GRPCRateLimitInterceptor(
    GRPCLimits(requests_per_minute=100)
)

# Later, get stats
stats = interceptor.get_statistics()
print(stats)
# {
#     'limits': {
#         'requests_per_minute': 100,
#         'concurrent_requests': 50,
#         'stream_messages_per_minute': 5000
#     },
#     'current_concurrent_requests': 15,
#     'unique_clients': 8,
#     'metrics': {
#         'total_requests': 1523,
#         'allowed_requests': 1498,
#         'blocked_requests': 25
#     }
# }

Protobuf Example

// user.proto
syntax = "proto3";

package user;

service UserService {
  // Unary RPC
  rpc GetUser(GetUserRequest) returns (User);

  // Server streaming RPC
  rpc ListUsers(ListUsersRequest) returns (stream User);

  // Client streaming RPC
  rpc CreateUsers(stream CreateUserRequest) returns (CreateUsersResponse);

  // Bidirectional streaming RPC
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

message GetUserRequest {
  int32 user_id = 1;
}

message User {
  int32 id = 1;
  string name = 2;
  string email = 3;
}

message ListUsersRequest {
  int32 limit = 1;
}

message CreateUserRequest {
  string name = 1;
  string email = 2;
}

message CreateUsersResponse {
  int32 created_count = 1;
}

message ChatMessage {
  string text = 1;
  int64 timestamp = 2;
}

Generate Python code:

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. user.proto

Best Practices

1. Use Per-Method Limits for Fine Control

# Different operations need different limits
@grpc_ratelimit(limit=5, window=60)    # Write: strict
def CreateUser(self, request, context):
    pass

@grpc_ratelimit(limit=100, window=60)  # Read: generous
def GetUser(self, request, context):
    pass

2. Combine Global + Method Limits

# Global interceptor for baseline protection
interceptor = GRPCRateLimitInterceptor(
    GRPCLimits(requests_per_minute=1000)
)

# + Method decorators for specific limits
class UserService:
    @grpc_ratelimit(limit=10, window=60)  # Extra strict
    def DeleteUser(self, request, context):
        pass

3. Use Custom Metadata for Better Tracking

# Track by user ID instead of IP
extractor = extract_user_id_from_metadata('x-user-id')
interceptor = GRPCRateLimitInterceptor(
    extract_client_id=extractor
)

4. Monitor Violations

def on_violation(info):
    logger.warning(f"Rate limit: {info}")
    metrics.increment('rate_limit_violations')

interceptor = GRPCRateLimitInterceptor(
    on_violation=on_violation
)

Summary

  • Global rate limiting - Server interceptors

  • Method-specific limits - Decorators

  • Streaming support - All RPC types

  • Concurrent limiting - Prevent resource exhaustion

  • Custom extraction - User IDs, API keys, etc.

  • Distributed support - Redis storage

  • Monitoring - Statistics and callbacks

Next Steps