The peaq_ros2_stream node: subscribe to ROS 2 topics, sign and encrypt machine data on the robot, chunk it, store it, anchor it on peaq, and serve it to buyers.
The Stream Edge Agent is the on-machine half of Stream. It is a ROS 2 node (stream_agent_node, package peaq_ros2_stream) that subscribes to the topics you allow, applies field rules, then signs, encrypts, and chunks each message, stores the ciphertext, posts a signed manifest to the Stream backend, anchors the payload hash on peaq, and serves purchased chunks to buyers. Keys never leave the machine in the clear, and the agent publishes no ROS topics.

It runs alongside the peaqos_node runtime: the runtime provides the machine’s identity and the events/submit service the agent calls to anchor data on-chain. For the trust model behind the chunks, see Data streams.
Per inbound message: capture → field transform → sign → encrypt → chunk → store → post manifest → anchor on-chain. When an order is paid, the agent re-wraps that buyer’s chunk keys and serves the encrypted chunks from a local delivery server.
The agent lives in peaq_ros2_stream and depends on peaq_ros2_interfaces (for the PeaqosSubmitEvent service). Both ROS 2 Humble (Docker image) and Jazzy (native host) are supported.
peaq_ros2_stream currently ships on the feature/peaqos-ros2-runtime branch and merges to the default branch shortly, so the clone below checks that branch out. Once it lands on the default branch, the git checkout step is no longer needed.
git clone https://github.com/peaqnetwork/peaq-robotics-ros2.git
cd peaq-robotics-ros2
git checkout feature/peaqos-ros2-runtime  # Stream package lives here until it merges to the default branch
source /opt/ros/jazzy/setup.bash
# Use /opt/ros/humble/setup.bash inside the Docker image.
colcon build --packages-select peaq_ros2_interfaces peaq_ros2_stream
source install/setup.bash
Start the node with ros2 run and point it at your config file (the package's launch file declares a config_yaml argument instead):
ros2 run peaq_ros2_stream stream_agent_node --ros-args \
  -p config.yaml_path:=/path/to/stream_config.yaml
Python dependencies (installed by rosdep or pip): PyNaCl, PyYAML, boto3, google-api-python-client, google-auth, requests. The agent is inert until stream_agent.enabled is true.
Configuration resolves in this order, last wins: built-in defaults → the config_yaml file → PEAQOS_STREAM_* environment variables → ROS parameters → an optional standalone policy_path file.
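The "last wins" ordering can be pictured as a layered merge. The sketch below is illustrative only: the helper names (deep_merge, resolve_config) and the layer contents are hypothetical, and it assumes nested blocks are merged key by key rather than replaced wholesale, which the agent may or may not do.

```python
from copy import deepcopy


def deep_merge(base: dict, override: dict) -> dict:
    """Return a new dict in which values from `override` win over `base`."""
    merged = deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Assumption: nested blocks are merged key by key.
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


def resolve_config(*layers: dict) -> dict:
    """Merge layers in order; later layers override earlier ones."""
    resolved: dict = {}
    for layer in layers:
        resolved = deep_merge(resolved, layer)
    return resolved


# Hypothetical layer contents, ordered from lowest to highest precedence.
defaults = {"api_base_url": "http://127.0.0.1:8000", "storage": {"backend": "local"}}
yaml_file = {"api_base_url": "https://stream.peaq.network", "storage": {"backend": "walrus"}}
env_vars = {"agent_token": "from-env"}
ros_params = {"storage": {"backend": "s3"}}
policy_file = {}

config = resolve_config(defaults, yaml_file, env_vars, ros_params, policy_file)
```

In this example the ROS parameter layer overrides the storage backend from the YAML file, while the API base URL from the YAML file survives because no later layer sets it.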
The agent reads one YAML block, stream_agent (the alias stream also works; keys accept snake_case or camelCase). This is the Data Event Map: what to read, how to protect each field, where ciphertext goes, and who can decrypt.
stream_config.yaml
stream_agent:
  enabled: true                                  # inert unless true

  # Identity + backend (all four required when enabled)
  machine_id: "mach_01HXYZ"
  agent_id: "agent_ros2_stream"
  agent_token: "stk_live_xxx"                    # per-agent secret; prefer env/policy file
  identity_ref: "did:peaq:0xMACHINE"             # written into every manifest signature
  api_base_url: "https://stream.peaq.network"    # default http://127.0.0.1:8000
  api_key: ""                                    # optional; sent as Bearer

  # Topic subscriptions + per-field rules
  topics:
    - topic: "/battery_state"
      message_type: "sensor_msgs/msg/BatteryState"
      qos_preset: "sensor_data"                  # default | sensor_data | reliable
      field_rules:                               # ordered
        - { path: "voltage", action: "include" }
        - { path: "serial_number", action: "hash" }
        - { path: "location", action: "anonymize" }
        - { path: "raw_cells", action: "exclude" }
        - { path: "operator_note", action: "encrypt", public_key_hex: "aa11…(32 bytes)" }
    - topic: "/cmd_vel"
      message_type: "geometry_msgs/msg/Twist"
      qos_preset: "reliable"

  # Who can unwrap each chunk's key (≥1 required; buyers are added later, on purchase)
  key_recipients:
    - { recipient_id: "owner-1", recipient_type: "owner", public_key_hex: "11…(32 bytes)" }
    - { recipient_id: "operator-1", recipient_type: "operator", public_key_hex: "22…(32 bytes)" }
    - { recipient_id: "mach_01", recipient_type: "machine", public_key_hex: "33…(32 bytes)" }

  # Where encrypted chunks go (a local copy is always written first)
  storage:
    backend: "walrus"                            # local | walrus | s3 | google-drive
    walrus:
      publisher_url: "https://walrus-publisher.example"
      aggregator_url: "https://walrus-aggregator.example"
      epochs: 5
      permanent: true

  # Anchor each payload hash on peaq via the peaqos_node runtime
  peaqos_event:
    enabled: true
    node_name: "peaqos_node"                     # → /peaqos_node/events/submit
    machine_id: 42                               # on-chain machine id (uint64)
    trust_level: 0

  # Local HTTP server that serves purchased chunks to buyers
  delivery:
    enabled: true
    host: "127.0.0.1"
    port: 8765
    token: "deliver_xxx"                         # required when delivery.enabled
Validation is strict: when enabled, machine_id / agent_id / agent_token / identity_ref and at least one topic and one key recipient are required; recipient_type is machine / owner / operator; each public_key_hex is a 32-byte (64-hex) x25519 key; storage.backend outside the known set falls back to local; delivery.token is required if delivery is on.

Local state lives under ~/.peaq_robot/ by default (signing key, sequence counters, chunk .bin files, manifests, the SQLite catalog and key store) and is configurable per path. An offline buffer (SQLite) holds envelopes when the backend is unreachable and drains on a retry timer.
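The validation rules above can be approximated with a small checker. This is a sketch, not the agent's actual validator: the function names and error strings are hypothetical, and it operates on the contents of the stream_agent block after it has been loaded into a dict.

```python
import re

KNOWN_BACKENDS = {"local", "walrus", "s3", "google-drive"}
RECIPIENT_TYPES = {"machine", "owner", "operator"}
HEX_KEY = re.compile(r"[0-9a-fA-F]{64}")  # 32 bytes as 64 hex characters


def validate_stream_config(cfg: dict) -> list[str]:
    """Return a list of problems; an empty list means the config passed."""
    if not cfg.get("enabled"):
        return []  # the agent is inert, so nothing else is required
    errors = []
    for field in ("machine_id", "agent_id", "agent_token", "identity_ref"):
        if not cfg.get(field):
            errors.append(f"{field} is required")
    if not cfg.get("topics"):
        errors.append("at least one topic is required")
    recipients = cfg.get("key_recipients") or []
    if not recipients:
        errors.append("at least one key recipient is required")
    for recipient in recipients:
        rid = recipient.get("recipient_id", "?")
        if recipient.get("recipient_type") not in RECIPIENT_TYPES:
            errors.append(f"{rid}: invalid recipient_type")
        if not HEX_KEY.fullmatch(str(recipient.get("public_key_hex", ""))):
            errors.append(f"{rid}: public_key_hex must be 64 hex characters")
    delivery = cfg.get("delivery") or {}
    if delivery.get("enabled") and not delivery.get("token"):
        errors.append("delivery.token is required when delivery is enabled")
    return errors


def effective_backend(cfg: dict) -> str:
    """Unknown storage backends fall back to local instead of failing."""
    backend = (cfg.get("storage") or {}).get("backend", "local")
    return backend if backend in KNOWN_BACKENDS else "local"
```

Note the asymmetry the text describes: most violations are errors, but an unrecognized storage.backend is not, because it silently resolves to local.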
Rules run before signing, so protected fields never leave the machine in the clear while the package stays verifiable. Rules are ordered; if any include rule is present the agent starts from an allow-list, otherwise from the full message.
| Action | Effect |
| --- | --- |
| include | Keep the field as-is (and, if used, allow-list it). |
| exclude | Drop the field entirely. |
| hash | Replace the value with sha256:<hex>. |
| anonymize | Replace the value with { "redacted": true }. |
| encrypt | Seal the value to the rule’s public_key_hex (x25519 sealed box); only that key holder can read it. |
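The rule semantics can be sketched in a few lines of Python. Several things here are assumptions rather than documented behavior: the function name apply_field_rules is hypothetical, only top-level field paths are handled, the hash input is a compact JSON serialization of the value, and in allow-list mode a hash or anonymize rule is taken to add its transformed field to the output. The encrypt action is left out because it needs an x25519 sealed-box library such as PyNaCl.

```python
import hashlib
import json


def apply_field_rules(message: dict, rules: list[dict]) -> dict:
    """Apply ordered field rules to a flat message dict (top-level paths only)."""
    # Any include rule switches the starting point to an empty allow-list.
    allow_list = any(rule["action"] == "include" for rule in rules)
    result = {} if allow_list else dict(message)
    for rule in rules:
        path, action = rule["path"], rule["action"]
        if path not in message:
            continue
        value = message[path]
        if action == "include":
            result[path] = value
        elif action == "exclude":
            result.pop(path, None)
        elif action == "hash":
            # Assumption: the value is hashed over its compact JSON form.
            canonical = json.dumps(value, sort_keys=True, separators=(",", ":"))
            result[path] = "sha256:" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()
        elif action == "anonymize":
            result[path] = {"redacted": True}
        else:
            raise ValueError(f"unsupported action in this sketch: {action}")
    return result


msg = {"voltage": 12.1, "serial_number": "SN-1", "location": "dock",
       "raw_cells": [3.9, 4.0], "temperature": 30}
rules = [
    {"path": "voltage", "action": "include"},
    {"path": "serial_number", "action": "hash"},
    {"path": "location", "action": "anonymize"},
    {"path": "raw_cells", "action": "exclude"},
]
out = apply_field_rules(msg, rules)
```

Because an include rule is present, temperature is dropped even though no rule names it; remove the include rule and it would pass through unchanged.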
Capture. The subscription fires; the ROS message is converted to a canonical ordered dict.
Transform. Field rules apply (above).
Sequence. A monotonic per-(machine, topic, policy version) number is assigned.
Sign. The agent builds a peaqos-stream-envelope@v1 carrying the payloadHash and signs it with the machine’s Ed25519 key (signature.keyId, algorithm: "ed25519").
Encrypt. The transformed payload is encrypted with a fresh per-chunk XChaCha20-Poly1305 key (32-byte key, 24-byte nonce), yielding plaintextHash, encryptedDataHash, and a keyCommitment.
Chunk ID. A deterministic sha256: id is computed over { schemaVersion, previousChunkId, index, plaintextHash, encryptedDataHash }, forming a hash chain.
Store. The chosen adapter writes the ciphertext and returns a storageRef.
Manifest. A peaq.stream.chunks.v1 manifest wraps the chunk key to every key_recipient (x25519 sealed box) and is Ed25519-signed over encryptedDataHash, then POSTed to /api/v1/stream/chunks. The envelope is POSTed to /api/v1/stream/events, returning a receipt.
Anchor. If peaqos_event.enabled, the agent calls /peaqos_node/events/submit (PeaqosSubmitEvent) with the payload hash as raw_data_hex and { streamReceiptId, policyId, policyVersion } as metadata_hex, then patches the receipt with the returned txHash.
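To illustrate the Chunk ID step, here is a minimal sketch of a deterministic, chained identifier. The field names come from the step above; everything else is an assumption: the helper names, the schemaVersion value (the manifest name is reused as a placeholder), and the canonical serialization (compact JSON with sorted keys). The agent's real byte layout may differ, so ids produced here will not match real ones.

```python
import hashlib
import json

# Placeholder: the actual schemaVersion string is not specified in this page.
SCHEMA_VERSION = "peaq.stream.chunks.v1"


def sha256_tag(data: bytes) -> str:
    """Hash bytes and return the digest in sha256:<hex> form."""
    return "sha256:" + hashlib.sha256(data).hexdigest()


def chunk_id(previous_chunk_id, index, plaintext_hash, encrypted_data_hash,
             schema_version=SCHEMA_VERSION) -> str:
    """Derive a chunk id that commits to the previous chunk's id."""
    fields = {
        "schemaVersion": schema_version,
        "previousChunkId": previous_chunk_id,
        "index": index,
        "plaintextHash": plaintext_hash,
        "encryptedDataHash": encrypted_data_hash,
    }
    # Assumption: canonical form is compact JSON with sorted keys.
    canonical = json.dumps(fields, sort_keys=True, separators=(",", ":"))
    return sha256_tag(canonical.encode("utf-8"))


# Build a three-chunk chain from stand-in plaintext/ciphertext bytes.
previous = None
chain = []
for index, (plaintext, ciphertext) in enumerate(
    [(b"p0", b"c0"), (b"p1", b"c1"), (b"p2", b"c2")]
):
    current = chunk_id(previous, index, sha256_tag(plaintext), sha256_tag(ciphertext))
    chain.append(current)
    previous = current
```

Since each id covers previousChunkId, altering or reordering an earlier chunk changes every id after it, which is what makes the sequence tamper-evident.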
On startup the agent syncs its policy to the backend and registers its Ed25519 public key (keyId = {agent_id}-stream-ed25519).
Anchoring is optional and runs through the peaqos_node runtime, so that node must be up with events/submit available. The agent submits only the payload hash plus non-secret metadata (never raw data or keys), producing a tamper-evident on-chain record that links each Stream receipt to a peaq transaction.
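As a rough picture of what crosses the service boundary, the sketch below builds the two hex fields named in the Anchor step. The function name is hypothetical, and the encodings are assumptions: the payload hash is taken to be a sha256:<hex> string whose digest is sent without the prefix, and the metadata is taken to be compact JSON encoded as UTF-8 and then hex. The actual PeaqosSubmitEvent request may carry further fields.

```python
import json


def build_anchor_fields(payload_hash: str, receipt_id: str,
                        policy_id: str, policy_version: int) -> dict:
    """Build raw_data_hex and metadata_hex for an events/submit call."""
    digest_hex = payload_hash.removeprefix("sha256:")
    bytes.fromhex(digest_hex)  # raises ValueError if the digest is not valid hex
    metadata = {
        "streamReceiptId": receipt_id,
        "policyId": policy_id,
        "policyVersion": policy_version,
    }
    # Assumption: metadata travels as hex-encoded compact JSON.
    metadata_json = json.dumps(metadata, sort_keys=True, separators=(",", ":"))
    return {
        "raw_data_hex": digest_hex,
        "metadata_hex": metadata_json.encode("utf-8").hex(),
    }


fields = build_anchor_fields("sha256:" + "ab" * 32, "rcpt_1", "policy_1", 3)
```

Nothing in these fields is secret: the first is a hash of the payload, and the second only identifies the receipt and the policy that produced it.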
The agent polls the backend for orders. When an order is paid, it looks up each purchased chunk’s symmetric key from the local key store, re-wraps it to the buyer’s public key (x25519 sealed box, peaq.stream.buyer-access.v1), and submits the re-wrapped keys. The chunk data is never re-encrypted.

If delivery.enabled, a token-gated local HTTP server serves the encrypted chunks.
Every config field has a PEAQOS_STREAM_* override (applied above the YAML file, below ROS params). Use them to keep secrets out of committed YAML.
Storage credentials also accept the standard vendor variables: Walrus (WALRUS_PUBLISHER_URL, …), S3 (AWS_ACCESS_KEY_ID, AWS_REGION, AWS_ENDPOINT_URL_S3, …), and Google Drive (GOOGLE_APPLICATION_CREDENTIALS, GOOGLE_DRIVE_FOLDER_ID).
The Ed25519 signing key is generated on first run, written to signing_key_path (0600), and never sent or logged — only the public key is registered and embedded in manifests.
Per-chunk symmetric keys live only in the local SQLite key store and travel only as sealed-box-wrapped blobs (to recipients, then to the buyer). Plaintext keys never appear in ROS messages, manifests, the backend payload, or logs.
No secrets cross ROS. The agent publishes no topics; its only ROS egress is the events/submit service call, which carries a hash and non-secret metadata.
The delivery server silences request logging and gates every route except /health behind delivery.token. Keep agent_token, api_key, delivery.token, and storage credentials in environment variables, not committed YAML.
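The gating and log-silencing behavior can be sketched with the standard library's http.server. This is not the agent's delivery server: the helper names are hypothetical, the Bearer header scheme is an assumption (the page does not say how the token is presented), and since the chunk routes are not listed here, every authorized non-health path simply returns 404.

```python
import hmac
from http.server import BaseHTTPRequestHandler


def is_authorized(path: str, auth_header, token: str) -> bool:
    """Allow /health unconditionally; everything else needs the token."""
    if path == "/health":
        return True
    expected = f"Bearer {token}"  # assumption: token is sent as a Bearer header
    # compare_digest avoids leaking the token through timing differences.
    return hmac.compare_digest((auth_header or "").encode(), expected.encode())


def make_handler(token: str):
    """Build a request handler class bound to one delivery token."""

    class DeliveryHandler(BaseHTTPRequestHandler):
        def log_message(self, format, *args):
            pass  # silence request logging so paths and tokens stay out of logs

        def do_GET(self):
            if not is_authorized(self.path, self.headers.get("Authorization"), token):
                self.send_error(401)
                return
            if self.path == "/health":
                body = b"ok"
                self.send_response(200)
                self.send_header("Content-Length", str(len(body)))
                self.end_headers()
                self.wfile.write(body)
            else:
                self.send_error(404)  # real chunk routes are not modeled here

    return DeliveryHandler
```

To try it locally, pass the handler to http.server.ThreadingHTTPServer(("127.0.0.1", 8765), make_handler("deliver_xxx")) and call serve_forever(); binding to 127.0.0.1 mirrors the delivery.host value in the example config.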