HTTP

The Polyflux HTTP package provides production-ready HTTP client and server implementations with full async support and ASGI integration.

Installation

uv add polyflux-http

Key Features

  • 🚀 Production Ready - Full HTTP/1.1 and HTTP/2 support via Hypercorn ASGI server
  • ⚡ Async First - Built from the ground up for async/await with context managers
  • 🔒 Easy Setup - Simple configuration with validation
  • 🛡️ Resilient - Automatic error recovery and retry logic
  • 🔧 Configurable - Flexible authentication, interceptors, and middleware support
  • 📊 Observable - Built-in logging, metrics, and request/response interception

Quick Start

HTTP Client

from polyflux.http import HttpClient
import asyncio

async def main():
    async with HttpClient(base_url="https://api.example.com") as client:
        # GET request
        response = await client.exchange(
            target="/users/123",
            method="GET",
            headers={"Accept": "application/json"}
        )
        
        # POST with JSON
        result = await client.exchange(
            target="/users",
            method="POST",
            content_type="application/json",
            body={"name": "John", "email": "john@example.com"}
        )

asyncio.run(main())

HTTP Server

from polyflux.http import HttpServer, HttpInbound, HttpOutbound
import asyncio

async def main():
    server = HttpServer(host="0.0.0.0", port=8080, workers=4)
    
    @server.register_handler("/health", methods=["GET"])
    async def health_check(request: HttpInbound) -> HttpOutbound:
        return {
            "target": "client",
            "method": "GET", 
            "headers": {"X-HTTP-Status": "200"},
            "body": "OK"
        }
    
    async with server:
        print("Server running on http://localhost:8080")
        await asyncio.Event().wait()

asyncio.run(main())

Client Usage

Basic Configuration

from polyflux.http import HttpClient, HttpClientConfigSchema

# Simple configuration
config: HttpClientConfigSchema = {
    "base_url": "https://api.example.com",
    "timeout": 30.0,
    "max_retries": 3,
    "headers": {"User-Agent": "MyApp/1.0"}
}

async with HttpClient(**config) as client:
    # Client automatically uses base_url, timeout, etc.
    response = await client.exchange(target="/endpoint", method="GET")

Authentication

from polyflux.http import HttpClient

# API Key authentication
async with HttpClient(
    base_url="https://api.example.com",
    headers={"Authorization": "Bearer your-token-here"}
) as client:
    response = await client.exchange(target="/protected", method="GET")

Error Handling & Circuit Breaker

from polyflux.http import HttpClient
from polyflux.http.shared.exceptions import CircuitBreakerOpenError

async with HttpClient(base_url="https://unreliable-api.com") as client:
    try:
        response = await client.exchange(target="/endpoint", method="GET")
    except CircuitBreakerOpenError as e:
        print(f"Circuit breaker open: {e}")
        # Fallback logic here

Server Usage

Advanced Server Configuration

from polyflux.http import HttpServer

server = HttpServer(
    host="0.0.0.0",
    port=8080,
    workers=4,                    # Multi-process workers
    ssl_cert="/path/to/cert.pem", # HTTPS support
    ssl_key="/path/to/key.pem",
    max_connections=1000,         # Connection limits
    request_timeout=30.0,         # Request timeouts
    access_log=True              # Access logging
)

Route Handlers with Parameters

@server.register_handler("/api/users/{user_id}", methods=["GET", "PUT", "DELETE"])
async def user_handler(request: HttpInbound, user_id: str) -> HttpOutbound:
    method = request["headers"].get("X-HTTP-Method", "GET")
    
    if method == "GET":
        return {
            "target": "client",
            "method": "GET",
            "headers": {
                "X-HTTP-Status": "200",
                "Content-Type": "application/json"
            },
            "body": f'{{"id": "{user_id}", "name": "User {user_id}"}}'
        }
    elif method == "PUT":
        # Update user logic
        body = request.get("data", b"").decode()
        return {
            "target": "client",
            "method": "PUT", 
            "headers": {"X-HTTP-Status": "200"},
            "body": f"Updated user {user_id}"
        }
    elif method == "DELETE":
        # Delete user logic
        return {
            "target": "client",
            "method": "DELETE",
            "headers": {"X-HTTP-Status": "204"},
            "body": ""
        }

Request Interceptors

from collections.abc import Awaitable, Callable
from polyflux.core.server.interception import Interceptor
from polyflux.http import HttpInbound, HttpOutbound
from polyflux.http.shared.exceptions import HttpAuthenticationError

class AuthInterceptor(Interceptor[HttpInbound, HttpOutbound]):
    async def intercept_inbound(
        self, request: HttpInbound, next_interceptor: Callable[[HttpInbound], Awaitable[HttpInbound]]
    ) -> HttpInbound:
        auth_header = request.get("headers", {}).get("Authorization")
        if not auth_header or not self.validate_token(auth_header):
            raise HttpAuthenticationError("Invalid authentication")
        return await next_interceptor(request)
    
    async def intercept_outbound(
        self, response: HttpOutbound, next_interceptor: Callable[[HttpOutbound], Awaitable[HttpOutbound]]
    ) -> HttpOutbound:
        # Optional: modify outbound responses
        return await next_interceptor(response)
    
    def validate_token(self, token: str) -> bool:
        # Your token validation logic
        return token == "Bearer valid-token"

# Add interceptor to server
server.add_interceptor(AuthInterceptor())

Type System

Client Configuration

from typing import NotRequired
from polyflux.http.client.configurable import HttpClientConfigSchema

config: HttpClientConfigSchema = {
    "base_url": "https://api.example.com",        # Required
    "timeout": 30.0,                             # Optional: request timeout
    "max_retries": 3,                            # Optional: retry attempts
    "headers": {"User-Agent": "MyApp/1.0"},      # Optional: default headers
    "follow_redirects": True,                    # Optional: redirect handling
}

Server Configuration

from polyflux.http.server.configurable import HttpServerConfigSchema

config: HttpServerConfigSchema = {
    "host": "0.0.0.0",                           # Optional: bind address
    "port": 8080,                                # Optional: port number
    "workers": 4,                                # Optional: worker processes
    "ssl_cert": "/path/to/cert.pem",             # Optional: SSL certificate
    "ssl_key": "/path/to/key.pem",               # Optional: SSL private key
    "max_connections": 1000,                     # Optional: connection limit
    "request_timeout": 30.0,                     # Optional: request timeout
    "access_log": True,                          # Optional: access logging
    "debug": False,                              # Optional: debug mode
}

Message Types

from polyflux.http import HttpInbound, HttpOutbound

# Inbound (responses from server)
inbound: HttpInbound = {
    "status_code": 200,
    "data": b'{"message": "Hello, World!"}',
    "headers": {"Content-Type": "application/json"}  # Optional
}

# Outbound (requests from client)
outbound: HttpOutbound = {
    "target": "https://api.example.com/users",  # Required (inherited from Outbound)
    "method": "POST",                           # Required
    "headers": {"Authorization": "Bearer token"},  # Optional
    "content_type": "application/json",         # Optional
    "body": {"name": "John", "email": "john@example.com"}  # Optional
}

Error Handling

Built-in Exception Types

from polyflux.http.shared.exceptions import (
    HttpError,                    # Base HTTP exception
    HttpConnectionError,          # Connection issues
    HttpTimeoutError,            # Request timeouts
    HttpValidationError,         # Request validation
    HttpAuthenticationError,     # Authentication failures
    HttpAuthorizationError,      # Authorization failures
    CircuitBreakerOpenError,     # Circuit breaker open
    HttpStatusError,             # HTTP status errors
)

try:
    response = await client.exchange(target="/endpoint", method="GET")
except HttpTimeoutError as e:
    print(f"Request timed out: {e}")
except HttpConnectionError as e:
    print(f"Connection failed: {e}")
except HttpAuthenticationError as e:
    print(f"Authentication failed: {e}")
except HttpError as e:
    print(f"HTTP error: {e}")

Circuit Breaker Configuration

from polyflux.http import HttpClient

# Circuit breaker automatically protects against failing services
async with HttpClient(base_url="https://flaky-service.com") as client:
    # After 5 failures, circuit opens for 60 seconds
    # Automatic retry with exponential backoff
    response = await client.exchange(target="/endpoint", method="GET")

Production Deployment

Server with SSL and Workers

import asyncio
import signal
from polyflux.http import HttpServer

async def run_production_server():
    server = HttpServer(
        host="0.0.0.0",
        port=443,
        workers=8,                           # Scale based on CPU cores
        ssl_cert="/etc/ssl/certs/server.crt",
        ssl_key="/etc/ssl/private/server.key",
        max_connections=5000,
        request_timeout=60.0,
        access_log=True,
        debug=False
    )
    
    # Graceful shutdown handling
    def signal_handler():
        print("Shutting down server...")
        
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    
    async with server:
        print(f"Production server running on https://0.0.0.0:443 with {server.config['workers']} workers")
        await asyncio.Event().wait()

if __name__ == "__main__":
    asyncio.run(run_production_server())

Client with Connection Pooling

from polyflux.http import HttpClient

# Efficient client for high-throughput applications
async def high_throughput_client():
    async with HttpClient(
        base_url="https://api.example.com",
        timeout=10.0,
        max_retries=3,
        headers={"User-Agent": "HighThroughputApp/1.0"}
    ) as client:
        
        # Process many requests efficiently
        tasks = []
        for i in range(100):
            task = client.exchange(
                target=f"/items/{i}",
                method="GET"
            )
            tasks.append(task)
        
        # Execute all requests concurrently
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        
        for i, response in enumerate(responses):
            if isinstance(response, Exception):
                print(f"Request {i} failed: {response}")
            else:
                print(f"Request {i} succeeded: {response.get('status_code')}")

Testing

The HTTP package includes comprehensive test coverage:
# Run HTTP package tests
uv run pytest packages/http/tests/

# Run with coverage
uv run pytest packages/http/tests/ --cov=polyflux.http
Test categories:
  • Unit Tests - Individual component testing (120 tests)
  • Integration Tests - End-to-end client/server scenarios
  • Reliability Tests - Circuit breaker and error handling
  • Type Tests - TypedDict schema validation
  • External Service Tests - Real HTTP service integration (with resilient fallbacks)

Next Steps