Overview

The polyflux-core package provides building blocks and shared components for communication clients and servers.

Installation

uv add polyflux-core

Key Concepts

Message Types

Message containers that standardize how data is sent and received:
from polyflux.core.shared.contracts.protocol_data_unit import Inbound, Outbound
from typing import NotRequired, Any

# Base Outbound PDU requires a target field
outbound: Outbound = {
    "target": "https://api.example.com/users"
}

# Base Inbound PDU (empty by default, extended by protocols)
inbound: Inbound = {}

# Message types can be customized per protocol
class HttpOutbound(Outbound):
    method: str
    headers: NotRequired[dict[str, str]]
    body: NotRequired[dict[str, Any] | str | bytes]

class HttpInbound(Inbound):
    status_code: int
    data: bytes
    headers: NotRequired[dict[str, str]]

# Create message containers
request: HttpOutbound = {
    "target": "https://api.example.com/users",
    "method": "POST",
    "headers": {"Content-Type": "application/json"},
    "body": {"name": "John", "email": "john@example.com"}
}

Exchange Protocol

The fundamental communication interface with flexible parameter passing:
from polyflux.core import BaseExchange
from polyflux.core.shared.contracts.protocol_data_unit import Inbound
from typing import Any

class CustomClient(BaseExchange[Inbound]):
    async def exchange(self, **kwargs: Any) -> Inbound:
        """Exchange using parameters for sending messages."""
        target = kwargs["target"]  # Required field from Outbound
        # Extract other protocol-specific fields from kwargs
        
        # Custom protocol implementation
        result = await self._custom_protocol_request(target, **kwargs)
        
        # Return inbound PDU
        return {
            "data": result.data,
            "metadata": result.metadata
        }
    
    async def send(self, **kwargs: Any) -> bool:
        """Send without expecting response."""
        try:
            await self.exchange(**kwargs)
            return True
        except Exception:
            return False
    
    async def receive(self, source: str) -> Inbound:
        """Receive from specified source."""
        result = await self._receive_from_source(source)
        return {"data": result}

Configuration Protocols

Simple configuration management:
from typing import TypedDict, NotRequired
from polyflux.core import BaseConfigSchema, SupportsConfiguration

class CustomConfigSchema(BaseConfigSchema):
    endpoint: str  # Required
    timeout: NotRequired[float]  # Optional (inherited from BaseConfigSchema)
    retries: NotRequired[int]    # Optional (inherited from BaseConfigSchema)
    api_key: NotRequired[str]    # Optional

class CustomClient(SupportsConfiguration[CustomConfigSchema]):
    def configure(self, config: CustomConfigSchema) -> Self:
        self.endpoint = config["endpoint"]
        self.timeout = config.get("timeout", 30.0)
        self.api_key = config.get("api_key")
        return self

# Configuration setup
config: CustomConfigSchema = {
    "endpoint": "https://api.example.com",
    "timeout": 60.0,
    "api_key": "secret-key"
}

client = CustomClient().configure(config)

Authentication Protocols

Credential management and authentication capabilities:
from polyflux.core import SupportsAuthentication
from polyflux.core.shared.contracts.auth import SupportsCredentials
from polyflux.core.shared.types import SerializeFormat
from typing import Any

class ApiKeyCredentials(SupportsCredentials):
    def __init__(self, api_key: str):
        self.api_key = api_key
    
    def is_valid(self) -> bool:
        return bool(self.api_key and len(self.api_key) > 0)
    
    def get_redacted_string(self) -> str:
        return f"ApiKey({self.api_key[:8]}...)"
    
    def serialize(self, format: SerializeFormat = "dict") -> Any:
        if format == "dict":
            return {"X-API-Key": self.api_key}
        return self.api_key

class AuthenticatedClient(SupportsAuthentication):
    def __init__(self):
        self._credentials = None
    
    async def authenticate(self, credentials: ApiKeyCredentials) -> bool:
        if credentials.is_valid():
            self._credentials = credentials
            return True
        return False
    
    async def is_authenticated(self) -> bool:
        return self._credentials is not None and self._credentials.is_valid()

Server Protocols

Core server capabilities for building protocol servers:

Network Binding

from polyflux.core.server import SupportsBinding
from typing_extensions import Self

class CustomServer(SupportsBinding[str]):
    async def bind(self, address: str) -> Self:
        """Bind to network address."""
        self._address = address
        self._bound = True
        return self
    
    async def unbind(self) -> None:
        """Unbind from network."""
        self._bound = False
    
    def is_bound(self) -> bool:
        """Check if bound to address."""
        return getattr(self, '_bound', False)

Request Dispatching

from polyflux.core.server import SupportsDispatching
from polyflux.core.shared.contracts.protocol_data_unit import Inbound, Outbound
from typing import Callable
from typing_extensions import Self

class CustomServer(SupportsDispatching[Inbound, Outbound]):
    def __init__(self):
        self._handlers = {}
    
    def register_handler(
        self, 
        pattern: str, 
        handler: Callable[[Inbound], Outbound]
    ) -> Self:
        """Register handler for pattern."""
        self._handlers[pattern] = handler
        return self
    
    async def dispatch(self, request: Inbound) -> Outbound:
        """Dispatch request to handler."""
        # Find matching handler based on request
        handler = self._find_handler(request)
        return await handler(request)

Server Lifecycle

from polyflux.core.server import SupportsServing

class CustomServer(SupportsServing):
    async def start(self) -> None:
        """Start the server."""
        self._running = True
        # Start server logic
    
    async def stop(self) -> None:
        """Stop the server."""
        self._running = False
        # Cleanup logic
    
    def is_running(self) -> bool:
        """Check if server is running."""
        return getattr(self, '_running', False)

Available Exports

The core package exports all major protocols and types:
# Main exports (available at top level)
from polyflux.core import (
    BaseClient,
    BaseExchange,
    BaseReliability,
    BaseConfiguration,
    BaseConfigSchema,
    SupportsSend,
    SupportsReceive,
    SupportsExchange,
    SupportsErrorHandling,
    SupportsAuthentication,
    SupportsConfiguration,
)

# Server protocols
from polyflux.core.server import (
    BaseServer,
    SupportsBinding,
    SupportsDispatching,
    SupportsServing,
    SupportsInterception,
)

# Authentication (additional)
from polyflux.core.shared.contracts.auth import SupportsCredentials

# PDU types and utilities
from polyflux.core.shared.contracts.protocol_data_unit import (
    Inbound, Outbound, is_inbound, is_outbound
)

# Type system
from polyflux.core.shared.types import (
    doc, example, constraint,
    Json, Host, Port, NetworkAddress, SerializeFormat
)
The core package provides the foundation for building robust, type-safe networked applications with any communication protocol.