Welcome

This section documents the API of every Polyflux package: core protocols, client implementations, and server interfaces. The APIs are type-annotated, so IDEs can offer autocomplete and validation.

Core Concepts

Protocol Data Units (PDUs)

All communication in Polyflux uses typed message containers called PDUs. An Outbound PDU describes what to send, and an Inbound PDU carries what comes back:
from polyflux.core.shared.contracts.protocol_data_unit import Inbound, Outbound

# Create an outbound PDU with required target field
outbound: Outbound = {
    "target": "https://api.example.com/users"
}

# Message types can be customized per protocol
from typing import Any, NotRequired

class HttpOutbound(Outbound):
    method: str
    headers: NotRequired[dict[str, str]]
    body: NotRequired[dict[str, Any] | str | bytes]

# Create HTTP-specific outbound PDU
request: HttpOutbound = {
    "target": "https://api.example.com/users",
    "method": "POST",
    "headers": {"Content-Type": "application/json"},
    "body": {"name": "John", "email": "john@example.com"}
}

# Exchange returns an inbound PDU (client is an already-configured HTTP client;
# the await must run inside a coroutine)
response: Inbound = await client.exchange(**request)

Exchange Protocol

The exchange protocol is the fundamental communication interface. Implementations accept keyword arguments that together form an Outbound PDU and return an Inbound PDU:
from typing import Any

from polyflux.core.shared.contracts.exchange import BaseExchange
from polyflux.core.shared.contracts.protocol_data_unit import Inbound

class HttpClient(BaseExchange[Inbound]):
    async def exchange(self, **kwargs: Any) -> Inbound:
        """Exchange using kwargs that form an Outbound PDU."""
        target = kwargs["target"]  # Required field from Outbound
        method = kwargs.get("method", "GET")
        headers = kwargs.get("headers", {})
        body = kwargs.get("body")
        
        # HTTP implementation here
        response_data = await self._http_request(target, method, headers, body)
        
        # Return inbound PDU
        return {
            "status_code": response_data.status,
            "data": response_data.content,
            "headers": dict(response_data.headers)
        }
    
    async def send(self, **kwargs: Any) -> bool:
        """Send without expecting response."""
        try:
            await self.exchange(**kwargs)
            return True
        except Exception:
            return False
    
    async def receive(self, source: str) -> Inbound:
        """Receive from specified source."""
        # Implementation for polling, SSE, WebSocket, etc.
        return await self._receive_from_source(source)
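
To make the exchange pattern concrete without a network, the following sketch implements it in memory. `BaseExchange` is a local stand-in whose real definition in Polyflux may differ, and `EchoExchange` and the `memory://echo` target are hypothetical names used only for illustration.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Generic, TypeVar

InboundT = TypeVar("InboundT")


class BaseExchange(ABC, Generic[InboundT]):
    """Local stand-in for the Polyflux base class (the real one may differ)."""

    @abstractmethod
    async def exchange(self, **kwargs: Any) -> InboundT: ...


class EchoExchange(BaseExchange[dict[str, Any]]):
    """Hypothetical in-memory exchange that echoes the outbound body back."""

    async def exchange(self, **kwargs: Any) -> dict[str, Any]:
        if "target" not in kwargs:
            raise ValueError("outbound PDU requires a 'target'")
        return {"status_code": 200, "data": kwargs.get("body"), "headers": {}}

    async def send(self, **kwargs: Any) -> bool:
        """Fire-and-forget: report success instead of returning the response."""
        try:
            await self.exchange(**kwargs)
            return True
        except ValueError:
            return False


async def demo() -> tuple[dict[str, Any], bool, bool]:
    client = EchoExchange()
    reply = await client.exchange(target="memory://echo", body="ping")
    sent = await client.send(target="memory://echo")
    rejected = await client.send()  # No target, so send() reports failure
    return reply, sent, rejected


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

This sketch catches only `ValueError` in `send`, so unexpected errors still surface. Whether to catch more broadly, as the example above does, is a design choice for each implementation.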

Easy Configuration

Components are configured with typed schema dictionaries, which lets a type checker distinguish required keys from optional ones:
from typing import NotRequired, Self
from polyflux.core.shared.contracts.configurable import BaseConfigSchema, SupportsConfiguration

class HttpConfigSchema(BaseConfigSchema):
    base_url: str  # Required
    timeout: NotRequired[float]  # Optional (inherited)
    headers: NotRequired[dict[str, str]]  # Optional
    follow_redirects: NotRequired[bool]  # Optional

class HttpClient(SupportsConfiguration[HttpConfigSchema]):
    def configure(self, config: HttpConfigSchema) -> Self:
        self.base_url = config["base_url"]
        self.timeout = config.get("timeout", 30.0)
        self.headers = config.get("headers", {})
        return self

# Type-safe configuration
config: HttpConfigSchema = {
    "base_url": "https://api.example.com",
    "timeout": 60.0,
    "headers": {"User-Agent": "MyApp/1.0"}
}

client = HttpClient().configure(config)

Getting Started

Here are practical examples to get you started with Polyflux:
from polyflux.http import HttpClient
import asyncio

async def main():
    # Simple HTTP client usage
    async with HttpClient(base_url="https://api.example.com") as client:
        # GET request
        user = await client.exchange(
            target="/users/123",
            method="GET"
        )
        
        # POST request
        new_user = await client.exchange(
            target="/users",
            method="POST",
            body={"name": "John", "email": "john@example.com"}
        )

asyncio.run(main())

WebSocket Example

Real-time communication over WebSocket uses the same send and receive calls:
from polyflux.websocket import WebSocketClient
import asyncio

async def main():
    # WebSocket client
    async with WebSocketClient(uri="ws://localhost:8765") as client:
        # Send message
        await client.send(
            target="server",
            body={"type": "chat", "message": "Hello!"}
        )
        
        # Receive message
        response = await client.receive("server")
        print(f"Received: {response}")

asyncio.run(main())

Available Packages

Core Package (polyflux.core)

Status: Production Ready

Core building blocks:
  • Exchange protocols (send, receive, exchange)
  • Simple configuration
  • Authentication protocols
  • Reliability and error handling
  • Server protocols (binding, dispatching, serving)
  • Enhanced type system with validation
  • Message containers

HTTP Package (polyflux.http)

Status: Production Ready

Complete HTTP client and server implementation:
  • Full async HTTP client with context managers
  • ASGI server integration with Hypercorn
  • Easy configuration setup
  • Automatic error recovery
  • Request/response interception
  • Authentication support
  • Covered by a suite of 115 tests
from polyflux.http import HttpClient, HttpServer

# HTTP Client (async with must run inside a coroutine)
async def create_user_request() -> None:
    async with HttpClient(base_url="https://api.example.com") as client:
        response = await client.exchange(
            target="/users",
            method="POST",
            content_type="application/json",
            body={"name": "John", "email": "john@example.com"}
        )

# HTTP Server
server = HttpServer(host="0.0.0.0", port=8080)

@server.register_handler("/api/users", methods=["POST"])
async def create_user(request: HttpInbound) -> HttpOutbound:
    return {
        "target": "client",
        "method": "POST",
        "headers": {"X-HTTP-Status": "201"},
        "body": "User created"
    }
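
The decorator-based registration shown above follows a common routing pattern. The sketch below shows one way such a registry could work. `HandlerRegistry`, its `dispatch` method, and the plain-dict request and response shapes are hypothetical stand-ins, not the `HttpServer` implementation.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

Handler = Callable[[dict[str, Any]], Awaitable[dict[str, Any]]]


class HandlerRegistry:
    """Hypothetical stand-in for the routing part of a server."""

    def __init__(self) -> None:
        self._routes: dict[tuple[str, str], Handler] = {}

    def register_handler(self, path: str, methods: list[str]) -> Callable[[Handler], Handler]:
        def decorator(handler: Handler) -> Handler:
            for method in methods:
                self._routes[(method.upper(), path)] = handler
            return handler  # Return the function unchanged so it stays callable
        return decorator

    async def dispatch(self, request: dict[str, Any]) -> dict[str, Any]:
        """Look up the handler by (method, path) and await it."""
        handler = self._routes.get((request["method"].upper(), request["target"]))
        if handler is None:
            return {"status_code": 404, "body": "Not found"}
        return await handler(request)


registry = HandlerRegistry()


@registry.register_handler("/api/users", methods=["POST"])
async def create_user(request: dict[str, Any]) -> dict[str, Any]:
    return {"status_code": 201, "body": f"Created {request['body']['name']}"}


if __name__ == "__main__":
    request = {"target": "/api/users", "method": "POST", "body": {"name": "John"}}
    print(asyncio.run(registry.dispatch(request)))
```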

WebSocket Package (polyflux.websocket)

Status: Production Ready

Complete WebSocket client and server implementation:
  • Full async WebSocket client with connection pooling
  • WebSocket server with broadcasting and real-time capabilities
  • Easy configuration setup
  • Performance features including compression and monitoring
  • Error handling and reliability features
  • Interception and middleware support
  • Covered by a suite of 280 tests
from polyflux.websocket import WebSocketClient, WebSocketServer

# WebSocket Client (async with must run inside a coroutine)
async def echo_once() -> None:
    async with WebSocketClient() as client:
        await client.connect("wss://echo.websocket.org")
        response = await client.exchange(
            target="wss://echo.websocket.org",
            message_type="text",
            data="Hello WebSocket!"
        )

# WebSocket Server with Broadcasting
# (create_asgi_app and create_hypercorn_config are helper functions not shown here)
server = WebSocketServer(
    asgi_app=create_asgi_app(),
    config=create_hypercorn_config({"host": "0.0.0.0", "port": 8080}),
    broadcasting_enabled=True,
    default_channels=["general", "notifications"]
)

@server.register_handler("ws_text")
async def handle_message(message: WebSocketInbound) -> WebSocketOutbound:
    return {
        "target": "broadcast:all",
        "message_type": "text",
        "data": f"Echo: {message['data']}"
    }
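
Channel broadcasting, as in the server example above, can be pictured as fanning one message out to every subscriber of a channel. The sketch below models that with `asyncio.Queue` objects. `ChannelHub` and its methods are hypothetical, and a real server would hold WebSocket connections rather than queues.

```python
import asyncio


class ChannelHub:
    """Hypothetical in-memory hub that fans messages out to subscribers."""

    def __init__(self, channels: list[str]) -> None:
        self._subscribers: dict[str, list[asyncio.Queue[str]]] = {
            name: [] for name in channels
        }

    def subscribe(self, channel: str) -> asyncio.Queue[str]:
        """Register a new subscriber and return the queue it should read from."""
        queue: asyncio.Queue[str] = asyncio.Queue()
        self._subscribers[channel].append(queue)
        return queue

    async def broadcast(self, channel: str, data: str) -> int:
        """Deliver data to every subscriber of the channel; return the count."""
        for queue in self._subscribers[channel]:
            await queue.put(data)
        return len(self._subscribers[channel])


async def demo() -> tuple[int, str, str]:
    hub = ChannelHub(["general", "notifications"])
    alice = hub.subscribe("general")
    bob = hub.subscribe("general")
    delivered = await hub.broadcast("general", "Echo: hello")
    return delivered, await alice.get(), await bob.get()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```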

Future Packages

In Development:
  • polyflux.grpc: gRPC client and server implementation
  • polyflux.graphql: GraphQL client and server implementation
  • polyflux.mqtt: MQTT client and broker implementation for IoT
  • polyflux.kafka: Apache Kafka client for distributed streaming
  • polyflux.database: Database client with unified query interface

Getting Started

  1. Install packages: uv add polyflux-core polyflux-http polyflux-websocket
  2. Import protocols: Choose the protocols that match your needs
  3. Build clients: Create implementations using message containers
  4. Get IDE support: Full autocomplete and validation
  5. Build applications: Create robust, type-safe networked applications
from polyflux.http import HttpClient
import asyncio

async def main():
    async with HttpClient(base_url="https://jsonplaceholder.typicode.com") as client:
        # Simple request with parameters
        response = await client.exchange(
            target="/posts/1",
            method="GET",
            headers={"Accept": "application/json"}
        )
        
        print(f"Status: {response['status_code']}")
        print(f"Data: {response['data'].decode()}")

asyncio.run(main())
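
Because the response in the example above exposes `status_code` and raw `data` bytes, a small helper can check the status and decode JSON in one step. This is a sketch only: `parse_json_body` is a hypothetical helper, and `sample` is a hand-written stand-in for what `client.exchange()` might return.

```python
import json
from typing import Any


def parse_json_body(response: dict[str, Any]) -> Any:
    """Hypothetical helper: decode the 'data' bytes of an inbound PDU as JSON."""
    status = response["status_code"]
    if not 200 <= status < 300:
        raise RuntimeError(f"request failed with status {status}")
    return json.loads(response["data"])  # json.loads accepts UTF-8 bytes directly


# Hand-written stand-in for an inbound PDU
sample = {
    "status_code": 200,
    "data": b'{"id": 1, "title": "hello"}',
    "headers": {"content-type": "application/json"},
}

post = parse_json_body(sample)
print(post["title"])  # prints: hello
```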