Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.polyflux.ai/llms.txt

Use this file to discover all available pages before exploring further.

Installation

Polyflux is designed to be modular. Install only the packages you need.

All-in-One Installation

Install all available Polyflux packages:
uv add polyflux

Individual Package Installation

The core package provides the fundamental protocols and interfaces:
uv add polyflux-core
The HTTP package provides full client and server implementations:
uv add polyflux-http
The WebSocket package provides real-time bidirectional communication:
uv add polyflux-websocket

Quick Examples

Get up and running with these simple examples.

Core Protocols

Learn about the foundational protocols and interfaces

HTTP Package

Full HTTP client and server with async support and ASGI integration

WebSocket Package

Production-ready WebSocket client and server with broadcasting

Coming Soon

gRPC, GraphQL, MQTT, and more protocols in development

First Steps

Create a simple client using the core protocols:
from typing import Any
from polyflux.core import BaseExchange
from polyflux.core.shared.contracts.protocol_data_unit import Inbound
import asyncio

class SimpleClient(BaseExchange[Inbound]):
    async def exchange(self, **kwargs: Any) -> Inbound:
        """Exchange using parameters for sending data."""
        target = kwargs["target"]  # Required field from Outbound
        method = kwargs.get("method", "GET")
        headers = kwargs.get("headers", {})
        
        # Your protocol implementation here
        print(f"Sending {method} request to {target}")
        print(f"Headers: {headers}")
        
        # Return response data (dictionary)
        return {
            "data": b"Response from simple client",
            "status": "success"
        }
    
    async def send(self, **kwargs: Any) -> bool:
        """Send without expecting response."""
        try:
            await self.exchange(**kwargs)
            return True
        except Exception:
            return False
    
    async def receive(self, source: str) -> Inbound:
        """Receive from specified source."""
        print(f"Receiving from {source}")
        return {
            "data": b"Received data",
            "source": source
        }

async def main():
    client = SimpleClient()
    
    # Send data using parameters
    response = await client.exchange(
        target="/api/greeting",  # Required target field
        method="GET",
        headers={"Accept": "application/json"}
    )
    print(f"Exchange completed: {response}")
    
    # Test send operation
    success = await client.send(
        target="/api/log",
        method="POST",
        data={"event": "quickstart_test"}
    )
    print(f"Send successful: {success}")

if __name__ == "__main__":
    asyncio.run(main())

Understanding Messages

Polyflux uses message containers for structured communication:
from typing import Any, NotRequired
from polyflux.core.shared.contracts.protocol_data_unit import Inbound, Outbound

# Base Outbound requires a 'target' field
outbound_data: Outbound = {
    "target": "https://api.example.com/users"
}

# Base Inbound is empty by default
inbound_data: Inbound = {}

# Message types can be customized per protocol
class ApiOutbound(Outbound):
    method: str
    headers: NotRequired[dict[str, str]]
    body: NotRequired[dict[str, Any]]

class ApiInbound(Inbound):
    status_code: int
    data: bytes
    message: NotRequired[str]

# Create message containers
request: ApiOutbound = {
    "target": "https://api.example.com/users",
    "method": "POST",
    "body": {"name": "John", "email": "john@example.com"}
}

response: ApiInbound = {
    "status_code": 201,
    "data": b'{"id": 123, "name": "John"}',
    "message": "User created successfully"
}

HTTP Package Example

Get started with the HTTP package for real-world applications:
from polyflux.http import HttpClient, HttpServer, HttpInbound, HttpOutbound
import asyncio

# HTTP Client Example
async def client_example():
    async with HttpClient(base_url="https://jsonplaceholder.typicode.com") as client:
        # GET request
        response = await client.exchange(
            target="/posts/1",
            method="GET",
            headers={"Accept": "application/json"}
        )
        print(f"Response: {response}")
        
        # POST request
        result = await client.exchange(
            target="/posts",
            method="POST",
            content_type="application/json",
            body={"title": "Hello", "body": "World", "userId": 1}
        )
        print(f"Created: {result}")

# HTTP Server Example
async def server_example():
    server = HttpServer(host="localhost", port=8080)
    
    @server.register_handler("/hello", methods=["GET"])
    async def hello_handler(request: HttpInbound) -> HttpOutbound:
        return {
            "target": "client",
            "method": "GET",
            "headers": {"X-HTTP-Status": "200", "Content-Type": "text/plain"},
            "body": "Hello from Polyflux HTTP Server!"
        }
    
    print("Starting server on http://localhost:8080")
    async with server:
        await asyncio.sleep(5)  # Run for 5 seconds
        print("Server stopped")

# Run examples
asyncio.run(client_example())
# asyncio.run(server_example())  # Uncomment to run server

WebSocket Package Example

Real-time bidirectional communication with WebSocket:
from polyflux.websocket import WebSocketClient, WebSocketServer, WebSocketInbound, WebSocketOutbound
import asyncio

# WebSocket Client Example
async def websocket_client_example():
    async with WebSocketClient() as client:
        await client.connect("wss://echo.websocket.org")
        
        # Send text message
        response = await client.exchange(
            target="wss://echo.websocket.org",
            message_type="text",
            data="Hello WebSocket!"
        )
        print(f"Echo response: {response['data']}")

# WebSocket Server Example  
async def websocket_server_example():
    server = WebSocketServer(host="localhost", port=8080)
    
    @server.register_handler("ws_text")
    async def handle_message(request: WebSocketInbound) -> WebSocketOutbound:
        return {
            "target": "client",
            "message_type": "text",
            "data": f"Echo: {request['data']}"
        }
    
    print("WebSocket server running on ws://localhost:8080")
    async with server:
        await asyncio.sleep(5)  # Run for 5 seconds
        print("WebSocket server stopped")

# Run examples
asyncio.run(websocket_client_example())
# asyncio.run(websocket_server_example())  # Uncomment to run server

Next Steps