Skip to content


Folders and files

Last commit message
Last commit date

Latest commit



17 Commits

Repository files navigation


Reactive Messaging


  • Two-way network channels are modeled as Subject
  • A RemoteSubject : Subject<String> implements a protocol of three messages.
    • OnNext:<message text>
    • OnError:<message text>
    • OnCompleted
  • A RabbitPubSubSubject(host,queueName) : Subject<String> allows sending messages over RabbitMQ with a publish subscribe mechanism.


Remoting Rx is implemented using an interface HereComeDragons and an extension method on IObservable. They are designed in such a way that the user has an Rx stream with messages and another to handle network issues. The latter can be ignored of you don't want to recover.

We define two semantics, for communication through a message broker and for point-to-point communication. Any implementation should document which semantic it implements.

The point to point part is particularly unfinished

Interface and extension method

The extension methods are provided by Qx, the interface has to be implemented for every communication method you want to support.

// there should be some things here that look kind of dual to each
// other

interface Exposer<TState>
    Connection Observe(IObservable<string>) {}

interface Consumer<TState>
    Connection Subscribe(IObservable<string>) {}

interface Connection<TState> : IConnectableObservable<TState>

interface Channel<TState> : IObservable<Connection<TState>>

class ExtensionMethods

    public List<Channel<TState>> Expose<TState>(this IObservable<string> observable, Exposer<TState> next, Exposer<TState> error = null, Exposer<TState> completed = null)
        // create a connection with Observe
        // create ChannelState with possible reconnect behaviour
        // protocol logic lives here

    public IObservable<string> Consume<TState>(Consumer<TState> next, Consumer<TState> error, Consumer<TState> completed, List<Channel<TState>> => () = null)
        // create a connection with Subscribe
        // create ChannelState with possible reconnect behaviour
        // protocol logic lives here


// point-to-point connections probably need their own Listen method
// and an alternative to Consumer that creates new connection objects on
// an incoming connection.

We present a list of actor roles and the actions [A] and events [E] they have. In the semantics description below, we identify which roles actors have and which events map to what actions.


  • Expose [E] -- request to expose this stream to a channel


  • OnNext [E] -- next value from stream event
  • OnError [E] -- error on stream event
  • OnCompleted [E] -- stream completed event
  • SubscribeTo [E] -- asked to subscribe to a stream


  • Connect [A] -- connect to remote host
  • Send [A] -- send message over connection
  • Close [A] -- close connection
  • Error [E] -- error happened


  • Connect [A] -- connect to broker
  • Send [A] -- send message to broker
  • Close [A] -- close connection to broker
  • Error [E] -- error occurred


  • OnNext [A] -- put next value in stream
  • OnError [A] -- put error on stream
  • OnCompleted [A] -- set stream as completed
  • Subscribe [E] -- someone subscribes to the stream, invoked by HereComeDragons.Consume
  • Dispose [E] -- someone stops listening
  • Consume [E] -- a channel create request


  • Listen [A] -- listen to remote connections
  • Connect [E] -- new connection comes in
  • Receive [E] -- new message comes from network
  • Close [E] -- connection was closed
  • Error [E] -- error occurred


  • Connect [A] -- connect to broker
  • Receive [E] -- message received from broker
  • Close [A] -- close connection to broker
  • Error [E] -- an error occurred

ChannelState (= IObservable)

  • Subscribe [E] -- someone starts listening to channel state
  • OnNext [A] -- put the channel state in the stream
  • OnError [A] -- put a channel error in the stream
  • OnCompleted [A] -- signal the channel terminated peacefully

General notes:

  • Reconnection is only possible on ends where there is a connect action.
  • To keep things simple for the programmer:
    • Expose is one client connection
    • Consume is one client connection
    • There's no magic with every subscribe being a different thing or whatever, because then it's hard to map it on their knowledge of the middleware.
  • Failures are factored out from the value channels and exposed through the ChannelState. This way you can differentiate between the network aspects and the application level stream. For brokers this means you will never receive OnError or OnCompleted, because that doesn't exist in that world.
  • If no one is subscribed, nothing happens. So if you subscribe/dispose a lot but don't want to miss messages, you better stick a buffer between.
  • You can only subscribe once. If you want to share you'll have to use Publish(). The connection is opened/accepted on Subscribe and closed on Dispose. This is to prevent message loss, so we only accept messages when someone's listening and as soon as we stop listening we don't accept more messages. This way we can support unicast without messages getting lost.
  • Can a Dispose of the subscription cause message loss if the message is already in the pipeline but not yet fully processed? Is this a problem of the developer? What if you dispose a .Connect, does it send a OnCompleted down the line, after the last message? Then processing could be finished.
  • No hidden sharing. If you have a unicast and you Consume twice, you should get different messages.
  • ChannelState manages it's subscribes in generations. If OnCompleted or OnError, the previous generation is finished and let go. The first of the next generation triggers a connect (although a connect always happen before the first generation). Some smart locks should make sure we don't attempt multiple connections.

Broker notes:

  • Consider broker as part of infra, so no QueueDeleted events or anything
  • We assume Rx behaviour, so all consumers on one connection get the same message. No multiplexing!
  • Physical connections can be shared, as long as it behaves the same as if two connections were used.

Semantics for message broker

Sending -- ToChannel-ToBroker

Event Action
ToChannel.OnNext ToBroker.Send
ToChannel.OnError ToBroker.Close, ChannelState.OnCompleted
ToChannel.OnCompleted ToBroker.Close, ChannelState.OnCompleted
ToChannel.Expose ToBroker.Connect, ChannelState.OnNext
ToBroker.Error ToBroker.Close, ChannelState.OnError
ChannelState.Subscribe (last state unless error) ToBroker.Connect, ChannelState.OnNext, ToBroker.Send(pending)

Reconnects are possible.

For optimization the channel could be closed if there are no message for a certain time and reopened if a new message arrives.

Receiving -- FromBroker-FromChannel

Event Action
FromBroker.Receive FromChannel.OnNext
FromBroker.Error FromBroker.Close, ChannelState.OnError
FromChannel.Consume FromBroker.Connect, ChannelState.OnNext
ChannelState.Subscribe (last state unless error) FromBroker.Connect, ChannelState.OnNext

Reconnects are possible.

Semantics for point-to-point connections

Sending -- ToChannel-ToNetwork

Event Action
ToChannel.OnNext ToNetwork.Send
ToChannel.OnError ToNetwork.Send(error), ToNetwork.Close, ChannelState.OnCompleted
ToChannel.OnCompleted ToNetwork.Close, ChannelState.OnCompleted
ToChannel.Expose ToNetwork.Connect, ChannelState.OnNext
ToNetwork.Error ToNetwork.Close, ChannelState.OnError
ChannelState.Subscribe (last state unless error) ToNetwork.Connect, ChannelState.OnNext, ToNetwork.Send(pending)

Reconnects are possible.

Optimizing disconnects and reconnects are not allowed for point-to-point connections.

Receiving -- FromNetwork-FromChannel

Event Action
FromNetwork.Connect ??
FromNetwork.Receive FromChannel.OnNext
FromNetwork.Receive(error) FromChannel.OnError, ChannelState.OnCompleted
FromNetwork.Error FromNetwork.Close, ChannelState.OnError
FromNetwork.Close FromChannel.OnCompleted, ChannelState.OnCompleted
ChannelState.Subscribe (last state)

Reconnects are not possible.


// write to queue
IObservable<string> someThing;
var channel = someThing.Expose(AMQP("url").in("endpoint"));

// subscribe gives one connection
// it either gives an error or completes
// on retry, if reconnection is possible, it repeats, otherwise it throws ReconnectNotPossible
// we cannot easily expose the disposable of the connection here, which equates to stop sending despite what the stream does
//     this can be simulated however with a switch before the expose. Also other scenarios like user connect/disconnect can be done this way.
channel.Retry().Subscribe( (connection) => {
}, (error) => {
}, () => {
} ) // IDisposable disposes subscriptions and ultimately connection


Reactive Messaging







No releases published


No packages published