Logging in Reverse

March 31, 2011

A recent post on the Go mailing list asked how to use the Go logging package to log messages in reverse order, so that they could be displayed most-recent-first in a console window.

The package gives no obvious way to do this, but the flexibility of Go interfaces makes it almost trivial to do. It took about 10 minutes to write the following code. The log package can use an arbitrary io.Writer for output so I defined a type, ReverseBuffer, with a Write method that stores all the data in a reverse-ordered linked list and a Reader method that returns an io.Reader that can be used to read the data back.

It would not take much longer to implement a size or message count limit on the list if desired.

I like the way that the language features worked together to make the design almost inevitable once I had thought of it. Once you have an implementation of an interface, everything else just works.

You can run it in the Go Playground to see it working.

package main

import (
        "io"
        "os"
        "log"
)

type msg struct {
        data []byte
        next *msg
}

// ReverseBuffer is an implementation of io.Writer that stores data such
// that it can later be read with the data from all writes reversed.
type ReverseBuffer struct {
        msgs *msg
}

type reverseReader struct {
        data []byte
        msgs *msg
}

func (w *ReverseBuffer) Write(data []byte) (int, os.Error) {
        if len(data) == 0 {
                return 0, nil
        }
        w.msgs = &msg{append([]byte(nil), data...), w.msgs}
        return len(data), nil
}

// Reader returns a new io.Reader that can be used to read all the data
// written to w.  The data from more recent writes is returned first.
func (w *ReverseBuffer) Reader() io.Reader {
        return &reverseReader{nil, w.msgs}
}

// Read implements io.Reader.Read.
func (r *reverseReader) Read(data []byte) (int, os.Error) {
        if len(r.data) == 0 {
                if r.msgs == nil {
                        return 0, os.EOF
                }
                r.data = r.msgs.data
                r.msgs = r.msgs.next
        }
        n := copy(data, r.data)
        r.data = r.data[n:]
        return n, nil
}

func main() {
        w := new(ReverseBuffer)
        out := log.New(w, "", log.Ldate|log.Ltime)
        out.Printf("one")
        out.Printf("two")
        out.Printf("three")
        io.Copy(os.Stdout, w.Reader())
}
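
The message count limit mentioned above could be added along the following lines. This is only a sketch, written for current Go (error and io.EOF rather than os.Error and os.EOF); the LimitedReverseBuffer name, its max field and the contents helper are assumptions made for the illustration, not part of the original code.

```go
package main

import (
	"io"
	"log"
	"os"
)

type msg struct {
	data []byte
	next *msg
}

// LimitedReverseBuffer keeps at most max writes, newest first.
// The type name and the max field are hypothetical.
type LimitedReverseBuffer struct {
	msgs *msg
	n    int // number of messages currently held
	max  int // maximum number of messages to keep
}

func (w *LimitedReverseBuffer) Write(data []byte) (int, error) {
	if len(data) == 0 || w.max <= 0 {
		return len(data), nil
	}
	w.msgs = &msg{append([]byte(nil), data...), w.msgs}
	if w.n < w.max {
		w.n++
		return len(data), nil
	}
	// Over the limit: walk to the max'th message and cut off the oldest one.
	m := w.msgs
	for i := 1; i < w.max; i++ {
		m = m.next
	}
	m.next = nil
	return len(data), nil
}

type reverseReader struct {
	data []byte
	msgs *msg
}

// Reader returns an io.Reader that yields the retained writes, newest first.
func (w *LimitedReverseBuffer) Reader() io.Reader {
	return &reverseReader{nil, w.msgs}
}

func (r *reverseReader) Read(p []byte) (int, error) {
	if len(r.data) == 0 {
		if r.msgs == nil {
			return 0, io.EOF
		}
		r.data = r.msgs.data
		r.msgs = r.msgs.next
	}
	n := copy(p, r.data)
	r.data = r.data[n:]
	return n, nil
}

// contents returns everything currently readable from w as a string.
func contents(w *LimitedReverseBuffer) string {
	b, _ := io.ReadAll(w.Reader())
	return string(b)
}

func main() {
	w := &LimitedReverseBuffer{max: 2}
	out := log.New(w, "", 0) // no timestamp, so the output is stable
	out.Printf("one")
	out.Printf("two")
	out.Printf("three")
	io.Copy(os.Stdout, w.Reader()) // prints "three" then "two"
}
```

Trimming walks the list on every write once the limit is reached, which is fine for a small console log; a doubly-linked list would avoid the walk.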

Bidirectional RPC with netchan

February 10, 2011

Today’s firewall-riddled network makes it hard to set up ad hoc topologies of communicating programs. Opening up a single incoming port for a server to listen on is usually possible. If you want arbitrary numbers of ports, then it’s harder, and as for calling a client back, well, forget it.

Remote Procedure Call (RPC) is a common paradigm that is easy to use in this scenario. A client makes a connection to the server, sends some data to the server; the server operates on that data and sends back the results.

Say I want the control flow to go the other way? That’s OK too: the client makes a connection to the server, then acts in the RPC server role, reading data from the “server”, operating on it, and sending back the results.

Harder is when I want control flow to go both ways – for the client to be able to make requests of the server and for the server to make requests of clients. If I can manage two connections per client, it’s easy enough, but for the purposes of this article I assume that is not possible.

Luckily Go’s flexible interface-based APIs make this kind of thing quite straightforward. We can layer the Go RPC package on top of the Go netchan package. This article explains how, and uses the functionality to build a flexible client-server application where clients make their local file system available to the server, and any client can access any other client’s file system.

If you want to skip ahead and just read the example code, it’s here.

The RPC package can run neatly on top of any byte stream – all we need is an io.ReadWriteCloser at each end. Server.ServeConn will start an RPC server; NewClient will start the client end. We just need a way of layering a client to server byte stream onto netchan.
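
To make the point about byte streams concrete, here is a small sketch in current Go that runs net/rpc over an in-memory net.Pipe instead of a network socket. The Arith service, the Args type and the addOverPipe helper are invented for the illustration.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// Arith is a hypothetical service used only for this illustration.
type Arith struct{}

// Args holds the operands of Arith.Add.
type Args struct{ A, B int }

// Add stores the sum of the two operands in *sum.
func (Arith) Add(args *Args, sum *int) error {
	*sum = args.A + args.B
	return nil
}

// addOverPipe runs an RPC server on one end of an in-memory pipe and a
// client on the other, and returns the result of a single call.
func addOverPipe(a, b int) (int, error) {
	srvEnd, cliEnd := net.Pipe()

	srv := rpc.NewServer()
	if err := srv.Register(Arith{}); err != nil {
		return 0, err
	}
	go srv.ServeConn(srvEnd) // any io.ReadWriteCloser will do here

	client := rpc.NewClient(cliEnd)
	defer client.Close()

	var sum int
	err := client.Call("Arith.Add", &Args{a, b}, &sum)
	return sum, err
}

func main() {
	sum, err := addOverPipe(2, 3)
	fmt.Println(sum, err) // 5 <nil>
}
```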

From netchan to Listener

Using the netchan package, we can send and receive data from several channels on the same underlying connection. If we use one channel of []byte in each direction, it is straightforward to make them appear as an io.ReadWriteCloser.

First the Read half. When we get a read request, we satisfy it by reading the channel. If the channel gives us too much data, we save it and return it in subsequent Read requests.

type chanReader struct {
	buf []byte
	c   <-chan []byte
}

func newChanReader(c <-chan []byte) *chanReader {
	return &chanReader{c: c}
}

func (r *chanReader) Read(buf []byte) (int, os.Error) {
	for len(r.buf) == 0 {
		r.buf = <-r.c
		if closed(r.c) {
			return 0, os.EOF
		}
	}
	n := copy(buf, r.buf)
	r.buf = r.buf[n:]
	return n, nil
}

Next we implement the write half. When we get a Write request, we write the data down the channel. Note that we need to copy the data because the function calling Write is likely to re-use the same buffer.

type chanWriter struct {
	c chan<- []byte
}

func newChanWriter(c chan<- []byte) *chanWriter {
	return &chanWriter{c: c}
}
func (w *chanWriter) Write(buf []byte) (n int, err os.Error) {
	b := make([]byte, len(buf))
	copy(b, buf)
	w.c <- b
	return len(buf), nil
}
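
For readers using current Go, where the closed built-in no longer exists and errors have type error, the two halves might look like the sketch below. It relies on the two-value receive (b, ok := <-c) to detect a closed channel; the roundTrip helper is made up for the demonstration.

```go
package main

import (
	"fmt"
	"io"
)

type chanReader struct {
	buf []byte
	c   <-chan []byte
}

// Read hands out data received from the channel, keeping any surplus
// for the next call.
func (r *chanReader) Read(p []byte) (int, error) {
	for len(r.buf) == 0 {
		b, ok := <-r.c
		if !ok {
			return 0, io.EOF // channel closed and drained
		}
		r.buf = b
	}
	n := copy(p, r.buf)
	r.buf = r.buf[n:]
	return n, nil
}

type chanWriter struct {
	c chan<- []byte
}

// Write sends a copy of p down the channel, because the caller is free
// to reuse p as soon as Write returns.
func (w *chanWriter) Write(p []byte) (int, error) {
	b := make([]byte, len(p))
	copy(b, p)
	w.c <- b
	return len(p), nil
}

// roundTrip writes each message through a chanWriter and reads
// everything back through a chanReader on the same channel.
func roundTrip(msgs ...string) string {
	c := make(chan []byte)
	go func() {
		w := &chanWriter{c}
		for _, m := range msgs {
			io.WriteString(w, m)
		}
		close(c)
	}()
	data, _ := io.ReadAll(&chanReader{c: c})
	return string(data)
}

func main() {
	fmt.Println(roundTrip("hello, ", "world")) // hello, world
}
```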

Now we have built the first layer, but if we want to run an RPC service over this, then we need more. In fact, it would be nice if we could have an implementation of net.Listener. Then we could simply pass it to the RPC server’s Accept method.

Recall that the Listener interface looks like this:

type Listener interface {
    // Accept waits for and returns the next connection to the listener.
    Accept() (c net.Conn, err os.Error)

    // Close closes the listener.
    // The error returned is an os.Error to satisfy io.Closer.
    Close() os.Error

    // Addr returns the listener's network address.
    Addr() net.Addr
}

Each new connection must implement net.Conn, which requires more than just Read, Write and Close:

type Conn interface {
    Read(b []byte) (n int, err os.Error)
    Write(b []byte) (n int, err os.Error)
    Close() os.Error

    LocalAddr() net.Addr
    RemoteAddr() net.Addr
    SetTimeout(nsec int64) os.Error
    SetReadTimeout(nsec int64) os.Error
    SetWriteTimeout(nsec int64) os.Error
}

Luckily we can easily provide dummy implementations of all the extra methods because none of their functionality is used by the rpc package. For details, see netchanConn.
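
As an illustration of how little is needed, here is a sketch of such a stub against the current net.Conn interface, which has SetDeadline, SetReadDeadline and SetWriteDeadline in place of the three timeout methods shown above. The stubConn and chanAddr names are hypothetical, and this is not the netchanConn code itself.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"time"
)

// chanAddr is a placeholder address type invented for this sketch.
type chanAddr string

func (a chanAddr) Network() string { return "chan" }
func (a chanAddr) String() string  { return string(a) }

// stubConn adapts any reader, writer and closer to net.Conn.
// The embedded interfaces supply Read, Write and Close.
type stubConn struct {
	io.Reader
	io.Writer
	io.Closer
	name string
}

func (c *stubConn) LocalAddr() net.Addr  { return chanAddr(c.name) }
func (c *stubConn) RemoteAddr() net.Addr { return chanAddr(c.name) }

// The deadline methods do nothing: deadlines are silently ignored.
func (c *stubConn) SetDeadline(time.Time) error      { return nil }
func (c *stubConn) SetReadDeadline(time.Time) error  { return nil }
func (c *stubConn) SetWriteDeadline(time.Time) error { return nil }

// Compile-time check that stubConn satisfies net.Conn.
var _ net.Conn = (*stubConn)(nil)

// echo sends s through a stubConn built on an io.Pipe and reads it back.
func echo(s string) string {
	pr, pw := io.Pipe()
	var conn net.Conn = &stubConn{Reader: pr, Writer: pw, Closer: pw, name: "demo"}
	go func() {
		io.WriteString(conn, s)
		conn.Close() // closes the write side, so the reader sees EOF
	}()
	data, _ := io.ReadAll(conn)
	return string(data)
}

func main() {
	fmt.Println(echo("ping")) // ping
}
```

Returning nil from the deadline methods is a design choice; returning an "unsupported" error would be more honest for callers that rely on timeouts.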

Now comes an awkward bridge to cross. We need a way to establish a unique pair of channels between the netchan server and a given client. Netchan does not make it easy, but it is possible.

We do it by getting the server to generate an endless set of channel pairs and export each pair using a unique name. Then we use an auxiliary channel to send these names from the server to any client that wants a connection.

Here’s an excerpt from the paraphrased code that creates the Listener. The service variable holds a name that we are going to use as the “port name” for clients to connect to.

[Much of the code in this article has been paraphrased to omit the parts dealing with error checking and teardown, to make the main control flow more obvious.]

func Listen(exp *netchan.Exporter, service string) (net.Listener, os.Error) {
	r := &netchanListener{
		exp: exp,
		name: service,
		conns: make(chan net.Conn),
	}
	// Create the auxiliary channel and export it.
	clientNames := make(chan string)
	exp.Export(service, clientNames, netchan.Send)
	go func() {
		for i := 0; ; i++ {
			clientName := fmt.Sprintf("%s.%d", service, i)
			r.exporter(clientName)
			select {
			case clientNames <- clientName:
			case <-r.closed:
				// Listener has been closed, so stop exporting channels.
				return
			}
		}
	}()
	return r, nil
}
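
Stripped of netchan, the name-allocation loop above reduces to a goroutine that offers one fresh name at a time and stops when told to. A sketch in current Go, with a hypothetical nameServer function standing in for the body of Listen:

```go
package main

import "fmt"

// nameServer hands out names of the form service.N on the returned
// channel, one per receiver, until closed is closed.
func nameServer(service string, closed <-chan struct{}) <-chan string {
	names := make(chan string) // unbuffered: at most one name is prepared ahead
	go func() {
		defer close(names)
		for i := 0; ; i++ {
			name := fmt.Sprintf("%s.%d", service, i)
			// In the article's code, the channel pair for this name is
			// exported here, before the name is offered to a client.
			select {
			case names <- name:
			case <-closed:
				return // listener closed: stop generating names
			}
		}
	}()
	return names
}

func main() {
	closed := make(chan struct{})
	names := nameServer("ctl", closed)
	fmt.Println(<-names, <-names) // ctl.0 ctl.1
	close(closed)
}
```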

The exporter function creates two channels and exports them, spawns a goroutine to handle them, and returns.

func (r *netchanListener) exporter(clientName string) {
	req := make(chan []byte)
	reply := make(chan []byte)
	r.exp.Export(clientName+".req", req, netchan.Recv)
	r.exp.Export(clientName+".reply", reply, netchan.Send)

	go func() {
		c := &netchanConn{
			chanReader: newChanReader(req),
			chanWriter: newChanWriter(reply),
		}
		r.conns <- c
	}()
}

Now the Accept function is trivial (omitting teardown code again):

func (r *netchanListener) Accept() (c net.Conn, err os.Error) {
	return <-r.conns, nil
}

We can then define the client side. Note that we import only a single value from the auxiliary channel – this ensures that the server does not create many unnecessary channel pairs only for their names to be lost in the channel buffer.

func Dial(imp *netchan.Importer, service string) (net.Conn, os.Error) {
	// Import auxiliary channel.
	cnames := make(chan string)
	imp.ImportNValues(service, cnames, netchan.Recv, 1, 1)
	clientName := <-cnames

	// Import channel pair
	req := make(chan []byte)
	imp.Import(clientName + ".req", req, netchan.Send, 200)
	reply := make(chan []byte)
	imp.Import(clientName + ".reply", reply, netchan.Recv, 200)

	return &netchanConn{
		chanReader: &chanReader{c: reply},
		chanWriter: &chanWriter{c: req},
	}, nil
}

Between them, Listen and Dial provide the functionality we need to set up bidirectional RPC. I have made them available as the package at rog-go.googlecode.com/hg/ncnet.

Putting it together

Now that we have the Dial and Listener functionality over a netchan connection, we can use it to create a little application. It works like this:

  • Each client connects to a central server.
  • Each client exports an RPC interface to the server.
  • The server exports an RPC interface to the clients.

For this example, we want the client to export one method only: Read, which allows the server to ask the client for the contents of a file. The server will export two methods to the clients: List, to ask for a list of all currently connected clients, and Read to ask for the contents of a file on a given client.

We will start by defining the Server RPC type. It will hold one additional method to those listed above: Publish. This will be called by a client to tell the server that it is there, and to set up the server to client RPC link. Access to fields in the Server type is guarded by a mutex because RPC methods can be called concurrently.

type Server struct {
	mu       sync.Mutex
	clients  map[string]*rpc.Client		// All current clients.
	exp      *netchan.Exporter
	clientid int					// Unique client id generator.
}

The List method simply iterates over the list of clients and returns it. Note that the RPC package requires that both the argument and return types are represented as pointers. List takes no arguments, so we define it as taking a pointer to an empty struct.

func (srv *Server) List(_ *struct{}, names *[]string) os.Error {
	srv.mu.Lock()
	defer srv.mu.Unlock()
	for name := range srv.clients {
		*names = append(*names, name)
	}
	return nil
}

The Read method looks up a client and invokes its Read operation. Here, we’re turning client to server RPC into server to client RPC.

type ReadReq struct {
	Client string
	Path   string
}
func (srv *Server) Read(req *ReadReq, data *[]byte) os.Error {
	srv.mu.Lock()
	client := srv.clients[req.Client]
	srv.mu.Unlock()
	if client == nil {
		return os.ErrorString("unknown client")
	}
	return client.Call("Client.Read", &req.Path, data)
}
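
The forwarding step can be tried out in isolation. The sketch below uses current Go and two in-memory net.Pipe connections in place of the netchan channel pairs; the Client type serves an in-memory map rather than the file system, and the serve and readVia helpers are invented for the demonstration.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/rpc"
	"sync"
)

// Client is the RPC type served by each client. To stay self-contained
// it serves an in-memory map instead of real files (an assumption).
type Client struct{ files map[string]string }

func (c *Client) Read(path *string, data *[]byte) error {
	s, ok := c.files[*path]
	if !ok {
		return errors.New("no such file")
	}
	*data = []byte(s)
	return nil
}

type ReadReq struct {
	Client string
	Path   string
}

type Server struct {
	mu      sync.Mutex
	clients map[string]*rpc.Client
}

// Read forwards the request to the named client's own RPC server.
func (srv *Server) Read(req *ReadReq, data *[]byte) error {
	srv.mu.Lock()
	client := srv.clients[req.Client]
	srv.mu.Unlock()
	if client == nil {
		return errors.New("unknown client")
	}
	return client.Call("Client.Read", &req.Path, data)
}

// serve starts an RPC server for rcvr on one end of an in-memory pipe
// and returns an RPC client attached to the other end.
func serve(rcvr any) *rpc.Client {
	a, b := net.Pipe()
	s := rpc.NewServer()
	if err := s.Register(rcvr); err != nil {
		panic(err)
	}
	go s.ServeConn(a)
	return rpc.NewClient(b)
}

// readVia asks the server to read path from the named client.
func readVia(clientName, path string) (string, error) {
	toClient := serve(&Client{files: map[string]string{"/tmp/x": "hello world"}})
	toServer := serve(&Server{clients: map[string]*rpc.Client{"first-client": toClient}})
	defer toServer.Close()
	defer toClient.Close()

	var data []byte
	err := toServer.Call("Server.Read", &ReadReq{clientName, path}, &data)
	return string(data), err
}

func main() {
	s, err := readVia("first-client", "/tmp/x")
	fmt.Println(s, err) // hello world <nil>
}
```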

The Publish method is to be called by a client to establish the initial reverse-RPC connection with the server. It allocates a new name which it returns to the client, starting a new goroutine listening for a netchan connection on that name. When the connection is accepted, it starts a new RPC client instance on it and adds it to the Server’s list of clients.

func (srv *Server) Publish(name *string, clientId *string) os.Error {
	srv.mu.Lock()
	defer srv.mu.Unlock()
	*clientId = fmt.Sprintf("client%d", srv.clientid)
	srv.clientid++
	listener, _ := ncnet.Listen(srv.exp, *clientId)
	go func() {
		// Accept a single connection, then close the listener.
		conn, _ := listener.Accept()
		listener.Close()
		client := rpc.NewClient(conn)
		srv.mu.Lock()
		srv.clients[*name] = client
		srv.mu.Unlock()
	}()
	return nil
}

To start a new server, we create a new netchan Exporter, an ncnet Listener on top of that (“ctl” is just a port name that we choose by convention), and export an RPC server instance on that. The network address is given in addr (e.g. myserver.com:8080).

func main() {
	addr := os.Args[1]
	exp := netchan.NewExporter()
	exp.Listen("tcp", addr)
	listener, _ := ncnet.Listen(exp, "ctl")
	srv := &Server{
		exp:     exp,
		clients: make(map[string]*rpc.Client),
	}
	rpcsrv := rpc.NewServer()
	rpcsrv.Register(srv)
	rpcsrv.Accept(listener)
	listener.Close()
}

All that remains is the client side. There’s no state for the client, so we’ll just use an empty struct for the Client RPC type. The Read method takes a file name and returns the data as a []byte.

type Client struct{}

func (Client) Read(file *string, data *[]byte) (err os.Error) {
	f, err := os.Open(*file, os.O_RDONLY, 0)
	if err != nil {
		return err
	}
	defer f.Close()
	*data, err = ioutil.ReadAll(f)
	return
}

To establish the initial client connection, we dial the server and establish a netchan link to the server’s RPC interface. Then we Publish our name and export the Client RPC type on the resulting connection. We can then interact with the server and the user as we please – in this case we provide a little command line interface reading commands from the user and executing them.

func main() {
	clientName := os.Args[1]
	addr := os.Args[2]

	imp, _ := netchan.NewImporter("tcp", addr)
	srvconn, _ := ncnet.Dial(imp, "ctl")
	srv := rpc.NewClient(srvconn)

	var clientId string
	srv.Call("Server.Publish", clientName, &clientId)

	clientsrv := rpc.NewServer()
	if err := clientsrv.Register(Client{}); err != nil {
		log.Fatal("clientsrv register failed: ", err)
	}

	clientconn, err := ncnet.Dial(imp, clientId)
	if err != nil {
		log.Fatalf("ncnet dial %q failed: %v", clientId, err)
	}

	go clientsrv.ServeConn(clientconn)
	interact(srv)
	clientconn.Close()
}

Summing up

We’ve created something with quite a bit of flexibility here. The example program has the bare bones, but there are many other ways we could use the components. For instance, it would be easy to provide direct client-to-client communication by simply gluing two pipes together at the server side. Or a client could negotiate for upstream or downstream streams of data of any type.

It shows, I think, just how well the Go interface-based model works.

To try it out, build it like this:

goinstall rog-go.googlecode.com/hg/cmd/share
cd $GOROOT/src/pkg/rog-go.googlecode.com/hg/cmd/share
make

Then follow the instructions in doc.go to run it. An example client session:

% share -name second-client localhost:8765
> list
["first-client" "second-client"]
> read first-client /tmp/x
hello world
> ^D
%

Lenses and Listeners in Go.

September 20, 2010

A little while ago, I developed a small experimental graphics package for Go, with the aim of giving full rein to the concurrency of the language and keeping things as modular and “building block”y as possible.

I wanted to allow an application to be structured as many independent agents, each interacting freely with the graphics environment, with independent communication between them.

I haven’t got there yet with the API design (although I’m happy with the way that some things work), but I do particularly like a package I created to solve one problem. It mixes functional style, interfaces and channels in a way that seems to me to be uniquely “Go-like”. Whether it’s nice or useful I’ll leave you to decide.

Scratches on the wall

Imagine that the screen is a physical blackboard with several people writing on it. Each person can independently make marks on the board, and they can talk to others to help them decide what to write. Others in the room attend to different tasks, writing letters to the outside world, talking amongst themselves and to the people at the blackboard.

With goroutines as people, and talking as communication over a channel, this is how I imagine the graphical environment could be in Go. There is one difficulty, however, and that’s the problem of deadlock. In graphical environments, there are often several elements that reflect one another – for instance, a slider bar might have a textual display of its value, and the value might be used to determine the graphical appearance of some other element.

If each element is represented with a goroutine, then the above setup implies a cyclical dependency. The slider bar can be dragged, the textual display may be edited, and the graphical appearance may be changed as a result of some other action – each of which implies a change to the other two elements.

Naively, we might model this system something like this:

	func SliderElement(setc <-chan float64, getc chan<- float64) {
		var currentValue float64
		for {
			select {
			case currentValue = <-setc:

			case m := <-MouseEventChannel:
				currentValue = mousePointToValue(m)
				getc <- currentValue
			}
			drawSlider(currentValue)
		}
	}

	func TextElement(setc <-chan string, getc chan<- string) {
		// ... similar to SliderElement
	}

	func GraphicalElement(setc <-chan float64, getc chan<- float64) {
		// ... also similar to SliderElement
	}

	func setup() {
		sliderSet := make(chan float64)
		sliderGet := make(chan float64)
		go SliderElement(sliderSet, sliderGet)

		textSet := make(chan string)
		textGet := make(chan string)
		go TextElement(textSet, textGet)

		graphicalSet := make(chan float64)
		graphicalGet := make(chan float64)
		go GraphicalElement(graphicalSet, graphicalGet)

		// communications goroutine
		go func() {
			for {
				select {
				case v := <-sliderGet:
					textSet <- fmt.Sprint(v)
					graphicalSet <- v
				case v := <-textGet:
					f, err := strconv.Atof(v)
					if err != nil {
						// what now?
						break
					}
					sliderSet <- f
					graphicalSet <- f
				case v := <-graphicalGet:
					textSet <- fmt.Sprint(v)
					sliderSet <- v
				}
			}
		}()
	}

There are a few problems with this approach, but one show-stopper: it can deadlock. If the communications goroutine happens to be sending on graphicalSet at the same time that the graphical element has decided that it will change, then the two will block forever trying to perform the operation, and the whole thing will hang up. This is fairly unlikely if all the “set” actions are triggered by user mouse events, but not impossible. When independent actions are added to the mix, for example when triggered by a network communication, then it becomes much more likely.
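
The deadlock can be reproduced in miniature. In this sketch (current Go; the stuck function and its millisecond timeout are invented for the demonstration) one goroutine plays an element reporting its own change while another plays the communications goroutine pushing a new value to that same element. Each is sending on an unbuffered channel that only the other would receive from, so neither send ever completes.

```go
package main

import (
	"fmt"
	"time"
)

// stuck reports whether an element and the communications goroutine are
// both still blocked after ms milliseconds when each tries to send to
// the other at the same moment.
func stuck(ms int) bool {
	set := make(chan float64) // communications goroutine -> element
	get := make(chan float64) // element -> communications goroutine
	done := make(chan struct{}, 2)

	go func() { // the element: reports a change of its own
		get <- 0.3
		done <- struct{}{}
	}()
	go func() { // the communications goroutine: pushes a new value
		set <- 0.7
		done <- struct{}{}
	}()

	select {
	case <-done:
		return false
	case <-time.After(time.Duration(ms) * time.Millisecond):
		return true // neither send completed; both goroutines stay blocked
	}
}

func main() {
	fmt.Println(stuck(50)) // true
}
```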

Other problems with the above structure:

  • The number of elements is hard-wired into the communications goroutine. How would we introduce a variable number of graphical elements depending on the value?

  • The update speed of one element is linked to the update speed of all of them. If one of the elements is slow to update (for example, the graphical element might be a picture that takes 200ms to redraw), then the slider will be sluggish to react.

  • There is no way for the text element to know that the user has entered some invalid text without building knowledge of the acceptable format into it.

The Value of Listening

As one possible solution to this issue, I implemented the “values” package (rog-go.googlecode.com/hg/values). Instead of elements talking directly to one another, they talk to a proxy, a Value, which acts as an intermediary. Any goroutine can set the contents of a value or ask to be notified when the value changes.
The Value type is an interface as follows:

	type Value interface {
		Set(val interface{}) os.Error
		Iter() <-chan interface{}
		Close()
		Type() reflect.Type
	}

To create a new Value, we use values.NewValue:

	v := values.NewValue(0.5)

This creates a new Value holding a single floating point number, 0.5. We can set its contents with Set:

	v.Set(0.6)

Despite the interface{} argument to Set, only a single type of value may be stored, fixed by the initial value passed to NewValue.

	v.Set("hello")		// runtime error

If we want, we can find out that type:

	fmt.Printf("%v\n", v.Type())		// prints "float64"

If we wish to observe changes to the value, we use Iter. This code will start a new goroutine that prints the value each time it changes:

	c := v.Iter()
	go func() {
		for x := range c {
			fmt.Printf("%v\n", x)
		}
	}()

One key property of Value is that Set never waits for the observers to receive the value – in fact, it never blocks. This means that two goroutines that are observing changes to a Value can call Set on that value without risk of deadlock.
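
One way to get a Set that never waits for observers is to store only the latest value together with a version counter, and let each observer's goroutine catch up at its own pace. The sketch below does this in current Go with generics; it is an illustration of the property only, not the implementation used by the values package, and slow observers skip intermediate values.

```go
package main

import (
	"fmt"
	"sync"
)

// Value holds the most recent value of type T. The generic form and all
// field names are assumptions made for this sketch.
type Value[T any] struct {
	mu      sync.Mutex
	cond    *sync.Cond
	val     T
	version int
	closed  bool
}

func NewValue[T any](initial T) *Value[T] {
	v := &Value[T]{val: initial, version: 1}
	v.cond = sync.NewCond(&v.mu)
	return v
}

// Set records x and wakes any observers. It never waits for them.
func (v *Value[T]) Set(x T) {
	v.mu.Lock()
	v.val = x
	v.version++
	v.mu.Unlock()
	v.cond.Broadcast()
}

// Close makes every Iter channel close once its observer has caught up.
func (v *Value[T]) Close() {
	v.mu.Lock()
	v.closed = true
	v.mu.Unlock()
	v.cond.Broadcast()
}

// Iter returns a channel that yields the current value and then the
// latest value after each change.
func (v *Value[T]) Iter() <-chan T {
	c := make(chan T)
	go func() {
		defer close(c)
		seen := 0
		for {
			v.mu.Lock()
			for v.version == seen && !v.closed {
				v.cond.Wait()
			}
			if v.version == seen { // closed, and nothing new to deliver
				v.mu.Unlock()
				return
			}
			x := v.val
			seen = v.version
			v.mu.Unlock()
			c <- x // only this observer's goroutine blocks here
		}
	}()
	return c
}

func main() {
	v := NewValue(0.5)
	c := v.Iter()
	fmt.Println(<-c) // 0.5
	v.Set(0.6)       // returns at once, whether or not anyone is receiving
	v.Set(0.7)
	for x := range c {
		if x == 0.7 {
			break
		}
	}
	v.Close()
}
```

An observer that stops receiving leaves its goroutine blocked, so a fuller version would need a way to cancel an individual Iter.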

There is, however, another risk: calling Set in response to a value received on the Iter channel will lead to livelock:

	c := v.Iter()
	go func() {
		// this loops forever, printing the same value.
		for x := range c {
			fmt.Printf("%v\n", x)
			v.Set(x)
		}
	}()

As a result, we have a rule: do not call Set in response to a notification of a changed value.

With the primitives we now have, we can join the slider to the graphical element. The code for the Elements is still fairly similar, but the setup() glue becomes much simpler:

	func SliderElement(v Value) {
		c := v.Iter()
		currentValue := (<-c).(float64)
		for {
			select {
			case x := <-c:
				currentValue = x.(float64)

			case m := <-MouseEventChannel:
				currentValue = mousePointToValue(m)
				v.Set(currentValue)
			}
			drawSlider(currentValue)
		}
	}

	func GraphicalElement(v Value) {
		// ... similar to SliderElement
	}

	func setup() {
		v := values.NewValue(0.5)
		go SliderElement(v)
		go GraphicalElement(v)
	}

Here we have linked two elements – each can be independently manipulated, but each one always shows the current value. We would like to link in the text element too, but there’s one problem: the text element uses strings, not float64s.

If we could create a value that mirrors the original value, but using a textual representation instead of the original float64 representation, then implementing TextElement would be simple. As it happens (ta da!) the values package makes that quite straightforward.

In focus

“Lens” is a term (probably misborrowed) from functional language terminology. The analogy is the way that light can pass through a glass lens in either direction, bending one way on the way in, and the other on the way out.

I represent a Lens as a pair of functions, one implementing the inverse transformation to the other.

	func stringToFloat64(s string) (float64, os.Error) { return strconv.Atof(s) }

	func float64ToString(f float64) (string, os.Error) { return fmt.Sprint(f), nil }

	var stringToFloat64Lens = values.NewLens(stringToFloat64, float64ToString)

Despite the analogy, a Go Lens is actually unidirectional.

	x, _ := stringToFloat64Lens.Transform("1.34e99")
	fmt.Printf("%g\n", x.(float64))

To transform in the opposite direction, the Lens can be reversed:

	float64ToStringLens := stringToFloat64Lens.Reverse()
	x, _ := float64ToStringLens.Transform(1.34e99)
	fmt.Printf("%s\n", x.(string))
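
The shape of a Lens is easy to express in current Go with generics. The following sketch is an assumption about how one might write it today, not the values package's API, which works with interface{} values; Reverse is a free function here rather than a method.

```go
package main

import (
	"fmt"
	"strconv"
)

// Lens pairs a transformation with its inverse.
type Lens[A, B any] struct {
	fwd func(A) (B, error)
	rev func(B) (A, error)
}

func NewLens[A, B any](fwd func(A) (B, error), rev func(B) (A, error)) Lens[A, B] {
	return Lens[A, B]{fwd, rev}
}

// Transform applies the lens in its forward direction only.
func (l Lens[A, B]) Transform(a A) (B, error) { return l.fwd(a) }

// Reverse returns a lens that runs l backwards.
func Reverse[A, B any](l Lens[A, B]) Lens[B, A] {
	return Lens[B, A]{l.rev, l.fwd}
}

func stringToFloat64(s string) (float64, error) {
	return strconv.ParseFloat(s, 64)
}

func float64ToString(f float64) (string, error) {
	return strconv.FormatFloat(f, 'g', -1, 64), nil
}

var stringToFloat64Lens = NewLens(stringToFloat64, float64ToString)

func main() {
	x, err := stringToFloat64Lens.Transform("1.5")
	fmt.Println(x, err) // 1.5 <nil>

	s, err := Reverse(stringToFloat64Lens).Transform(2.25)
	fmt.Println(s, err) // 2.25 <nil>
}
```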

We can transform the type of a Value using values.Transform and a Lens. Values passed to Set are passed through the lens one way; values received from Iter are passed the other way. Thus we can complete our setup function by adding the third element:

	func setup() {
		v := values.NewValue(0.5)
		go SliderElement(v)
		go GraphicalElement(v)

		sv := values.Transform(v, float64ToStringLens)
		go TextElement(sv)
	}

One more piece completes the toolkit: Lenses can be combined. Suppose, for example, that the Slider value ranges between 0 and 1, whereas the value expected by the graphical element and displayed in the text element varies from -180 to +180.

The values package provides a lens which implements this kind of transformation: UnitFloat2RangedFloat.
Here’s how it might be used:

	func setup() {
		v := values.NewValue(0.5)
		unit2angle := values.UnitFloat2RangedFloat(-180, 180)
		go SliderElement(v)
		go GraphicalElement(values.Transform(v, unit2angle))
		go TextElement(values.Transform(v, unit2angle.Combine(float64ToStringLens)))
	}
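
To see what the combined lens computes, here is the arithmetic written out as plain functions in current Go. It assumes that the range lens is a simple linear mapping from [0, 1] onto [lo, hi], which is a guess at what UnitFloat2RangedFloat does; all the function names are invented.

```go
package main

import (
	"fmt"
	"strconv"
)

// unitToRange returns a linear mapping from [0, 1] onto [lo, hi]
// together with its inverse.
func unitToRange(lo, hi float64) (fwd, rev func(float64) float64) {
	fwd = func(u float64) float64 { return lo + u*(hi-lo) }
	rev = func(x float64) float64 { return (x - lo) / (hi - lo) }
	return fwd, rev
}

// sliderToText is the combined forward direction: slider position to
// angle, then angle to text.
func sliderToText(u float64) string {
	fwd, _ := unitToRange(-180, 180)
	return strconv.FormatFloat(fwd(u), 'g', -1, 64)
}

// textToSlider is the combined reverse direction: text to angle, then
// angle to slider position.
func textToSlider(s string) (float64, error) {
	x, err := strconv.ParseFloat(s, 64)
	if err != nil {
		return 0, err
	}
	_, rev := unitToRange(-180, 180)
	return rev(x), nil
}

func main() {
	fmt.Println(sliderToText(0.5))  // 0
	fmt.Println(sliderToText(0.75)) // 90
	u, err := textToSlider("-90")
	fmt.Println(u, err) // 0.25 <nil>
}
```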

The observant might note that there are actually two ways of doing this. Without using Combine, we could apply two successive transformations using values.Transform. The two forms are subtly different: values.Transform introduces a new goroutine, and one more buffer element into the channel, whereas Combine does not.

In a subsequent blog post, I plan to talk a little more about the kinds of structures that can be built with this primitive, how it is implemented and performance considerations.

Unlimited Buffering with Low Overhead

February 10, 2010

In Go, channels have a fixed length buffer. Sometimes it is useful to add a buffer of unlimited length to a channel (here is an example). The first question is what the interface should look like. I can think of three immediate possibilities (assume T is an arbitrary type – if Go had generics, this would be a generic function):

Given a channel, make sure that no writes to that channel will
block, and return a channel from which the buffered values can be read:

func Buffer(in <-chan T) <-chan T

Given a channel, return a channel that will buffer writes
to that channel:

func Buffer(out chan<- T) chan<- T

Given two channels, connect them via a buffering process:

func Buffer(in <-chan T, out chan<- T)

Of these possibilities, on balance I think I prefer the second, as no operations will be performed on the original channel except when a value is written on the returned channel.

I’d be interested in hearing arguments for or against the other possibilities.

Here is one simple, and relatively slow, implementation. It uses the doubly-linked list implementation from the Go library. I timed it at 2076ns per item transferred on my machine. Note the code that runs before the select statement each time through the loop, which works out whether we want to be sending a value, and when it is time to finish. This relies on the fact that in a Go select statement, operations on nil channels are ignored.

import "container/list"
func BufferList(out chan<- T) chan<- T {
	in := make(chan T, 100)
	go func() {
		var buf = list.New()
		for {
			outc := out
			var v T
			n := buf.Len()
			if n == 0 {
				// buffer empty: don't try to send on output
				if in == nil {
					close(out)
					return
				}
				outc = nil
			} else {
				v = buf.Front().Value.(T)
			}
			select {
			case e := <-in:
				if closed(in) {
					in = nil
				} else {
					buf.PushBack(e)
				}
			case outc <- v:
				buf.Remove(buf.Front())
			}
		}
	}()
	return in
}
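
The same structure carries over to current Go, where the closed built-in is gone (a two-value receive reports closure instead) and generics can stand in for T. The following is a sketch along those lines, not a measured replacement; the collect helper exists only to exercise it.

```go
package main

import (
	"container/list"
	"fmt"
)

// Buffer returns a channel whose sends are queued without limit and
// forwarded to out in order. Closing the returned channel closes out
// once the queue has drained.
func Buffer[T any](out chan<- T) chan<- T {
	in := make(chan T, 100)
	go func() {
		defer close(out)
		buf := list.New()
		recv := (<-chan T)(in)
		for recv != nil || buf.Len() > 0 {
			var send chan<- T // stays nil unless there is something to send
			var v T
			if buf.Len() > 0 {
				send = out
				// The comma-ok form also copes with a nil interface
				// value when T is an interface type.
				v, _ = buf.Front().Value.(T)
			}
			select {
			case e, ok := <-recv:
				if !ok {
					recv = nil // input closed: a nil channel is never selected
				} else {
					buf.PushBack(e)
				}
			case send <- v:
				buf.Remove(buf.Front())
			}
		}
	}()
	return in
}

// collect pushes n values through a buffer before reading any back.
func collect(n int) []int {
	out := make(chan int)
	in := Buffer[int](out)
	for i := 0; i < n; i++ {
		in <- i // completes even though nobody is reading from out yet
	}
	close(in)
	var got []int
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(collect(5)) // [0 1 2 3 4]
}
```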

The above implementation allocates a new linked list item for every value transferred. Here’s an alternative implementation that uses an array as a circular buffer, amortising allocations over time by doubling the size of the buffer when it overflows, and shrinking it when there is too much space. Although the basic structure is similar, the code is more complex, and the time saving is modest – I timed it at 1729ns per item transferred, an improvement of 17%. Removing the code to shrink the buffer does not make it significantly faster.

func BufferRingOrig(out chan<- T) chan<- T {
	in := make(chan T, 100)
	go func() {
		var zero T
		var buf = make([]T, 10)
		var i = 0 // location of first value in buffer.
		var n = 0 // number of items in buffer.
		for {
			outc := out
			switch {
			case n == 0:
				// buffer empty: don't try to send on output
				if in == nil {
					close(out)
					return
				}
				outc = nil

			case n == len(buf):
				// buffer full: expand it
				b := make([]T, n*2)
				copy(b, buf[i:])
				copy(b[n-i:], buf[0:i])
				i = 0
				buf = b

			case len(buf) > 128 && n*3 < len(buf):
				// buffer too big, shrink it
				b := make([]T, len(buf) / 2)
				j := i + n
				if j > len(buf) {
					// wrap around
					k := j - len(buf)
					j = len(buf)
					copy(b, buf[i:j])
					copy(b[j - i:], buf[0:k])
				} else {
					// contiguous
					copy(b, buf[i:j])
				}
				i = 0
				buf = b
			}
			select {
			case e := <-in:
				if closed(in) {
					in = nil
				} else {
					j := i + n
					if j >= len(buf) {
						j -= len(buf)
					}
					buf[j] = e
					n++
				}
			case outc <- buf[i]:
				buf[i] = zero
				if i++; i == len(buf) {
					i = 0
				}
				n--
			}
		}
	}()
	return in
}
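
The index arithmetic of the circular buffer is easier to follow when separated from the channel handling. Here is a sketch of just the queue in current Go; the ring type is hypothetical, it grows when a push finds the buffer full rather than just after filling it, and it omits shrinking.

```go
package main

import "fmt"

// ring is a growable FIFO queue stored in a circular buffer.
type ring[T any] struct {
	buf []T
	i   int // index of the first (oldest) value
	n   int // number of values stored
}

func (r *ring[T]) push(v T) {
	if r.n == len(r.buf) {
		// Full: double the size, unwrapping the contents so that the
		// oldest value lands at index 0.
		size := 2 * len(r.buf)
		if size == 0 {
			size = 4
		}
		b := make([]T, size)
		k := copy(b, r.buf[r.i:])
		copy(b[k:], r.buf[:r.i])
		r.buf, r.i = b, 0
	}
	r.buf[(r.i+r.n)%len(r.buf)] = v
	r.n++
}

func (r *ring[T]) pop() (v T, ok bool) {
	if r.n == 0 {
		return v, false
	}
	var zero T
	v = r.buf[r.i]
	r.buf[r.i] = zero // drop the reference so it can be collected
	r.i = (r.i + 1) % len(r.buf)
	r.n--
	return v, true
}

func main() {
	var r ring[int]
	for i := 0; i < 3; i++ {
		r.push(i)
	}
	r.pop() // drop 0, so the queue no longer starts at index 0
	for i := 3; i < 10; i++ {
		r.push(i) // wraps around, then grows
	}
	for {
		v, ok := r.pop()
		if !ok {
			break
		}
		fmt.Print(v, " ")
	}
	fmt.Println() // the line reads: 1 2 3 4 5 6 7 8 9
}
```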

I wondered if the unnecessary tests before the select statement were making any significant difference to the time taken. Although it makes it easy to preserve the invariants, there is no need to test whether the buffer is empty when a value has just been placed in it, for example.

Here is a version that only does the tests when necessary. Interestingly, this change made the code run only marginally faster (1704ns per item).

func BufferRing(out chan<- T) chan<- T {
	in := make(chan T, 100)
	go func() {
		var zero T
		var buf = make([]T, 10)
		var i = 0 // location of first value in buffer.
		var n = 0 // number of items in buffer.
		var outc chan<- T
		for {
			select {
			case e := <-in:
				if closed(in) {
					in = nil
					if n == 0 {
						close(out)
						return
					}
				} else {
					j := i + n
					if j >= len(buf) {
						j -= len(buf)
					}
					buf[j] = e
					n++
					if n == len(buf) {
						// buffer full: expand it
						b := make([]T, n*2)
						copy(b, buf[i:])
						copy(b[n-i:], buf[0:i])
						i = 0
						buf = b
					}
					outc = out
				}
			case outc <- buf[i]:
				buf[i] = zero
				if i++; i == len(buf) {
					i = 0
				}
				n--
				if n == 0 {
					// buffer empty: don't try to send on output
					if in == nil {
						close(out)
						return
					}
					outc = nil
				}
				if len(buf) > 128 && n*3 < len(buf) {
					// buffer too big, shrink it
					b := make([]T, len(buf) / 2)
					j := i + n
					if j > len(buf) {
						// wrap around
						k := j - len(buf)
						j = len(buf)
						copy(b, buf[i:j])
						copy(b[j - i:], buf[0:k])
					} else {
						// contiguous
						copy(b, buf[i:j])
					}
					i = 0
					buf = b
				}
			}
		}
	}()
	return in
}

Although the speed improvement from the above piece of code was disappointing, the change paves the way for a change that really does make a difference. A select statement in Go is significantly more costly than a regular channel operation. In the code below, we loop receiving or sending values as long as we can do so without blocking. Here’s a version of the list-based code that does this. I measured it at 752ns per item, an improvement of 63% over the original, or 2.7x faster.

func BufferListCont(out chan<- T) chan<- T {
	in := make(chan T, 100)
	go func() {
		var buf = list.New()
		var outc chan<- T
		var v T
		for {
			select {
			case e := <-in:
				if buf.Len() == 0 && !closed(in) {
					outc = out
					v = e
				}
				for {
					if closed(in) {
						in = nil
						if buf.Len() == 0 {
							close(out)
							return
						}
						break
					}
					buf.PushBack(e)
					var ok bool
					if e, ok = <-in; !ok {
						break
					}
				}

			case outc <- v:
				for {
					buf.Remove(buf.Front())
					if buf.Len() == 0 {
						// buffer empty: don't try to send on output
						if in == nil {
							close(out)
							return
						}
						outc = nil
						break
					}
					v = buf.Front().Value.(T)
					if ok := outc <- v; !ok {
						break
					}
				}
			}
		}
	}()
	return in
}

One objection to the above code is that in theory if there was a fast enough producer on another processor, the buffer process could spend forever feeding values into the buffer, without ever trying to write them out. Although I believe that in practice the risk is negligible, it’s easy to guard against anyway, by only adding a fixed maximum number of values before returning to the select statement.

Here’s my final implementation, using the looping technique and with the guard added in.

I timed it at 427ns per item transferred, an improvement of 79% over the original version, or almost 5x faster. Using a buffered channel directly is only 2.4x faster than this.

func BufferRingContCheck(out chan<- T) chan<- T {
	in := make(chan T, 100)
	go func() {
		var zero T
		var buf = make([]T, 10)
		var i = 0 // location of first value in buffer.
		var n = 0 // number of items in buffer.
		var outc chan<- T
		for {
			select {
			case e := <-in:
				for added := 0; added < 1000; added++ {
					if closed(in) {
						in = nil
						if n == 0 {
							close(out)
							return
						}
						break
					}
					j := i + n
					if j >= len(buf) {
						j -= len(buf)
					}
					buf[j] = e
					n++
					outc = out		// enable output
					if n == len(buf) {
						// buffer full: expand it
						b := make([]T, n*2)
						copy(b, buf[i:])
						copy(b[n-i:], buf[0:i])
						i = 0
						buf = b
					}
					var ok bool
					if e, ok = <-in; !ok {
						break
					}
				}
			case outc <- buf[i]:
				for {
					buf[i] = zero
					if i++; i == len(buf) {
						i = 0
					}
					n--
					if n == 0 {
						// buffer empty: don't try to send on output
						if in == nil {
							close(out)
							return
						}
						outc = nil
						break
					}
					if len(buf) > 128 && n*3 < len(buf) {
						// buffer too big, shrink it
						b := make([]T, len(buf) / 2)
						j := i + n
						if j > len(buf) {
							// wrap around
							k := j - len(buf)
							j = len(buf)
							copy(b, buf[i:j])
							copy(b[j - i:], buf[0:k])
						} else {
							// contiguous
							copy(b, buf[i:j])
						}
						i = 0
						buf = b
					}
					if ok := outc <- buf[i]; !ok {
						break
					}
				}
			}
		}
	}()
	return in
}

Obviously the final code is significantly bigger and more complex than the original. Which implementation should we choose? Lacking generics, this code cannot usefully be put into a library, as most channels are not of type chan interface{}.

Given this, in most instances, perhaps the first version is to be preferred, as it’s smaller to cut and paste, and easier to understand. In cases where performance is crucial, the final version can easily be substituted.
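
Go later gained type parameters (in Go 1.18, long after this post was written), which removes the library objection. As a rough sketch only, here is how a simple slice-backed buffer might look in current Go. The name Buffer and the slice queue are assumptions made for illustration, closed(in) is replaced by the two-value receive form because closed no longer exists, and the sketch has not been benchmarked against the versions above.

```go
package main

import "fmt"

// Buffer returns a channel whose values are forwarded to out in order.
// Values sent on the returned channel are absorbed into an unbounded
// queue, so a slow reader of out does not hold up the sender.
// Closing the returned channel closes out once the queue has drained.
func Buffer[T any](out chan<- T) chan<- T {
	in := make(chan T, 100)
	go func() {
		defer close(out)
		var buf []T            // pending values, oldest first
		recv := (<-chan T)(in) // set to nil once in has been closed
		for recv != nil || len(buf) > 0 {
			// Enable the send arm only when there is something to
			// send; a nil channel in a select arm is never ready.
			var outc chan<- T
			var head T
			if len(buf) > 0 {
				outc, head = out, buf[0]
			}
			select {
			case v, ok := <-recv:
				if !ok {
					recv = nil
					continue
				}
				buf = append(buf, v)
			case outc <- head:
				buf = buf[1:]
			}
		}
	}()
	return in
}

func main() {
	out := make(chan int)
	in := Buffer[int](out)
	for i := 0; i < 5; i++ {
		in <- i // does not wait for a reader on out
	}
	close(in)
	for v := range out {
		fmt.Println(v)
	}
}
```

The same trick as in the code above is used to switch the output arm on and off: outc is nil while the queue is empty, so the select can only receive.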

Perhaps there’s another faster technique that I haven’t found yet. I’d be interested to hear any ideas on the matter.

The code with all the tests and benchmarks can be found here.

Select functions for Go

January 4, 2010

In this thread, Rowan Davies wrote:

There’s no STM implementation for Go, as far as I know. STM doesn’t fit so well with message passing channels, which Go adopts as another quite good alternative to locks. When used well they can avoid the compositionality issues with locks, and have less of a performance cost compared to STM – but STM arguably scales better to complicated situations in terms of the ease of avoiding deadlocks and the like.

STM isn’t as expressive as channels – you can’t build channels within STM (although you can layer them on top of it, which then loses some of the compositionality benefits)

Channels have their own compositionality issues – if you want to be able to select on something, you need to have access to the raw channel, but the interface might require other things to happen after the channel operation – these must be exposed by the interface, which is undesirable.

For example, in some example code I wrote in a previous post, there’s some code to read a value:

func (r *Receiver) Read() interface{} {
	b := <-r.C
	v := b.v
	r.C <- b
	r.C = b.c
	return v
}

It would be nice if we could have this Read as part of a select statement. Currently, the only way to do this is to make the channel publicly readable, and have a function that the user must remember to call with the value read from the channel:

func (r *Receiver) DoneRead(b broadcast) interface{} {
	v := b.v
	r.C <- b
	r.C = b.c
	return v
}

select {
case b := <-r.C:
    v := r.DoneRead(b)
    ...
....
}

This is error-prone – and more importantly, it breaks encapsulation by exposing the internal-only “broadcast” type.

For a nice way around these problems, I’ve been thinking that something like the select functions provided by the XC language might work well in Go.

A select function would be similar to a normal function except that its top level contains arms of a select statement:

selectfunc read(r *Receiver) interface{} {
case b := <-r.C:
	v := b.v
	r.C <- b
	r.C = b.c
	return v
}

Then you could do:

select {
case v := read(&r):
    .... do something with v
}

i.e. a select function can be used in place of a channel operation in any select arm. Using the select function in an expression would just block as normal.

Select functions seem enough like normal functions that one might ask whether they could be methods too. Given the current syntax, which doesn’t use the func keyword inside interface declarations, I’d say no. And they’re not strictly speaking necessary either. Something not too far from the original interface can be obtained by returning a selectfunc as a closure:

func (r *Receiver) Reader() (selectfunc () interface{}) {
	return selectfunc() interface{} {
	case b := <-r.C:
		v := b.v
		r.C <- b
		r.C = b.c
		return v
	}
}

Then you’d do:

select {
case v := r.Reader()():
	...
}

Not entirely ideal, but quite feasible.

I think there are quite a few benefits to be gained from implementing something like this. And the implementation should be reasonably straightforward – the main implication, as far as I can see, is that the number of arms in a select statement would not always be statically determinable.

What do people think?

Concurrent Idioms #1: Broadcasting values in Go with linked channels.

December 1, 2009

Channels in Go are powerful things, but it’s not always obvious how to get them to accomplish certain tasks. One of those tasks is one-to-many communication. Channels work very well if lots of writers are funneling values to a single reader, but it’s not immediately clear how multiple readers can all wait for values from a single writer.

Here’s what a Go API for doing this might look like:

type Broadcaster ...

func NewBroadcaster() Broadcaster
func (b Broadcaster) Write(v interface{})
func (b Broadcaster) Listen() chan interface{}

The broadcast channel is created with NewBroadcaster, and values can be written to it with Write. To listen on the channel, we call Listen, which gives us a new channel from which we can receive the values written.

The solution that immediately comes to mind (which I’ve used in the past) is to have an intermediate process that keeps a registry of all the reading processes. Each call to Listen adds a new channel to the registry, and the central process’s main loop might look something like this:

for {
        select {
        case v := <-inc:
                for _, c := range(listeners) {
                        c <- v
                }
        case c := <-registryc:
                listeners.push(c);
        }
}

This is the conventional way of doing things. It’s also probably the most sensible method. The process writing values will block until all the readers have read the previous value. An alternative might be to maintain an output buffer for each reader, which could either grow as necessary, or we could discard values when the buffer has filled up.
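
For comparison, the registry loop can be fleshed out into something runnable. This is only a sketch in current Go: the hub type, its Close method and the collect helper are hypothetical names added for illustration, and closing the input channel to signal shutdown is an assumption that the loop above does not make.

```go
package main

import (
	"fmt"
	"sync"
)

// hub is a hypothetical registry-based broadcaster.
type hub struct {
	inc       chan interface{}
	registryc chan chan interface{}
}

func newHub() *hub {
	h := &hub{
		inc:       make(chan interface{}),
		registryc: make(chan chan interface{}),
	}
	go func() {
		var listeners []chan interface{}
		for {
			select {
			case v, ok := <-h.inc:
				if !ok {
					// The writer has finished: tell every listener.
					for _, c := range listeners {
						close(c)
					}
					return
				}
				for _, c := range listeners {
					c <- v // blocks until this listener has taken v
				}
			case c := <-h.registryc:
				listeners = append(listeners, c)
			}
		}
	}()
	return h
}

// Listen registers a new listener and returns its channel.
func (h *hub) Listen() <-chan interface{} {
	c := make(chan interface{})
	h.registryc <- c
	return c
}

// Write broadcasts v to every registered listener.
func (h *hub) Write(v interface{}) { h.inc <- v }

// Close shuts the hub down and closes all listener channels.
func (h *hub) Close() { close(h.inc) }

// collect starts n listeners, writes vals, and returns what each one saw.
func collect(n int, vals ...interface{}) [][]interface{} {
	h := newHub()
	got := make([][]interface{}, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		c := h.Listen()
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for v := range c {
				got[i] = append(got[i], v)
			}
		}(i)
	}
	for _, v := range vals {
		h.Write(v)
	}
	h.Close()
	wg.Wait()
	return got
}

func main() {
	fmt.Println(collect(2, "a", "b", "c")) // prints [[a b c] [a b c]]
}
```

Because each listener channel is unbuffered, one slow listener delays delivery to all the others, which is the blocking behaviour described above.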

But this post isn’t about the sensible way of doing things. This post is about an implementation where the writer never blocks; a slow reader with a fast writer can fill all of memory if it goes on for long enough. And it’s not particularly efficient.

But I don’t care much, because I think this is cool. And I might find a genuine use for it some day.

Here’s the heart of it:

type broadcast struct {
	c	chan broadcast;
	v	interface{};
}

This is what I call a “linked channel” (analogously to a linked list). But even more than a linked list, it’s an Ouroboros data structure. That is, an instance of the structure can be sent down the channel that is inside itself.

Or the other way around. If I have a value of type chan broadcast around, then I can read a broadcast value b from it, giving me the arbitrary value b.v, and another value of the original type, b.c, allowing me to repeat the process.
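
To make the reading side concrete, here is a small sketch that builds a linked channel by hand and then follows the links. The helper names chain and walk are hypothetical, and the channels are given a buffer of one purely so that a single goroutine can fill the chain without blocking.

```go
package main

import "fmt"

type broadcast struct {
	c chan broadcast
	v interface{}
}

// chain builds a linked channel holding vals in order and returns its head.
func chain(vals ...interface{}) chan broadcast {
	head := make(chan broadcast, 1)
	curr := head
	for _, v := range vals {
		next := make(chan broadcast, 1)
		curr <- broadcast{c: next, v: v}
		curr = next
	}
	return head
}

// walk reads n values by repeatedly receiving a broadcast value
// and then moving on to the channel it carries.
func walk(c chan broadcast, n int) []interface{} {
	var out []interface{}
	for i := 0; i < n; i++ {
		b := <-c
		out = append(out, b.v)
		c = b.c
	}
	return out
}

func main() {
	fmt.Println(walk(chain("a", "b", "c"), 3)) // prints [a b c]
}
```

Note that walk consumes each value as it goes, so this chain can be read only once.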

The other part of the puzzle comes from the way that a buffered channel can be used as a one-use one-to-many broadcast object. If I’ve got a buffered channel of some type T:

var c = make(chan T, 1)

then any process reading from it will block until a value is written. When we want to broadcast a value, we simply write it to the channel. This value will go to only a single reader, but we observe the convention that if you read a value from the channel, you always put it back immediately.

func wait(c chan T) T {
	v := <-c
	c <- v;
	return v;
}
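
The convention can be tried out in isolation. In this sketch, written in current Go, several goroutines block in wait and a single write releases all of them; the broadcastOnce helper and the use of int for T are assumptions made for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// wait reads the broadcast value and immediately puts it back,
// so that the next reader can see it too.
func wait(c chan int) int {
	v := <-c
	c <- v
	return v
}

// broadcastOnce starts n readers blocked on a fresh channel, writes v
// once, and returns the value that each reader saw.
func broadcastOnce(n, v int) []int {
	c := make(chan int, 1)
	results := make([]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = wait(c)
		}(i)
	}
	c <- v // the single write that releases every reader
	wg.Wait()
	return results
}

func main() {
	fmt.Println(broadcastOnce(5, 42)) // prints [42 42 42 42 42]
}
```

Putting the value back never blocks, because the reader that took it has just emptied the one-slot buffer and nobody else writes to the channel.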

Putting the two pieces together, we can see that if the channel inside the broadcast struct is buffered in the above way, then we can get an endless stream of one-use one-to-many broadcast channels, each with an associated value.

Here’s the code:

package broadcast

type broadcast struct {
	c	chan broadcast;
	v	interface{};
}

type Broadcaster struct {
	// private fields:
	Listenc	chan chan (chan broadcast);
	Sendc	chan<- interface{};
}

type Receiver struct {
	// private fields:
	C chan broadcast;
}

// create a new broadcaster object.
func NewBroadcaster() Broadcaster {
	listenc := make(chan (chan (chan broadcast)));
	sendc := make(chan interface{});
	go func() {
		currc := make(chan broadcast, 1);
		for {
			select {
			case v := <-sendc:
				if v == nil {
					currc <- broadcast{};
					return;
				}
				c := make(chan broadcast, 1);
				b := broadcast{c: c, v: v};
				currc <- b;
				currc = c;
			case r := <-listenc:
				r <- currc
			}
		}
	}();
	return Broadcaster{
		Listenc: listenc,
		Sendc: sendc,
	};
}

// start listening to the broadcasts.
func (b Broadcaster) Listen() Receiver {
	c := make(chan chan broadcast, 0);
	b.Listenc <- c;
	return Receiver{<-c};
}

// broadcast a value to all listeners.
func (b Broadcaster) Write(v interface{})	{ b.Sendc <- v }

// read a value that has been broadcast,
// waiting until one is available if necessary.
func (r *Receiver) Read() interface{} {
	b := <-r.C;
	v := b.v;
	r.C <- b;
	r.C = b.c;
	return v;
}

This implementation has the nice property that there’s no longer any need for a central registry of listeners. A Receiver value encapsulates a place in the stream of values and can be copied at will – each copy will receive an identical copy of the stream. There’s no need for an Unregister function either. Of course, if the readers don’t keep up with the writers, memory can be used indefinitely, but… isn’t this quite neat?
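
The copying property can be checked with a self-contained sketch. It repeats the broadcaster in current Go syntax, with unexported fields, so that it runs as a single file; the drain and demo helpers are hypothetical names added for the example.

```go
package main

import "fmt"

type broadcast struct {
	c chan broadcast
	v interface{}
}

type Broadcaster struct {
	listenc chan chan chan broadcast
	sendc   chan interface{}
}

type Receiver struct {
	c chan broadcast
}

func NewBroadcaster() Broadcaster {
	listenc := make(chan chan chan broadcast)
	sendc := make(chan interface{})
	go func() {
		currc := make(chan broadcast, 1)
		for {
			select {
			case v := <-sendc:
				if v == nil {
					currc <- broadcast{} // end-of-stream marker
					return
				}
				c := make(chan broadcast, 1)
				currc <- broadcast{c: c, v: v}
				currc = c
			case r := <-listenc:
				r <- currc
			}
		}
	}()
	return Broadcaster{listenc: listenc, sendc: sendc}
}

func (b Broadcaster) Listen() Receiver {
	c := make(chan chan broadcast)
	b.listenc <- c
	return Receiver{<-c}
}

func (b Broadcaster) Write(v interface{}) { b.sendc <- v }

func (r *Receiver) Read() interface{} {
	b := <-r.c
	v := b.v
	r.c <- b
	r.c = b.c
	return v
}

// drain reads from its own copy of r until the nil end-of-stream value.
func drain(r Receiver) []interface{} {
	var out []interface{}
	for v := r.Read(); v != nil; v = r.Read() {
		out = append(out, v)
	}
	return out
}

// demo writes 0, 1, 2 and reads them through a Receiver and a copy of it.
func demo() ([]interface{}, []interface{}) {
	b := NewBroadcaster()
	r1 := b.Listen()
	r2 := r1 // a plain copy: same position in the stream
	for i := 0; i < 3; i++ {
		b.Write(i)
	}
	b.Write(nil)
	return drain(r1), drain(r2)
}

func main() {
	fmt.Println(demo()) // prints [0 1 2] [0 1 2]
}
```

Both receivers are drained only after all the writes have finished, which also shows that the writer never waits for a reader.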

Here’s some example code using it:

package main

import (
	"fmt";
	"broadcast";
	"time";
)

var b = broadcast.NewBroadcaster();

func listen(r broadcast.Receiver) {
	for v := r.Read(); v != nil; v = r.Read() {
		go listen(r);
		fmt.Println(v);
	}
}

func main() {
	r := b.Listen();
	go listen(r);
	for i := 0; i < 10; i++ {
		b.Write(i);
	}
	b.Write(nil);

	time.Sleep(3 * 1e9);
}

Exercise for the reader: without running the code, how many lines will this code print?

Initialisation

December 1, 2009

I’ve not had a blog before. Never really felt the need. But here I am, blog at the ready.

The aim is to provide occasional nuggets of information that I’ve found interesting. Likely subjects include esoteric computer languages (such as Go and Limbo), tasty recipes and cool fiddle tunes.

