Get started in under 5 minutes
Install everything
uv add polyflux
Install the core package
uv add polyflux-core
Install the HTTP package
uv add polyflux-http
Install the WebSocket package
uv add polyflux-websocket
import asyncio
from typing import Any

from polyflux.core import BaseExchange
from polyflux.core.shared.contracts.protocol_data_unit import Inbound


class SimpleClient(BaseExchange[Inbound]):
    """Demonstration client: prints each request and answers with canned data."""

    async def exchange(self, **kwargs: Any) -> Inbound:
        """Print the request described by ``kwargs`` and return a fixed reply.

        ``target`` must be supplied (a missing key raises ``KeyError``);
        ``method`` falls back to ``"GET"`` and ``headers`` to an empty dict.
        """
        target = kwargs["target"]  # mandatory: the Outbound contract requires it
        method = kwargs.get("method", "GET")
        headers = kwargs.get("headers", {})

        # A real protocol implementation would replace these prints.
        print(f"Sending {method} request to {target}")
        print(f"Headers: {headers}")

        # The reply is a plain dictionary.
        reply = {
            "data": b"Response from simple client",
            "status": "success",
        }
        return reply

    async def send(self, **kwargs: Any) -> bool:
        """Run ``exchange`` and discard its reply.

        Returns ``True`` when the exchange finishes and ``False`` when it
        raises any ``Exception``.
        """
        try:
            await self.exchange(**kwargs)
        except Exception:
            return False
        return True

    async def receive(self, source: str) -> Inbound:
        """Print the source being read and return a fixed payload tagged with it."""
        print(f"Receiving from {source}")
        payload = {
            "data": b"Received data",
            "source": source,
        }
        return payload


async def main():
    """Exercise ``SimpleClient.exchange`` and ``SimpleClient.send`` once each."""
    client = SimpleClient()

    # Request/response round trip; 'target' is the required field.
    greeting_request = {
        "target": "/api/greeting",
        "method": "GET",
        "headers": {"Accept": "application/json"},
    }
    response = await client.exchange(**greeting_request)
    print(f"Exchange completed: {response}")

    # One-way send; only the success flag is reported.
    log_request = {
        "target": "/api/log",
        "method": "POST",
        "data": {"event": "quickstart_test"},
    }
    success = await client.send(**log_request)
    print(f"Send successful: {success}")


if __name__ == "__main__":
    asyncio.run(main())
from typing import Any, NotRequired

from polyflux.core.shared.contracts.protocol_data_unit import Inbound, Outbound

# The base Outbound type requires a 'target' field.
outbound_data: Outbound = {"target": "https://api.example.com/users"}

# The base Inbound type is empty by default.
inbound_data: Inbound = {}


# Each protocol can declare its own message types on top of the base ones.
class ApiOutbound(Outbound):
    """Outbound message with a mandatory ``method`` and optional headers/body."""

    method: str
    headers: NotRequired[dict[str, str]]
    body: NotRequired[dict[str, Any]]


class ApiInbound(Inbound):
    """Inbound message carrying a status code, raw data and an optional message."""

    status_code: int
    data: bytes
    message: NotRequired[str]


# Example message containers built from the custom types.
request: ApiOutbound = {
    "target": "https://api.example.com/users",
    "method": "POST",
    "body": {"name": "John", "email": "john@example.com"},
}

response: ApiInbound = {
    "status_code": 201,
    "data": b'{"id": 123, "name": "John"}',
    "message": "User created successfully",
}
import asyncio

from polyflux.http import HttpClient, HttpInbound, HttpOutbound, HttpServer


# HTTP client example
async def client_example():
    """Issue one GET and one POST through ``HttpClient`` and print both results."""
    api_root = "https://jsonplaceholder.typicode.com"

    async with HttpClient(base_url=api_root) as client:
        # GET request
        accept_json = {"Accept": "application/json"}
        response = await client.exchange(
            target="/posts/1",
            method="GET",
            headers=accept_json,
        )
        print(f"Response: {response}")

        # POST request
        new_post = {"title": "Hello", "body": "World", "userId": 1}
        result = await client.exchange(
            target="/posts",
            method="POST",
            content_type="application/json",
            body=new_post,
        )
        print(f"Created: {result}")


# HTTP server example
async def server_example():
    """Register a ``/hello`` GET handler and keep the server up for five seconds."""
    server = HttpServer(host="localhost", port=8080)

    @server.register_handler("/hello", methods=["GET"])
    async def hello_handler(request: HttpInbound) -> HttpOutbound:
        """Answer every request with a fixed plain-text greeting."""
        reply_headers = {"X-HTTP-Status": "200", "Content-Type": "text/plain"}
        return {
            "target": "client",
            "method": "GET",
            "headers": reply_headers,
            "body": "Hello from Polyflux HTTP Server!",
        }

    print("Starting server on http://localhost:8080")
    async with server:
        # Keep the server context open for 5 seconds.
        await asyncio.sleep(5)
    print("Server stopped")


# Run the examples
asyncio.run(client_example())
# asyncio.run(server_example())  # Uncomment to run server
import asyncio

from polyflux.websocket import (
    WebSocketClient,
    WebSocketInbound,
    WebSocketOutbound,
    WebSocketServer,
)


# WebSocket client example
async def websocket_client_example():
    """Connect to the echo endpoint, send one text message and print the reply."""
    echo_url = "wss://echo.websocket.org"

    async with WebSocketClient() as client:
        await client.connect(echo_url)

        # Send a text message and wait for the response.
        response = await client.exchange(
            target=echo_url,
            message_type="text",
            data="Hello WebSocket!",
        )
        print(f"Echo response: {response['data']}")


# WebSocket server example
async def websocket_server_example():
    """Register a ``ws_text`` echo handler and keep the server up for five seconds."""
    server = WebSocketServer(host="localhost", port=8080)

    @server.register_handler("ws_text")
    async def handle_message(request: WebSocketInbound) -> WebSocketOutbound:
        """Reply with the incoming data prefixed by ``Echo: ``."""
        echoed = {
            "target": "client",
            "message_type": "text",
            "data": f"Echo: {request['data']}",
        }
        return echoed

    print("WebSocket server running on ws://localhost:8080")
    async with server:
        # Keep the server context open for 5 seconds.
        await asyncio.sleep(5)
    print("WebSocket server stopped")


# Run the examples
asyncio.run(websocket_client_example())
# asyncio.run(websocket_server_example())  # Uncomment to run server
Was this page helpful?