Production-ready WebSocket client and server implementation
uv add polyflux-websocket
from polyflux.websocket import WebSocketClient
import asyncio
async def main():
    """Connect to wss://echo.websocket.org and exchange one text and one binary message."""
    echo_url = "wss://echo.websocket.org"

    async with WebSocketClient() as client:
        await client.connect(echo_url)

        # Send text message
        text_request = {
            "target": echo_url,
            "message_type": "text",
            "data": "Hello WebSocket!",
        }
        response = await client.exchange(**text_request)
        print(f"Echo response: {response['data']}")

        # Send binary message (the reply is captured but not inspected here)
        binary_response = await client.exchange(
            target=echo_url, message_type="binary", data=b"Binary data",
        )


asyncio.run(main())
from polyflux.websocket import WebSocketServer, WebSocketInbound, WebSocketOutbound
import asyncio
async def main():
    """Start a server on 0.0.0.0:8080 with an echo and a broadcast handler, then block."""
    server_options = {
        "host": "0.0.0.0",
        "port": 8080,
        "broadcasting_enabled": True,
        "default_channels": ["general", "notifications"],
    }
    server = WebSocketServer(**server_options)

    @server.register_handler("ws_text")
    async def handle_text_message(request: WebSocketInbound) -> WebSocketOutbound:
        # Echo the message back to sender
        echoed = f"Echo: {request['data']}"
        return {"target": "client", "message_type": "text", "data": echoed}

    @server.register_handler("ws_broadcast")
    async def handle_broadcast(request: WebSocketInbound) -> WebSocketOutbound:
        # Broadcast to all connected clients
        announcement = f"Broadcast: {request['data']}"
        return {"target": "broadcast:all", "message_type": "text", "data": announcement}

    async with server:
        print("WebSocket server running on ws://localhost:8080")
        # Nothing ever sets this event, so the coroutine waits here until it is cancelled.
        never_set = asyncio.Event()
        await never_set.wait()


asyncio.run(main())
from polyflux.websocket import WebSocketClient, WebSocketClientConfigSchema
import asyncio

# Simple configuration
config: WebSocketClientConfigSchema = {
    "ping_interval": 30.0,
    "ping_timeout": 15.0,
    "max_size": 1024 * 1024,  # 1MB
    "compression_enabled": True,
    "pool_enabled": True,
    "pool_size": 10,
    "monitoring_enabled": True,
}

client = WebSocketClient()
client.configure(config)


async def send_with_configured_client():
    """Connect the configured client, exchange one text message and return the reply.

    ``async with`` and ``await`` are only valid inside a coroutine, so the
    network part of the example lives here instead of at module level.
    """
    url = "wss://api.example.com/ws"
    async with client:
        await client.connect(url)
        # Client automatically uses ping_interval, compression, etc.
        return await client.exchange(
            target=url,
            message_type="text",
            data="Hello from configured client!",
        )


response = asyncio.run(send_with_configured_client())
from polyflux.websocket import WebSocketClient
import asyncio


async def send_authenticated_message():
    """Connect with an ``Authorization`` header and exchange one text message.

    Wrapped in a coroutine because ``async with`` / ``await`` cannot appear at
    module level.

    Returns:
        Whatever ``client.exchange`` returns for the message.
    """
    url = "wss://secure-api.example.com/ws"

    # WebSocket with authentication headers
    async with WebSocketClient() as client:
        await client.connect(
            url,
            headers={"Authorization": "Bearer your-token-here"},
        )
        return await client.exchange(
            target=url,
            message_type="text",
            data="Authenticated message",
        )


response = asyncio.run(send_authenticated_message())
from polyflux.websocket import WebSocketClient
import asyncio


async def send_pooled_batch(count: int = 50):
    """Issue ``count`` text exchanges through one pooled client and await them together.

    Wrapped in a coroutine because ``async with`` / ``await`` cannot appear at
    module level.

    Args:
        count: Number of messages to send; defaults to the 50 used in the
            original example.

    Returns:
        The list produced by ``asyncio.gather``, one entry per message, in
        submission order.
    """
    url = "wss://api.example.com/ws"

    # Enable connection pooling for efficiency
    async with WebSocketClient(pool_enabled=True, pool_size=20) as client:
        # Build every exchange coroutine first; none of them starts running
        # until gather() schedules them.
        pending = [
            client.exchange(target=url, message_type="text", data=f"Message {i}")
            for i in range(count)
        ]
        return await asyncio.gather(*pending)


responses = asyncio.run(send_pooled_batch())
from polyflux.websocket import WebSocketServer
# Server instance that the handlers registered below are attached to.
server = WebSocketServer(
    host="0.0.0.0",
    port=8080,
    max_connections=5000,  # Connection limits
    ping_interval=20.0,  # Keep-alive ping interval
    ping_timeout=10.0,  # Ping response timeout
    max_size=2 * 1024 * 1024,  # 2MB max message size
    compression_enabled=True,  # Enable per-message deflate
    broadcasting_enabled=True,  # Enable broadcasting features
    default_channels=["general", "events", "alerts"],
    monitoring_enabled=True  # Enable metrics collection
)
@server.register_handler("ws_text")
async def handle_text(request: WebSocketInbound) -> WebSocketOutbound:
    """Reply to the client with the received text prefixed by "Received: "."""
    reply: WebSocketOutbound = {
        "target": "client",
        "message_type": "text",
        "data": f"Received: {request['data']}",
    }
    return reply
@server.register_handler("ws_binary")
async def handle_binary(request: WebSocketInbound) -> WebSocketOutbound:
    """Pass the binary payload through ``process_binary`` and send the result back.

    ``process_binary`` is not defined in this example; it stands in for
    application-specific processing.
    """
    # Process binary data...
    processed = process_binary(request["data"])
    return {"target": "client", "message_type": "binary", "data": processed}
@server.register_handler("ws_close")
async def handle_close(request: WebSocketInbound) -> WebSocketOutbound:
    """Acknowledge a close, echoing the request's close code (1000 when absent)."""
    acknowledgement: WebSocketOutbound = {
        "target": "client",
        "message_type": "close",
        "data": "",
        "close_code": request.get("close_code", 1000),
        "close_reason": "Server acknowledged close",
    }
    return acknowledgement
from polyflux.websocket import WebSocketServer
# Server with broadcasting enabled and three default channels; the channel
# handlers registered below call methods on this instance.
server = WebSocketServer(
    broadcasting_enabled=True,
    default_channels=["general", "notifications", "alerts"]
)
import json


@server.register_handler("ws_join_channel")
async def join_channel(request: WebSocketInbound) -> WebSocketOutbound:
    """Allow clients to join specific channels.

    Expects ``request["data"]`` to be a JSON object with ``client_id`` and
    ``channel`` keys. A payload that is not valid JSON, is not an object, or
    lacks either key gets an explanatory text reply instead of raising.
    """
    try:
        payload = json.loads(request["data"])
        client_id = payload["client_id"]
        channel = payload["channel"]
    except (ValueError, KeyError, TypeError):
        # ValueError covers json.JSONDecodeError; TypeError covers JSON values
        # that are not objects (lists, numbers, strings) or non-text data.
        return {
            "target": "client",
            "message_type": "text",
            "data": "Invalid join request: expected a JSON object with 'client_id' and 'channel'",
        }

    # Join client to channel
    server.join_channel(client_id, channel)

    return {
        "target": "client",
        "message_type": "text",
        "data": f"Joined channel: {channel}",
    }
import json


@server.register_handler("ws_broadcast_channel")
async def broadcast_to_channel(request: WebSocketInbound) -> WebSocketOutbound:
    """Broadcast message to specific channel.

    Expects ``request["data"]`` to be a JSON object with ``channel`` and
    ``message`` keys. A payload that is not valid JSON, is not an object, or
    lacks either key gets an explanatory text reply to the sender instead of
    raising.
    """
    try:
        payload = json.loads(request["data"])
        channel = payload["channel"]
        message = payload["message"]
    except (ValueError, KeyError, TypeError):
        # ValueError covers json.JSONDecodeError; TypeError covers JSON values
        # that are not objects (lists, numbers, strings) or non-text data.
        return {
            "target": "client",
            "message_type": "text",
            "data": "Invalid broadcast request: expected a JSON object with 'channel' and 'message'",
        }

    # Broadcast to channel members
    return {
        "target": f"broadcast:channel:{channel}",
        "message_type": "text",
        "data": message,
    }
@server.register_handler("ws_broadcast_all")
async def broadcast_to_all(request: WebSocketInbound) -> WebSocketOutbound:
    """Re-send the incoming data to the "broadcast:all" target as a global announcement."""
    announcement = f"Global announcement: {request['data']}"
    return {"target": "broadcast:all", "message_type": "text", "data": announcement}
from polyflux.websocket.server.interception import WebSocketInterceptor
import json
import time


class AuthInterceptor(WebSocketInterceptor):
    """Rejects unauthenticated inbound messages and timestamps JSON-object replies."""

    async def intercept_inbound(self, request: WebSocketInbound) -> WebSocketInbound:
        """Authenticate incoming messages.

        Raises:
            WebSocketAuthenticationError: If ``self.is_authenticated(request)``
                returns a falsy value.
        """
        # Check for authentication in message data or connection context.
        # NOTE(review): ``is_authenticated`` is not defined in this class; it
        # is presumably inherited from WebSocketInterceptor or must be added
        # by the application — confirm.
        if not self.is_authenticated(request):
            raise WebSocketAuthenticationError("Authentication required")
        return request

    async def intercept_outbound(self, response: WebSocketOutbound) -> WebSocketOutbound:
        """Add a ``server_timestamp`` field to outbound JSON-object messages.

        Only string payloads that decode to a JSON object are modified.
        Binary data, plain text and other JSON values pass through unchanged.
        """
        raw = response.get("data")
        if not isinstance(raw, str):
            return response

        try:
            payload = json.loads(raw)
        except ValueError:
            # Plain-text payloads (e.g. "Echo: hello") are not JSON; leave them alone.
            return response

        # A timestamp key can only be attached to a JSON object.
        if isinstance(payload, dict):
            payload["server_timestamp"] = time.time()
            response["data"] = json.dumps(payload)
        return response
import logging

logger = logging.getLogger(__name__)


class LoggingInterceptor(WebSocketInterceptor):
    """Logs the type and size of inbound messages and the type and target of outbound ones."""

    async def intercept_inbound(self, request: WebSocketInbound) -> WebSocketInbound:
        """Log incoming messages and return the request unchanged."""
        data = request["data"]
        # len() of a str counts characters, not bytes; encode text so the
        # logged size really is a byte count.
        size = len(data.encode("utf-8")) if isinstance(data, str) else len(data)
        logger.info("Received %s message: %d bytes", request["message_type"], size)
        return request

    async def intercept_outbound(self, response: WebSocketOutbound) -> WebSocketOutbound:
        """Log outbound messages and return the response unchanged."""
        logger.info("Sending %s to %s", response["message_type"], response["target"])
        return response
# Add interceptors to server: authentication first, then logging.
for interceptor_cls in (AuthInterceptor, LoggingInterceptor):
    server.add_interceptor(interceptor_cls())
from polyflux.websocket import WebSocketClientConfigSchema
# Client configuration example; every key shown here is marked optional.
config: WebSocketClientConfigSchema = {
    "ping_interval": 30.0,  # Optional: WebSocket ping interval
    "ping_timeout": 15.0,  # Optional: WebSocket ping timeout
    "max_size": 1024 * 1024,  # Optional: Maximum message size
    "compression_enabled": True,  # Optional: Enable compression
    "pool_enabled": True,  # Optional: Enable connection pooling
    "pool_size": 10,  # Optional: Pool size
    "monitoring_enabled": False,  # Optional: Enable metrics
    "timeout": 30.0,  # Optional: Connection timeout
    "retries": 3,  # Optional: Retry attempts
}
from polyflux.websocket import WebSocketServerConfigSchema
# Server configuration example; every key shown here is marked optional.
config: WebSocketServerConfigSchema = {
    "host": "0.0.0.0",  # Optional: Host address
    "port": 8080,  # Optional: Port number
    "max_connections": 1000,  # Optional: Max concurrent connections
    "ping_interval": 20.0,  # Optional: Ping interval
    "ping_timeout": 10.0,  # Optional: Ping timeout
    "max_size": 1024 * 1024,  # Optional: Max message size
    "compression_enabled": False,  # Optional: Enable compression
    "broadcasting_enabled": False,  # Optional: Enable broadcasting
    "default_channels": [],  # Optional: Default channels
    "monitoring_enabled": False,  # Optional: Enable monitoring
}
from polyflux.websocket import WebSocketInbound, WebSocketOutbound
# Inbound (messages from clients): the mapping that handlers receive as `request`.
inbound: WebSocketInbound = {
    "message_type": "text",  # Required: message type
    "data": "Hello from client!",  # Required: message data
    "is_final": True,  # Optional: final frame flag
    "close_code": 1000,  # Optional: close code
    "close_reason": "Normal closure"  # Optional: close reason
}
# Outbound (messages being sent).
# NOTE(review): the original heading said "messages to clients", but this
# sample targets a server URL and carries handshake headers, which looks like
# a client-side send — confirm which direction WebSocketOutbound describes.
outbound: WebSocketOutbound = {
    # Required: destination. A WebSocket URL here; the server handler examples
    # on this page use "client", "broadcast:all" and "broadcast:channel:<name>".
    "target": "wss://api.example.com/ws",
    "message_type": "text",  # Required: message type
    "data": "Hello from server!",  # Required: message data
    "headers": {"Authorization": "Bearer token"},  # Optional: handshake headers
    "subprotocols": ["chat", "v1"],  # Optional: subprotocols
    "close_code": 1000,  # Optional: close code
    "close_reason": "Normal closure"  # Optional: close reason
}
from polyflux.websocket.shared.exceptions import (
WebSocketError, # Base WebSocket exception
WebSocketConnectionError, # Connection issues
WebSocketProtocolError, # Protocol violations
WebSocketAuthenticationError, # Authentication failures
WebSocketTimeoutError, # Operation timeouts
)
import asyncio


async def exchange_with_error_handling(client):
    """Send one text message and report WebSocket failures instead of raising.

    Wrapped in a coroutine because ``await`` cannot appear at module level.
    The handlers are ordered from most specific to the ``WebSocketError``
    base class so the specific messages are reachable.

    Args:
        client: The client whose ``exchange`` method is awaited.

    Returns:
        The response from ``client.exchange``, or ``None`` when a
        ``WebSocketError`` (or subclass) was caught and printed.
    """
    try:
        return await client.exchange(
            target="wss://api.example.com/ws",
            message_type="text",
            data="Hello",
        )
    except WebSocketTimeoutError as e:
        print(f"WebSocket operation timed out: {e}")
    except WebSocketConnectionError as e:
        print(f"WebSocket connection failed: {e}")
    except WebSocketError as e:
        print(f"WebSocket error: {e}")
    return None


response = asyncio.run(exchange_with_error_handling(client))
import asyncio
import signal
from polyflux.websocket import WebSocketServer
async def run_production_server():
    """Run the server until SIGTERM or SIGINT arrives, then leave the server context.

    The signal callback sets an event that the coroutine is waiting on, so a
    signal makes the ``async with server`` block exit normally instead of the
    process being killed mid-flight.
    """
    server = WebSocketServer(
        host="0.0.0.0",
        port=8080,
        max_connections=10000,  # Scale for high load
        ping_interval=30.0,
        ping_timeout=15.0,
        max_size=5 * 1024 * 1024,  # 5MB max message size
        compression_enabled=True,  # Enable compression
        broadcasting_enabled=True,
        default_channels=["general", "events", "alerts"],
        monitoring_enabled=True,  # Enable metrics
    )

    # Graceful shutdown handling
    loop = asyncio.get_running_loop()
    shutdown_requested = asyncio.Event()

    def signal_handler():
        print("Shutting down WebSocket server...")
        shutdown_requested.set()

    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            # The loop invokes this callback with no arguments, inside the
            # event loop, so touching the asyncio.Event is safe.
            loop.add_signal_handler(sig, signal_handler)
        except NotImplementedError:
            # Some event loops (e.g. on Windows) lack add_signal_handler.
            # signal.signal handlers receive (signum, frame) and run outside
            # the loop, so hand the work back to the loop thread-safely.
            signal.signal(
                sig,
                lambda signum, frame: loop.call_soon_threadsafe(signal_handler),
            )

    async with server:
        print("Production WebSocket server running on ws://0.0.0.0:8080")
        print(f"Max connections: {server.config['max_connections']}")
        await shutdown_requested.wait()


if __name__ == "__main__":
    asyncio.run(run_production_server())
from polyflux.websocket import WebSocketClient
async def resilient_client():
    """WebSocket client with automatic reconnection and error handling.

    Connects, then sends a heartbeat every 30 seconds. Every pass that ends,
    whether through a failed attempt or a dropped connection, uses up one of
    ``max_retries`` and is followed by an exponential backoff. A flapping
    connection therefore cannot cause a tight, unbounded reconnect loop.
    """
    # Local imports keep the example self-contained.
    import json
    import time

    url = "wss://api.example.com/ws"
    client = WebSocketClient(
        ping_interval=20.0,
        ping_timeout=10.0,
        pool_enabled=True,
        pool_size=5,
        monitoring_enabled=True,
    )

    max_retries = 5
    retry_count = 0

    while retry_count < max_retries:
        try:
            async with client:
                await client.connect(url)

                # Send periodic heartbeat messages
                while True:
                    try:
                        response = await client.exchange(
                            target=url,
                            message_type="text",
                            data=json.dumps({"type": "heartbeat", "timestamp": time.time()}),
                        )
                    except WebSocketConnectionError:
                        print("Connection lost, will retry...")
                        break
                    print(f"Heartbeat response: {response['data']}")
                    await asyncio.sleep(30)  # Send heartbeat every 30 seconds
        except Exception as e:
            # Broad on purpose: this is the reconnect boundary, and any failure
            # while connecting or exchanging should trigger another attempt.
            print(f"Connection attempt {retry_count + 1} failed: {e}")

        # Reaching this point always means the connection ended, so count it
        # and back off before trying again.
        retry_count += 1
        if retry_count >= max_retries:
            print("Max retries exceeded")
            break
        await asyncio.sleep(2 ** retry_count)  # Exponential backoff
# Run WebSocket package tests
uv run pytest packages/websocket/tests/
# Run with coverage
uv run pytest packages/websocket/tests/ --cov=polyflux.websocket
Was this page helpful?