Create a group of streamers and publishers


March 2019


42 time


I have a number of nodes that will use a secondary service to be informed about the address of each other. I want to be able to publish information so as all the other nodes can hear it. Using an XPUB socket is not an option I would want to go with here as I want this system to be distributed.

What I have tried is something that sums up to:

1 Create a PUB socket,

def pub_stream(self): = self.context.socket(zmq.PUB)

2 Create a SUB stream,

def sub_stream(self):
    ioloop = IOLoop.instance()
    socket = self.context.socket(zmq.SUB)
    self.sub_stream = ZMQStream(socket, ioloop)
    self.subs_stream.setsockopt(zmq.SUBSCRIBE, self.topic)

3 At some point receive the addresses of all other nodes and connect to them,

# close and restart sub_stream to get rid of any previous connections
for endpoint in endpoints:

No messages are passed on the on_message callback though. Is what I am doing correct, if not, what is a better way of doing what I want to achieve?

1 answers


Whatever route you decide you need at least one fixed address to connect to unless you have access to multicast etc.

I would have a simple X(pub/sub) proxy as my separate discovery network allowing new nodes to decide which other nodes are of interest based on subject.

Simple version

  • Create a single XSUB/XPUB proxy with a fixed address on each side (DNS etc)
  • Node starts
    • connects to XSUB port of proxy and broadcasts its subject, address and data port
    • connects to XPUB port and subscribes to nodes subjects of interest
      • With the connection information returned it connects its data socket to the data socket of the node based on the connection information.

Reliable version

  • Add multiple discovery proxies with load balancers/virtual IP's to cover fault tolerance etc.
  • Node should send discovery message on a timer
    • Allow late joining nodes to connect
    • Allow connected nodes to discover failed nodes (other than relying on tcp timeout)