Fundamental protocols and interfaces for Polyflux
The `polyflux-core` package provides building blocks and shared components for communication clients and servers.
uv add polyflux-core
from polyflux.core.shared.contracts.protocol_data_unit import Inbound, Outbound
from typing import NotRequired, Any
# The base Outbound PDU has exactly one mandatory field: "target".
outbound: Outbound = {"target": "https://api.example.com/users"}

# The base Inbound PDU starts out empty; each protocol extends it with fields.
inbound: Inbound = {}
# Message types can be customized per protocol
class HttpOutbound(Outbound):
    """Outbound PDU for an HTTP-style request.

    Adds a required ``method`` plus optional ``headers`` and ``body`` to the
    fields declared by the base ``Outbound`` PDU.
    """

    # Required: request method, e.g. "POST".
    method: str
    # Optional: header name -> header value.
    headers: NotRequired[dict[str, str]]
    # Optional: payload as a mapping, text, or raw bytes.
    body: NotRequired[dict[str, Any] | str | bytes]
class HttpInbound(Inbound):
    """Inbound PDU for an HTTP-style response.

    Adds a required ``status_code`` and ``data`` plus optional ``headers`` to
    the fields declared by the base ``Inbound`` PDU.
    """

    # Required: numeric status of the response.
    status_code: int
    # Required: raw response payload.
    data: bytes
    # Optional: header name -> header value.
    headers: NotRequired[dict[str, str]]
# Build a message container: a plain dict literal typed as HttpOutbound.
request: HttpOutbound = {
    "target": "https://api.example.com/users",
    "method": "POST",
    "headers": {"Content-Type": "application/json"},
    "body": {"name": "John", "email": "john@example.com"},
}
from polyflux.core import BaseExchange
from polyflux.core.shared.contracts.protocol_data_unit import Inbound
from typing import Any
class CustomClient(BaseExchange[Inbound]):
    """Example client implementing send / receive / exchange for a custom protocol.

    The transport helpers ``_custom_protocol_request`` and
    ``_receive_from_source`` are not defined in this example; a real
    implementation must provide them.
    """

    async def exchange(self, **kwargs: Any) -> Inbound:
        """Send a message and return the response as an inbound PDU.

        Args:
            **kwargs: Outbound PDU fields. ``target`` is required; all other
                entries are forwarded unchanged to the protocol request.

        Returns:
            A mapping holding the ``data`` and ``metadata`` of the result.

        Raises:
            KeyError: If ``target`` is missing from ``kwargs``.
        """
        # Pop (rather than just read) the target: leaving it in kwargs would
        # pass it twice below, positionally and again through **kwargs.
        target = kwargs.pop("target")  # Required field from Outbound

        # Custom protocol implementation; remaining kwargs carry the
        # protocol-specific fields.
        result = await self._custom_protocol_request(target, **kwargs)

        # Return inbound PDU
        return {
            "data": result.data,
            "metadata": result.metadata,
        }

    async def send(self, **kwargs: Any) -> bool:
        """Send without expecting a response.

        Returns:
            ``True`` if the underlying exchange completed, ``False`` if it
            raised an ``Exception``.
        """
        try:
            await self.exchange(**kwargs)
        except Exception:
            # Deliberate best-effort: failure is reported through the return
            # value instead of propagating to the caller.
            return False
        return True

    async def receive(self, source: str) -> Inbound:
        """Receive from the specified source and wrap the result as a PDU."""
        result = await self._receive_from_source(source)
        return {"data": result}
from typing import TypedDict, NotRequired
from typing_extensions import Self
from polyflux.core import BaseConfigSchema, SupportsConfiguration
class CustomConfigSchema(BaseConfigSchema):
    """Configuration schema for the example client.

    Only ``endpoint`` is mandatory; every other key may be omitted.
    """

    # Required
    endpoint: str
    # Optional (inherited from BaseConfigSchema)
    timeout: NotRequired[float]
    # Optional (inherited from BaseConfigSchema)
    retries: NotRequired[int]
    # Optional
    api_key: NotRequired[str]
class CustomClient(SupportsConfiguration[CustomConfigSchema]):
    """Example client that is set up from a ``CustomConfigSchema`` mapping."""

    def configure(self, config: CustomConfigSchema) -> Self:
        """Copy settings from ``config`` onto the client.

        Args:
            config: Mapping with a required ``endpoint`` and optional
                ``timeout``, ``retries`` and ``api_key`` entries.

        Returns:
            The client itself, so the call can be chained.

        Raises:
            KeyError: If ``endpoint`` is missing from ``config``.
        """
        self.endpoint = config["endpoint"]
        # Fall back to 30 seconds when no timeout is supplied.
        self.timeout = config.get("timeout", 30.0)
        # Keep the retries option the schema declares instead of dropping it;
        # None when not supplied (this example defines no default).
        self.retries = config.get("retries")
        # None when no API key is supplied.
        self.api_key = config.get("api_key")
        return self
# Configuration setup: "endpoint" is mandatory, the other keys are optional.
config: CustomConfigSchema = {
    "endpoint": "https://api.example.com",
    "timeout": 60.0,
    "api_key": "secret-key",
}

# configure() returns the client itself, so construction and setup chain.
client = CustomClient().configure(config)
from polyflux.core import SupportsAuthentication
from polyflux.core.shared.contracts.auth import SupportsCredentials
from polyflux.core.shared.types import SerializeFormat
from typing import Any
class ApiKeyCredentials(SupportsCredentials):
    """Credentials backed by a single API key string."""

    # Number of leading key characters shown by get_redacted_string().
    _VISIBLE_PREFIX = 8

    def __init__(self, api_key: str):
        """Store the API key.

        Args:
            api_key: The secret key value.
        """
        self.api_key = api_key

    def is_valid(self) -> bool:
        """Return ``True`` when a non-empty API key is present."""
        return bool(self.api_key)

    def get_redacted_string(self) -> str:
        """Return a log-safe description of the credentials.

        A short prefix of the key is shown only when the key is longer than
        that prefix. Shorter (or missing) keys are masked completely, since
        printing the prefix would reveal the entire secret.
        """
        key = self.api_key or ""
        if len(key) <= self._VISIBLE_PREFIX:
            return "ApiKey(...)"
        return f"ApiKey({key[:self._VISIBLE_PREFIX]}...)"

    def serialize(self, format: SerializeFormat = "dict") -> Any:
        """Serialize the credentials.

        Args:
            format: ``"dict"`` yields a mapping with the key stored under
                ``"X-API-Key"``; any other value yields the raw key.

        Returns:
            The header mapping or the raw API key, depending on ``format``.
        """
        if format == "dict":
            return {"X-API-Key": self.api_key}
        return self.api_key
class AuthenticatedClient(SupportsAuthentication):
    """Example client that remembers the last credentials it accepted."""

    def __init__(self):
        # No credentials are held until authenticate() succeeds.
        self._credentials = None

    async def authenticate(self, credentials: ApiKeyCredentials) -> bool:
        """Accept ``credentials`` if they are valid.

        Returns:
            ``True`` when the credentials were valid and stored, ``False``
            otherwise (any previously stored credentials are left untouched).
        """
        if not credentials.is_valid():
            return False
        self._credentials = credentials
        return True

    async def is_authenticated(self) -> bool:
        """Return whether valid credentials are currently held."""
        held = self._credentials
        if held is None:
            return False
        return held.is_valid()
from polyflux.core.server import SupportsBinding
from typing_extensions import Self
class CustomServer(SupportsBinding[str]):
    """Example server that tracks the address it is bound to."""

    async def bind(self, address: str) -> Self:
        """Bind to a network address and return the server for chaining."""
        self._bound = True
        self._address = address
        return self

    async def unbind(self) -> None:
        """Unbind from the network; the stored address is left as is."""
        self._bound = False

    def is_bound(self) -> bool:
        """Report whether the server is bound to an address."""
        # There is no __init__, so the flag does not exist before the first
        # bind()/unbind() call; treat that state as "not bound".
        try:
            return self._bound
        except AttributeError:
            return False
from polyflux.core.server import SupportsDispatching
from polyflux.core.shared.contracts.protocol_data_unit import Inbound, Outbound
from typing import Callable
from typing_extensions import Self
class CustomServer(SupportsDispatching[Inbound, Outbound]):
    """Example server that routes inbound requests to registered handlers.

    The ``_find_handler`` lookup used by ``dispatch`` is not defined in this
    example; a real implementation must provide it.
    """

    def __init__(self):
        # pattern -> handler registered for that pattern
        self._handlers = {}

    def register_handler(
        self,
        pattern: str,
        handler: Callable[[Inbound], Outbound]
    ) -> Self:
        """Register ``handler`` for ``pattern``.

        A handler registered earlier under the same pattern is replaced.

        Args:
            pattern: Key under which the handler is stored.
            handler: Callable invoked with the inbound request. It may be a
                plain function or a coroutine function; ``dispatch`` copes
                with both.

        Returns:
            The server itself, so registrations can be chained.
        """
        self._handlers[pattern] = handler
        return self

    async def dispatch(self, request: Inbound) -> Outbound:
        """Dispatch ``request`` to its handler and return the response.

        Args:
            request: The inbound PDU to route.

        Returns:
            The outbound PDU produced by the matching handler.
        """
        import inspect

        # Find matching handler based on request
        handler = self._find_handler(request)

        response = handler(request)
        # The declared handler type is synchronous, so awaiting the result
        # unconditionally would raise TypeError; await only when needed.
        if inspect.isawaitable(response):
            response = await response
        return response
from polyflux.core.server import SupportsServing
class CustomServer(SupportsServing):
    """Example server exposing a start / stop lifecycle."""

    async def start(self) -> None:
        """Start the server."""
        # Start server logic goes here; this example only records the state.
        self._running = True

    async def stop(self) -> None:
        """Stop the server."""
        # Cleanup logic goes here; this example only records the state.
        self._running = False

    def is_running(self) -> bool:
        """Report whether the server is running."""
        # The flag does not exist before the first start()/stop() call, so
        # fall back to False in that case.
        return getattr(self, "_running", False)
# Main exports (available at top level)
from polyflux.core import (
BaseClient,
BaseExchange,
BaseReliability,
BaseConfiguration,
BaseConfigSchema,
SupportsSend,
SupportsReceive,
SupportsExchange,
SupportsErrorHandling,
SupportsAuthentication,
SupportsConfiguration,
)
# Server protocols
from polyflux.core.server import (
BaseServer,
SupportsBinding,
SupportsDispatching,
SupportsServing,
SupportsInterception,
)
# Authentication (additional)
from polyflux.core.shared.contracts.auth import SupportsCredentials
# PDU types and utilities
from polyflux.core.shared.contracts.protocol_data_unit import (
Inbound, Outbound, is_inbound, is_outbound
)
# Type system
from polyflux.core.shared.types import (
doc, example, constraint,
Json, Host, Port, NetworkAddress, SerializeFormat
)
Was this page helpful?