component_framework.adapters.django_ratelimit
Django rate limiting utilities for component endpoints.
Provides a token-bucket rate limiter backed by Django's cache framework (falls back to in-memory) plus a CBV mixin and an FBV decorator.
All over-limit responses are JSON (never redirects), making them safe for HTMX/fetch consumers.
1"""Django rate limiting utilities for component endpoints. 2 3Provides a token-bucket rate limiter backed by Django's cache framework 4(falls back to in-memory) plus a CBV mixin and an FBV decorator. 5 6All over-limit responses are JSON (never redirects), making them safe for 7HTMX/fetch consumers. 8""" 9 10from __future__ import annotations 11 12import functools 13import math 14import threading 15import time 16from collections.abc import Callable 17 18try: 19 from django.http import HttpRequest, JsonResponse 20except ImportError as e: 21 from . import _require_extra 22 23 raise _require_extra("django", "django") from e 24 25# --------------------------------------------------------------------------- 26# Rate-limit parse helpers 27# --------------------------------------------------------------------------- 28 29_PERIOD_SECONDS: dict[str, int] = { 30 "sec": 1, 31 "second": 1, 32 "min": 60, 33 "minute": 60, 34 "hour": 3600, 35 "day": 86400, 36} 37 38 39def _parse_rate(rate: str) -> tuple[int, int]: 40 """Parse a rate string like ``"60/min"`` into ``(limit, period_seconds)``. 41 42 Args: 43 rate: A string in the format ``"N/period"`` where period is one of 44 ``sec``, ``min``, ``hour``, or ``day``. 45 46 Returns: 47 A ``(limit, period_seconds)`` tuple. 48 49 Raises: 50 ValueError: If the rate string is malformed or the period is unknown. 51 """ 52 try: 53 count_str, period_str = rate.split("/", 1) 54 count = int(count_str.strip()) 55 period = period_str.strip().lower() 56 period_secs = _PERIOD_SECONDS.get(period) 57 if period_secs is None: 58 raise ValueError(f"Unknown period '{period}'. Use: {list(_PERIOD_SECONDS)}") 59 return count, period_secs 60 except (AttributeError, ValueError) as exc: 61 raise ValueError( 62 f"Invalid rate '{rate}'. Expected format: 'N/period' (e.g. '60/min')." 63 ) from exc 64 65 66# --------------------------------------------------------------------------- 67# In-memory token-bucket implementation (fallback) 68# --------------------------------------------------------------------------- 69 70 71class _BucketState: 72 """Internal state for a single token-bucket entry.""" 73 74 __slots__ = ("tokens", "last_refill") 75 76 def __init__(self, tokens: float, last_refill: float) -> None: 77 self.tokens = tokens 78 self.last_refill = last_refill 79 80 81class RateLimiter: 82 """Thread-safe in-memory token bucket rate limiter. 83 84 This implementation is used as a fallback when Django's cache is 85 unavailable or raises an exception. For production use with multiple 86 processes, prefer the Django cache backend (``LocMemCache``, 87 ``RedisCache``, etc.). 88 89 Args: 90 limit: Maximum number of tokens (requests) allowed per period. 91 period: Length of the refill period in seconds. 92 """ 93 94 def __init__(self, limit: int, period: int) -> None: 95 self._limit = limit 96 self._period = period 97 self._buckets: dict[str, _BucketState] = {} 98 self._lock = threading.Lock() 99 100 # ------------------------------------------------------------------ 101 # Public API 102 # ------------------------------------------------------------------ 103 104 def is_allowed(self, key: str) -> tuple[bool, int]: 105 """Check whether *key* is within the rate limit. 106 107 Returns: 108 A ``(allowed, retry_after)`` tuple where ``retry_after`` is the 109 number of whole seconds until the next token becomes available 110 (only meaningful when ``allowed`` is ``False``). 111 """ 112 now = time.monotonic() 113 with self._lock: 114 state = self._buckets.get(key) 115 if state is None: 116 state = _BucketState(tokens=float(self._limit), last_refill=now) 117 self._buckets[key] = state 118 119 # Refill tokens proportional to elapsed time 120 elapsed = now - state.last_refill 121 refill = elapsed * (self._limit / self._period) 122 state.tokens = min(float(self._limit), state.tokens + refill) 123 state.last_refill = now 124 125 if state.tokens >= 1.0: 126 state.tokens -= 1.0 127 return True, 0 128 else: 129 # Seconds until the next whole token arrives 130 needed = 1.0 - state.tokens 131 retry_after = math.ceil(needed * self._period / self._limit) 132 return False, retry_after 133 134 135# --------------------------------------------------------------------------- 136# Django-cache-backed bucket helpers 137# --------------------------------------------------------------------------- 138 139# Module-level in-memory fallback (one per (limit, period) pair). 140_in_memory_limiters: dict[tuple[int, int], RateLimiter] = {} 141_in_memory_lock = threading.Lock() 142 143 144def _get_memory_limiter(limit: int, period: int) -> RateLimiter: 145 """Return (or create) the shared in-memory limiter for these parameters.""" 146 key = (limit, period) 147 with _in_memory_lock: 148 if key not in _in_memory_limiters: 149 _in_memory_limiters[key] = RateLimiter(limit, period) 150 return _in_memory_limiters[key] 151 152 153def _check_rate_cache(cache_key: str, limit: int, period: int) -> tuple[bool, int]: 154 """Token-bucket check using Django's cache backend. 155 156 Stores ``(tokens, last_refill)`` in the cache. Falls back to the 157 in-memory ``RateLimiter`` if the cache raises any exception. 158 159 Returns: 160 ``(allowed, retry_after)`` — same semantics as ``RateLimiter.is_allowed``. 161 """ 162 try: 163 from django.core.cache import cache 164 165 now = time.time() 166 bucket_cache_key = f"ratelimit:{cache_key}" 167 state = cache.get(bucket_cache_key) 168 169 if state is None: 170 tokens: float = float(limit) 171 last_refill: float = now 172 else: 173 tokens, last_refill = state 174 175 # Refill 176 elapsed = now - last_refill 177 refill = elapsed * (limit / period) 178 tokens = min(float(limit), tokens + refill) 179 last_refill = now 180 181 if tokens >= 1.0: 182 tokens -= 1.0 183 cache.set(bucket_cache_key, (tokens, last_refill), timeout=period * 2) 184 return True, 0 185 else: 186 needed = 1.0 - tokens 187 retry_after = math.ceil(needed * period / limit) 188 cache.set(bucket_cache_key, (tokens, last_refill), timeout=period * 2) 189 return False, retry_after 190 191 except Exception: 192 # Cache unavailable — fall back to in-memory limiter 193 limiter = _get_memory_limiter(limit, period) 194 return limiter.is_allowed(cache_key) 195 196 197# --------------------------------------------------------------------------- 198# Key-building helpers 199# --------------------------------------------------------------------------- 200 201 202def _default_key(request: HttpRequest) -> str: 203 """Return a cache key based on user ID (auth) or IP address (anon).""" 204 user = getattr(request, "user", None) 205 if user is not None and getattr(user, "is_authenticated", False): 206 return f"user:{user.id}" 207 # Best-effort IP extraction 208 x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR") 209 if x_forwarded_for: 210 ip = x_forwarded_for.split(",")[0].strip() 211 else: 212 ip = request.META.get("REMOTE_ADDR", "unknown") 213 return f"ip:{ip}" 214 215 216# --------------------------------------------------------------------------- 217# CBV mixin 218# --------------------------------------------------------------------------- 219 220 221class RateLimitMixin: 222 """CBV mixin that applies token-bucket rate limiting to POST requests. 223 224 Attributes: 225 throttle_rate: Rate in ``"N/period"`` format (default ``"60/min"``). 226 227 Usage:: 228 229 class MyComponentView(RateLimitMixin, ComponentView): 230 throttle_rate = "30/min" 231 232 def get_throttle_key(self, request): 233 # Optional: override to customise the bucket key 234 return f"endpoint:{request.path}:{super().get_throttle_key(request)}" 235 """ 236 237 throttle_rate: str = "60/min" 238 239 def get_throttle_key(self, request: HttpRequest) -> str: 240 """Return the cache key used for this request's rate-limit bucket. 241 242 Defaults to ``user:<id>`` for authenticated requests and ``ip:<addr>`` 243 for anonymous ones. Override to implement custom partitioning. 244 """ 245 return _default_key(request) 246 247 def check_throttle(self, request: HttpRequest) -> JsonResponse | None: 248 """Check whether the current request exceeds the configured rate limit. 249 250 Returns: 251 A 429 ``JsonResponse`` with ``{"error": ..., "retry_after": N}`` 252 if the limit is exceeded, or ``None`` if the request is allowed. 253 """ 254 limit, period = _parse_rate(self.throttle_rate) 255 key = self.get_throttle_key(request) 256 allowed, retry_after = _check_rate_cache(key, limit, period) 257 if not allowed: 258 response = JsonResponse( 259 {"error": "Rate limit exceeded", "retry_after": retry_after}, 260 status=429, 261 ) 262 response["Retry-After"] = str(retry_after) 263 return response 264 return None 265 266 def post(self, request: HttpRequest, *args, **kwargs): 267 """Intercept POST to apply throttle check before normal dispatch.""" 268 throttle_response = self.check_throttle(request) 269 if throttle_response is not None: 270 return throttle_response 271 return super().post(request, *args, **kwargs) # type: ignore[unresolved-attribute] 272 273 274# --------------------------------------------------------------------------- 275# FBV decorator 276# --------------------------------------------------------------------------- 277 278 279def rate_limit_component( 280 rate: str, 281 key_func: Callable[[HttpRequest], str] | None = None, 282): 283 """Decorator factory that rate-limits a component view function. 284 285 Args: 286 rate: Allowed request rate in ``"N/period"`` format 287 (e.g. ``"60/min"``, ``"100/hour"``). 288 key_func: Optional callable ``(request) -> str`` that returns the 289 bucket key. Defaults to user ID for authenticated requests 290 and client IP for anonymous ones. 291 292 Returns: 293 A decorator that wraps the view function. 294 295 Usage:: 296 297 @rate_limit_component("30/min") 298 def my_view(request, name): 299 ... 300 301 # With a custom key: 302 @rate_limit_component("30/min", key_func=lambda r: r.session.session_key or "anon") 303 def my_view(request, name): 304 ... 305 """ 306 limit, period = _parse_rate(rate) 307 resolve_key: Callable[[HttpRequest], str] = key_func or _default_key 308 309 def decorator(view_func: Callable) -> Callable: 310 @functools.wraps(view_func) 311 def wrapper(request: HttpRequest, *args, **kwargs): 312 key = resolve_key(request) 313 allowed, retry_after = _check_rate_cache(key, limit, period) 314 if not allowed: 315 response = JsonResponse( 316 {"error": "Rate limit exceeded", "retry_after": retry_after}, 317 status=429, 318 ) 319 response["Retry-After"] = str(retry_after) 320 return response 321 return view_func(request, *args, **kwargs) 322 323 return wrapper 324 325 return decorator
82class RateLimiter: 83 """Thread-safe in-memory token bucket rate limiter. 84 85 This implementation is used as a fallback when Django's cache is 86 unavailable or raises an exception. For production use with multiple 87 processes, prefer the Django cache backend (``LocMemCache``, 88 ``RedisCache``, etc.). 89 90 Args: 91 limit: Maximum number of tokens (requests) allowed per period. 92 period: Length of the refill period in seconds. 93 """ 94 95 def __init__(self, limit: int, period: int) -> None: 96 self._limit = limit 97 self._period = period 98 self._buckets: dict[str, _BucketState] = {} 99 self._lock = threading.Lock() 100 101 # ------------------------------------------------------------------ 102 # Public API 103 # ------------------------------------------------------------------ 104 105 def is_allowed(self, key: str) -> tuple[bool, int]: 106 """Check whether *key* is within the rate limit. 107 108 Returns: 109 A ``(allowed, retry_after)`` tuple where ``retry_after`` is the 110 number of whole seconds until the next token becomes available 111 (only meaningful when ``allowed`` is ``False``). 112 """ 113 now = time.monotonic() 114 with self._lock: 115 state = self._buckets.get(key) 116 if state is None: 117 state = _BucketState(tokens=float(self._limit), last_refill=now) 118 self._buckets[key] = state 119 120 # Refill tokens proportional to elapsed time 121 elapsed = now - state.last_refill 122 refill = elapsed * (self._limit / self._period) 123 state.tokens = min(float(self._limit), state.tokens + refill) 124 state.last_refill = now 125 126 if state.tokens >= 1.0: 127 state.tokens -= 1.0 128 return True, 0 129 else: 130 # Seconds until the next whole token arrives 131 needed = 1.0 - state.tokens 132 retry_after = math.ceil(needed * self._period / self._limit) 133 return False, retry_after
Thread-safe in-memory token bucket rate limiter.
This implementation is used as a fallback when Django's cache is
unavailable or raises an exception. For production use with multiple
processes, prefer the Django cache backend (LocMemCache,
RedisCache, etc.).
Arguments:
- limit: Maximum number of tokens (requests) allowed per period.
- period: Length of the refill period in seconds.
105 def is_allowed(self, key: str) -> tuple[bool, int]: 106 """Check whether *key* is within the rate limit. 107 108 Returns: 109 A ``(allowed, retry_after)`` tuple where ``retry_after`` is the 110 number of whole seconds until the next token becomes available 111 (only meaningful when ``allowed`` is ``False``). 112 """ 113 now = time.monotonic() 114 with self._lock: 115 state = self._buckets.get(key) 116 if state is None: 117 state = _BucketState(tokens=float(self._limit), last_refill=now) 118 self._buckets[key] = state 119 120 # Refill tokens proportional to elapsed time 121 elapsed = now - state.last_refill 122 refill = elapsed * (self._limit / self._period) 123 state.tokens = min(float(self._limit), state.tokens + refill) 124 state.last_refill = now 125 126 if state.tokens >= 1.0: 127 state.tokens -= 1.0 128 return True, 0 129 else: 130 # Seconds until the next whole token arrives 131 needed = 1.0 - state.tokens 132 retry_after = math.ceil(needed * self._period / self._limit) 133 return False, retry_after
Check whether key is within the rate limit.
Returns:
A
(allowed, retry_after)tuple whereretry_afteris the number of whole seconds until the next token becomes available (only meaningful whenallowedisFalse).
222class RateLimitMixin: 223 """CBV mixin that applies token-bucket rate limiting to POST requests. 224 225 Attributes: 226 throttle_rate: Rate in ``"N/period"`` format (default ``"60/min"``). 227 228 Usage:: 229 230 class MyComponentView(RateLimitMixin, ComponentView): 231 throttle_rate = "30/min" 232 233 def get_throttle_key(self, request): 234 # Optional: override to customise the bucket key 235 return f"endpoint:{request.path}:{super().get_throttle_key(request)}" 236 """ 237 238 throttle_rate: str = "60/min" 239 240 def get_throttle_key(self, request: HttpRequest) -> str: 241 """Return the cache key used for this request's rate-limit bucket. 242 243 Defaults to ``user:<id>`` for authenticated requests and ``ip:<addr>`` 244 for anonymous ones. Override to implement custom partitioning. 245 """ 246 return _default_key(request) 247 248 def check_throttle(self, request: HttpRequest) -> JsonResponse | None: 249 """Check whether the current request exceeds the configured rate limit. 250 251 Returns: 252 A 429 ``JsonResponse`` with ``{"error": ..., "retry_after": N}`` 253 if the limit is exceeded, or ``None`` if the request is allowed. 254 """ 255 limit, period = _parse_rate(self.throttle_rate) 256 key = self.get_throttle_key(request) 257 allowed, retry_after = _check_rate_cache(key, limit, period) 258 if not allowed: 259 response = JsonResponse( 260 {"error": "Rate limit exceeded", "retry_after": retry_after}, 261 status=429, 262 ) 263 response["Retry-After"] = str(retry_after) 264 return response 265 return None 266 267 def post(self, request: HttpRequest, *args, **kwargs): 268 """Intercept POST to apply throttle check before normal dispatch.""" 269 throttle_response = self.check_throttle(request) 270 if throttle_response is not None: 271 return throttle_response 272 return super().post(request, *args, **kwargs) # type: ignore[unresolved-attribute]
CBV mixin that applies token-bucket rate limiting to POST requests.
Attributes:
- throttle_rate: Rate in
"N/period"format (default"60/min").
Usage::
class MyComponentView(RateLimitMixin, ComponentView):
throttle_rate = "30/min"
def get_throttle_key(self, request):
# Optional: override to customise the bucket key
return f"endpoint:{request.path}:{super().get_throttle_key(request)}"
240 def get_throttle_key(self, request: HttpRequest) -> str: 241 """Return the cache key used for this request's rate-limit bucket. 242 243 Defaults to ``user:<id>`` for authenticated requests and ``ip:<addr>`` 244 for anonymous ones. Override to implement custom partitioning. 245 """ 246 return _default_key(request)
Return the cache key used for this request's rate-limit bucket.
Defaults to user:<id> for authenticated requests and ip:<addr>
for anonymous ones. Override to implement custom partitioning.
248 def check_throttle(self, request: HttpRequest) -> JsonResponse | None: 249 """Check whether the current request exceeds the configured rate limit. 250 251 Returns: 252 A 429 ``JsonResponse`` with ``{"error": ..., "retry_after": N}`` 253 if the limit is exceeded, or ``None`` if the request is allowed. 254 """ 255 limit, period = _parse_rate(self.throttle_rate) 256 key = self.get_throttle_key(request) 257 allowed, retry_after = _check_rate_cache(key, limit, period) 258 if not allowed: 259 response = JsonResponse( 260 {"error": "Rate limit exceeded", "retry_after": retry_after}, 261 status=429, 262 ) 263 response["Retry-After"] = str(retry_after) 264 return response 265 return None
Check whether the current request exceeds the configured rate limit.
Returns:
A 429
JsonResponsewith{"error": ..., "retry_after": N}if the limit is exceeded, orNoneif the request is allowed.
267 def post(self, request: HttpRequest, *args, **kwargs): 268 """Intercept POST to apply throttle check before normal dispatch.""" 269 throttle_response = self.check_throttle(request) 270 if throttle_response is not None: 271 return throttle_response 272 return super().post(request, *args, **kwargs) # type: ignore[unresolved-attribute]
Intercept POST to apply throttle check before normal dispatch.
280def rate_limit_component( 281 rate: str, 282 key_func: Callable[[HttpRequest], str] | None = None, 283): 284 """Decorator factory that rate-limits a component view function. 285 286 Args: 287 rate: Allowed request rate in ``"N/period"`` format 288 (e.g. ``"60/min"``, ``"100/hour"``). 289 key_func: Optional callable ``(request) -> str`` that returns the 290 bucket key. Defaults to user ID for authenticated requests 291 and client IP for anonymous ones. 292 293 Returns: 294 A decorator that wraps the view function. 295 296 Usage:: 297 298 @rate_limit_component("30/min") 299 def my_view(request, name): 300 ... 301 302 # With a custom key: 303 @rate_limit_component("30/min", key_func=lambda r: r.session.session_key or "anon") 304 def my_view(request, name): 305 ... 306 """ 307 limit, period = _parse_rate(rate) 308 resolve_key: Callable[[HttpRequest], str] = key_func or _default_key 309 310 def decorator(view_func: Callable) -> Callable: 311 @functools.wraps(view_func) 312 def wrapper(request: HttpRequest, *args, **kwargs): 313 key = resolve_key(request) 314 allowed, retry_after = _check_rate_cache(key, limit, period) 315 if not allowed: 316 response = JsonResponse( 317 {"error": "Rate limit exceeded", "retry_after": retry_after}, 318 status=429, 319 ) 320 response["Retry-After"] = str(retry_after) 321 return response 322 return view_func(request, *args, **kwargs) 323 324 return wrapper 325 326 return decorator
Decorator factory that rate-limits a component view function.
Arguments:
- rate: Allowed request rate in
"N/period"format (e.g."60/min","100/hour"). - key_func: Optional callable
(request) -> strthat returns the bucket key. Defaults to user ID for authenticated requests and client IP for anonymous ones.
Returns:
A decorator that wraps the view function.
Usage::
@rate_limit_component("30/min")
def my_view(request, name):
...
# With a custom key:
@rate_limit_component("30/min", key_func=lambda r: r.session.session_key or "anon")
def my_view(request, name):
...