Protocol Architecture

Polyflux uses Python protocols for flexible, type-safe implementations.

Exchange Protocols

The fundamental communication protocols that all clients must implement.

SupportsSend

Protocol for one-way data transmission.
from typing import Protocol, Any, TypeVar

T_kwargs = TypeVar("T_kwargs")

class SupportsSend[**T_kwargs](Protocol):
    """Protocol for one-way data transmission."""
    
    async def send(self, **kwargs: T_kwargs) -> bool:
        """Send data using keyword arguments.
        
        Args:
            **kwargs: Keyword arguments forming an Outbound PDU.
                     Must include 'target' field.
        
        Returns:
            True if send was successful, False otherwise.
        """
        ...

SupportsReceive

Protocol for active data retrieval.
from typing import Protocol, TypeVar
from polyflux.core.shared.contracts.protocol_data_unit import Inbound

I_co = TypeVar("I_co", bound=Inbound, covariant=True)

class SupportsReceive[I_co](Protocol):
    """Protocol for active data retrieval."""
    
    async def receive(self, source: str) -> I_co:
        """Receive data from specified source.
        
        Args:
            source: The source identifier to receive from.
            
        Returns:
            Inbound PDU containing received data.
        """
        ...

SupportsExchange

Protocol for atomic send-receive operations.
from typing import Protocol, TypeVar
from polyflux.core.shared.contracts.protocol_data_unit import Inbound

class SupportsExchange[**T_kwargs, I_co](Protocol):
    """Protocol for atomic send-receive operations."""
    
    async def exchange(self, **kwargs: T_kwargs) -> I_co:
        """Send data and receive response atomically.
        
        Args:
            **kwargs: Keyword arguments forming an Outbound PDU.
                     Must include 'target' field.
                     
        Returns:
            Inbound PDU containing response data.
        """
        ...

BaseExchange

Umbrella protocol combining all exchange capabilities.
from typing import Protocol, Any
from polyflux.core.shared.contracts.exchange import (
    SupportsSend, SupportsReceive, SupportsExchange
)

class BaseExchange[I_co](
    SupportsSend[Any],
    SupportsReceive[I_co],
    SupportsExchange[Any, I_co],
    Protocol
):
    """Umbrella protocol combining all exchange capabilities."""
    pass

Configuration Protocols

Type-safe configuration management with TypedDict schemas.

BaseConfigSchema

Base configuration TypedDict with common settings.
from typing import TypedDict, NotRequired

class BaseConfigSchema(TypedDict):
    """Base configuration schema with common settings."""
    timeout: NotRequired[float]
    retries: NotRequired[int]
    logging_level: NotRequired[str]

SupportsConfiguration

Protocol for type-safe configuration management.
from typing import Protocol, TypeVar
from typing_extensions import Self

ConfigSchemaT = TypeVar("ConfigSchemaT", bound=BaseConfigSchema)

class SupportsConfiguration[ConfigSchemaT: BaseConfigSchema](Protocol):
    """Protocol for type-safe configuration management."""
    
    def configure(self, config: ConfigSchemaT) -> Self:
        """Configure the component with provided settings.
        
        Args:
            config: TypedDict configuration object.
            
        Returns:
            Self for method chaining.
        """
        ...

BaseConfiguration

Umbrella protocol for configuration capabilities.
class BaseConfiguration[ConfigSchemaT: BaseConfigSchema](
    SupportsConfiguration[ConfigSchemaT],
    Protocol
):
    """Umbrella protocol for configuration capabilities."""
    pass

Authentication Protocols

Credential management and authentication capabilities.

SupportsCredentials

Protocol for credential objects.
from typing import Protocol, Any
from polyflux.core.shared.types import SerializeFormat

class SupportsCredentials(Protocol):
    """Protocol for credential objects."""
    
    def is_valid(self) -> bool:
        """Check if credentials are valid."""
        ...
    
    def get_redacted_string(self) -> str:
        """Safe string representation for logging."""
        ...
    
    def serialize(self, format: SerializeFormat = "dict") -> Any:
        """Convert credentials to usable format.
        
        Args:
            format: Serialization format ("dict" | "string" | "bytes").
            
        Returns:
            Serialized credentials in requested format.
        """
        ...

SupportsAuthentication

Protocol for authentication operations.
from typing import Protocol, Any

class SupportsAuthentication(Protocol):
    """Protocol for authentication operations."""
    
    async def authenticate(self, credentials: Any) -> bool:
        """Authenticate with provided credentials.
        
        Args:
            credentials: Credentials object (must implement SupportsCredentials).
            
        Returns:
            True if authentication succeeded, False otherwise.
        """
        ...
    
    async def is_authenticated(self) -> bool:
        """Check current authentication status.
        
        Returns:
            True if currently authenticated, False otherwise.
        """
        ...

Reliability Protocols

Error handling and operational robustness capabilities.

SupportsErrorHandling

Protocol for error handler registration and processing.
from typing import Protocol, Callable, Any
from typing_extensions import Self

class SupportsErrorHandling(Protocol):
    """Protocol for error handler registration and processing."""
    
    def on_error(self, handler: Callable[[Exception], Any]) -> Self:
        """Register an error handler.
        
        Args:
            handler: Error handling function.
            
        Returns:
            Self for method chaining.
        """
        ...
    
    async def handle_error(self, error: Exception) -> Any:
        """Process an error through registered handlers.
        
        Args:
            error: Exception to handle.
            
        Returns:
            Result from error handler, if any.
        """
        ...

BaseReliability

Umbrella protocol for operational robustness.
class BaseReliability(SupportsErrorHandling, Protocol):
    """Umbrella protocol for operational robustness."""
    pass

Server Protocols

Server-specific capabilities for binding, dispatching, and serving.

SupportsBinding

Protocol for network interface binding.
from typing import Protocol, TypeVar
from typing_extensions import Self

AddressT = TypeVar("AddressT")

class SupportsBinding[AddressT](Protocol):
    """Protocol for network interface binding."""
    
    async def bind(self, address: AddressT) -> Self:
        """Bind to network interface.
        
        Args:
            address: Network address to bind to.
            
        Returns:
            Self for method chaining.
        """
        ...
    
    async def unbind(self) -> None:
        """Unbind from network interface."""
        ...
    
    def is_bound(self) -> bool:
        """Check if currently bound.
        
        Returns:
            True if bound to an address, False otherwise.
        """
        ...

SupportsDispatching

Protocol for request routing and handler dispatch.
from typing import Protocol, TypeVar, Callable
from typing_extensions import Self

I_contra = TypeVar("I_contra", contravariant=True)
O_co = TypeVar("O_co", covariant=True)

class SupportsDispatching[I_contra, O_co](Protocol):
    """Protocol for request routing and handler dispatch."""
    
    def register_handler(
        self, 
        pattern: str, 
        handler: Callable[[I_contra], O_co]
    ) -> Self:
        """Register a handler for a pattern.
        
        Args:
            pattern: Pattern to match (e.g., "/api/users/{id}").
            handler: Handler function for matching requests.
            
        Returns:
            Self for method chaining.
        """
        ...
    
    async def dispatch(self, request: I_contra) -> O_co:
        """Dispatch request to appropriate handler.
        
        Args:
            request: Incoming request to dispatch.
            
        Returns:
            Handler response.
        """
        ...

SupportsServing

Protocol for server lifecycle management.
from typing import Protocol

class SupportsServing(Protocol):
    """Protocol for server lifecycle management."""
    
    async def start(self) -> None:
        """Start the server."""
        ...
    
    async def stop(self) -> None:
        """Stop the server."""
        ...
    
    def is_running(self) -> bool:
        """Check if server is running.
        
        Returns:
            True if server is running, False otherwise.
        """
        ...

Unified Protocols

Complete client and server protocols combining all capabilities.

BaseClient

Comprehensive client protocol combining all capability domains.
from typing import Protocol, TypeVar
from polyflux.core.shared.contracts.exchange import BaseExchange
from polyflux.core.shared.contracts.reliability import BaseReliability
from polyflux.core.shared.contracts.configurable import BaseConfiguration
from polyflux.core.shared.contracts.auth import SupportsAuthentication

ConfigDictT = TypeVar("ConfigDictT")
O_co = TypeVar("O_co", covariant=True)
I_co = TypeVar("I_co", covariant=True)
A = TypeVar("A")

class BaseClient[ConfigDictT, O_co, I_co, A](
    BaseExchange[I_co],
    BaseReliability,
    BaseConfiguration[ConfigDictT],
    SupportsAuthentication,
    Protocol
):
    """Comprehensive client protocol combining all capability domains."""
    pass

BaseServer

Comprehensive server protocol combining shared + server capabilities.
from typing import Protocol, TypeVar
from polyflux.core.server.binding import SupportsBinding
from polyflux.core.server.dispatch import SupportsDispatching
from polyflux.core.server.serving import SupportsServing

ConfigDictT = TypeVar("ConfigDictT")
O_co = TypeVar("O_co", covariant=True)  
I_co = TypeVar("I_co", covariant=True)
AddressT = TypeVar("AddressT")

class BaseServer[ConfigDictT, O_co, I_co, AddressT](
    BaseExchange[I_co],
    BaseReliability,
    BaseConfiguration[ConfigDictT],
    SupportsBinding[AddressT],
    SupportsDispatching[I_co, O_co],
    SupportsServing,
    Protocol
):
    """Comprehensive server protocol combining shared + server capabilities."""
    pass

WebSocket Protocol Examples

Real-world examples of implementing WebSocket protocols with Polyflux.

WebSocket Client Implementation

from typing import Any
from polyflux.core.shared.contracts.exchange import BaseExchange
from polyflux.websocket.shared.protocol_data_unit import WebSocketInbound, WebSocketOutbound

class WebSocketClient(BaseExchange[WebSocketInbound]):
    """WebSocket client implementing core exchange protocols."""
    
    async def send(self, **kwargs: Any) -> bool:
        """Send WebSocket message without waiting for response."""
        try:
            await self.exchange(**kwargs)
            return True
        except Exception:
            return False
    
    async def receive(self, source: str) -> WebSocketInbound:
        """Receive WebSocket message from source."""
        # Implementation for receiving from WebSocket connection
        return {
            "message_type": "text",
            "data": "received_message",
            "is_final": True
        }
    
    async def exchange(self, **kwargs: Any) -> WebSocketInbound:
        """Send WebSocket message and receive response."""
        target = kwargs["target"]  # Required: WebSocket URL
        message_type = kwargs.get("message_type", "text")
        data = kwargs.get("data", "")
        
        # WebSocket implementation here
        await self._send_websocket_message(target, message_type, data)
        return await self._receive_websocket_response(target)

WebSocket Server Implementation

from typing import Any, Self
from polyflux.core.server.binding import SupportsBinding
from polyflux.core.server.dispatch import SupportsDispatching
from polyflux.core.server.serving import SupportsServing

class WebSocketServer(
    SupportsBinding[tuple[str, int]],
    SupportsDispatching[WebSocketInbound, WebSocketOutbound],
    SupportsServing
):
    """WebSocket server implementing core server protocols."""
    
    async def bind(self, address: tuple[str, int]) -> Self:
        """Bind WebSocket server to network address."""
        host, port = address
        self._bound_address = (host, port)
        self._is_bound = True
        return self
    
    async def unbind(self) -> None:
        """Unbind WebSocket server from network."""
        self._bound_address = None
        self._is_bound = False
    
    def is_bound(self) -> bool:
        """Check if server is bound to network address."""
        return self._is_bound
    
    def register_handler(
        self, 
        pattern: str, 
        handler: Callable[[WebSocketInbound], WebSocketOutbound]
    ) -> Self:
        """Register WebSocket message handler for pattern."""
        self._handlers[pattern] = handler
        return self
    
    async def dispatch(self, request: WebSocketInbound) -> WebSocketOutbound:
        """Dispatch WebSocket message to appropriate handler."""
        message_type = request.get("message_type", "text")
        pattern = f"ws_{message_type}"
        
        handler = self._handlers.get(pattern)
        if handler is None:
            # Default echo handler
            return {
                "target": "client",
                "message_type": "text",
                "data": f"Echo: {request.get('data', '')}"
            }
        
        return await handler(request)
    
    async def start(self) -> None:
        """Start WebSocket server."""
        self._is_running = True
        # Start WebSocket server implementation
    
    async def stop(self) -> None:
        """Stop WebSocket server."""
        self._is_running = False
    
    def is_running(self) -> bool:
        """Check if WebSocket server is running."""
        return self._is_running

WebSocket Configuration Protocol

from typing import TypedDict, NotRequired
from polyflux.core.shared.contracts.configurable import BaseConfigSchema

class WebSocketClientConfigSchema(BaseConfigSchema):
    """WebSocket client configuration schema."""
    
    ping_interval: NotRequired[float]  # WebSocket ping interval
    ping_timeout: NotRequired[float]   # WebSocket ping timeout  
    max_size: NotRequired[int]         # Maximum message size
    compression_enabled: NotRequired[bool]     # Enable compression
    pool_enabled: NotRequired[bool]            # Enable connection pooling
    pool_size: NotRequired[int]                # Pool size
    monitoring_enabled: NotRequired[bool]      # Enable metrics

class WebSocketServerConfigSchema(BaseConfigSchema):
    """WebSocket server configuration schema."""
    
    host: NotRequired[str]
    port: NotRequired[int]
    asgi_app: Any  # Required ASGI application
    config: Any    # Required Hypercorn config
    broadcasting_enabled: NotRequired[bool]    # Enable broadcasting
    default_channels: NotRequired[list[str]]   # Default channels
    max_connections: NotRequired[int]          # Max concurrent connections

# Type-safe configuration
client_config: WebSocketClientConfigSchema = {
    "ping_interval": 30.0,
    "ping_timeout": 15.0,
    "max_size": 1024 * 1024,
    "compression_enabled": True,
    "pool_enabled": True,
    "pool_size": 10
}

server_config: WebSocketServerConfigSchema = {
    "host": "0.0.0.0",
    "port": 8080,
    "asgi_app": create_asgi_app(),
    "config": create_hypercorn_config({}),
    "broadcasting_enabled": True,
    "default_channels": ["general", "notifications"]
}

WebSocket Authentication Protocol

from polyflux.core.shared.contracts.auth import SupportsAuthentication, SupportsCredentials

class WebSocketToken(SupportsCredentials):
    """WebSocket-specific credentials implementation."""
    
    def __init__(self, token: str):
        self.token = token
    
    def is_valid(self) -> bool:
        return bool(self.token and len(self.token) >= 8)
    
    def get_redacted_string(self) -> str:
        return f"WebSocketToken(token={self.token[:8]}...)"
    
    def serialize(self, format: str = "dict") -> Any:
        if format == "dict":
            return {"Authorization": f"Bearer {self.token}"}
        elif format == "header":
            return f"Bearer {self.token}"
        return self.token

class WebSocketClient(SupportsAuthentication):
    """WebSocket client with authentication support."""
    
    def __init__(self):
        self._credentials = None
    
    async def authenticate(self, credentials: WebSocketToken) -> bool:
        """Authenticate WebSocket connection."""
        if credentials.is_valid():
            self._credentials = credentials
            # Send authentication message via WebSocket
            auth_headers = credentials.serialize("dict")
            success = await self._authenticate_websocket(auth_headers)
            return success
        return False
    
    async def is_authenticated(self) -> bool:
        """Check WebSocket authentication status."""
        return (
            self._credentials is not None and 
            self._credentials.is_valid() and
            await self._check_websocket_auth_status()
        )

WebSocket Broadcasting Protocol

@runtime_checkable
class SupportsBroadcasting(Protocol):
    """Protocol for WebSocket broadcasting capabilities."""
    
    async def broadcast_to_all(self, message: WebSocketOutbound) -> int:
        """Broadcast message to all connected clients."""
        ...
    
    async def broadcast_to_channel(self, channel: str, message: WebSocketOutbound) -> int:
        """Broadcast message to specific channel."""
        ...
    
    def join_channel(self, client_id: str, channel: str) -> bool:
        """Add client to broadcast channel."""
        ...
    
    def leave_channel(self, client_id: str, channel: str) -> bool:
        """Remove client from broadcast channel."""
        ...

class WebSocketServer(SupportsBroadcasting):
    """WebSocket server with broadcasting capabilities."""
    
    def __init__(self):
        self.connections: dict[str, Any] = {}
        self.channels: dict[str, set[str]] = {}
    
    async def broadcast_to_all(self, message: WebSocketOutbound) -> int:
        """Broadcast to all connected clients."""
        sent_count = 0
        for client_id, connection in self.connections.items():
            try:
                await self._send_to_client(connection, message)
                sent_count += 1
            except Exception:
                pass  # Client disconnected
        return sent_count
    
    async def broadcast_to_channel(self, channel: str, message: WebSocketOutbound) -> int:
        """Broadcast to clients in specific channel."""
        if channel not in self.channels:
            return 0
        
        sent_count = 0
        for client_id in self.channels[channel]:
            if client_id in self.connections:
                try:
                    await self._send_to_client(self.connections[client_id], message)
                    sent_count += 1
                except Exception:
                    pass
        return sent_count

WebSocket Interception Protocol

from polyflux.core.server.interception.base import Interceptor

class WebSocketInterceptor(Interceptor[WebSocketInbound, WebSocketOutbound]):
    """WebSocket message interceptor."""
    
    async def intercept_inbound(self, inbound: WebSocketInbound) -> WebSocketInbound:
        """Process inbound WebSocket messages."""
        # Add timestamp to incoming messages
        if isinstance(inbound.get("data"), str):
            inbound["data"] = f"[{datetime.now().isoformat()}] {inbound['data']}"
        return inbound
    
    async def intercept_outbound(self, outbound: WebSocketOutbound) -> WebSocketOutbound:
        """Process outbound WebSocket messages."""
        # Add server signature to outgoing messages
        if isinstance(outbound.get("data"), str):
            outbound["data"] = f"{outbound['data']} [Polyflux-WebSocket]"
        return outbound

# Usage with server
server = WebSocketServer()
server.add_interceptor(WebSocketInterceptor())