component_framework.adapters.django_websocket
Django Channels WebSocket adapter.
"""Django Channels WebSocket adapter."""

import json
import logging
from uuid import uuid4

try:
    from channels.generic.websocket import AsyncWebsocketConsumer
except ImportError as e:
    from . import _require_extra

    raise _require_extra("channels", "django") from e

from ..core.websocket import WebSocketConnection, ws_manager

logger = logging.getLogger(__name__)


class DjangoWebSocketConnection(WebSocketConnection):
    """Django Channels WebSocket connection wrapper."""

    def __init__(self, consumer: "ComponentConsumer"):
        # The consumer owns the socket; send/close are delegated to it.
        self.consumer = consumer

    async def send(self, data: dict):
        """Send JSON data to client."""
        await self.consumer.send(text_data=json.dumps(data))

    async def receive(self) -> dict:
        """Receive is handled by consumer, not used directly."""
        raise NotImplementedError("Receive handled by consumer")

    async def close(self):
        """Close WebSocket connection."""
        await self.consumer.close()


class ComponentConsumer(AsyncWebsocketConsumer):
    """
    Django Channels consumer for component WebSocket.

    Usage in routing.py:
        from channels.routing import ProtocolTypeRouter, URLRouter
        from django.urls import path
        from component_framework.adapters.django_websocket import ComponentConsumer

        application = ProtocolTypeRouter({
            "websocket": URLRouter([
                path("ws/", ComponentConsumer.as_asgi()),
            ]),
        })
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Both stay None until connect() has run.
        self.connection_id: str | None = None
        self.connection: DjangoWebSocketConnection | None = None

    async def connect(self):
        """Accept the socket, register it with the manager and confirm to the client."""
        await self.accept()

        # Fresh random ID identifying this socket towards the manager.
        self.connection_id = str(uuid4())
        self.connection = DjangoWebSocketConnection(self)

        await ws_manager.connect(self.connection, self.connection_id)

        # Tell the client which connection ID it was assigned.
        await self.send(
            text_data=json.dumps({"type": "connected", "connection_id": self.connection_id})
        )

        # Lazy %-style arguments: the message is only formatted if INFO is enabled.
        logger.info("WebSocket connected: %s", self.connection_id)

    async def disconnect(self, close_code):  # type: ignore[override]
        """Unregister the connection from the manager, if one was registered."""
        if self.connection_id:
            await ws_manager.disconnect(self.connection_id)
            logger.info("WebSocket disconnected: %s", self.connection_id)

    async def receive(self, text_data=None, bytes_data=None):
        """
        Handle incoming WebSocket message.

        Frames without text data are ignored. Text frames are decoded as JSON
        and handed to ``ws_manager.handle_message``. Failures are reported to
        the client as ``{"type": "error", "error": <message>}``.
        """
        if text_data is None:
            return
        try:
            data = json.loads(text_data)
            if self.connection is None or self.connection_id is None:
                return
            await ws_manager.handle_message(self.connection, self.connection_id, data)
        except json.JSONDecodeError:
            await self.send(text_data=json.dumps({"type": "error", "error": "Invalid JSON"}))
        except Exception:
            # The traceback goes to the server log only; the client gets a
            # generic message so internal exception text is not disclosed.
            logger.exception("Error handling WebSocket message")
            await self.send(
                text_data=json.dumps({"type": "error", "error": "Internal server error"})
            )


# Helper for pushing component updates through the WebSocket manager
async def broadcast_component_update(component_id: str, html: str, state: dict):
    """
    Broadcast component update via WebSocket manager.

    This is a coroutine and must be awaited from async code; it forwards its
    arguments unchanged to ``ws_manager.push_update``.

    Usage:
        from component_framework.adapters.django_websocket import broadcast_component_update

        # In an async view or signal handler
        await broadcast_component_update(
            component_id="order-123",
            html=rendered_html,
            state={"status": "completed"}
        )
    """
    await ws_manager.push_update(component_id, html, state)
class DjangoWebSocketConnection(WebSocketConnection):
    """Expose a Channels consumer through the ``WebSocketConnection`` interface."""

    def __init__(self, consumer: "ComponentConsumer"):
        # Keep a handle on the consumer; send() and close() delegate to it.
        self.consumer = consumer

    async def send(self, data: dict):
        """Serialize *data* to JSON and send it to the client as a text frame."""
        payload = json.dumps(data)
        await self.consumer.send(text_data=payload)

    async def receive(self) -> dict:
        """Always raise: incoming frames are handled by the consumer instead."""
        raise NotImplementedError("Receive handled by consumer")

    async def close(self):
        """Ask the underlying consumer to close the WebSocket."""
        await self.consumer.close()
Django Channels WebSocket connection wrapper.
async def send(self, data: dict):
    """Serialize *data* to JSON and send it to the client as a text frame."""
    payload = json.dumps(data)
    await self.consumer.send(text_data=payload)
Send JSON data to client.
class ComponentConsumer(AsyncWebsocketConsumer):
    """
    Django Channels consumer for component WebSocket.

    Usage in routing.py:
        from channels.routing import ProtocolTypeRouter, URLRouter
        from django.urls import path
        from component_framework.adapters.django_websocket import ComponentConsumer

        application = ProtocolTypeRouter({
            "websocket": URLRouter([
                path("ws/", ComponentConsumer.as_asgi()),
            ]),
        })
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Both stay None until connect() has run.
        self.connection_id: str | None = None
        self.connection: DjangoWebSocketConnection | None = None

    async def connect(self):
        """Accept the socket, register it with the manager and confirm to the client."""
        await self.accept()

        # Fresh random ID identifying this socket towards the manager.
        self.connection_id = str(uuid4())
        self.connection = DjangoWebSocketConnection(self)

        await ws_manager.connect(self.connection, self.connection_id)

        # Tell the client which connection ID it was assigned.
        await self.send(
            text_data=json.dumps({"type": "connected", "connection_id": self.connection_id})
        )

        # Lazy %-style arguments: the message is only formatted if INFO is enabled.
        logger.info("WebSocket connected: %s", self.connection_id)

    async def disconnect(self, close_code):  # type: ignore[override]
        """Unregister the connection from the manager, if one was registered."""
        if self.connection_id:
            await ws_manager.disconnect(self.connection_id)
            logger.info("WebSocket disconnected: %s", self.connection_id)

    async def receive(self, text_data=None, bytes_data=None):
        """
        Handle incoming WebSocket message.

        Frames without text data are ignored. Text frames are decoded as JSON
        and handed to ``ws_manager.handle_message``. Failures are reported to
        the client as ``{"type": "error", "error": <message>}``.
        """
        if text_data is None:
            return
        try:
            data = json.loads(text_data)
            if self.connection is None or self.connection_id is None:
                return
            await ws_manager.handle_message(self.connection, self.connection_id, data)
        except json.JSONDecodeError:
            await self.send(text_data=json.dumps({"type": "error", "error": "Invalid JSON"}))
        except Exception:
            # The traceback goes to the server log only; the client gets a
            # generic message so internal exception text is not disclosed.
            logger.exception("Error handling WebSocket message")
            await self.send(
                text_data=json.dumps({"type": "error", "error": "Internal server error"})
            )
Django Channels consumer for component WebSocket.
Usage in routing.py:

    from channels.routing import ProtocolTypeRouter, URLRouter
    from django.urls import path
    from component_framework.adapters.django_websocket import ComponentConsumer
application = ProtocolTypeRouter({
"websocket": URLRouter([
path("ws/", ComponentConsumer.as_asgi()),
]),
})
async def connect(self):
    """
    Handle WebSocket connection.

    Accepts the handshake, creates a random connection ID and a
    ``DjangoWebSocketConnection`` wrapper, registers both with ``ws_manager``
    and sends ``{"type": "connected", "connection_id": ...}`` to the client.
    """
    await self.accept()

    # Fresh random ID identifying this socket towards the manager.
    self.connection_id = str(uuid4())
    self.connection = DjangoWebSocketConnection(self)

    await ws_manager.connect(self.connection, self.connection_id)

    # Tell the client which connection ID it was assigned.
    await self.send(
        text_data=json.dumps({"type": "connected", "connection_id": self.connection_id})
    )

    # Lazy %-style arguments: the message is only formatted if INFO is enabled.
    logger.info("WebSocket connected: %s", self.connection_id)
Handle WebSocket connection.
async def disconnect(self, close_code):  # type: ignore[override]
    """
    Handle WebSocket disconnection.

    Unregisters this consumer's connection ID from ``ws_manager``. Does
    nothing when no connection ID was ever assigned. ``close_code`` is
    accepted for the consumer interface but not used here.
    """
    if self.connection_id:
        await ws_manager.disconnect(self.connection_id)
        # Lazy %-style arguments: the message is only formatted if INFO is enabled.
        logger.info("WebSocket disconnected: %s", self.connection_id)
Handle WebSocket disconnection.
async def receive(self, text_data=None, bytes_data=None):
    """
    Handle incoming WebSocket message.

    Frames without text data are ignored. Text frames are decoded as JSON and
    handed to ``ws_manager.handle_message`` together with this consumer's
    connection wrapper and connection ID; if either of those is still unset
    the message is dropped.

    Failures are reported to the client as ``{"type": "error", "error": ...}``:
    "Invalid JSON" for payloads that cannot be decoded, and a generic message
    for any other exception.
    """
    if text_data is None:
        return
    try:
        data = json.loads(text_data)
        if self.connection is None or self.connection_id is None:
            return
        await ws_manager.handle_message(self.connection, self.connection_id, data)
    except json.JSONDecodeError:
        await self.send(text_data=json.dumps({"type": "error", "error": "Invalid JSON"}))
    except Exception:
        # The traceback goes to the server log only; the client gets a
        # generic message so internal exception text is not disclosed.
        logger.exception("Error handling WebSocket message")
        await self.send(
            text_data=json.dumps({"type": "error", "error": "Internal server error"})
        )
Handle incoming WebSocket message.
async def broadcast_component_update(component_id: str, html: str, state: dict):
    """
    Push a component update through the shared WebSocket manager.

    This is a coroutine: it must be awaited from async code. The three
    arguments are forwarded unchanged to ``ws_manager.push_update``.

    Example:
        from component_framework.adapters.django_websocket import broadcast_component_update

        # Inside an async view or signal handler
        await broadcast_component_update(
            component_id="order-123",
            html=rendered_html,
            state={"status": "completed"}
        )
    """
    await ws_manager.push_update(component_id, html, state)
Broadcast component update via WebSocket manager.
Can be awaited from any async context (views, signal handlers, tasks, etc.).
Usage:
from component_framework.adapters.django_websocket import broadcast_component_update
In a view or signal handler
await broadcast_component_update( component_id="order-123", html=rendered_html, state={"status": "completed"} )