component_framework.adapters.django_websocket

Django Channels WebSocket adapter.

  1"""Django Channels WebSocket adapter."""
  2
  3import json
  4import logging
  5from uuid import uuid4
  6
  7try:
  8    from channels.generic.websocket import AsyncWebsocketConsumer
  9except ImportError as e:
 10    from . import _require_extra
 11
 12    raise _require_extra("channels", "django") from e
 13
 14from ..core.websocket import WebSocketConnection, ws_manager
 15
 16logger = logging.getLogger(__name__)
 17
 18
 19class DjangoWebSocketConnection(WebSocketConnection):
 20    """Django Channels WebSocket connection wrapper."""
 21
 22    def __init__(self, consumer: "ComponentConsumer"):
 23        self.consumer = consumer
 24
 25    async def send(self, data: dict):
 26        """Send JSON data to client."""
 27        await self.consumer.send(text_data=json.dumps(data))
 28
 29    async def receive(self) -> dict:
 30        """Receive is handled by consumer, not used directly."""
 31        raise NotImplementedError("Receive handled by consumer")
 32
 33    async def close(self):
 34        """Close WebSocket connection."""
 35        await self.consumer.close()
 36
 37
 38class ComponentConsumer(AsyncWebsocketConsumer):
 39    """
 40    Django Channels consumer for component WebSocket.
 41
 42    Usage in routing.py:
 43        from channels.routing import ProtocolTypeRouter, URLRouter
 44        from django.urls import path
 45        from component_framework.adapters.django_websocket import ComponentConsumer
 46
 47        application = ProtocolTypeRouter({
 48            "websocket": URLRouter([
 49                path("ws/", ComponentConsumer.as_asgi()),
 50            ]),
 51        })
 52    """
 53
 54    def __init__(self, *args, **kwargs):
 55        super().__init__(*args, **kwargs)
 56        self.connection_id: str | None = None
 57        self.connection: DjangoWebSocketConnection | None = None
 58
 59    async def connect(self):
 60        """Handle WebSocket connection."""
 61        # Accept connection
 62        await self.accept()
 63
 64        # Generate connection ID
 65        self.connection_id = str(uuid4())
 66        self.connection = DjangoWebSocketConnection(self)
 67
 68        # Register with manager
 69        await ws_manager.connect(self.connection, self.connection_id)
 70
 71        # Send connection confirmation
 72        await self.send(
 73            text_data=json.dumps({"type": "connected", "connection_id": self.connection_id})
 74        )
 75
 76        logger.info(f"WebSocket connected: {self.connection_id}")
 77
 78    async def disconnect(self, close_code):  # type: ignore[override]
 79        """Handle WebSocket disconnection."""
 80        if self.connection_id:
 81            await ws_manager.disconnect(self.connection_id)
 82        logger.info(f"WebSocket disconnected: {self.connection_id}")
 83
 84    async def receive(self, text_data=None, bytes_data=None):
 85        """Handle incoming WebSocket message."""
 86        if text_data is None:
 87            return
 88        try:
 89            data = json.loads(text_data)
 90            if self.connection is None or self.connection_id is None:
 91                return
 92            await ws_manager.handle_message(self.connection, self.connection_id, data)
 93        except json.JSONDecodeError:
 94            await self.send(text_data=json.dumps({"type": "error", "error": "Invalid JSON"}))
 95        except Exception as e:
 96            logger.exception("Error handling WebSocket message")
 97            await self.send(text_data=json.dumps({"type": "error", "error": str(e)}))
 98
 99
100# Channel layer helper for broadcasting
101async def broadcast_component_update(component_id: str, html: str, state: dict):
102    """
103    Broadcast component update via WebSocket manager.
104
105    Can be called from anywhere (views, signals, tasks, etc.)
106
107    Usage:
108        from component_framework.adapters.django_websocket import broadcast_component_update
109
110        # In a view or signal handler
111        await broadcast_component_update(
112            component_id="order-123",
113            html=rendered_html,
114            state={"status": "completed"}
115        )
116    """
117    await ws_manager.push_update(component_id, html, state)
logger = <Logger component_framework.adapters.django_websocket (WARNING)>
class DjangoWebSocketConnection(component_framework.core.websocket.WebSocketConnection):
20class DjangoWebSocketConnection(WebSocketConnection):
21    """Django Channels WebSocket connection wrapper."""
22
23    def __init__(self, consumer: "ComponentConsumer"):
24        self.consumer = consumer
25
26    async def send(self, data: dict):
27        """Send JSON data to client."""
28        await self.consumer.send(text_data=json.dumps(data))
29
30    async def receive(self) -> dict:
31        """Receive is handled by consumer, not used directly."""
32        raise NotImplementedError("Receive handled by consumer")
33
34    async def close(self):
35        """Close WebSocket connection."""
36        await self.consumer.close()

Django Channels WebSocket connection wrapper.

DjangoWebSocketConnection( consumer: ComponentConsumer)
23    def __init__(self, consumer: "ComponentConsumer"):
24        self.consumer = consumer
consumer
async def send(self, data: dict):
26    async def send(self, data: dict):
27        """Send JSON data to client."""
28        await self.consumer.send(text_data=json.dumps(data))

Send JSON data to client.

async def receive(self) -> dict:
30    async def receive(self) -> dict:
31        """Receive is handled by consumer, not used directly."""
32        raise NotImplementedError("Receive handled by consumer")

Receive is handled by consumer, not used directly.

async def close(self):
34    async def close(self):
35        """Close WebSocket connection."""
36        await self.consumer.close()

Close WebSocket connection.

class ComponentConsumer(channels.generic.websocket.AsyncWebsocketConsumer):
39class ComponentConsumer(AsyncWebsocketConsumer):
40    """
41    Django Channels consumer for component WebSocket.
42
43    Usage in routing.py:
44        from channels.routing import ProtocolTypeRouter, URLRouter
45        from django.urls import path
46        from component_framework.adapters.django_websocket import ComponentConsumer
47
48        application = ProtocolTypeRouter({
49            "websocket": URLRouter([
50                path("ws/", ComponentConsumer.as_asgi()),
51            ]),
52        })
53    """
54
55    def __init__(self, *args, **kwargs):
56        super().__init__(*args, **kwargs)
57        self.connection_id: str | None = None
58        self.connection: DjangoWebSocketConnection | None = None
59
60    async def connect(self):
61        """Handle WebSocket connection."""
62        # Accept connection
63        await self.accept()
64
65        # Generate connection ID
66        self.connection_id = str(uuid4())
67        self.connection = DjangoWebSocketConnection(self)
68
69        # Register with manager
70        await ws_manager.connect(self.connection, self.connection_id)
71
72        # Send connection confirmation
73        await self.send(
74            text_data=json.dumps({"type": "connected", "connection_id": self.connection_id})
75        )
76
77        logger.info(f"WebSocket connected: {self.connection_id}")
78
79    async def disconnect(self, close_code):  # type: ignore[override]
80        """Handle WebSocket disconnection."""
81        if self.connection_id:
82            await ws_manager.disconnect(self.connection_id)
83        logger.info(f"WebSocket disconnected: {self.connection_id}")
84
85    async def receive(self, text_data=None, bytes_data=None):
86        """Handle incoming WebSocket message."""
87        if text_data is None:
88            return
89        try:
90            data = json.loads(text_data)
91            if self.connection is None or self.connection_id is None:
92                return
93            await ws_manager.handle_message(self.connection, self.connection_id, data)
94        except json.JSONDecodeError:
95            await self.send(text_data=json.dumps({"type": "error", "error": "Invalid JSON"}))
96        except Exception as e:
97            logger.exception("Error handling WebSocket message")
98            await self.send(text_data=json.dumps({"type": "error", "error": str(e)}))

Django Channels consumer for component WebSocket.

Usage in routing.py: from channels.routing import ProtocolTypeRouter, URLRouter from django.urls import path from component_framework.adapters.django_websocket import ComponentConsumer

application = ProtocolTypeRouter({
    "websocket": URLRouter([
        path("ws/", ComponentConsumer.as_asgi()),
    ]),
})
ComponentConsumer(*args, **kwargs)
55    def __init__(self, *args, **kwargs):
56        super().__init__(*args, **kwargs)
57        self.connection_id: str | None = None
58        self.connection: DjangoWebSocketConnection | None = None
connection_id: str | None
connection: DjangoWebSocketConnection | None
async def connect(self):
60    async def connect(self):
61        """Handle WebSocket connection."""
62        # Accept connection
63        await self.accept()
64
65        # Generate connection ID
66        self.connection_id = str(uuid4())
67        self.connection = DjangoWebSocketConnection(self)
68
69        # Register with manager
70        await ws_manager.connect(self.connection, self.connection_id)
71
72        # Send connection confirmation
73        await self.send(
74            text_data=json.dumps({"type": "connected", "connection_id": self.connection_id})
75        )
76
77        logger.info(f"WebSocket connected: {self.connection_id}")

Handle WebSocket connection.

async def disconnect(self, close_code):
79    async def disconnect(self, close_code):  # type: ignore[override]
80        """Handle WebSocket disconnection."""
81        if self.connection_id:
82            await ws_manager.disconnect(self.connection_id)
83        logger.info(f"WebSocket disconnected: {self.connection_id}")

Handle WebSocket disconnection.

async def receive(self, text_data=None, bytes_data=None):
85    async def receive(self, text_data=None, bytes_data=None):
86        """Handle incoming WebSocket message."""
87        if text_data is None:
88            return
89        try:
90            data = json.loads(text_data)
91            if self.connection is None or self.connection_id is None:
92                return
93            await ws_manager.handle_message(self.connection, self.connection_id, data)
94        except json.JSONDecodeError:
95            await self.send(text_data=json.dumps({"type": "error", "error": "Invalid JSON"}))
96        except Exception as e:
97            logger.exception("Error handling WebSocket message")
98            await self.send(text_data=json.dumps({"type": "error", "error": str(e)}))

Handle incoming WebSocket message.

async def broadcast_component_update(component_id: str, html: str, state: dict):
102async def broadcast_component_update(component_id: str, html: str, state: dict):
103    """
104    Broadcast component update via WebSocket manager.
105
106    Can be called from anywhere (views, signals, tasks, etc.)
107
108    Usage:
109        from component_framework.adapters.django_websocket import broadcast_component_update
110
111        # In a view or signal handler
112        await broadcast_component_update(
113            component_id="order-123",
114            html=rendered_html,
115            state={"status": "completed"}
116        )
117    """
118    await ws_manager.push_update(component_id, html, state)

Broadcast component update via WebSocket manager.

Can be called from anywhere (views, signals, tasks, etc.)

Usage:

from component_framework.adapters.django_websocket import broadcast_component_update

In a view or signal handler

await broadcast_component_update( component_id="order-123", html=rendered_html, state={"status": "completed"} )