component_framework.adapters.litestar_websocket

Litestar WebSocket adapter.

 1"""Litestar WebSocket adapter."""
 2
 3import logging
 4from uuid import uuid4
 5
 6try:
 7    from litestar import WebSocket
 8    from litestar.exceptions import WebSocketDisconnect
 9except ImportError as e:
10    from . import _require_extra
11
12    raise _require_extra("litestar", "litestar") from e
13
14from ..core.websocket import WebSocketConnection, ws_manager
15
16logger = logging.getLogger(__name__)
17
18
19class LitestarWebSocketConnection(WebSocketConnection):
20    """Litestar WebSocket connection wrapper."""
21
22    def __init__(self, websocket: WebSocket):
23        self.websocket = websocket
24
25    async def send(self, data: dict):
26        """Send JSON data to client."""
27        await self.websocket.send_json(data)
28
29    async def receive(self) -> dict:
30        """Receive JSON data from client."""
31        return await self.websocket.receive_json()
32
33    async def close(self):
34        """Close WebSocket connection."""
35        await self.websocket.close()
36
37
38async def component_websocket_endpoint(websocket: WebSocket) -> None:
39    """
40    Litestar WebSocket endpoint for components.
41
42    Usage in a Litestar app:
43        from litestar import Litestar, websocket_listener
44        from component_framework.adapters.litestar_websocket import (
45            component_websocket_endpoint,
46        )
47
48        @websocket_listener("/ws")
49        async def ws_handler(websocket: WebSocket) -> None:
50            await component_websocket_endpoint(websocket)
51
52        app = Litestar(route_handlers=[ws_handler])
53    """
54    # Accept connection
55    await websocket.accept()
56
57    # Generate connection ID
58    connection_id = str(uuid4())
59    connection = LitestarWebSocketConnection(websocket)
60
61    # Register with manager
62    await ws_manager.connect(connection, connection_id)
63
64    try:
65        # Send connection confirmation
66        await connection.send({"type": "connected", "connection_id": connection_id})
67
68        # Message loop
69        while True:
70            data = await connection.receive()
71            await ws_manager.handle_message(connection, connection_id, data)
72
73    except WebSocketDisconnect:
74        logger.info(f"WebSocket disconnected: {connection_id}")
75    except Exception as e:
76        logger.exception(f"WebSocket error: {e}")
77    finally:
78        await ws_manager.disconnect(connection_id)
class LitestarWebSocketConnection(component_framework.core.websocket.WebSocketConnection):
20class LitestarWebSocketConnection(WebSocketConnection):
21    """Litestar WebSocket connection wrapper."""
22
23    def __init__(self, websocket: WebSocket):
24        self.websocket = websocket
25
26    async def send(self, data: dict):
27        """Send JSON data to client."""
28        await self.websocket.send_json(data)
29
30    async def receive(self) -> dict:
31        """Receive JSON data from client."""
32        return await self.websocket.receive_json()
33
34    async def close(self):
35        """Close WebSocket connection."""
36        await self.websocket.close()

Litestar WebSocket connection wrapper.

LitestarWebSocketConnection(websocket: litestar.connection.websocket.WebSocket)
23    def __init__(self, websocket: WebSocket):
24        self.websocket = websocket
websocket
async def send(self, data: dict):
26    async def send(self, data: dict):
27        """Send JSON data to client."""
28        await self.websocket.send_json(data)

Send JSON data to client.

async def receive(self) -> dict:
30    async def receive(self) -> dict:
31        """Receive JSON data from client."""
32        return await self.websocket.receive_json()

Receive JSON data from client.

async def close(self):
34    async def close(self):
35        """Close WebSocket connection."""
36        await self.websocket.close()

Close WebSocket connection.

async def component_websocket_endpoint(websocket: litestar.connection.websocket.WebSocket) -> None:
39async def component_websocket_endpoint(websocket: WebSocket) -> None:
40    """
41    Litestar WebSocket endpoint for components.
42
43    Usage in a Litestar app:
44        from litestar import Litestar, websocket_listener
45        from component_framework.adapters.litestar_websocket import (
46            component_websocket_endpoint,
47        )
48
49        @websocket_listener("/ws")
50        async def ws_handler(websocket: WebSocket) -> None:
51            await component_websocket_endpoint(websocket)
52
53        app = Litestar(route_handlers=[ws_handler])
54    """
55    # Accept connection
56    await websocket.accept()
57
58    # Generate connection ID
59    connection_id = str(uuid4())
60    connection = LitestarWebSocketConnection(websocket)
61
62    # Register with manager
63    await ws_manager.connect(connection, connection_id)
64
65    try:
66        # Send connection confirmation
67        await connection.send({"type": "connected", "connection_id": connection_id})
68
69        # Message loop
70        while True:
71            data = await connection.receive()
72            await ws_manager.handle_message(connection, connection_id, data)
73
74    except WebSocketDisconnect:
75        logger.info(f"WebSocket disconnected: {connection_id}")
76    except Exception as e:
77        logger.exception(f"WebSocket error: {e}")
78    finally:
79        await ws_manager.disconnect(connection_id)

Litestar WebSocket endpoint for components.

Usage in a Litestar app:

from litestar import Litestar, websocket_listener from component_framework.adapters.litestar_websocket import ( component_websocket_endpoint, )

@websocket_listener("/ws") async def ws_handler(websocket: WebSocket) -> None: await component_websocket_endpoint(websocket)

app = Litestar(route_handlers=[ws_handler])