Skip to main content
The stream module ships in @peaqos/peaq-os-sdk and peaq-os-sdk 0.3.0+. Its exports are flagged @experimental and may change as the surface settles.
The stream module is the cryptographic core of Stream: it signs the data a machine produces, chunks and encrypts it, and re-wraps chunk keys so a buyer can decrypt only what they bought. You hand it a payload and it returns signed, verifiable, encrypted artifacts — topic subscription and transport are handled by the on-machine Edge Agent, which drives this same module from a Data Event Map.

Module location

SDKImport
JavaScript / TypeScript@peaqos/peaq-os-sdksrc/stream/
Pythonpeaq_os_sdkpeaq_os_sdk.stream

Functions

FunctionPurpose
buildSignedDataPackageApply field rules to a topic payload, bundle it with the machine’s identity (DID, timestamp, schema version, sequence number), and sign it EIP-191. Async.
verifyDataPackageRecover the signer and check it against the public key derived from the DID. No network needed. Async.
buildChunkChainSplit data into bounded chunks, encrypt each with a fresh per-chunk key wrapped to the owner, operator, and machine, and link each chunk to the previous one. One caller-supplied Ed25519 key signs the whole chain; only the encryption key is per-chunk.
verifyChunkChainConfirm the chain is intact — no gaps, reordering, or edits.
createBuyerAccessEntryUnwrap a chunk’s key with the owner’s private key and re-wrap it to a buyer’s public key. This is the access-grant primitive.
buildBuyerAccessFilesRe-wrap a chunk chain’s keys for a buyer and write distributable access files (sharded by size). Takes the chunks, not pre-built entries.
decryptChunkDecrypt a chunk with a recipient’s private key and wrapped-key entry.
computeChunkIdRecompute a chunk’s content-derived ID for integrity checks.
Python exposes the same surface in snake_case (build_signed_data_package, verify_data_package, build_chunk_chain, verify_chunk_chain, create_buyer_access_entry, build_buyer_access_files, decrypt_chunk, compute_chunk_id).

Types

type FieldRuleAction = "include" | "exclude" | "encrypt" | "anonymize";
type AnonymizeStrategy = "hash" | "generalize" | "redact";
type KeyStorageBackend = "tee" | "software";

type UnsignedDataPayload = {
  topic: string;                       // ROS 2 topic or feed name
  fields: Record<string, unknown>;
  capturedAt: string;                  // ISO 8601
};

type TopicFieldRules = {
  topic: string;
  rules: FieldRule[];                  // ordered; first matching fieldPath wins
  defaultAction?: FieldRuleAction;
};

type FieldRule = {
  fieldPath: string;                   // dot-notation, e.g. "gps.latitude"
  action: FieldRuleAction;
  anonymizeStrategy?: AnonymizeStrategy;
};

type DataPackage = {
  did: string;                 // did:peaq:0x...
  timestamp: string;           // ISO 8601
  schemaVersion: string;
  sequenceNumber: number;
  payload: Record<string, unknown>;
  signature: string;           // EIP-191 personal_sign
  signingKeyId: string;
};

type VerificationResult = {
  valid: boolean;
  signerDid: string;
  reason?: string;
};
Signing credentials ride in a SigningContext ({ privateKey, backend: "tee" | "software", did, keyId }toJSON redacts the key so it never leaks through serialization) and field-level encryption in an EncryptionContext ({ encryptionKey, algorithm: "AES-256-GCM" }). The field layer uses AES-256-GCM; the per-chunk layer (inside buildChunkChain) uses XChaCha20-Poly1305. Chunking defaults: chunkSize 262144 bytes (256 KiB), hashAlgorithm "sha-256", via ChunkingConfig. The full envelope shape is documented under Data streams → The chunk envelope.

Example

Sign a reading, verify it, chunk-and-encrypt, then grant a buyer access:
import {
  buildSignedDataPackage,
  verifyDataPackage,
  buildChunkChain,
  createBuyerAccessEntry,
  buildBuyerAccessFiles,
  SigningContext,
  EncryptionContext,
} from "@peaqos/peaq-os-sdk";

const pkg = await buildSignedDataPackage({
  unsignedPayload: {
    topic: "/vehicle/telemetry",
    fields: { speed: 12, gps: { latitude: 47.1, longitude: 8.5 } },
    capturedAt: new Date().toISOString(),
  },
  topicRules: {
    topic: "/vehicle/telemetry",
    rules: [
      { fieldPath: "gps.latitude", action: "encrypt" },
      { fieldPath: "gps.longitude", action: "encrypt" },
    ],
    defaultAction: "include",
  },
  signingCtx: new SigningContext({
    privateKey: process.env.MACHINE_DATA_KEY!,
    backend: "software",
    did: "did:peaq:0xMACHINE",
    keyId: "did:peaq:0xMACHINE#keys-1",
  }),
  encryptionCtx: new EncryptionContext({
    encryptionKey: process.env.FIELD_KEY!,
    algorithm: "AES-256-GCM",
  }),
  schemaVersion: "1",
  sequenceNumber: 1,
});

const result = await verifyDataPackage(pkg);
if (!result.valid) throw new Error(result.reason);

// Chunk + encrypt; keys wrapped to owner, operator, and machine
const { chain, encryptedData } = await buildChunkChain({
  input: new TextEncoder().encode(JSON.stringify(pkg)),
  ownerPublicKeyHex: OWNER_X25519_PUB,
  operatorPublicKeyHex: OPERATOR_X25519_PUB,
  machinePublicKeyHex: MACHINE_X25519_PUB,
  signingPrivateKeyHex: ED25519_PRIV,
  signingPublicKeyHex: ED25519_PUB,
  machineDid: "did:peaq:0xMACHINE",
  machineKeyId: "did:peaq:0xMACHINE#keys-1",
  did: "did:peaq:0xMACHINE",
  // config: { chunkSize: 262144 }
});

// Grant a buyer: re-wrap the purchased chunk keys to the buyer's public key
const accessFiles = buildBuyerAccessFiles({
  chunks: chain.chunks,
  ownerPrivateKeyHex: OWNER_X25519_PRIV,
  buyerRecipientId: "did:peaq:0xBUYER",
  buyerPublicKeyHex: BUYER_X25519_PUB,
});

// createBuyerAccessEntry is the per-chunk primitive behind it
const oneEntry = createBuyerAccessEntry({
  chunk: chain.chunks[0],
  ownerPrivateKeyHex: OWNER_X25519_PRIV,
  buyerRecipientId: "did:peaq:0xBUYER",
  buyerPublicKeyHex: BUYER_X25519_PUB,
});
buildChunkChain returns the chain plus encryptedData, a map of ciphertext bytes keyed by chunk index. The ciphertext is a pre-upload sidecar — callers store the bytes wherever they distribute from and set each chunk’s storageRef. The buyer decrypts with decryptChunk({ chunk, recipientPrivateKeyHex, recipientEntry, encryptedData }) — in Python, decrypt_chunk(chunk=…, recipient_private_key_hex=…, recipient_entry=…, encrypted_data=…) — passing the chunk’s stored ciphertext bytes plus a KeyRecipient from chunk.encryption.keyRecipients or from a buyer access entry. The data itself is never re-encrypted when access is granted. From the terminal, the same publish and grant flows are peaqos stream publish and peaqos stream grant.

Errors

ErrorWhen
StreamValidationErrorInvalid input — bad DID, missing field, unknown rule, wrong key length, key-commitment mismatch.
StreamSigningErrorSigning or verification failed. Messages never contain key material.
StreamErrorBase class for the above.

Solana signing

OWS mnemonic-derived wallets carry a account (ed25519, derivation path m/44'/501'/0'/0') for cross-chain payments. This is wallet signing — separate from Stream data signing, which uses EIP-191. Today, Solana-quoted Machine Market orders are paid externally — complete the transfer with your Solana wallet and pass --payment-tx-hash to peaqos scale order. Automatic in-vault SPL signing and a standalone peaqos solana command group (address, sign-tx, sign-message) are planned for the Stream release line; the key is decrypted inside the OWS vault, used to sign, and wiped — it never leaves the vault.