polyflux.core

The core package provides building blocks for communication clients and servers.

Exchange Protocols

Basic communication methods.
from typing import Any, TypeVar
from polyflux.core.shared.contracts.exchange import (
    SupportsSend, SupportsReceive, SupportsExchange, BaseExchange
)
from polyflux.core.shared.contracts.protocol_data_unit import Inbound, Outbound

# Module-level type variables.
# NOTE(review): the protocol classes that follow declare type parameters with
# these same names directly in their class headers (``class X[...]`` syntax),
# so inside those classes the header-declared parameters are the ones in
# effect, not these objects. Confirm whether these are still needed.
T_kwargs = TypeVar("T_kwargs")
I_co = TypeVar("I_co", bound=Inbound, covariant=True)

class SupportsSend[**T_kwargs](Protocol):
    """Protocol for one-way data transmission."""
    async def send(self, **kwargs: T_kwargs) -> bool:
        """Send data using keyword arguments."""
        ...

class SupportsReceive[I_co](Protocol):
    """Protocol for active data retrieval."""  
    async def receive(self, source: str) -> I_co:
        """Receive data from specified source."""
        ...

class SupportsExchange[**T_kwargs, I_co](Protocol):
    """Protocol for atomic send-receive operations."""
    async def exchange(self, **kwargs: T_kwargs) -> I_co:
        """Send data and receive response atomically."""
        ...

class BaseExchange[I_co](
    SupportsSend[Any],
    SupportsReceive[I_co], 
    SupportsExchange[Any, I_co],
    Protocol
):
    """Complete communication interface."""
    pass

Methods

send (async method)
Parameters:
  • **kwargs (Any): Keyword arguments that form an Outbound PDU.
    Must include target, the destination identifier; may also carry
    additional protocol-specific fields.
Returns:
  • bool: True if the send was successful, False otherwise.
receive (async method)
Parameters:
  • source (str, required): The source identifier to receive from.
Returns:
  • Inbound: The inbound PDU containing the received data.
exchange (async method)
Parameters:
  • **kwargs (Any): Keyword arguments that form an Outbound PDU.
    Must include target, the destination identifier; may also carry
    additional protocol-specific fields.
Returns:
  • Inbound: The inbound PDU containing the response data.

Example

from polyflux.core.shared.contracts.exchange import BaseExchange
from polyflux.core.shared.contracts.protocol_data_unit import Inbound

class HttpClient(BaseExchange[Inbound]):
    """Sketch of an HTTP client implementing the full exchange interface.

    The transport is stubbed out: ``receive`` and ``exchange`` return fixed
    placeholder PDUs so the overall shape of an implementation is visible.
    """

    async def send(self, **kwargs: Any) -> bool:
        """Issue an HTTP request and report only whether it went through.

        Delegates to ``exchange`` and discards the response. Any ``Exception``
        raised by the exchange is swallowed and reported as ``False``.
        """
        try:
            await self.exchange(**kwargs)
        except Exception:
            return False
        else:
            return True

    async def receive(self, source: str) -> Inbound:
        """Receive from an HTTP source (polling, SSE, etc.).

        ``source`` is accepted but not used by this placeholder.
        """
        # A real implementation would read from ``source`` here.
        placeholder = {"data": b"received_data"}
        return placeholder

    async def exchange(self, **kwargs: Any) -> Inbound:
        """Send an HTTP request and return its response.

        Args:
            **kwargs: Request fields. ``target`` is mandatory; ``method``
                (default ``"GET"``), ``headers`` (default ``{}``) and ``body``
                (default ``None``) are optional.

        Raises:
            KeyError: If ``target`` is missing.
        """
        destination = kwargs["target"]  # mandatory field
        http_method = kwargs.get("method", "GET")
        request_headers = kwargs.get("headers", {})
        payload = kwargs.get("body")

        # The actual HTTP round trip would happen here.
        response_headers = {"content-type": "application/json"}
        return {
            "status_code": 200,
            "data": b"response_body",
            "headers": response_headers,
        }

Configuration Protocols

Simple configuration management for all components.
from typing import TypedDict, NotRequired, Protocol
from polyflux.core.shared.contracts.configurable import (
    BaseConfigSchema, SupportsConfiguration, BaseConfiguration
)

class BaseConfigSchema(TypedDict):
    """Basic configuration options."""
    timeout: NotRequired[float]
    retries: NotRequired[int]
    logging_level: NotRequired[str]

# Type variable restricted to BaseConfigSchema and its subtypes.
# NOTE(review): the protocol classes that follow declare a parameter of the
# same name in their own class headers, so this object is not the one they
# bind — confirm whether it is still needed.
ConfigSchemaT = TypeVar("ConfigSchemaT", bound=BaseConfigSchema)

class SupportsConfiguration[ConfigSchemaT: BaseConfigSchema](Protocol):
    """Configuration interface."""
    def configure(self, config: ConfigSchemaT) -> Self:
        """Configure the component with provided settings."""
        ...

class BaseConfiguration[ConfigSchemaT: BaseConfigSchema](
    SupportsConfiguration[ConfigSchemaT],
    Protocol
):
    """Complete configuration interface."""
    pass

Example

from typing import TypedDict, NotRequired
from polyflux.core.shared.contracts.configurable import BaseConfigSchema, SupportsConfiguration

class HttpConfigSchema(BaseConfigSchema):
    """HTTP-specific settings layered on top of the base schema."""

    # The only mandatory key.
    base_url: str
    # Optional keys.
    headers: NotRequired[dict[str, str]]
    follow_redirects: NotRequired[bool]

class HttpClient(SupportsConfiguration[HttpConfigSchema]):
    """Example client that copies selected settings onto itself."""

    def configure(self, config: HttpConfigSchema) -> Self:
        """Store ``base_url``, ``timeout`` and ``headers`` from ``config``.

        ``timeout`` falls back to ``30.0`` and ``headers`` to an empty dict
        when absent. Other keys in ``config`` are not read.

        Raises:
            KeyError: If ``base_url`` is missing.

        Returns:
            This instance.
        """
        # Resolve every value first; the mandatory lookup comes first so a
        # missing ``base_url`` fails before any attribute is assigned.
        base_url = config["base_url"]
        timeout = config.get("timeout", 30.0)
        headers = config.get("headers", {})

        self.base_url = base_url
        self.timeout = timeout
        self.headers = headers
        return self

Authentication Protocols

Credential management and authentication capabilities.
from typing import Protocol, Any
from polyflux.core.shared.contracts.auth import SupportsCredentials, SupportsAuthentication
from polyflux.core.shared.types import SerializeFormat

class SupportsCredentials(Protocol):
    """Structural interface that credential objects must satisfy."""

    def is_valid(self) -> bool:
        """Report whether these credentials are valid."""

    def get_redacted_string(self) -> str:
        """Return a representation that is safe to write to logs."""

    def serialize(self, format: SerializeFormat = "dict") -> Any:
        """Convert the credentials into a usable form.

        Args:
            format: The output form to produce; defaults to ``"dict"``.

        Returns:
            The credentials in the requested form.
        """

class SupportsAuthentication(Protocol):
    """Structural interface for components that can authenticate."""

    async def authenticate(self, credentials: Any) -> bool:
        """Attempt to authenticate using ``credentials``.

        Args:
            credentials: The credentials to authenticate with.

        Returns:
            A boolean reporting the outcome of the attempt.
        """

    async def is_authenticated(self) -> bool:
        """Report the current authentication status."""

Example

from polyflux.core.shared.contracts.auth import SupportsCredentials, SupportsAuthentication

class BearerToken(SupportsCredentials):
    """Example credential object wrapping a bearer token string.

    The raw secret is kept in ``token``. Only ``get_redacted_string`` is meant
    for logs; ``serialize`` returns the secret itself.
    """

    # Upper bound on how many leading token characters may appear in logs.
    _MAX_VISIBLE_PREFIX = 8

    def __init__(self, token: str) -> None:
        """Store the raw token.

        Args:
            token: The bearer token secret.
        """
        self.token = token

    def is_valid(self) -> bool:
        """Return True when a non-empty token is present."""
        return bool(self.token)

    def get_redacted_string(self) -> str:
        """Return a log-safe form that never contains the whole token.

        At most a quarter of the token, capped at 8 characters, is shown, so
        long tokens keep the familiar 8-character prefix while short tokens
        are not disclosed in full.
        """
        token = self.token or ""
        visible = min(self._MAX_VISIBLE_PREFIX, len(token) // 4)
        return f"Bearer {token[:visible]}..."

    def serialize(self, format: SerializeFormat = "dict") -> Any:
        """Convert the token into the requested form.

        Args:
            format: ``"dict"`` yields an ``Authorization`` header mapping,
                ``"bytes"`` yields the UTF-8 encoded token, and any other
                value yields the token string unchanged.

        Returns:
            The token in the requested form. The result contains the secret
            and must not be logged.
        """
        if format == "dict":
            return {"Authorization": f"Bearer {self.token}"}
        if format == "bytes":
            # A "bytes" request must not hand back a str.
            return self.token.encode("utf-8")
        return self.token

class HttpClient(SupportsAuthentication):
    """Example client that remembers the last accepted credentials."""

    def __init__(self):
        # No credentials have been accepted yet.
        self._credentials = None

    async def authenticate(self, credentials: BearerToken) -> bool:
        """Accept ``credentials`` if they report themselves valid.

        Invalid credentials are rejected without touching any previously
        stored credentials.

        Returns:
            True when the credentials were stored, False otherwise.
        """
        if not credentials.is_valid():
            return False
        self._credentials = credentials
        return True

    async def is_authenticated(self) -> bool:
        """Report whether stored credentials exist and are still valid."""
        stored = self._credentials
        if stored is None:
            return False
        return stored.is_valid()

Reliability Protocols

Error handling and operational robustness capabilities.
from typing import Protocol, Callable, Any
from polyflux.core.shared.contracts.reliability import SupportsErrorHandling, BaseReliability

class SupportsErrorHandling(Protocol):
    """Protocol for error handler registration and processing."""
    def on_error(self, handler: Callable[[Exception], Any]) -> Self:
        """Register an error handler."""
        ...
    
    async def handle_error(self, error: Exception) -> Any:
        """Process an error through registered handlers."""
        ...

class BaseReliability(SupportsErrorHandling, Protocol):
    """Umbrella protocol for operational robustness.

    It adds no members of its own beyond those of ``SupportsErrorHandling``.
    """

Server Protocols

Server-specific capabilities for binding, dispatching, and serving.
from typing import Protocol, TypeVar, Any
from polyflux.core.server.binding import SupportsBinding
from polyflux.core.server.dispatch import SupportsDispatching  
from polyflux.core.server.serving import SupportsServing
from polyflux.core.server.interception.base import SupportsInterception

# Module-level type variables for the server protocols.
# NOTE(review): the protocol classes that follow declare parameters with
# these same names in their own class headers, so these objects are not the
# ones those classes bind — confirm whether they are still needed.
AddressT = TypeVar("AddressT")
I_contra = TypeVar("I_contra", contravariant=True)
O_co = TypeVar("O_co", covariant=True)

class SupportsBinding[AddressT](Protocol):
    """Protocol for network interface binding."""
    async def bind(self, address: AddressT) -> Self:
        """Bind to network interface."""
        ...
    
    async def unbind(self) -> None:
        """Unbind from network interface."""
        ...
    
    def is_bound(self) -> bool:
        """Check if currently bound."""
        ...

class SupportsDispatching[I_contra, O_co](Protocol):
    """Protocol for request routing and handler dispatch."""
    def register_handler(self, pattern: str, handler: Callable[[I_contra], O_co]) -> Self:
        """Register a handler for a pattern."""
        ...
    
    async def dispatch(self, request: I_contra) -> O_co:
        """Dispatch request to appropriate handler."""
        ...

class SupportsServing(Protocol):
    """Structural interface for controlling a server's lifecycle."""

    async def start(self) -> None:
        """Bring the server up."""

    async def stop(self) -> None:
        """Shut the server down."""

    def is_running(self) -> bool:
        """Report whether the server is currently running."""

class SupportsInterception[I_contra: Inbound, O_co: Outbound](Protocol):
    """Protocol for servers that support data flow interception.
    
    This protocol defines the capability to manage and execute chains of
    interceptors for processing both inbound and outbound data flow. It
    enables middleware-like functionality in a protocol-agnostic way.
    """
    
    def add_interceptor(self, interceptor: Interceptor[I_contra, O_co]) -> Self:
        """Add an interceptor to the processing chain.
        
        Args:
            interceptor: The interceptor instance to add to the processing chain.
            
        Returns:
            Self for method chaining.
        """
        ...
    
    async def intercept_inbound(self, inbound: I_contra) -> I_contra:
        """Process inbound data through all registered interceptors.
        
        This method executes the complete inbound interceptor chain,
        passing data through each registered interceptor in sequence.
        
        Args:
            inbound: The inbound data PDU to process through the interceptor chain.
            
        Returns:
            The processed inbound data PDU after passing through all interceptors.
        """
        ...
    
    async def intercept_outbound(self, outbound: O_co) -> O_co:
        """Process outbound data through all registered interceptors.
        
        This method executes the complete outbound interceptor chain,
        passing data through each registered interceptor in sequence.
        
        Args:
            outbound: The outbound data PDU to process through the interceptor chain.
            
        Returns:
            The processed outbound data PDU after passing through all interceptors.
        """
        ...

Shared Types

from polyflux.core.shared.types import Json, AddressType, SerializeFormat

# Sample values annotated with the shared type aliases imported above.

# JSON-compatible values
data: Json = {"key": "value", "number": 42, "list": [1, 2, 3]}

# Protocol-agnostic addressing
address: AddressType = "https://api.example.com"

# Credential serialization formats
# NOTE(review): this variable is named ``format``, which shadows the built-in
# ``format()`` in whatever scope the snippet is pasted into.
format: SerializeFormat = "dict"  # "dict" | "string" | "bytes"

Exception Handling

Server Exceptions

from polyflux.core.server.exceptions import DispatchError, PatternCompilationError

# NOTE(review): ``server`` and ``request`` are not defined in this fragment,
# and the bare ``await`` means it presumably runs inside a coroutine — confirm.
try:
    # Server dispatch operation
    result = await server.dispatch(request)
except DispatchError as e:
    print(f"Dispatch failed: {e}")
except PatternCompilationError as e:
    # NOTE(review): this clause is only reachable if PatternCompilationError
    # is not a subclass of DispatchError — confirm the exception hierarchy.
    print(f"Invalid pattern: {e}")

Utilities

Type Guards

from polyflux.core.shared.contracts.protocol_data_unit import is_inbound, is_outbound

# Check PDU types
# Each guard is called on a plain dict literal; the message prints only when
# the guard returns a truthy value.
data = {"status_code": 200, "data": b"response"}
if is_inbound(data):
    print("This is an inbound PDU")

request = {"target": "https://api.example.com", "method": "GET"}
if is_outbound(request):
    print("This is an outbound PDU")

Type Metadata Utilities

from polyflux.core.shared.types import doc, example, constraint

# Create type metadata
# NOTE(review): ``gte``/``lte`` presumably denote inclusive lower/upper bounds
# and ``regex`` a pattern the value must match — confirm against the helpers.
user_age = constraint("age", gte=0, lte=150)
api_key = constraint("api_key", regex=r"^[a-zA-Z0-9]{32}$")
description = doc("User's description field")
sample_value = example("John Doe", "Typical user name")