Documentation Index: fetch the complete documentation index at https://docs.polyflux.ai/llms.txt
Use this file to discover all available pages before exploring further.
Welcome
This section provides comprehensive API documentation for all Polyflux packages. The documentation covers core protocols, client implementations, and server interfaces with full IDE support.
Core Protocols: Fundamental protocols and interfaces for all implementations
Protocol Interfaces: Complete protocol definitions and type interfaces
Protocol Data Units: Message containers for easy data handling
Core Concepts
Protocol Data Units (PDUs)
All communication in Polyflux is carried in structured message containers:
from polyflux.core.shared.contracts.protocol_data_unit import Inbound, Outbound
# Minimal outbound PDU: "target" is the one required field
outbound: Outbound = {"target": "https://api.example.com/users"}
# Message types can be customized per protocol
from typing import Any, NotRequired


class HttpOutbound(Outbound):
    """Outbound PDU extended with HTTP-specific fields."""

    method: str  # required, e.g. "POST"
    headers: NotRequired[dict[str, str]]  # optional
    # ``Any`` must be imported: class-body annotations are evaluated eagerly.
    body: NotRequired[dict[str, Any] | str | bytes]  # optional
# Create HTTP-specific outbound PDU
request: HttpOutbound = {
"target" : "https://api.example.com/users" ,
"method" : "POST" ,
"headers" : { "Content-Type" : "application/json" },
"body" : { "name" : "John" , "email" : "john@example.com" }
}
# Exchange returns inbound PDU
response: HttpInbound = await client.exchange( ** request)
Exchange Protocol
The fundamental communication interface with flexible parameter passing:
import logging
from typing import Any

from polyflux.core.shared.contracts.exchange import BaseExchange
from polyflux.core.shared.contracts.protocol_data_unit import Inbound

logger = logging.getLogger(__name__)


class HttpClient(BaseExchange[Inbound]):
    """Example HTTP client implementing the exchange protocol.

    ``_http_request`` and ``_receive_from_source`` are transport helpers that
    this snippet does not show.
    """

    async def exchange(self, **kwargs: Any) -> Inbound:
        """Exchange using kwargs that form an Outbound PDU.

        Raises:
            KeyError: If the required ``target`` field is missing.
        """
        target = kwargs["target"]  # Required field from Outbound
        method = kwargs.get("method", "GET")
        headers = kwargs.get("headers", {})
        body = kwargs.get("body")

        # HTTP implementation here
        response_data = await self._http_request(target, method, headers, body)

        # Return inbound PDU
        return {
            "status_code": response_data.status,
            "data": response_data.content,
            "headers": dict(response_data.headers),
        }

    async def send(self, **kwargs: Any) -> bool:
        """Send without expecting response.

        Returns:
            True if the underlying exchange completed, False if it raised.
        """
        try:
            await self.exchange(**kwargs)
        except Exception:
            # Best-effort by design: failure is reported through the return
            # value, but the cause is logged so it is not silently lost.
            logger.exception("send to %r failed", kwargs.get("target"))
            return False
        return True

    async def receive(self, source: str) -> Inbound:
        """Receive from specified source."""
        # Implementation for polling, SSE, WebSocket, etc.
        return await self._receive_from_source(source)
Easy Configuration
All components use simple setup with validation:
from typing import NotRequired, Self, TypedDict
from polyflux.core.shared.contracts.configurable import BaseConfigSchema, SupportsConfiguration
class HttpConfigSchema(BaseConfigSchema):
    """Configuration keys accepted by the example HTTP client."""

    # Mandatory key
    base_url: str
    # Optional keys
    timeout: NotRequired[float]  # inherited from the base schema
    headers: NotRequired[dict[str, str]]
    follow_redirects: NotRequired[bool]
class HttpClient(SupportsConfiguration[HttpConfigSchema]):
    """Example client configured from an ``HttpConfigSchema`` mapping."""

    def configure(self, config: HttpConfigSchema) -> Self:
        """Apply *config* to this client and return ``self`` for chaining.

        Raises:
            KeyError: If the required ``base_url`` key is missing.
        """
        self.base_url = config["base_url"]
        self.timeout = config.get("timeout", 30.0)
        # Copy so that later changes to the client's headers never leak back
        # into the caller's config mapping, and vice versa.
        self.headers = dict(config.get("headers", {}))
        # Honour every key the schema declares.
        # NOTE(review): False is a placeholder default - confirm the intended one.
        self.follow_redirects = config.get("follow_redirects", False)
        return self
# Configuration mapping, checked against HttpConfigSchema by the type checker
config: HttpConfigSchema = {
    "base_url": "https://api.example.com",
    "timeout": 60.0,
    "headers": {"User-Agent": "MyApp/1.0"},
}

# configure() hands back the client, so creation and setup chain together
client = HttpClient().configure(config)
Getting Started
Here are practical examples to get you started with Polyflux:
from polyflux.http import HttpClient
import asyncio
async def main():
    """Issue one GET and one POST through the HTTP client."""
    api_root = "https://api.example.com"
    async with HttpClient(base_url=api_root) as client:
        # Fetch an existing user
        user = await client.exchange(target="/users/123", method="GET")

        # Create a new user
        payload = {"name": "John", "email": "john@example.com"}
        new_user = await client.exchange(
            target="/users",
            method="POST",
            body=payload,
        )


asyncio.run(main())
WebSocket Example
Real-time communication with WebSocket:
from polyflux.websocket import WebSocketClient
import asyncio
async def main():
    """Send one chat message over a WebSocket, then print what is received."""
    # WebSocket client
    async with WebSocketClient(uri="ws://localhost:8765") as client:
        # Send message
        await client.send(
            target="server",
            body={"type": "chat", "message": "Hello!"},
        )

        # Receive message
        response = await client.receive("server")
        # The f prefix must touch the opening quote to form an f-string.
        print(f"Received: {response}")


asyncio.run(main())
Available Packages
Core Package (polyflux.core)
Status: Production Ready
Core building blocks:
Exchange protocols (send, receive, exchange)
Simple configuration
Authentication protocols
Reliability and error handling
Server protocols (binding, dispatching, serving)
Enhanced type system with validation
Message containers
HTTP Package (polyflux.http)
Status: Production Ready
Complete HTTP client and server implementation:
Full async HTTP client with context managers
ASGI server integration with Hypercorn
Easy configuration setup
Automatic error recovery
Request/response interception
Authentication support
Reliable (115 tests)
from polyflux.http import HttpClient, HttpServer
# HTTP Client
async with HttpClient( base_url = "https://api.example.com" ) as client:
response = await client.exchange(
target = "/users" ,
method = "POST" ,
content_type = "application/json" ,
body = { "name" : "John" , "email" : "john@example.com" }
)
# HTTP server configured for 0.0.0.0:8080
server = HttpServer(
    host="0.0.0.0",
    port=8080,
)
@server.register_handler("/api/users", methods=["POST"])
async def create_user(request: HttpInbound) -> HttpOutbound:
    """Answer POST /api/users with a fixed outbound PDU."""
    reply: HttpOutbound = {
        "target": "client",
        "method": "POST",
        "headers": {"X-HTTP-Status": "201"},
        "body": "User created",
    }
    return reply
WebSocket Package (polyflux.websocket)
Status: Production Ready
Complete WebSocket client and server implementation:
Full async WebSocket client with connection pooling
WebSocket server with broadcasting and real-time capabilities
Easy configuration setup
Performance features including compression and monitoring
Error handling and reliability features
Interception and middleware support
Reliable (280 tests)
from polyflux.websocket import WebSocketClient, WebSocketServer
# WebSocket Client
async with WebSocketClient() as client:
await client.connect( "wss://echo.websocket.org" )
response = await client.exchange(
target = "wss://echo.websocket.org" ,
message_type = "text" ,
data = "Hello WebSocket!"
)
# WebSocket server with broadcasting switched on
bind_options = {"host": "0.0.0.0", "port": 8080}
server = WebSocketServer(
    asgi_app=create_asgi_app(),
    config=create_hypercorn_config(bind_options),
    broadcasting_enabled=True,
    default_channels=["general", "notifications"],
)
@server.register_handler("ws_text")
async def handle_message(message: WebSocketInbound) -> WebSocketOutbound:
    """Echo the incoming message's ``data`` to the ``broadcast:all`` target."""
    return {
        "target": "broadcast:all",
        "message_type": "text",
        # The f prefix must touch the opening quote to form an f-string.
        "data": f"Echo: {message['data']}",
    }
Future Packages
In Development:
polyflux.grpc: gRPC client and server implementation
polyflux.graphql: GraphQL client and server implementation
polyflux.mqtt: MQTT client and broker implementation for IoT
polyflux.kafka: Apache Kafka client for distributed streaming
polyflux.database: Database client with unified query interface
Getting Started
Install packages: uv add polyflux-core polyflux-http polyflux-websocket
Import protocols: Choose the protocols that match your needs
Build clients: Create implementations using message containers
Get IDE support: Full autocomplete and validation
Build applications: Create robust, type-safe networked applications
from polyflux.http import HttpClient
import asyncio
async def main():
    """Fetch one post and print its status code and decoded body."""
    async with HttpClient(base_url="https://jsonplaceholder.typicode.com") as client:
        # Simple request with parameters
        response = await client.exchange(
            target="/posts/1",
            method="GET",
            headers={"Accept": "application/json"},
        )

        # The f prefix must touch the opening quote to form an f-string.
        print(f"Status: {response['status_code']}")
        # decode() is called on "data", so it is expected to be bytes here.
        print(f"Data: {response['data'].decode()}")


asyncio.run(main())