Installation

Polyflux is designed to be modular. Install only the packages you need.

All-in-One Installation

Individual Package Installation

Quick Examples

Get up and running with these simple examples.

First Steps

Create a simple client using the core protocols:
from typing import Any
from polyflux.core import BaseExchange
from polyflux.core.shared.contracts.protocol_data_unit import Inbound
import asyncio

class SimpleClient(BaseExchange[Inbound]):
    async def exchange(self, **kwargs: Any) -> Inbound:
        """Exchange using parameters for sending data."""
        target = kwargs["target"]  # Required field from Outbound
        method = kwargs.get("method", "GET")
        headers = kwargs.get("headers", {})
        
        # Your protocol implementation here
        print(f"Sending {method} request to {target}")
        print(f"Headers: {headers}")
        
        # Return response data (dictionary)
        return {
            "data": b"Response from simple client",
            "status": "success"
        }
    
    async def send(self, **kwargs: Any) -> bool:
        """Send without expecting response."""
        try:
            await self.exchange(**kwargs)
            return True
        except Exception:
            return False
    
    async def receive(self, source: str) -> Inbound:
        """Receive from specified source."""
        print(f"Receiving from {source}")
        return {
            "data": b"Received data",
            "source": source
        }

async def main():
    client = SimpleClient()
    
    # Send data using parameters
    response = await client.exchange(
        target="/api/greeting",  # Required target field
        method="GET",
        headers={"Accept": "application/json"}
    )
    print(f"Exchange completed: {response}")
    
    # Test send operation
    success = await client.send(
        target="/api/log",
        method="POST",
        data={"event": "quickstart_test"}
    )
    print(f"Send successful: {success}")

if __name__ == "__main__":
    asyncio.run(main())

Understanding Messages

Polyflux uses message containers for structured communication:
from typing import Any, NotRequired
from polyflux.core.shared.contracts.protocol_data_unit import Inbound, Outbound

# Base Outbound requires a 'target' field
outbound_data: Outbound = {
    "target": "https://api.example.com/users"
}

# Base Inbound is empty by default
inbound_data: Inbound = {}

# Message types can be customized per protocol
class ApiOutbound(Outbound):
    method: str
    headers: NotRequired[dict[str, str]]
    body: NotRequired[dict[str, Any]]

class ApiInbound(Inbound):
    status_code: int
    data: bytes
    message: NotRequired[str]

# Create message containers
request: ApiOutbound = {
    "target": "https://api.example.com/users",
    "method": "POST",
    "body": {"name": "John", "email": "john@example.com"}
}

response: ApiInbound = {
    "status_code": 201,
    "data": b'{"id": 123, "name": "John"}',
    "message": "User created successfully"
}

HTTP Package Example

Get started with the HTTP package for real-world applications:
from polyflux.http import HttpClient, HttpServer, HttpInbound, HttpOutbound
import asyncio

# HTTP Client Example
async def client_example():
    async with HttpClient(base_url="https://jsonplaceholder.typicode.com") as client:
        # GET request
        response = await client.exchange(
            target="/posts/1",
            method="GET",
            headers={"Accept": "application/json"}
        )
        print(f"Response: {response}")
        
        # POST request
        result = await client.exchange(
            target="/posts",
            method="POST",
            content_type="application/json",
            body={"title": "Hello", "body": "World", "userId": 1}
        )
        print(f"Created: {result}")

# HTTP Server Example
async def server_example():
    server = HttpServer(host="localhost", port=8080)
    
    @server.register_handler("/hello", methods=["GET"])
    async def hello_handler(request: HttpInbound) -> HttpOutbound:
        return {
            "target": "client",
            "method": "GET",
            "headers": {"X-HTTP-Status": "200", "Content-Type": "text/plain"},
            "body": "Hello from Polyflux HTTP Server!"
        }
    
    print("Starting server on http://localhost:8080")
    async with server:
        await asyncio.sleep(5)  # Run for 5 seconds
        print("Server stopped")

# Run examples
asyncio.run(client_example())
# asyncio.run(server_example())  # Uncomment to run server

WebSocket Package Example

Real-time bidirectional communication with WebSocket:
from polyflux.websocket import WebSocketClient, WebSocketServer, WebSocketInbound, WebSocketOutbound
import asyncio

# WebSocket Client Example
async def websocket_client_example():
    async with WebSocketClient() as client:
        await client.connect("wss://echo.websocket.org")
        
        # Send text message
        response = await client.exchange(
            target="wss://echo.websocket.org",
            message_type="text",
            data="Hello WebSocket!"
        )
        print(f"Echo response: {response['data']}")

# WebSocket Server Example  
async def websocket_server_example():
    server = WebSocketServer(host="localhost", port=8080)
    
    @server.register_handler("ws_text")
    async def handle_message(request: WebSocketInbound) -> WebSocketOutbound:
        return {
            "target": "client",
            "message_type": "text",
            "data": f"Echo: {request['data']}"
        }
    
    print("WebSocket server running on ws://localhost:8080")
    async with server:
        await asyncio.sleep(5)  # Run for 5 seconds
        print("WebSocket server stopped")

# Run examples
asyncio.run(websocket_client_example())
# asyncio.run(websocket_server_example())  # Uncomment to run server

Next Steps