component_framework.adapters.fastapi_websocket

FastAPI WebSocket adapter.

 1"""FastAPI WebSocket adapter."""
 2
 3import logging
 4from uuid import uuid4
 5
 6try:
 7    from fastapi import WebSocket, WebSocketDisconnect
 8except ImportError as e:
 9    from . import _require_extra
10
11    raise _require_extra("fastapi", "fastapi") from e
12
13from ..core.websocket import WebSocketConnection, ws_manager
14
15logger = logging.getLogger(__name__)
16
17
18class FastAPIWebSocketConnection(WebSocketConnection):
19    """FastAPI WebSocket connection wrapper."""
20
21    def __init__(self, websocket: WebSocket):
22        self.websocket = websocket
23
24    async def send(self, data: dict):
25        """Send JSON data to client."""
26        await self.websocket.send_json(data)
27
28    async def receive(self) -> dict:
29        """Receive JSON data from client."""
30        return await self.websocket.receive_json()
31
32    async def close(self):
33        """Close WebSocket connection."""
34        await self.websocket.close()
35
36
37async def component_websocket_endpoint(websocket: WebSocket):
38    """
39    FastAPI WebSocket endpoint for components.
40
41    Usage in FastAPI app:
42        from fastapi import FastAPI
43        from component_framework.adapters.fastapi_websocket import component_websocket_endpoint
44
45        app = FastAPI()
46
47        @app.websocket("/ws")
48        async def websocket_route(websocket: WebSocket):
49            await component_websocket_endpoint(websocket)
50    """
51    # Accept connection
52    await websocket.accept()
53
54    # Generate connection ID
55    connection_id = str(uuid4())
56    connection = FastAPIWebSocketConnection(websocket)
57
58    # Register with manager
59    await ws_manager.connect(connection, connection_id)
60
61    try:
62        # Send connection confirmation
63        await connection.send({"type": "connected", "connection_id": connection_id})
64
65        # Message loop
66        while True:
67            # Receive message
68            data = await connection.receive()
69
70            # Handle message
71            await ws_manager.handle_message(connection, connection_id, data)
72
73    except WebSocketDisconnect:
74        logger.info(f"WebSocket disconnected: {connection_id}")
75    except Exception as e:
76        logger.exception(f"WebSocket error: {e}")
77    finally:
78        await ws_manager.disconnect(connection_id)
79
80
81def create_websocket_route(app):
82    """
83    Add WebSocket route to FastAPI app.
84
85    Usage:
86        from fastapi import FastAPI
87        app = FastAPI()
88        create_websocket_route(app)
89    """
90
91    @app.websocket("/ws")
92    async def websocket_route(websocket: WebSocket):
93        await component_websocket_endpoint(websocket)
class FastAPIWebSocketConnection(component_framework.core.websocket.WebSocketConnection):
19class FastAPIWebSocketConnection(WebSocketConnection):
20    """FastAPI WebSocket connection wrapper."""
21
22    def __init__(self, websocket: WebSocket):
23        self.websocket = websocket
24
25    async def send(self, data: dict):
26        """Send JSON data to client."""
27        await self.websocket.send_json(data)
28
29    async def receive(self) -> dict:
30        """Receive JSON data from client."""
31        return await self.websocket.receive_json()
32
33    async def close(self):
34        """Close WebSocket connection."""
35        await self.websocket.close()

FastAPI WebSocket connection wrapper.

FastAPIWebSocketConnection(websocket: starlette.websockets.WebSocket)
22    def __init__(self, websocket: WebSocket):
23        self.websocket = websocket
websocket
async def send(self, data: dict):
25    async def send(self, data: dict):
26        """Send JSON data to client."""
27        await self.websocket.send_json(data)

Send JSON data to client.

async def receive(self) -> dict:
29    async def receive(self) -> dict:
30        """Receive JSON data from client."""
31        return await self.websocket.receive_json()

Receive JSON data from client.

async def close(self):
33    async def close(self):
34        """Close WebSocket connection."""
35        await self.websocket.close()

Close WebSocket connection.

async def component_websocket_endpoint(websocket: starlette.websockets.WebSocket):
38async def component_websocket_endpoint(websocket: WebSocket):
39    """
40    FastAPI WebSocket endpoint for components.
41
42    Usage in FastAPI app:
43        from fastapi import FastAPI
44        from component_framework.adapters.fastapi_websocket import component_websocket_endpoint
45
46        app = FastAPI()
47
48        @app.websocket("/ws")
49        async def websocket_route(websocket: WebSocket):
50            await component_websocket_endpoint(websocket)
51    """
52    # Accept connection
53    await websocket.accept()
54
55    # Generate connection ID
56    connection_id = str(uuid4())
57    connection = FastAPIWebSocketConnection(websocket)
58
59    # Register with manager
60    await ws_manager.connect(connection, connection_id)
61
62    try:
63        # Send connection confirmation
64        await connection.send({"type": "connected", "connection_id": connection_id})
65
66        # Message loop
67        while True:
68            # Receive message
69            data = await connection.receive()
70
71            # Handle message
72            await ws_manager.handle_message(connection, connection_id, data)
73
74    except WebSocketDisconnect:
75        logger.info(f"WebSocket disconnected: {connection_id}")
76    except Exception as e:
77        logger.exception(f"WebSocket error: {e}")
78    finally:
79        await ws_manager.disconnect(connection_id)

FastAPI WebSocket endpoint for components.

Usage in FastAPI app:

from fastapi import FastAPI from component_framework.adapters.fastapi_websocket import component_websocket_endpoint

app = FastAPI()

@app.websocket("/ws") async def websocket_route(websocket: WebSocket): await component_websocket_endpoint(websocket)

def create_websocket_route(app):
82def create_websocket_route(app):
83    """
84    Add WebSocket route to FastAPI app.
85
86    Usage:
87        from fastapi import FastAPI
88        app = FastAPI()
89        create_websocket_route(app)
90    """
91
92    @app.websocket("/ws")
93    async def websocket_route(websocket: WebSocket):
94        await component_websocket_endpoint(websocket)

Add WebSocket route to FastAPI app.

Usage:

from fastapi import FastAPI app = FastAPI() create_websocket_route(app)