Share, Order, Execute

WORK IN PROGRESS: STILL ONE SECTION YET TO COMPLETE

Abstract

Here we overview a design for our envisioned Replicated Controller architecture, including Mempool, Consensus, Execution, and State Storage.
A Replicated Controller (RC) is an Anoma controller maintained by a collection of replicas, called Validators, each of which maintains a complete copy of the RC’s state.
Unlike a single-machine controller, an RC can use fault-tolerant consensus to tolerate some validator failures: surviving (correct) validators maintain consistent copies of the state.
All updates to the RC take the form of Transaction Candidates (TCs), which are computed “on” the RC itself, rather than on any one validator.
We allow our TCs to issue side effects, which can include sending messages over the network, or storing things outside the RC, so long as side effects do not affect RC state updates.
For the most part, we abstract over the details of the Anoma Controller and its state; instead we provide a general interface for maintaining a Replicated State Machine (RSM).

Based on this presentation.

Introduction

While any machine can implement an Anoma controller, single-machine controllers are only as trustworthy (or reliable) as the individual machines running them.
One solution is to run an Anoma controller inside a Replicated State Machine (RSM) maintained by a fault-tolerant collection of Validators.
We call this a Replicated Controller (RC).

Protocol Adapters

The Anoma Protocol Adapters serve to implement an Anoma controller within an existing replicated state machine (such as Ethereum’s EVM).
While very useful, adapters on existing chains don’t allow for creating custom controllers with their own failure assumptions (e.g. their own set of stakers).
We might imagine addressing this problem with some combination of something like the Cosmos SDK and an appropriate protocol adapter: one can create a new, custom Cosmos SDK chain, and then run an Anoma protocol adapter within it.

Room for Improvement

In general, these protocol adapters are comparatively expensive and slow to run: they require computing within another (metered) virtual machine, which may be expensive, and they offer no parallelism (or concurrency).
Even implementing a Controller directly as the “application” on top of CometBFT doesn’t deliver anywhere near the performance we know is possible.

We therefore set out to design a new, modern architecture for RCs.
Like CometBFT, we find it useful to distinguish between the state machine being replicated and the architecture maintaining the replicas.
We do, however, imagine a different API between the two (compared with CometBFT’s ABCI), intended to facilitate parallelism, enabling more throughput (TCs per second), and horizontal scaling (the RC can handle more state and more throughput if each validator has more resources, possibly spread across multiple machines).
We therefore design a general architecture for running an RSM, with the intention of instantiating an anoma-controller-specific RSM, the Anoma State Machine (ASM).

Vision for the Anoma State Machine

The Anoma State Machine (ASM) will be an RSM that implements a controller for the Anoma Resource Machine (ARM).
It maintains a state outlined in the Anoma State Architecture Report, including Commitments and Nullifiers.

A state machine can be described in terms of a type of state, and an update function that takes in a TC and an old state, and deterministically produces a new state.

The update function generally proceeds as:

  1. Run the Post-Ordering Execution (POE) function encoded in the TC.
    This may read from ASM state, and outputs:

    1. A Side-Effect Function: this function may read from ASM state, send messages over the wire, and produce local logging information. It can even receive (pretty much arbitrary) additional information from the Validator, but it may not write to ASM state and returns nothing. (We will need some kind of limits here to prevent expensive computation.)
    2. An ARM Transaction
  2. Verify the ARM transaction.
    The transaction proofs may rely on facts about the current ASM state (e.g. X is an old commitment root from time Y), which may require further ASM reads.
    IFF the ARM transaction verifies:

    1. Update state with the commitments, nullifiers, and storage application blobs from the ARM transaction
    2. Execute the Side-effect function
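
For concreteness, here is a minimal Python sketch of this update flow. All names here (ArmTransaction, TransactionCandidate, run_poe, the key prefixes) are illustrative placeholders, not the actual ASM interfaces.

```python
# Illustrative sketch of the ASM update function described above; not the real API.
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple

State = Dict[bytes, bytes]               # key-value view of ASM state
SideEffectFn = Callable[[State], None]   # may read state and send messages, never writes

@dataclass
class ArmTransaction:
    commitments: List[bytes] = field(default_factory=list)
    nullifiers: List[bytes] = field(default_factory=list)
    blobs: Dict[bytes, bytes] = field(default_factory=dict)

    def verify(self, state: State) -> bool:
        # Proofs may rely on facts about ASM state (e.g. old commitment roots).
        return True                      # placeholder verification

@dataclass
class TransactionCandidate:
    # Post-ordering execution: reads state, returns (side-effect fn, ARM transaction).
    run_poe: Callable[[State], Tuple[SideEffectFn, ArmTransaction]]

def update(state: State, tc: TransactionCandidate) -> State:
    side_effects, arm_tx = tc.run_poe(state)      # 1. run POE
    if not arm_tx.verify(state):                  # 2. verify the ARM transaction
        return state                              #    invalid: no state change
    new_state = dict(state)                       # 2.1 apply commitments, nullifiers, blobs
    for cm in arm_tx.commitments:
        new_state[b"cm/" + cm] = b"\x01"
    for nf in arm_tx.nullifiers:
        new_state[b"nf/" + nf] = b"\x01"
    new_state.update(arm_tx.blobs)
    side_effects(new_state)                       # 2.2 side effects: read-only, no writes
    return new_state
```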

The ASM may need to be equipped with special TCs for creating state roots at checkpoints, and periodically creating new commitment (and nullifier?) roots.

In general, ASM state is only useful for stuff that the update function needs to read for some future TC.
If someone wants to store stuff that isn’t going to be read in some future TC, then it’s almost certainly cheaper to send a message from the side-effect function to some storage service (possibly located on the validator itself).
This might include, for example, encrypted newly-committed resources that are supposed to be available for whoever owns them, but not directly read by future post-ordering execution.
Storing stuff in the ASM (via the blob storage application) is probably more expensive.

It may be helpful to keep the ASM in mind when considering the architecture here, but it’s mostly written as a general Replicated State Machine (RSM) implementation.

State Machine Requirements

We assume that the state machine we’re replicating has a state divided into key-value pairs.
This makes it easy for replicas to scale horizontally: adding more processes (or even machines) with storage allows them to store more state.

Specifying a State Machine means specifying a Key Structure, Starting State, and an Execution Function (whose domain implies a language of Transaction Candidates).
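
As a sketch, one could phrase that specification as an interface like the following; the names here (StateMachineSpec, key_prefix, execute) are hypothetical, chosen only to mirror the three ingredients above.

```python
# Hypothetical interface mirroring "Key Structure, Starting State, Execution Function".
from typing import Callable, Dict, Protocol

Key = bytes
Value = bytes
State = Dict[Key, Value]

class StateMachineSpec(Protocol):
    def starting_state(self) -> State:
        """The initial key-value state."""
        ...

    def key_prefix(self, key: Key) -> bytes:
        """Expose whatever key structure exists (e.g. a tree prefix)."""
        ...

    def execute(self, tc: bytes,
                read: Callable[[Key], Value],
                write: Callable[[Key, Value], None]) -> None:
        """Deterministic execution function; issues reads and writes by key.
        Its input type (here just bytes) implies the language of TCs."""
        ...
```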

Key Structure

It is possible for these keys to have some structure beyond “arbitrary hashes.”
Such structures (such as tree structures or lexicographic ordering) allow for large subsets of keys to be succinctly referenced (e.g. as tree prefixes or ranges).
A proposed Key structure for the ASM is in the Anoma State Architecture Report.

In general, it is helpful for keys that are “nearby” in this structure (e.g. lexicographically close keys) to be stored on the same process: it makes queries easier to execute.

Execution Function and Transaction Candidates

Every update to the State Machine is the result of running a TC on the previous state.
Formally, this means there is some update function which, given a TC and a previous state, outputs the next state.

To help enable concurrency, we assume:

  • The update function can be reduced to an execution function, which is executable code that can issue reads and writes using key-value pairs.
    • the execution function is deterministic
    • the update function is equivalent to running the execution function (inputting reads from the previous state), and outputting a new state which is identical to the old state except for the execution function’s writes.
  • TCs are labeled with:
    • a set of keys (possibly specified using key structure) to which it definitely will write
    • a set of keys (possibly specified using key structure) to which it may write
    • a set of keys (possibly specified using key structure) which it definitely will read
    • a set of keys (possibly specified using key structure) which it may read

The key idea here is that replicas can actually run the execution function for multiple TCs concurrently, so long as their reads and writes (or the conservative estimates thereof in the label) don’t conflict.
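
To illustrate (under the simplifying assumption that labels are flat key sets rather than structured key ranges), a conservative conflict check might look like this sketch:

```python
# Sketch of a conservative conflict check between two labeled TCs.
# Labels are modeled as flat key sets; real labels may use key structure
# (prefixes, ranges) instead.
from dataclasses import dataclass
from typing import FrozenSet

@dataclass(frozen=True)
class Label:
    must_write: FrozenSet[bytes] = frozenset()
    may_write: FrozenSet[bytes] = frozenset()
    must_read: FrozenSet[bytes] = frozenset()
    may_read: FrozenSet[bytes] = frozenset()

    @property
    def writes(self) -> FrozenSet[bytes]:
        return self.must_write | self.may_write

    @property
    def reads(self) -> FrozenSet[bytes]:
        return self.must_read | self.may_read

def may_conflict(a: Label, b: Label) -> bool:
    # Two TCs conflict if either might write a key the other might touch.
    return bool(a.writes & (b.writes | b.reads)) or bool(b.writes & a.reads)

# Example: these two TCs touch disjoint keys, so a replica may run them concurrently.
a = Label(must_write=frozenset({b"alice/balance"}), must_read=frozenset({b"alice/balance"}))
b = Label(must_write=frozenset({b"bob/balance"}), must_read=frozenset({b"bob/balance"}))
assert not may_conflict(a, b)
```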

Design Overview

Here we outline a dedicated architecture for maintaining a Replicated State Machine given a State Machine meeting the requirements above.
In principle, State Machine Replication is as simple as using Consensus to agree on a total order of TCs (a ledger, if you will), and then each replica, starting with the starting state, runs the TCs in order to get the current state.
As new TCs arrive, Consensus works out the total order in which they are appended to this ledger; replicas each maintain a prefix of the ledger, and run the TCs in order to get the current state as of the most recent known TC.

We, however, are hoping to build something that can scale more horizontally, as replicas acquire more storage and parallel processing power.
We would like to be able to process many more TCs per second than can be run sequentially, as modern databases do.
A key challenge for our overall design is that no one process’ workload scales linearly with throughput.
In fact, no one process touches (even references to) all the TCs.

Our architecture can be broken into 3 main sub-components: Mempool, Consensus, and Execution.
For the most part, we think of TCs flowing from each to the next.

A growing body of DAG-based BFT research, such as Narwhal & Tusk, indicates that distributing TCs during Consensus may be a major bottleneck.
Most DAG-based setups, including ours, distribute TCs separately, and then use Consensus to totally order references to batches of TCs (or blocks).
The key difficulty is in ensuring that, if Consensus appends a reference to the ledger, the TCs referenced are available to all replicas: if an unretrievable reference is appended to the ledger, the RSM halts, as replicas wait eternally to find out what the TCs are.

Our Consensus layer then appends these references to an ever-growing, totally ordered ledger, from which we can derive a total order of TCs.
Our consensus implementation will be a (fairly standard) Byzantized Paxos-based configuration, which we will later adapt to Heterogeneous Paxos for Chimera Chains; those are beyond the scope of this document.

Our Execution component receives the TC batches (or blocks) from the Mempool, and the reference ledger from Consensus, and uses this to continuously update the local replica of the RSM state.
The “no one process’ workload scales linearly with throughput” requirement is especially challenging for the execution engine.
Our solution is to break up the work into 2 types of processes:

  • Shards store portions of the RSM state. RSM key-space is partitioned across the shards, so they each store a portion of the keys, and any reads or writes know which shard to contact. The process of re-distributing keys across shards is called re-sharding, and in early prototypes, we probably won’t re-shard. Shards are in charge of ordering reads and writes to their portion of space, which means they must learn enough information to totally order relevant TCs from Mempool and Consensus.
  • Executors run the execution function: there is one executor per TC, and it contacts the relevant shards for any reads or writes. The mempool must be able to spin up a new executor (or draw one from a pool) for each TC.
    One executor on each replica ultimately runs all side-effects of each TC.
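
As a toy illustration of the shard side, here is one way to partition the key space by lexicographic range, so every read or write knows which shard to contact; the boundaries, and the absence of re-sharding, are just assumptions of the sketch.

```python
# Toy key-range partitioning across shards (lexicographic ranges, no re-sharding).
# The boundaries and routing here are illustrative, not part of the design.
import bisect
from typing import List

class ShardMap:
    """Maps each key to the shard owning its lexicographic range."""
    def __init__(self, boundaries: List[bytes]):
        self.boundaries = sorted(boundaries)   # split points between shards

    def shard_of(self, key: bytes) -> int:
        # Shard i owns keys in [boundaries[i-1], boundaries[i]).
        return bisect.bisect_right(self.boundaries, key)

shards = ShardMap([b"g", b"n", b"t"])          # 4 shards
assert shards.shard_of(b"alice/balance") == 0  # nearby keys land on the same shard
assert shards.shard_of(b"alice/nonce") == 0
assert shards.shard_of(b"zoe/balance") == 3
```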

This image is a little outdated, but the general flow is right:

[figure]

Mempool

Ultimately, we’ll get to some fancy DAG stuff; for now, let’s do more or less what Autobahn does: we’ll have a collection of Mempool Workers who store TCs, and the references that Consensus orders will include an availability certificate for the most recent block from each worker.

Each validator maintains a pool of Mempool Worker processes (MWs).
Some MWs are user-facing (UFMWs): they receive TCs from users, order them into a sequence of blocks, and distribute them to replicas.
Other MWs are replicas (RMWs): they receive TCs from a UFMW, maintaining a prefix of that UFMW’s sequence of blocks.
This prefix can become a tree if the assigned UFMW is Byzantine.

User-Facing Mempool Workers (UFMWs)

Some RSM replicas have processes called user-facing mempool workers (UFMWs).
From a workload distribution standpoint, it’s probably best if RSM replicas have approximately equal numbers of UFMWs.
Early prototypes should use 1 UFMW each.

UFMWs receive TCs from clients (solvers, users, etc.) and order them into batches, called blocks.
They might produce a new block after a certain amount of time has passed, or perhaps after a certain number of TCs are received.
Blocks should not be empty (if there are no TCs available, don’t make a block).
Additionally, blocks contain the hash of the UFMW’s previous block (first block uses NULL), and a height number (first block should be 1, other blocks have height = 1 + height of previous block).
In this manner, blocks from a UFMW naturally form a tree, rooted at NULL, and blocks from an honest UFMW form a list.
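
A minimal sketch of this block chaining (previous-block hash, heights starting at 1, no empty blocks); the hashing and serialization choices are placeholders, not the real wire format.

```python
# Sketch of UFMW block chaining: each block commits to its predecessor's hash
# and carries height = previous height + 1. Serialization is a placeholder.
import hashlib
import json
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Block:
    prev_hash: Optional[str]   # None (NULL) for the first block
    height: int                # first block has height 1
    tcs: List[str]             # transaction candidates (opaque here)

    def hash(self) -> str:
        payload = json.dumps([self.prev_hash, self.height, self.tcs]).encode()
        return hashlib.sha256(payload).hexdigest()

def next_block(prev: Optional[Block], pending_tcs: List[str]) -> Optional[Block]:
    if not pending_tcs:
        return None            # blocks should not be empty
    if prev is None:
        return Block(prev_hash=None, height=1, tcs=pending_tcs)
    return Block(prev_hash=prev.hash(), height=prev.height + 1, tcs=pending_tcs)
```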

Replica Mempool Workers (RMWs)

Each RSM replica maintains an RMW for each UFMW on every other RSM replica.
UFMWs distribute their blocks to their RMWs as quickly as possible.
This can even include streaming TCs as they assemble blocks, before the block is complete.

In general, there is not much point in building fancy block distribution trees here: load balancing is done by having all UFMWs concurrently producing blocks.
When an RMW stores an entire block, and it has already voted for the previous block (unless that’s NULL), it sends a signed vote to the UFMW.


[figure]

When a UFMW acquires a blocking set (intersects every consensus quorum) of votes, it aggregates these (using signature aggregation, if desired) into an availability certificate, which it then distributes to all its RMWs.
RMWs delay verification of any availability certificates until they have verified the availability certificate of the previous block.
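
A sketch of certificate assembly on the UFMW side; the blocking-set test is abstracted into a predicate and signature aggregation is omitted, so this is only meant to show the shape of the step.

```python
# Sketch: a UFMW collects votes for a block hash and forms an availability
# certificate once the voters form a blocking set (intersecting every quorum).
# The blocking-set test is passed in as a predicate; crypto is omitted.
from dataclasses import dataclass
from typing import Callable, Dict, FrozenSet, Optional

@dataclass(frozen=True)
class Vote:
    voter: str
    block_hash: str
    signature: bytes    # placeholder; assumed already verified

@dataclass(frozen=True)
class AvailabilityCertificate:
    block_hash: str
    voters: FrozenSet[str]

def certify(block_hash: str, votes: Dict[str, Vote],
            is_blocking_set: Callable[[FrozenSet[str]], bool]
            ) -> Optional[AvailabilityCertificate]:
    voters = frozenset(v.voter for v in votes.values() if v.block_hash == block_hash)
    if is_blocking_set(voters):
        return AvailabilityCertificate(block_hash, voters)
    return None   # keep waiting for more votes
```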


[figure]

We can thus imagine an entire RSM replica’s mempool as a collection of MWs, one for each UFMW in the system, each holding ever-growing trees (lists, if the UFMWs are all correctly-behaved) of blocks, some prefix of which have verified availability certificates:


[figure]

There are several convenient things about this structure:

  • There is already an inherent partial order of TCs (if we want to use it): we can impose an order within each block, and know that all TCs in a block will be ordered after the TCs in the previous block.
    • In a future expansion, we can allow blocks to reference additional “previous blocks” from other UFMWs, introducing DAG-ordering.
  • A single availability certificate proves that, so long as some consensus quorum remains live, all the TCs in a block and all of its ancestors are available.
    • To make this work, we need an additional protocol where processes can query MWs (by hash) for blocks they don’t know.

Consensus

The purpose of consensus, then, is to impose a total order on this partial order of available TCs.
Traditionally, we use consensus to append to an ever-growing ledger of proposals.

For our purposes, a consensus proposal includes a height (a natural number) in the ledger, and an availability certificate from each UFMW in some subset of the UFMWs.
This represents a kind of cut of the mempool DAG:

[figure]

These proposals’ size (with signature aggregation) is proportional only to the number of UFMWs, which is substantially better than some previous designs, wherein these proposals included the entire block to be committed.
Future designs may be able to reduce proposal size even further.

In principle, any Consensus protocol will suffice for appending proposals to the ledger.
Two proposals “conflict” when they have the same height. The state of the ledger known to any participant is simply the set of decisions they know for which they also know decisions for all previous heights.
This technically allows the ledger to decide on proposals for arbitrary heights, but these decisions don’t matter much until all previous heights are filled in.

In order to enable Chimera Chains later, I propose writing our own consensus protocol based on Heterogeneous Paxos, as formally specified here.
For a non-chimera (base) chain, this is similar to Byzantizing Paxos by Refinement.
In particular, each RSM replica runs 2 processes: a proposer and an acceptor.
Here I lay out an overview of consensus for a non-chimera, base chain.

Ballots and Messages

Our Consensus uses ballots, which are totally ordered, such that no two messages with the same ballot can have different proposals (a.k.a. values).
One way to implement this is to make the type of ballots a pair, with the first element being a timestamp, and the second element being a proposal.
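
For example (a sketch only): a frozen (timestamp, proposal) pair, compared lexicographically, is totally ordered and cannot pair the same ballot with two different proposals.

```python
# Sketch: a ballot as a (timestamp, proposal) pair, totally ordered
# lexicographically. Equal ballots carry equal proposals by construction.
from dataclasses import dataclass

@dataclass(frozen=True, order=True)
class Ballot:
    timestamp: int          # e.g. milliseconds since epoch
    proposal: bytes         # serialized proposal; ties broken by its bytes

b1 = Ballot(17, b"proposal-A")
b2 = Ballot(17, b"proposal-B")
assert b1 < b2                          # same timestamp: ordered by proposal bytes
assert Ballot(16, b"proposal-Z") < b1   # earlier timestamps come first
```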

There are 3 types of messages, all of which are signed by their sender, and broadcast to all acceptors:

  • 1a messages carry a ballot and a proposal
  • 1b messages carry a set of (hash) references to previous messages
  • 2a messages carry a set of (hash) references to previous messages

(1b and 2a messages are structurally similar, but serve different purposes.)

The ballot of a 1b or 2a message is simply the highest ballot of any 1a in its (transitive) references.
The value of a 1b or 2a message is determined by that ballot.

We assume that acceptors delay receipt of all messages until they’ve received the messages referenced.
This requires some kind of protocol for requesting (or forwarding) missing messages.
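
The message shapes and the ballot rule can be sketched as follows, reusing the Ballot type from the sketch above; hash strings stand in for real references, signatures are omitted, and memoization of the transitive walk is left out.

```python
# Sketch of the three message types. References are by hash into a local
# store; signatures and networking are omitted.
from dataclasses import dataclass
from typing import Dict, FrozenSet, Optional, Union

@dataclass(frozen=True)
class OneA:
    ballot: "Ballot"            # carries a ballot and (inside it) a proposal

@dataclass(frozen=True)
class OneB:
    refs: FrozenSet[str]        # hashes of previously seen messages

@dataclass(frozen=True)
class TwoA:
    refs: FrozenSet[str]        # structurally like a 1b, different purpose

Message = Union[OneA, OneB, TwoA]

def ballot_of(msg_hash: str, store: Dict[str, Message]) -> Optional["Ballot"]:
    """Highest 1a ballot in the message's transitive references (no memoization)."""
    msg = store[msg_hash]
    if isinstance(msg, OneA):
        return msg.ballot
    ballots = [b for r in msg.refs if (b := ballot_of(r, store)) is not None]
    return max(ballots, default=None)
```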

Proposers

Proposers send out 1a messages, carrying a proposal to be appended to the ledger.
In order to ensure that a decision is eventually reached, we need a proposer to propose an acceptable 1a, and all acceptors to have enough time to reach a decision before another 1a is proposed.
To ensure this, I suggest that:

  • for each height, we allocate an infinite sequence of time windows, growing in duration, that rotate between all possible proposers.
  • 1a messages are only considered valid if their ballot’s timestamp is within a time window for their proposer
  • acceptors delay receipt of 1a messages until their ballot’s timestamp has passed (according to their local clock)

Proposers then, at the beginning of each of their time windows, broadcast a 1a message with the current time, and the highest ballot proposal for which their local acceptor has seen a 2a message.
Each time their local acceptor sees a higher ballot 2a during their time window, if its proposal disagrees with the most recent 1a they’ve sent, they broadcast a new 1a.
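
One possible window allocation (the doubling duration and round-robin rotation are assumptions for illustration, not part of the suggestion above) could look like this:

```python
# Sketch of one possible window schedule: for a given height, window i lasts
# BASE_MS * 2**i and windows rotate round-robin over the proposers.
BASE_MS = 500

def window_at(t_ms: int, n_proposers: int) -> tuple:
    """Return (window_index, owning_proposer, window_start_ms) for time t_ms."""
    start, i = 0, 0
    while True:
        duration = BASE_MS * (2 ** i)
        if t_ms < start + duration:
            return i, i % n_proposers, start
        start += duration
        i += 1

# Example: with 4 proposers, time 1800ms falls in window 2 (owned by proposer 2),
# which starts at 1500ms and lasts 2000ms.
assert window_at(1800, 4) == (2, 2, 1500)
```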

If a proposer’s acceptor has not yet seen any 2as, it must construct a new proposal.
This includes one availability certificate for each UFMW, specifically the one with the greatest height.

Acceptors

Acceptors’ messages always feature references to the most recent message they’ve sent, as well as all messages they’ve received since then.
This ensures their messages’ transitive references include their entire causal history.

Acceptors send a 1b whenever they receive a 1a.
We call a 1b fresh if the highest ballot 2a in its transitive history has the same proposal as the 1b.

Acceptors send a 2a when they receive a quorum of fresh 1b messages, and they have not received any higher ballot 1as.

Acceptors decide (append a proposal to the ledger) if they receive a quorum of 2a messages with the same ballot.
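
The decision rule alone, as a sketch: group 2a messages by ballot and decide once some ballot’s senders form a quorum (the quorum test is abstracted into a predicate, and Ballot is reused from the earlier sketch).

```python
# Sketch of the acceptor's decision rule: a proposal is decided once a quorum
# of 2a messages share the same ballot. 2as are modeled as (sender, ballot) pairs.
from typing import Callable, FrozenSet, Iterable, Optional, Tuple

def try_decide(two_as: Iterable[Tuple[str, "Ballot"]],
               is_quorum: Callable[[FrozenSet[str]], bool]) -> Optional["Ballot"]:
    by_ballot: dict = {}
    for sender, ballot in two_as:
        by_ballot.setdefault(ballot, set()).add(sender)
    for ballot, senders in by_ballot.items():
        if is_quorum(frozenset(senders)):
            return ballot          # the ballot's proposal is decided
    return None
```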

Total Order

[figure]

Each ledger entry appends to the total order of TCs all TCs that are referenced in the proposal (or any blocks in the ancestry thereof) which are not already included by any previous entry.
The exact order of the TCs added by each ledger entry must be consistent (across all RSM replicas), but can be arbitrary.
It could be determined by some auction mechanism, for example.
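
A sketch of that derivation, reusing the Block shape from the mempool sketch: walk each decided proposal’s certified-block ancestry and append any TCs not already included, in one arbitrary-but-deterministic order (ancestry order here).

```python
# Sketch: derive the total order of TCs from the decided ledger. Each entry
# contributes the TCs in its certified blocks' ancestry that earlier entries
# did not already include. Blocks are looked up by hash; the within-entry
# order ("ancestry order, then block order") is just one deterministic choice.
from typing import Dict, Iterable, List

def ancestry(block_hash: str, blocks: Dict[str, "Block"]) -> List["Block"]:
    """Blocks from the root (prev_hash None) down to block_hash, in order."""
    chain, h = [], block_hash
    while h is not None:
        chain.append(blocks[h])
        h = blocks[h].prev_hash
    return list(reversed(chain))

def total_order(ledger: Iterable[List[str]], blocks: Dict[str, "Block"]) -> List[str]:
    """ledger: decided proposals, each a list of certified block hashes (one per UFMW)."""
    seen, order = set(), []
    for proposal in ledger:
        for block_hash in proposal:
            for block in ancestry(block_hash, blocks):
                for tc in block.tcs:
                    if tc not in seen:
                        seen.add(tc)
                        order.append(tc)
    return order
```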

For the fastest possible chain, it’s best if this order is somewhat predictable, so RSM replicas can get started calculating state updates before consensus is done.
Using a total order consistent with the DAG-based partial order would be a good start.

Execution

TODO