Anoma Topics Meeting Digest: Global Data Brokers

The general design

We wish to design a global data broker that gets messages about a particular topic. This would likely be an actor or a pool of actors.

From there, we would make sub brokers as a custom query. These would have a defined elixir module type with potential arguments. Users can make more sub queries filters from there, having a chain of topic filters for whatever information someone would want.

Let us exemplify this by having 2 examples:

  1. I am interested in all spawned workers from the executor with Identity 0xdeadbeef.
  2. I am interested in all spawned workers with an even ID from the executor with identity 0xdeadbeef.

This is best illustrated by this diagram

As we can see here, we have 4 topic actors that have spawned up. This for the time being will be done much like an inheritance diagram.

  • We have a general topic Topic.OnID that takes a parameter, id , the specific ID that we wish to subscribe to messages from.

  • Topic.OnID.SpawnedWorkers is a sub topic. This topic inherits the argument id, and presents no new arguments. Meaning that if we did:

Broker.subscribe(Topic.OnID.Spawned_Workers, id: "0xdeadbeef") 

For consumer 1 this creates two new topics. The Topic.OnID id: "0xdeadbeef", and the spawned workers topic.

  • Topic.OnID.SpawnedWorkers.EvenWorkers is a sub topic of Topic.OnID.SpawnedWorkers. This also presents no new arguments.

For consumer 2 this create one new topic. The Topic.OnID.SpawnedWorkers.EvenWorkers. As Topic.OnID.SpawnedWorkers already exists, and so it need only spawn 1.

If a consumer 3 came around who just cares about messages from 0xdeadbeef then, a subscription would cause no new topics to be created as the chain is already set in place.

If all consumers unsubscribe then the topics could be garbage collected.

How we deal with subscribing in engines

Further, Engines should note at startup what topics they wish to subscribe to at startup. This would make serializing for dumping and restarting easier. We suspect a macro can help us declare this.

We also wish to support transient subscriptions, a with_subscription macro would be helpful here. Further we wish to have a :flush flag that notes that for this filter, we wish to flush all messages from it at the end.

Other Designs

Another design is that instead of having an inheritance hierarchy, we could instead use a mixin system. Indeed spawned workers has no relations to onID. We could utilize method combination as seen in Common Lisp to achieve mixing these in, however this would take more effort to set up. If we do this statically we would not lose any information, but a naive implementation would lose a lot of efficiency.

I’d say we can do this in the future once the idea is realized.

Implementation Notes

  1. We shall make the filters a protocol
    • They should be a standard format and we should respect that
  2. We should have each filter be an actor. With the possibility of making them a pool of actors if pressure is high enouh.
  3. We want this in the codebase as soon as possibe.

Notes on The Topic meeting

  • Networking discussion

  • Ray

    • Sub behavior where user chooses which kind of info from Engine one receives
    • system of tags (may become unwieldy)
    • you can’t require event consumers to sort through all tags
  • Jeremy

    • Engines stating what sorts of stuff it usually broadcasts about
  • Ray

    • only consumers know what they want to hear about
  • Jeremy

    • Define your own filters
    • Define filter as an agent that does filtration for you
    • You can subscribe to that agent instead
    • We could even graph dependencies
    • Have sub-brokers

Questions about how mailboxes getting over-flooded.

  • Jeremy

    • I want to have an unsubscription and flush
    • At the end it would match to any message you subscribe to usually and then match
  • Ray

    • Do a receive loop to unconditionally flush

    • Mailbox semantics: matched in a receive block then consumed otherwise not

  • Jeremy

    • Should be a protocol for the filter system
    • Two layers of filters

Who should be a global broker for this system?

Q about robustness

These are robust to crushing

Agree on uniform message struct

How does discovery work? Can be queried up top
See actors as modules so can be queries with naming

Further they can be paramartized, such that if we filter by a specific engine, it will remember, it and register itself so there exists one that people do attach to

How to deal with the broker? What to do with swamped requests?
Slow even if its own actor, will bottleneck.

Can be divided among worker pool.

Supervisor that can clone state among its children for the balanced load.

Not going to be 1-1 relationship of topics with engines as we currently have. Instead, we will launch a large topic on Node launch and then every Engine and user can filter the messages from this large topic.

“engine subscription state” section for startup subscription

prefer for messages to be returned at the end of function calls. may be hard to do. Dependency issues.

want to unsubscribe after receiving a specific message.

Ray against “with_subscription”. If something’s asynchronous you should be able to yield.

Discussion about Mempool-Executor coordination.

Additional meetings. Weekly. Discussing codebase.

not sure exactly what these are getting at but i will note we can make unsubscribe act synchronous: if, between the time when you request to unsubscribe from something and the time when the unsubscription actually happens, you receive a message for that subscription, just drop it. you could do that anyway but the point is to abstract it into Engine so nobody else has to worry about it. (and it’s not even about asynchrony of unsub per se because you could already have messages for that subscription in your mailbox; fine)

(if the point of the with_subscription comment is that subscribe is asynch then i agree. just unsub can be fully synch. and it doesn’t seem like a problem to me to have the option of blocking if you want to and have verified it’s safe—i think the context where we wanted with_sub is tests, where it should be fine)

this confuses me. if we can subscribe and unsubscribe dynamically, then what does it help to specify a set of startup subscriptions? and for that matter, i don’t see how it would help dump/restart even if we didn’t have that

primary comment: if filters are queries, then it’s advantageous to avoid making assumptions about or fixing the query plan, esp. if the relationships are not strictly hierarchical. but that compromises modularity since it involves making assumptions about the space of representable queries. but: since this is maximally general, if it becomes a problem we can add more optimised/restricted queries as just a kind of sub-broker later. so i think the proposed approach is probably fine

Do we have agreement on how to process parameters such as :id? I agree here with Ray that probably kinda impossible to predict what clients care about so we should not rely on pre-determined keywords here to provide search parameters which we process during subscribe function call.

An easy solution I see that we discussed but I forgot to write down is that we have a library of filter functions. These practically are just public functions which filter inbox messages in some way. So instead of processing parameters as keywords we feed in (I forget how exactly that is done nicely in Elixir) send in a list where atoms designate function names (also send in arguments obs).

The good part of this approach is:

  1. Allows to have good place to search for fingers, we literally just have a library.
  2. These will have good documentation using Elixir @doc without need for nay external documenting
  3. Users can add filters in separate PRs which should be quite easy to merge and check

yeah. you pass an arg list for the function, same as the usual Erlang pattern of passing module, function, args to something

The thing we came up with is a very Erlangish approach of “throw more processes at it”, processes being the cheapest way to do most things in Erlang.

So, a sketch of the agents:

  • PubSub.Broker
    • State: %{subscriptions: list(pid(), subscription())}
      • this is basically all it needs as state
    • Messages accepted:
      • {:event, event()}: every event producer just tosses its events directly to the pubsub bus.
      • {:subscribe, subscription()}: send this if you want events
      • {:unsubscribe, subscription()}
  • PubSub.Filter.ExampleFilter
    • State: %{parent: filter_descriptor() | :apex, subscriptions: list(pid(), subscription()), parameters: term()}
    • Messages received: subscribes to the broker, or to another filter
    • Messages accepted: :subscribe, :event, :unsubscribe
    • Callbacks: filter(state, event): on receiving an event, either pass it on to our own subscribers, or don’t, depending on the filter. Can read filter parameters from agent state

That’s just a basic sketch, it’ll look somewhat different in code of necessity.

The downside to this arrangement has already been pointed out, which is that it’s hierarchical and filter composition isn’t necessarily; e.g. you end up with filter agent id |> filter message type being different from filter message type |> filter agent id. The upside is that, if you mostly keep to a hierarchy, you mostly avoid duplicating tons of event classification work while still allowing event consumers, not event producers, to define what events they care about.

It’s possible that there’s some clever solution to this hidden in the works of Chris Date or someone, but nothing immediately comes to mind. Can revisit that.

Proceedings from SKAN Meetings: Jul 18/24

As seen in : Anoma Node: Proper Scry support, we created plans to tackle scry, however to do so we deemed it necessary to implement the Global Data Broker seen in this thread.

We have created a model to give a better idea on how this ought to work:

Note that these are mostly extremely simple agents, i.e. some pseudocode

module Event.Broker do
  type state(): set(subscriber)

  def handle_cast(state, event) do
    for subscriber in state do send(subscriber, event) end
    {:noreply, state}
  end

  def handle_cast(_, {:subscribe, id}) do
    {:noreply, state.add(id)}
  end

  def handle_cast(_, {:unsubscribe, id}) do
    {:noreply, state.remove(id)}
  end
end

module Event.Filter.Noop do
  def filter(event) do
    true
  end
end

module Event.Filter.Reject do
  def filter(event) do
    false
  end
end

and so on. (implementation will differ in details, of course!)

The part that requires a bit of thought but still isn’t really hard is the registry; the idea is that you tell it you want to subscribe to, say, the list [“Events from source A”, “even-numbered events”] and if this tree of agents exists it just forwards your info to the filter agent to subscribe, else it creates them before doing the same

1 Like

Event broker constraints document:

EventBroker.event

Caller’s constraints:

  1. I want to return immediately from calling this function.
  2. My responsibility for this event ends when I thus return.
  3. I want this event to be delivered expeditiously to any consumers who
    may or may not be interested in it.
  4. However, I have no synchronization requirements over such delivery.
  5. I also have no knowledge of what consumers may or may not exist.

Consumer’s constraints:

  1. I want to expeditiously receive any events which match subscriptions
    I’ve made.
  2. However, I have no synchronization requirements over such delivery.
  3. I want to receive each event I care about exactly once.

Broker’s constraints:

  1. I want to repeat any event I receive to all my subscribers.
  2. I know nothing about events.

Registry’s constraints:

none

Filter’s constraints:

  1. I want to repeat any event I receive which passes my filter function
    to all my subscribers.
  2. I know nothing about events besides whether or not they pass my
    filter function.

Conclusions

  • event should use asynchronous message sending, i.e. cast and info.
  • Each event should be completely handled within the callback receiving
    it.
  • No synchronization is necessary during the process of handling an
    event.

EventBroker.subscribe / EventBroker.unsubscribe

Caller’s constraints:

  1. I want to be (un)subscribed when I return from this call.
  2. I want to receive events matching the filter spec list I subscribed
    to, if I subscribed.
  3. I want to not receive events matching the filter spec list if I
    unsubscribed (unless they match another subscription I also have).

Producer’s constraints:

  1. I know nothing about any consumers that may or may not exist or their
    subscriptions.

Broker’s constraints:

  1. I know nothing about any subscribers except direct subscribers to me.
  2. I know nothing about whether any subscriber is a filter agent or a
    final consumer.
  3. I have no synchronization requirements; if a subscriber to me doesn’t
    actually exist, I don’t care.

Registry’s constraints

  1. I want to maintain an accurate, up-to-date map of filter specs to
    filter agents.
  2. I have a synchronization requirement over this: if an agent in my map
    does not exist, I may direct phantom subscriptions to a nonexistent
    agent and falsely report success to the caller.
  3. I have a synchronization requirement over this: if an agent that
    exists is not in my map, I will fail to direct (un)subscriptions
    correctly.

Filter’s constraints:

  1. I know nothing about any subscribers except direct subscribers to me.
  2. I know nothing about whether any subscriber is another filter agent
    or a final consumer.
  3. I know nothing about what I am subscribed to, whether it is another
    filter agent or the broker.
  4. I have no synchronization requirements; if a subscriber to me doesn’t
    actually exist, I don’t care.
  5. I want to reap myself if I have no remaining subscribers.

Conclusions

  • subscribe and unsubscribe are synchronous, and should be calls.
  • They should be calls on the Registry; only the caller and the Registry
    have synchronization requirements.
  • The Registry should supervise or monitor all filters, and the Broker,
    to maintain its map accurately.

Notes

  • The reason we’re not just using the stock Registry module is because
    we want flexible consumer-defined filters, and the stock Registry
    module just works with term topics. However, it might be instructive
    to look at how it handles registration just to see if any
    synchronization gotchas are hidden therein.
  • The design does not deduplicate events, seemingly violating one of the
    constraints! Rather, we rely on the ability of consumers to customize
    their filter stack freely to write exactly the filter they want. The
    ideal usage of this event broker is for each consumer to have exactly
    one subscription to it. This should be emphasized in the
    documentation.
  • Supervisors are meant to do exactly one thing: start and stop
    children, using some strategy to deal with normal/abnormal stops. So
    Registry won’t literally be a Supervisor, even if morally it
    supervises.