Core protocols and shared components API reference
from typing import Any, TypeVar
from polyflux.core.shared.contracts.exchange import (
SupportsSend, SupportsReceive, SupportsExchange, BaseExchange
)
from polyflux.core.shared.contracts.protocol_data_unit import Inbound, Outbound
# Module-level type variables.
# NOTE(review): the protocol classes that follow declare their own
# class-scoped type parameters with these same names via PEP 695 syntax
# (e.g. ``class SupportsReceive[I_co]``), which shadow these objects inside
# those classes — confirm whether these module-level definitions are still
# needed.
T_kwargs = TypeVar("T_kwargs")
I_co = TypeVar("I_co", bound=Inbound, covariant=True)
class SupportsSend[**T_kwargs](Protocol):
"""Protocol for one-way data transmission."""
async def send(self, **kwargs: T_kwargs) -> bool:
"""Send data using keyword arguments."""
...
class SupportsReceive[I_co](Protocol):
"""Protocol for active data retrieval."""
async def receive(self, source: str) -> I_co:
"""Receive data from specified source."""
...
class SupportsExchange[**T_kwargs, I_co](Protocol):
"""Protocol for atomic send-receive operations."""
async def exchange(self, **kwargs: T_kwargs) -> I_co:
"""Send data and receive response atomically."""
...
class BaseExchange[I_co](
SupportsSend[Any],
SupportsReceive[I_co],
SupportsExchange[Any, I_co],
Protocol
):
"""Complete communication interface."""
pass
from polyflux.core.shared.contracts.exchange import BaseExchange
from polyflux.core.shared.contracts.protocol_data_unit import Inbound
class HttpClient(BaseExchange[Inbound]):
    """Example ``BaseExchange`` implementation with placeholder HTTP logic.

    The methods return fixed dictionaries; real transport code is expected
    to replace the marked placeholder sections.
    """

    async def send(self, **kwargs: Any) -> bool:
        """Run ``exchange`` with the given arguments and discard the response.

        Args:
            **kwargs: Forwarded unchanged to ``exchange``.

        Returns:
            ``True`` when ``exchange`` completed, ``False`` when it raised
            any ``Exception`` (including the ``KeyError`` for a missing
            ``target``).
        """
        try:
            await self.exchange(**kwargs)
        except Exception:
            return False
        else:
            return True

    async def receive(self, source: str) -> Inbound:
        """Receive from an HTTP source (polling, SSE, etc.).

        Args:
            source: Identifier of the source; unused by this placeholder.

        Returns:
            A fixed placeholder payload.
        """
        # Implementation for receiving from HTTP source
        payload = {"data": b"received_data"}
        return payload

    async def exchange(self, **kwargs: Any) -> Inbound:
        """Send an HTTP request and return its response.

        Args:
            **kwargs: Request description. ``target`` is mandatory;
                ``method`` (default ``"GET"``), ``headers`` (default ``{}``)
                and ``body`` (default ``None``) are optional.

        Returns:
            A fixed placeholder response dictionary.

        Raises:
            KeyError: If ``target`` is not supplied.
        """
        url = kwargs["target"]  # Required field
        verb = kwargs.get("method", "GET")
        extra_headers = kwargs.get("headers", {})
        payload = kwargs.get("body")
        # HTTP implementation here
        response = {
            "status_code": 200,
            "data": b"response_body",
            "headers": {"content-type": "application/json"},
        }
        return response
from typing import TypedDict, NotRequired, Protocol
from polyflux.core.shared.contracts.configurable import (
BaseConfigSchema, SupportsConfiguration, BaseConfiguration
)
class BaseConfigSchema(TypedDict):
"""Basic configuration options."""
timeout: NotRequired[float]
retries: NotRequired[int]
logging_level: NotRequired[str]
# Type variable restricted to ``BaseConfigSchema`` and its subtypes.
# NOTE(review): the configuration protocols that follow declare their own
# class-scoped ``ConfigSchemaT`` via PEP 695 syntax, which shadows this
# object inside those classes — confirm it is still needed.
ConfigSchemaT = TypeVar("ConfigSchemaT", bound=BaseConfigSchema)
class SupportsConfiguration[ConfigSchemaT: BaseConfigSchema](Protocol):
"""Configuration interface."""
def configure(self, config: ConfigSchemaT) -> Self:
"""Configure the component with provided settings."""
...
class BaseConfiguration[ConfigSchemaT: BaseConfigSchema](
SupportsConfiguration[ConfigSchemaT],
Protocol
):
"""Complete configuration interface."""
pass
from typing import TypedDict, NotRequired
from polyflux.core.shared.contracts.configurable import BaseConfigSchema, SupportsConfiguration
class HttpConfigSchema(BaseConfigSchema):
    """Configuration schema for the example HTTP client.

    Extends ``BaseConfigSchema`` with one mandatory key and two optional
    ones.
    """

    # Mandatory.
    base_url: str
    # Optional.
    headers: NotRequired[dict[str, str]]
    # Optional.
    follow_redirects: NotRequired[bool]
class HttpClient(SupportsConfiguration[HttpConfigSchema]):
    """Example component configured from an ``HttpConfigSchema`` mapping."""

    def configure(self, config: HttpConfigSchema) -> Self:
        """Copy the recognised settings onto the instance.

        Args:
            config: Settings mapping. ``base_url`` must be present;
                ``timeout`` falls back to ``30.0`` and ``headers`` to an
                empty dict when absent.

        Returns:
            This instance, so calls can be chained.

        Raises:
            KeyError: If ``base_url`` is missing from ``config``.
        """
        # Resolve every value first; the mandatory lookup stays first so a
        # missing ``base_url`` fails before any attribute is touched.
        base_url = config["base_url"]
        timeout = config.get("timeout", 30.0)
        headers = config.get("headers", {})

        self.base_url = base_url
        self.timeout = timeout
        self.headers = headers
        return self
from typing import Protocol, Any
from polyflux.core.shared.contracts.auth import SupportsCredentials, SupportsAuthentication
from polyflux.core.shared.types import SerializeFormat
class SupportsCredentials(Protocol):
    """Structural contract for credential objects.

    A credential can report its own validity, describe itself safely for
    logs, and convert itself into a form usable by a client.
    """

    def is_valid(self) -> bool:
        """Report whether these credentials are valid."""
        ...

    def get_redacted_string(self) -> str:
        """Return a representation that is safe to write to logs."""
        ...

    def serialize(self, format: SerializeFormat = "dict") -> Any:
        """Convert the credentials into a usable form.

        Args:
            format: Requested output format; defaults to ``"dict"``.

        Returns:
            The credentials in the requested format; the concrete type
            depends on ``format`` and on the implementation.
        """
        ...
class SupportsAuthentication(Protocol):
    """Structural contract for components that can authenticate."""

    async def authenticate(self, credentials: Any) -> bool:
        """Attempt to authenticate using the supplied credentials.

        Args:
            credentials: Credential object; its type is left to the
                implementation.

        Returns:
            A boolean outcome flag (presumably ``True`` on success).
        """
        ...

    async def is_authenticated(self) -> bool:
        """Report the current authentication status."""
        ...
from polyflux.core.shared.contracts.auth import SupportsCredentials, SupportsAuthentication
class BearerToken(SupportsCredentials):
    """Bearer-token credential implementing ``SupportsCredentials``."""

    # Number of leading token characters shown by ``get_redacted_string``.
    _VISIBLE_PREFIX = 8

    def __init__(self, token: str):
        """Store the raw token.

        Args:
            token: The bearer token value.
        """
        self.token = token

    def is_valid(self) -> bool:
        """Return ``True`` when a non-empty token is held."""
        return bool(self.token)

    def get_redacted_string(self) -> str:
        """Return a log-safe description of the token.

        The first ``_VISIBLE_PREFIX`` characters are shown only when the
        token is at least twice that long; shorter (or missing) tokens are
        masked completely, because showing a fixed-size prefix of a short
        token would disclose most or all of the secret.

        Returns:
            ``"Bearer <prefix>..."`` for sufficiently long tokens, otherwise
            ``"Bearer ***"``.
        """
        token = self.token or ""
        if len(token) < 2 * self._VISIBLE_PREFIX:
            return "Bearer ***"
        return f"Bearer {token[:self._VISIBLE_PREFIX]}..."

    def serialize(self, format: SerializeFormat = "dict") -> Any:
        """Convert the token into the requested representation.

        Args:
            format: ``"dict"`` yields an ``Authorization`` header mapping,
                ``"bytes"`` yields the UTF-8 encoded token, and any other
                value yields the raw token string.

        Returns:
            A ``dict``, ``bytes`` or ``str`` depending on ``format``.
        """
        if format == "dict":
            return {"Authorization": f"Bearer {self.token}"}
        if format == "bytes":
            # A "bytes" request must not silently hand back ``str``.
            return self.token.encode("utf-8")
        return self.token
class HttpClient(SupportsAuthentication):
    """Example component that authenticates with a ``BearerToken``."""

    def __init__(self):
        """Start without any stored credentials."""
        self._credentials = None

    async def authenticate(self, credentials: BearerToken) -> bool:
        """Store the credentials if they report themselves as valid.

        Args:
            credentials: Token to authenticate with.

        Returns:
            ``True`` when the credentials were accepted and stored,
            ``False`` otherwise (previously stored credentials are left
            untouched in that case).
        """
        if not credentials.is_valid():
            return False
        self._credentials = credentials
        return True

    async def is_authenticated(self) -> bool:
        """Return whether valid credentials are currently stored."""
        held = self._credentials
        if held is None:
            return False
        return held.is_valid()
from typing import Protocol, Callable, Any
from polyflux.core.shared.contracts.reliability import SupportsErrorHandling, BaseReliability
class SupportsErrorHandling(Protocol):
"""Protocol for error handler registration and processing."""
def on_error(self, handler: Callable[[Exception], Any]) -> Self:
"""Register an error handler."""
...
async def handle_error(self, error: Exception) -> Any:
"""Process an error through registered handlers."""
...
class BaseReliability(SupportsErrorHandling, Protocol):
    """Umbrella protocol for operational robustness.

    Adds no members of its own; it currently re-exposes
    ``SupportsErrorHandling`` only.
    """
from typing import Protocol, TypeVar, Any
from polyflux.core.server.binding import SupportsBinding
from polyflux.core.server.dispatch import SupportsDispatching
from polyflux.core.server.serving import SupportsServing
from polyflux.core.server.interception.base import SupportsInterception
# Module-level type variables for the server protocols.
# NOTE(review): the protocol classes that follow declare their own
# class-scoped parameters with these names via PEP 695 syntax, which shadow
# these objects inside those classes — confirm they are still needed.
AddressT = TypeVar("AddressT")
I_contra = TypeVar("I_contra", contravariant=True)
O_co = TypeVar("O_co", covariant=True)
class SupportsBinding[AddressT](Protocol):
"""Protocol for network interface binding."""
async def bind(self, address: AddressT) -> Self:
"""Bind to network interface."""
...
async def unbind(self) -> None:
"""Unbind from network interface."""
...
def is_bound(self) -> bool:
"""Check if currently bound."""
...
class SupportsDispatching[I_contra, O_co](Protocol):
"""Protocol for request routing and handler dispatch."""
def register_handler(self, pattern: str, handler: Callable[[I_contra], O_co]) -> Self:
"""Register a handler for a pattern."""
...
async def dispatch(self, request: I_contra) -> O_co:
"""Dispatch request to appropriate handler."""
...
class SupportsServing(Protocol):
    """Structural contract for managing a server's lifecycle."""

    async def start(self) -> None:
        """Bring the server up."""
        ...

    async def stop(self) -> None:
        """Shut the server down."""
        ...

    def is_running(self) -> bool:
        """Report whether the server is currently running."""
        ...
class SupportsInterception[I_contra: Inbound, O_co: Outbound](Protocol):
"""Protocol for servers that support data flow interception.
This protocol defines the capability to manage and execute chains of
interceptors for processing both inbound and outbound data flow. It
enables middleware-like functionality in a protocol-agnostic way.
"""
def add_interceptor(self, interceptor: Interceptor[I_contra, O_co]) -> Self:
"""Add an interceptor to the processing chain.
Args:
interceptor: The interceptor instance to add to the processing chain.
Returns:
Self for method chaining.
"""
...
async def intercept_inbound(self, inbound: I_contra) -> I_contra:
"""Process inbound data through all registered interceptors.
This method executes the complete inbound interceptor chain,
passing data through each registered interceptor in sequence.
Args:
inbound: The inbound data PDU to process through the interceptor chain.
Returns:
The processed inbound data PDU after passing through all interceptors.
"""
...
async def intercept_outbound(self, outbound: O_co) -> O_co:
"""Process outbound data through all registered interceptors.
This method executes the complete outbound interceptor chain,
passing data through each registered interceptor in sequence.
Args:
outbound: The outbound data PDU to process through the interceptor chain.
Returns:
The processed outbound data PDU after passing through all interceptors.
"""
...
from polyflux.core.shared.types import Json, AddressType, SerializeFormat
# Usage example for the shared type aliases.
# JSON-compatible values: nested dicts, lists, strings and numbers.
data: Json = {"key": "value", "number": 42, "list": [1, 2, 3]}
# Protocol-agnostic addressing: here the address is a URL string.
address: AddressType = "https://api.example.com"
# Credential serialization format, one of "dict" | "string" | "bytes".
# NOTE(review): this module-level name shadows the builtin ``format``.
format: SerializeFormat = "dict" # "dict" | "string" | "bytes"
from polyflux.core.server.exceptions import DispatchError, PatternCompilationError
try:
# Server dispatch operation
result = await server.dispatch(request)
except DispatchError as e:
print(f"Dispatch failed: {e}")
except PatternCompilationError as e:
print(f"Invalid pattern: {e}")
from polyflux.core.shared.contracts.protocol_data_unit import is_inbound, is_outbound
# Usage example: classify plain dictionaries with the PDU check helpers.

# A response-shaped mapping, tested with ``is_inbound``.
data = {"status_code": 200, "data": b"response"}
if is_inbound(data):
    print("This is an inbound PDU")

# A request-shaped mapping, tested with ``is_outbound``.
request = {"target": "https://api.example.com", "method": "GET"}
if is_outbound(request):
    print("This is an outbound PDU")
from polyflux.core.shared.types import doc, example, constraint
# Create type metadata with the ``constraint``, ``doc`` and ``example``
# helpers.
# NOTE(review): the keyword names below (``gte``, ``lte``, ``regex``) are
# presumably inclusive bounds and a full-match pattern — confirm against the
# helpers' own documentation.
# Age limited to the range 0..150.
user_age = constraint("age", gte=0, lte=150)
# API key restricted to exactly 32 ASCII alphanumeric characters.
api_key = constraint("api_key", regex=r"^[a-zA-Z0-9]{32}$")
# Free-text documentation metadata.
description = doc("User's description field")
# Example value together with a short description of it.
sample_value = example("John Doe", "Typical user name")
Was this page helpful?