component_framework.adapters.litestar

Litestar adapter for component endpoints.

  1"""Litestar adapter for component endpoints."""
  2
  3import json
  4import logging
  5
  6try:
  7    from litestar import Request, post
  8    from litestar.exceptions import HTTPException
  9    from litestar.response import Response, Stream
 10except ImportError as e:
 11    from . import _require_extra
 12
 13    raise _require_extra("litestar", "litestar") from e
 14
 15from ..core import StateSerializer, registry
 16from ..core.streaming import StreamingComponent, format_sse_frame
 17
 18logger = logging.getLogger(__name__)
 19
 20
 21@post("/components/{name:str}")
 22async def component_endpoint(name: str, request: Request) -> Response:
 23    """
 24    Generic component endpoint for Litestar.
 25
 26    POST /components/{name}
 27    Body: {
 28        "event": "event_name",
 29        "payload": {...},
 30        "state": "serialized_state"
 31    }
 32
 33    Returns: {
 34        "html": "rendered_html",
 35        "state": "serialized_state",
 36        "component_id": "component-id"
 37    }
 38    """
 39    try:
 40        # Get component class
 41        component_cls = registry.get(name)
 42        if not component_cls:
 43            raise HTTPException(status_code=404, detail=f"Component '{name}' not found")
 44
 45        # Parse request data
 46        try:
 47            data = await request.json()
 48        except Exception as e:
 49            raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}")
 50
 51        # Extract parameters
 52        params = data.get("params", {})
 53        event = data.get("event")
 54        payload_raw = data.get("payload", {})
 55        state_str = data.get("state")
 56
 57        # Guard against double-serialised payload from older client JS
 58        if isinstance(payload_raw, str):
 59            try:
 60                payload = json.loads(payload_raw)
 61            except (json.JSONDecodeError, ValueError):
 62                payload = {}
 63        else:
 64            payload = payload_raw
 65
 66        # Deserialize state if provided
 67        state = None
 68        if state_str:
 69            try:
 70                state = StateSerializer.deserialize(state_str)
 71            except Exception as e:
 72                raise HTTPException(status_code=400, detail=f"Invalid state: {e}")
 73
 74        # Create and dispatch component (async to support async on_* handlers)
 75        component = component_cls(**params)
 76        result = await component.async_dispatch(event=event, payload=payload, state=state)
 77
 78        # Serialize state for response
 79        result["state"] = StateSerializer.serialize(result["state"])
 80
 81        return Response(content=result, media_type="application/json", status_code=200)
 82
 83    except HTTPException:
 84        raise
 85    except Exception:
 86        logger.exception(f"Error processing component '{name}'")
 87        raise HTTPException(status_code=500, detail="Internal server error")
 88
 89
 90@post("/components/{name:str}/stream")
 91async def stream_component_endpoint(name: str, request: Request) -> Stream:
 92    """
 93    SSE streaming endpoint for long-running component operations.
 94
 95    POST /components/{name}/stream
 96    Body: same as component_endpoint
 97
 98    Returns: text/event-stream with one ``data:`` frame per intermediate render.
 99    """
100    component_cls = registry.get(name)
101    if not component_cls:
102        raise HTTPException(status_code=404, detail=f"Component '{name}' not found")
103
104    if not issubclass(component_cls, StreamingComponent):
105        raise HTTPException(
106            status_code=400,
107            detail=f"Component '{name}' does not support streaming",
108        )
109
110    try:
111        data = await request.json()
112    except Exception as e:
113        raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}")
114
115    params = data.get("params", {})
116    event = data.get("event")
117    payload_raw = data.get("payload", {})
118    state_str = data.get("state")
119
120    if isinstance(payload_raw, str):
121        try:
122            payload = json.loads(payload_raw)
123        except (json.JSONDecodeError, ValueError):
124            payload = {}
125    else:
126        payload = payload_raw
127
128    state = None
129    if state_str:
130        try:
131            state = StateSerializer.deserialize(state_str)
132        except Exception as e:
133            raise HTTPException(status_code=400, detail=f"Invalid state: {e}")
134
135    component = component_cls(**params)
136
137    async def event_generator():
138        async for frame in component.async_stream_dispatch(
139            event=event, payload=payload, state=state
140        ):
141            frame["state"] = StateSerializer.serialize(frame["state"])
142            yield format_sse_frame(frame)
143
144    return Stream(event_generator(), media_type="text/event-stream", status_code=200)
145
146
def create_component_routes(app):
    """
    Attach the component route handlers to an existing Litestar app.

    Two handlers are registered, in this order: the standard POST endpoint
    (``component_endpoint``) and the SSE streaming endpoint
    (``stream_component_endpoint``).

    Usage:
        from litestar import Litestar
        from component_framework.adapters.litestar import create_component_routes

        app = Litestar(route_handlers=[])
        create_component_routes(app)

    Alternatively, pass the handlers directly at app creation:
        from component_framework.adapters.litestar import (
            component_endpoint,
            stream_component_endpoint,
        )

        app = Litestar(route_handlers=[component_endpoint, stream_component_endpoint])
    """
    for route_handler in (component_endpoint, stream_component_endpoint):
        app.register(route_handler)
logger = <Logger component_framework.adapters.litestar (WARNING)>
def create_component_routes(app):
def create_component_routes(app):
    """
    Attach the component route handlers to an existing Litestar app.

    Two handlers are registered, in this order: the standard POST endpoint
    (``component_endpoint``) and the SSE streaming endpoint
    (``stream_component_endpoint``).

    Usage:
        from litestar import Litestar
        from component_framework.adapters.litestar import create_component_routes

        app = Litestar(route_handlers=[])
        create_component_routes(app)

    Alternatively, pass the handlers directly at app creation:
        from component_framework.adapters.litestar import (
            component_endpoint,
            stream_component_endpoint,
        )

        app = Litestar(route_handlers=[component_endpoint, stream_component_endpoint])
    """
    for route_handler in (component_endpoint, stream_component_endpoint):
        app.register(route_handler)

Register component endpoint handlers with a Litestar app.

Registers both the standard POST endpoint and the SSE streaming endpoint.

Usage:

    from litestar import Litestar
    from component_framework.adapters.litestar import create_component_routes

    app = Litestar(route_handlers=[])
    create_component_routes(app)

Alternatively, pass the handlers directly at app creation:

    from component_framework.adapters.litestar import (
        component_endpoint,
        stream_component_endpoint,
    )

    app = Litestar(route_handlers=[component_endpoint, stream_component_endpoint])