WebSocket

The Polyflux WebSocket package provides production-ready WebSocket client and server implementations with full async support, connection pooling, and broadcasting capabilities.

Installation

uv add polyflux-websocket

Key Features

  • 🚀 Production Ready - Full WebSocket protocol support with 277 comprehensive tests
  • ⚡ Async First - Built from the ground up for async/await with context managers
  • 🔒 Easy Setup - Simple configuration with validation
  • 🌐 Real-time Communication - Bidirectional messaging with text, binary, and control frames
  • 📡 Broadcasting - Built-in channel management and multi-client broadcasting
  • 🔧 Configurable - Flexible authentication, interceptors, and connection pooling
  • 📊 Observable - Built-in logging, metrics, and message interception

Quick Start

WebSocket Client

from polyflux.websocket import WebSocketClient
import asyncio

async def main():
    """Exchange one text and one binary message with wss://echo.websocket.org."""
    echo_url = "wss://echo.websocket.org"

    async with WebSocketClient() as client:
        await client.connect(echo_url)

        # Text message: the reply's payload is printed.
        text_reply = await client.exchange(
            target=echo_url,
            message_type="text",
            data="Hello WebSocket!",
        )
        echoed = text_reply["data"]
        print(f"Echo response: {echoed}")

        # Binary message: the reply is awaited but not used further.
        await client.exchange(
            target=echo_url,
            message_type="binary",
            data=b"Binary data",
        )


asyncio.run(main())

WebSocket Server

from polyflux.websocket import WebSocketServer, WebSocketInbound, WebSocketOutbound
import asyncio

async def main():
    """Start a server with an echo handler and a broadcast handler, then run indefinitely."""
    server = WebSocketServer(
        host="0.0.0.0",
        port=8080,
        broadcasting_enabled=True,
        default_channels=["general", "notifications"],
    )

    @server.register_handler("ws_text")
    async def handle_text_message(request: WebSocketInbound) -> WebSocketOutbound:
        # Send the payload back to the sender, prefixed with "Echo: ".
        echoed = f"Echo: {request['data']}"
        return {"target": "client", "message_type": "text", "data": echoed}

    @server.register_handler("ws_broadcast")
    async def handle_broadcast(request: WebSocketInbound) -> WebSocketOutbound:
        # Send the payload to all connected clients via the "broadcast:all" target.
        announcement = f"Broadcast: {request['data']}"
        return {"target": "broadcast:all", "message_type": "text", "data": announcement}

    async with server:
        print("WebSocket server running on ws://localhost:8080")
        # Waiting on an Event that is never set keeps the server context open.
        never_set = asyncio.Event()
        await never_set.wait()


asyncio.run(main())

Client Usage

Basic Configuration

from polyflux.websocket import WebSocketClient, WebSocketClientConfigSchema

# Simple configuration
config: WebSocketClientConfigSchema = {
    "ping_interval": 30.0,
    "ping_timeout": 15.0,
    "max_size": 1024 * 1024,  # 1MB
    "compression_enabled": True,
    "pool_enabled": True,
    "pool_size": 10,
    "monitoring_enabled": True
}

client = WebSocketClient()
client.configure(config)

async with client:
    await client.connect("wss://api.example.com/ws")
    
    # Client automatically uses ping_interval, compression, etc.
    response = await client.exchange(
        target="wss://api.example.com/ws",
        message_type="text",
        data="Hello from configured client!"
    )

Authentication

from polyflux.websocket import WebSocketClient

# WebSocket with authentication headers
async with WebSocketClient() as client:
    await client.connect(
        "wss://secure-api.example.com/ws",
        headers={"Authorization": "Bearer your-token-here"}
    )
    
    response = await client.exchange(
        target="wss://secure-api.example.com/ws",
        message_type="text",
        data="Authenticated message"
    )

Connection Pooling

from polyflux.websocket import WebSocketClient

# Enable connection pooling for efficiency
async with WebSocketClient(pool_enabled=True, pool_size=20) as client:
    # Multiple connections can reuse the pool
    tasks = []
    for i in range(50):
        task = client.exchange(
            target="wss://api.example.com/ws",
            message_type="text",
            data=f"Message {i}"
        )
        tasks.append(task)
    
    # Execute all requests efficiently using connection pool
    responses = await asyncio.gather(*tasks)

Server Usage

Advanced Server Configuration

from polyflux.websocket import WebSocketServer

# Server bound to 0.0.0.0:8080 with keep-alive pings, compression,
# broadcasting and metrics collection turned on.
server = WebSocketServer(
    host="0.0.0.0",                # Bind address
    port=8080,                     # Listening port
    max_connections=5000,          # Connection limits
    ping_interval=20.0,            # Keep-alive ping interval
    ping_timeout=10.0,             # Ping response timeout
    max_size=2 * 1024 * 1024,      # 2MB max message size
    compression_enabled=True,       # Enable per-message deflate
    broadcasting_enabled=True,      # Enable broadcasting features
    default_channels=["general", "events", "alerts"],  # Channel names (presumably available from startup - confirm)
    monitoring_enabled=True         # Enable metrics collection
)

Message Handlers with Routing

@server.register_handler("ws_text")
async def handle_text(request: WebSocketInbound) -> WebSocketOutbound:
    """Reply to the sender with the text payload prefixed by ``Received: ``."""
    reply_text = f"Received: {request['data']}"
    return {"target": "client", "message_type": "text", "data": reply_text}

@server.register_handler("ws_binary")
async def handle_binary(request: WebSocketInbound) -> WebSocketOutbound:
    """Pass the binary payload through ``process_binary`` and reply with the result."""
    # process_binary is not defined in this snippet; presumably the
    # application supplies it.
    processed = process_binary(request["data"])
    return {"target": "client", "message_type": "binary", "data": processed}

@server.register_handler("ws_close")
async def handle_close(request: WebSocketInbound) -> WebSocketOutbound:
    """Acknowledge a close message, echoing its close code (1000 when absent)."""
    if "close_code" in request:
        code = request["close_code"]
    else:
        code = 1000

    acknowledgement = {
        "target": "client",
        "message_type": "close",
        "data": "",
        "close_code": code,
        "close_reason": "Server acknowledged close",
    }
    return acknowledgement

Broadcasting and Channels

import json

from polyflux.websocket import WebSocketInbound, WebSocketOutbound, WebSocketServer

# Server with broadcasting enabled
server = WebSocketServer(
    broadcasting_enabled=True,
    default_channels=["general", "notifications", "alerts"],
)

@server.register_handler("ws_join_channel")
async def join_channel(request: WebSocketInbound) -> WebSocketOutbound:
    """Allow clients to join specific channels.

    Expects ``request["data"]`` to be a JSON object with ``client_id`` and
    ``channel`` keys. A malformed payload is answered with an error message
    to the sender instead of raising out of the handler.
    """
    try:
        data = json.loads(request["data"])
        client_id = data["client_id"]
        channel = data["channel"]
    except (ValueError, KeyError, TypeError):
        # ValueError: invalid JSON. KeyError/TypeError: a field is missing or
        # the payload is not a JSON object.
        return {
            "target": "client",
            "message_type": "text",
            "data": "Invalid join request: expected JSON with 'client_id' and 'channel'",
        }

    # NOTE(review): client_id is taken from the message body, so a client could
    # name another client here. Prefer the connection's own identity if the
    # server exposes one.
    server.join_channel(client_id, channel)

    return {
        "target": "client",
        "message_type": "text",
        "data": f"Joined channel: {channel}",
    }

@server.register_handler("ws_broadcast_channel")
async def broadcast_to_channel(request: WebSocketInbound) -> WebSocketOutbound:
    """Broadcast message to specific channel.

    Expects ``request["data"]`` to be a JSON object with ``channel`` and
    ``message`` keys. A malformed payload is answered with an error message
    to the sender instead of raising out of the handler.
    """
    try:
        data = json.loads(request["data"])
        channel = data["channel"]
        message = data["message"]
    except (ValueError, KeyError, TypeError):
        # ValueError: invalid JSON. KeyError/TypeError: a field is missing or
        # the payload is not a JSON object.
        return {
            "target": "client",
            "message_type": "text",
            "data": "Invalid broadcast request: expected JSON with 'channel' and 'message'",
        }

    # Broadcast to channel members
    return {
        "target": f"broadcast:channel:{channel}",
        "message_type": "text",
        "data": message,
    }

@server.register_handler("ws_broadcast_all")
async def broadcast_to_all(request: WebSocketInbound) -> WebSocketOutbound:
    """Send the payload to all connected clients, prefixed by ``Global announcement: ``."""
    announcement = f"Global announcement: {request['data']}"
    return {"target": "broadcast:all", "message_type": "text", "data": announcement}

Message Interceptors

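For comprehensive interceptor documentation and advanced patterns, see the WebSocket Interceptors Guide. The example below defines an authentication interceptor and a logging interceptor, then registers both on the server: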
import json
import time

from polyflux.websocket import WebSocketInbound, WebSocketOutbound
from polyflux.websocket.server.interception import WebSocketInterceptor
from polyflux.websocket.shared.exceptions import WebSocketAuthenticationError


class AuthInterceptor(WebSocketInterceptor):
    """Reject unauthenticated inbound messages and timestamp JSON-object replies."""

    async def intercept_inbound(self, request: WebSocketInbound) -> WebSocketInbound:
        """Authenticate incoming messages.

        Raises:
            WebSocketAuthenticationError: if ``self.is_authenticated(request)``
                returns a falsy value.
        """
        # is_authenticated is not defined in this example; implement it to
        # check for authentication in message data or connection context.
        if not self.is_authenticated(request):
            raise WebSocketAuthenticationError("Authentication required")
        return request

    async def intercept_outbound(self, response: WebSocketOutbound) -> WebSocketOutbound:
        """Add a ``server_timestamp`` field to outbound JSON-object text messages.

        Payloads that are not strings, not valid JSON, or not a JSON object are
        passed through unchanged.
        """
        payload = response.get("data")
        if not isinstance(payload, str):
            return response

        try:
            data = json.loads(payload)
        except json.JSONDecodeError:
            # Plain-text replies (e.g. "Echo: hi") are not JSON; leave them as is.
            return response

        # Only a JSON object can take an extra key; arrays and scalars cannot.
        if isinstance(data, dict):
            data["server_timestamp"] = time.time()
            response["data"] = json.dumps(data)
        return response

class LoggingInterceptor(WebSocketInterceptor):
    """Pass-through interceptor that writes one INFO log line per message."""

    async def intercept_inbound(self, request: WebSocketInbound) -> WebSocketInbound:
        """Log the message type and payload length, then return the request unchanged."""
        kind = request["message_type"]
        size = len(request["data"])
        logger.info(f"Received {kind} message: {size} bytes")
        return request

    async def intercept_outbound(self, response: WebSocketOutbound) -> WebSocketOutbound:
        """Log the message type and target, then return the response unchanged."""
        kind = response["message_type"]
        destination = response["target"]
        logger.info(f"Sending {kind} to {destination}")
        return response


# Register the interceptors on the server: authentication first, then logging.
server.add_interceptor(AuthInterceptor())
server.add_interceptor(LoggingInterceptor())

Type System

Client Configuration

from polyflux.websocket import WebSocketClientConfigSchema

# Every key is optional. The values below are illustrative; they are not
# confirmed to be the library defaults.
config: WebSocketClientConfigSchema = {
    "ping_interval": 30.0,                    # Optional: WebSocket ping interval (presumably seconds - confirm)
    "ping_timeout": 15.0,                     # Optional: WebSocket ping timeout (presumably seconds - confirm)
    "max_size": 1024 * 1024,                  # Optional: Maximum message size (1MB here)
    "compression_enabled": True,               # Optional: Enable compression
    "pool_enabled": True,                     # Optional: Enable connection pooling
    "pool_size": 10,                          # Optional: Pool size
    "monitoring_enabled": False,              # Optional: Enable metrics
    "timeout": 30.0,                          # Optional: Connection timeout
    "retries": 3,                             # Optional: Retry attempts
}

Server Configuration

from polyflux.websocket import WebSocketServerConfigSchema

# Every key is optional. The values below are illustrative; they are not
# confirmed to be the library defaults.
config: WebSocketServerConfigSchema = {
    "host": "0.0.0.0",                        # Optional: Host address
    "port": 8080,                             # Optional: Port number
    "max_connections": 1000,                  # Optional: Max concurrent connections
    "ping_interval": 20.0,                    # Optional: Ping interval (presumably seconds - confirm)
    "ping_timeout": 10.0,                     # Optional: Ping timeout (presumably seconds - confirm)
    "max_size": 1024 * 1024,                  # Optional: Max message size (1MB here)
    "compression_enabled": False,             # Optional: Enable compression
    "broadcasting_enabled": False,            # Optional: Enable broadcasting
    "default_channels": [],                   # Optional: Default channels (list of channel names)
    "monitoring_enabled": False,              # Optional: Enable monitoring
}

Message Types

from polyflux.websocket import WebSocketInbound, WebSocketOutbound

# Inbound: a message received by a handler or interceptor.
inbound: WebSocketInbound = {
    "message_type": "text",                   # Required: message type ("text", "binary" and "close" appear in this guide)
    "data": "Hello from client!",            # Required: message data (str for text, bytes for binary)
    "is_final": True,                        # Optional: final frame flag
    "close_code": 1000,                      # Optional: close code
    "close_reason": "Normal closure"          # Optional: close reason
}

# Outbound: a message to send. Client exchange() calls in this guide pass
# the same target/message_type/data fields; server handlers return this shape.
outbound: WebSocketOutbound = {
    "target": "wss://api.example.com/ws",    # Required: destination. Client calls in this guide use a WebSocket URL;
                                             # server handlers use "client", "broadcast:all" or "broadcast:channel:<name>"
    "message_type": "text",                  # Required: message type
    "data": "Hello from server!",           # Required: message data
    "headers": {"Authorization": "Bearer token"},  # Optional: handshake headers
    "subprotocols": ["chat", "v1"],         # Optional: subprotocols
    "close_code": 1000,                     # Optional: close code
    "close_reason": "Normal closure"         # Optional: close reason
}

Error Handling

Built-in Exception Types

from polyflux.websocket.shared.exceptions import (
    WebSocketError,                    # Base WebSocket exception
    WebSocketConnectionError,          # Connection issues
    WebSocketProtocolError,           # Protocol violations
    WebSocketAuthenticationError,     # Authentication failures
    WebSocketTimeoutError,            # Operation timeouts
)

# Catch the specific failures first and the base WebSocketError last:
# assuming the specific errors derive from that base (confirm), listing it
# earlier would shadow the handlers below it.
try:
    response = await client.exchange(
        target="wss://api.example.com/ws",
        message_type="text",
        data="Hello"
    )
except WebSocketTimeoutError as e:
    # The operation did not complete in time.
    print(f"WebSocket operation timed out: {e}")
except WebSocketConnectionError as e:
    # The connection could not be established or was lost.
    print(f"WebSocket connection failed: {e}")
except WebSocketError as e:
    # Any other WebSocket failure (e.g. protocol or authentication errors).
    print(f"WebSocket error: {e}")

Production Deployment

High-Performance Server

import asyncio
import signal
from polyflux.websocket import WebSocketServer

async def run_production_server():
    """Run the WebSocket server until SIGTERM or SIGINT arrives, then exit.

    The signal callbacks set an ``asyncio.Event``; the coroutine waits on that
    event inside ``async with server`` and leaves the context once it is set,
    so the server's own exit logic runs.
    """
    server = WebSocketServer(
        host="0.0.0.0",
        port=8080,
        max_connections=10000,              # Scale for high load
        ping_interval=30.0,
        ping_timeout=15.0,
        max_size=5 * 1024 * 1024,          # 5MB max message size
        compression_enabled=True,           # Enable compression
        broadcasting_enabled=True,
        default_channels=["general", "events", "alerts"],
        monitoring_enabled=True,            # Enable metrics
    )

    # Graceful shutdown handling
    stop_requested = asyncio.Event()

    def request_shutdown():
        print("Shutting down WebSocket server...")
        stop_requested.set()

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            # The callback runs inside the event loop and takes no arguments,
            # so it can set the asyncio.Event directly.
            loop.add_signal_handler(sig, request_shutdown)
        except NotImplementedError:
            # Event loops without add_signal_handler (e.g. on Windows): fall
            # back to signal.signal, whose handler receives (signum, frame),
            # and hand the work to the loop thread-safely.
            signal.signal(
                sig,
                lambda signum, frame: loop.call_soon_threadsafe(request_shutdown),
            )

    async with server:
        print("Production WebSocket server running on ws://0.0.0.0:8080")
        print(f"Max connections: {server.config['max_connections']}")
        await stop_requested.wait()


if __name__ == "__main__":
    asyncio.run(run_production_server())

Client with Connection Resilience

import asyncio
import json
import time

from polyflux.websocket import WebSocketClient
from polyflux.websocket.shared.exceptions import WebSocketConnectionError


async def resilient_client():
    """Send a heartbeat every 30 seconds, reconnecting with exponential backoff.

    Gives up after ``max_retries`` consecutive failures. A dropped connection
    counts as a failure, so every reconnect is preceded by the backoff delay
    rather than retried in a tight loop. The counter resets after each
    successful heartbeat.
    """
    client = WebSocketClient(
        ping_interval=20.0,
        ping_timeout=10.0,
        pool_enabled=True,
        pool_size=5,
        monitoring_enabled=True,
    )

    url = "wss://api.example.com/ws"
    max_retries = 5
    retry_count = 0

    while retry_count < max_retries:
        try:
            # NOTE(review): this re-enters the same client instance on every
            # attempt; confirm WebSocketClient supports repeated `async with`.
            async with client:
                await client.connect(url)

                # Send periodic heartbeat messages
                while True:
                    try:
                        response = await client.exchange(
                            target=url,
                            message_type="text",
                            data=json.dumps({"type": "heartbeat", "timestamp": time.time()}),
                        )
                    except WebSocketConnectionError:
                        print("Connection lost, will retry...")
                        # Re-raise so the outer handler counts the failure and
                        # applies the backoff before reconnecting.
                        raise

                    # A completed round trip means the link is healthy again.
                    retry_count = 0
                    print(f"Heartbeat response: {response['data']}")
                    await asyncio.sleep(30)  # Send heartbeat every 30 seconds

        except Exception as e:
            # Broad on purpose: this is the top-level retry boundary.
            retry_count += 1
            print(f"Connection attempt {retry_count} failed: {e}")
            if retry_count < max_retries:
                await asyncio.sleep(2 ** retry_count)  # Exponential backoff
            else:
                print("Max retries exceeded")

Testing

The WebSocket package includes a comprehensive test suite. Run it with:
# Run WebSocket package tests
uv run pytest packages/websocket/tests/

# Run with coverage
uv run pytest packages/websocket/tests/ --cov=polyflux.websocket
Test categories:
  • Unit Tests - Individual component testing (277 tests)
  • Integration Tests - End-to-end client/server scenarios
  • Connection Tests - WebSocket protocol compliance
  • Broadcasting Tests - Multi-client and channel functionality
  • Performance Tests - Load testing and connection pooling
  • Type Tests - TypedDict schema validation

Next Steps