component_framework.adapters.litestar_websocket
Litestar WebSocket adapter.
"""Litestar WebSocket adapter."""

import logging
from uuid import uuid4

try:
    from litestar import WebSocket
    from litestar.exceptions import WebSocketDisconnect
except ImportError as e:
    from . import _require_extra

    # Re-raise as the package's "missing optional extra" error, keeping the
    # original ImportError attached as the cause.
    raise _require_extra("litestar", "litestar") from e

from ..core.websocket import WebSocketConnection, ws_manager

logger = logging.getLogger(__name__)


class LitestarWebSocketConnection(WebSocketConnection):
    """Litestar WebSocket connection wrapper.

    Every method delegates to the wrapped Litestar ``WebSocket``.
    """

    def __init__(self, websocket: WebSocket):
        self.websocket = websocket

    async def send(self, data: dict):
        """Send JSON data to client."""
        await self.websocket.send_json(data)

    async def receive(self) -> dict:
        """Receive JSON data from client."""
        return await self.websocket.receive_json()

    async def close(self):
        """Close WebSocket connection."""
        await self.websocket.close()


async def component_websocket_endpoint(websocket: WebSocket) -> None:
    """
    Litestar WebSocket endpoint for components.

    Accepts the socket, registers it with ``ws_manager`` under a freshly
    generated UUID4 connection ID, sends a ``"connected"`` confirmation
    message and then forwards every received JSON message to
    ``ws_manager.handle_message``. The loop ends when the client
    disconnects or any other exception is raised; in both cases the
    connection is unregistered from the manager before returning.

    Usage in a Litestar app (a raw socket handler uses ``@websocket`` and
    must name its parameter ``socket``):
        from litestar import Litestar, WebSocket, websocket
        from component_framework.adapters.litestar_websocket import (
            component_websocket_endpoint,
        )

        @websocket("/ws")
        async def ws_handler(socket: WebSocket) -> None:
            await component_websocket_endpoint(socket)

        app = Litestar(route_handlers=[ws_handler])

    Args:
        websocket: The Litestar WebSocket to serve. It must not have been
            accepted yet; this function calls ``accept()`` itself.
    """
    # Accept connection
    await websocket.accept()

    # Generate connection ID
    connection_id = str(uuid4())
    connection = LitestarWebSocketConnection(websocket)

    # Register with manager
    await ws_manager.connect(connection, connection_id)

    try:
        # Send connection confirmation
        await connection.send({"type": "connected", "connection_id": connection_id})

        # Message loop; only an exception ends it.
        while True:
            data = await connection.receive()
            await ws_manager.handle_message(connection, connection_id, data)

    except WebSocketDisconnect:
        logger.info("WebSocket disconnected: %s", connection_id)
    except Exception as e:
        # Top-level boundary for this connection: log with traceback and
        # fall through to cleanup instead of propagating.
        logger.exception("WebSocket error: %s", e)
    finally:
        await ws_manager.disconnect(connection_id)
logger =
<Logger component_framework.adapters.litestar_websocket (WARNING)>
class
LitestarWebSocketConnection(component_framework.core.websocket.WebSocketConnection):
class LitestarWebSocketConnection(WebSocketConnection):
    """Adapter exposing a Litestar ``WebSocket`` through ``WebSocketConnection``."""

    def __init__(self, websocket: WebSocket):
        # Keep the underlying Litestar socket; every method delegates to it.
        self.websocket = websocket

    async def send(self, data: dict):
        """Send ``data`` to the client via the socket's ``send_json``."""
        socket = self.websocket
        await socket.send_json(data)

    async def receive(self) -> dict:
        """Await the socket's ``receive_json`` and return its result."""
        message = await self.websocket.receive_json()
        return message

    async def close(self):
        """Close the underlying WebSocket."""
        socket = self.websocket
        await socket.close()
Litestar WebSocket connection wrapper.
async def
send(self, data: dict):
async def send(self, data: dict):
    """Send ``data`` to the client via the wrapped socket's ``send_json``."""
    socket = self.websocket
    await socket.send_json(data)
Send JSON data to client.
async def
component_websocket_endpoint(websocket: litestar.connection.websocket.WebSocket) -> None:
async def component_websocket_endpoint(websocket: WebSocket) -> None:
    """
    Litestar WebSocket endpoint for components.

    Accepts the socket, registers it with ``ws_manager`` under a freshly
    generated UUID4 connection ID, sends a ``"connected"`` confirmation
    message and then forwards every received JSON message to
    ``ws_manager.handle_message``. The loop ends when the client
    disconnects or any other exception is raised; in both cases the
    connection is unregistered from the manager before returning.

    Usage in a Litestar app (a raw socket handler uses ``@websocket`` and
    must name its parameter ``socket``):
        from litestar import Litestar, WebSocket, websocket
        from component_framework.adapters.litestar_websocket import (
            component_websocket_endpoint,
        )

        @websocket("/ws")
        async def ws_handler(socket: WebSocket) -> None:
            await component_websocket_endpoint(socket)

        app = Litestar(route_handlers=[ws_handler])

    Args:
        websocket: The Litestar WebSocket to serve. It must not have been
            accepted yet; this function calls ``accept()`` itself.
    """
    # Accept connection
    await websocket.accept()

    # Generate connection ID
    connection_id = str(uuid4())
    connection = LitestarWebSocketConnection(websocket)

    # Register with manager
    await ws_manager.connect(connection, connection_id)

    try:
        # Send connection confirmation
        await connection.send({"type": "connected", "connection_id": connection_id})

        # Message loop; only an exception ends it.
        while True:
            data = await connection.receive()
            await ws_manager.handle_message(connection, connection_id, data)

    except WebSocketDisconnect:
        logger.info("WebSocket disconnected: %s", connection_id)
    except Exception as e:
        # Top-level boundary for this connection: log with traceback and
        # fall through to cleanup instead of propagating.
        logger.exception("WebSocket error: %s", e)
    finally:
        await ws_manager.disconnect(connection_id)
Litestar WebSocket endpoint for components.
Usage in a Litestar app:
    from litestar import Litestar, WebSocket, websocket
    from component_framework.adapters.litestar_websocket import (
        component_websocket_endpoint,
    )

    @websocket("/ws")
    async def ws_handler(socket: WebSocket) -> None:
        await component_websocket_endpoint(socket)
app = Litestar(route_handlers=[ws_handler])