BETA

Setup Guide · Litestar · HTTP · WebSocket · SSE

Litestar Adapter

Get started with component-framework on Litestar — HTTP endpoints, WebSocket real-time updates, and SSE streaming for long-running operations. One install, three transport protocols.

Installation

Install component-framework with the litestar extras group. This pulls in Litestar 2.x, Jinja2, and uvicorn on top of the adapter itself. No other dependencies are needed for the core HTTP and SSE transports.

terminal Shell
pip install "component-framework[litestar]"

# or with uv (recommended)
uv add "component-framework[litestar]"

If you are managing extras in pyproject.toml, the group looks like this:

pyproject.toml TOML
[project.optional-dependencies]
litestar = [
    "litestar>=2.0",
    "jinja2>=3.1",
    "uvicorn>=0.27",
]

Jinja2 included. The Litestar extras group ships Jinja2 as its default renderer. If you prefer a different engine, implement Renderer from component_framework.core.renderer and pass it to your components at startup.
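
As a rough illustration of swapping the engine, the sketch below builds a tiny renderer on the standard library's string.Template. The Renderer interface is not shown in this guide, so the render(template_name, context) method and the StringTemplateRenderer class are assumptions made for illustration; check component_framework.core.renderer for the actual contract before relying on this shape.

```python
from __future__ import annotations

from pathlib import Path
from string import Template
from typing import Any, Mapping


class StringTemplateRenderer:
    """Hypothetical renderer built on string.Template.

    Assumes the Renderer contract is a single
    render(template_name, context) -> str method.
    """

    def __init__(self, directory: str | Path) -> None:
        self.directory = Path(directory)

    def render(self, template_name: str, context: Mapping[str, Any]) -> str:
        source = (self.directory / template_name).read_text(encoding="utf-8")
        # safe_substitute leaves unknown $placeholders in place instead of raising
        return Template(source).safe_substitute(context)
```

If the real interface matches, such a class would be assigned the same way the Jinja2 renderer is in the examples on this page, for instance Component.renderer = StringTemplateRenderer("templates").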

HTTP Endpoint

The Litestar adapter exposes component_endpoint — an async route handler that hydrates a component, dispatches the incoming event, and returns rendered HTML. Wire it into your app with Litestar(route_handlers=[...]) or with the helper create_component_routes(app).

Defining a Component

Components are plain Python classes. They know nothing about Litestar — the same class works on FastAPI or Django.

components.py Python
from component_framework.core import registry
from component_framework.core.component import Component


@registry.register("counter")
class CounterComponent(Component):
    """Simple increment/decrement counter."""

    template_name = "counter.html"

    def mount(self):
        self.state["count"] = 0

    def on_increment(self):
        self.state["count"] += 1

    def on_decrement(self):
        self.state["count"] = max(0, self.state["count"] - 1)

Wiring into Litestar

Two registration patterns are supported. Use whichever fits your project structure.

app.py (pattern A: explicit route handlers) Python
from litestar import Litestar
from litestar.contrib.jinja import JinjaTemplateEngine
from litestar.template import TemplateConfig
from component_framework.adapters.litestar import component_endpoint
from component_framework.adapters.jinjax_renderer import Jinja2Renderer
from component_framework.core.component import Component

import components  # registers CounterComponent via @registry.register

Component.renderer = Jinja2Renderer("templates")

app = Litestar(
    route_handlers=[component_endpoint],
    template_config=TemplateConfig(
        directory="templates",
        engine=JinjaTemplateEngine,
    ),
)

app.py (pattern B: create_component_routes helper) Python
from litestar import Litestar
from component_framework.adapters.litestar import create_component_routes

import components

app = Litestar()
create_component_routes(app)  # auto-registers /components/{name}/ for every entry in registry

Make a request with HTMX or plain fetch. The equivalent curl calls are:

terminal Shell
# Initial mount (no state yet)
curl -X POST http://localhost:8000/components/counter/ \
     -H "Content-Type: application/json" \
     -d '{"event": "mount", "state": {}}'

# Dispatch an event with existing state
curl -X POST http://localhost:8000/components/counter/ \
     -H "Content-Type: application/json" \
     -d '{"event": "increment", "state": {"count": 3}}'
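
To make the request body concrete, here is a framework-free sketch of the cycle the endpoint is described as performing: hydrate a component from the posted state, dispatch the event to an on_<event> method, and return the resulting state. TinyCounter and handle_request are hypothetical stand-ins, not the adapter's code, and the on_ prefix lookup is inferred from the handler names used in CounterComponent.

```python
from typing import Any


class TinyCounter:
    """Stand-in for CounterComponent without any framework imports."""

    def __init__(self) -> None:
        self.state: dict[str, Any] = {}

    def mount(self) -> None:
        self.state["count"] = 0

    def on_increment(self) -> None:
        self.state["count"] += 1

    def on_decrement(self) -> None:
        self.state["count"] = max(0, self.state["count"] - 1)


def handle_request(body: dict[str, Any]) -> dict[str, Any]:
    """Hydrate, dispatch, and return the resulting state."""
    component = TinyCounter()
    event = body["event"]
    if event == "mount":
        component.mount()
    else:
        # Hydrate from the client-supplied state, then look up on_<event>
        component.state = dict(body.get("state", {}))
        handler = getattr(component, f"on_{event}", None)
        if handler is None:
            raise ValueError(f"unknown event: {event!r}")
        handler()
    return {"state": component.state}


print(handle_request({"event": "mount", "state": {}}))
print(handle_request({"event": "increment", "state": {"count": 3}}))
```

The two calls mirror the two curl requests: the first yields a count of 0, the second a count of 4. A real endpoint would also render the template and return HTML alongside the state.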

Async Event Handlers

Litestar is async-native, so async def event handlers are the natural pattern. The core's async_dispatch() routes the event through the async lifecycle without blocking the event loop.

components.py Python
import httpx

from component_framework.core import registry
from component_framework.core.component import Component


@registry.register("sentiment")
class SentimentComponent(Component):
    """Calls an external ML service to score a text snippet."""

    template_name = "sentiment.html"

    def mount(self):
        self.state.update({"text": "", "score": None, "loading": False})

    async def on_analyze(self, text: str):
        """Await an external service directly on the event loop (no thread pool)."""
        self.state["loading"] = True
        self.state["text"] = text

        # Any awaitable works here: httpx, aiohttp, asyncpg, etc.
        score = await _call_ml_service(text)

        self.state["score"] = score
        self.state["loading"] = False


async def _call_ml_service(text: str) -> float:
    """POST the text to the scoring service and return its numeric score."""
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "https://api.example.com/sentiment",
            json={"text": text},
        )
        resp.raise_for_status()  # fail loudly on 4xx/5xx instead of parsing an error body
        return float(resp.json()["score"])

The Litestar adapter calls async_dispatch() automatically when it detects an async handler. No configuration needed — if your method is async def, it runs natively in the event loop.

async_dispatch vs dispatch: dispatch() is the synchronous entrypoint used by FastAPI and Django. async_dispatch() is its awaitable counterpart. The Litestar adapter selects the right one based on asyncio.iscoroutinefunction(handler), so you never have to choose manually.
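
The selection logic described above can be sketched in a few lines. This is not the adapter's source: dispatch_any is a hypothetical helper, it uses inspect.iscoroutinefunction (asyncio.iscoroutinefunction is deprecated as of Python 3.14), and pushing sync handlers to a worker thread is an assumption of this sketch rather than documented adapter behaviour.

```python
import asyncio
import inspect
from typing import Any, Callable


async def dispatch_any(handler: Callable[..., Any], *args: Any) -> Any:
    """Await coroutine handlers directly; run sync ones in a worker thread."""
    if inspect.iscoroutinefunction(handler):
        return await handler(*args)
    # asyncio.to_thread keeps a blocking sync handler off the event loop
    return await asyncio.to_thread(handler, *args)


def sync_handler(x: int) -> int:
    return x + 1


async def async_handler(x: int) -> int:
    await asyncio.sleep(0)  # stands in for real awaited I/O
    return x * 2


async def main() -> tuple[int, int]:
    return await dispatch_any(sync_handler, 1), await dispatch_any(async_handler, 2)


print(asyncio.run(main()))
```

The point of the check is that callers use one entrypoint regardless of how the handler was written; only the dispatcher needs to know the difference.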

SSE Streaming

Server-Sent Events let a component push incremental updates to the browser over a single long-lived HTTP connection. This is the recommended transport for long-running operations like report generation, file processing, or AI token streaming — cases where WebSockets are heavier than needed.

StreamingComponent

Subclass StreamingComponent and implement an async generator handler. Each yield flushes a partial state patch to the client.

components.py Python
import asyncio
from typing import AsyncGenerator
from component_framework.core import registry
from component_framework.core.component import StreamingComponent


@registry.register("report")
class ReportComponent(StreamingComponent):
    """Generates a multi-section report, streaming each section as it completes."""

    template_name = "report.html"

    def mount(self):
        self.state.update({"sections": [], "done": False})

    async def on_generate(self, report_id: int) -> AsyncGenerator:
        """Yield state patches — each becomes one SSE event."""
        sections = ["Executive Summary", "Revenue Analysis", "Forecast"]

        for title in sections:
            # Simulate work: fetch data, run query, call LLM, etc.
            content = await _build_section(report_id, title)
            self.state["sections"].append({"title": title, "content": content})
            yield  # push current state to client immediately

        self.state["done"] = True
        yield  # final flush


async def _build_section(report_id: int, title: str) -> str:
    await asyncio.sleep(0.4)  # replace with real I/O
    return f"Content for {title} (report #{report_id})"
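
To see what "each yield pushes the current state" means in practice, the sketch below drives an async-generator handler and records a snapshot of the state at every yield. TinyReport and collect_snapshots are hypothetical and use no framework code; a real StreamingComponent would render and send each snapshot instead of collecting it.

```python
import asyncio
import copy
from typing import Any, AsyncIterator


class TinyReport:
    """Stand-in for ReportComponent with the same yield-per-section shape."""

    def __init__(self) -> None:
        self.state: dict[str, Any] = {"sections": [], "done": False}

    async def on_generate(self, report_id: int) -> AsyncIterator[None]:
        for title in ("Executive Summary", "Forecast"):
            await asyncio.sleep(0)  # stands in for real I/O
            self.state["sections"].append(title)
            yield
        self.state["done"] = True
        yield


async def collect_snapshots(component: TinyReport, report_id: int) -> list[dict[str, Any]]:
    snapshots = []
    async for _ in component.on_generate(report_id):
        # deepcopy, because the handler keeps mutating the same dict
        snapshots.append(copy.deepcopy(component.state))
    return snapshots


snapshots = asyncio.run(collect_snapshots(TinyReport(), 7))
for snap in snapshots:
    print(len(snap["sections"]), snap["done"])
```

Three snapshots come out: one per section plus the final flush with done set to True. The copy matters because a consumer that only kept references would see every snapshot change as the handler continues.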

Register the SSE endpoint alongside the standard HTTP endpoint:

app.py Python
from litestar import Litestar
from component_framework.adapters.litestar import (
    component_endpoint,
    component_stream_endpoint,  # SSE handler
)

app = Litestar(
    route_handlers=[
        component_endpoint,         # POST /components/{name}/
        component_stream_endpoint,  # GET  /components/{name}/stream/
    ],
)

Client-side with HTMX SSE

The framework uses HTMX for all client-side interaction. HTMX's built-in SSE extension handles the stream connection and DOM swapping — no custom JavaScript required.

report.html HTML
<!-- Load HTMX + SSE extension -->
<script src="https://unpkg.com/htmx.org@1.9.10"></script>
<script src="https://unpkg.com/htmx.org@1.9.10/dist/ext/sse.js"></script>

<!-- SSE-powered report component -->
<div hx-ext="sse"
     sse-connect="/components/report/stream/"
     sse-swap="message"
     hx-swap="innerHTML">
  <!-- Each SSE frame replaces this content -->
  <p>Generating report&hellip;</p>
</div>

No custom JavaScript. HTMX's SSE extension opens the connection, receives each data: frame from StreamingComponent, and swaps the rendered HTML into the target element automatically. This is consistent with the framework's server-driven, minimal-JS philosophy.
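
For reference, this is roughly what one such frame looks like on the wire. The sse_frame helper is a hypothetical sketch of the text/event-stream encoding, not the adapter's implementation; the event name "message" is chosen to match sse-swap="message" in the markup above.

```python
def sse_frame(html: str, event: str = "message") -> str:
    """Encode one HTML fragment as a text/event-stream frame."""
    lines = [f"event: {event}"]
    # Every payload line needs its own "data:" field; the browser rejoins them with "\n"
    payload_lines = html.splitlines() or [""]
    lines.extend(f"data: {line}" for line in payload_lines)
    return "\n".join(lines) + "\n\n"  # a blank line terminates the frame


print(sse_frame("<p>Executive Summary</p>\n<p>Revenue Analysis</p>"), end="")
```

Multi-line HTML is the case worth noticing: a fragment sent as a single data: field with embedded newlines would be cut off at the first line break, so each line is prefixed separately.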

WebSocket Support

For true bidirectional real-time (chat, collaborative editing, live dashboards with server push), use the WebSocket transport. The pattern mirrors the FastAPI adapter — a LitestarWebSocketConnection manages the channel lifecycle.

app.py Python
from litestar import Litestar
from component_framework.adapters.litestar import (
    component_endpoint,
    component_websocket_endpoint,  # WS handler
)

app = Litestar(
    route_handlers=[
        component_endpoint,
        component_websocket_endpoint,  # WS /components/{name}/ws/
    ],
)

Push an update to all subscribers of a component from anywhere in your application:

services.py Python
from component_framework.core.websocket import WebSocketManager

manager = WebSocketManager()  # shared singleton, e.g. via DI container


async def broadcast_price_update(ticker: str, price: float):
    """Called from a background task or message-queue consumer."""
    await manager.broadcast(
        component_name="ticker",
        event="price_update",
        payload={"ticker": ticker, "price": price},
    )

Scaling WebSockets. WebSocketManager is in-process by default. For multi-worker deployments, swap the backing store for Redis pub/sub; the interface is identical and only the constructor argument changes.
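
To illustrate why an in-process manager stops at the worker boundary, here is a minimal fan-out sketch with one asyncio.Queue per subscriber. LocalBroadcaster is hypothetical and only borrows the broadcast(component_name, event, payload) call shape from the example above; it is not WebSocketManager's implementation.

```python
import asyncio
from collections import defaultdict
from typing import Any


class LocalBroadcaster:
    """Hypothetical in-process fan-out: one asyncio.Queue per subscriber."""

    def __init__(self) -> None:
        self._subscribers: dict[str, set[asyncio.Queue]] = defaultdict(set)

    def subscribe(self, component_name: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers[component_name].add(queue)
        return queue

    def unsubscribe(self, component_name: str, queue: asyncio.Queue) -> None:
        self._subscribers[component_name].discard(queue)

    async def broadcast(self, component_name: str, event: str, payload: dict[str, Any]) -> int:
        message = {"event": event, "payload": payload}
        # Iterate over a copy so subscribe/unsubscribe during delivery is safe
        for queue in list(self._subscribers[component_name]):
            await queue.put(message)
        return len(self._subscribers[component_name])


async def demo() -> list[dict[str, Any]]:
    hub = LocalBroadcaster()
    first, second = hub.subscribe("ticker"), hub.subscribe("ticker")
    await hub.broadcast("ticker", "price_update", {"ticker": "ACME", "price": 12.5})
    return [await first.get(), await second.get()]


print(asyncio.run(demo()))
```

The subscriber table lives in one process's memory, so a second worker process would hold its own separate table and never see these messages. That gap is what a shared channel such as Redis pub/sub fills.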

Full Example App

The repository ships a self-contained Litestar example at examples/litestar_example.py. It wires together a Jinja2 renderer, three components (counter, sentiment, report), and an index page — roughly 120 lines.

examples/litestar_example.py (key parts) Python
from pathlib import Path
from litestar import Litestar, get
from litestar.contrib.jinja import JinjaTemplateEngine
from litestar.response import Template
from litestar.template import TemplateConfig
from component_framework.adapters.litestar import (
    component_endpoint,
    component_stream_endpoint,
    component_websocket_endpoint,
)
from component_framework.adapters.jinjax_renderer import Jinja2Renderer
from component_framework.core.component import Component

# Point the renderer at the templates directory
Component.renderer = Jinja2Renderer(Path("examples/litestar_templates"))

import examples.litestar_components  # noqa: F401 — side-effect: registers all components


@get("/")
async def index() -> Template:
    return Template(template_name="index.html")


app = Litestar(
    route_handlers=[
        index,
        component_endpoint,
        component_stream_endpoint,
        component_websocket_endpoint,
    ],
    template_config=TemplateConfig(
        directory="examples/litestar_templates",
        engine=JinjaTemplateEngine,
    ),
)

Run it:

terminal Shell
python examples/litestar_example.py
# or via uvicorn directly:
uvicorn examples.litestar_example:app --reload --port 8000

  1. Open http://localhost:8000. The index page renders all three components server-side.
  2. Click Increment / Decrement on the counter. Each click POSTs to /components/counter/ and swaps the HTML fragment.
  3. Submit text in the sentiment box. The async handler awaits the (mock) ML service and re-renders the score badge.
  4. Click Generate Report. HTMX's SSE extension connects to /components/report/stream/ and each section appears as it completes.

Testing

Litestar ships its own sync/async test client. Use pytest.importorskip to guard tests so the suite still passes in environments without Litestar installed.

tests/test_litestar_adapter.py Python
import pytest

litestar = pytest.importorskip("litestar")

from litestar.testing import TestClient, create_test_client

from component_framework.adapters.litestar import component_endpoint
from component_framework.core.component import Component
from tests.fixtures import MockRenderer

# Use a mock renderer so tests never touch the filesystem
Component.renderer = MockRenderer()


@pytest.fixture
def client():
    # Enter the client's context so app startup/shutdown hooks run around each test
    with create_test_client(route_handlers=[component_endpoint]) as test_client:
        yield test_client


def test_counter_mount(client: TestClient):
    resp = client.post(
        "/components/counter/",
        json={"event": "mount", "state": {}},
    )
    assert resp.status_code == 200
    data = resp.json()
    assert data["state"]["count"] == 0


def test_counter_increment(client: TestClient):
    resp = client.post(
        "/components/counter/",
        json={"event": "increment", "state": {"count": 2}},
    )
    assert resp.json()["state"]["count"] == 3


@pytest.mark.asyncio
async def test_sentiment_async_handler():
    """Component logic tested directly — no HTTP layer needed."""
    from tests.fixtures import SentimentComponent

    comp = SentimentComponent()
    comp.mount()
    await comp.on_analyze("I love async Python")
    assert comp.state["score"] is not None
    assert comp.state["loading"] is False

Components are framework-agnostic. The last test calls on_analyze directly, with no Litestar, no HTTP, and no client. Because components are plain Python classes, the bulk of your test suite can skip the adapter entirely and run in milliseconds.
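
The MockRenderer imported from tests.fixtures is not shown in this guide. One plausible shape for such a test double is sketched below; RecordingRenderer is a hypothetical name, and the render(template_name, context) signature is an assumption about the Renderer interface rather than something this page confirms.

```python
from typing import Any, Mapping


class RecordingRenderer:
    """Hypothetical test double: records calls and returns deterministic markup."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, dict[str, Any]]] = []

    def render(self, template_name: str, context: Mapping[str, Any]) -> str:
        # Copy the context so later state mutations do not rewrite recorded calls
        self.calls.append((template_name, dict(context)))
        return f"<!-- {template_name} -->"


renderer = RecordingRenderer()
html = renderer.render("counter.html", {"count": 1})
print(html, renderer.calls)
```

Recording the calls lets a test assert on which template was requested and with what context, without any template files on disk.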

Summary

The Litestar adapter gives you all three transports with the same component code you would write for FastAPI or Django.

Transport   Endpoint                      Best for
HTTP POST   component_endpoint            Standard HTMX interactions, forms, state mutations
SSE (GET)   component_stream_endpoint     Long-running ops, AI streaming, progress indicators
WebSocket   component_websocket_endpoint  Bidirectional real-time: chat, live dashboards

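All three share the same component base class, the same renderer, and the same state serialisation. Enabling a transport is a one-line change in app.py: add its endpoint to route_handlers. Event-handling code stays the same; only streaming handlers need the StreamingComponent subclass and an async generator.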

Next steps. See the API Reference for the full StreamingComponent and WebSocketManager interfaces. For a production Django deployment with permissions, caching, and WebSocket fan-out, see the Admin Panel guide.