Skip to main content
The Stream Edge Agent is the on-machine half of Stream. It is a ROS 2 node (stream_agent_node, package peaq_ros2_stream) that subscribes to the topics you allow, applies field rules, then signs, encrypts, and each message, stores the ciphertext, posts a signed to the Stream backend, anchors the payload hash on peaq, and serves purchased chunks to buyers. Keys never leave the machine in the clear, and the agent publishes no ROS topics. It runs alongside the peaqos_node runtime: the runtime provides the machine’s identity and the events/submit service the agent calls to anchor data on-chain. For the trust model behind the chunks, see Data streams.

How it works

Per inbound message: capture → field transform → sign → encrypt → chunk → store → post manifest → anchor on-chain. When an order is paid, the agent re-wraps that buyer’s chunk keys and serves the encrypted chunks from a local delivery server.

Install and run

The agent lives in peaq_ros2_stream and depends on peaq_ros2_interfaces (for the PeaqosSubmitEvent service). ROS 2 Humble (Docker image) or Jazzy (native host) are both supported.
peaq_ros2_stream currently ships on the feature/peaqos-ros2-runtime branch and merges to the default branch shortly, so the clone below checks that branch out. Once it lands on the default branch, the git checkout step is no longer needed.
git clone https://github.com/peaqnetwork/peaq-robotics-ros2.git
cd peaq-robotics-ros2
git checkout feature/peaqos-ros2-runtime   # Stream package lives here until it merges to the default branch

source /opt/ros/jazzy/setup.bash
# Use /opt/ros/humble/setup.bash inside the Docker image.

colcon build --packages-select peaq_ros2_interfaces peaq_ros2_stream
source install/setup.bash
Start it with the launch file (which declares the config_yaml argument):
ros2 launch peaq_ros2_stream stream_agent.launch.py \
  config_yaml:=/path/to/stream_config.yaml
Or run the node directly:
ros2 run peaq_ros2_stream stream_agent_node --ros-args \
  -p config.yaml_path:=/path/to/stream_config.yaml
Python dependencies (installed by rosdep or pip): PyNaCl, PyYAML, boto3, google-api-python-client, google-auth, requests. The agent is inert until stream_agent.enabled is true.
Configuration resolves in this order, last wins: built-in defaults → the config_yaml file → PEAQOS_STREAM_* environment variables → ROS parameters → an optional standalone policy_path file.

Configure: the Data Event Map

The agent reads one YAML block, stream_agent (the alias stream also works; keys accept snake_case or camelCase). This is the Data Event Map: what to read, how to protect each field, where ciphertext goes, and who can decrypt.
stream_config.yaml
stream_agent:
  enabled: true                                  # inert unless true

  # Identity + backend (all four required when enabled)
  machine_id: "mach_01HXYZ"
  agent_id: "agent_ros2_stream"
  agent_token: "stk_live_xxx"                     # per-agent secret — prefer env/policy file
  identity_ref: "did:peaq:0xMACHINE"              # written into every manifest signature
  api_base_url: "https://stream.peaq.network"     # default http://127.0.0.1:8000
  api_key: ""                                     # optional; sent as Bearer

  # Topic subscriptions + per-field rules
  topics:
    - topic: "/battery_state"
      message_type: "sensor_msgs/msg/BatteryState"
      qos_preset: "sensor_data"                   # default | sensor_data | reliable
      field_rules:                                # ordered
        - { path: "voltage",        action: "include" }
        - { path: "serial_number",  action: "hash" }
        - { path: "location",       action: "anonymize" }
        - { path: "raw_cells",      action: "exclude" }
        - { path: "operator_note",  action: "encrypt", public_key_hex: "aa11…(32 bytes)" }
    - topic: "/cmd_vel"
      message_type: "geometry_msgs/msg/Twist"
      qos_preset: "reliable"

  # Who can unwrap each chunk's key (≥1 required; buyers are added later, on purchase)
  key_recipients:
    - { recipient_id: "owner-1",    recipient_type: "owner",    public_key_hex: "11…(32 bytes)" }
    - { recipient_id: "operator-1", recipient_type: "operator", public_key_hex: "22…(32 bytes)" }
    - { recipient_id: "mach_01",    recipient_type: "machine",  public_key_hex: "33…(32 bytes)" }

  # Where encrypted chunks go (a local copy is always written first)
  storage:
    backend: "walrus"                             # local | walrus | s3 | google-drive
    walrus:
      publisher_url:  "https://walrus-publisher.example"
      aggregator_url: "https://walrus-aggregator.example"
      epochs: 5
      permanent: true

  # Anchor each payload hash on peaq via the peaqos_node runtime
  peaqos_event:
    enabled: true
    node_name: "peaqos_node"                      # → /peaqos_node/events/submit
    machine_id: 42                                # on-chain machine id (uint64)
    trust_level: 0

  # Local HTTP server that serves purchased chunks to buyers
  delivery:
    enabled: true
    host: "127.0.0.1"
    port: 8765
    token: "deliver_xxx"                          # required when delivery.enabled
Validation is strict: when enabled, machine_id / agent_id / agent_token / identity_ref and at least one topic and one key recipient are required; recipient_type is machine / owner / operator; each public_key_hex is a 32-byte (64-hex) x25519 key; storage.backend outside the known set falls back to local; delivery.token is required if delivery is on. Local state defaults under ~/.peaq_robot/ (signing key, sequence counters, chunk .bin files, manifests, the SQLite catalog and key store) and is configurable per path. An offline buffer (SQLite) holds envelopes when the backend is unreachable and drains on a retry timer.

Field rules

Rules run before signing, so protected fields never leave the machine in the clear while the package stays verifiable. Rules are ordered; if any include rule is present the agent starts from an allow-list, otherwise from the full message.
ActionEffect
includeKeep the field as-is (and, if used, allow-list it).
excludeDrop the field entirely.
hashReplace the value with sha256:<hex>.
anonymizeReplace the value with { "redacted": true }.
encryptSeal the value to the rule’s public_key_hex (x25519 sealed box); only that key holder can read it.

How a message becomes a sellable chunk

  1. Capture. The subscription fires; the ROS message is converted to a canonical ordered dict.
  2. Transform. Field rules apply (above).
  3. Sequence. A monotonic per-(machine, topic, policy version) number is assigned.
  4. Sign. The agent builds a peaqos-stream-envelope@v1 carrying the payloadHash and signs it with the machine’s Ed25519 key (signature.keyId, algorithm: "ed25519").
  5. Encrypt. The transformed payload is encrypted with a fresh per-chunk XChaCha20-Poly1305 key (32-byte key, 24-byte nonce), yielding plaintextHash, encryptedDataHash, and a keyCommitment.
  6. Chunk ID. A deterministic sha256: id is computed over { schemaVersion, previousChunkId, index, plaintextHash, encryptedDataHash }, forming a hash .
  7. Store. The chosen adapter writes the ciphertext and returns a storageRef.
  8. Manifest. A peaq.stream.chunks.v1 manifest wraps the chunk key to every key_recipient (x25519 sealed box) and is Ed25519-signed over encryptedDataHash, then POSTed to /api/v1/stream/chunks. The envelope is POSTed to /api/v1/stream/events, returning a receipt.
  9. Anchor. If peaqos_event.enabled, the agent calls /peaqos_node/events/submit (PeaqosSubmitEvent) with the payload hash as raw_data_hex and { streamReceiptId, policyId, policyVersion } as metadata_hex, then patches the receipt with the returned txHash.
On startup the agent syncs its policy to the backend and registers its Ed25519 public key (keyId = {agent_id}-stream-ed25519).

Storage adapters

Every adapter writes the local .bin first (so the delivery server always has a copy), then pushes to the remote.
BackendRequired configstorageRef
localchunk_storage_pathfile://<abs-path>
walruswalrus.publisher_url (+ aggregator_url for reads)walrus://<blobId>
s3s3.bucket (+ region / endpoint / creds)s3://<bucket>/<prefix>/<id>.bin
google-drivegoogle_drive.folder_id + credentials_pathgdrive://<fileId>

On-chain anchoring

Anchoring is optional and runs through the peaqos_node runtime, so that node must be up with events/submit available. The agent submits only the payload hash plus non-secret metadata — never raw data or keys — producing a tamper-evident on-chain record that links each Stream receipt to a peaq .

Selling and delivery

The agent polls the backend for orders. When an order is paid, it looks up each purchased chunk’s symmetric key from the local key store, re-wraps it to the buyer’s public key (x25519 sealed box, peaq.stream.buyer-access.v1), and submits the . The chunk data is never re-encrypted. If delivery.enabled, a token-gated local HTTP server serves the encrypted chunks:
RouteReturns
GET /health{ ok: true }
GET /chunksCatalog list (filter by topic, time, status)
GET /chunks/{id}/manifestThe peaq.stream.chunks.v1 manifest
GET /chunks/{id}/dataRaw encrypted bytes (supports HTTP Range)

Environment variables

Every config field has a PEAQOS_STREAM_* override (applied above the YAML file, below ROS params) — use them to keep secrets out of committed YAML. The essentials:
export PEAQOS_STREAM_ENABLED=true
export PEAQOS_STREAM_API_BASE_URL=https://stream.peaq.network
export PEAQOS_STREAM_MACHINE_ID=mach_01HXYZ
export PEAQOS_STREAM_AGENT_ID=agent_ros2_stream
export PEAQOS_STREAM_AGENT_TOKEN=stk_live_xxx
export PEAQOS_STREAM_IDENTITY_REF=did:peaq:0xMACHINE
export PEAQOS_STREAM_STORAGE_BACKEND=walrus
Storage credentials also accept the standard vendor variables: Walrus (WALRUS_PUBLISHER_URL, …), S3 (AWS_ACCESS_KEY_ID, AWS_REGION, AWS_ENDPOINT_URL_S3, …), and Google Drive (GOOGLE_APPLICATION_CREDENTIALS, GOOGLE_DRIVE_FOLDER_ID).

Security

  • The Ed25519 signing key is generated on first run, written to signing_key_path (0600), and never sent or logged — only the public key is registered and embedded in manifests.
  • Per-chunk symmetric keys live only in the local SQLite key store and travel only as sealed-box-wrapped blobs (to recipients, then to the buyer). Plaintext keys never appear in ROS messages, manifests, the backend payload, or logs.
  • No secrets cross ROS. The agent publishes no topics; its only ROS egress is the events/submit service call, which carries a hash and non-secret metadata.
  • The delivery server silences request logging and gates every route except /health behind delivery.token. Keep agent_token, api_key, delivery.token, and storage credentials in environment variables, not committed YAML.