Concurrent Idioms #1: Broadcasting values in Go with linked channels.

Channels in Go are powerful things, but it’s not always obvious how to get them to accomplish certain tasks. One of those tasks is one-to-many communication. Channels work very well if lots of writers are funneling values to a single reader, but it’s not immediately clear how multiple readers can all wait for values from a single writer.

Here’s what a Go API for doing this might look like:

type Broadcaster ...

func NewBroadcaster() Broadcaster
func (b Broadcaster) Write(v interface{})
func (b Broadcaster) Listen() chan interface{}

The broadcast channel is created with NewBroadcaster, and values can be written to it with Write. To listen on the channel, we call Listen, which gives us a new channel from which we can receive the values written.

The solution that immediately comes to mind (which I’ve used in the past) is to have an intermediate process that keeps a registry of all the reading processes. Each call to Listen adds a new channel to the registry, and the central process’s main loop might look something like this:

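for {
        select {
        case v := <-inc:
                // pass the value on to every registered listener in turn
                for _, c := range listeners {
                        c <- v
                }
        case c := <-registryc:
                // a new listener has registered
                listeners = append(listeners, c)
        }
}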

This is the conventional way of doing things. It’s also probably the most sensible method. The process writing values will block until all the readers have read the previous value. An alternative might be to maintain an output buffer for each reader, which could either grow as necessary, or we could discard values when the buffer has filled up.
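To make the "discard values when the buffer has filled up" variant concrete, here is a minimal sketch of the registry loop with a buffered channel per listener. The names fanOut and demo, and the buffer size, are invented for this illustration; it is one possible shape, not a definitive implementation.

```go
package main

import "fmt"

// fanOut is a hypothetical helper: it copies every value from in to each
// registered listener, discarding the value for any listener whose buffer
// is full. When in is closed it closes every listener and returns.
func fanOut(in <-chan interface{}, register <-chan chan interface{}) {
	var listeners []chan interface{}
	for {
		select {
		case v, ok := <-in:
			if !ok {
				for _, c := range listeners {
					close(c)
				}
				return
			}
			for _, c := range listeners {
				select {
				case c <- v:
				default: // buffer full: drop the value rather than block the writer
				}
			}
		case c := <-register:
			listeners = append(listeners, c)
		}
	}
}

// demo registers one listener with the given buffer size, writes n values
// before the listener reads anything, and returns what the listener saw.
func demo(bufSize, n int) []interface{} {
	in := make(chan interface{})
	register := make(chan chan interface{})
	go fanOut(in, register)

	c := make(chan interface{}, bufSize)
	register <- c
	for i := 0; i < n; i++ {
		in <- i
	}
	close(in)

	var got []interface{}
	for v := range c {
		got = append(got, v)
	}
	return got
}

func main() {
	// The slow listener keeps only the values that fitted in its buffer.
	fmt.Println(demo(3, 10)) // prints [0 1 2]
}
```

The writer never waits for a slow reader here, at the cost of the reader silently missing values.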

But this post isn’t about the sensible way of doing things. This post is about an implementation where the writer never blocks; a slow reader with a fast writer can fill all of memory if it goes on for long enough. And it’s not particularly efficient.

But I don’t care much, because I think this is cool. And I might find a genuine use for it some day.

Here’s the heart of it:

type broadcast struct {
	c chan broadcast
	v interface{}
}

This is what I call a “linked channel” (analogously to a linked list). But even more than a linked list, it’s an Ouroboros data structure. That is, an instance of the structure can be sent down the channel that is inside itself.

Or the other way around. If I have a value of type chan broadcast around, then I can read a broadcast value b from it, giving me the arbitrary value b.v, and another value of the original type, b.c, allowing me to repeat the process.
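As a rough sketch of that read-and-follow step, the code below builds a short chain and walks it. The helpers chain and walk are made up for this illustration, and the channels are given a buffer of one purely so that the whole chain can be built up front in a single goroutine.

```go
package main

import "fmt"

type broadcast struct {
	c chan broadcast
	v interface{}
}

// chain (a hypothetical helper) builds a linked channel holding vals,
// one value per link, and returns the channel at its head.
func chain(vals ...interface{}) chan broadcast {
	head := make(chan broadcast, 1)
	curr := head
	for _, v := range vals {
		next := make(chan broadcast, 1)
		curr <- broadcast{c: next, v: v}
		curr = next
	}
	return head
}

// walk follows the links from head, collecting n values: each broadcast
// read yields a value b.v and the next channel b.c to read from.
func walk(head chan broadcast, n int) []interface{} {
	var out []interface{}
	c := head
	for i := 0; i < n; i++ {
		b := <-c
		out = append(out, b.v)
		c = b.c
	}
	return out
}

func main() {
	fmt.Println(walk(chain("a", "b", "c"), 3)) // prints [a b c]
}
```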

The other part of the puzzle comes from the way that a buffered channel can be used as a one-use one-to-many broadcast object. If I’ve got a buffered channel of some type T:

var c = make(chan T, 1)

then any process reading from it will block until a value is written. When we want to broadcast a value, we simply write it to the channel. This value will go to only a single reader, but we observe the convention that whoever reads a value from the channel always puts it back immediately.

func wait(c chan T) T {
	v := <-c // block until the value has been broadcast
	c <- v   // put it back for the next waiter
	return v
}
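Here is a small sketch of that convention in action, with int standing in for T. The helper broadcastOnce is invented for the illustration: it starts several waiters on one channel, writes a single value, and collects what each waiter saw.

```go
package main

import (
	"fmt"
	"sync"
)

// wait blocks until a value has been written to c, then puts the value
// back so that the next waiter can see it too.
func wait(c chan int) int {
	v := <-c
	c <- v
	return v
}

// broadcastOnce (a hypothetical helper) starts n waiters on a fresh
// channel, writes v to it once, and returns the value each waiter read.
func broadcastOnce(n, v int) []int {
	c := make(chan int, 1)
	results := make([]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = wait(c)
		}(i)
	}
	c <- v // a single write reaches every waiter, one after another
	wg.Wait()
	return results
}

func main() {
	fmt.Println(broadcastOnce(4, 42)) // prints [42 42 42 42]
}
```

The put-back never blocks, because only one value ever circulates and the buffer has room for exactly one.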

Putting the two pieces together, we can see that if the channel inside the broadcast struct is buffered in the above way, then we can get an endless stream of one-use one-to-many broadcast channels, each with an associated value.

Here’s the code:

package broadcast

type broadcast struct {
	c chan broadcast
	v interface{}
}

type Broadcaster struct {
	// private fields:
	listenc chan chan (chan broadcast)
	sendc   chan<- interface{}
}

type Receiver struct {
	// private fields:
	c chan broadcast
}

// NewBroadcaster creates a new broadcaster object.
func NewBroadcaster() Broadcaster {
	listenc := make(chan (chan (chan broadcast)))
	sendc := make(chan interface{})
	go func() {
		// currc is the tail of the chain: the channel that will
		// carry the next value to be broadcast.
		currc := make(chan broadcast, 1)
		for {
			select {
			case v := <-sendc:
				if v == nil {
					// A nil value ends the stream.
					currc <- broadcast{}
					return
				}
				c := make(chan broadcast, 1)
				b := broadcast{c: c, v: v}
				currc <- b
				currc = c
			case r := <-listenc:
				// A new listener starts at the current tail.
				r <- currc
			}
		}
	}()
	return Broadcaster{
		listenc: listenc,
		sendc:   sendc,
	}
}

// Listen starts listening to the broadcasts.
func (b Broadcaster) Listen() Receiver {
	c := make(chan chan broadcast)
	b.listenc <- c
	return Receiver{<-c}
}

// Write broadcasts a value to all listeners.
func (b Broadcaster) Write(v interface{}) { b.sendc <- v }

// Read reads a value that has been broadcast,
// waiting until one is available if necessary.
func (r *Receiver) Read() interface{} {
	b := <-r.c
	v := b.v
	r.c <- b // put it back for the other listeners
	r.c = b.c
	return v
}

This implementation has the nice property that there’s no longer any need for a central registry of listeners. A Receiver value encapsulates a place in the stream of values and can be copied at will – each copy will receive an identical copy of the stream. There’s no need for an Unregister function either. Of course, if the readers don’t keep up with the writers, memory use can grow without bound, but… isn’t this quite neat?
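To see the "copied at will" property on its own, here is a single-file sketch that repeats a condensed form of the implementation and then drains a Receiver and a plain copy of it. The helpers drain and copiesAgree are invented for the illustration.

```go
package main

import "fmt"

type broadcast struct {
	c chan broadcast
	v interface{}
}

type Broadcaster struct {
	listenc chan chan chan broadcast
	sendc   chan<- interface{}
}

type Receiver struct {
	c chan broadcast
}

func NewBroadcaster() Broadcaster {
	listenc := make(chan chan chan broadcast)
	sendc := make(chan interface{})
	go func() {
		currc := make(chan broadcast, 1)
		for {
			select {
			case v := <-sendc:
				if v == nil {
					currc <- broadcast{}
					return
				}
				c := make(chan broadcast, 1)
				currc <- broadcast{c: c, v: v}
				currc = c
			case r := <-listenc:
				r <- currc
			}
		}
	}()
	return Broadcaster{listenc: listenc, sendc: sendc}
}

func (b Broadcaster) Listen() Receiver {
	c := make(chan chan broadcast)
	b.listenc <- c
	return Receiver{<-c}
}

func (b Broadcaster) Write(v interface{}) { b.sendc <- v }

func (r *Receiver) Read() interface{} {
	b := <-r.c
	r.c <- b
	r.c = b.c
	return b.v
}

// drain (a hypothetical helper) reads from r until the nil terminator.
func drain(r Receiver) []interface{} {
	var out []interface{}
	for v := r.Read(); v != nil; v = r.Read() {
		out = append(out, v)
	}
	return out
}

// copiesAgree (a hypothetical helper) writes n values, then drains a
// receiver and a copy of it taken before any value was read.
func copiesAgree(n int) (a, b []interface{}) {
	bc := NewBroadcaster()
	r1 := bc.Listen()
	r2 := r1 // a plain copy marks the same place in the stream
	for i := 0; i < n; i++ {
		bc.Write(i)
	}
	bc.Write(nil)
	return drain(r1), drain(r2)
}

func main() {
	a, b := copiesAgree(3)
	fmt.Println(a, b) // prints [0 1 2] [0 1 2]
}
```

Note that all the writes complete before anything is read, which is the never-blocking writer at work.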

Here’s some example code using it:

package main

import (
	"fmt"
	"time"

	"broadcast"
)

var b = broadcast.NewBroadcaster()

func listen(r broadcast.Receiver) {
	for v := r.Read(); v != nil; v = r.Read() {
		go listen(r) // r is copied, so the new goroutine carries on from the same place
		fmt.Println(v)
	}
}

func main() {
	r := b.Listen()
	go listen(r)
	for i := 0; i < 10; i++ {
		b.Write(i)
	}
	b.Write(nil)

	// give the listeners time to finish printing
	time.Sleep(3 * time.Second)
}

Exercise for the reader: without running the code, how many lines will this code print?


20 Responses to “Concurrent Idioms #1: Broadcasting values in Go with linked channels.”

  1. rsc Says:

    very cool!

  2. Steve Says:

    Linked channels are fun and all, but for the “obvious” implementation, I’d go with

    for _, c := range(listeners) {
    go func() {
    c <- v;
    }
    }

    It's less efficient, but it keeps the write from blocking.

    • Steve Says:

      … by which I meant

      go func() {
      c <- v;
      }();

      , of course.

      • rogpeppe Says:

        unfortunately, that doesn’t work correctly – it doesn’t guarantee in-order delivery of the messages – AFAIK there’s no guarantee that if one goroutine is started before another that it will do the channel communication first.

        also, even if it did work properly, this has exactly the same properties as the linked-channel implementation – it never blocks for a slow reader, so it can accumulate garbage indefinitely.

  3. yigong Says:

    This is really cool! especially like the Ouroboros channel list. my take at answer to exercise: 1+2+4+…+2to9 (feel free to remove it if it takes experimental fun from others, or leave it if it is one of 1001 kinds of errors:)

  4. Select functions for Go « Savoury Morsels Says:

    […] Savoury Morsels Just another WordPress.com weblog « Concurrent Idioms #1: Broadcasting values in Go with linked channels. […]

  5. Simon Watt Says:

    I extended your ouroboros broadcaster with another method to A) simplify my program’s interface to it and B) test my understanding of it. Can you let me know if this jives with how this thing is supposed to work?

    Thanks a ton, by the way

    func (b Broadcaster) Stream() <-chan interface{} {
        messages := make(chan interface{}, 100) // Buffered
        r := b.Listen()
        go func() {
        outer:
            for v := r.Read(); v != nil; v = r.Read() {
                select {
                case messages <- v:
                default:
                    break outer
                }
            }
            close(messages)
        }()
        return messages
    }

  6. ovi Says:

    netchan package is not in Go v1 anymore, can you update the code for Go v1?

  7. michal Says:

    too complicated: chan chan chan broadcast
    These types make it simpler:

    type Msg struct {
        inp chan Msg
        msg string
    }
    type Broadcast struct {
        inp        chan Msg
        waitForAll chan bool
    }
    type Receiver struct {
        inp        chan Msg
        waitForAll chan Msg
    }

  8. roger peppe Says:

    Here’s the code in the Go playground, changed slightly for Go 1:

    http://play.golang.org/p/tm4ctDlK6_

  9. Arlen Says:

    Hello there! Would you mind if I share your blog with my twitter group?
    There’s a lot of folks that I think would really enjoy your content.
    Please let me know. Thanks

  10. keroroxx520 Says:

    I thought it’s unnecessary to use `Broadcast.listenc` to listen to register process.
    Add a field `Broadcast.cc` refers the newest broadcast, and then the method `Broadcast.Listen` could only just return `b.cc`

    The Code I made a minor change: http://play.golang.org/p/HCbY04zIg3

  11. Kyle Says:

    Interesting, but not that practical. Really you want raw access to the channel because you might have a select statement with say, for example, a timeout. Having the Read() function abstract away the channel access prevents that.

  12. rogpeppe Says:

    Agreed. That was where the motivation behind my “Select functions in Go” post came from.

  13. go channel | 极客之源 Says:

    […] Broadcasting values in Go with linked channels […]

  14. An Says:

    I tried the code, don’t know why some value is printed more than once. Shouldn’t it be so?

  15. An Says:

    Thank you for the nice work.

    Found the recursive call inside listen().

    Are you going to write #2 of this series?

  16. karlkfi Says:

    keroroxx520’s solution is simpler, works without listener recursion, and doesn’t cause channel ballooning. Good job!

  17. exgirlfriend Says:

    There’s certainly a great deal to find out about this subject. I love all the points you made.
