Concurrent Idioms #1: Broadcasting values in Go with linked channels.

Channels in Go are powerful things, but it’s not always obvious how to get them to accomplish certain tasks. One of those tasks is one-to-many communication. Channels work very well if lots of writers are funneling values to a single reader, but it’s not immediately clear how multiple readers can all wait for values from a single writer.

Here’s what a Go API for doing this might look like:

type Broadcaster ...

func NewBroadcaster() Broadcaster
func (b Broadcaster) Write(v interface{})
func (b Broadcaster) Listen() chan interface{}

The broadcast channel is created with NewBroadcaster, and values can be written to it with Write. To listen on the channel, we call Listen, which gives us a new channel from which we can receive the values written.

The solution that immediately comes to mind (which I’ve used in the past) is to have an intermediate process that keeps a registry of all the reading processes. Each call to Listen adds a new channel to the registry, and the central process’s main loop might look something like this:

for {
        select {
        case v := <-inc:
                for _, c := range(listeners) {
                        c <- v
                }
        case c := <-registryc:
                listeners.push(c);
        }
}

This is the conventional way of doing things. It’s also probably the most sensible method. The process writing values will block until all the readers have read the previous value. An alternative might be to maintain an output buffer for each reader, which could either grow as necessary, or we could discard values when the buffer has filled up.

But this post isn’t about the sensible way of doing things. This post is about an implementation where the writer never blocks; a slow reader with a fast writer can fill all of memory if it goes on for long enough. And it’s not particularly efficient.

But I don’t care much, because I think this is cool. And I might find a genuine use for it some day.

Here’s the heart of it:

type broadcast struct {
	c	chan broadcast;
	v	interface{};
}

This is what I call a “linked channel” (analagously to a linked list). But even more than a linked list, it’s an Ouroboros data structure. That is, an instance of the structure can be sent down the channel that is inside itself.

Or the other way around. If I have a value of type chan broadcast around, then I can read a broadcast value b from it, giving me the arbitrary value b.v, and another value of the original type, b.c, allowing me to repeat the process.

The other part of the puzzle comes from the way that a buffered channel can used as a one-use one-to-many broadcast object. If I’ve got a buffered channel of some type T:

var c = make(chan T, 1)

then any process reading from it will block until a value is written.
When we want to broadcast a value, we simply write it to the channel. This value will only go to a single reader, however we observe the convention that if you read a value from the channel, you always put it back immediately.

func wait(c chan T) T {
	v := <-c
	c <- v;
	return v;
}

Putting the two pieces together, we can see that if the channel inside the broadcast struct is buffered in the above way, then we can get an endless stream of one-use one-to-many broadcast channels, each with an associated value.

Here’s the code:

package broadcast

type broadcast struct {
	c	chan broadcast;
	v	interface{};
}

type Broadcaster struct {
	// private fields:
	Listenc	chan chan (chan broadcast);
	Sendc	chan<- interface{};
}

type Receiver struct {
	// private fields:
	C chan broadcast;
}

// create a new broadcaster object.
func NewBroadcaster() Broadcaster {
	listenc := make(chan (chan (chan broadcast)));
	sendc := make(chan interface{});
	go func() {
		currc := make(chan broadcast, 1);
		for {
			select {
			case v := <-sendc:
				if v == nil {
					currc <- broadcast{};
					return;
				}
				c := make(chan broadcast, 1);
				b := broadcast{c: c, v: v};
				currc <- b;
				currc = c;
			case r := <-listenc:
				r <- currc
			}
		}
	}();
	return Broadcaster{
		Listenc: listenc,
		Sendc: sendc,
	};
}

// start listening to the broadcasts.
func (b Broadcaster) Listen() Receiver {
	c := make(chan chan broadcast, 0);
	b.Listenc <- c;
	return Receiver{<-c};
}

// broadcast a value to all listeners.
func (b Broadcaster) Write(v interface{})	{ b.Sendc <- v }

// read a value that has been broadcast,
// waiting until one is available if necessary.
func (r *Receiver) Read() interface{} {
	b := <-r.C;
	v := b.v;
	r.C <- b;
	r.C = b.c;
	return v;
}

This implementastion has the nice property that there’s no longer any need for a central registry of listeners. A Receiver value encapsulates a place in the stream of values and can be copied at will – each copy will receive an identical copy of the stream. There’s no need for an Unregister function either. Of course, if the readers don’t keep up with the writers, memory can be used indefinitely, but… isn’t this quite neat?

Here’s some example code using it:

package main

import (
	"fmt";
	"broadcast";
	"time";
)

var b = broadcast.NewBroadcaster();

func listen(r broadcast.Receiver) {
	for v := r.Read(); v != nil; v = r.Read() {
		go listen(r);
		fmt.Println(v);
	}
}

func main() {
	r := b.Listen();
	go listen(r);
	for i := 0; i  < 10; i++ {
		b.Write(i);
	}
	b.Write(nil);

	time.Sleep(3 * 1e9);
}

Exercise for the reader: without running the code, how many lines will this code print?

Advertisement

6 Responses to “Concurrent Idioms #1: Broadcasting values in Go with linked channels.”

  1. rsc Says:

    very cool!

  2. Steve Says:

    Linked channels are fun and all, but for the “obvious” implementation, I’d go with

    for _, c := range(listeners) {
    go func() {
    c <- v;
    }
    }

    It's less efficient, but it keeps the write from blocking.

    • Steve Says:

      … by which I meant

      go func() {
      c <- v;
      }();

      , of course.

      • rogpeppe Says:

        unfortunately, that doesn’t work correctly – it doesn’t guarantee in-order delivery of the messages – AFAIK there’s no guarantee that if one goroutine is started before another that it will do the channel communication first.

        also, even if it did work properly, this has exactly the same properties as the linked-channel implementation – it never blocks for a slow reader, so it can accumulate garbage indefinitely.

  3. yigong Says:

    This is really cool! especially like the Ouroboros channel list. my take at answer to exercise: 1+2+4+…+2to9 (feel free to remove it if it takes experimental fun from others, or leave it if it is one of 1001 kinds of errors:)

  4. Select functions for Go « Savoury Morsels Says:

    [...] Savoury Morsels Just another WordPress.com weblog « Concurrent Idioms #1: Broadcasting values in Go with linked channels. [...]

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Connecting to %s


Follow

Get every new post delivered to your Inbox.