Protocol Data Units (PDUs)

Simple containers for structured communication in Polyflux.

Outbound PDU

Container for outgoing messages.
from typing import Annotated, Required, TypedDict
from polyflux.core.shared.types import doc, example

class Outbound(TypedDict):
    """Base outgoing message.

    Container for data being sent. The only key declared here is the
    required ``target`` string; message types can be customized by
    subclassing and declaring additional keys.
    """

    # Required destination identifier. The ``doc`` and ``example`` metadata
    # attached through ``Annotated`` describe the field and list sample
    # targets (an HTTP/HTTPS URL, an MQTT topic, a PostgreSQL connection
    # string).
    target: Annotated[
        Required[str],
        doc(
            "The destination identifier for the outbound data. "
            "This field specifies where the data should be sent and serves as the primary "
            "addressing mechanism for the exchange() method in the SupportsExchange protocol. "
            "The format depends on the specific protocol implementation."
        ),
        example("https://api.example.com/endpoint", "HTTP/HTTPS URL endpoint"),
        example("sensors/temperature/room1", "MQTT topic for IoT messaging"),
        example("postgresql://user:pass@localhost:5432/mydb", "PostgreSQL connection string"),
    ]

Required Fields

target
str
The destination identifier for the outbound data. This field specifies where the data should be sent and serves as the primary addressing mechanism for the exchange() method. The format depends on the specific protocol implementation.

Example

from polyflux.core.shared.contracts.protocol_data_unit import Outbound

# Create basic outbound PDU
outbound: Outbound = {
    "target": "https://api.example.com/users"
}

# Message types can be customized
from typing import Any, NotRequired

class HttpOutbound(Outbound):
    """Example HTTP-specific outbound PDU extending ``Outbound``."""

    # Declared without NotRequired, so this key must be present alongside
    # the inherited ``target``.
    method: str
    # Optional header name -> header value mapping.
    headers: NotRequired[dict[str, str]]
    # Optional payload; may be raw bytes, text, or a dict.
    body: NotRequired[bytes | str | dict[str, Any]]

# Create HTTP-specific outbound PDU
http_request: HttpOutbound = {
    "target": "https://api.example.com/endpoint",
    "method": "POST",
    "body": {"key": "value"},
    "headers": {"Authorization": "Bearer token"},
}

Inbound PDU

Container for incoming messages.
from typing import TypedDict

class Inbound(TypedDict):
    """Base incoming message.

    Container for received data. The base type declares no keys at all,
    so there are no required fields and an empty dict is a valid value.
    Message types can be customized by subclassing and declaring the keys
    a protocol needs.
    """

Example

from polyflux.core.shared.contracts.protocol_data_unit import Inbound

# Create basic inbound PDU (empty base)
inbound: Inbound = {}

# Message types can be customized
from typing import NotRequired

class HttpInbound(Inbound):
    """Example HTTP-specific inbound PDU extending ``Inbound``."""

    # Declared without NotRequired, so both of these keys must be present.
    status_code: int
    data: bytes
    # Optional header name -> header value mapping.
    headers: NotRequired[dict[str, str]]

# Create HTTP-specific inbound PDU
http_response: HttpInbound = {
    "status_code": 200,
    "data": b"response body",
    "headers": {"content-type": "text/plain"},
}

Type Guards

is_inbound

Check if object is incoming message.
from typing import Any, TypeGuard
from polyflux.core.shared.contracts.protocol_data_unit import Inbound

def is_inbound(obj: Any) -> TypeGuard[Inbound]:
    """Check if object is incoming message.
    
    Checks if object is a dictionary.
    
    Args:
        obj: The object to check for Inbound type.
        
    Returns:
        True if the object is a dict (potential Inbound), False otherwise.
    """
    return isinstance(obj, dict)

Example

from polyflux.core.shared.contracts.protocol_data_unit import is_inbound

# Test with valid inbound data
inbound_data = {"status_code": 200, "data": b"response"}

if is_inbound(inbound_data):
    print("This could be an inbound container")
else:
    print("Not an inbound container")
# Output: This could be an inbound container

# Test with non-dict data
invalid_data = "not a dict"

if is_inbound(invalid_data):
    print("This could be an inbound container")
else:
    print("Not an inbound container")
# Output: Not an inbound container

is_outbound

Check if object is outgoing message.
from typing import Any, TypeGuard
from polyflux.core.shared.contracts.protocol_data_unit import Outbound

def is_outbound(obj: Any) -> TypeGuard[Outbound]:
    """Check if object is outgoing message.
    
    Checks if object is a dictionary.
    
    Args:
        obj: The object to check for Outbound type.
        
    Returns:
        True if the object is a dict (potential Outbound), False otherwise.
    """
    return isinstance(obj, dict)

Example

from polyflux.core.shared.contracts.protocol_data_unit import is_outbound

# Test with valid outbound data
outbound_data = {"target": "https://api.example.com", "method": "POST"}

if is_outbound(outbound_data):
    print("This could be an outbound container")
else:
    print("Not an outbound container")
# Output: This could be an outbound container

# Test with non-dict data
invalid_data = ["not", "a", "dict"]

if is_outbound(invalid_data):
    print("This could be an outbound container")
else:
    print("Not an outbound container")
# Output: Not an outbound container

Working with PDUs

Creating PDUs

Messages are created as regular dictionaries:
from polyflux.core.shared.contracts.protocol_data_unit import Inbound, Outbound

# Create basic outbound PDU with required target field
outbound: Outbound = {
    "target": "https://api.example.com/users"
}

# Create basic inbound PDU (no required fields)
inbound: Inbound = {}

# Create with additional data for protocol-specific use
extended_outbound: Outbound = {
    "target": "/api/users/123",
    # Additional fields would be added by protocol-specific extensions
}

Protocol-Specific PDUs

Protocols can add custom fields to messages:
from typing import NotRequired, Any
from polyflux.core.shared.contracts.protocol_data_unit import Inbound, Outbound

# HTTP-specific PDUs
class HttpOutbound(Outbound):
    """Example HTTP-specific outbound PDU extending ``Outbound``."""

    # Declared without NotRequired, so this key must be present alongside
    # the inherited ``target``.
    method: str
    # Optional header name -> header value mapping.
    headers: NotRequired[dict[str, str]]
    # Optional content type string (e.g. "application/json" in the sample
    # request below).
    content_type: NotRequired[str]
    # Optional payload; may be raw bytes, text, or a dict.
    body: NotRequired[bytes | str | dict[str, Any]]

class HttpInbound(Inbound):
    """Example HTTP-specific inbound PDU extending ``Inbound``."""

    # Declared without NotRequired, so both of these keys must be present.
    status_code: int
    data: bytes
    # Optional header name -> header value mapping.
    headers: NotRequired[dict[str, str]]

# Create HTTP-specific PDUs
http_request: HttpOutbound = {
    "target": "https://api.example.com/users",
    "method": "POST",
    "content_type": "application/json",
    "body": {"name": "John", "email": "john@example.com"},
    "headers": {"Authorization": "Bearer token123"}
}

http_response: HttpInbound = {
    "status_code": 201,
    "data": b'{"id": 123, "name": "John", "email": "john@example.com"}',
    "headers": {"content-type": "application/json"}
}

Custom Message Types

Add validation and documentation to fields:
from typing import Annotated, Any, NotRequired
from polyflux.core.shared.types import doc, example, constraint
from polyflux.core.shared.contracts.protocol_data_unit import Outbound, Inbound

class ApiRequest(Outbound):
    """Example outbound PDU whose fields carry doc/example/constraint metadata."""

    # Required string. The attached constraint names a regex that matches
    # exactly GET, POST, PUT, DELETE or PATCH.
    # NOTE(review): how (or whether) ``constraint`` metadata is enforced at
    # runtime is not shown here - confirm against polyflux.core.shared.types.
    method: Annotated[
        str,
        doc("HTTP method for the request"),
        example("POST", "Create a new resource"),
        constraint("http_method", regex=r"^(GET|POST|PUT|DELETE|PATCH)$")
    ]
    # Required integer. ``gte=1`` presumably means "greater than or equal
    # to 1" - TODO confirm.
    user_id: Annotated[
        int,
        doc("User ID for the request"),
        example(12345),
        constraint("user_id", gte=1)
    ]
    # Optional free-form dict payload.
    data: NotRequired[dict[str, Any]]

class ApiResponse(Inbound):
    """Example inbound PDU paired with ``ApiRequest``."""

    # Declared without NotRequired, so both of these keys must be present.
    success: bool
    user_id: int
    # Optional message string.
    message: NotRequired[str]

# Create type-safe PDUs
request: ApiRequest = {
    "target": "/api/users/12345",
    "method": "POST",
    "user_id": 12345,
    "data": {"name": "Updated Name"}
}

response: ApiResponse = {
    "success": True,
    "user_id": 12345,
    "message": "User updated successfully"
}

Runtime Validation

Combine type guards with protocol-specific validation:
from typing import Any

from polyflux.core.shared.contracts.protocol_data_unit import is_inbound, is_outbound

def validate_http_outbound(obj: Any) -> bool:
    """Validate if object is a valid HTTP outbound PDU.

    Args:
        obj: Candidate object to inspect.

    Returns:
        True when ``obj`` passes ``is_outbound`` and holds string values
        under both the ``method`` and ``target`` keys; False otherwise.
    """
    if not is_outbound(obj):
        return False

    # Check required HTTP fields, in the same order as they are documented:
    # each one must be present and be a str.
    for field_name in ("method", "target"):
        if field_name not in obj:
            return False
        if not isinstance(obj[field_name], str):
            return False
    return True

def validate_http_inbound(obj: Any) -> bool:
    """Validate if object is a valid HTTP inbound PDU.

    Args:
        obj: Candidate object to inspect.

    Returns:
        True when ``obj`` passes ``is_inbound``, holds an integer under
        ``status_code`` (booleans are rejected) and holds ``bytes`` under
        ``data``; False otherwise.
    """
    if not is_inbound(obj):
        return False

    # Check required HTTP fields
    if "status_code" not in obj or "data" not in obj:
        return False

    status_code = obj["status_code"]
    # bool is a subclass of int, so isinstance(True, int) is True. Reject
    # booleans explicitly so {"status_code": True} is not accepted as a
    # status code.
    if isinstance(status_code, bool) or not isinstance(status_code, int):
        return False

    return isinstance(obj["data"], bytes)

# Usage
request_data = {
    "target": "https://api.example.com",
    "method": "GET"
}

if validate_http_outbound(request_data):
    print("Valid HTTP outbound PDU")
else:
    print("Invalid HTTP outbound PDU")

WebSocket Protocol Data Units

WebSocket-specific PDUs for real-time bidirectional communication.

WebSocketOutbound

WebSocket-specific outbound PDU for sending data to WebSocket servers.
from typing import NotRequired, Required, Literal
from polyflux.core.shared.contracts.protocol_data_unit import Outbound

class WebSocketOutbound(Outbound):
    """WebSocket-specific outbound PDU.

    Used for sending WebSocket messages with protocol-specific metadata.
    Adds a required message type and payload, plus optional handshake and
    close details, to the keys inherited from ``Outbound``.
    """

    # Kind of message to send; restricted to the listed literal values.
    message_type: Required[Literal["text", "binary", "continuation", "ping", "pong", "close"]]
    # Message payload, as either bytes or str.
    data: Required[bytes | str]
    headers: NotRequired[dict[str, str]]  # For handshake
    subprotocols: NotRequired[list[str]]  # Optional list of subprotocol names
    close_code: NotRequired[int]  # 1000-1015 per RFC 6455
    close_reason: NotRequired[str]  # Optional text accompanying close_code

Required Fields

target
str
WebSocket URL (ws:// or wss://) to connect to
message_type
Literal
Type of WebSocket message: “text”, “binary”, “continuation”, “ping”, “pong”, or “close”
data
bytes | str
Message payload data. Use str for text messages, bytes for binary messages

Optional Fields

headers
dict[str, str]
HTTP headers for WebSocket handshake (e.g., Authorization)
subprotocols
list[str]
List of WebSocket subprotocols to negotiate
close_code
int
WebSocket close code (1000-1015) for close messages
close_reason
str
Human-readable close reason for close messages

Example

from polyflux.websocket.shared.protocol_data_unit import WebSocketOutbound

# Text message
text_message: WebSocketOutbound = {
    "target": "wss://echo.websocket.org",
    "message_type": "text",
    "data": "Hello WebSocket!",
    "headers": {"Authorization": "Bearer token123"}
}

# Binary message  
binary_message: WebSocketOutbound = {
    "target": "ws://localhost:8080",
    "message_type": "binary",
    "data": b"\x01\x02\x03\x04",
    "subprotocols": ["chat", "superchat"]
}

# Close message
close_message: WebSocketOutbound = {
    "target": "wss://api.example.com/ws",
    "message_type": "close",
    "data": "",
    "close_code": 1000,
    "close_reason": "Normal closure"
}

WebSocketInbound

WebSocket-specific inbound PDU for data received from WebSocket servers.
from typing import NotRequired, Required, Literal
from polyflux.core.shared.contracts.protocol_data_unit import Inbound

class WebSocketInbound(Inbound):
    """WebSocket-specific inbound PDU.

    Represents WebSocket messages received from the server. Declares a
    required message type and payload, plus optional close and
    fragmentation details.
    """

    # Kind of message received; restricted to the listed literal values.
    message_type: Required[Literal["text", "binary", "continuation", "ping", "pong", "close"]]
    # Message payload, as either bytes or str.
    data: Required[bytes | str]
    close_code: NotRequired[int]  # Optional numeric close code
    close_reason: NotRequired[str]  # Optional text accompanying close_code
    is_final: NotRequired[bool]  # For fragmented messages

Required Fields

message_type
Literal
Type of WebSocket message received: “text”, “binary”, “continuation”, “ping”, “pong”, or “close”
data
bytes | str
Message payload data received from the server

Optional Fields

close_code
int
WebSocket close code for close messages
close_reason
str
Human-readable close reason for close messages
is_final
bool
Whether this is the final frame in a fragmented message sequence

Example

from polyflux.websocket.shared.protocol_data_unit import WebSocketInbound

# Text response
text_response: WebSocketInbound = {
    "message_type": "text",
    "data": "Echo: Hello WebSocket!",
    "is_final": True
}

# Binary response
binary_response: WebSocketInbound = {
    "message_type": "binary", 
    "data": b"\x05\x06\x07\x08",
    "is_final": True
}

# Close response
close_response: WebSocketInbound = {
    "message_type": "close",
    "data": "",
    "close_code": 1000,
    "close_reason": "Server shutdown"
}

WebSocket PDU Usage Examples

Basic Client Communication

from polyflux.websocket import WebSocketClient

async def websocket_example():
    """Connect to an echo endpoint, send one text message, print the reply.

    Opens a ``WebSocketClient`` as an async context manager, connects to
    the endpoint, passes the outbound PDU fields to ``exchange()`` as
    keyword arguments, and prints the ``data`` key of the response.
    """
    endpoint = "wss://echo.websocket.org"

    async with WebSocketClient() as client:
        await client.connect(endpoint)

        # Send using PDU kwargs
        outbound_fields = {
            "target": endpoint,
            "message_type": "text",
            "data": "Hello World!",
        }
        response = await client.exchange(**outbound_fields)

        print(f"Received: {response['data']}")

Server Broadcasting

from polyflux.websocket import WebSocketServer
from polyflux.websocket.shared.protocol_data_unit import WebSocketInbound, WebSocketOutbound

# Server with broadcasting enabled
server = WebSocketServer(
    broadcasting_enabled=True,
    default_channels=["general", "notifications"]
)

@server.register_handler("ws_text")
async def handle_message(inbound: WebSocketInbound) -> WebSocketOutbound:
    """Build a text reply that echoes the inbound payload with a prefix.

    Args:
        inbound: The received PDU; only its ``data`` key is read.

    Returns:
        A text-message PDU addressed to the ``broadcast:all`` target whose
        payload is the inbound data prefixed with ``Broadcast: ``.
    """
    # Broadcast to all connected clients
    reply: WebSocketOutbound = {
        "target": "broadcast:all",
        "message_type": "text",
        "data": f"Broadcast: {inbound['data']}",
    }
    return reply

Type-Safe WebSocket PDUs

from typing import Annotated
from polyflux.core.shared.types import doc, example, constraint
from polyflux.websocket.shared.protocol_data_unit import WebSocketOutbound

class ChatMessage(WebSocketOutbound):
    """Type-safe chat message PDU.

    Extends ``WebSocketOutbound`` with two additional required keys that
    identify the chat room and the sending user.
    """

    # Required string. The attached constraint names a regex matching
    # "room" followed by one or more digits.
    # NOTE(review): how (or whether) ``constraint`` metadata is enforced at
    # runtime is not shown here - confirm against polyflux.core.shared.types.
    room_id: Annotated[
        str,
        doc("Chat room identifier"),
        example("room123", "Chat room ID"),
        constraint("room_id", regex=r"^room\d+$")
    ]
    # Required integer. ``gte=1`` presumably means "greater than or equal
    # to 1" - TODO confirm.
    user_id: Annotated[
        int,
        doc("User identifier"),
        example(12345),
        constraint("user_id", gte=1)
    ]

# Create type-safe chat message
chat_msg: ChatMessage = {
    "target": "ws://chat.example.com",
    "message_type": "text",
    "data": "Hello everyone!",
    "room_id": "room123",
    "user_id": 12345
}