component_framework.adapters.fastapi

FastAPI adapter for component endpoints.

  1"""FastAPI adapter for component endpoints."""
  2
  3import json
  4import logging
  5
  6try:
  7    from fastapi import HTTPException, Request
  8    from fastapi.responses import JSONResponse
  9    from starlette.responses import StreamingResponse
 10except ImportError as e:
 11    from . import _require_extra
 12
 13    raise _require_extra("fastapi", "fastapi") from e
 14
 15from ..core import StateSerializer, registry
 16from ..core.streaming import StreamingComponent, format_sse_frame
 17
 18logger = logging.getLogger(__name__)
 19
 20
 21async def component_endpoint(name: str, request: Request) -> JSONResponse:
 22    """
 23    Generic component endpoint for FastAPI.
 24
 25    POST /components/{name}
 26    Body: {
 27        "event": "event_name",
 28        "payload": {...},
 29        "state": "serialized_state"
 30    }
 31
 32    Returns: {
 33        "html": "rendered_html",
 34        "state": "serialized_state",
 35        "component_id": "component-id"
 36    }
 37    """
 38    try:
 39        # Get component class
 40        component_cls = registry.get(name)
 41        if not component_cls:
 42            raise HTTPException(status_code=404, detail=f"Component '{name}' not found")
 43
 44        # Parse request data
 45        try:
 46            data = await request.json()
 47        except Exception as e:
 48            raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}")
 49
 50        # Extract parameters
 51        params = data.get("params", {})
 52        event = data.get("event")
 53        payload_raw = data.get("payload", {})
 54        state_str = data.get("state")
 55
 56        # Guard against double-serialised payload from older client JS
 57        if isinstance(payload_raw, str):
 58            try:
 59                payload = json.loads(payload_raw)
 60            except (json.JSONDecodeError, ValueError):
 61                payload = {}
 62        else:
 63            payload = payload_raw
 64
 65        # Deserialize state if provided
 66        state = None
 67        if state_str:
 68            try:
 69                state = StateSerializer.deserialize(state_str)
 70            except Exception as e:
 71                raise HTTPException(status_code=400, detail=f"Invalid state: {e}")
 72
 73        # Create and dispatch component (async to support async on_* handlers)
 74        component = component_cls(**params)
 75        result = await component.async_dispatch(event=event, payload=payload, state=state)
 76
 77        # Serialize state for response
 78        result["state"] = StateSerializer.serialize(result["state"])
 79
 80        return JSONResponse(content=result)
 81
 82    except HTTPException:
 83        raise
 84    except Exception:
 85        logger.exception(f"Error processing component '{name}'")
 86        raise HTTPException(status_code=500, detail="Internal server error")
 87
 88
 89async def stream_component_endpoint(name: str, request: Request) -> StreamingResponse:
 90    """
 91    SSE streaming endpoint for long-running component operations.
 92
 93    POST /components/{name}/stream
 94    Body: same as component_endpoint
 95
 96    Returns: text/event-stream with one ``data:`` frame per intermediate render.
 97    """
 98    try:
 99        component_cls = registry.get(name)
100        if not component_cls:
101            raise HTTPException(status_code=404, detail=f"Component '{name}' not found")
102
103        if not issubclass(component_cls, StreamingComponent):
104            raise HTTPException(
105                status_code=400,
106                detail=f"Component '{name}' does not support streaming",
107            )
108
109        try:
110            data = await request.json()
111        except Exception as e:
112            raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}")
113
114        params = data.get("params", {})
115        event = data.get("event")
116        payload_raw = data.get("payload", {})
117        state_str = data.get("state")
118
119        if isinstance(payload_raw, str):
120            try:
121                payload = json.loads(payload_raw)
122            except (json.JSONDecodeError, ValueError):
123                payload = {}
124        else:
125            payload = payload_raw
126
127        state = None
128        if state_str:
129            try:
130                state = StateSerializer.deserialize(state_str)
131            except Exception as e:
132                raise HTTPException(status_code=400, detail=f"Invalid state: {e}")
133
134        component = component_cls(**params)
135
136        async def event_generator():
137            async for frame in component.async_stream_dispatch(
138                event=event, payload=payload, state=state
139            ):
140                frame["state"] = StateSerializer.serialize(frame["state"])
141                yield format_sse_frame(frame)
142
143        return StreamingResponse(event_generator(), media_type="text/event-stream")
144
145    except HTTPException:
146        raise
147    except Exception:
148        logger.exception(f"Error processing streaming component '{name}'")
149        raise HTTPException(status_code=500, detail="Internal server error")
150
151
def create_component_routes(app):
    """
    Attach the component endpoints to a FastAPI app.

    Two POST routes are registered through ``app.add_api_route``:
    ``/components/{name}`` handled by ``component_endpoint`` and
    ``/components/{name}/stream`` handled by ``stream_component_endpoint``.

    Usage:
        from fastapi import FastAPI
        app = FastAPI()
        create_component_routes(app)
    """
    # (path, handler, route name) for every endpoint this adapter exposes.
    route_table = (
        ("/components/{name}", component_endpoint, "component_endpoint"),
        (
            "/components/{name}/stream",
            stream_component_endpoint,
            "stream_component_endpoint",
        ),
    )
    for path, handler, route_name in route_table:
        app.add_api_route(path, handler, methods=["POST"], name=route_name)
logger = <Logger component_framework.adapters.fastapi (WARNING)>
async def component_endpoint( name: str, request: starlette.requests.Request) -> starlette.responses.JSONResponse:
async def component_endpoint(name: str, request: Request) -> JSONResponse:
    """
    Generic component endpoint for FastAPI.

    POST /components/{name}
    Body: {
        "params": {...},
        "event": "event_name",
        "payload": {...},
        "state": "serialized_state"
    }

    ``params`` (optional) is passed as keyword arguments to the component
    constructor. ``payload`` may be a JSON object or a JSON-encoded string.

    Returns: {
        "html": "rendered_html",
        "state": "serialized_state",
        "component_id": "component-id"
    }

    Raises:
        HTTPException: 404 when no component is registered under ``name``;
            400 when the body is not valid JSON, is not a JSON object,
            carries a non-object ``params``, or carries state that cannot
            be deserialized; 500 for any other failure (logged with
            traceback).
    """
    try:
        # Get component class
        component_cls = registry.get(name)
        if not component_cls:
            raise HTTPException(status_code=404, detail=f"Component '{name}' not found")

        # Parse request data
        try:
            data = await request.json()
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}") from e

        # A syntactically valid body can still be a list/string/null; reject it
        # as a client error instead of failing later on ``data.get``.
        if not isinstance(data, dict):
            raise HTTPException(status_code=400, detail="Request body must be a JSON object")

        # Extract parameters
        params = data.get("params")
        if params is None:
            params = {}
        elif not isinstance(params, dict):
            # ``component_cls(**params)`` needs a mapping; anything else is a
            # malformed request, not a server fault.
            raise HTTPException(status_code=400, detail="'params' must be a JSON object")
        event = data.get("event")
        payload_raw = data.get("payload", {})
        state_str = data.get("state")

        # Guard against double-serialised payload from older client JS
        if isinstance(payload_raw, str):
            try:
                payload = json.loads(payload_raw)
            except (json.JSONDecodeError, ValueError):
                payload = {}
        else:
            payload = payload_raw

        # Deserialize state if provided
        state = None
        if state_str:
            try:
                state = StateSerializer.deserialize(state_str)
            except Exception as e:
                raise HTTPException(status_code=400, detail=f"Invalid state: {e}") from e

        # Create and dispatch component (async to support async on_* handlers)
        component = component_cls(**params)
        result = await component.async_dispatch(event=event, payload=payload, state=state)

        # Serialize state for response
        result["state"] = StateSerializer.serialize(result["state"])

        return JSONResponse(content=result)

    except HTTPException:
        # Deliberate HTTP errors raised above pass through untouched.
        raise
    except Exception as exc:
        # Log the full traceback server-side; the client only sees a generic 500.
        logger.exception("Error processing component '%s'", name)
        raise HTTPException(status_code=500, detail="Internal server error") from exc

Generic component endpoint for FastAPI.

POST /components/{name}

Body: { "event": "event_name", "payload": {...}, "state": "serialized_state" }

Returns: { "html": "rendered_html", "state": "serialized_state", "component_id": "component-id" }

async def stream_component_endpoint( name: str, request: starlette.requests.Request) -> starlette.responses.StreamingResponse:
 90async def stream_component_endpoint(name: str, request: Request) -> StreamingResponse:
 91    """
 92    SSE streaming endpoint for long-running component operations.
 93
 94    POST /components/{name}/stream
 95    Body: same as component_endpoint
 96
 97    Returns: text/event-stream with one ``data:`` frame per intermediate render.
 98    """
 99    try:
100        component_cls = registry.get(name)
101        if not component_cls:
102            raise HTTPException(status_code=404, detail=f"Component '{name}' not found")
103
104        if not issubclass(component_cls, StreamingComponent):
105            raise HTTPException(
106                status_code=400,
107                detail=f"Component '{name}' does not support streaming",
108            )
109
110        try:
111            data = await request.json()
112        except Exception as e:
113            raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}")
114
115        params = data.get("params", {})
116        event = data.get("event")
117        payload_raw = data.get("payload", {})
118        state_str = data.get("state")
119
120        if isinstance(payload_raw, str):
121            try:
122                payload = json.loads(payload_raw)
123            except (json.JSONDecodeError, ValueError):
124                payload = {}
125        else:
126            payload = payload_raw
127
128        state = None
129        if state_str:
130            try:
131                state = StateSerializer.deserialize(state_str)
132            except Exception as e:
133                raise HTTPException(status_code=400, detail=f"Invalid state: {e}")
134
135        component = component_cls(**params)
136
137        async def event_generator():
138            async for frame in component.async_stream_dispatch(
139                event=event, payload=payload, state=state
140            ):
141                frame["state"] = StateSerializer.serialize(frame["state"])
142                yield format_sse_frame(frame)
143
144        return StreamingResponse(event_generator(), media_type="text/event-stream")
145
146    except HTTPException:
147        raise
148    except Exception:
149        logger.exception(f"Error processing streaming component '{name}'")
150        raise HTTPException(status_code=500, detail="Internal server error")

SSE streaming endpoint for long-running component operations.

POST /components/{name}/stream

Body: same as component_endpoint

Returns: text/event-stream with one data: frame per intermediate render.

def create_component_routes(app):
def create_component_routes(app):
    """
    Attach the component endpoints to a FastAPI app.

    Two POST routes are registered through ``app.add_api_route``:
    ``/components/{name}`` handled by ``component_endpoint`` and
    ``/components/{name}/stream`` handled by ``stream_component_endpoint``.

    Usage:
        from fastapi import FastAPI
        app = FastAPI()
        create_component_routes(app)
    """
    # (path, handler, route name) for every endpoint this adapter exposes.
    route_table = (
        ("/components/{name}", component_endpoint, "component_endpoint"),
        (
            "/components/{name}/stream",
            stream_component_endpoint,
            "stream_component_endpoint",
        ),
    )
    for path, handler, route_name in route_table:
        app.add_api_route(path, handler, methods=["POST"], name=route_name)

Add component endpoints to FastAPI app.

Registers both the standard POST endpoint and the SSE streaming endpoint.

Usage:

    from fastapi import FastAPI
    app = FastAPI()
    create_component_routes(app)