Full-featured HTTP client and server implementation
uv add polyflux-http
from polyflux.http import HttpClient
import asyncio
async def main():
    """Fetch one user and create another over a single ``HttpClient`` session."""
    accept_json = {"Accept": "application/json"}
    new_user = {"name": "John", "email": "john@example.com"}

    async with HttpClient(base_url="https://api.example.com") as client:
        # GET request
        fetched = await client.exchange(
            target="/users/123", method="GET", headers=accept_json
        )

        # POST with JSON
        created = await client.exchange(
            target="/users",
            method="POST",
            content_type="application/json",
            body=new_user,
        )


asyncio.run(main())
from polyflux.http import HttpServer, HttpInbound, HttpOutbound
import asyncio
async def main():
    """Start an ``HttpServer`` with a ``/health`` route and keep it running."""
    server = HttpServer(host="0.0.0.0", port=8080, workers=4)

    @server.register_handler("/health", methods=["GET"])
    async def health_check(request: HttpInbound) -> HttpOutbound:
        """Reply to every health probe with an ``OK`` body and status header 200."""
        reply: HttpOutbound = {
            "target": "client",
            "method": "GET",
            "headers": {"X-HTTP-Status": "200"},
            "body": "OK",
        }
        return reply

    async with server:
        print("Server running on http://localhost:8080")
        # Nothing ever sets this event, so the wait only ends when the task
        # is cancelled (for example by Ctrl+C under asyncio.run).
        keep_alive = asyncio.Event()
        await keep_alive.wait()


asyncio.run(main())
from polyflux.http import HttpClient, HttpClientConfigSchema
# Simple configuration
# The dict is annotated as HttpClientConfigSchema and unpacked into the
# HttpClient constructor below, so each key becomes a keyword argument.
config: HttpClientConfigSchema = {
"base_url": "https://api.example.com",
"timeout": 30.0,
"max_retries": 3,
"headers": {"User-Agent": "MyApp/1.0"}
}
# NOTE(review): module-level `async with` / `await` only runs inside a
# coroutine or an async-aware REPL; wrap this in `async def` in a script.
async with HttpClient(**config) as client:
# Client automatically uses base_url, timeout, etc.
response = await client.exchange(target="/endpoint", method="GET")
from polyflux.http import HttpClient
# Bearer-token authentication: the Authorization header is passed to the
# HttpClient constructor as part of its `headers` argument.
# NOTE(review): presumably these headers are sent on every exchange() call
# made through this client — confirm against the client implementation.
async with HttpClient(
base_url="https://api.example.com",
headers={"Authorization": "Bearer your-token-here"}
) as client:
response = await client.exchange(target="/protected", method="GET")
from polyflux.http import HttpClient
from polyflux.http.shared.exceptions import CircuitBreakerOpenError
# Handle CircuitBreakerOpenError separately so the caller can fall back
# instead of treating it like an ordinary request failure.
async with HttpClient(base_url="https://unreliable-api.com") as client:
try:
response = await client.exchange(target="/endpoint", method="GET")
except CircuitBreakerOpenError as e:
# NOTE(review): presumably raised when the breaker is refusing calls to
# this service — confirm the exact trigger against the library docs.
print(f"Circuit breaker open: {e}")
# Fallback logic here
from polyflux.http import HttpServer
# Construct a server with every tuning option spelled out explicitly.
# The instance is bound to the module-level name `server`, which the
# handler decorator and add_interceptor() call later in this guide use.
server = HttpServer(
host="0.0.0.0",
port=8080,
workers=4, # Multi-process workers
ssl_cert="/path/to/cert.pem", # HTTPS support
ssl_key="/path/to/key.pem",
max_connections=1000, # Connection limits
request_timeout=30.0, # Request timeouts
access_log=True # Access logging
)
@server.register_handler("/api/users/{user_id}", methods=["GET", "PUT", "DELETE"])
async def user_handler(request: HttpInbound, user_id: str) -> HttpOutbound:
    """Serve read, update and delete requests for a single user.

    The verb is taken from the ``X-HTTP-Method`` request header and defaults
    to ``GET`` when the header (or the whole ``headers`` mapping) is absent.

    Args:
        request: Inbound message; only its optional ``headers`` mapping is read.
        user_id: Identifier of the user; presumably bound from the
            ``{user_id}`` placeholder in the registered route.

    Returns:
        An outbound message whose ``X-HTTP-Status`` header carries the
        status: ``200`` for GET and PUT, ``204`` for DELETE, and ``405`` for
        any other verb.
    """
    import json  # local import keeps the example self-contained

    # Headers are optional on inbound messages, so never index them directly.
    method = request.get("headers", {}).get("X-HTTP-Method", "GET")

    if method == "GET":
        # json.dumps escapes quotes and backslashes in user_id, which the
        # previous hand-built f-string did not.
        document = json.dumps(
            {"id": user_id, "name": f"User {user_id}"}, ensure_ascii=False
        )
        return {
            "target": "client",
            "method": "GET",
            "headers": {
                "X-HTTP-Status": "200",
                "Content-Type": "application/json",
            },
            "body": document,
        }

    if method == "PUT":
        # Update user logic goes here; the raw request payload is available
        # under the "data" key of `request`.
        return {
            "target": "client",
            "method": "PUT",
            "headers": {"X-HTTP-Status": "200"},
            "body": f"Updated user {user_id}",
        }

    if method == "DELETE":
        # Delete user logic
        return {
            "target": "client",
            "method": "DELETE",
            "headers": {"X-HTTP-Status": "204"},
            "body": "",
        }

    # Any other verb used to fall through and return None, which does not
    # satisfy the HttpOutbound return annotation.
    return {
        "target": "client",
        "method": method,
        "headers": {"X-HTTP-Status": "405", "Allow": "GET, PUT, DELETE"},
        "body": "",
    }
from collections.abc import Awaitable, Callable
from polyflux.core.server.interception import Interceptor
from polyflux.http import HttpInbound, HttpOutbound
from polyflux.http.shared.exceptions import HttpAuthenticationError
class AuthInterceptor(Interceptor[HttpInbound, HttpOutbound]):
    """Interceptor that rejects requests without an accepted Authorization header."""

    async def intercept_inbound(
        self, request: HttpInbound, next_interceptor: Callable[[HttpInbound], Awaitable[HttpInbound]]
    ) -> HttpInbound:
        """Pass the request down the chain only when its token validates.

        Raises:
            HttpAuthenticationError: The Authorization header is missing,
                empty, or rejected by ``validate_token``.
        """
        headers = request.get("headers", {})
        credential = headers.get("Authorization")
        if not (credential and self.validate_token(credential)):
            raise HttpAuthenticationError("Invalid authentication")
        return await next_interceptor(request)

    async def intercept_outbound(
        self, response: HttpOutbound, next_interceptor: Callable[[HttpOutbound], Awaitable[HttpOutbound]]
    ) -> HttpOutbound:
        """Forward the response unchanged to the next interceptor."""
        # Optional: modify outbound responses
        forwarded = await next_interceptor(response)
        return forwarded

    def validate_token(self, token: str) -> bool:
        """Return True only for the single hard-coded example token."""
        # Your token validation logic
        accepted = "Bearer valid-token"
        return token == accepted
# Add interceptor to server
# NOTE(review): `server` is the HttpServer instance built earlier in this
# guide; this snippet does not define it.
server.add_interceptor(AuthInterceptor())
from typing import NotRequired
from polyflux.http.client.configurable import HttpClientConfigSchema
# Reference listing of the client configuration keys used in this guide.
# Only base_url is marked required; every other key is marked optional.
config: HttpClientConfigSchema = {
"base_url": "https://api.example.com", # Required
"timeout": 30.0, # Optional: request timeout
"max_retries": 3, # Optional: retry attempts
"headers": {"User-Agent": "MyApp/1.0"}, # Optional: default headers
"follow_redirects": True, # Optional: redirect handling
}
from polyflux.http.server.configurable import HttpServerConfigSchema
# Reference listing of the server configuration keys used in this guide.
# Every key is marked optional; the keys other than "debug" match the
# keyword arguments passed to HttpServer elsewhere in this guide.
config: HttpServerConfigSchema = {
"host": "0.0.0.0", # Optional: bind address
"port": 8080, # Optional: port number
"workers": 4, # Optional: worker processes
"ssl_cert": "/path/to/cert.pem", # Optional: SSL certificate
"ssl_key": "/path/to/key.pem", # Optional: SSL private key
"max_connections": 1000, # Optional: connection limit
"request_timeout": 30.0, # Optional: request timeout
"access_log": True, # Optional: access logging
"debug": False, # Optional: debug mode
}
from polyflux.http import HttpInbound, HttpOutbound
# Example literals for the two message types.
# NOTE(review): the direction labels below describe the client's view. The
# server handlers in this guide receive HttpInbound and return HttpOutbound,
# so on the server side "inbound" is the request — confirm intended wording.
# Inbound (responses from server)
inbound: HttpInbound = {
"status_code": 200,
"data": b'{"message": "Hello, World!"}',
"headers": {"Content-Type": "application/json"} # Optional
}
# Outbound (requests from client)
outbound: HttpOutbound = {
"target": "https://api.example.com/users", # Required (inherited from Outbound)
"method": "POST", # Required
"headers": {"Authorization": "Bearer token"}, # Optional
"content_type": "application/json", # Optional
"body": {"name": "John", "email": "john@example.com"} # Optional
}
from polyflux.http.shared.exceptions import (
HttpError, # Base HTTP exception
HttpConnectionError, # Connection issues
HttpTimeoutError, # Request timeouts
HttpValidationError, # Request validation
HttpAuthenticationError, # Authentication failures
HttpAuthorizationError, # Authorization failures
CircuitBreakerOpenError, # Circuit breaker open
HttpStatusError, # HTTP status errors
)
# Handlers run from most specific to most general: HttpError, described in
# the import list as the base HTTP exception, is last so it only catches
# what the earlier clauses miss.
# NOTE(review): `client` is not defined in this snippet; it assumes an open
# HttpClient from an enclosing `async with` block.
try:
response = await client.exchange(target="/endpoint", method="GET")
except HttpTimeoutError as e:
print(f"Request timed out: {e}")
except HttpConnectionError as e:
print(f"Connection failed: {e}")
except HttpAuthenticationError as e:
print(f"Authentication failed: {e}")
except HttpError as e:
print(f"HTTP error: {e}")
from polyflux.http import HttpClient
# Circuit breaker automatically protects against failing services
# NOTE(review): no breaker or retry options are passed here, so the figures
# below describe library defaults — confirm them against the client config.
async with HttpClient(base_url="https://flaky-service.com") as client:
# After 5 failures, circuit opens for 60 seconds
# Automatic retry with exponential backoff
response = await client.exchange(target="/endpoint", method="GET")
import asyncio
import signal
from polyflux.http import HttpServer
async def run_production_server():
    """Serve HTTPS on port 443 until SIGTERM or SIGINT requests a shutdown.

    The coroutine waits inside ``async with server`` and returns once a
    shutdown signal has been received, so the server's context manager
    exits normally instead of the process waiting forever.
    """
    server = HttpServer(
        host="0.0.0.0",
        port=443,
        workers=8,  # Scale based on CPU cores
        ssl_cert="/etc/ssl/certs/server.crt",
        ssl_key="/etc/ssl/private/server.key",
        max_connections=5000,
        request_timeout=60.0,
        access_log=True,
        debug=False,
    )

    # Graceful shutdown handling
    shutdown_requested = asyncio.Event()

    def signal_handler(*_signal_args):
        # Accepts the (signum, frame) pair that signal.signal passes as well
        # as the zero-argument call made by loop.add_signal_handler; the old
        # zero-parameter handler raised TypeError when a signal arrived.
        print("Shutting down server...")
        shutdown_requested.set()

    loop = asyncio.get_running_loop()
    for signum in (signal.SIGTERM, signal.SIGINT):
        try:
            loop.add_signal_handler(signum, signal_handler)
        except NotImplementedError:
            # Some event loops (e.g. on Windows) cannot register signal
            # handlers. Fall back to signal.signal and hop back onto the
            # loop before touching the asyncio.Event.
            signal.signal(
                signum,
                lambda *_args: loop.call_soon_threadsafe(signal_handler),
            )

    async with server:
        print(f"Production server running on https://0.0.0.0:443 with {server.config['workers']} workers")
        # Returns when a signal sets the event, which leaves the context.
        await shutdown_requested.wait()


if __name__ == "__main__":
    asyncio.run(run_production_server())
from polyflux.http import HttpClient
# Efficient client for high-throughput applications
async def high_throughput_client():
    """Issue 100 GET requests concurrently over one client and print each outcome."""
    async with HttpClient(
        base_url="https://api.example.com",
        timeout=10.0,
        max_retries=3,
        headers={"User-Agent": "HighThroughputApp/1.0"}
    ) as client:
        # Process many requests efficiently
        pending = [
            client.exchange(target=f"/items/{i}", method="GET")
            for i in range(100)
        ]

        # Execute all requests concurrently; failures come back as exception
        # objects in the result list instead of being raised.
        outcomes = await asyncio.gather(*pending, return_exceptions=True)

        for i, response in enumerate(outcomes):
            if not isinstance(response, Exception):
                print(f"Request {i} succeeded: {response.get('status_code')}")
                continue
            print(f"Request {i} failed: {response}")
# Run HTTP package tests
uv run pytest packages/http/tests/
# Run with coverage
uv run pytest packages/http/tests/ --cov=polyflux.http