component_framework.core

Core framework components.

 1"""Core framework components."""
 2
 3from .component import Component, ComponentError, EventNotFoundError, StateSerializer
 4from .composition import SlotRenderer, compose
 5from .form import FormComponent, ModelFormComponent
 6from .permissions import (
 7    AllowAny,
 8    BasePermission,
 9    DjangoModelPermission,
10    IsAuthenticated,
11    IsStaff,
12    IsSuperuser,
13)
14from .registry import ComponentRegistry, registry
15from .renderer import Renderer
16from .state import InMemoryStateStore, StateStore
17from .streaming import StreamingComponent, format_sse_frame
18from .websocket import ComponentWebSocketManager, WebSocketConnection, ws_manager
19
20__all__ = [
21    "Component",
22    "ComponentError",
23    "EventNotFoundError",
24    "StateSerializer",
25    "compose",
26    "SlotRenderer",
27    "FormComponent",
28    "ModelFormComponent",
29    "AllowAny",
30    "BasePermission",
31    "DjangoModelPermission",
32    "IsAuthenticated",
33    "IsStaff",
34    "IsSuperuser",
35    "ComponentRegistry",
36    "registry",
37    "Renderer",
38    "StateStore",
39    "InMemoryStateStore",
40    "StreamingComponent",
41    "format_sse_frame",
42    "ComponentWebSocketManager",
43    "WebSocketConnection",
44    "ws_manager",
45]
class Component:
 28class Component:
 29    """
 30    Base component class with lifecycle management.
 31
 32    Lifecycle:
 33        1. __init__(**params)
 34        2. mount() OR hydrate(state)
 35        3. handle_event(event, payload)
 36        4. before_render()
 37        5. render()
 38        6. dehydrate()
 39
 40    Slots:
 41        Components can declare named slots via the ``slots`` class variable.
 42        Child components are assigned to slots with ``fill_slot()`` and their
 43        rendered HTML is available in the template context under ``slots``.
 44    """
 45
 46    template_name: str | None = None
 47    renderer = None
 48    permission_classes: ClassVar[list[type["BasePermission"]]] = []
 49    slots: ClassVar[list[str]] = []
 50    """Slot names this component accepts. Empty list means *any* slot name is accepted."""
 51
 52    def __init__(self, **params):
 53        self.params = params
 54        self.state: dict[str, Any] = {}
 55        self.errors: dict[str, str] = {}
 56        self.id = params.get("component_id") or self._generate_id()
 57        self._mounted = False
 58        self._slot_components: dict[str, Component] = {}
 59
 60    # ---------- Lifecycle ----------
 61
 62    def _generate_id(self) -> str:
 63        """Generate unique component ID."""
 64        return f"component-{uuid4().hex[:8]}"
 65
 66    def mount(self):
 67        """Initialize component on first load. Override in subclasses."""
 68        self._mounted = True
 69
 70    def hydrate(self, state: dict):
 71        """Restore component from serialized state."""
 72        self.state.update(state)
 73        self._mounted = True
 74
 75    def dehydrate(self) -> dict:
 76        """Serialize component state for persistence."""
 77        return self.state.copy()
 78
 79    def before_render(self):
 80        """Called before rendering. Use for derived state computation."""
 81        pass
 82
 83    # ---------- Slots / Composition ----------
 84
 85    def fill_slot(self, slot_name: str, component: "Component") -> None:
 86        """
 87        Assign a child component to a named slot.
 88
 89        If the component declares specific ``slots``, *slot_name* must be one of
 90        them.  If ``slots`` is empty (the default), any name is accepted
 91        (permissive mode).
 92
 93        Args:
 94            slot_name: Target slot identifier.
 95            component: Child component instance to render in the slot.
 96
 97        Raises:
 98            ComponentError: If *slot_name* is not in the declared ``slots`` list.
 99        """
100        if self.slots and slot_name not in self.slots:
101            raise ComponentError(f"Unknown slot '{slot_name}'. Available: {self.slots}")
102        self._slot_components[slot_name] = component
103
104    def render_slots(self) -> dict[str, str]:
105        """
106        Render all filled slot components.
107
108        Returns:
109            Dict mapping slot names to their rendered HTML strings.
110        """
111        rendered: dict[str, str] = {}
112        for name, child in self._slot_components.items():
113            rendered[name] = child.render()
114        return rendered
115
116    # ---------- Optimistic UI ----------
117
118    def get_optimistic_patch(self, event: str, payload: dict) -> dict | None:
119        """
120        Return a partial state dict that the client can apply immediately before the server
121        responds, for the given event and payload.
122
123        This enables optimistic UI updates: the client shows the expected result right away
124        and reconciles with the actual server state once the response arrives. On error, the
125        client can roll back to the previous state.
126
127        Override in subclasses to enable optimistic updates for specific events. Return None
128        (the default) to disable optimistic updates for a given event.
129
130        Args:
131            event: The event name being dispatched (e.g., "increment").
132            payload: The event payload dict.
133
134        Returns:
135            A partial state dict to apply optimistically, or None to skip.
136
137        Example::
138
139            def get_optimistic_patch(self, event: str, payload: dict) -> dict | None:
140                if event == "increment":
141                    return {"count": self.state.get("count", 0) + payload.get("amount", 1)}
142                return None
143        """
144        return None
145
146    # ---------- Events ----------
147
148    def handle_event(self, event: str, payload: dict):
149        """
150        Route event to a **synchronous** handler method.
151
152        For async handlers (``async def on_*``), use :meth:`async_handle_event`
153        instead.  Calling this method with an async handler will raise
154        :class:`ComponentError`.
155
156        Args:
157            event: Event name (e.g., "increment")
158            payload: Event data
159
160        Raises:
161            EventNotFoundError: If handler not found
162            ComponentError: If handler raises exception or is async
163        """
164        handler = getattr(self, f"on_{event}", None)
165
166        if not handler:
167            raise EventNotFoundError(f"No handler for event: {event}")
168
169        if inspect.iscoroutinefunction(handler):
170            raise ComponentError(
171                f"Handler 'on_{event}' is async — use async_dispatch() or "
172                "async_handle_event() instead of the sync variants."
173            )
174
175        try:
176            handler(**payload)
177        except TypeError as e:
178            raise ComponentError(f"Invalid payload for {event}: {e}") from e
179        except Exception as e:
180            logger.exception(f"Error handling {event} in {self.__class__.__name__}")
181            raise ComponentError(f"Error handling {event}") from e
182
183    async def async_handle_event(self, event: str, payload: dict):
184        """
185        Route event to handler, awaiting if the handler is async.
186
187        Works with both ``def on_*`` and ``async def on_*`` handlers.
188
189        Args:
190            event: Event name (e.g., "increment")
191            payload: Event data
192
193        Raises:
194            EventNotFoundError: If handler not found
195            ComponentError: If handler raises exception
196        """
197        handler = getattr(self, f"on_{event}", None)
198
199        if not handler:
200            raise EventNotFoundError(f"No handler for event: {event}")
201
202        try:
203            result = handler(**payload)
204            if inspect.isawaitable(result):
205                await result
206        except TypeError as e:
207            raise ComponentError(f"Invalid payload for {event}: {e}") from e
208        except Exception as e:
209            logger.exception(f"Error handling {event} in {self.__class__.__name__}")
210            raise ComponentError(f"Error handling {event}") from e
211
212    # ---------- Rendering ----------
213
214    def get_context(self) -> dict:
215        """
216        Build template context. Does not expose full component.
217
218        Override to add custom context variables.
219        The returned dict always includes a ``slots`` key containing the
220        rendered HTML of any filled child components.
221        """
222        return {
223            "state": self.state,
224            "errors": self.errors,
225            "component_id": self.id,
226            "slots": self.render_slots(),
227        }
228
229    def render(self) -> str:
230        """Render component to HTML."""
231        if not self.renderer:
232            raise ComponentError("No renderer configured")
233
234        if not self.template_name:
235            raise ComponentError("No template_name specified")
236
237        self.before_render()
238
239        return self.renderer.render(
240            self.template_name,
241            self.get_context(),
242        )
243
244    # ---------- Dispatch ----------
245
246    def dispatch(
247        self,
248        event: str | None = None,
249        payload: dict | None = None,
250        state: dict | None = None,
251    ) -> dict:
252        """
253        Synchronous entry point for component execution.
254
255        For components with ``async def on_*`` handlers, use
256        :meth:`async_dispatch` instead.
257
258        Args:
259            event: Event name to handle
260            payload: Event data
261            state: Serialized state to restore
262
263        Returns:
264            Dict with 'html' and 'state' keys
265        """
266        try:
267            # Lifecycle: mount or hydrate
268            if state:
269                self.hydrate(state)
270            else:
271                self.mount()
272
273            # Handle event if provided
274            if event:
275                self.handle_event(event, payload or {})
276
277            # Render
278            html = self.render()
279
280            return {
281                "html": html,
282                "state": self.dehydrate(),
283                "component_id": self.id,
284                "slots": self.render_slots(),
285            }
286
287        except Exception:
288            logger.exception(f"Error in {self.__class__.__name__}.dispatch()")
289            raise
290
291    async def async_dispatch(
292        self,
293        event: str | None = None,
294        payload: dict | None = None,
295        state: dict | None = None,
296    ) -> dict:
297        """
298        Async entry point for component execution.
299
300        Works with both sync and async event handlers.  Use this from async
301        adapters (FastAPI, Litestar, WebSocket) to support ``async def on_*``
302        handlers.
303
304        Args:
305            event: Event name to handle
306            payload: Event data
307            state: Serialized state to restore
308
309        Returns:
310            Dict with 'html' and 'state' keys
311        """
312        try:
313            # Lifecycle: mount or hydrate
314            if state:
315                self.hydrate(state)
316            else:
317                self.mount()
318
319            # Handle event if provided
320            if event:
321                await self.async_handle_event(event, payload or {})
322
323            # Render
324            html = self.render()
325
326            return {
327                "html": html,
328                "state": self.dehydrate(),
329                "component_id": self.id,
330                "slots": self.render_slots(),
331            }
332
333        except Exception:
334            logger.exception(f"Error in {self.__class__.__name__}.async_dispatch()")
335            raise

Base component class with lifecycle management.

Lifecycle:
  1. __init__(**params)
  2. mount() OR hydrate(state)
  3. handle_event(event, payload)
  4. before_render()
  5. render()
  6. dehydrate()
Slots:

Components can declare named slots via the slots class variable. Child components are assigned to slots with fill_slot() and their rendered HTML is available in the template context under slots.

Component(**params)
52    def __init__(self, **params):
53        self.params = params
54        self.state: dict[str, Any] = {}
55        self.errors: dict[str, str] = {}
56        self.id = params.get("component_id") or self._generate_id()
57        self._mounted = False
58        self._slot_components: dict[str, Component] = {}
template_name: str | None = None
renderer = None
permission_classes: ClassVar[list[type[BasePermission]]] = []
slots: ClassVar[list[str]] = []

Slot names this component accepts. Empty list means any slot name is accepted.

params
state: dict[str, typing.Any]
errors: dict[str, str]
id
def mount(self):
66    def mount(self):
67        """Initialize component on first load. Override in subclasses."""
68        self._mounted = True

Initialize component on first load. Override in subclasses.

def hydrate(self, state: dict):
70    def hydrate(self, state: dict):
71        """Restore component from serialized state."""
72        self.state.update(state)
73        self._mounted = True

Restore component from serialized state.

def dehydrate(self) -> dict:
75    def dehydrate(self) -> dict:
76        """Serialize component state for persistence."""
77        return self.state.copy()

Serialize component state for persistence.

def before_render(self):
79    def before_render(self):
80        """Called before rendering. Use for derived state computation."""
81        pass

Called before rendering. Use for derived state computation.

def fill_slot( self, slot_name: str, component: Component) -> None:
 85    def fill_slot(self, slot_name: str, component: "Component") -> None:
 86        """
 87        Assign a child component to a named slot.
 88
 89        If the component declares specific ``slots``, *slot_name* must be one of
 90        them.  If ``slots`` is empty (the default), any name is accepted
 91        (permissive mode).
 92
 93        Args:
 94            slot_name: Target slot identifier.
 95            component: Child component instance to render in the slot.
 96
 97        Raises:
 98            ComponentError: If *slot_name* is not in the declared ``slots`` list.
 99        """
100        if self.slots and slot_name not in self.slots:
101            raise ComponentError(f"Unknown slot '{slot_name}'. Available: {self.slots}")
102        self._slot_components[slot_name] = component

Assign a child component to a named slot.

If the component declares specific slots, slot_name must be one of them. If slots is empty (the default), any name is accepted (permissive mode).

Arguments:
  • slot_name: Target slot identifier.
  • component: Child component instance to render in the slot.
Raises:
  • ComponentError: If slot_name is not in the declared slots list.
def render_slots(self) -> dict[str, str]:
104    def render_slots(self) -> dict[str, str]:
105        """
106        Render all filled slot components.
107
108        Returns:
109            Dict mapping slot names to their rendered HTML strings.
110        """
111        rendered: dict[str, str] = {}
112        for name, child in self._slot_components.items():
113            rendered[name] = child.render()
114        return rendered

Render all filled slot components.

Returns:

Dict mapping slot names to their rendered HTML strings.

def get_optimistic_patch(self, event: str, payload: dict) -> dict | None:
118    def get_optimistic_patch(self, event: str, payload: dict) -> dict | None:
119        """
120        Return a partial state dict that the client can apply immediately before the server
121        responds, for the given event and payload.
122
123        This enables optimistic UI updates: the client shows the expected result right away
124        and reconciles with the actual server state once the response arrives. On error, the
125        client can roll back to the previous state.
126
127        Override in subclasses to enable optimistic updates for specific events. Return None
128        (the default) to disable optimistic updates for a given event.
129
130        Args:
131            event: The event name being dispatched (e.g., "increment").
132            payload: The event payload dict.
133
134        Returns:
135            A partial state dict to apply optimistically, or None to skip.
136
137        Example::
138
139            def get_optimistic_patch(self, event: str, payload: dict) -> dict | None:
140                if event == "increment":
141                    return {"count": self.state.get("count", 0) + payload.get("amount", 1)}
142                return None
143        """
144        return None

Return a partial state dict that the client can apply immediately before the server responds, for the given event and payload.

This enables optimistic UI updates: the client shows the expected result right away and reconciles with the actual server state once the response arrives. On error, the client can roll back to the previous state.

Override in subclasses to enable optimistic updates for specific events. Return None (the default) to disable optimistic updates for a given event.

Arguments:
  • event: The event name being dispatched (e.g., "increment").
  • payload: The event payload dict.
Returns:

A partial state dict to apply optimistically, or None to skip.

Example::

def get_optimistic_patch(self, event: str, payload: dict) -> dict | None:
    if event == "increment":
        return {"count": self.state.get("count", 0) + payload.get("amount", 1)}
    return None
def handle_event(self, event: str, payload: dict):
148    def handle_event(self, event: str, payload: dict):
149        """
150        Route event to a **synchronous** handler method.
151
152        For async handlers (``async def on_*``), use :meth:`async_handle_event`
153        instead.  Calling this method with an async handler will raise
154        :class:`ComponentError`.
155
156        Args:
157            event: Event name (e.g., "increment")
158            payload: Event data
159
160        Raises:
161            EventNotFoundError: If handler not found
162            ComponentError: If handler raises exception or is async
163        """
164        handler = getattr(self, f"on_{event}", None)
165
166        if not handler:
167            raise EventNotFoundError(f"No handler for event: {event}")
168
169        if inspect.iscoroutinefunction(handler):
170            raise ComponentError(
171                f"Handler 'on_{event}' is async — use async_dispatch() or "
172                "async_handle_event() instead of the sync variants."
173            )
174
175        try:
176            handler(**payload)
177        except TypeError as e:
178            raise ComponentError(f"Invalid payload for {event}: {e}") from e
179        except Exception as e:
180            logger.exception(f"Error handling {event} in {self.__class__.__name__}")
181            raise ComponentError(f"Error handling {event}") from e

Route event to a synchronous handler method.

For async handlers (async def on_*), use async_handle_event() instead. Calling this method with an async handler will raise ComponentError.

Arguments:
  • event: Event name (e.g., "increment")
  • payload: Event data
Raises:
  • EventNotFoundError: If handler not found
  • ComponentError: If handler raises exception or is async
async def async_handle_event(self, event: str, payload: dict):
183    async def async_handle_event(self, event: str, payload: dict):
184        """
185        Route event to handler, awaiting if the handler is async.
186
187        Works with both ``def on_*`` and ``async def on_*`` handlers.
188
189        Args:
190            event: Event name (e.g., "increment")
191            payload: Event data
192
193        Raises:
194            EventNotFoundError: If handler not found
195            ComponentError: If handler raises exception
196        """
197        handler = getattr(self, f"on_{event}", None)
198
199        if not handler:
200            raise EventNotFoundError(f"No handler for event: {event}")
201
202        try:
203            result = handler(**payload)
204            if inspect.isawaitable(result):
205                await result
206        except TypeError as e:
207            raise ComponentError(f"Invalid payload for {event}: {e}") from e
208        except Exception as e:
209            logger.exception(f"Error handling {event} in {self.__class__.__name__}")
210            raise ComponentError(f"Error handling {event}") from e

Route event to handler, awaiting if the handler is async.

Works with both def on_* and async def on_* handlers.

Arguments:
  • event: Event name (e.g., "increment")
  • payload: Event data
Raises:
  • EventNotFoundError: If handler not found
  • ComponentError: If handler raises exception
def get_context(self) -> dict:
214    def get_context(self) -> dict:
215        """
216        Build template context. Does not expose full component.
217
218        Override to add custom context variables.
219        The returned dict always includes a ``slots`` key containing the
220        rendered HTML of any filled child components.
221        """
222        return {
223            "state": self.state,
224            "errors": self.errors,
225            "component_id": self.id,
226            "slots": self.render_slots(),
227        }

Build template context. Does not expose full component.

Override to add custom context variables. The returned dict always includes a slots key containing the rendered HTML of any filled child components.

def render(self) -> str:
229    def render(self) -> str:
230        """Render component to HTML."""
231        if not self.renderer:
232            raise ComponentError("No renderer configured")
233
234        if not self.template_name:
235            raise ComponentError("No template_name specified")
236
237        self.before_render()
238
239        return self.renderer.render(
240            self.template_name,
241            self.get_context(),
242        )

Render component to HTML.

def dispatch( self, event: str | None = None, payload: dict | None = None, state: dict | None = None) -> dict:
246    def dispatch(
247        self,
248        event: str | None = None,
249        payload: dict | None = None,
250        state: dict | None = None,
251    ) -> dict:
252        """
253        Synchronous entry point for component execution.
254
255        For components with ``async def on_*`` handlers, use
256        :meth:`async_dispatch` instead.
257
258        Args:
259            event: Event name to handle
260            payload: Event data
261            state: Serialized state to restore
262
263        Returns:
264            Dict with 'html' and 'state' keys
265        """
266        try:
267            # Lifecycle: mount or hydrate
268            if state:
269                self.hydrate(state)
270            else:
271                self.mount()
272
273            # Handle event if provided
274            if event:
275                self.handle_event(event, payload or {})
276
277            # Render
278            html = self.render()
279
280            return {
281                "html": html,
282                "state": self.dehydrate(),
283                "component_id": self.id,
284                "slots": self.render_slots(),
285            }
286
287        except Exception:
288            logger.exception(f"Error in {self.__class__.__name__}.dispatch()")
289            raise

Synchronous entry point for component execution.

For components with async def on_* handlers, use async_dispatch() instead.

Arguments:
  • event: Event name to handle
  • payload: Event data
  • state: Serialized state to restore
Returns:

Dict with 'html', 'state', 'component_id' and 'slots' keys

async def async_dispatch( self, event: str | None = None, payload: dict | None = None, state: dict | None = None) -> dict:
291    async def async_dispatch(
292        self,
293        event: str | None = None,
294        payload: dict | None = None,
295        state: dict | None = None,
296    ) -> dict:
297        """
298        Async entry point for component execution.
299
300        Works with both sync and async event handlers.  Use this from async
301        adapters (FastAPI, Litestar, WebSocket) to support ``async def on_*``
302        handlers.
303
304        Args:
305            event: Event name to handle
306            payload: Event data
307            state: Serialized state to restore
308
309        Returns:
310            Dict with 'html' and 'state' keys
311        """
312        try:
313            # Lifecycle: mount or hydrate
314            if state:
315                self.hydrate(state)
316            else:
317                self.mount()
318
319            # Handle event if provided
320            if event:
321                await self.async_handle_event(event, payload or {})
322
323            # Render
324            html = self.render()
325
326            return {
327                "html": html,
328                "state": self.dehydrate(),
329                "component_id": self.id,
330                "slots": self.render_slots(),
331            }
332
333        except Exception:
334            logger.exception(f"Error in {self.__class__.__name__}.async_dispatch()")
335            raise

Async entry point for component execution.

Works with both sync and async event handlers. Use this from async adapters (FastAPI, Litestar, WebSocket) to support async def on_* handlers.

Arguments:
  • event: Event name to handle
  • payload: Event data
  • state: Serialized state to restore
Returns:

Dict with 'html', 'state', 'component_id' and 'slots' keys

class ComponentError(builtins.Exception):
class ComponentError(Exception):
    """Base exception for component errors; catch it to also handle its subclasses."""

Base exception for component errors.

class EventNotFoundError(component_framework.core.ComponentError):
class EventNotFoundError(ComponentError):
    """Raised when no handler exists for a requested event."""

Raised when an event handler is not found.

class StateSerializer:
class StateSerializer:
    """Handles safe serialization/deserialization of component state.

    The size limits are read from the class that ``serialize`` is called on,
    so a subclass can override them without affecting ``StateSerializer``.

    Class attributes:
        warn_bytes: Emit a warning when serialised state exceeds this size
            (default 64 KB).  Set to ``0`` to disable warnings.
        max_bytes: Raise :class:`ComponentError` when serialised state exceeds
            this size (default 512 KB).  Set to ``0`` to disable the hard limit.
    """

    warn_bytes: int = 64 * 1024  # 64 KB
    max_bytes: int = 512 * 1024  # 512 KB

    @classmethod
    def serialize(cls, state: dict) -> str:
        """Serialize state to JSON string.

        Values that JSON cannot encode natively are converted with ``str()``.

        Emits a warning if the result exceeds :attr:`warn_bytes` and raises
        :class:`ComponentError` if it exceeds :attr:`max_bytes`.

        Args:
            state: Component state to serialize.

        Returns:
            The JSON text.

        Raises:
            ComponentError: If the serialized size exceeds ``max_bytes``.
        """
        serialized = json.dumps(state, default=str)
        # json.dumps escapes non-ASCII characters by default, so the character
        # count of the result equals its encoded byte count.
        size = len(serialized)

        if cls.max_bytes and size > cls.max_bytes:
            raise ComponentError(
                f"Component state is {size:,} bytes "
                f"(hard limit: {cls.max_bytes:,}). "
                "Move large data out of state — store IDs/keys instead of "
                "full objects, or use server-side caching."
            )

        if cls.warn_bytes and size > cls.warn_bytes:
            logger.warning(
                "Component state is %s bytes (threshold: %s). "
                "Consider moving large data out of state.",
                f"{size:,}",
                f"{cls.warn_bytes:,}",
            )

        return serialized

    @staticmethod
    def deserialize(data: str) -> dict:
        """Deserialize state from JSON string.

        Empty or otherwise falsy input yields an empty dict.  The decoded value
        is returned as-is; it is not checked to be a dict.
        """
        if not data:
            return {}
        return json.loads(data)

Handles safe serialization/deserialization of component state.

Class attributes:

  • warn_bytes: Emit a warning when serialised state exceeds this size (default 64 KB). Set to 0 to disable warnings.
  • max_bytes: Raise ComponentError when serialised state exceeds this size (default 512 KB). Set to 0 to disable the hard limit.

warn_bytes: int = 65536
max_bytes: int = 524288
@staticmethod
def serialize(state: dict) -> str:
351    @staticmethod
352    def serialize(state: dict) -> str:
353        """Serialize state to JSON string.
354
355        Emits a warning if the result exceeds :attr:`warn_bytes` and raises
356        :class:`ComponentError` if it exceeds :attr:`max_bytes`.
357        """
358        serialized = json.dumps(state, default=str)
359        size = len(serialized)
360
361        if StateSerializer.max_bytes and size > StateSerializer.max_bytes:
362            raise ComponentError(
363                f"Component state is {size:,} bytes "
364                f"(hard limit: {StateSerializer.max_bytes:,}). "
365                "Move large data out of state — store IDs/keys instead of "
366                "full objects, or use server-side caching."
367            )
368
369        if StateSerializer.warn_bytes and size > StateSerializer.warn_bytes:
370            logger.warning(
371                "Component state is %s bytes (threshold: %s). "
372                "Consider moving large data out of state.",
373                f"{size:,}",
374                f"{StateSerializer.warn_bytes:,}",
375            )
376
377        return serialized

Serialize state to JSON string.

Emits a warning if the result exceeds warn_bytes and raises ComponentError if it exceeds max_bytes.

@staticmethod
def deserialize(data: str) -> dict:
379    @staticmethod
380    def deserialize(data: str) -> dict:
381        """Deserialize state from JSON string."""
382        if not data:
383            return {}
384        return json.loads(data)

Deserialize state from JSON string.

def compose( parent_cls: type[Component], *, params: dict[str, typing.Any] | None = None, **slot_components: Component) -> Component:
def compose(
    parent_cls: type[Component],
    *,
    params: dict[str, Any] | None = None,
    **slot_components: Component,
) -> Component:
    """
    Build a parent component and fill its slots in a single call.

    The parent is constructed from *params* only.  Every additional keyword
    argument is treated as a slot assignment: the keyword is the slot name and
    the value is handed to ``parent.fill_slot()``.  Because ``parent_cls`` and
    ``params`` are parameters of this function, slots with those names cannot
    be filled through ``compose``; call ``fill_slot()`` directly for them.

    Args:
        parent_cls: The component class to instantiate.
        params: Keyword arguments for the parent constructor; ``None`` or an
            empty dict means the constructor is called without arguments.
        **slot_components: Mapping of slot names to child ``Component`` instances.

    Returns:
        The assembled parent component (not yet dispatched).

    Raises:
        ComponentError: If a slot name is not accepted by the parent.

    Example::

        card = compose(
            CardComponent,
            params={"title": "My Card"},
            header=HeaderComponent(text="Title"),
            body=BodyComponent(items=[1, 2, 3]),
        )
        result = card.dispatch()
    """
    constructor_kwargs = params if params else {}
    assembled = parent_cls(**constructor_kwargs)
    for name in slot_components:
        assembled.fill_slot(name, slot_components[name])
    return assembled

Convenience function: instantiate a parent component and fill its slots in one call.

Every extra keyword argument is treated as a slot assignment (the keyword is the slot name, the value is the child component). Arguments for the parent constructor are never taken from these keywords; pass them explicitly via params.

Arguments:
  • parent_cls: The component class to instantiate.
  • params: Explicit keyword arguments forwarded to the parent constructor.
  • **slot_components: Mapping of slot names to child Component instances.
Returns:

The fully-assembled parent component (not yet dispatched).

Raises:
  • ComponentError: If a slot name is not accepted by the parent.

Example::

card = compose(
    CardComponent,
    params={"title": "My Card"},
    header=HeaderComponent(text="Title"),
    body=BodyComponent(items=[1, 2, 3]),
)
result = card.dispatch()
class SlotRenderer:
 51class SlotRenderer:
 52    """
 53    Mixin for renderers that support slot injection into templates.
 54
 55    Subclass your concrete ``Renderer`` together with this mixin to gain a
 56    ``render_with_slots`` helper that merges pre-rendered slot HTML into the
 57    template context before rendering.
 58
 59    Example::
 60
 61        class MyRenderer(SlotRenderer, Renderer):
 62            def render(self, template_name, context):
 63                ...
 64
 65            def render_with_slots(self, template_name, context, slots):
 66                ctx = self.merge_slot_context(context, slots)
 67                return self.render(template_name, ctx)
 68    """
 69
 70    @staticmethod
 71    def merge_slot_context(
 72        context: dict[str, Any],
 73        slots: dict[str, str],
 74    ) -> dict[str, Any]:
 75        """
 76        Return a *new* context dict with ``slots`` merged in.
 77
 78        If the context already contains a ``slots`` key it will be overwritten
 79        with the provided *slots* mapping.
 80
 81        Args:
 82            context: Original template context.
 83            slots: Mapping of slot names to rendered HTML.
 84
 85        Returns:
 86            A new dict combining *context* and the slot HTML.
 87        """
 88        merged = {**context, "slots": slots}
 89        return merged
 90
 91    def render_with_slots(
 92        self,
 93        template_name: str,
 94        context: dict[str, Any],
 95        slots: dict[str, str],
 96    ) -> str:
 97        """
 98        Render a template with slot HTML injected into the context.
 99
100        The default implementation merges the slots into the context and
101        delegates to ``self.render()``.  Override for custom behaviour.
102
103        Args:
104            template_name: Name/path of the template.
105            context: Template variables.
106            slots: Pre-rendered slot HTML keyed by slot name.
107
108        Returns:
109            Rendered HTML string.
110        """
111        merged = self.merge_slot_context(context, slots)
112        # Delegate to the concrete Renderer.render() provided by the subclass.
113        return self.render(template_name, merged)  # type: ignore[attr-defined]

Mixin for renderers that support slot injection into templates.

Subclass your concrete Renderer together with this mixin to gain a render_with_slots helper that merges pre-rendered slot HTML into the template context before rendering.

Example::

class MyRenderer(SlotRenderer, Renderer):
    def render(self, template_name, context):
        ...

    def render_with_slots(self, template_name, context, slots):
        ctx = self.merge_slot_context(context, slots)
        return self.render(template_name, ctx)
@staticmethod
def merge_slot_context( context: dict[str, typing.Any], slots: dict[str, str]) -> dict[str, typing.Any]:
70    @staticmethod
71    def merge_slot_context(
72        context: dict[str, Any],
73        slots: dict[str, str],
74    ) -> dict[str, Any]:
75        """
76        Return a *new* context dict with ``slots`` merged in.
77
78        If the context already contains a ``slots`` key it will be overwritten
79        with the provided *slots* mapping.
80
81        Args:
82            context: Original template context.
83            slots: Mapping of slot names to rendered HTML.
84
85        Returns:
86            A new dict combining *context* and the slot HTML.
87        """
88        merged = {**context, "slots": slots}
89        return merged

Return a new context dict with slots merged in.

If the context already contains a slots key it will be overwritten with the provided slots mapping.

Arguments:
  • context: Original template context.
  • slots: Mapping of slot names to rendered HTML.
Returns:

A new dict combining context and the slot HTML.

def render_with_slots( self, template_name: str, context: dict[str, typing.Any], slots: dict[str, str]) -> str:
 91    def render_with_slots(
 92        self,
 93        template_name: str,
 94        context: dict[str, Any],
 95        slots: dict[str, str],
 96    ) -> str:
 97        """
 98        Render a template with slot HTML injected into the context.
 99
100        The default implementation merges the slots into the context and
101        delegates to ``self.render()``.  Override for custom behaviour.
102
103        Args:
104            template_name: Name/path of the template.
105            context: Template variables.
106            slots: Pre-rendered slot HTML keyed by slot name.
107
108        Returns:
109            Rendered HTML string.
110        """
111        merged = self.merge_slot_context(context, slots)
112        # Delegate to the concrete Renderer.render() provided by the subclass.
113        return self.render(template_name, merged)  # type: ignore[attr-defined]

Render a template with slot HTML injected into the context.

The default implementation merges the slots into the context and delegates to self.render(). Override for custom behaviour.

Arguments:
  • template_name: Name/path of the template.
  • context: Template variables.
  • slots: Pre-rendered slot HTML keyed by slot name.
Returns:

Rendered HTML string.

class FormComponent(component_framework.core.Component):
 11class FormComponent(Component):
 12    """
 13    Form component with Pydantic validation.
 14
 15    Usage:
 16        class UserFormSchema(BaseModel):
 17            name: str
 18            email: EmailStr
 19            age: int = Field(ge=0, le=120)
 20
 21        @registry.register("user_form")
 22        class UserForm(FormComponent):
 23            schema = UserFormSchema
 24            template_name = "user_form"
 25
 26            def on_submit(self):
 27                # Validation happens automatically
 28                # Access validated data in self.validated_data
 29                pass
 30    """
 31
 32    # Pydantic model for validation
 33    schema: ClassVar[type[BaseModel] | None] = None
 34
 35    def __init__(self, **params):
 36        super().__init__(**params)
 37        self.validated_data: dict[str, Any] | None = None
 38        self.field_errors: dict[str, list[str]] = {}
 39
 40    # ---------- Validation ----------
 41
 42    def validate(self, data: dict) -> bool:
 43        """
 44        Validate data against schema.
 45
 46        Args:
 47            data: Form data to validate
 48
 49        Returns:
 50            True if valid, False otherwise
 51        """
 52        if not self.schema:
 53            # No schema, skip validation
 54            self.validated_data = data
 55            return True
 56
 57        try:
 58            validated = self.schema(**data)
 59            self.validated_data = validated.model_dump()
 60            self.errors = {}
 61            self.field_errors = {}
 62            return True
 63        except ValidationError as e:
 64            self.validated_data = None
 65            self.errors["form"] = "Validation failed"
 66
 67            # Convert Pydantic errors to field errors
 68            self.field_errors = {}
 69            for error in e.errors():
 70                field = str(error["loc"][0]) if error["loc"] else "form"
 71                message = error["msg"]
 72
 73                if field not in self.field_errors:
 74                    self.field_errors[field] = []
 75                self.field_errors[field].append(message)
 76
 77            return False
 78
 79    def get_field_errors(self, field_name: str) -> list[str]:
 80        """Get validation errors for a specific field."""
 81        return self.field_errors.get(field_name, [])
 82
 83    def has_field_error(self, field_name: str) -> bool:
 84        """Check if field has validation errors."""
 85        return field_name in self.field_errors
 86
 87    # ---------- Form State ----------
 88
 89    def mount(self):
 90        """Initialize form state."""
 91        super().mount()
 92        self.state["form_data"] = {}
 93        self.state["submitted"] = False
 94        self.state["is_valid"] = False
 95
 96    def get_field_value(self, field_name: str, default: Any = "") -> Any:
 97        """Get current value of form field."""
 98        return self.state.get("form_data", {}).get(field_name, default)
 99
100    def set_field_value(self, field_name: str, value: Any):
101        """Set value of form field."""
102        if "form_data" not in self.state:
103            self.state["form_data"] = {}
104        self.state["form_data"][field_name] = value
105
106    # ---------- Event Handlers ----------
107
108    def on_submit(self):
109        """
110        Handle form submission.
111
112        Override in subclasses to add custom logic.
113        Validation happens automatically before this is called.
114        """
115        pass
116
117    def on_field_change(self, field: str, value: Any):
118        """Handle field value change."""
119        self.set_field_value(field, value)
120
121        # Clear field error on change
122        if field in self.field_errors:
123            del self.field_errors[field]
124
125    def handle_event(self, event: str, payload: dict):
126        """Handle events with automatic validation for submit."""
127        if event == "submit":
128            # Extract form data
129            form_data = payload.get("form_data", self.state.get("form_data", {}))
130            self.state["form_data"] = form_data
131            self.state["submitted"] = True
132
133            # Validate
134            is_valid = self.validate(form_data)
135            self.state["is_valid"] = is_valid
136
137            if is_valid:
138                # Call submit handler
139                self.on_submit()
140        else:
141            # Regular event handling
142            super().handle_event(event, payload)
143
144    # ---------- Context ----------
145
146    def get_context(self) -> dict:
147        """Add form-specific context."""
148        context = super().get_context()
149        context.update(
150            {
151                "form_data": self.state.get("form_data", {}),
152                "field_errors": self.field_errors,
153                "is_valid": self.state.get("is_valid", False),
154                "submitted": self.state.get("submitted", False),
155            }
156        )
157        return context

Form component with Pydantic validation.

Usage:

class UserFormSchema(BaseModel):
    name: str
    email: EmailStr
    age: int = Field(ge=0, le=120)

@registry.register("user_form")
class UserForm(FormComponent):
    schema = UserFormSchema
    template_name = "user_form"

    def on_submit(self):
        # Validation happens automatically
        # Access validated data in self.validated_data
        pass
FormComponent(**params)
35    def __init__(self, **params):
36        super().__init__(**params)
37        self.validated_data: dict[str, Any] | None = None
38        self.field_errors: dict[str, list[str]] = {}
schema: ClassVar[type[pydantic.main.BaseModel] | None] = None
validated_data: dict[str, typing.Any] | None
field_errors: dict[str, list[str]]
def validate(self, data: dict) -> bool:
42    def validate(self, data: dict) -> bool:
43        """
44        Validate data against schema.
45
46        Args:
47            data: Form data to validate
48
49        Returns:
50            True if valid, False otherwise
51        """
52        if not self.schema:
53            # No schema, skip validation
54            self.validated_data = data
55            return True
56
57        try:
58            validated = self.schema(**data)
59            self.validated_data = validated.model_dump()
60            self.errors = {}
61            self.field_errors = {}
62            return True
63        except ValidationError as e:
64            self.validated_data = None
65            self.errors["form"] = "Validation failed"
66
67            # Convert Pydantic errors to field errors
68            self.field_errors = {}
69            for error in e.errors():
70                field = str(error["loc"][0]) if error["loc"] else "form"
71                message = error["msg"]
72
73                if field not in self.field_errors:
74                    self.field_errors[field] = []
75                self.field_errors[field].append(message)
76
77            return False

Validate data against schema.

Arguments:
  • data: Form data to validate
Returns:

True if valid, False otherwise

def get_field_errors(self, field_name: str) -> list[str]:
79    def get_field_errors(self, field_name: str) -> list[str]:
80        """Get validation errors for a specific field."""
81        return self.field_errors.get(field_name, [])

Get validation errors for a specific field.

def has_field_error(self, field_name: str) -> bool:
83    def has_field_error(self, field_name: str) -> bool:
84        """Check if field has validation errors."""
85        return field_name in self.field_errors

Check if field has validation errors.

def mount(self):
89    def mount(self):
90        """Initialize form state."""
91        super().mount()
92        self.state["form_data"] = {}
93        self.state["submitted"] = False
94        self.state["is_valid"] = False

Initialize form state.

def get_field_value(self, field_name: str, default: Any = '') -> Any:
96    def get_field_value(self, field_name: str, default: Any = "") -> Any:
97        """Get current value of form field."""
98        return self.state.get("form_data", {}).get(field_name, default)

Get current value of form field.

def set_field_value(self, field_name: str, value: Any):
100    def set_field_value(self, field_name: str, value: Any):
101        """Set value of form field."""
102        if "form_data" not in self.state:
103            self.state["form_data"] = {}
104        self.state["form_data"][field_name] = value

Set value of form field.

def on_submit(self):
108    def on_submit(self):
109        """
110        Handle form submission.
111
112        Override in subclasses to add custom logic.
113        Validation happens automatically before this is called.
114        """
115        pass

Handle form submission.

Override in subclasses to add custom logic. Validation happens automatically before this is called.

def on_field_change(self, field: str, value: Any):
117    def on_field_change(self, field: str, value: Any):
118        """Handle field value change."""
119        self.set_field_value(field, value)
120
121        # Clear field error on change
122        if field in self.field_errors:
123            del self.field_errors[field]

Handle field value change.

def handle_event(self, event: str, payload: dict):
125    def handle_event(self, event: str, payload: dict):
126        """Handle events with automatic validation for submit."""
127        if event == "submit":
128            # Extract form data
129            form_data = payload.get("form_data", self.state.get("form_data", {}))
130            self.state["form_data"] = form_data
131            self.state["submitted"] = True
132
133            # Validate
134            is_valid = self.validate(form_data)
135            self.state["is_valid"] = is_valid
136
137            if is_valid:
138                # Call submit handler
139                self.on_submit()
140        else:
141            # Regular event handling
142            super().handle_event(event, payload)

Handle events with automatic validation for submit.

def get_context(self) -> dict:
146    def get_context(self) -> dict:
147        """Add form-specific context."""
148        context = super().get_context()
149        context.update(
150            {
151                "form_data": self.state.get("form_data", {}),
152                "field_errors": self.field_errors,
153                "is_valid": self.state.get("is_valid", False),
154                "submitted": self.state.get("submitted", False),
155            }
156        )
157        return context

Add form-specific context.

class ModelFormComponent(component_framework.core.FormComponent):
class ModelFormComponent(FormComponent):
    """
    FormComponent variant that is backed by a model instance.

    Pairs FormComponent validation with model persistence.
    Works with DjangoModelMixin or similar patterns.
    """

    # Provided by DjangoModelMixin or subclass
    instance: Any = None
    # Names of the instance attributes copied into the form data.
    state_fields: ClassVar[list[str]] = []

    def mount(self):
        """Run the parent mount, then pre-fill the form when an instance is set."""
        super().mount()

        if not self.instance:
            return
        self.populate_form_from_instance()

    def populate_form_from_instance(self):
        """Copy each ``state_fields`` attribute of the instance into the form data.

        Attributes missing on the instance are stored as None.
        """
        self.state["form_data"] = {
            name: getattr(self.instance, name, None) for name in self.state_fields
        }

    def on_submit(self):
        """Write validated values onto the instance and save it if possible."""
        data = self.validated_data
        if not data:
            return

        # Only attributes that already exist on the instance are updated.
        for name, new_value in data.items():
            if hasattr(self.instance, name):
                setattr(self.instance, name, new_value)

        # Persist only when a save_instance method is available.
        if hasattr(self, "save_instance"):
            self.save_instance()  # type: ignore[call-non-callable]  # provided by DjangoModelMixin via MRO

Form component with model instance support.

Combines FormComponent validation with model persistence. Works with DjangoModelMixin or similar patterns.

instance: Any = None
state_fields: ClassVar[list[str]] = []
def mount(self):
172    def mount(self):
173        """Initialize form with instance data."""
174        super().mount()
175
176        # Populate form from instance if available
177        if self.instance:
178            self.populate_form_from_instance()

Initialize form with instance data.

def populate_form_from_instance(self):
180    def populate_form_from_instance(self):
181        """Populate form data from model instance."""
182        form_data = {}
183        for field_name in self.state_fields:
184            value = getattr(self.instance, field_name, None)
185            form_data[field_name] = value
186
187        self.state["form_data"] = form_data

Populate form data from model instance.

def on_submit(self):
189    def on_submit(self):
190        """Save validated data to model instance."""
191        if not self.validated_data:
192            return
193
194        # Update instance from validated data
195        for field_name, value in self.validated_data.items():
196            if hasattr(self.instance, field_name):
197                setattr(self.instance, field_name, value)
198
199        # Save instance (if method exists)
200        if hasattr(self, "save_instance"):
201            self.save_instance()  # type: ignore[call-non-callable]  # provided by DjangoModelMixin via MRO

Save validated data to model instance.

class AllowAny(component_framework.core.BasePermission):
class AllowAny(BasePermission):
    """Permission that grants access to every request. Default permission class."""

    def has_permission(self, request: Any, component_cls: type) -> bool:
        # No checks are performed: access is always granted.
        return True

Allow unrestricted access. Default permission class.

def has_permission(self, request: Any, component_cls: type) -> bool:
34    def has_permission(self, request: Any, component_cls: type) -> bool:
35        return True

Return True if the request has permission to access the component.

Arguments:
  • request: The incoming request object.
  • component_cls: The component class being accessed.
Returns:

True if permission is granted, False otherwise.

class BasePermission:
class BasePermission:
    """
    Root of the permission class hierarchy.

    All permission classes inherit from this one and provide their own
    `has_permission`. The implementation here grants access.
    """

    def has_permission(self, request: Any, component_cls: type) -> bool:
        """
        Decide whether *request* may access *component_cls*.

        Args:
            request: The incoming request object.
            component_cls: The component class being accessed.

        Returns:
            True if permission is granted, False otherwise.
        """
        granted = True
        return granted

Base permission class. All permission classes must inherit from this.

Subclasses are expected to override has_permission; the base implementation returns True (access granted).

def has_permission(self, request: Any, component_cls: type) -> bool:
17    def has_permission(self, request: Any, component_cls: type) -> bool:
18        """
19        Return True if the request has permission to access the component.
20
21        Args:
22            request: The incoming request object.
23            component_cls: The component class being accessed.
24
25        Returns:
26            True if permission is granted, False otherwise.
27        """
28        return True

Return True if the request has permission to access the component.

Arguments:
  • request: The incoming request object.
  • component_cls: The component class being accessed.
Returns:

True if permission is granted, False otherwise.

class DjangoModelPermission(component_framework.core.BasePermission):
class DjangoModelPermission(BasePermission):
    """
    Grant access according to Django model permission(s) named on the component.

    Set `permission_required` on the component class:

        @registry.register("my_component")
        class MyComponent(Component):
            permission_classes = [DjangoModelPermission]
            permission_required = "myapp.change_mymodel"
    """

    def has_permission(self, request: Any, component_cls: type) -> bool:
        required = getattr(component_cls, "permission_required", "")
        if not required:
            # Component declares no permission: nothing to check.
            return True

        user = getattr(request, "user", None)
        if not user:
            return False

        if isinstance(required, str):
            return user.has_perm(required)

        # Iterable of permission strings — all must be held
        for perm in required:
            if not user.has_perm(perm):
                return False
        return True

Allow access based on a specific Django model permission.

Set permission_required on the component class:

@registry.register("my_component")
class MyComponent(Component):
    permission_classes = [DjangoModelPermission]
    permission_required = "myapp.change_mymodel"
def has_permission(self, request: Any, component_cls: type) -> bool:
71    def has_permission(self, request: Any, component_cls: type) -> bool:
72        permission = getattr(component_cls, "permission_required", "")
73        if not permission:
74            return True
75        user = getattr(request, "user", None)
76        if not user:
77            return False
78        if isinstance(permission, str):
79            return user.has_perm(permission)
80        # Iterable of permission strings — all must be held
81        return all(user.has_perm(p) for p in permission)

Return True if the request has permission to access the component.

Arguments:
  • request: The incoming request object.
  • component_cls: The component class being accessed.
Returns:

True if permission is granted, False otherwise.

class IsAuthenticated(component_framework.core.BasePermission):
class IsAuthenticated(BasePermission):
    """Grant access only when the request carries an authenticated user."""

    def has_permission(self, request: Any, component_cls: type) -> bool:
        # A missing or falsy ``request.user`` is treated as "not authenticated".
        if not getattr(request, "user", None):
            return False
        return bool(request.user.is_authenticated)

Allow access only to authenticated users.

def has_permission(self, request: Any, component_cls: type) -> bool:
41    def has_permission(self, request: Any, component_cls: type) -> bool:
42        return bool(getattr(request, "user", None) and request.user.is_authenticated)

Return True if the request has permission to access the component.

Arguments:
  • request: The incoming request object.
  • component_cls: The component class being accessed.
Returns:

True if permission is granted, False otherwise.

class IsStaff(component_framework.core.BasePermission):
class IsStaff(BasePermission):
    """Grant access only when the request's user is flagged as staff."""

    def has_permission(self, request: Any, component_cls: type) -> bool:
        # A missing or falsy ``request.user`` is treated as "not staff".
        if not getattr(request, "user", None):
            return False
        return bool(request.user.is_staff)

Allow access only to staff users.

def has_permission(self, request: Any, component_cls: type) -> bool:
48    def has_permission(self, request: Any, component_cls: type) -> bool:
49        return bool(getattr(request, "user", None) and request.user.is_staff)

Return True if the request has permission to access the component.

Arguments:
  • request: The incoming request object.
  • component_cls: The component class being accessed.
Returns:

True if permission is granted, False otherwise.

class IsSuperuser(component_framework.core.BasePermission):
class IsSuperuser(BasePermission):
    """Grant access only when the request's user is a superuser."""

    def has_permission(self, request: Any, component_cls: type) -> bool:
        # A missing or falsy ``request.user`` is treated as "not a superuser".
        if not getattr(request, "user", None):
            return False
        return bool(request.user.is_superuser)

Allow access only to superusers.

def has_permission(self, request: Any, component_cls: type) -> bool:
55    def has_permission(self, request: Any, component_cls: type) -> bool:
56        return bool(getattr(request, "user", None) and request.user.is_superuser)

Return True if the request has permission to access the component.

Arguments:
  • request: The incoming request object.
  • component_cls: The component class being accessed.
Returns:

True if permission is granted, False otherwise.

class ComponentRegistry:
 7class ComponentRegistry:
 8    """Registry for component classes."""
 9
10    def __init__(self):
11        self._registry: dict[str, type[Component]] = {}
12
13    def register(self, name: str):
14        """
15        Decorator to register a component class.
16
17        Usage:
18            @registry.register("counter")
19            class Counter(Component):
20                ...
21        """
22
23        def decorator(cls: type[Component]):
24            if name in self._registry:
25                if self._registry[name] is cls:
26                    return cls  # idempotent: same class re-imported, no-op
27                raise ValueError(
28                    f"Component '{name}' already registered by a different class "
29                    f"({self._registry[name].__qualname__!r})"
30                )
31            self._registry[name] = cls
32            return cls
33
34        return decorator
35
36    def get(self, name: str) -> type[Component] | None:
37        """Get component class by name."""
38        return self._registry.get(name)
39
40    def __getitem__(self, name: str) -> type[Component]:
41        """Get component class by name, raises KeyError if not found."""
42        return self._registry[name]
43
44    def list(self) -> list[str]:
45        """List all registered component names."""
46        return list(self._registry.keys())

Registry for component classes.

def register(self, name: str):
13    def register(self, name: str):
14        """
15        Decorator to register a component class.
16
17        Usage:
18            @registry.register("counter")
19            class Counter(Component):
20                ...
21        """
22
23        def decorator(cls: type[Component]):
24            if name in self._registry:
25                if self._registry[name] is cls:
26                    return cls  # idempotent: same class re-imported, no-op
27                raise ValueError(
28                    f"Component '{name}' already registered by a different class "
29                    f"({self._registry[name].__qualname__!r})"
30                )
31            self._registry[name] = cls
32            return cls
33
34        return decorator

Decorator to register a component class.

Usage:

@registry.register("counter")
class Counter(Component):
    ...

def get( self, name: str) -> type[Component] | None:
36    def get(self, name: str) -> type[Component] | None:
37        """Get component class by name."""
38        return self._registry.get(name)

Get component class by name.

def list(self) -> list[str]:
44    def list(self) -> list[str]:
45        """List all registered component names."""
46        return list(self._registry.keys())

List all registered component names.

registry = <ComponentRegistry object>
class Renderer(abc.ABC):
 7class Renderer(ABC):
 8    """Abstract base class for template renderers."""
 9
10    @abstractmethod
11    def render(self, template_name: str, context: dict) -> str:
12        """
13        Render template with context.
14
15        Args:
16            template_name: Name/path of template
17            context: Template variables
18
19        Returns:
20            Rendered HTML string
21        """
22        raise NotImplementedError

Abstract base class for template renderers.

@abstractmethod
def render(self, template_name: str, context: dict) -> str:
10    @abstractmethod
11    def render(self, template_name: str, context: dict) -> str:
12        """
13        Render template with context.
14
15        Args:
16            template_name: Name/path of template
17            context: Template variables
18
19        Returns:
20            Rendered HTML string
21        """
22        raise NotImplementedError

Render template with context.

Arguments:
  • template_name: Name/path of template
  • context: Template variables
Returns:

Rendered HTML string

class StateStore(abc.ABC):
 7class StateStore(ABC):
 8    """Abstract base class for state persistence."""
 9
10    @abstractmethod
11    def save(self, component_id: str, state: dict):
12        """
13        Save component state.
14
15        Args:
16            component_id: Unique component identifier
17            state: Component state to persist
18        """
19        raise NotImplementedError
20
21    @abstractmethod
22    def load(self, component_id: str) -> dict | None:
23        """
24        Load component state.
25
26        Args:
27            component_id: Unique component identifier
28
29        Returns:
30            Component state or None if not found
31        """
32        raise NotImplementedError
33
34    @abstractmethod
35    def delete(self, component_id: str):
36        """Delete component state."""
37        raise NotImplementedError

Abstract base class for state persistence.

@abstractmethod
def save(self, component_id: str, state: dict):
10    @abstractmethod
11    def save(self, component_id: str, state: dict):
12        """
13        Save component state.
14
15        Args:
16            component_id: Unique component identifier
17            state: Component state to persist
18        """
19        raise NotImplementedError

Save component state.

Arguments:
  • component_id: Unique component identifier
  • state: Component state to persist
@abstractmethod
def load(self, component_id: str) -> dict | None:
21    @abstractmethod
22    def load(self, component_id: str) -> dict | None:
23        """
24        Load component state.
25
26        Args:
27            component_id: Unique component identifier
28
29        Returns:
30            Component state or None if not found
31        """
32        raise NotImplementedError

Load component state.

Arguments:
  • component_id: Unique component identifier
Returns:

Component state or None if not found

@abstractmethod
def delete(self, component_id: str):
34    @abstractmethod
35    def delete(self, component_id: str):
36        """Delete component state."""
37        raise NotImplementedError

Delete component state.

class InMemoryStateStore(component_framework.core.StateStore):
class InMemoryStateStore(StateStore):
    """Simple in-memory state storage for development.

    State lives in a plain dict keyed by component id. Both ``save()`` and
    ``load()`` work on shallow copies, so adding, removing or rebinding
    top-level keys on a dict handed in or out does not change what is stored.
    Nested mutable values are still shared between the copies.
    """

    def __init__(self):
        # component_id -> shallow copy of the state passed to save()
        self._store: dict[str, dict] = {}

    def save(self, component_id: str, state: dict):
        """
        Save component state.

        Args:
            component_id: Unique component identifier
            state: Component state to persist (a shallow copy is stored)
        """
        self._store[component_id] = state.copy()

    def load(self, component_id: str) -> dict | None:
        """
        Load component state.

        Args:
            component_id: Unique component identifier

        Returns:
            A shallow copy of the stored state, or None if not found
        """
        stored = self._store.get(component_id)
        if stored is None:
            return None
        # Return a copy (mirroring save()) so callers cannot mutate the
        # store's own dict through the returned reference.
        return stored.copy()

    def delete(self, component_id: str):
        """Delete component state; unknown ids are ignored."""
        self._store.pop(component_id, None)

    def clear(self):
        """Clear all stored state (useful for testing)."""
        self._store.clear()

Simple in-memory state storage for development.

def save(self, component_id: str, state: dict):
46    def save(self, component_id: str, state: dict):
47        self._store[component_id] = state.copy()

Save component state.

Arguments:
  • component_id: Unique component identifier
  • state: Component state to persist
def load(self, component_id: str) -> dict | None:
49    def load(self, component_id: str) -> dict | None:
50        return self._store.get(component_id)

Load component state.

Arguments:
  • component_id: Unique component identifier
Returns:

Component state or None if not found

def delete(self, component_id: str):
52    def delete(self, component_id: str):
53        self._store.pop(component_id, None)

Delete component state.

def clear(self):
55    def clear(self):
56        """Clear all stored state (useful for testing)."""
57        self._store.clear()

Clear all stored state (useful for testing).

class StreamingComponent(component_framework.core.Component):
 14class StreamingComponent(Component):
 15    """
 16    Component subclass that supports streaming intermediate renders via SSE.
 17
 18    Event handlers written as async generators will yield intermediate frames.
 19    Each ``yield`` triggers a render and emits the current state as an SSE frame.
 20
 21    Example::
 22
 23        @registry.register("rag_query")
 24        class RagQueryComponent(StreamingComponent):
 25            template_name = "rag_query.html"
 26
 27            async def on_analyze(self, query: str):
 28                async for step in rag_service.stream(query):
 29                    self.state["step"] = step
 30                    yield  # emit intermediate render
 31                self.state["done"] = True
 32
 33    Non-generator handlers (sync or async) are also supported and produce a
 34    single frame, making the streaming endpoint backward-compatible.
 35    """
 36
 37    async def async_stream_dispatch(
 38        self,
 39        event: str | None = None,
 40        payload: dict | None = None,
 41        state: dict | None = None,
 42    ) -> AsyncGenerator[dict, None]:
 43        """
 44        Async generator entry point for streaming component execution.
 45
 46        Yields one dict per intermediate render, plus a final frame with
 47        ``stream_done=True``.  Each frame has the same shape as ``dispatch()``
 48        output, with an added ``stream_done`` key.
 49
 50        Args:
 51            event: Event name to handle.
 52            payload: Event data.
 53            state: Serialized state to restore.
 54
 55        Yields:
 56            Dict with ``html``, ``state``, ``component_id``, ``slots``, and
 57            ``stream_done`` keys.
 58        """
 59        try:
 60            # Lifecycle: mount or hydrate
 61            if state:
 62                self.hydrate(state)
 63            else:
 64                self.mount()
 65
 66            if not event:
 67                # No event — just render once
 68                yield self._render_frame(stream_done=True)
 69                return
 70
 71            handler = getattr(self, f"on_{event}", None)
 72            if not handler:
 73                raise EventNotFoundError(f"No handler for event: {event}")
 74
 75            if inspect.isasyncgenfunction(handler):
 76                # Async generator handler — yield intermediate frames
 77                try:
 78                    async for _ in handler(**(payload or {})):
 79                        yield self._render_frame(stream_done=False)
 80                except TypeError as e:
 81                    raise ComponentError(f"Invalid payload for {event}: {e}") from e
 82                except ComponentError:
 83                    raise
 84                except Exception as e:
 85                    logger.exception(f"Error handling {event} in {self.__class__.__name__}")
 86                    raise ComponentError(f"Error handling {event}") from e
 87
 88                # Final frame after generator exhaustion
 89                yield self._render_frame(stream_done=True)
 90
 91            else:
 92                # Non-generator handler (sync or async) — single frame
 93                try:
 94                    result = handler(**(payload or {}))
 95                    if inspect.isawaitable(result):
 96                        await result
 97                except TypeError as e:
 98                    raise ComponentError(f"Invalid payload for {event}: {e}") from e
 99                except ComponentError:
100                    raise
101                except Exception as e:
102                    logger.exception(f"Error handling {event} in {self.__class__.__name__}")
103                    raise ComponentError(f"Error handling {event}") from e
104
105                yield self._render_frame(stream_done=True)
106
107        except Exception:
108            logger.exception(f"Error in {self.__class__.__name__}.async_stream_dispatch()")
109            raise
110
111    def _render_frame(self, *, stream_done: bool) -> dict:
112        """Render the component and return a frame dict."""
113        self.before_render()
114        html = self.render()
115        return {
116            "html": html,
117            "state": self.dehydrate(),
118            "component_id": self.id,
119            "slots": self.render_slots(),
120            "stream_done": stream_done,
121        }

Component subclass that supports streaming intermediate renders via SSE.

Event handlers written as async generators will yield intermediate frames. Each yield triggers a render and emits the current state as an SSE frame.

Example::

@registry.register("rag_query")
class RagQueryComponent(StreamingComponent):
    template_name = "rag_query.html"

    async def on_analyze(self, query: str):
        async for step in rag_service.stream(query):
            self.state["step"] = step
            yield  # emit intermediate render
        self.state["done"] = True

Non-generator handlers (sync or async) are also supported and produce a single frame, making the streaming endpoint backward-compatible.

async def async_stream_dispatch(self, event: str | None = None, payload: dict | None = None, state: dict | None = None) -> AsyncGenerator[dict, None]:
 37    async def async_stream_dispatch(
 38        self,
 39        event: str | None = None,
 40        payload: dict | None = None,
 41        state: dict | None = None,
 42    ) -> AsyncGenerator[dict, None]:
 43        """
 44        Async generator entry point for streaming component execution.
 45
 46        Yields one dict per intermediate render, plus a final frame with
 47        ``stream_done=True``.  Each frame has the same shape as ``dispatch()``
 48        output, with an added ``stream_done`` key.
 49
 50        Args:
 51            event: Event name to handle.
 52            payload: Event data.
 53            state: Serialized state to restore.
 54
 55        Yields:
 56            Dict with ``html``, ``state``, ``component_id``, ``slots``, and
 57            ``stream_done`` keys.
 58        """
 59        try:
 60            # Lifecycle: mount or hydrate
 61            if state:
 62                self.hydrate(state)
 63            else:
 64                self.mount()
 65
 66            if not event:
 67                # No event — just render once
 68                yield self._render_frame(stream_done=True)
 69                return
 70
 71            handler = getattr(self, f"on_{event}", None)
 72            if not handler:
 73                raise EventNotFoundError(f"No handler for event: {event}")
 74
 75            if inspect.isasyncgenfunction(handler):
 76                # Async generator handler — yield intermediate frames
 77                try:
 78                    async for _ in handler(**(payload or {})):
 79                        yield self._render_frame(stream_done=False)
 80                except TypeError as e:
 81                    raise ComponentError(f"Invalid payload for {event}: {e}") from e
 82                except ComponentError:
 83                    raise
 84                except Exception as e:
 85                    logger.exception(f"Error handling {event} in {self.__class__.__name__}")
 86                    raise ComponentError(f"Error handling {event}") from e
 87
 88                # Final frame after generator exhaustion
 89                yield self._render_frame(stream_done=True)
 90
 91            else:
 92                # Non-generator handler (sync or async) — single frame
 93                try:
 94                    result = handler(**(payload or {}))
 95                    if inspect.isawaitable(result):
 96                        await result
 97                except TypeError as e:
 98                    raise ComponentError(f"Invalid payload for {event}: {e}") from e
 99                except ComponentError:
100                    raise
101                except Exception as e:
102                    logger.exception(f"Error handling {event} in {self.__class__.__name__}")
103                    raise ComponentError(f"Error handling {event}") from e
104
105                yield self._render_frame(stream_done=True)
106
107        except Exception:
108            logger.exception(f"Error in {self.__class__.__name__}.async_stream_dispatch()")
109            raise

Async generator entry point for streaming component execution.

Yields one dict per intermediate render, plus a final frame with stream_done=True. Each frame has the same shape as dispatch() output, with an added stream_done key.

Arguments:
  • event: Event name to handle.
  • payload: Event data.
  • state: Serialized state to restore.
Yields:

Dict with html, state, component_id, slots, and stream_done keys.

def format_sse_frame(frame: dict) -> str:
def format_sse_frame(frame: dict) -> str:
    """Serialize a frame dict into a single SSE ``data:`` message.

    The frame is JSON-encoded; values JSON cannot encode natively are
    converted with ``str``.

    Args:
        frame: The frame dict from ``async_stream_dispatch``.

    Returns:
        A string ending with two newlines (SSE message terminator).
    """
    encoded = json.dumps(frame, default=str)
    return "data: " + encoded + "\n\n"

Format a frame dict as an SSE data: line.

Arguments:
  • frame: The frame dict from async_stream_dispatch.
Returns:

A string ending with two newlines (SSE message terminator).

class ComponentWebSocketManager:
 33class ComponentWebSocketManager:
 34    """
 35    Manages WebSocket connections for components.
 36
 37    Handles:
 38    - Connection lifecycle
 39    - Message routing
 40    - Component updates
 41    - Broadcasting
 42    """
 43
 44    def __init__(self):
 45        self.connections: dict[str, list[WebSocketConnection]] = {}
 46        self.component_subscribers: dict[str, set[str]] = {}
 47
 48    async def connect(self, connection: WebSocketConnection, connection_id: str):
 49        """Register new WebSocket connection."""
 50        if connection_id not in self.connections:
 51            self.connections[connection_id] = []
 52        self.connections[connection_id].append(connection)
 53        logger.info(f"WebSocket connected: {connection_id}")
 54
 55    async def disconnect(self, connection_id: str):
 56        """Remove WebSocket connection."""
 57        if connection_id in self.connections:
 58            del self.connections[connection_id]
 59
 60        # Clean up subscriptions
 61        for component_id, subscribers in self.component_subscribers.items():
 62            subscribers.discard(connection_id)
 63
 64        logger.info(f"WebSocket disconnected: {connection_id}")
 65
 66    def subscribe(self, connection_id: str, component_id: str):
 67        """Subscribe connection to component updates."""
 68        if component_id not in self.component_subscribers:
 69            self.component_subscribers[component_id] = set()
 70        self.component_subscribers[component_id].add(connection_id)
 71        logger.debug(f"Subscribed {connection_id} to component {component_id}")
 72
 73    def unsubscribe(self, connection_id: str, component_id: str):
 74        """Unsubscribe connection from component updates."""
 75        if component_id in self.component_subscribers:
 76            self.component_subscribers[component_id].discard(connection_id)
 77
 78    async def handle_message(
 79        self, connection: WebSocketConnection, connection_id: str, message: dict
 80    ):
 81        """
 82        Handle incoming WebSocket message.
 83
 84        Message format:
 85        {
 86            "type": "component_event",
 87            "component": "component_name",
 88            "component_id": "component-123",
 89            "event": "event_name",
 90            "payload": {...},
 91            "state": "serialized_state"
 92        }
 93        """
 94        msg_type = message.get("type")
 95
 96        if msg_type == "component_event":
 97            await self._handle_component_event(connection, connection_id, message)
 98        elif msg_type == "subscribe":
 99            component_id = message.get("component_id")
100            if component_id:
101                self.subscribe(connection_id, component_id)
102                await connection.send({"type": "subscribed", "component_id": component_id})
103        elif msg_type == "unsubscribe":
104            component_id = message.get("component_id")
105            if component_id:
106                self.unsubscribe(connection_id, component_id)
107                await connection.send({"type": "unsubscribed", "component_id": component_id})
108        else:
109            logger.warning(f"Unknown message type: {msg_type}")
110
111    async def _handle_component_event(
112        self, connection: WebSocketConnection, connection_id: str, message: dict
113    ):
114        """Handle component event from WebSocket."""
115        try:
116            component_name: str = message.get("component", "")
117            component_id: str | None = message.get("component_id")
118            event: str | None = message.get("event")
119            payload: dict = message.get("payload", {})
120            state_str: str | None = message.get("state")
121
122            # Get component class
123            component_cls = registry.get(component_name)
124            if not component_cls:
125                await connection.send(
126                    {
127                        "type": "error",
128                        "error": f"Component '{component_name}' not found",
129                    }
130                )
131                return
132
133            # Deserialize state
134            state = None
135            if state_str:
136                state = StateSerializer.deserialize(state_str)
137
138            # Create and dispatch component (async to support async on_* handlers)
139            params = {"component_id": component_id}
140            component = component_cls(**params)
141            result = await component.async_dispatch(event=event, payload=payload, state=state)
142
143            # Serialize state
144            result["state"] = StateSerializer.serialize(result["state"])
145
146            # Send update to requesting connection
147            await connection.send({"type": "component_update", **result})
148
149            # Broadcast to subscribers (optional)
150            if message.get("broadcast", False) and component_id:
151                await self.broadcast_update(component_id, result)
152
153        except Exception as e:
154            logger.exception("Error handling component event")
155            await connection.send({"type": "error", "error": str(e)})
156
157    async def broadcast_update(self, component_id: str, update: dict):
158        """
159        Broadcast component update to all subscribers.
160
161        Args:
162            component_id: Component to broadcast update for
163            update: Update data (html, state, etc.)
164        """
165        if component_id not in self.component_subscribers:
166            return
167
168        subscribers = self.component_subscribers[component_id]
169        message = {"type": "component_update", **update}
170
171        # Send to all subscribers
172        tasks = []
173        for connection_id in subscribers:
174            if connection_id in self.connections:
175                for conn in self.connections[connection_id]:
176                    tasks.append(conn.send(message))
177
178        if tasks:
179            await asyncio.gather(*tasks, return_exceptions=True)
180            logger.debug(f"Broadcasted update for {component_id} to {len(tasks)} connections")
181
182    async def push_update(self, component_id: str, html: str, state: dict):
183        """
184        Push server-initiated update to component subscribers.
185
186        Useful for external events (model updates, background jobs, etc.)
187        """
188        serialized_state = StateSerializer.serialize(state)
189        update = {
190            "component_id": component_id,
191            "html": html,
192            "state": serialized_state,
193        }
194        await self.broadcast_update(component_id, update)

Manages WebSocket connections for components.

Handles:

  • Connection lifecycle
  • Message routing
  • Component updates
  • Broadcasting
connections: dict[str, list[WebSocketConnection]]
component_subscribers: dict[str, set[str]]
async def connect(self, connection: WebSocketConnection, connection_id: str):
48    async def connect(self, connection: WebSocketConnection, connection_id: str):
49        """Register new WebSocket connection."""
50        if connection_id not in self.connections:
51            self.connections[connection_id] = []
52        self.connections[connection_id].append(connection)
53        logger.info(f"WebSocket connected: {connection_id}")

Register new WebSocket connection.

async def disconnect(self, connection_id: str):
55    async def disconnect(self, connection_id: str):
56        """Remove WebSocket connection."""
57        if connection_id in self.connections:
58            del self.connections[connection_id]
59
60        # Clean up subscriptions
61        for component_id, subscribers in self.component_subscribers.items():
62            subscribers.discard(connection_id)
63
64        logger.info(f"WebSocket disconnected: {connection_id}")

Remove WebSocket connection.

def subscribe(self, connection_id: str, component_id: str):
66    def subscribe(self, connection_id: str, component_id: str):
67        """Subscribe connection to component updates."""
68        if component_id not in self.component_subscribers:
69            self.component_subscribers[component_id] = set()
70        self.component_subscribers[component_id].add(connection_id)
71        logger.debug(f"Subscribed {connection_id} to component {component_id}")

Subscribe connection to component updates.

def unsubscribe(self, connection_id: str, component_id: str):
73    def unsubscribe(self, connection_id: str, component_id: str):
74        """Unsubscribe connection from component updates."""
75        if component_id in self.component_subscribers:
76            self.component_subscribers[component_id].discard(connection_id)

Unsubscribe connection from component updates.

async def handle_message(self, connection: WebSocketConnection, connection_id: str, message: dict):
 78    async def handle_message(
 79        self, connection: WebSocketConnection, connection_id: str, message: dict
 80    ):
 81        """
 82        Handle incoming WebSocket message.
 83
 84        Message format:
 85        {
 86            "type": "component_event",
 87            "component": "component_name",
 88            "component_id": "component-123",
 89            "event": "event_name",
 90            "payload": {...},
 91            "state": "serialized_state"
 92        }
 93        """
 94        msg_type = message.get("type")
 95
 96        if msg_type == "component_event":
 97            await self._handle_component_event(connection, connection_id, message)
 98        elif msg_type == "subscribe":
 99            component_id = message.get("component_id")
100            if component_id:
101                self.subscribe(connection_id, component_id)
102                await connection.send({"type": "subscribed", "component_id": component_id})
103        elif msg_type == "unsubscribe":
104            component_id = message.get("component_id")
105            if component_id:
106                self.unsubscribe(connection_id, component_id)
107                await connection.send({"type": "unsubscribed", "component_id": component_id})
108        else:
109            logger.warning(f"Unknown message type: {msg_type}")

Handle incoming WebSocket message.

Message format: { "type": "component_event", "component": "component_name", "component_id": "component-123", "event": "event_name", "payload": {...}, "state": "serialized_state" }

async def broadcast_update(self, component_id: str, update: dict):
157    async def broadcast_update(self, component_id: str, update: dict):
158        """
159        Broadcast component update to all subscribers.
160
161        Args:
162            component_id: Component to broadcast update for
163            update: Update data (html, state, etc.)
164        """
165        if component_id not in self.component_subscribers:
166            return
167
168        subscribers = self.component_subscribers[component_id]
169        message = {"type": "component_update", **update}
170
171        # Send to all subscribers
172        tasks = []
173        for connection_id in subscribers:
174            if connection_id in self.connections:
175                for conn in self.connections[connection_id]:
176                    tasks.append(conn.send(message))
177
178        if tasks:
179            await asyncio.gather(*tasks, return_exceptions=True)
180            logger.debug(f"Broadcasted update for {component_id} to {len(tasks)} connections")

Broadcast component update to all subscribers.

Arguments:
  • component_id: Component to broadcast update for
  • update: Update data (html, state, etc.)
async def push_update(self, component_id: str, html: str, state: dict):
182    async def push_update(self, component_id: str, html: str, state: dict):
183        """
184        Push server-initiated update to component subscribers.
185
186        Useful for external events (model updates, background jobs, etc.)
187        """
188        serialized_state = StateSerializer.serialize(state)
189        update = {
190            "component_id": component_id,
191            "html": html,
192            "state": serialized_state,
193        }
194        await self.broadcast_update(component_id, update)

Push server-initiated update to component subscribers.

Useful for external events (model updates, background jobs, etc.)

class WebSocketConnection(abc.ABC):
class WebSocketConnection(ABC):
    """Abstract WebSocket connection.

    Declares the three coroutines a concrete connection must implement:
    ``send``, ``receive`` and ``close``.  Each base implementation only raises
    ``NotImplementedError``.
    """

    @abstractmethod
    async def send(self, data: dict):
        """Send data to client.

        Args:
            data: Message to deliver, as a dict.
        """
        raise NotImplementedError

    @abstractmethod
    async def receive(self) -> dict:
        """Receive data from client.

        Returns:
            The received message as a dict.
        """
        raise NotImplementedError

    @abstractmethod
    async def close(self):
        """Close connection."""
        raise NotImplementedError

Abstract WebSocket connection.

@abstractmethod
async def send(self, data: dict):
    @abstractmethod
    async def send(self, data: dict):
        """Send data to client.

        Abstract: the base implementation only raises ``NotImplementedError``.

        Args:
            data: Message to deliver, as a dict.
        """
        raise NotImplementedError

Send data to client.

@abstractmethod
async def receive(self) -> dict:
    @abstractmethod
    async def receive(self) -> dict:
        """Receive data from client.

        Abstract: the base implementation only raises ``NotImplementedError``.

        Returns:
            The received message as a dict.
        """
        raise NotImplementedError

Receive data from client.

@abstractmethod
async def close(self):
    @abstractmethod
    async def close(self):
        """Close connection.

        Abstract: the base implementation only raises ``NotImplementedError``.
        """
        raise NotImplementedError

Close connection.

ws_manager = <ComponentWebSocketManager object>