component_framework.adapters.fastapi

FastAPI adapter for component endpoints.

  1"""FastAPI adapter for component endpoints."""
  2
  3import json
  4import logging
  5
  6try:
  7    from fastapi import HTTPException, Request
  8    from fastapi.responses import JSONResponse
  9    from starlette.responses import StreamingResponse
 10except ImportError as e:
 11    from . import _require_extra
 12
 13    raise _require_extra("fastapi", "fastapi") from e
 14
 15from ..core import StateSerializer, registry
 16from ..core.streaming import StreamingComponent, format_sse_frame
 17
 18logger = logging.getLogger(__name__)
 19
 20
 21def _parse_json_str(value: str | dict | None, default: dict | None = None) -> dict | None:
 22    """Parse a value that may be a JSON string, a dict, or None."""
 23    if value is None:
 24        return default
 25    if isinstance(value, dict):
 26        return value
 27    try:
 28        return json.loads(value)
 29    except (json.JSONDecodeError, ValueError):
 30        return default
 31
 32
 33async def _parse_request_data(request: Request) -> dict:
 34    """Parse request body from JSON or form-encoded data.
 35
 36    HTMX sends ``application/x-www-form-urlencoded`` by default; the JS
 37    ``component-client.js`` sends ``application/json``.  This helper
 38    normalises both into a dict with ``event``, ``payload``, ``state``,
 39    and ``params`` keys.
 40    """
 41    content_type = request.headers.get("content-type", "")
 42
 43    if "application/json" in content_type:
 44        try:
 45            data = await request.json()
 46        except Exception as e:
 47            raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}")
 48    else:
 49        # Form-encoded (HTMX default) — hx-vals are merged into form fields
 50        form = await request.form()
 51        data = dict(form)
 52
 53    return data
 54
 55
 56def _extract_params(data: dict) -> tuple[dict, str | None, dict, dict | None]:
 57    """Extract and normalise event, payload, state, and params from parsed data.
 58
 59    Returns:
 60        (params, event, payload, state) tuple ready for component dispatch.
 61    """
 62    params = _parse_json_str(data.get("params"), default={}) or {}
 63    event = data.get("event")
 64    payload = _parse_json_str(data.get("payload"), default={}) or {}
 65    state_raw = data.get("state")
 66
 67    state = None
 68    if state_raw:
 69        try:
 70            state = (
 71                StateSerializer.deserialize(state_raw) if isinstance(state_raw, str) else state_raw
 72            )
 73        except Exception as e:
 74            raise HTTPException(status_code=400, detail=f"Invalid state: {e}")
 75
 76    return params, event, payload, state
 77
 78
 79async def component_endpoint(name: str, request: Request) -> JSONResponse:
 80    """
 81    Generic component endpoint for FastAPI.
 82
 83    POST /components/{name}
 84    Body (JSON or form-encoded): {
 85        "event": "event_name",
 86        "payload": {...},
 87        "state": "serialized_state"
 88    }
 89
 90    Returns: {
 91        "html": "rendered_html",
 92        "state": "serialized_state",
 93        "component_id": "component-id"
 94    }
 95    """
 96    try:
 97        component_cls = registry.get(name)
 98        if not component_cls:
 99            raise HTTPException(status_code=404, detail=f"Component '{name}' not found")
100
101        data = await _parse_request_data(request)
102        params, event, payload, state = _extract_params(data)
103
104        component = component_cls(**params)
105        result = await component.async_dispatch(event=event, payload=payload, state=state)
106
107        result["state"] = StateSerializer.serialize(result["state"])
108
109        return JSONResponse(content=result)
110
111    except HTTPException:
112        raise
113    except Exception:
114        logger.exception(f"Error processing component '{name}'")
115        raise HTTPException(status_code=500, detail="Internal server error")
116
117
118async def stream_component_endpoint(name: str, request: Request) -> StreamingResponse:
119    """
120    SSE streaming endpoint for long-running component operations.
121
122    POST /components/{name}/stream
123    Body (JSON or form-encoded): same as component_endpoint
124
125    Returns: text/event-stream with one ``data:`` frame per intermediate render.
126    """
127    try:
128        component_cls = registry.get(name)
129        if not component_cls:
130            raise HTTPException(status_code=404, detail=f"Component '{name}' not found")
131
132        if not issubclass(component_cls, StreamingComponent):
133            raise HTTPException(
134                status_code=400,
135                detail=f"Component '{name}' does not support streaming",
136            )
137
138        data = await _parse_request_data(request)
139        params, event, payload, state = _extract_params(data)
140
141        component = component_cls(**params)
142
143        async def event_generator():
144            async for frame in component.async_stream_dispatch(
145                event=event, payload=payload, state=state
146            ):
147                frame["state"] = StateSerializer.serialize(frame["state"])
148                yield format_sse_frame(frame)
149
150        return StreamingResponse(event_generator(), media_type="text/event-stream")
151
152    except HTTPException:
153        raise
154    except Exception:
155        logger.exception(f"Error processing streaming component '{name}'")
156        raise HTTPException(status_code=500, detail="Internal server error")
157
158
159def create_component_routes(app):
160    """
161    Add component endpoints to FastAPI app.
162
163    Registers both the standard POST endpoint and the SSE streaming endpoint.
164
165    Usage:
166        from fastapi import FastAPI
167        app = FastAPI()
168        create_component_routes(app)
169    """
170    app.add_api_route(
171        "/components/{name}",
172        component_endpoint,
173        methods=["POST"],
174        name="component_endpoint",
175    )
176    app.add_api_route(
177        "/components/{name}/stream",
178        stream_component_endpoint,
179        methods=["POST"],
180        name="stream_component_endpoint",
181    )
logger = <Logger component_framework.adapters.fastapi (WARNING)>
async def component_endpoint( name: str, request: starlette.requests.Request) -> starlette.responses.JSONResponse:
 80async def component_endpoint(name: str, request: Request) -> JSONResponse:
 81    """
 82    Generic component endpoint for FastAPI.
 83
 84    POST /components/{name}
 85    Body (JSON or form-encoded): {
 86        "event": "event_name",
 87        "payload": {...},
 88        "state": "serialized_state"
 89    }
 90
 91    Returns: {
 92        "html": "rendered_html",
 93        "state": "serialized_state",
 94        "component_id": "component-id"
 95    }
 96    """
 97    try:
 98        component_cls = registry.get(name)
 99        if not component_cls:
100            raise HTTPException(status_code=404, detail=f"Component '{name}' not found")
101
102        data = await _parse_request_data(request)
103        params, event, payload, state = _extract_params(data)
104
105        component = component_cls(**params)
106        result = await component.async_dispatch(event=event, payload=payload, state=state)
107
108        result["state"] = StateSerializer.serialize(result["state"])
109
110        return JSONResponse(content=result)
111
112    except HTTPException:
113        raise
114    except Exception:
115        logger.exception(f"Error processing component '{name}'")
116        raise HTTPException(status_code=500, detail="Internal server error")

Generic component endpoint for FastAPI.

POST /components/{name} Body (JSON or form-encoded): { "event": "event_name", "payload": {...}, "state": "serialized_state" }

Returns: { "html": "rendered_html", "state": "serialized_state", "component_id": "component-id" }

async def stream_component_endpoint( name: str, request: starlette.requests.Request) -> starlette.responses.StreamingResponse:
119async def stream_component_endpoint(name: str, request: Request) -> StreamingResponse:
120    """
121    SSE streaming endpoint for long-running component operations.
122
123    POST /components/{name}/stream
124    Body (JSON or form-encoded): same as component_endpoint
125
126    Returns: text/event-stream with one ``data:`` frame per intermediate render.
127    """
128    try:
129        component_cls = registry.get(name)
130        if not component_cls:
131            raise HTTPException(status_code=404, detail=f"Component '{name}' not found")
132
133        if not issubclass(component_cls, StreamingComponent):
134            raise HTTPException(
135                status_code=400,
136                detail=f"Component '{name}' does not support streaming",
137            )
138
139        data = await _parse_request_data(request)
140        params, event, payload, state = _extract_params(data)
141
142        component = component_cls(**params)
143
144        async def event_generator():
145            async for frame in component.async_stream_dispatch(
146                event=event, payload=payload, state=state
147            ):
148                frame["state"] = StateSerializer.serialize(frame["state"])
149                yield format_sse_frame(frame)
150
151        return StreamingResponse(event_generator(), media_type="text/event-stream")
152
153    except HTTPException:
154        raise
155    except Exception:
156        logger.exception(f"Error processing streaming component '{name}'")
157        raise HTTPException(status_code=500, detail="Internal server error")

SSE streaming endpoint for long-running component operations.

POST /components/{name}/stream Body (JSON or form-encoded): same as component_endpoint

Returns: text/event-stream with one data: frame per intermediate render.

def create_component_routes(app):
160def create_component_routes(app):
161    """
162    Add component endpoints to FastAPI app.
163
164    Registers both the standard POST endpoint and the SSE streaming endpoint.
165
166    Usage:
167        from fastapi import FastAPI
168        app = FastAPI()
169        create_component_routes(app)
170    """
171    app.add_api_route(
172        "/components/{name}",
173        component_endpoint,
174        methods=["POST"],
175        name="component_endpoint",
176    )
177    app.add_api_route(
178        "/components/{name}/stream",
179        stream_component_endpoint,
180        methods=["POST"],
181        name="stream_component_endpoint",
182    )

Add component endpoints to FastAPI app.

Registers both the standard POST endpoint and the SSE streaming endpoint.

Usage:

from fastapi import FastAPI app = FastAPI() create_component_routes(app)