component_framework.adapters.fastapi
FastAPI adapter for component endpoints.
1"""FastAPI adapter for component endpoints.""" 2 3import json 4import logging 5 6try: 7 from fastapi import HTTPException, Request 8 from fastapi.responses import JSONResponse 9 from starlette.responses import StreamingResponse 10except ImportError as e: 11 from . import _require_extra 12 13 raise _require_extra("fastapi", "fastapi") from e 14 15from ..core import StateSerializer, registry 16from ..core.streaming import StreamingComponent, format_sse_frame 17 18logger = logging.getLogger(__name__) 19 20 21async def component_endpoint(name: str, request: Request) -> JSONResponse: 22 """ 23 Generic component endpoint for FastAPI. 24 25 POST /components/{name} 26 Body: { 27 "event": "event_name", 28 "payload": {...}, 29 "state": "serialized_state" 30 } 31 32 Returns: { 33 "html": "rendered_html", 34 "state": "serialized_state", 35 "component_id": "component-id" 36 } 37 """ 38 try: 39 # Get component class 40 component_cls = registry.get(name) 41 if not component_cls: 42 raise HTTPException(status_code=404, detail=f"Component '{name}' not found") 43 44 # Parse request data 45 try: 46 data = await request.json() 47 except Exception as e: 48 raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}") 49 50 # Extract parameters 51 params = data.get("params", {}) 52 event = data.get("event") 53 payload_raw = data.get("payload", {}) 54 state_str = data.get("state") 55 56 # Guard against double-serialised payload from older client JS 57 if isinstance(payload_raw, str): 58 try: 59 payload = json.loads(payload_raw) 60 except (json.JSONDecodeError, ValueError): 61 payload = {} 62 else: 63 payload = payload_raw 64 65 # Deserialize state if provided 66 state = None 67 if state_str: 68 try: 69 state = StateSerializer.deserialize(state_str) 70 except Exception as e: 71 raise HTTPException(status_code=400, detail=f"Invalid state: {e}") 72 73 # Create and dispatch component (async to support async on_* handlers) 74 component = component_cls(**params) 75 result = await component.async_dispatch(event=event, payload=payload, state=state) 76 77 # Serialize state for response 78 result["state"] = StateSerializer.serialize(result["state"]) 79 80 return JSONResponse(content=result) 81 82 except HTTPException: 83 raise 84 except Exception: 85 logger.exception(f"Error processing component '{name}'") 86 raise HTTPException(status_code=500, detail="Internal server error") 87 88 89async def stream_component_endpoint(name: str, request: Request) -> StreamingResponse: 90 """ 91 SSE streaming endpoint for long-running component operations. 92 93 POST /components/{name}/stream 94 Body: same as component_endpoint 95 96 Returns: text/event-stream with one ``data:`` frame per intermediate render. 97 """ 98 try: 99 component_cls = registry.get(name) 100 if not component_cls: 101 raise HTTPException(status_code=404, detail=f"Component '{name}' not found") 102 103 if not issubclass(component_cls, StreamingComponent): 104 raise HTTPException( 105 status_code=400, 106 detail=f"Component '{name}' does not support streaming", 107 ) 108 109 try: 110 data = await request.json() 111 except Exception as e: 112 raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}") 113 114 params = data.get("params", {}) 115 event = data.get("event") 116 payload_raw = data.get("payload", {}) 117 state_str = data.get("state") 118 119 if isinstance(payload_raw, str): 120 try: 121 payload = json.loads(payload_raw) 122 except (json.JSONDecodeError, ValueError): 123 payload = {} 124 else: 125 payload = payload_raw 126 127 state = None 128 if state_str: 129 try: 130 state = StateSerializer.deserialize(state_str) 131 except Exception as e: 132 raise HTTPException(status_code=400, detail=f"Invalid state: {e}") 133 134 component = component_cls(**params) 135 136 async def event_generator(): 137 async for frame in component.async_stream_dispatch( 138 event=event, payload=payload, state=state 139 ): 140 frame["state"] = StateSerializer.serialize(frame["state"]) 141 yield format_sse_frame(frame) 142 143 return StreamingResponse(event_generator(), media_type="text/event-stream") 144 145 except HTTPException: 146 raise 147 except Exception: 148 logger.exception(f"Error processing streaming component '{name}'") 149 raise HTTPException(status_code=500, detail="Internal server error") 150 151 152def create_component_routes(app): 153 """ 154 Add component endpoints to FastAPI app. 155 156 Registers both the standard POST endpoint and the SSE streaming endpoint. 157 158 Usage: 159 from fastapi import FastAPI 160 app = FastAPI() 161 create_component_routes(app) 162 """ 163 app.add_api_route( 164 "/components/{name}", 165 component_endpoint, 166 methods=["POST"], 167 name="component_endpoint", 168 ) 169 app.add_api_route( 170 "/components/{name}/stream", 171 stream_component_endpoint, 172 methods=["POST"], 173 name="stream_component_endpoint", 174 )
logger =
<Logger component_framework.adapters.fastapi (WARNING)>
async def
component_endpoint( name: str, request: starlette.requests.Request) -> starlette.responses.JSONResponse:
22async def component_endpoint(name: str, request: Request) -> JSONResponse: 23 """ 24 Generic component endpoint for FastAPI. 25 26 POST /components/{name} 27 Body: { 28 "event": "event_name", 29 "payload": {...}, 30 "state": "serialized_state" 31 } 32 33 Returns: { 34 "html": "rendered_html", 35 "state": "serialized_state", 36 "component_id": "component-id" 37 } 38 """ 39 try: 40 # Get component class 41 component_cls = registry.get(name) 42 if not component_cls: 43 raise HTTPException(status_code=404, detail=f"Component '{name}' not found") 44 45 # Parse request data 46 try: 47 data = await request.json() 48 except Exception as e: 49 raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}") 50 51 # Extract parameters 52 params = data.get("params", {}) 53 event = data.get("event") 54 payload_raw = data.get("payload", {}) 55 state_str = data.get("state") 56 57 # Guard against double-serialised payload from older client JS 58 if isinstance(payload_raw, str): 59 try: 60 payload = json.loads(payload_raw) 61 except (json.JSONDecodeError, ValueError): 62 payload = {} 63 else: 64 payload = payload_raw 65 66 # Deserialize state if provided 67 state = None 68 if state_str: 69 try: 70 state = StateSerializer.deserialize(state_str) 71 except Exception as e: 72 raise HTTPException(status_code=400, detail=f"Invalid state: {e}") 73 74 # Create and dispatch component (async to support async on_* handlers) 75 component = component_cls(**params) 76 result = await component.async_dispatch(event=event, payload=payload, state=state) 77 78 # Serialize state for response 79 result["state"] = StateSerializer.serialize(result["state"]) 80 81 return JSONResponse(content=result) 82 83 except HTTPException: 84 raise 85 except Exception: 86 logger.exception(f"Error processing component '{name}'") 87 raise HTTPException(status_code=500, detail="Internal server error")
Generic component endpoint for FastAPI.
POST /components/{name} Body: { "event": "event_name", "payload": {...}, "state": "serialized_state" }
Returns: { "html": "rendered_html", "state": "serialized_state", "component_id": "component-id" }
async def
stream_component_endpoint( name: str, request: starlette.requests.Request) -> starlette.responses.StreamingResponse:
90async def stream_component_endpoint(name: str, request: Request) -> StreamingResponse: 91 """ 92 SSE streaming endpoint for long-running component operations. 93 94 POST /components/{name}/stream 95 Body: same as component_endpoint 96 97 Returns: text/event-stream with one ``data:`` frame per intermediate render. 98 """ 99 try: 100 component_cls = registry.get(name) 101 if not component_cls: 102 raise HTTPException(status_code=404, detail=f"Component '{name}' not found") 103 104 if not issubclass(component_cls, StreamingComponent): 105 raise HTTPException( 106 status_code=400, 107 detail=f"Component '{name}' does not support streaming", 108 ) 109 110 try: 111 data = await request.json() 112 except Exception as e: 113 raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}") 114 115 params = data.get("params", {}) 116 event = data.get("event") 117 payload_raw = data.get("payload", {}) 118 state_str = data.get("state") 119 120 if isinstance(payload_raw, str): 121 try: 122 payload = json.loads(payload_raw) 123 except (json.JSONDecodeError, ValueError): 124 payload = {} 125 else: 126 payload = payload_raw 127 128 state = None 129 if state_str: 130 try: 131 state = StateSerializer.deserialize(state_str) 132 except Exception as e: 133 raise HTTPException(status_code=400, detail=f"Invalid state: {e}") 134 135 component = component_cls(**params) 136 137 async def event_generator(): 138 async for frame in component.async_stream_dispatch( 139 event=event, payload=payload, state=state 140 ): 141 frame["state"] = StateSerializer.serialize(frame["state"]) 142 yield format_sse_frame(frame) 143 144 return StreamingResponse(event_generator(), media_type="text/event-stream") 145 146 except HTTPException: 147 raise 148 except Exception: 149 logger.exception(f"Error processing streaming component '{name}'") 150 raise HTTPException(status_code=500, detail="Internal server error")
SSE streaming endpoint for long-running component operations.
POST /components/{name}/stream Body: same as component_endpoint
Returns: text/event-stream with one data: frame per intermediate render.
def
create_component_routes(app):
153def create_component_routes(app): 154 """ 155 Add component endpoints to FastAPI app. 156 157 Registers both the standard POST endpoint and the SSE streaming endpoint. 158 159 Usage: 160 from fastapi import FastAPI 161 app = FastAPI() 162 create_component_routes(app) 163 """ 164 app.add_api_route( 165 "/components/{name}", 166 component_endpoint, 167 methods=["POST"], 168 name="component_endpoint", 169 ) 170 app.add_api_route( 171 "/components/{name}/stream", 172 stream_component_endpoint, 173 methods=["POST"], 174 name="stream_component_endpoint", 175 )
Add component endpoints to FastAPI app.
Registers both the standard POST endpoint and the SSE streaming endpoint.
Usage:
from fastapi import FastAPI app = FastAPI() create_component_routes(app)