Core protocol definitions and type interfaces
from typing import Protocol, Any, TypeVar
T_kwargs = TypeVar("T_kwargs")
class SupportsSend[**T_kwargs](Protocol):
"""Protocol for one-way data transmission."""
async def send(self, **kwargs: T_kwargs) -> bool:
"""Send data using keyword arguments.
Args:
**kwargs: Keyword arguments forming an Outbound PDU.
Must include 'target' field.
Returns:
True if send was successful, False otherwise.
"""
...
from typing import Protocol, TypeVar
from polyflux.core.shared.contracts.protocol_data_unit import Inbound
I_co = TypeVar("I_co", bound=Inbound, covariant=True)
class SupportsReceive[I_co](Protocol):
"""Protocol for active data retrieval."""
async def receive(self, source: str) -> I_co:
"""Receive data from specified source.
Args:
source: The source identifier to receive from.
Returns:
Inbound PDU containing received data.
"""
...
from typing import Protocol, TypeVar
from polyflux.core.shared.contracts.protocol_data_unit import Inbound
class SupportsExchange[**T_kwargs, I_co](Protocol):
"""Protocol for atomic send-receive operations."""
async def exchange(self, **kwargs: T_kwargs) -> I_co:
"""Send data and receive response atomically.
Args:
**kwargs: Keyword arguments forming an Outbound PDU.
Must include 'target' field.
Returns:
Inbound PDU containing response data.
"""
...
from typing import Protocol, Any
from polyflux.core.shared.contracts.exchange import (
SupportsSend, SupportsReceive, SupportsExchange
)
class BaseExchange[I_co](
SupportsSend[Any],
SupportsReceive[I_co],
SupportsExchange[Any, I_co],
Protocol
):
"""Umbrella protocol combining all exchange capabilities."""
pass
from typing import TypedDict, NotRequired
class BaseConfigSchema(TypedDict):
"""Base configuration schema with common settings."""
timeout: NotRequired[float]
retries: NotRequired[int]
logging_level: NotRequired[str]
from typing import Protocol, TypeVar
from typing_extensions import Self
ConfigSchemaT = TypeVar("ConfigSchemaT", bound=BaseConfigSchema)
class SupportsConfiguration[ConfigSchemaT: BaseConfigSchema](Protocol):
"""Protocol for type-safe configuration management."""
def configure(self, config: ConfigSchemaT) -> Self:
"""Configure the component with provided settings.
Args:
config: TypedDict configuration object.
Returns:
Self for method chaining.
"""
...
class BaseConfiguration[ConfigSchemaT: BaseConfigSchema](
SupportsConfiguration[ConfigSchemaT],
Protocol
):
"""Umbrella protocol for configuration capabilities."""
pass
from typing import Protocol, Any
from polyflux.core.shared.types import SerializeFormat
class SupportsCredentials(Protocol):
"""Protocol for credential objects."""
def is_valid(self) -> bool:
"""Check if credentials are valid."""
...
def get_redacted_string(self) -> str:
"""Safe string representation for logging."""
...
def serialize(self, format: SerializeFormat = "dict") -> Any:
"""Convert credentials to usable format.
Args:
format: Serialization format ("dict" | "string" | "bytes").
Returns:
Serialized credentials in requested format.
"""
...
from typing import Protocol, Any
class SupportsAuthentication(Protocol):
"""Protocol for authentication operations."""
async def authenticate(self, credentials: Any) -> bool:
"""Authenticate with provided credentials.
Args:
credentials: Credentials object (must implement SupportsCredentials).
Returns:
True if authentication succeeded, False otherwise.
"""
...
async def is_authenticated(self) -> bool:
"""Check current authentication status.
Returns:
True if currently authenticated, False otherwise.
"""
...
from typing import Protocol, Callable, Any
from typing_extensions import Self
class SupportsErrorHandling(Protocol):
"""Protocol for error handler registration and processing."""
def on_error(self, handler: Callable[[Exception], Any]) -> Self:
"""Register an error handler.
Args:
handler: Error handling function.
Returns:
Self for method chaining.
"""
...
async def handle_error(self, error: Exception) -> Any:
"""Process an error through registered handlers.
Args:
error: Exception to handle.
Returns:
Result from error handler, if any.
"""
...
class BaseReliability(SupportsErrorHandling, Protocol):
"""Umbrella protocol for operational robustness."""
pass
from typing import Protocol, TypeVar
from typing_extensions import Self
AddressT = TypeVar("AddressT")
class SupportsBinding[AddressT](Protocol):
"""Protocol for network interface binding."""
async def bind(self, address: AddressT) -> Self:
"""Bind to network interface.
Args:
address: Network address to bind to.
Returns:
Self for method chaining.
"""
...
async def unbind(self) -> None:
"""Unbind from network interface."""
...
def is_bound(self) -> bool:
"""Check if currently bound.
Returns:
True if bound to an address, False otherwise.
"""
...
from typing import Protocol, TypeVar, Callable
from typing_extensions import Self
I_contra = TypeVar("I_contra", contravariant=True)
O_co = TypeVar("O_co", covariant=True)
class SupportsDispatching[I_contra, O_co](Protocol):
"""Protocol for request routing and handler dispatch."""
def register_handler(
self,
pattern: str,
handler: Callable[[I_contra], O_co]
) -> Self:
"""Register a handler for a pattern.
Args:
pattern: Pattern to match (e.g., "/api/users/{id}").
handler: Handler function for matching requests.
Returns:
Self for method chaining.
"""
...
async def dispatch(self, request: I_contra) -> O_co:
"""Dispatch request to appropriate handler.
Args:
request: Incoming request to dispatch.
Returns:
Handler response.
"""
...
from typing import Protocol
class SupportsServing(Protocol):
"""Protocol for server lifecycle management."""
async def start(self) -> None:
"""Start the server."""
...
async def stop(self) -> None:
"""Stop the server."""
...
def is_running(self) -> bool:
"""Check if server is running.
Returns:
True if server is running, False otherwise.
"""
...
from typing import Protocol, TypeVar
from polyflux.core.shared.contracts.exchange import BaseExchange
from polyflux.core.shared.contracts.reliability import BaseReliability
from polyflux.core.shared.contracts.configurable import BaseConfiguration
from polyflux.core.shared.contracts.auth import SupportsAuthentication
ConfigDictT = TypeVar("ConfigDictT")
O_co = TypeVar("O_co", covariant=True)
I_co = TypeVar("I_co", covariant=True)
A = TypeVar("A")
class BaseClient[ConfigDictT, O_co, I_co, A](
BaseExchange[I_co],
BaseReliability,
BaseConfiguration[ConfigDictT],
SupportsAuthentication,
Protocol
):
"""Comprehensive client protocol combining all capability domains."""
pass
from typing import Protocol, TypeVar
from polyflux.core.server.binding import SupportsBinding
from polyflux.core.server.dispatch import SupportsDispatching
from polyflux.core.server.serving import SupportsServing
ConfigDictT = TypeVar("ConfigDictT")
O_co = TypeVar("O_co", covariant=True)
I_co = TypeVar("I_co", covariant=True)
AddressT = TypeVar("AddressT")
class BaseServer[ConfigDictT, O_co, I_co, AddressT](
BaseExchange[I_co],
BaseReliability,
BaseConfiguration[ConfigDictT],
SupportsBinding[AddressT],
SupportsDispatching[I_co, O_co],
SupportsServing,
Protocol
):
"""Comprehensive server protocol combining shared + server capabilities."""
pass
from typing import Any
from polyflux.core.shared.contracts.exchange import BaseExchange
from polyflux.websocket.shared.protocol_data_unit import WebSocketInbound, WebSocketOutbound
class WebSocketClient(BaseExchange[WebSocketInbound]):
"""WebSocket client implementing core exchange protocols."""
async def send(self, **kwargs: Any) -> bool:
"""Send WebSocket message without waiting for response."""
try:
await self.exchange(**kwargs)
return True
except Exception:
return False
async def receive(self, source: str) -> WebSocketInbound:
"""Receive WebSocket message from source."""
# Implementation for receiving from WebSocket connection
return {
"message_type": "text",
"data": "received_message",
"is_final": True
}
async def exchange(self, **kwargs: Any) -> WebSocketInbound:
"""Send WebSocket message and receive response."""
target = kwargs["target"] # Required: WebSocket URL
message_type = kwargs.get("message_type", "text")
data = kwargs.get("data", "")
# WebSocket implementation here
await self._send_websocket_message(target, message_type, data)
return await self._receive_websocket_response(target)
from typing import Any, Self
from polyflux.core.server.binding import SupportsBinding
from polyflux.core.server.dispatch import SupportsDispatching
from polyflux.core.server.serving import SupportsServing
class WebSocketServer(
SupportsBinding[tuple[str, int]],
SupportsDispatching[WebSocketInbound, WebSocketOutbound],
SupportsServing
):
"""WebSocket server implementing core server protocols."""
async def bind(self, address: tuple[str, int]) -> Self:
"""Bind WebSocket server to network address."""
host, port = address
self._bound_address = (host, port)
self._is_bound = True
return self
async def unbind(self) -> None:
"""Unbind WebSocket server from network."""
self._bound_address = None
self._is_bound = False
def is_bound(self) -> bool:
"""Check if server is bound to network address."""
return self._is_bound
def register_handler(
self,
pattern: str,
handler: Callable[[WebSocketInbound], WebSocketOutbound]
) -> Self:
"""Register WebSocket message handler for pattern."""
self._handlers[pattern] = handler
return self
async def dispatch(self, request: WebSocketInbound) -> WebSocketOutbound:
"""Dispatch WebSocket message to appropriate handler."""
message_type = request.get("message_type", "text")
pattern = f"ws_{message_type}"
handler = self._handlers.get(pattern)
if handler is None:
# Default echo handler
return {
"target": "client",
"message_type": "text",
"data": f"Echo: {request.get('data', '')}"
}
return await handler(request)
async def start(self) -> None:
"""Start WebSocket server."""
self._is_running = True
# Start WebSocket server implementation
async def stop(self) -> None:
"""Stop WebSocket server."""
self._is_running = False
def is_running(self) -> bool:
"""Check if WebSocket server is running."""
return self._is_running
from typing import TypedDict, NotRequired
from polyflux.core.shared.contracts.configurable import BaseConfigSchema
class WebSocketClientConfigSchema(BaseConfigSchema):
"""WebSocket client configuration schema."""
ping_interval: NotRequired[float] # WebSocket ping interval
ping_timeout: NotRequired[float] # WebSocket ping timeout
max_size: NotRequired[int] # Maximum message size
compression_enabled: NotRequired[bool] # Enable compression
pool_enabled: NotRequired[bool] # Enable connection pooling
pool_size: NotRequired[int] # Pool size
monitoring_enabled: NotRequired[bool] # Enable metrics
class WebSocketServerConfigSchema(BaseConfigSchema):
"""WebSocket server configuration schema."""
host: NotRequired[str]
port: NotRequired[int]
asgi_app: Any # Required ASGI application
config: Any # Required Hypercorn config
broadcasting_enabled: NotRequired[bool] # Enable broadcasting
default_channels: NotRequired[list[str]] # Default channels
max_connections: NotRequired[int] # Max concurrent connections
# Type-safe configuration
client_config: WebSocketClientConfigSchema = {
"ping_interval": 30.0,
"ping_timeout": 15.0,
"max_size": 1024 * 1024,
"compression_enabled": True,
"pool_enabled": True,
"pool_size": 10
}
server_config: WebSocketServerConfigSchema = {
"host": "0.0.0.0",
"port": 8080,
"asgi_app": create_asgi_app(),
"config": create_hypercorn_config({}),
"broadcasting_enabled": True,
"default_channels": ["general", "notifications"]
}
from polyflux.core.shared.contracts.auth import SupportsAuthentication, SupportsCredentials
class WebSocketToken(SupportsCredentials):
"""WebSocket-specific credentials implementation."""
def __init__(self, token: str):
self.token = token
def is_valid(self) -> bool:
return bool(self.token and len(self.token) >= 8)
def get_redacted_string(self) -> str:
return f"WebSocketToken(token={self.token[:8]}...)"
def serialize(self, format: str = "dict") -> Any:
if format == "dict":
return {"Authorization": f"Bearer {self.token}"}
elif format == "header":
return f"Bearer {self.token}"
return self.token
class WebSocketClient(SupportsAuthentication):
"""WebSocket client with authentication support."""
def __init__(self):
self._credentials = None
async def authenticate(self, credentials: WebSocketToken) -> bool:
"""Authenticate WebSocket connection."""
if credentials.is_valid():
self._credentials = credentials
# Send authentication message via WebSocket
auth_headers = credentials.serialize("dict")
success = await self._authenticate_websocket(auth_headers)
return success
return False
async def is_authenticated(self) -> bool:
"""Check WebSocket authentication status."""
return (
self._credentials is not None and
self._credentials.is_valid() and
await self._check_websocket_auth_status()
)
@runtime_checkable
class SupportsBroadcasting(Protocol):
"""Protocol for WebSocket broadcasting capabilities."""
async def broadcast_to_all(self, message: WebSocketOutbound) -> int:
"""Broadcast message to all connected clients."""
...
async def broadcast_to_channel(self, channel: str, message: WebSocketOutbound) -> int:
"""Broadcast message to specific channel."""
...
def join_channel(self, client_id: str, channel: str) -> bool:
"""Add client to broadcast channel."""
...
def leave_channel(self, client_id: str, channel: str) -> bool:
"""Remove client from broadcast channel."""
...
class WebSocketServer(SupportsBroadcasting):
"""WebSocket server with broadcasting capabilities."""
def __init__(self):
self.connections: dict[str, Any] = {}
self.channels: dict[str, set[str]] = {}
async def broadcast_to_all(self, message: WebSocketOutbound) -> int:
"""Broadcast to all connected clients."""
sent_count = 0
for client_id, connection in self.connections.items():
try:
await self._send_to_client(connection, message)
sent_count += 1
except Exception:
pass # Client disconnected
return sent_count
async def broadcast_to_channel(self, channel: str, message: WebSocketOutbound) -> int:
"""Broadcast to clients in specific channel."""
if channel not in self.channels:
return 0
sent_count = 0
for client_id in self.channels[channel]:
if client_id in self.connections:
try:
await self._send_to_client(self.connections[client_id], message)
sent_count += 1
except Exception:
pass
return sent_count
from polyflux.core.server.interception.base import Interceptor
class WebSocketInterceptor(Interceptor[WebSocketInbound, WebSocketOutbound]):
"""WebSocket message interceptor."""
async def intercept_inbound(self, inbound: WebSocketInbound) -> WebSocketInbound:
"""Process inbound WebSocket messages."""
# Add timestamp to incoming messages
if isinstance(inbound.get("data"), str):
inbound["data"] = f"[{datetime.now().isoformat()}] {inbound['data']}"
return inbound
async def intercept_outbound(self, outbound: WebSocketOutbound) -> WebSocketOutbound:
"""Process outbound WebSocket messages."""
# Add server signature to outgoing messages
if isinstance(outbound.get("data"), str):
outbound["data"] = f"{outbound['data']} [Polyflux-WebSocket]"
return outbound
# Usage with server
server = WebSocketServer()
server.add_interceptor(WebSocketInterceptor())
Was this page helpful?