component_framework.adapters.fastapi
FastAPI adapter for component endpoints.
1"""FastAPI adapter for component endpoints.""" 2 3import json 4import logging 5 6try: 7 from fastapi import HTTPException, Request 8 from fastapi.responses import JSONResponse 9 from starlette.responses import StreamingResponse 10except ImportError as e: 11 from . import _require_extra 12 13 raise _require_extra("fastapi", "fastapi") from e 14 15from ..core import StateSerializer, registry 16from ..core.streaming import StreamingComponent, format_sse_frame 17 18logger = logging.getLogger(__name__) 19 20 21def _parse_json_str(value: str | dict | None, default: dict | None = None) -> dict | None: 22 """Parse a value that may be a JSON string, a dict, or None.""" 23 if value is None: 24 return default 25 if isinstance(value, dict): 26 return value 27 try: 28 return json.loads(value) 29 except (json.JSONDecodeError, ValueError): 30 return default 31 32 33async def _parse_request_data(request: Request) -> dict: 34 """Parse request body from JSON or form-encoded data. 35 36 HTMX sends ``application/x-www-form-urlencoded`` by default; the JS 37 ``component-client.js`` sends ``application/json``. This helper 38 normalises both into a dict with ``event``, ``payload``, ``state``, 39 and ``params`` keys. 40 """ 41 content_type = request.headers.get("content-type", "") 42 43 if "application/json" in content_type: 44 try: 45 data = await request.json() 46 except Exception as e: 47 raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}") 48 else: 49 # Form-encoded (HTMX default) — hx-vals are merged into form fields 50 form = await request.form() 51 data = dict(form) 52 53 return data 54 55 56def _extract_params(data: dict) -> tuple[dict, str | None, dict, dict | None]: 57 """Extract and normalise event, payload, state, and params from parsed data. 58 59 Returns: 60 (params, event, payload, state) tuple ready for component dispatch. 61 """ 62 params = _parse_json_str(data.get("params"), default={}) or {} 63 event = data.get("event") 64 payload = _parse_json_str(data.get("payload"), default={}) or {} 65 state_raw = data.get("state") 66 67 state = None 68 if state_raw: 69 try: 70 state = ( 71 StateSerializer.deserialize(state_raw) if isinstance(state_raw, str) else state_raw 72 ) 73 except Exception as e: 74 raise HTTPException(status_code=400, detail=f"Invalid state: {e}") 75 76 return params, event, payload, state 77 78 79async def component_endpoint(name: str, request: Request) -> JSONResponse: 80 """ 81 Generic component endpoint for FastAPI. 82 83 POST /components/{name} 84 Body (JSON or form-encoded): { 85 "event": "event_name", 86 "payload": {...}, 87 "state": "serialized_state" 88 } 89 90 Returns: { 91 "html": "rendered_html", 92 "state": "serialized_state", 93 "component_id": "component-id" 94 } 95 """ 96 try: 97 component_cls = registry.get(name) 98 if not component_cls: 99 raise HTTPException(status_code=404, detail=f"Component '{name}' not found") 100 101 data = await _parse_request_data(request) 102 params, event, payload, state = _extract_params(data) 103 104 component = component_cls(**params) 105 result = await component.async_dispatch(event=event, payload=payload, state=state) 106 107 result["state"] = StateSerializer.serialize(result["state"]) 108 109 return JSONResponse(content=result) 110 111 except HTTPException: 112 raise 113 except Exception: 114 logger.exception(f"Error processing component '{name}'") 115 raise HTTPException(status_code=500, detail="Internal server error") 116 117 118async def stream_component_endpoint(name: str, request: Request) -> StreamingResponse: 119 """ 120 SSE streaming endpoint for long-running component operations. 121 122 POST /components/{name}/stream 123 Body (JSON or form-encoded): same as component_endpoint 124 125 Returns: text/event-stream with one ``data:`` frame per intermediate render. 126 """ 127 try: 128 component_cls = registry.get(name) 129 if not component_cls: 130 raise HTTPException(status_code=404, detail=f"Component '{name}' not found") 131 132 if not issubclass(component_cls, StreamingComponent): 133 raise HTTPException( 134 status_code=400, 135 detail=f"Component '{name}' does not support streaming", 136 ) 137 138 data = await _parse_request_data(request) 139 params, event, payload, state = _extract_params(data) 140 141 component = component_cls(**params) 142 143 async def event_generator(): 144 async for frame in component.async_stream_dispatch( 145 event=event, payload=payload, state=state 146 ): 147 frame["state"] = StateSerializer.serialize(frame["state"]) 148 yield format_sse_frame(frame) 149 150 return StreamingResponse(event_generator(), media_type="text/event-stream") 151 152 except HTTPException: 153 raise 154 except Exception: 155 logger.exception(f"Error processing streaming component '{name}'") 156 raise HTTPException(status_code=500, detail="Internal server error") 157 158 159def create_component_routes(app): 160 """ 161 Add component endpoints to FastAPI app. 162 163 Registers both the standard POST endpoint and the SSE streaming endpoint. 164 165 Usage: 166 from fastapi import FastAPI 167 app = FastAPI() 168 create_component_routes(app) 169 """ 170 app.add_api_route( 171 "/components/{name}", 172 component_endpoint, 173 methods=["POST"], 174 name="component_endpoint", 175 ) 176 app.add_api_route( 177 "/components/{name}/stream", 178 stream_component_endpoint, 179 methods=["POST"], 180 name="stream_component_endpoint", 181 )
logger =
<Logger component_framework.adapters.fastapi (WARNING)>
async def
component_endpoint( name: str, request: starlette.requests.Request) -> starlette.responses.JSONResponse:
80async def component_endpoint(name: str, request: Request) -> JSONResponse: 81 """ 82 Generic component endpoint for FastAPI. 83 84 POST /components/{name} 85 Body (JSON or form-encoded): { 86 "event": "event_name", 87 "payload": {...}, 88 "state": "serialized_state" 89 } 90 91 Returns: { 92 "html": "rendered_html", 93 "state": "serialized_state", 94 "component_id": "component-id" 95 } 96 """ 97 try: 98 component_cls = registry.get(name) 99 if not component_cls: 100 raise HTTPException(status_code=404, detail=f"Component '{name}' not found") 101 102 data = await _parse_request_data(request) 103 params, event, payload, state = _extract_params(data) 104 105 component = component_cls(**params) 106 result = await component.async_dispatch(event=event, payload=payload, state=state) 107 108 result["state"] = StateSerializer.serialize(result["state"]) 109 110 return JSONResponse(content=result) 111 112 except HTTPException: 113 raise 114 except Exception: 115 logger.exception(f"Error processing component '{name}'") 116 raise HTTPException(status_code=500, detail="Internal server error")
Generic component endpoint for FastAPI.
POST /components/{name} Body (JSON or form-encoded): { "event": "event_name", "payload": {...}, "state": "serialized_state" }
Returns: { "html": "rendered_html", "state": "serialized_state", "component_id": "component-id" }
async def
stream_component_endpoint( name: str, request: starlette.requests.Request) -> starlette.responses.StreamingResponse:
119async def stream_component_endpoint(name: str, request: Request) -> StreamingResponse: 120 """ 121 SSE streaming endpoint for long-running component operations. 122 123 POST /components/{name}/stream 124 Body (JSON or form-encoded): same as component_endpoint 125 126 Returns: text/event-stream with one ``data:`` frame per intermediate render. 127 """ 128 try: 129 component_cls = registry.get(name) 130 if not component_cls: 131 raise HTTPException(status_code=404, detail=f"Component '{name}' not found") 132 133 if not issubclass(component_cls, StreamingComponent): 134 raise HTTPException( 135 status_code=400, 136 detail=f"Component '{name}' does not support streaming", 137 ) 138 139 data = await _parse_request_data(request) 140 params, event, payload, state = _extract_params(data) 141 142 component = component_cls(**params) 143 144 async def event_generator(): 145 async for frame in component.async_stream_dispatch( 146 event=event, payload=payload, state=state 147 ): 148 frame["state"] = StateSerializer.serialize(frame["state"]) 149 yield format_sse_frame(frame) 150 151 return StreamingResponse(event_generator(), media_type="text/event-stream") 152 153 except HTTPException: 154 raise 155 except Exception: 156 logger.exception(f"Error processing streaming component '{name}'") 157 raise HTTPException(status_code=500, detail="Internal server error")
SSE streaming endpoint for long-running component operations.
POST /components/{name}/stream Body (JSON or form-encoded): same as component_endpoint
Returns: text/event-stream with one data: frame per intermediate render.
def
create_component_routes(app):
160def create_component_routes(app): 161 """ 162 Add component endpoints to FastAPI app. 163 164 Registers both the standard POST endpoint and the SSE streaming endpoint. 165 166 Usage: 167 from fastapi import FastAPI 168 app = FastAPI() 169 create_component_routes(app) 170 """ 171 app.add_api_route( 172 "/components/{name}", 173 component_endpoint, 174 methods=["POST"], 175 name="component_endpoint", 176 ) 177 app.add_api_route( 178 "/components/{name}/stream", 179 stream_component_endpoint, 180 methods=["POST"], 181 name="stream_component_endpoint", 182 )
Add component endpoints to FastAPI app.
Registers both the standard POST endpoint and the SSE streaming endpoint.
Usage:
from fastapi import FastAPI app = FastAPI() create_component_routes(app)