component_framework.adapters.django_ratelimit

Django rate limiting utilities for component endpoints.

Provides a token-bucket rate limiter backed by Django's cache framework (falls back to in-memory) plus a CBV mixin and an FBV decorator.

All over-limit responses are JSON (never redirects), making them safe for HTMX/fetch consumers.

  1"""Django rate limiting utilities for component endpoints.
  2
  3Provides a token-bucket rate limiter backed by Django's cache framework
  4(falls back to in-memory) plus a CBV mixin and an FBV decorator.
  5
  6All over-limit responses are JSON (never redirects), making them safe for
  7HTMX/fetch consumers.
  8"""
  9
 10from __future__ import annotations
 11
 12import functools
 13import math
 14import threading
 15import time
 16from collections.abc import Callable
 17
 18try:
 19    from django.http import HttpRequest, JsonResponse
 20except ImportError as e:
 21    from . import _require_extra
 22
 23    raise _require_extra("django", "django") from e
 24
 25# ---------------------------------------------------------------------------
 26# Rate-limit parse helpers
 27# ---------------------------------------------------------------------------
 28
 29_PERIOD_SECONDS: dict[str, int] = {
 30    "sec": 1,
 31    "second": 1,
 32    "min": 60,
 33    "minute": 60,
 34    "hour": 3600,
 35    "day": 86400,
 36}
 37
 38
 39def _parse_rate(rate: str) -> tuple[int, int]:
 40    """Parse a rate string like ``"60/min"`` into ``(limit, period_seconds)``.
 41
 42    Args:
 43        rate: A string in the format ``"N/period"`` where period is one of
 44              ``sec``, ``min``, ``hour``, or ``day``.
 45
 46    Returns:
 47        A ``(limit, period_seconds)`` tuple.
 48
 49    Raises:
 50        ValueError: If the rate string is malformed or the period is unknown.
 51    """
 52    try:
 53        count_str, period_str = rate.split("/", 1)
 54        count = int(count_str.strip())
 55        period = period_str.strip().lower()
 56        period_secs = _PERIOD_SECONDS.get(period)
 57        if period_secs is None:
 58            raise ValueError(f"Unknown period '{period}'. Use: {list(_PERIOD_SECONDS)}")
 59        return count, period_secs
 60    except (AttributeError, ValueError) as exc:
 61        raise ValueError(
 62            f"Invalid rate '{rate}'. Expected format: 'N/period' (e.g. '60/min')."
 63        ) from exc
 64
 65
 66# ---------------------------------------------------------------------------
 67# In-memory token-bucket implementation (fallback)
 68# ---------------------------------------------------------------------------
 69
 70
 71class _BucketState:
 72    """Internal state for a single token-bucket entry."""
 73
 74    __slots__ = ("tokens", "last_refill")
 75
 76    def __init__(self, tokens: float, last_refill: float) -> None:
 77        self.tokens = tokens
 78        self.last_refill = last_refill
 79
 80
 81class RateLimiter:
 82    """Thread-safe in-memory token bucket rate limiter.
 83
 84    This implementation is used as a fallback when Django's cache is
 85    unavailable or raises an exception.  For production use with multiple
 86    processes, prefer the Django cache backend (``LocMemCache``,
 87    ``RedisCache``, etc.).
 88
 89    Args:
 90        limit: Maximum number of tokens (requests) allowed per period.
 91        period: Length of the refill period in seconds.
 92    """
 93
 94    def __init__(self, limit: int, period: int) -> None:
 95        self._limit = limit
 96        self._period = period
 97        self._buckets: dict[str, _BucketState] = {}
 98        self._lock = threading.Lock()
 99
100    # ------------------------------------------------------------------
101    # Public API
102    # ------------------------------------------------------------------
103
104    def is_allowed(self, key: str) -> tuple[bool, int]:
105        """Check whether *key* is within the rate limit.
106
107        Returns:
108            A ``(allowed, retry_after)`` tuple where ``retry_after`` is the
109            number of whole seconds until the next token becomes available
110            (only meaningful when ``allowed`` is ``False``).
111        """
112        now = time.monotonic()
113        with self._lock:
114            state = self._buckets.get(key)
115            if state is None:
116                state = _BucketState(tokens=float(self._limit), last_refill=now)
117                self._buckets[key] = state
118
119            # Refill tokens proportional to elapsed time
120            elapsed = now - state.last_refill
121            refill = elapsed * (self._limit / self._period)
122            state.tokens = min(float(self._limit), state.tokens + refill)
123            state.last_refill = now
124
125            if state.tokens >= 1.0:
126                state.tokens -= 1.0
127                return True, 0
128            else:
129                # Seconds until the next whole token arrives
130                needed = 1.0 - state.tokens
131                retry_after = math.ceil(needed * self._period / self._limit)
132                return False, retry_after
133
134
135# ---------------------------------------------------------------------------
136# Django-cache-backed bucket helpers
137# ---------------------------------------------------------------------------
138
# Module-level in-memory fallback limiters, one per (limit, period) pair,
# shared by every view in this process.  Creation is guarded by the lock so
# two concurrent first requests cannot install duplicate limiters.
_in_memory_limiters: dict[tuple[int, int], RateLimiter] = {}
_in_memory_lock = threading.Lock()
142
143
def _get_memory_limiter(limit: int, period: int) -> RateLimiter:
    """Return the shared in-memory limiter for ``(limit, period)``, creating it on first use."""
    with _in_memory_lock:
        # setdefault under the lock gives one limiter per parameter pair; the
        # eagerly built candidate is cheap and discarded on a cache hit.
        return _in_memory_limiters.setdefault((limit, period), RateLimiter(limit, period))
151
152
def _check_rate_cache(cache_key: str, limit: int, period: int) -> tuple[bool, int]:
    """Token-bucket check using Django's cache backend.

    Stores ``(tokens, last_refill)`` under ``ratelimit:<cache_key>``.  Falls
    back to the shared in-memory ``RateLimiter`` if the cache (or Django
    itself) raises any exception.

    NOTE(review): the get/modify/set sequence is not atomic, so under heavy
    concurrency a bucket may admit slightly more than *limit* requests.

    Returns:
        ``(allowed, retry_after)`` — same semantics as ``RateLimiter.is_allowed``.
    """
    try:
        from django.core.cache import cache

        # Wall-clock time is required here (monotonic clocks are per-process
        # and the bucket may be shared across processes via the cache).
        now = time.time()
        bucket_cache_key = f"ratelimit:{cache_key}"
        state = cache.get(bucket_cache_key)

        if state is None:
            tokens: float = float(limit)
            last_refill: float = now
        else:
            tokens, last_refill = state

        # Refill proportionally to elapsed time.  Clamp to >= 0: a wall-clock
        # step backwards (e.g. NTP adjustment) would otherwise make `elapsed`
        # negative and silently drain tokens that were already granted.
        elapsed = max(0.0, now - last_refill)
        tokens = min(float(limit), tokens + elapsed * (limit / period))
        last_refill = now

        if tokens >= 1.0:
            tokens -= 1.0
            # timeout=period*2 lets idle buckets expire instead of lingering.
            cache.set(bucket_cache_key, (tokens, last_refill), timeout=period * 2)
            return True, 0
        else:
            needed = 1.0 - tokens
            retry_after = math.ceil(needed * period / limit)
            cache.set(bucket_cache_key, (tokens, last_refill), timeout=period * 2)
            return False, retry_after

    except Exception:
        # Deliberately broad: any cache/backend failure degrades to the
        # per-process limiter rather than failing the request.
        limiter = _get_memory_limiter(limit, period)
        return limiter.is_allowed(cache_key)
195
196
197# ---------------------------------------------------------------------------
198# Key-building helpers
199# ---------------------------------------------------------------------------
200
201
202def _default_key(request: HttpRequest) -> str:
203    """Return a cache key based on user ID (auth) or IP address (anon)."""
204    user = getattr(request, "user", None)
205    if user is not None and getattr(user, "is_authenticated", False):
206        return f"user:{user.id}"
207    # Best-effort IP extraction
208    x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
209    if x_forwarded_for:
210        ip = x_forwarded_for.split(",")[0].strip()
211    else:
212        ip = request.META.get("REMOTE_ADDR", "unknown")
213    return f"ip:{ip}"
214
215
216# ---------------------------------------------------------------------------
217# CBV mixin
218# ---------------------------------------------------------------------------
219
220
class RateLimitMixin:
    """CBV mixin that throttles POST requests with a token bucket.

    Attributes:
        throttle_rate: Rate in ``"N/period"`` format (default ``"60/min"``).

    Usage::

        class MyComponentView(RateLimitMixin, ComponentView):
            throttle_rate = "30/min"

            def get_throttle_key(self, request):
                # Optional: override to customise the bucket key
                return f"endpoint:{request.path}:{super().get_throttle_key(request)}"
    """

    throttle_rate: str = "60/min"

    def get_throttle_key(self, request: HttpRequest) -> str:
        """Return the cache key used for this request's rate-limit bucket.

        Defaults to ``user:<id>`` for authenticated requests and ``ip:<addr>``
        for anonymous ones.  Override to implement custom partitioning.
        """
        return _default_key(request)

    def check_throttle(self, request: HttpRequest) -> JsonResponse | None:
        """Check whether the current request exceeds the configured rate limit.

        Returns:
            A 429 ``JsonResponse`` with ``{"error": ..., "retry_after": N}``
            if the limit is exceeded, or ``None`` if the request is allowed.
        """
        limit, period = _parse_rate(self.throttle_rate)
        allowed, retry_after = _check_rate_cache(
            self.get_throttle_key(request), limit, period
        )
        if allowed:
            return None
        denial = JsonResponse(
            {"error": "Rate limit exceeded", "retry_after": retry_after},
            status=429,
        )
        denial["Retry-After"] = str(retry_after)
        return denial

    def post(self, request: HttpRequest, *args, **kwargs):
        """Intercept POST to apply throttle check before normal dispatch."""
        denied = self.check_throttle(request)
        if denied is not None:
            return denied
        return super().post(request, *args, **kwargs)  # type: ignore[unresolved-attribute]
272
273
274# ---------------------------------------------------------------------------
275# FBV decorator
276# ---------------------------------------------------------------------------
277
278
def rate_limit_component(
    rate: str,
    key_func: Callable[[HttpRequest], str] | None = None,
):
    """Decorator factory that rate-limits a component view function.

    Args:
        rate: Allowed request rate in ``"N/period"`` format
              (e.g. ``"60/min"``, ``"100/hour"``).
        key_func: Optional callable ``(request) -> str`` that returns the
                  bucket key.  Defaults to user ID for authenticated requests
                  and client IP for anonymous ones.

    Returns:
        A decorator that wraps the view function.

    Usage::

        @rate_limit_component("30/min")
        def my_view(request, name):
            ...

        # With a custom key:
        @rate_limit_component("30/min", key_func=lambda r: r.session.session_key or "anon")
        def my_view(request, name):
            ...
    """
    # Parse eagerly so a malformed rate fails at decoration time, not per request.
    limit, period = _parse_rate(rate)
    bucket_key_for: Callable[[HttpRequest], str] = key_func or _default_key

    def decorator(view_func: Callable) -> Callable:
        @functools.wraps(view_func)
        def wrapper(request: HttpRequest, *args, **kwargs):
            allowed, retry_after = _check_rate_cache(
                bucket_key_for(request), limit, period
            )
            if allowed:
                return view_func(request, *args, **kwargs)
            denial = JsonResponse(
                {"error": "Rate limit exceeded", "retry_after": retry_after},
                status=429,
            )
            denial["Retry-After"] = str(retry_after)
            return denial

        return wrapper

    return decorator
class RateLimiter:
 82class RateLimiter:
 83    """Thread-safe in-memory token bucket rate limiter.
 84
 85    This implementation is used as a fallback when Django's cache is
 86    unavailable or raises an exception.  For production use with multiple
 87    processes, prefer the Django cache backend (``LocMemCache``,
 88    ``RedisCache``, etc.).
 89
 90    Args:
 91        limit: Maximum number of tokens (requests) allowed per period.
 92        period: Length of the refill period in seconds.
 93    """
 94
 95    def __init__(self, limit: int, period: int) -> None:
 96        self._limit = limit
 97        self._period = period
 98        self._buckets: dict[str, _BucketState] = {}
 99        self._lock = threading.Lock()
100
101    # ------------------------------------------------------------------
102    # Public API
103    # ------------------------------------------------------------------
104
105    def is_allowed(self, key: str) -> tuple[bool, int]:
106        """Check whether *key* is within the rate limit.
107
108        Returns:
109            A ``(allowed, retry_after)`` tuple where ``retry_after`` is the
110            number of whole seconds until the next token becomes available
111            (only meaningful when ``allowed`` is ``False``).
112        """
113        now = time.monotonic()
114        with self._lock:
115            state = self._buckets.get(key)
116            if state is None:
117                state = _BucketState(tokens=float(self._limit), last_refill=now)
118                self._buckets[key] = state
119
120            # Refill tokens proportional to elapsed time
121            elapsed = now - state.last_refill
122            refill = elapsed * (self._limit / self._period)
123            state.tokens = min(float(self._limit), state.tokens + refill)
124            state.last_refill = now
125
126            if state.tokens >= 1.0:
127                state.tokens -= 1.0
128                return True, 0
129            else:
130                # Seconds until the next whole token arrives
131                needed = 1.0 - state.tokens
132                retry_after = math.ceil(needed * self._period / self._limit)
133                return False, retry_after

Thread-safe in-memory token bucket rate limiter.

This implementation is used as a fallback when Django's cache is unavailable or raises an exception. For production use with multiple processes, prefer the Django cache backend (LocMemCache, RedisCache, etc.).

Arguments:
  • limit: Maximum number of tokens (requests) allowed per period.
  • period: Length of the refill period in seconds.
RateLimiter(limit: int, period: int)
95    def __init__(self, limit: int, period: int) -> None:
96        self._limit = limit
97        self._period = period
98        self._buckets: dict[str, _BucketState] = {}
99        self._lock = threading.Lock()
def is_allowed(self, key: str) -> tuple[bool, int]:
105    def is_allowed(self, key: str) -> tuple[bool, int]:
106        """Check whether *key* is within the rate limit.
107
108        Returns:
109            A ``(allowed, retry_after)`` tuple where ``retry_after`` is the
110            number of whole seconds until the next token becomes available
111            (only meaningful when ``allowed`` is ``False``).
112        """
113        now = time.monotonic()
114        with self._lock:
115            state = self._buckets.get(key)
116            if state is None:
117                state = _BucketState(tokens=float(self._limit), last_refill=now)
118                self._buckets[key] = state
119
120            # Refill tokens proportional to elapsed time
121            elapsed = now - state.last_refill
122            refill = elapsed * (self._limit / self._period)
123            state.tokens = min(float(self._limit), state.tokens + refill)
124            state.last_refill = now
125
126            if state.tokens >= 1.0:
127                state.tokens -= 1.0
128                return True, 0
129            else:
130                # Seconds until the next whole token arrives
131                needed = 1.0 - state.tokens
132                retry_after = math.ceil(needed * self._period / self._limit)
133                return False, retry_after

Check whether key is within the rate limit.

Returns:

A (allowed, retry_after) tuple where retry_after is the number of whole seconds until the next token becomes available (only meaningful when allowed is False).

class RateLimitMixin:
222class RateLimitMixin:
223    """CBV mixin that applies token-bucket rate limiting to POST requests.
224
225    Attributes:
226        throttle_rate: Rate in ``"N/period"`` format (default ``"60/min"``).
227
228    Usage::
229
230        class MyComponentView(RateLimitMixin, ComponentView):
231            throttle_rate = "30/min"
232
233            def get_throttle_key(self, request):
234                # Optional: override to customise the bucket key
235                return f"endpoint:{request.path}:{super().get_throttle_key(request)}"
236    """
237
238    throttle_rate: str = "60/min"
239
240    def get_throttle_key(self, request: HttpRequest) -> str:
241        """Return the cache key used for this request's rate-limit bucket.
242
243        Defaults to ``user:<id>`` for authenticated requests and ``ip:<addr>``
244        for anonymous ones.  Override to implement custom partitioning.
245        """
246        return _default_key(request)
247
248    def check_throttle(self, request: HttpRequest) -> JsonResponse | None:
249        """Check whether the current request exceeds the configured rate limit.
250
251        Returns:
252            A 429 ``JsonResponse`` with ``{"error": ..., "retry_after": N}``
253            if the limit is exceeded, or ``None`` if the request is allowed.
254        """
255        limit, period = _parse_rate(self.throttle_rate)
256        key = self.get_throttle_key(request)
257        allowed, retry_after = _check_rate_cache(key, limit, period)
258        if not allowed:
259            response = JsonResponse(
260                {"error": "Rate limit exceeded", "retry_after": retry_after},
261                status=429,
262            )
263            response["Retry-After"] = str(retry_after)
264            return response
265        return None
266
267    def post(self, request: HttpRequest, *args, **kwargs):
268        """Intercept POST to apply throttle check before normal dispatch."""
269        throttle_response = self.check_throttle(request)
270        if throttle_response is not None:
271            return throttle_response
272        return super().post(request, *args, **kwargs)  # type: ignore[unresolved-attribute]

CBV mixin that applies token-bucket rate limiting to POST requests.

Attributes:
  • throttle_rate: Rate in "N/period" format (default "60/min").

Usage::

class MyComponentView(RateLimitMixin, ComponentView):
    throttle_rate = "30/min"

    def get_throttle_key(self, request):
        # Optional: override to customise the bucket key
        return f"endpoint:{request.path}:{super().get_throttle_key(request)}"
throttle_rate: str = '60/min'
def get_throttle_key(self, request: django.http.request.HttpRequest) -> str:
240    def get_throttle_key(self, request: HttpRequest) -> str:
241        """Return the cache key used for this request's rate-limit bucket.
242
243        Defaults to ``user:<id>`` for authenticated requests and ``ip:<addr>``
244        for anonymous ones.  Override to implement custom partitioning.
245        """
246        return _default_key(request)

Return the cache key used for this request's rate-limit bucket.

Defaults to user:<id> for authenticated requests and ip:<addr> for anonymous ones. Override to implement custom partitioning.

def check_throttle( self, request: django.http.request.HttpRequest) -> django.http.response.JsonResponse | None:
248    def check_throttle(self, request: HttpRequest) -> JsonResponse | None:
249        """Check whether the current request exceeds the configured rate limit.
250
251        Returns:
252            A 429 ``JsonResponse`` with ``{"error": ..., "retry_after": N}``
253            if the limit is exceeded, or ``None`` if the request is allowed.
254        """
255        limit, period = _parse_rate(self.throttle_rate)
256        key = self.get_throttle_key(request)
257        allowed, retry_after = _check_rate_cache(key, limit, period)
258        if not allowed:
259            response = JsonResponse(
260                {"error": "Rate limit exceeded", "retry_after": retry_after},
261                status=429,
262            )
263            response["Retry-After"] = str(retry_after)
264            return response
265        return None

Check whether the current request exceeds the configured rate limit.

Returns:

A 429 JsonResponse with {"error": ..., "retry_after": N} if the limit is exceeded, or None if the request is allowed.

def post(self, request: django.http.request.HttpRequest, *args, **kwargs):
267    def post(self, request: HttpRequest, *args, **kwargs):
268        """Intercept POST to apply throttle check before normal dispatch."""
269        throttle_response = self.check_throttle(request)
270        if throttle_response is not None:
271            return throttle_response
272        return super().post(request, *args, **kwargs)  # type: ignore[unresolved-attribute]

Intercept POST to apply throttle check before normal dispatch.

def rate_limit_component( rate: str, key_func: Callable[[django.http.request.HttpRequest], str] | None = None):
280def rate_limit_component(
281    rate: str,
282    key_func: Callable[[HttpRequest], str] | None = None,
283):
284    """Decorator factory that rate-limits a component view function.
285
286    Args:
287        rate: Allowed request rate in ``"N/period"`` format
288              (e.g. ``"60/min"``, ``"100/hour"``).
289        key_func: Optional callable ``(request) -> str`` that returns the
290                  bucket key.  Defaults to user ID for authenticated requests
291                  and client IP for anonymous ones.
292
293    Returns:
294        A decorator that wraps the view function.
295
296    Usage::
297
298        @rate_limit_component("30/min")
299        def my_view(request, name):
300            ...
301
302        # With a custom key:
303        @rate_limit_component("30/min", key_func=lambda r: r.session.session_key or "anon")
304        def my_view(request, name):
305            ...
306    """
307    limit, period = _parse_rate(rate)
308    resolve_key: Callable[[HttpRequest], str] = key_func or _default_key
309
310    def decorator(view_func: Callable) -> Callable:
311        @functools.wraps(view_func)
312        def wrapper(request: HttpRequest, *args, **kwargs):
313            key = resolve_key(request)
314            allowed, retry_after = _check_rate_cache(key, limit, period)
315            if not allowed:
316                response = JsonResponse(
317                    {"error": "Rate limit exceeded", "retry_after": retry_after},
318                    status=429,
319                )
320                response["Retry-After"] = str(retry_after)
321                return response
322            return view_func(request, *args, **kwargs)
323
324        return wrapper
325
326    return decorator

Decorator factory that rate-limits a component view function.

Arguments:
  • rate: Allowed request rate in "N/period" format (e.g. "60/min", "100/hour").
  • key_func: Optional callable (request) -> str that returns the bucket key. Defaults to user ID for authenticated requests and client IP for anonymous ones.
Returns:

A decorator that wraps the view function.

Usage::

@rate_limit_component("30/min")
def my_view(request, name):
    ...

# With a custom key:
@rate_limit_component("30/min", key_func=lambda r: r.session.session_key or "anon")
def my_view(request, name):
    ...