Protocol Data Units (PDUs)
Simple containers for structured communication in Polyflux.
Outbound PDU
Container for outgoing messages.
from typing import Annotated, Required, TypedDict
from polyflux.core.shared.types import doc, example
class Outbound(TypedDict):
"""Base outgoing message.
Container for data being sent.
Message types can be customized.
"""
target: Annotated[
Required[str],
doc(
"The destination identifier for the outbound data. "
"This field specifies where the data should be sent and serves as the primary "
"addressing mechanism for the exchange() method in the SupportsExchange protocol. "
"The format depends on the specific protocol implementation."
),
example("https://api.example.com/endpoint", "HTTP/HTTPS URL endpoint"),
example("sensors/temperature/room1", "MQTT topic for IoT messaging"),
example("postgresql://user:pass@localhost:5432/mydb", "PostgreSQL connection string"),
]
Required Fields
The destination identifier for the outbound data. This field specifies where the data should be sent and serves as the primary addressing mechanism for the exchange() method. The format depends on the specific protocol implementation.
Example
from polyflux.core.shared.contracts.protocol_data_unit import Outbound
# Create basic outbound PDU
outbound: Outbound = {
"target": "https://api.example.com/users"
}
# Message types can be customized
from typing import NotRequired
class HttpOutbound(Outbound):
method: str
headers: NotRequired[dict[str, str]]
body: NotRequired[bytes | str | dict[str, Any]]
# Create HTTP-specific outbound PDU
http_request: HttpOutbound = {
"target": "https://api.example.com/endpoint",
"method": "POST",
"body": {"key": "value"},
"headers": {"Authorization": "Bearer token"},
}
Inbound PDU
Container for incoming messages.
from typing import TypedDict
class Inbound(TypedDict):
"""Base incoming message.
Container for received data.
Message types can be customized.
"""
pass # Base class has no required fields
Example
from polyflux.core.shared.contracts.protocol_data_unit import Inbound
# Create basic inbound PDU (empty base)
inbound: Inbound = {}
# Message types can be customized
from typing import NotRequired
class HttpInbound(Inbound):
status_code: int
data: bytes
headers: NotRequired[dict[str, str]]
# Create HTTP-specific inbound PDU
http_response: HttpInbound = {
"status_code": 200,
"data": b"response body",
"headers": {"content-type": "text/plain"},
}
Type Guards
is_inbound
Check if object is incoming message.
from typing import Any, TypeGuard
from polyflux.core.shared.contracts.protocol_data_unit import is_inbound
def is_inbound(obj: Any) -> TypeGuard[Inbound]:
"""Check if object is incoming message.
Checks if object is a dictionary.
Args:
obj: The object to check for Inbound type.
Returns:
True if the object is a dict (potential Inbound), False otherwise.
"""
return isinstance(obj, dict)
Example
from polyflux.core.shared.contracts.protocol_data_unit import is_inbound
# Test with valid inbound data
inbound_data = {"status_code": 200, "data": b"response"}
if is_inbound(inbound_data):
print("This could be an inbound container")
else:
print("Not an inbound container")
# Output: This could be an inbound container
# Test with non-dict data
invalid_data = "not a dict"
if is_inbound(invalid_data):
print("This could be an inbound container")
else:
print("Not an inbound container")
# Output: Not an inbound container
is_outbound
Check if object is outgoing message.
from typing import Any, TypeGuard
from polyflux.core.shared.contracts.protocol_data_unit import is_outbound
def is_outbound(obj: Any) -> TypeGuard[Outbound]:
"""Check if object is outgoing message.
Checks if object is a dictionary.
Args:
obj: The object to check for Outbound type.
Returns:
True if the object is a dict (potential Outbound), False otherwise.
"""
return isinstance(obj, dict)
Example
from polyflux.core.shared.contracts.protocol_data_unit import is_outbound
# Test with valid outbound data
outbound_data = {"target": "https://api.example.com", "method": "POST"}
if is_outbound(outbound_data):
print("This could be an outbound container")
else:
print("Not an outbound container")
# Output: This could be an outbound container
# Test with non-dict data
invalid_data = ["not", "a", "dict"]
if is_outbound(invalid_data):
print("This could be an outbound container")
else:
print("Not an outbound container")
# Output: Not an outbound container
Working with PDUs
Creating PDUs
Messages are created as regular dictionaries:
from polyflux.core.shared.contracts.protocol_data_unit import Inbound, Outbound
# Create basic outbound PDU with required target field
outbound: Outbound = {
"target": "https://api.example.com/users"
}
# Create basic inbound PDU (no required fields)
inbound: Inbound = {}
# Create with additional data for protocol-specific use
extended_outbound: Outbound = {
"target": "/api/users/123",
# Additional fields would be added by protocol-specific extensions
}
Protocol-Specific PDUs
Protocols can add custom fields to messages:
from typing import NotRequired, Any
from polyflux.core.shared.contracts.protocol_data_unit import Inbound, Outbound
# HTTP-specific PDUs
class HttpOutbound(Outbound):
method: str
headers: NotRequired[dict[str, str]]
content_type: NotRequired[str]
body: NotRequired[bytes | str | dict[str, Any]]
class HttpInbound(Inbound):
status_code: int
data: bytes
headers: NotRequired[dict[str, str]]
# Create HTTP-specific PDUs
http_request: HttpOutbound = {
"target": "https://api.example.com/users",
"method": "POST",
"content_type": "application/json",
"body": {"name": "John", "email": "[email protected]"},
"headers": {"Authorization": "Bearer token123"}
}
http_response: HttpInbound = {
"status_code": 201,
"data": b'{"id": 123, "name": "John", "email": "[email protected]"}',
"headers": {"content-type": "application/json"}
}
Custom Message Types
Add validation and documentation to fields:
from typing import Annotated, NotRequired
from polyflux.core.shared.types import doc, example, constraint
from polyflux.core.shared.contracts.protocol_data_unit import Outbound, Inbound
class ApiRequest(Outbound):
method: Annotated[
str,
doc("HTTP method for the request"),
example("POST", "Create a new resource"),
constraint("http_method", regex=r"^(GET|POST|PUT|DELETE|PATCH)$")
]
user_id: Annotated[
int,
doc("User ID for the request"),
example(12345),
constraint("user_id", gte=1)
]
data: NotRequired[dict[str, Any]]
class ApiResponse(Inbound):
success: bool
user_id: int
message: NotRequired[str]
# Create type-safe PDUs
request: ApiRequest = {
"target": "/api/users/12345",
"method": "POST",
"user_id": 12345,
"data": {"name": "Updated Name"}
}
response: ApiResponse = {
"success": True,
"user_id": 12345,
"message": "User updated successfully"
}
Runtime Validation
Combine type guards with protocol-specific validation:
from polyflux.core.shared.contracts.protocol_data_unit import is_inbound, is_outbound
def validate_http_outbound(obj: Any) -> bool:
"""Validate if object is a valid HTTP outbound PDU."""
if not is_outbound(obj):
return False
# Check required HTTP fields
return (
"method" in obj and
isinstance(obj["method"], str) and
"target" in obj and
isinstance(obj["target"], str)
)
def validate_http_inbound(obj: Any) -> bool:
"""Validate if object is a valid HTTP inbound PDU."""
if not is_inbound(obj):
return False
# Check required HTTP fields
return (
"status_code" in obj and
isinstance(obj["status_code"], int) and
"data" in obj and
isinstance(obj["data"], bytes)
)
# Usage
request_data = {
"target": "https://api.example.com",
"method": "GET"
}
if validate_http_outbound(request_data):
print("Valid HTTP outbound PDU")
else:
print("Invalid HTTP outbound PDU")
WebSocket Protocol Data Units
WebSocket-specific PDUs for real-time bidirectional communication.
WebSocketOutbound
WebSocket-specific outbound PDU for sending data to WebSocket servers.
from typing import NotRequired, Required, Literal
from polyflux.core.shared.contracts.protocol_data_unit import Outbound
class WebSocketOutbound(Outbound):
"""WebSocket-specific outbound PDU.
Used for sending WebSocket messages with protocol-specific metadata.
"""
message_type: Required[Literal["text", "binary", "continuation", "ping", "pong", "close"]]
data: Required[bytes | str]
headers: NotRequired[dict[str, str]] # For handshake
subprotocols: NotRequired[list[str]]
close_code: NotRequired[int] # 1000-1015 per RFC 6455
close_reason: NotRequired[str]
Required Fields
WebSocket URL (ws:// or wss://) to connect to
Type of WebSocket message: “text”, “binary”, “continuation”, “ping”, “pong”, or “close”
Message payload data. Use str for text messages, bytes for binary messages
Optional Fields
HTTP headers for WebSocket handshake (e.g., Authorization)
List of WebSocket subprotocols to negotiate
WebSocket close code (1000-1015) for close messages
Human-readable close reason for close messages
Example
from polyflux.websocket.shared.protocol_data_unit import WebSocketOutbound
# Text message
text_message: WebSocketOutbound = {
"target": "wss://echo.websocket.org",
"message_type": "text",
"data": "Hello WebSocket!",
"headers": {"Authorization": "Bearer token123"}
}
# Binary message
binary_message: WebSocketOutbound = {
"target": "ws://localhost:8080",
"message_type": "binary",
"data": b"\x01\x02\x03\x04",
"subprotocols": ["chat", "superchat"]
}
# Close message
close_message: WebSocketOutbound = {
"target": "wss://api.example.com/ws",
"message_type": "close",
"data": "",
"close_code": 1000,
"close_reason": "Normal closure"
}
WebSocketInbound
WebSocket-specific inbound PDU for data received from WebSocket servers.
from typing import NotRequired, Required, Literal
class WebSocketInbound(Inbound):
"""WebSocket-specific inbound PDU.
Represents WebSocket messages received from the server.
"""
message_type: Required[Literal["text", "binary", "continuation", "ping", "pong", "close"]]
data: Required[bytes | str]
close_code: NotRequired[int]
close_reason: NotRequired[str]
is_final: NotRequired[bool] # For fragmented messages
Required Fields
Type of WebSocket message received: “text”, “binary”, “continuation”, “ping”, “pong”, or “close”
Message payload data received from the server
Optional Fields
WebSocket close code for close messages
Human-readable close reason for close messages
Whether this is the final frame in a fragmented message sequence
Example
from polyflux.websocket.shared.protocol_data_unit import WebSocketInbound
# Text response
text_response: WebSocketInbound = {
"message_type": "text",
"data": "Echo: Hello WebSocket!",
"is_final": True
}
# Binary response
binary_response: WebSocketInbound = {
"message_type": "binary",
"data": b"\x05\x06\x07\x08",
"is_final": True
}
# Close response
close_response: WebSocketInbound = {
"message_type": "close",
"data": "",
"close_code": 1000,
"close_reason": "Server shutdown"
}
WebSocket PDU Usage Examples
Basic Client Communication
from polyflux.websocket import WebSocketClient
async def websocket_example():
async with WebSocketClient() as client:
await client.connect("wss://echo.websocket.org")
# Send using PDU kwargs
response = await client.exchange(
target="wss://echo.websocket.org",
message_type="text",
data="Hello World!"
)
print(f"Received: {response['data']}")
Server Broadcasting
from polyflux.websocket import WebSocketServer
# Server with broadcasting enabled
server = WebSocketServer(
broadcasting_enabled=True,
default_channels=["general", "notifications"]
)
@server.register_handler("ws_text")
async def handle_message(inbound: WebSocketInbound) -> WebSocketOutbound:
# Broadcast to all connected clients
return {
"target": "broadcast:all",
"message_type": "text",
"data": f"Broadcast: {inbound['data']}"
}
Type-Safe WebSocket PDUs
from typing import Annotated
from polyflux.core.shared.types import doc, example, constraint
class ChatMessage(WebSocketOutbound):
"""Type-safe chat message PDU."""
room_id: Annotated[
str,
doc("Chat room identifier"),
example("room123", "Chat room ID"),
constraint("room_id", regex=r"^room\d+$")
]
user_id: Annotated[
int,
doc("User identifier"),
example(12345),
constraint("user_id", gte=1)
]
# Create type-safe chat message
chat_msg: ChatMessage = {
"target": "ws://chat.example.com",
"message_type": "text",
"data": "Hello everyone!",
"room_id": "room123",
"user_id": 12345
}