Skip to main content

The RCAN Technical Specification: JSON Schemas and Protobufs (Part 5)

19 min read By Craig Merry
Robotics AI Protocol Technical RCAN Protobuf JSON Schema Series

The RCAN Technical Specification: JSON Schemas and Protobufs (Part 5)

In Part 4, I proposed RCAN—a federated addressing and authentication protocol for robotics. But a protocol proposal without a specification is just a blog post.

This is the technical appendix. Here you will find the formal schemas that prove RCAN is implementable today.

1. Robot URI (RURI) Specification

1.1 URI Format

rcan://<registry>/<manufacturer>/<model>/<device-id>[:<port>][/<capability>]

1.2 JSON Schema

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "$id": "https://rcan.dev/schemas/ruri.json",
  "title": "Robot Universal Resource Identifier (RURI)",
  "description": "A globally unique identifier for robotic agents in the RCAN protocol",
  "type": "object",
  "required": ["registry", "manufacturer", "model", "deviceId"],
  "properties": {
    "registry": {
      "type": "string",
      "description": "Root registry domain (e.g., continuon.cloud, local.rcan)",
      "pattern": "^[a-z0-9][a-z0-9.-]*[a-z0-9]$",
      "examples": ["continuon.cloud", "local.rcan", "robotics-co-op.org"]
    },
    "manufacturer": {
      "type": "string",
      "description": "Manufacturer namespace identifier",
      "pattern": "^[a-z0-9][a-z0-9-]*[a-z0-9]$",
      "minLength": 2,
      "maxLength": 64,
      "examples": ["continuon", "unitree", "boston-dynamics"]
    },
    "model": {
      "type": "string",
      "description": "Robot model identifier",
      "pattern": "^[a-z0-9][a-z0-9-]*[a-z0-9]$",
      "minLength": 2,
      "maxLength": 64,
      "examples": ["companion-v1", "go2", "spot-enterprise"]
    },
    "deviceId": {
      "type": "string",
      "description": "Unique device identifier (UUID or short-form)",
      "oneOf": [
        {
          "pattern": "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
          "description": "Full UUID format"
        },
        {
          "pattern": "^[0-9a-f]{8}$",
          "description": "Short-form (first 8 chars of UUID)"
        }
      ],
      "examples": ["d3a4b5c6-7890-1234-5678-abcdef012345", "d3a4b5c6"]
    },
    "port": {
      "type": "integer",
      "description": "Communication port (default: 8080)",
      "minimum": 1,
      "maximum": 65535,
      "default": 8080
    },
    "capability": {
      "type": "string",
      "description": "Specific capability endpoint",
      "pattern": "^/[a-z][a-z0-9/-]*$",
      "examples": ["/arm", "/vision", "/chat", "/teleop/camera/front"]
    }
  },
  "additionalProperties": false
}

1.3 RURI Parsing Examples

// Parse a RURI string into components
function parseRURI(ruri) {
  const regex = /^rcan:\/\/([^\/]+)\/([^\/]+)\/([^\/]+)\/([^:\/]+)(?::(\d+))?(\/.*)?$/;
  const match = ruri.match(regex);
  
  if (!match) throw new Error(`Invalid RURI: ${ruri}`);
  
  return {
    registry: match[1],
    manufacturer: match[2],
    model: match[3],
    deviceId: match[4],
    port: match[5] ? parseInt(match[5]) : 8080,
    capability: match[6] || null
  };
}

// Examples:
parseRURI("rcan://continuon.cloud/continuon/companion-v1/d3a4b5c6");
// → { registry: "continuon.cloud", manufacturer: "continuon", 
//     model: "companion-v1", deviceId: "d3a4b5c6", port: 8080, capability: null }

parseRURI("rcan://local.rcan/unitree/go2/a1b2c3d4:9000/teleop");
// → { registry: "local.rcan", manufacturer: "unitree", 
//     model: "go2", deviceId: "a1b2c3d4", port: 9000, capability: "/teleop" }

2. Protocol Buffer Definitions

2.1 Core Message Types

syntax = "proto3";

package rcan.v1;

option java_package = "cloud.continuon.rcan.v1";
option go_package = "github.com/continuon/rcan/v1;rcanv1";

// ============================================================================
// RCAN Message - The universal envelope for all RCAN communication
// ============================================================================

// Envelope used for every RCAN exchange: sender and recipient RURIs, an auth
// token, and an opaque `payload` whose interpretation is selected by `type`.
message RCANMessage {
  string version = 1;           // Protocol version: "1.0.0"
  string message_id = 2;        // Unique message UUID
  string source_ruri = 3;       // Sender's RURI
  string target_ruri = 4;       // Recipient's RURI (or "broadcast")
  string auth_token = 5;        // JWT or session token
  MessageType type = 6;         // Message type enum
  bytes payload = 7;            // Type-specific payload (serialized submessage)
  int64 timestamp_ms = 8;       // Unix timestamp in milliseconds
  int32 ttl_ms = 9;             // Time-to-live for commands (0 = no expiry)
  Priority priority = 10;       // Message priority level
  
  // Kind of message carried in `payload`.
  enum MessageType {
    MESSAGE_TYPE_UNSPECIFIED = 0;
    DISCOVER = 1;               // Discovery broadcast/response
    STATUS = 2;                 // Status query/update
    COMMAND = 3;                // Control command
    STREAM = 4;                 // Streaming data (video, telemetry)
    EVENT = 5;                  // Async event notification
    HANDOFF = 6;                // Control transfer
    ACK = 7;                    // Acknowledgment
    ERROR = 8;                  // Error response
  }
  
  // Processing priority; larger numbers are more urgent, with SAFETY highest.
  enum Priority {
    PRIORITY_UNSPECIFIED = 0;
    LOW = 1;                    // Background tasks
    NORMAL = 2;                 // Standard operations
    HIGH = 3;                   // Time-sensitive commands
    SAFETY = 4;                 // Safety-critical, always processed first
  }
}

// ============================================================================
// Discovery Messages
// ============================================================================

// Discovery query. `query` and `capabilities` filter which robots answer;
// `timeout_ms` bounds how long discovery runs.
message DiscoverRequest {
  string query = 1;             // Optional filter (e.g., "model:companion-v1")
  int32 timeout_ms = 2;         // Discovery timeout (default: 5000)
  repeated string capabilities = 3;  // Required capabilities filter
}

// A robot's answer to discovery: its identity, capabilities, accepted roles,
// protocol version, and a status summary.
message DiscoverResponse {
  string ruri = 1;              // Robot's full RURI
  string model = 2;             // Model identifier
  string friendly_name = 3;     // Human-readable name
  repeated string capabilities = 4;  // Available capabilities
  repeated string supported_roles = 5;  // Roles this robot accepts
  string protocol_version = 6;  // RCAN protocol version
  RobotStatus status = 7;       // Current status summary
}

// ============================================================================
// Authentication Messages
// ============================================================================

// Request for a role on a target robot, backed by a credential.
message AuthClaimRequest {
  string ruri = 1;              // Target robot RURI
  Role requested_role = 2;      // Desired role level
  string credential = 3;        // Auth credential (password, key, etc.)
  string client_id = 4;         // Client application identifier
  map<string, string> metadata = 5;  // Additional auth context
}

// Outcome of an AuthClaimRequest. When `granted` is true the session token and
// granted role are populated; when denied, `error_message` and `error_code`
// describe why.
message AuthClaimResponse {
  bool granted = 1;             // Whether access was granted
  string session_token = 2;     // JWT session token (if granted)
  Role granted_role = 3;        // Actual role granted (may differ from requested)
  // NOTE(review): the unit is not stated here (seconds vs. milliseconds);
  // RCANMessage.timestamp_ms uses milliseconds — confirm and document.
  int64 expires_at = 4;         // Token expiration timestamp
  string error_message = 5;     // Error details (if denied)
  ErrorCode error_code = 6;     // Structured error code
  
  // Machine-readable reason for a denied claim.
  enum ErrorCode {
    ERROR_CODE_UNSPECIFIED = 0;
    INVALID_CREDENTIALS = 1;
    INSUFFICIENT_PRIVILEGES = 2;
    ROBOT_BUSY = 3;
    ROLE_UNAVAILABLE = 4;
    RATE_LIMITED = 5;
  }
}

// Request to end the session identified by `session_token`.
message AuthReleaseRequest {
  string session_token = 1;     // Current session token
  bool force = 2;               // Force release even if commands pending
}

// ============================================================================
// Role Definitions
// ============================================================================

// Role levels a session can hold.
// NOTE(review): the Python reference implementation compares these numerically
// (higher value = more privileges); presumably the same ordering is intended
// here — confirm before renumbering.
enum Role {
  ROLE_UNSPECIFIED = 0;
  GUEST = 1;                    // Limited interaction, read-only
  USER = 2;                     // Operational control within modes
  LEASEE = 3;                   // Time-bound full control
  OWNER = 4;                    // Configuration and user management
  CREATOR = 5;                  // Full hardware/software control
}

// Limits attached to one role.
// NOTE(review): precedence when a capability appears in both the allowed and
// denied lists is not specified here — confirm.
message RoleCapabilities {
  Role role = 1;                               // Role these limits apply to
  repeated string allowed_capabilities = 2;    // Capabilities the role may use
  repeated string denied_capabilities = 3;     // Capabilities the role may not use
  int32 rate_limit_per_minute = 4;             // Per-minute rate limit (what is counted is not stated — TODO confirm)
  int32 max_session_duration_seconds = 5;      // Maximum session length, in seconds
}

// ============================================================================
// Command Messages
// ============================================================================

// A single action addressed to one capability of a robot.
message CommandRequest {
  string command_id = 1;        // Unique command identifier
  string capability = 2;        // Target capability (e.g., "/arm", "/nav")
  string action = 3;            // Action name (e.g., "move", "grab", "speak")
  bytes parameters = 4;         // Action-specific parameters (JSON or protobuf)
  bool require_ack = 5;         // Whether to wait for acknowledgment
  int32 timeout_ms = 6;         // Command timeout
}

// Result of a CommandRequest; `command_id` ties it back to the request.
message CommandResponse {
  string command_id = 1;        // Echo of request command_id
  CommandStatus status = 2;     // Execution status
  bytes result = 3;             // Command result (if any)
  string error_message = 4;     // Error details (if failed)
  int64 execution_time_ms = 5;  // Time taken to execute
  
  // Lifecycle state of a command.
  enum CommandStatus {
    COMMAND_STATUS_UNSPECIFIED = 0;
    ACCEPTED = 1;               // Command accepted, executing
    COMPLETED = 2;              // Command completed successfully
    FAILED = 3;                 // Command failed
    REJECTED = 4;               // Command rejected (auth, safety, etc.)
    TIMEOUT = 5;                // Command timed out
    CANCELLED = 6;              // Command was cancelled
  }
}

// ============================================================================
// Status Messages
// ============================================================================

// Status snapshot of a robot; also embedded in DiscoverResponse as a summary.
message RobotStatus {
  string ruri = 1;                 // RURI of the robot (presumably the one reporting — confirm)
  OperationalState state = 2;      // Current operational state
  float battery_percent = 3;       // Battery level as a percentage (range presumably 0-100 — TODO confirm)
  bool safety_ok = 4;              // Safety flag (exact checks covered are not specified here)
  string current_controller = 5;   // RURI of current controller (if any)
  Role current_controller_role = 6;  // Role held by the current controller
  repeated string active_capabilities = 7;  // Capabilities currently active
  map<string, string> diagnostics = 8;      // Free-form key/value diagnostics
  int64 uptime_seconds = 9;        // Uptime, in seconds
  
  // High-level operating mode of the robot.
  enum OperationalState {
    OPERATIONAL_STATE_UNSPECIFIED = 0;
    IDLE = 1;
    ACTIVE = 2;
    BUSY = 3;
    ERROR = 4;
    MAINTENANCE = 5;
    EMERGENCY_STOP = 6;
  }
}

// ============================================================================
// Handoff Messages
// ============================================================================

// Offer from the current controller to transfer control to another user.
message HandoffRequest {
  string from_session = 1;      // Current controller's session token
  string to_user_id = 2;        // Target user identifier
  Role offered_role = 3;        // Role being offered
  int32 offer_timeout_seconds = 4;  // How long offer is valid
  string message = 5;           // Optional message to recipient
}

// Reply to a HandoffRequest.
message HandoffResponse {
  bool accepted = 1;             // Whether the handoff offer was accepted
  string new_session_token = 2;  // New controller's session (if accepted)
  string error_message = 3;      // Error details (presumably set when not accepted — confirm)
}

2.2 mDNS Service Record Schema

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "$id": "https://rcan.dev/schemas/mdns-txt-record.json",
  "title": "RCAN mDNS TXT Record",
  "description": "Service discovery TXT record fields for _rcan._tcp.local",
  "type": "object",
  "required": ["ruri", "model", "version"],
  "properties": {
    "ruri": {
      "type": "string",
      "description": "Full Robot URI",
      "pattern": "^rcan://.*"
    },
    "model": {
      "type": "string",
      "description": "Model identifier"
    },
    "caps": {
      "type": "string",
      "description": "Comma-separated capability list",
      "examples": ["arm,vision,chat,teleop"]
    },
    "roles": {
      "type": "string",
      "description": "Comma-separated accepted roles",
      "examples": ["owner,user,guest"]
    },
    "version": {
      "type": "string",
      "description": "RCAN protocol version",
      "pattern": "^\\d+\\.\\d+\\.\\d+$"
    },
    "name": {
      "type": "string",
      "description": "Human-friendly robot name"
    },
    "status": {
      "type": "string",
      "enum": ["idle", "active", "busy", "error", "maintenance"],
      "description": "Current operational status"
    }
  }
}

3. Authentication Token (JWT) Schema

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "$id": "https://rcan.dev/schemas/auth-token.json",
  "title": "RCAN Authentication Token",
  "description": "JWT payload schema for RCAN session tokens",
  "type": "object",
  "required": ["sub", "iss", "aud", "role", "exp", "iat"],
  "properties": {
    "sub": {
      "type": "string",
      "description": "Subject - User UUID",
      "format": "uuid"
    },
    "iss": {
      "type": "string",
      "description": "Issuer - Registry domain",
      "examples": ["continuon.cloud", "local.rcan"]
    },
    "aud": {
      "type": "string",
      "description": "Audience - Target RURI pattern (supports wildcards)",
      "examples": [
        "rcan://continuon.cloud/continuon/companion-v1/*",
        "rcan://local.rcan/*/*/d3a4b5c6"
      ]
    },
    "role": {
      "type": "string",
      "enum": ["guest", "user", "leasee", "owner", "creator"],
      "description": "Granted role level"
    },
    "scope": {
      "type": "array",
      "items": { "type": "string" },
      "description": "Granted capability scopes",
      "examples": [["control", "config", "training"]]
    },
    "fleet": {
      "type": "array",
      "items": { "type": "string" },
      "description": "Fleet robot IDs this token grants access to"
    },
    "exp": {
      "type": "integer",
      "description": "Expiration time (Unix timestamp in seconds)"
    },
    "iat": {
      "type": "integer",
      "description": "Issued at time (Unix timestamp in seconds)"
    },
    "jti": {
      "type": "string",
      "description": "JWT ID - unique token identifier",
      "format": "uuid"
    }
  }
}

4. Complete Handshake Flow

Here is the full discovery-to-control flow, message by message:

┌─────────────┐                    ┌─────────────┐                    ┌─────────────┐
│  App/Client │                    │    Robot    │                    │   Registry  │
└──────┬──────┘                    └──────┬──────┘                    └──────┬──────┘
       │                                  │                                  │
       │  1. UDP DISCOVER (broadcast)     │                                  │
       │─────────────────────────────────►│                                  │
       │                                  │                                  │
       │  2. DISCOVER_RESPONSE            │                                  │
       │◄─────────────────────────────────│                                  │
       │  { ruri, model, caps, roles }    │                                  │
       │                                  │                                  │
       │  3. AUTH_CLAIM_REQUEST           │                                  │
       │─────────────────────────────────►│                                  │
       │  { ruri, role: "user", cred }    │                                  │
       │                                  │                                  │
       │                                  │  4. VALIDATE_TOKEN               │
       │                                  │─────────────────────────────────►│
       │                                  │  { user_id, credential }         │
       │                                  │                                  │
       │                                  │  5. TOKEN_RESPONSE               │
       │                                  │◄─────────────────────────────────│
       │                                  │  { valid, jwt, role }            │
       │                                  │                                  │
       │  6. AUTH_CLAIM_RESPONSE          │                                  │
       │◄─────────────────────────────────│                                  │
       │  { granted: true, jwt, role }    │                                  │
       │                                  │                                  │
       │══════════════════════════════════│                                  │
       │       SECURE SESSION ACTIVE      │                                  │
       │══════════════════════════════════│                                  │
       │                                  │                                  │
       │  7. COMMAND (with JWT)           │                                  │
       │─────────────────────────────────►│                                  │
       │  { action: "move", params }      │                                  │
       │                                  │                                  │
       │  8. COMMAND_RESPONSE             │                                  │
       │◄─────────────────────────────────│                                  │
       │  { status: COMPLETED }           │                                  │
       │                                  │                                  │
       │  9. AUTH_RELEASE                 │                                  │
       │─────────────────────────────────►│                                  │
       │                                  │                                  │
       │  10. ACK                         │                                  │
       │◄─────────────────────────────────│                                  │
       │                                  │                                  │

4.1 Offline Mode (Registry Unreachable)

┌─────────────┐                    ┌─────────────┐
│  App/Client │                    │    Robot    │
└──────┬──────┘                    └──────┬──────┘
       │                                  │
       │  1. AUTH_CLAIM_REQUEST           │
       │─────────────────────────────────►│
       │                                  │
       │  [Robot checks local auth cache] │
       │  [Cached token still valid]      │
       │                                  │
       │  2. AUTH_CLAIM_RESPONSE          │
       │◄─────────────────────────────────│
       │  { granted: true, mode: OFFLINE }│
       │                                  │
       │══════════════════════════════════│
       │   OFFLINE SESSION (logged for    │
       │   later sync when online)        │
       │══════════════════════════════════│

5. Reference Implementations

5.1 Python (ContinuonBrain)

Full RURI parsing and RCAN client implementation:

"""
RCAN Protocol Reference Implementation (Python)
continuonbrain/rcan/ruri.py
"""
import re
import json
import asyncio
import websockets
from dataclasses import dataclass, asdict
from typing import Optional, List, Dict, Any
from enum import IntEnum
import jwt
from datetime import datetime, timedelta

# =============================================================================
# RURI (Robot Universal Resource Identifier)
# =============================================================================

# Grammar for rcan://<registry>/<manufacturer>/<model>/<device-id>[:<port>][/<capability>]
# Groups: 1 registry, 2 manufacturer, 3 model, 4 device id, 5 port, 6 capability.
RURI_PATTERN = re.compile(
    r'^rcan://([a-z0-9][a-z0-9.-]*[a-z0-9])/'   # registry
    r'([a-z0-9][a-z0-9-]*[a-z0-9])/'             # manufacturer
    r'([a-z0-9][a-z0-9-]*[a-z0-9])/'             # model
    r'([0-9a-f]{8}(?:-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})?)'  # device_id
    r'(?::(\d{1,5}))?'                           # port (optional)
    r'(/[a-z][a-z0-9/-]*)?$'                     # capability (optional)
)

# Port bounds match the `port` property of the RURI JSON Schema; the regex
# alone (\d{1,5}) would let 0 and 65536-99999 through.
_MIN_PORT = 1
_MAX_PORT = 65535
_DEFAULT_PORT = 8080


@dataclass
class RURI:
    """Robot Universal Resource Identifier.

    Attributes hold the parsed components of a RURI string. ``port`` defaults
    to 8080 and ``capability`` (including its leading ``/``) is ``None`` when
    the RURI has no capability path.
    """
    registry: str
    manufacturer: str
    model: str
    device_id: str
    port: int = _DEFAULT_PORT
    capability: Optional[str] = None

    @classmethod
    def parse(cls, ruri_string: str) -> 'RURI':
        """Parse a RURI string into components.

        Args:
            ruri_string: Text such as
                ``rcan://local.rcan/unitree/go2/a1b2c3d4:9000/teleop``.

        Returns:
            The parsed ``RURI``.

        Raises:
            ValueError: If the string does not match the RURI grammar or its
                port is outside 1-65535.
        """
        # fullmatch() instead of match(): with match(), the pattern's trailing
        # `$` also succeeds just before a final newline, so a RURI followed by
        # "\n" would be accepted.
        match = RURI_PATTERN.fullmatch(ruri_string)
        if not match:
            raise ValueError(f"Invalid RURI: {ruri_string}")

        port = int(match.group(5)) if match.group(5) else _DEFAULT_PORT
        if not _MIN_PORT <= port <= _MAX_PORT:
            raise ValueError(
                f"Invalid RURI (port must be {_MIN_PORT}-{_MAX_PORT}): {ruri_string}"
            )

        return cls(
            registry=match.group(1),
            manufacturer=match.group(2),
            model=match.group(3),
            device_id=match.group(4),
            port=port,
            capability=match.group(6)
        )

    @classmethod
    def is_valid(cls, ruri_string: str) -> bool:
        """Return True if ``parse()`` would accept ``ruri_string``, without raising ValueError."""
        try:
            cls.parse(ruri_string)
        except ValueError:
            return False
        return True

    def __str__(self) -> str:
        """Convert back to RURI string format.

        The port is omitted when it equals the default (8080), so parsing and
        formatting round-trip for strings that leave the default port implicit.
        """
        s = f"rcan://{self.registry}/{self.manufacturer}/{self.model}/{self.device_id}"
        if self.port != _DEFAULT_PORT:
            s += f":{self.port}"
        if self.capability:
            s += self.capability
        return s

    def with_capability(self, capability: str) -> 'RURI':
        """Return a new RURI with a different capability.

        A leading ``/`` is added to ``capability`` if it does not already have
        one; all other components are copied unchanged.
        """
        return RURI(
            registry=self.registry,
            manufacturer=self.manufacturer,
            model=self.model,
            device_id=self.device_id,
            port=self.port,
            capability=capability if capability.startswith('/') else f'/{capability}'
        )

# =============================================================================
# Role Definitions
# =============================================================================

class Role(IntEnum):
    """RCAN roles, numbered so that a larger value carries more privileges."""
    GUEST = 1     # Read-only, limited interaction
    USER = 2      # Operational control within modes
    LEASEE = 3    # Full control for a bounded time
    OWNER = 4     # Configuration and user management
    CREATOR = 5   # Full hardware/software control

    def can_access(self, required_role: 'Role') -> bool:
        """Return True when this role is at least as privileged as ``required_role``."""
        needed = required_role.value
        held = self.value
        return needed <= held

class MessageType(IntEnum):
    """RCAN message types (numeric values match the protobuf MessageType enum)."""
    DISCOVER = 1   # Discovery broadcast/response
    STATUS = 2     # Status query/update
    COMMAND = 3    # Control command
    STREAM = 4     # Streaming data (video, telemetry)
    EVENT = 5      # Async event notification
    HANDOFF = 6    # Control transfer
    ACK = 7        # Acknowledgment
    ERROR = 8      # Error response

class Priority(IntEnum):
    """Message priority levels; larger values are more urgent."""
    LOW = 1      # Background tasks
    NORMAL = 2   # Standard operations
    HIGH = 3     # Time-sensitive commands
    SAFETY = 4   # Always processed first

# =============================================================================
# RCAN Messages
# =============================================================================

@dataclass
class RCANMessage:
    """Universal RCAN message envelope.

    ``auth_token``, ``ttl_ms`` and ``priority`` are optional and fall back to
    their defaults when absent from incoming JSON.
    """
    version: str
    message_id: str
    source_ruri: str
    target_ruri: str
    type: MessageType
    payload: Dict[str, Any]
    timestamp_ms: int
    auth_token: Optional[str] = None
    ttl_ms: int = 0
    priority: Priority = Priority.NORMAL

    def to_json(self) -> str:
        """Serialize to a JSON string, with ``type`` and ``priority`` as plain integers."""
        data = asdict(self)
        data['type'] = self.type.value
        data['priority'] = self.priority.value
        return json.dumps(data)

    @classmethod
    def from_json(cls, json_str: str) -> 'RCANMessage':
        """Deserialize from a JSON string.

        Raises:
            KeyError: If the required ``type`` key is missing.
            ValueError: If ``type`` or ``priority`` is not a known enum value,
                or the text is not valid JSON.
            TypeError: If other required fields are missing or unknown keys
                are present.
        """
        data = json.loads(json_str)
        data['type'] = MessageType(data['type'])
        # `priority` has a default and peers may omit it (the TypeScript and
        # Dart clients do), so convert it only when present instead of
        # failing with KeyError.
        if 'priority' in data:
            data['priority'] = Priority(data['priority'])
        return cls(**data)

# =============================================================================
# RCAN Client
# =============================================================================

class RCANClient:
    """RCAN protocol client that talks to one robot over a WebSocket.

    Lifecycle: ``connect()`` -> ``claim()`` -> ``command()`` (any number of
    times) -> ``release()``.

    NOTE(review): each request treats the next frame received as its reply;
    replies are not correlated on ``message_id``.
    """

    def __init__(self, client_id: str):
        """Create an unconnected client.

        Args:
            client_id: Identifier of this application; used in the
                ``source_ruri`` of every message and in the auth-claim payload.
        """
        self.client_id = client_id
        self._session_token: Optional[str] = None
        self._ws: Optional[websockets.WebSocketClientProtocol] = None
        self._current_ruri: Optional[RURI] = None

    async def connect(self, ruri: RURI) -> None:
        """Establish the WebSocket connection for ``ruri``.

        The URL is built from the RURI's registry host and port.

        NOTE(review): this uses plain ``ws://``, so the credential sent by
        ``claim()`` and the session token travel unencrypted — consider
        ``wss://`` outside trusted networks.
        """
        url = f"ws://{ruri.registry}:{ruri.port}/rcan/v1/stream"
        self._ws = await websockets.connect(url)
        self._current_ruri = ruri

    async def claim(self, role: Role, credential: str) -> str:
        """Claim control of the connected robot.

        Args:
            role: Role being requested.
            credential: Auth credential presented to the robot.

        Returns:
            The session token from the response; it is also stored and
            attached to subsequent commands.

        Raises:
            RuntimeError: If ``connect()`` has not been called.
            PermissionError: If the response does not grant the claim.
        """
        if not self._ws or not self._current_ruri:
            raise RuntimeError("Not connected")

        request = RCANMessage(
            version="1.0.0",
            message_id=self._generate_id(),
            source_ruri=f"rcan://client/{self.client_id}",
            target_ruri=str(self._current_ruri),
            type=MessageType.COMMAND,
            payload={
                "action": "auth_claim",
                "requested_role": role.value,
                "credential": credential,
                "client_id": self.client_id
            },
            timestamp_ms=int(datetime.now().timestamp() * 1000)
        )

        await self._ws.send(request.to_json())
        response = RCANMessage.from_json(await self._ws.recv())

        if response.payload.get("granted"):
            self._session_token = response.payload["session_token"]
            return self._session_token
        else:
            raise PermissionError(response.payload.get("error_message", "Claim denied"))

    async def command(self, capability: str, action: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Send a command to the robot and return the reply payload.

        Args:
            capability: Capability path such as ``"/arm"``; appended to the
                connected RURI to form the message target.
            action: Action name.
            params: Action parameters, sent as-is inside the payload.

        Raises:
            RuntimeError: If no session token is held (``claim()`` not called).
        """
        if not self._session_token:
            raise RuntimeError("Not authenticated - call claim() first")

        request = RCANMessage(
            version="1.0.0",
            message_id=self._generate_id(),
            source_ruri=f"rcan://client/{self.client_id}",
            target_ruri=str(self._current_ruri.with_capability(capability)),
            type=MessageType.COMMAND,
            payload={"action": action, "params": params},
            timestamp_ms=int(datetime.now().timestamp() * 1000),
            auth_token=self._session_token
        )

        await self._ws.send(request.to_json())
        response = RCANMessage.from_json(await self._ws.recv())
        return response.payload

    async def release(self) -> None:
        """Release control of the robot and close the connection.

        Sends ``auth_release`` when a session is held, then always clears the
        session state and closes the socket — even if sending the release
        message fails — so a broken connection cannot leak the socket or
        leave a stale token behind. Safe to call when not connected.
        """
        try:
            if self._ws and self._session_token:
                request = RCANMessage(
                    version="1.0.0",
                    message_id=self._generate_id(),
                    source_ruri=f"rcan://client/{self.client_id}",
                    target_ruri=str(self._current_ruri),
                    type=MessageType.COMMAND,
                    payload={"action": "auth_release"},
                    timestamp_ms=int(datetime.now().timestamp() * 1000),
                    auth_token=self._session_token
                )
                await self._ws.send(request.to_json())
        finally:
            self._session_token = None
            self._current_ruri = None
            if self._ws:
                # Detach before closing so the client is marked disconnected
                # even if close() itself raises.
                ws, self._ws = self._ws, None
                await ws.close()

    def _generate_id(self) -> str:
        """Return a fresh random UUID string for ``message_id``."""
        import uuid
        return str(uuid.uuid4())

# =============================================================================
# Usage Example
# =============================================================================

async def example_usage():
    """Example: Connect to a robot, claim the USER role, and send one command.

    The session is released in a ``finally`` block so the connection is closed
    even when the claim is denied or the command fails.
    """
    client = RCANClient(client_id="my-app-001")

    # Parse the robot's RURI
    ruri = RURI.parse("rcan://continuon.cloud/continuon/companion-v1/d3a4b5c6")
    print(f"Connecting to: {ruri}")

    # Connect and authenticate
    await client.connect(ruri)
    try:
        token = await client.claim(Role.USER, credential="my-secret-key")
        print(f"Authenticated with token: {token[:20]}...")

        # Send a command
        result = await client.command(
            capability="/arm",
            action="move",
            params={"x": 0.5, "y": 0.2, "z": 0.1}
        )
        print(f"Command result: {result}")
    finally:
        # Clean up
        await client.release()

if __name__ == "__main__":
    asyncio.run(example_usage())

5.2 TypeScript (Node.js)

import { v4 as uuidv4 } from 'uuid';

// RURI Parser
/** Parsed components of a Robot URI. */
interface RURI {
  /** Registry domain (first component after `rcan://`). */
  registry: string;
  /** Manufacturer namespace. */
  manufacturer: string;
  /** Robot model identifier. */
  model: string;
  /** Device identifier: 8 hex characters or a full UUID. */
  deviceId: string;
  /** Port, 1-65535; 8080 when the string does not specify one. */
  port: number;
  /** Capability path including its leading `/`, or null when absent. */
  capability: string | null;
}

/**
 * RURI grammar. Capture groups: 1 registry, 2 manufacturer, 3 model,
 * 4 device id, 5 port (optional), 6 capability (optional).
 */
const RURI_REGEX = /^rcan:\/\/([a-z0-9][a-z0-9.-]*[a-z0-9])\/([a-z0-9][a-z0-9-]*[a-z0-9])\/([a-z0-9][a-z0-9-]*[a-z0-9])\/([0-9a-f]{8}(?:-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})?)(?::(\d{1,5}))?(\/[a-z][a-z0-9\/-]*)?$/;

/**
 * Parse a RURI string into its components.
 *
 * @throws Error if the string does not match the grammar or the port is
 *   outside 1-65535.
 */
function parseRURI(ruri: string): RURI {
  const match = ruri.match(RURI_REGEX);
  if (!match) throw new Error(`Invalid RURI: ${ruri}`);

  // The regex admits any 1-5 digit number (0, 99999, ...); enforce the
  // 1-65535 range from the RURI schema here.
  const port = match[5] ? parseInt(match[5], 10) : 8080;
  if (port < 1 || port > 65535) throw new Error(`Invalid RURI: ${ruri}`);

  return {
    registry: match[1],
    manufacturer: match[2],
    model: match[3],
    deviceId: match[4],
    port,
    capability: match[6] || null,
  };
}

/**
 * Format components back into a RURI string. The port is omitted when it is
 * the default 8080; the capability is appended verbatim when present.
 */
function formatRURI(ruri: RURI): string {
  let s = `rcan://${ruri.registry}/${ruri.manufacturer}/${ruri.model}/${ruri.deviceId}`;
  if (ruri.port !== 8080) s += `:${ruri.port}`;
  if (ruri.capability) s += ruri.capability;
  return s;
}

// RCAN Client
/** Role levels; the numeric value is what `claim` sends as `requested_role`. */
enum Role { GUEST = 1, USER = 2, LEASEE = 3, OWNER = 4, CREATOR = 5 }
/** Values for the `type` field of an RCAN message envelope. */
enum MessageType { DISCOVER = 1, STATUS = 2, COMMAND = 3, STREAM = 4, EVENT = 5, HANDOFF = 6, ACK = 7, ERROR = 8 }

/**
 * Minimal RCAN client over a WebSocket.
 *
 * NOTE(review): each request treats the next frame received as its reply;
 * replies are not correlated on `message_id`.
 */
class RCANClient {
  private ws: WebSocket | null = null;
  private sessionToken: string | null = null;
  /** RURI passed to the last successful `claim`; used as the release target. */
  private targetRuri: string | null = null;
  private clientId: string;

  constructor(clientId: string) {
    this.clientId = clientId;
  }

  /**
   * Open the WebSocket for `ruri` (registry host and port).
   * The socket is stored only once it has opened, so a failed attempt does
   * not leave the client holding a dead connection.
   */
  async connect(ruri: RURI): Promise<void> {
    const url = `ws://${ruri.registry}:${ruri.port}/rcan/v1/stream`;
    const ws = new WebSocket(url);
    await new Promise((resolve, reject) => {
      ws.onopen = resolve;
      ws.onerror = reject;
    });
    ws.onopen = null;
    ws.onerror = null;
    this.ws = ws;
  }

  /**
   * Claim `role` on the robot identified by `ruri`.
   *
   * @returns the session token from the response (also kept for later calls).
   * @throws Error if not connected, if the connection fails while waiting,
   *   or if the claim is denied.
   */
  async claim(ruri: string, role: Role, credential: string): Promise<string> {
    if (!this.ws) throw new Error('Not connected');
    
    const request = {
      version: '1.0.0',
      message_id: uuidv4(),
      source_ruri: `rcan://client/${this.clientId}`,
      target_ruri: ruri,
      type: MessageType.COMMAND,
      payload: { action: 'auth_claim', requested_role: role, credential, client_id: this.clientId },
      timestamp_ms: Date.now(),
    };
    
    this.ws.send(JSON.stringify(request));
    
    const response = await this.waitForResponse();
    const payload = (response && response.payload) || {};
    if (payload.granted) {
      // Return a local typed as string: the field itself is `string | null`
      // and would not satisfy Promise<string> under strict null checks.
      const token: string = payload.session_token;
      this.sessionToken = token;
      this.targetRuri = ruri;
      return token;
    }
    throw new Error(payload.error_message || 'Claim denied');
  }

  /**
   * Send a command. Left unimplemented in this listing: after the
   * authentication check it resolves to `undefined`.
   */
  async command(capability: string, action: string, params: object): Promise<any> {
    if (!this.sessionToken) throw new Error('Not authenticated');
    // Implementation details...
  }

  /**
   * Release the session and close the socket. Sends `auth_release` when a
   * session is held and the socket is open; state is always cleared and the
   * socket always closed, even if sending fails. Safe to call when not
   * connected.
   */
  async release(): Promise<void> {
    const ws = this.ws;
    const token = this.sessionToken;
    const target = this.targetRuri;
    this.ws = null;
    this.sessionToken = null;
    this.targetRuri = null;
    if (!ws) return;
    try {
      if (token && target && ws.readyState === WebSocket.OPEN) {
        ws.send(JSON.stringify({
          version: '1.0.0',
          message_id: uuidv4(),
          source_ruri: `rcan://client/${this.clientId}`,
          target_ruri: target,
          type: MessageType.COMMAND,
          payload: { action: 'auth_release' },
          timestamp_ms: Date.now(),
          auth_token: token,
        }));
      }
    } finally {
      ws.close();
    }
  }

  /**
   * Resolve with the next message parsed as JSON. Rejects if the socket is
   * not open, errors, or closes first, or if the message is not valid JSON,
   * so callers cannot wait forever on a dead connection.
   */
  private waitForResponse(): Promise<any> {
    const ws = this.ws;
    if (!ws || ws.readyState !== WebSocket.OPEN) {
      return Promise.reject(new Error('Not connected'));
    }
    return new Promise((resolve, reject) => {
      const detach = () => {
        ws.onmessage = null;
        ws.onerror = null;
        ws.onclose = null;
      };
      ws.onmessage = (event) => {
        detach();
        try {
          resolve(JSON.parse(event.data));
        } catch (err) {
          reject(err);
        }
      };
      ws.onerror = () => {
        detach();
        reject(new Error('WebSocket error while waiting for response'));
      };
      ws.onclose = () => {
        detach();
        reject(new Error('Connection closed while waiting for response'));
      };
    });
  }
}

5.3 Dart (Flutter/ContinuonAI)

import 'dart:convert';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:uuid/uuid.dart';

/// RCAN role hierarchy.
///
/// Declaration order is significant: `RCANClient.claim` sends `index + 1` as
/// the numeric role, so `guest` maps to 1 and `creator` to 5.
enum Role { guest, user, leasee, owner, creator }

/// Robot Universal Resource Identifier.
///
/// Format: `rcan://<registry>/<manufacturer>/<model>/<device-id>[:<port>][/<capability>]`
class RURI {
  final String registry;
  final String manufacturer;
  final String model;
  final String deviceId;

  /// Port; 8080 when the RURI string does not specify one.
  final int port;

  /// Capability path including its leading `/`, or null when absent.
  final String? capability;

  RURI({
    required this.registry,
    required this.manufacturer,
    required this.model,
    required this.deviceId,
    this.port = 8080,
    this.capability,
  });

  // Groups: 1 registry, 2 manufacturer, 3 model, 4 device id, 5 port, 6 capability.
  static final _pattern = RegExp(
    r'^rcan://([a-z0-9][a-z0-9.-]*[a-z0-9])/([a-z0-9][a-z0-9-]*[a-z0-9])/([a-z0-9][a-z0-9-]*[a-z0-9])/([0-9a-f]{8}(?:-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})?)(?::(\d{1,5}))?(/[a-z][a-z0-9/-]*)?$'
  );

  /// Parses [ruri] into its components.
  ///
  /// Throws a [FormatException] if the string does not match the RURI grammar
  /// or its port is outside 1-65535.
  factory RURI.parse(String ruri) {
    final match = _pattern.firstMatch(ruri);
    if (match == null) throw FormatException('Invalid RURI: $ruri');

    // The pattern admits any 1-5 digit number (0, 99999, ...); enforce the
    // 1-65535 range from the RURI schema here.
    final portText = match.group(5);
    final port = portText != null ? int.parse(portText) : 8080;
    if (port < 1 || port > 65535) {
      throw FormatException('Invalid RURI: $ruri');
    }

    return RURI(
      registry: match.group(1)!,
      manufacturer: match.group(2)!,
      model: match.group(3)!,
      deviceId: match.group(4)!,
      port: port,
      capability: match.group(6),
    );
  }

  /// Whether [ruri] would be accepted by [RURI.parse].
  static bool isValid(String ruri) {
    try {
      RURI.parse(ruri);
      return true;
    } on FormatException {
      return false;
    }
  }

  /// Formats back to a RURI string; the default port 8080 is omitted.
  @override
  String toString() {
    var s = 'rcan://$registry/$manufacturer/$model/$deviceId';
    if (port != 8080) s += ':$port';
    // Copy to a local first: a public nullable field is not promoted to
    // non-null by the null check, so `s += capability` does not compile.
    final cap = capability;
    if (cap != null) s += cap;
    return s;
  }

  /// Returns a copy with [cap] as the capability, adding a leading `/` if missing.
  RURI withCapability(String cap) => RURI(
    registry: registry, manufacturer: manufacturer, model: model,
    deviceId: deviceId, port: port,
    capability: cap.startsWith('/') ? cap : '/$cap',
  );
}

/// RCAN protocol client.
///
/// Lifecycle: [connect] opens the WebSocket, [claim] authenticates and stores
/// the session token, and [release] drops the token and closes the socket.
class RCANClient {
  final String clientId;
  WebSocketChannel? _channel;

  // Broadcast view of the channel's inbound stream. The raw channel stream is
  // single-subscription, and `Stream.first` cancels its subscription after one
  // event, so reading it directly would make every later read (including a
  // retried claim) fail.
  Stream<dynamic>? _incoming;

  String? _sessionToken;
  RURI? _currentRuri;
  final _uuid = Uuid();

  RCANClient({required this.clientId});

  /// Opens a WebSocket to the host and port named by [ruri].
  ///
  /// Any previous connection and its session token are released first. If the
  /// handshake fails the error propagates and the client stays disconnected.
  Future<void> connect(RURI ruri) async {
    // Close an earlier channel instead of overwriting (and leaking) it.
    await release();

    // NOTE(review): ws:// is unencrypted and claim() sends the credential
    // over this socket — consider wss:// for anything beyond a trusted LAN.
    final uri = Uri.parse('ws://${ruri.registry}:${ruri.port}/rcan/v1/stream');
    final channel = WebSocketChannel.connect(uri);
    await channel.ready;

    // Only record the connection once the handshake has succeeded, so a
    // failed connect cannot leave claim() believing it is connected.
    _channel = channel;
    _incoming = channel.stream.asBroadcastStream();
    _currentRuri = ruri;
  }

  /// Sends an `auth_claim` command requesting [role] and returns the session
  /// token from the response.
  ///
  /// Throws [StateError] when not connected, [FormatException] when the
  /// response is not a well-formed claim response, and [Exception] carrying
  /// the server's `error_message` (or `'Claim denied'`) when the claim is
  /// not granted.
  Future<String> claim(Role role, String credential) async {
    final channel = _channel;
    final incoming = _incoming;
    final target = _currentRuri;
    if (channel == null || incoming == null || target == null) {
      throw StateError('Not connected');
    }

    final request = {
      'version': '1.0.0',
      'message_id': _uuid.v4(),
      'source_ruri': 'rcan://client/$clientId',
      'target_ruri': target.toString(),
      'type': 3, // COMMAND
      'payload': {
        'action': 'auth_claim',
        // Wire value is the enum position plus one.
        'requested_role': role.index + 1,
        'credential': credential,
        'client_id': clientId,
      },
      'timestamp_ms': DateTime.now().millisecondsSinceEpoch,
    };

    // `incoming.first` subscribes synchronously right after the send, so the
    // reply cannot be delivered before a listener is attached.
    channel.sink.add(jsonEncode(request));
    final raw = await incoming.first;
    if (raw is! String) {
      throw FormatException('Expected a text frame in reply to auth_claim');
    }

    final response = jsonDecode(raw);
    final payload = response is Map ? response['payload'] : null;
    if (payload is! Map) {
      throw FormatException('auth_claim response has no payload object');
    }

    if (payload['granted'] == true) {
      final token = payload['session_token'];
      if (token is! String) {
        throw FormatException('Granted auth_claim response has no session_token');
      }
      _sessionToken = token;
      return token;
    }
    throw Exception(payload['error_message'] ?? 'Claim denied');
  }

  /// Placeholder: checks that a session token is held, then returns an empty
  /// map. Throws [StateError] when [claim] has not succeeded.
  Future<Map<String, dynamic>> command(String capability, String action, Map<String, dynamic> params) async {
    if (_sessionToken == null) throw StateError('Not authenticated');
    // Implementation...
    return {};
  }

  /// Drops the session token and closes the connection, if any.
  Future<void> release() async {
    final channel = _channel;
    // Clear state before awaiting so a failing close cannot leave the client
    // half-connected.
    _sessionToken = null;
    _currentRuri = null;
    _incoming = null;
    _channel = null;
    await channel?.sink.close();
  }
}

6. Conformance Test Suite

A conforming RCAN implementation MUST pass all tests in this section.

6.1 RURI Validation Tests

| Test ID | Input | Expected | Description |
|---|---|---|---|
| RURI-001 | `rcan://continuon.cloud/continuon/companion-v1/d3a4b5c6` | VALID | Standard RURI |
| RURI-002 | `rcan://local.rcan/unitree/go2/a1b2c3d4:9000/teleop` | VALID | With port and capability |
| RURI-003 | `rcan://my-server.lan/acme/bot-x1/12345678-1234-1234-1234-123456789abc` | VALID | Full UUID |
| RURI-004 | `https://example.com/robot` | INVALID | Wrong scheme |
| RURI-005 | `rcan://UPPERCASE/test/test/12345678` | INVALID | Uppercase not allowed |
| RURI-006 | `rcan://a/b/c/1234567` | INVALID | Device ID too short |
| RURI-007 | `rcan://continuon.cloud/continuon/companion-v1/d3a4b5c6/Arm` | INVALID | Capability must be lowercase |
| RURI-008 | `rcan://` | INVALID | Empty components |

6.2 Role Hierarchy Tests

| Test ID | Role | Required | Expected | Description |
|---|---|---|---|---|
| ROLE-001 | OWNER | GUEST | ALLOW | Higher role can access lower |
| ROLE-002 | USER | OWNER | DENY | Lower role cannot access higher |
| ROLE-003 | LEASEE | LEASEE | ALLOW | Same role can access |
| ROLE-004 | CREATOR | OWNER | ALLOW | Creator can access all |
| ROLE-005 | GUEST | USER | DENY | Guest has minimal access |

6.3 Authentication Tests

| Test ID | Scenario | Expected | Description |
|---|---|---|---|
| AUTH-001 | Valid credential, available role | GRANTED | Normal auth flow |
| AUTH-002 | Invalid credential | DENIED (code: INVALID_CREDENTIALS) | Bad password |
| AUTH-003 | Valid credential, robot busy | DENIED (code: ROBOT_BUSY) | Another controller active |
| AUTH-004 | Request CREATOR, max role OWNER | GRANTED as OWNER | Role downgrade |
| AUTH-005 | Expired JWT | DENIED (code: TOKEN_EXPIRED) | Token validation |
| AUTH-006 | Offline mode, cached token valid | GRANTED (mode: OFFLINE) | Resilience test |

6.4 Safety Invariant Tests

| Test ID | Scenario | Expected | Description |
|---|---|---|---|
| SAFE-001 | SAFETY priority message | Response < 100 ms | Priority processing |
| SAFE-002 | Network partition during command | Safe-stop triggered | Graceful degradation |
| SAFE-003 | Remote command bypasses local limit | REJECTED | Local safety wins |
| SAFE-004 | Any command execution | Audit log entry created | Logging required |

6.5 Python Test Runner

"""RCAN Conformance Test Suite"""
import pytest
from rcan import RURI, Role

class TestRURIValidation:
    """RURI-xxx tests: ``RURI.is_valid`` must accept or reject each input.

    Every case carries its conformance test ID, so a failure is reported as
    e.g. ``test_ruri_validation[RURI-005]`` and can be traced straight back to
    the RURI validation table instead of to a long URI-derived ID.
    """

    @pytest.mark.parametrize("ruri,expected", [
        pytest.param("rcan://continuon.cloud/continuon/companion-v1/d3a4b5c6", True, id="RURI-001"),
        pytest.param("rcan://local.rcan/unitree/go2/a1b2c3d4:9000/teleop", True, id="RURI-002"),
        pytest.param("rcan://my-server.lan/acme/bot-x1/12345678-1234-1234-1234-123456789abc", True, id="RURI-003"),
        pytest.param("https://example.com/robot", False, id="RURI-004"),
        pytest.param("rcan://UPPERCASE/test/test/12345678", False, id="RURI-005"),
        pytest.param("rcan://a/b/c/1234567", False, id="RURI-006"),
        pytest.param("rcan://continuon.cloud/continuon/companion-v1/d3a4b5c6/Arm", False, id="RURI-007"),
        pytest.param("rcan://", False, id="RURI-008"),
    ])
    def test_ruri_validation(self, ruri: str, expected: bool):
        """Check that ``RURI.is_valid(ruri)`` equals the expected verdict."""
        assert RURI.is_valid(ruri) == expected

class TestRoleHierarchy:
    """ROLE-xxx tests: ``role.can_access(required)`` must follow the hierarchy.

    Every case carries its conformance test ID, so a failure is reported as
    e.g. ``test_role_access[ROLE-002]`` and can be traced straight back to the
    role hierarchy table.
    """

    @pytest.mark.parametrize("role,required,expected", [
        pytest.param(Role.OWNER, Role.GUEST, True, id="ROLE-001"),
        pytest.param(Role.USER, Role.OWNER, False, id="ROLE-002"),
        pytest.param(Role.LEASEE, Role.LEASEE, True, id="ROLE-003"),
        pytest.param(Role.CREATOR, Role.OWNER, True, id="ROLE-004"),
        pytest.param(Role.GUEST, Role.USER, False, id="ROLE-005"),
    ])
    def test_role_access(self, role: Role, required: Role, expected: bool):
        """Check that ``role.can_access(required)`` equals the expected verdict."""
        assert role.can_access(required) == expected

class TestRURIParsing:
    """Checks that ``RURI.parse`` and ``str()`` round-trip a RURI."""

    def test_parse_and_format(self):
        """A RURI without port or capability parses and formats unchanged."""
        text = "rcan://continuon.cloud/continuon/companion-v1/d3a4b5c6"
        parsed = RURI.parse(text)
        # Formatting must reproduce the input exactly.
        assert str(parsed) == text
        # Each path segment lands in its own field.
        assert parsed.registry == "continuon.cloud"
        assert parsed.manufacturer == "continuon"
        assert parsed.model == "companion-v1"
        assert parsed.device_id == "d3a4b5c6"
        # Omitted optional parts fall back to port 8080 and no capability.
        assert parsed.port == 8080
        assert parsed.capability is None

    def test_parse_with_port_and_capability(self):
        """An explicit port and capability are both extracted."""
        parsed = RURI.parse(
            "rcan://local.rcan/unitree/go2/a1b2c3d4:9000/teleop"
        )
        assert parsed.port == 9000
        assert parsed.capability == "/teleop"

    def test_with_capability(self):
        """``with_capability`` appends the capability as a path segment."""
        base = RURI.parse(
            "rcan://continuon.cloud/continuon/companion-v1/d3a4b5c6"
        )
        with_arm = base.with_capability("arm")
        assert str(with_arm) == "rcan://continuon.cloud/continuon/companion-v1/d3a4b5c6/arm"

if __name__ == "__main__":
    # pytest.main returns the run's exit code; propagate it so that running
    # this file directly exits non-zero when any conformance test fails.
    raise SystemExit(pytest.main([__file__, "-v"]))

Run the test suite:

pip install pytest
python -m pytest test_rcan_conformance.py -v

This is Part 5 of an ongoing series on values, governance, and architecture in embodied AI. Read Part 1: When Robots Carry Values, Part 2: We Need a Constitution for the Robot Age, Part 3: One Brain, Many Shells, and Part 4: We Need an ICANN for Robotics.