gocharm – Juju charms in Go.

December 19, 2014

Gocharm

I’d like to announce a preliminary version of a “Friday Labs” project I’ve been working on for a little while.

It’s called gocharm, and it is a framework for writing Juju charms in Go. I like Go, and that’s one reason for me to do this, but I am also hoping that gocharm contributes more than just a new language to the Juju ecosystem. Here are some areas where I think it has something new to offer:

  • Charm maintenance
  • Modularity
  • Persistent state
  • Dependencies
  • Charm-provided services

I’ll go into my thoughts on these subjects below.

To take it for a quick test spin (assumes a working juju environment and a valid JUJU_REPOSITORY environment variable):

# install gocharm
go get -d github.com/juju/gocharm/cmd/gocharm
godeps -u $GOPATH/src/github.com/juju/gocharm/dependencies.tsv
go install github.com/juju/gocharm/...

# build a charm
gocharm github.com/juju/gocharm/example-charms/helloworld

# deploy it
juju deploy local:trusty/helloworld

Charm maintenance

It is common practice to write a charm as a single piece of code that is called by all the hooks. When a new relation is added to a charm, the relevant symbolic links must be created in the hooks directory, and relation information added to metadata.yaml. Although relatively trivial to do, this has a nuisance factor and, more importantly, there is significant potential for the hooks and the metadata to get out of sync with the code, and for bugs to result.

In gocharm, relation metadata in metadata.yaml, the config.yaml file and the whole hooks directory are automatically generated from the code when the charm is built. This means that there’s much less chance of them getting out of sync, and it also opens the door to…

… Modularity.

I have written one or two minor charms before, and I have studied the code for a few more. One thing that concerned me about the code that I saw and that I wrote was its lack of modularity. Every charm that provides an http relation, for example, implements all the required logic for that relation. While that’s not a great deal in most cases, when this is repeated for a bunch of relations, the code becomes hard to follow, because the logic for handling particular relations is spread out. A config-changed hook, for example, needs to handle configuration options relating to all the various relations.

Ideally, if some charm provides a relation, I don’t want to write code that knows all the details of that relation. I want to import a package that already knows about that relation and provides me with a few relevant methods to make it easy to use.

The way gocharm addresses that is by allowing you to register several functions for the same hook. This means that several independent pieces of code can register a config-changed hook for example – the code for all them will be invoked (in sequence) when the service configuration changes.

Since relation and config metadata are automatically generated, this means that a reusable package can create relations and configuration options and register hooks to be executed for all of them. It doesn’t matter that other packages may be registering functions to be called in response to the same hooks – each package can do what it needs to.

Ideally we would have relation implementations for all the relation interfaces out there. I have made a tiny start towards that by implementing some relations in charmbits. If you import the httprelation package for example, and register a provider, the http-relation-joined and config-changed hooks will be registered, the appropriate relation, config metadata and hook files created.

For a hint at where this approach could lead, see this example “hello world” charm, which defines a web service and implements a charm that serves it, including configurable http port, https port and https certificates, all in under 30 lines of code. It’s easy to compose that functionality too. It takes only a few more lines to add a configuration option.

Persistent state

Sometimes it is useful for a charm to remember things between hook invocations. Using gocharm, when you pass a pointer to RegisterContext all the state pointed to will be saved persistently. Nothing else is required. In practice, this makes maintaining persistent state as easy as using an instance variable.

Dependencies

Sometimes you don’t want to deal with dependencies. When the functionality your charm requires is all available in the charm itself, why not just embed everything in the charm?

With cross-compiled Go, this becomes straightforward. By default, the Go binary that runs the hooks is compiled when the charm is created, and included in the charm. This approach does have its down sides too though – sometimes you don’t know what architecture you’re going to deploy onto, for example; and having only the binary can make the charm harder to debug, because every code change will require the binary to be pushed up to the environment. With this in mind, the gocharm command has a -source flag. In this mode, the binary is not included with the charm. Instead, all the source code is “vendored” into the charm including all dependencies. At charm install time, the Go compiler is installed and the charm binary built. It’s also possible to configure it so that the binary is built every time a hook runs.

There is an issue with charm size – Go binaries are quite big. I intend to compress the executable when it’s built, and uncompress at charm install and upgrade time. In the meantime, the goupx command can be useful for compressing the binaries before deploying the charm.

Charm-provided services

The Go charm code can not only be used to run hooks. Gocharm also supports long-running services, such as web servers, written using the charm code itself. This makes it quite convenient to write single-purpose charms that provide a given service when that service is written in Go.

Currently the service runs directly from within the hook binary in the charm directory, but this is potentially problematic with regard to charm upgrades. In the future, the binary will be copied to another location and replaced while the service is stopped.

Future work

Testing

I’d like to make a really convincing testing story for Go charms. For charms where both the hook logic and the charm service is written in Go, It should be possible to test almost all of the logic of the charm without deploying it, giving early reassurance that the charm will work. I have made some steps in that direction but there is a lot more to be done here.

Zero-downtime upgrades

When a service run by a charm needs to be upgraded, it is customary to restart the service. The integrated service functionality offered by gocharm makes it potentially possible to automatically provide zero-downtime upgrades, where a new server is brought up before the old one is torn down, gracefully switching from one to the other.

More reusable pieces

Currently there are implementations of the http, mongodb and elasticsearch relation interfaces. It would be great to have reference implementations for as many relation interfaces as possible.

Final thoughts

I hope that this will be useful. For those that feel that Go is an inappropriate language for writing charms, I don’t think there’s much that’s being done with gocharm that can’t be done in other languages too. Perhaps it might be worth taking some of the ideas here (I’m thinking in particular of the modularity aspect) and implementing them in another language, such as Python.

Gocharm is still in its early stages. Contributions welcome!

Some suggested rules for generating good errors in Go

April 23, 2014

Possibly the most common statement in all Go code is the following:

if err != nil {
    return err
}

We check whether an error has occurred. Then we pass that error on to someone else to handle.

I call this “passing the buck”. Something went wrong. We don’t know what to do, so we pass on responsibility for the error to something else.

It’s not uncommon to find functions containing 10 or more occurrences of the above statement.

Unlike some people, I am not unhappy with the verbosity of this. Every place that we see this check, we know that we are thinking about the possible error that’s happening and dealing with it in the best way that we can think of. The statement above expresses this quite reasonably, in my opinion, but there is more to consider.

The error cannot be returned forever. The buck stops somewhere. At some point, assuming we do our best to handle all errors, we’re going to do something like this:

if err != nil {
    log.Fatalf("it all went horribly wrong: %v", err)
}

or this:

if err != nil {
    log.Printf("continuing after error: %v", err)
}

or perhaps this:

if os.IsNotExist(err) {
    // If the file was not there, ignore
    // the fact that we cannot remove it.
    return nil
}

The amount of information that we will see in the above two cases is entirely dependent on the chain of return statements that produced it. If each returned exactly the error that it encountered, then we will see the original error that caused the error cascade to start.

Seeing the original error is not always very helpful though. On one memorable occasion, a user of a network API was seeing the simple error “EOF” in response to an API call. It turned out that that error had propagated up through thirteen levels before being reported. The original error was almost entirely uninformative – were we reading from the network when we encountered the EOF? A disk file? Some context would have been very helpful.

A common alternative to just passing the buck is to add some context to the buck that we pass:

if err != nil {
    return fmt.Errorf("cannot sprang the doodah: %v", err)
}

If every place did this, we would have very long (but beautifully annotated) error messages. Part of the reason these error messages would be long is that they are likely to contain redundant information.

For instance:

f, err := os.Open(path)
if err != nil {
    return fmt.Errorf("cannot open %q: %v", path, err)
}

would print:

cannot open "/fdsfds/dsfvgfdsv": open /fdsfds/dsfvgfdsv: no such file or directory

We have just annotated our error message with exactly the same information that was already in the error. When writing the above code, most people will be aware of that, and just pass the buck.

That’s where it starts to become a bad habit. We get used to just passing the buck (the error already contains the information needed to identify the problem, right?), and eventually it’s very easy to end up in a situation where we have no idea what the error signifies (“why the $#!% was it trying to read that file when I was doing that?”).

I’d like to suggest some rules that, if followed, should make it easier to diagnose problems when something goes unexpectedly wrong.

Rule 1 for good Go errors: annotate the error whenever it will add useful information

We don’t want hugely redundant error messages, but we do want errors that describe what went wrong in sufficient detail that we can have a rough idea of what the fix might be without the need to reproduce the problem under controlled circumstances.

There is one significant problem that I’ve been ignoring here. By annotating the error with fmt.Errorf, we lose any error type. That means that the caller, on receiving the returned and annotated error, can know only that an error occurred. Without actually inspecting the error string itself (not a great idea as error messages can change), it cannot make a decision based on the kind of error.

Before suggesting a possible solution, I’d like to take a moment to think about error inspection. Consider the following code:

if err := somepkg.SomeFunction(); err != nil && !os.IsNotExist(err) {
    return err
}

We just deliberately ignored any errors that were produced by the os package when a file was not found. But we didn’t just call an package in the os package. How did we know that SomeFunction could return this kind of error?

In an ideal world, the answer would be “because the documentation for SomeFunction states that in these circumstances, an error is returned that satisfies os.IsNotExists”. Unfortunately, we do not live in an ideal world. The answer is commonly “because SomeFunction returned an os.IsNotExist error when I called it”.

There is a problem with the latter approach – it makes for more brittle and less maintainable software, because the type of every possible returned error type is implicitly now part of the contract of the function. If SomeFunction changes so that it happens not to return an IsNotExist error, it may break its clients. In fact, they could be broken by a change to any of the functions SomeFunction happens to call.

This is another reason to dislike the “pass the buck” idiom – not only does it ignore the opportunity to add useful information to the returned error, but it actively makes the software more likely to break when refactored.

This leads me to suggest a couple more rules

Rule 2 for good Go errors: document error return types

If an error return type is documented, we know that it’s part of the contract of the function and it’s OK to rely on. There should be a test for it too.

Rule 3 for good Go errors: hide undocumented types

By hiding types that are not a documented part of a function’s interface, it should be obvious to a caller when they are relying on undocumented behaviour. They will either avoid doing that, or change the called function to document the returned type, or (if it’s in an external package) exert pressure on the upstream maintainer to change it.

errgo: making it easy to observe these rules.

I’ve implemented a package, errgo to try to make it easy to build Go software that observes the above rules without imposing undue overhead.

The standard “pass the buck idiom” becomes:

if err != nil {
    return errgo.Mask(err)
}

The “Mask” name means “mask the type of the error” – the error type is hidden in the returned error, but errgo actually automatically adds context here even though we didn’t add an explicit error string. The returned error contains the source code location of the call to errgo.Mask, although the error string is unchanged.

This means that if we have a cascading chain of “pass the buck” errors, we can actually find out the location of each place where the buck was passed.

if err != nil {
    log.Fatalf("it all went horribly wrong: %v; details: %s", err, errgo.Details(err))
}

(In fact we can use %#v instead of errgo.Details here if we like). This will print something like this (it’s all on one line to make grep easier):

it all went horribly wrong: cannot sprang the doodah: EOF; details: [{/usr/ubuntu/src/github.com/me/sprang.go:213: cannot sprang the doodah} {/usr/ubuntu/src/github.com/me/doodah.go:213: } {/usr/ubuntu/src/github.com/me/doodah.go:345: } {EOF}]

We can add explicitly more context to the error if desired:

if err != nil {
    return errgo.Notef(err, "cannot sprang the doodah")
}

In accordance with Rule 3, an error created by errgo hides the type of the wrapped error. It is possible to let error types pass through the wrapping process though – by adding an error predicate to errgo.Mask. An error predicate is just a function that reports whether a given error should be passed through.

Any function with the type func(error) bool can be used as an error predicate. For instance, os.IsNotExist works as an error predicate:

// LockIt tries to aquire the lock. It returns an error
// that satisfies os.IsNotExist if the lock file has not
// been created.
func LockIt() error {
    f, err := os.Open(lockPath)
    if err != nil {
        return errgo.Mask(err, os.IsNotExist)
    }
    [ etc ... ]
}

To accommodate the common idiom of using specific error values rather than error types, errgo provides a trivial helper function, Is:

if err != nil {
    return errgo.Mask(err, errgo.Is(rsa.ErrDecryption))
}

Since Mask wraps the error to add the source code caller metadata, the resulting error will never satisfy os.IsNotExist directly. Instead, errgo.Cause can be used to recover the cause of the error. Cause is safe to call on any error, not just errors created with errgo – if the error is not created by errgo, it will just return the error itself.

if os.IsNotExist(errgo.Cause(err)) {
    // If the file was not there, ignore
    // the fact that we cannot remove it.
    return nil
}

Sometimes we wish to create an error that preserves information about the error cascade, but also adds specific information. The errgo.Err type implements all the functionality of the errgo error stack in a form suitable for embedding into a custom error type. I’ll leave describing that to another blog post.

Converting existing code to use errgo

I converted the Juju code base (~200Kloc) to use errgo, by applying the following transformation rules:

This:

if someStatement; err != nil {
    return err
}

becomes:

if someStatement; err != nil {
    return errgo.Mask(err)
}

This:

fmt.Errorf("some text: %v", err)

becomes

errgo.Newf(err, "some text")

This:

errors.New("some text")

becomes:

errgo.New("some text")

This:

if someStatement; err == somethingButNotNil

becomes

if someStatement; errgo.Cause(err) == somethingButNotNil

This:

err.(T)

becomes:

errgo.Cause(err).(T)

As we use gocheck for testing, I also added some rules for conversion of gocheck test cases.

I forked gofix to add the above transformation rules.

The rules above are by no means complete (we rely heavily on the fact that we almost always name error variables “err”, for example), but were sufficient to transform the vast majority of the code base. There was still a fair amount of work to do. In particular, we need to change any Mask call to add the error types that are expected to be returned. This proved to be not too hard (in the vast majority of cases we really do want to mask the error) – an alternative approach is initially to let all errors through (using errgo.Any) and gradually move towards stricter errors.

Although the changes have not yet landed, some initial experiments have shown that the errors we get are vastly more informative. My belief is that the new scheme will leave us with a code base that’s more easily understood and refactored too.

Cancellation in Go, the Juju way

March 15, 2014

I read the recent Go blog post on pipelines and cancellation with interest because we have explored this area quite a bit where I work, in the Juju project.

Over the last couple of years, a pattern has emerged that works well for us, so I thought it might be illuminating to show an implementation of the same problem that uses that pattern.

The pattern centres around this simple interface.

type Worker interface {
    Kill()
    Wait() error
}

To implement some code that runs independently using its own goroutines, we define a type representing the task that the code performs, and implement Kill and Wait methods on it.

The interface contract is almost trivially simple: the Kill method asks the worker to die, but does not actually wait for it to die. The Wait method waits for a worker to die, either from natural causes (because it has completed its task or encountered an unrecoverable error), or because it was killed. In either case, it returns any error encountered before it shut down.

Both methods are idemopotent – it is ok to kill a worker many times, and Wait will always return the same thing. To stop a worker, we kill it and wait for it to quit:

func Stop(w Worker) error {
    w.Kill()
    return w.Wait()
}

An useful ingredient when implementing a Worker is Gustavo Niemeyer’s tomb package, which encapsulates some of the logic implemented in the Go pipelines blog post. In particular, it keeps track of the first error encountered and allows goroutines to monitor the current “liveness” status of a worker.

Here is a version of the pipelines blog post’s parallel.go code. In this example, the fileDigester type is our Worker. Just like the original code, we digest each file concurrently, sending the results on a channel. Unlike the original, though, the tomb takes on some of the burden of error propagation – when part of the worker dies unexpectedly, all the other workers will see that the tomb is dying and stop what they’re doing.

I have chosen to use Keith Rarick’s fs package instead of filepath.Walk because I think it makes the control flow more straightforward. The actual code to digest the file now gets its own function (filedigester.sumFile) and digests the file without reading the entire contents into memory. I have declared some methods on fileDigester as exported, even though fileDigester itself is not exported, to make it clear that they represent the “public” interface to the type.

Much of the teardown logic is encapsulated in the following code, which actually runs the main loop:

go func() {
    d.tomb.Kill(d.run(root))
    d.wg.Wait()
    close(d.out)
    d.tomb.Done()
}()

It runs the main loop and kills the tomb with the error that’s returned (this idiom means it’s easy to return in multiple places from the run method without needing to kill the tomb in each place). We then wait for all the outstanding digest goroutines to finish, close the results channel. The very last thing we do is to signal that the worker is done.

The bounded-concurrency version is here. Again, I have kept the basic approach as the original, with a pool of goroutines doing the digesting, reading paths from a channel. Even though we’re using a tomb, we’re still free to use other methods to shut down parts of the worker – in this case we close the paths channel to shut down the digester goroutines.

The code is somewhat longer, but I think that the structure is helpful and makes the code easier to reason about.

There is another advantage to making all workers implement a common interface. It enables us to write higher level functionality that manages workers. An example of this is worker.Runner, an API that makes it straightforward to manage a set of long-lived workers, automatically restarting them when they fail.

rjson – readable JSON

September 24, 2012

[Edited: after some feedback, I have renamed this project to rjson (it’s really not at all Go-specific) and changed the specification so that unquoted strings are accepted only as object keys]

JSON is a fine encoding. It has a very simple data model; it’s easy to understand and to write parsers for.  But personally, I find it a bit awkward to read and to edit. All those quotes add noise, and I’m always forgetting to remove the final comma at the end of an array or an object. I’m not the only one either. I’ve chatted to people about why they are using YAML, and the reason is usually not because of the zillion features that YAML offers, but because the format is more human-friendly to edit.

A while ago I had an idea of how I might help this, and yesterday I had a free day to do it. I forked the Go json package to make rjson. It uses an idea taken from the Go syntax rules to make commas optional, and quotes become optional around object keys that look like identifiers. The rjson syntax rules are summarised in the rjson package documentation.

To make it easy to experiment with and use this format, I created a rjson command that can read and write both formats.

Here is a transcript showing how the command can be used:

% cat config.json
{
  "ui" : {
    "global" : {
      "ui-type" : "default",
      "show-indicator" : true
    }
  }
}% rjson < config.json
{
	ui: {
		global: {
			show-indicator: true
			ui-type: "default"
		}
	}
}
% rjson -indent '' < config.json
{ui:{global:{show-indicator:true,ui-type:"default"}}}
% rjson -indent '' -j < config.json
{"ui":{"global":{"show-indicator":true,"ui-type":"default"}}}
%

You might notice that the compact version of the rjson format is smaller than the equivalent JSON. On a random selection of JSON files (all that I could find in my home directory), I measured that they were about 7% smaller on average when encoded with gson -indent ”. This was a nice bonus that I had not considered.

To use rjson, you’ll need a working Go installation; then you can fetch and install the command into your go tree thus:

go get launchpad.net/rjson/cmd/rjson

Enjoy!

Logging in Reverse

March 31, 2011

A recent post on the Go mailing list asked how to use the Go logging package to log messages in reverse order, so that they could be displayed most-recent-first in a console window.

The package gives no obvious way to do this, but the flexibility of Go interfaces makes it almost trivial to do. It took about 10 minutes to write the following code. The log package can use an arbitrary io.Writer for output so I defined a type, ReverseBuffer, with a Write method that stores all the data in a reverse-ordered linked list and a Reader method that returns an io.Reader that can be used to read the data back.

It would not take much longer to implement a size or message count limit on the list if desired.

I like the way that the language features worked together to make the design almost inevitable once I had thought of it. Once you have an implementation of an interface, everything else just works.

You can run it in the Go Playground to see it working.

package main

import (
        "io"
        "os"
        "log"
)

type msg struct {
        data []byte
        next *msg
}

// ReverseBuffer is an implementation of io.Writer that stores data such
// that it can later be read with the data from all writes reversed.
type ReverseBuffer struct {
        msgs *msg
}

type reverseReader struct {
        data []byte
        msgs *msg
}

func (w *ReverseBuffer) Write(data []byte) (int, os.Error) {
        if len(data) == 0 {
                return 0, nil
        }
        w.msgs = &msg{append([]byte(nil), data...), w.msgs}
        return len(data), nil
}

// Reader returns a new io.Reader that can be used to read all the data
// written to w.  The data from more recent writes is returned first.
func (w *ReverseBuffer) Reader() io.Reader {
        return &reverseReader{nil, w.msgs}
}

// Read implements io.Reader.Read.
func (r *reverseReader) Read(data []byte) (int, os.Error) {
        if len(r.data) == 0 {
                if r.msgs == nil {
                        return 0, os.EOF
                }
                r.data = r.msgs.data
                r.msgs = r.msgs.next
        }
        n := copy(data, r.data)
        r.data = r.data[n:]
        return n, nil
}

func main() {
        w := new(ReverseBuffer)
        out := log.New(w, "", log.Ldate|log.Ltime)
        out.Printf("one")
        out.Printf("two")
        out.Printf("three")
        io.Copy(os.Stdout, w.Reader())
}

Bidirectional RPC with netchan

February 10, 2011

Today’s firewall-riddled network makes it hard to set up ad hoc topologies of communicating programs. Opening up a single incoming port for a server to listen on is usually possible. If you want arbitrary numbers of ports, then it’s harder, and as for calling a client back, well, forget it.

Remote Procedure Call (RPC) is a common paradigm that is easy to use in this scenario. A client makes a connection to the server, sends some data to the server; the server operates on that data and sends back the results.

Say I want the control flow to go the other way? That’s OK too: the client makes a connection to the server, then acts in the RPC server role, reading data from the “server”, operating on it, and sending back the results.

Harder is when I want control flow to go both ways – for the client to be able to make requests of the server and for the server to make requests of clients. If I can manage two connections per client, it’s easy enough, but for the purposes of this article I assume that is not possible.

Luckily Go’s flexible interface-based APIs make this kind of thing quite straightforward. We can layer the Go RPC package on top of the Go netchan package. This article explains how, and uses the functionality to build a flexible client-server application where clients make their local file system available to the server, and any client can access any other client’s file system.

If you want to skip ahead and just read the example code, it’s here.

The RPC package can run neatly on top of any byte stream – all we need is an io.ReadWriteCloser at each end. Server.ServeConn will start an RPC server; NewClient will start the client end. We just need a way of layering a client to server byte stream onto netchan.

From netchan to Listener

Using the netchan package, we can send and receive data from several channels on the same underlying connection. If we use one channel of []byte in each direction, it is straightforward to make them appear as an io.ReadWriteCloser.

First the Read half. When we get a read request, we satisfy it by reading the channel. If the channel gives us too much data, we save it and return it in subsequent Read requests.

type chanReader struct {
	buf []byte
	c   <-chan []byte
}

func newChanReader(c <-chan []byte) *chanReader {
	return &chanReader{c: c}
}

func (r *chanReader) Read(buf []byte) (int, os.Error) {
	for len(r.buf) == 0 {
		r.buf = <-r.c
		if closed(r.c) {
			return 0, os.EOF
		}
	}
	n := copy(buf, r.buf)
	r.buf = r.buf[n:]
	return n, nil
}

Next we implement the write half. When we get a Write request, we write the data down the channel. Note that we need to copy the data because the function calling Write is likely to re-use the same buffer.

type chanWriter struct {
	c chan<- []byte
}

func newChanWriter(c chan<- []byte) *chanWriter {
	return &chanWriter{c: c}
}
func (w *chanWriter) Write(buf []byte) (n int, err os.Error) {
	b := make([]byte, len(buf))
	copy(b, buf)
	w.c <- b
	return len(buf), nil
}

Now we have built the first layer, but if we want to run an RPC service over this, then we need more. In fact, it would be nice if we could have an implementation of net.Listener. Then we could simply pass it to Accept.

Recall that the Listener interface looks like this:

type Listener interface {
    // Accept waits for and returns the next connection to the listener.
    Accept() (c net.Conn, err os.Error)

    // Close closes the listener.
    // The error returned is an os.Error to satisfy io.Closer;
    Close() os.Error

    // Addr returns the listener's network address.
    Addr() net.Addr
}

Each new connection must implement net.Conn, which requires more than just Read, Write and Close:

type Conn interface {
    Read(b []byte) (n int, err os.Error)
    Write(b []byte) (n int, err os.Error)
    Close() os.Error

    LocalAddr() net.Addr
    RemoteAddr() net.Addr
    SetTimeout(nsec int64) os.Error
    SetReadTimeout(nsec int64) os.Error
    SetWriteTimeout(nsec int64) os.Error
}

Luckily we can easily provide dummy implementations of all the extra methods because none of their functionality is used by the rpc package. For details, see netchanConn.

Now comes an awkward bridge to cross. We need a way to establish a unique pair of channels between the netchan server and a given client. Netchan does not make it easy, but it is possible.

We do it by getting the server to generate an endless set of channel pairs and export each pair using a unique name. Then we use an auxilliary channel to send these names from the server to any client that wants a connection.

Here’s an excerpt from the paraphrased code that creates the Listener. The service variable holds a name that we are going to use as the “port name” for clients to connect to.

[Much of the code in this article has been paraphrased to omit the parts dealing with error checking and teardown, to make the main control flow more obvious.]

func Listen(exp *netchan.Exporter, service string) (net.Listener, os.Error) {
	r := &netchanListener{
		exp: exp,
		name: service,
		conns: make(chan net.Conn),
	}
	// Create the auxilliary channel and export it.
	clientNames := make(chan string)
	exp.Export(service, clientNames, netchan.Send)
	go func() {
		for i := 0; ; i++ {
			clientName := fmt.Sprintf("%s.%d", service, i)
			r.exporter(clientName)
			select{
			case clientNames <- clientName:
			case <-r.closed:
				// Listener has been closed, so stop exporting channels.
				return
			}
		}
	}()
	return r, nil
}

The exporter function creates two channels and exports them, spawns a goroutine to handle them, and returns.

func (r *netchanListener) exporter(clientName string) {
	req := make(chan []byte), 
	reply := make(chan []byte)
	r.exp.Export(clientName+".req", req, netchan.Recv)
	r.exp.Export(clientName+".reply", reply, netchan.Send)

	go func() {
		c := &netchanConn{
			chanReader: newChanReader(req),
			chanWriter: newChanWriter(reply),
		}
		r.conns <- c:
	}()
}

Now the Accept function is trivial (omitting teardown code again):

func (r *netchanListener) Accept() (c net.Conn, err os.Error) {
	return <-r.conns, nil
}

We can then define the client side. Note that we import only a single value from the auxilliary channel – this ensures that the server does not create many unnecessary channel pairs only for their names to be lost in the channel buffer.

func Dial(imp *netchan.Importer, service string) (net.Conn, os.Error) {
	// Import auxilliary channel.
	cnames := make(chan string)
	imp.ImportNValues(service, cnames, netchan.Recv, 1, 1)
	clientName := <-cnames

	// Import channel pair
	req := make(chan []byte)
	imp.Import(clientName + ".req", req, netchan.Send, 200)
	reply := make(chan []byte)
	imp.Import(clientName + ".reply", reply, netchan.Recv, 200)

	return &netchanConn{
		chanReader: &chanReader{c: reply},
		chanWriter: &chanWriter{c: req},
	}, nil
}

Between them, Listen and Dial provide the functionality we need to set up bidirectional RPC. I have made them available as the package at rog-go.googlecode.com/hg/ncnet.

Putting it together

Now that we have the Dial and Listener functionality over a netchan connection, we can use it to create a little application. It works like this:

  • Each client connects to a central server.
  • Each client exports an RPC interface to the server
  • The server exports an RPC interface to the clients

For this example, we want the client to export one method only: Read, which allows the server to ask the client for the contents of a file. The server will export two methods to the clients: List, to ask for a list of all currently connected clients, and Read to ask for the contents of a file on a given client.

We will start by defining the Server RPC type. It will hold one additional method to those listed above: Publish. This will be called by a client to tell the server that it is there, and to set up the server to client RPC link. Access to fields in the Server type is guarded by a mutex because RPC methods can be called concurrently.

type Server struct {
	mu       sync.Mutex
	clients  map[string]*rpc.Client		// All current clients.
	exp      *netchan.Exporter
	clientid int					// Unique client id generator.
}

The List method simply iterates over the list of clients and returns it. Note that the RPC package requires that both the argument and return types are represented as pointers. List takes no arguments, so we define it as taking a pointer to an empty struct.

func (srv *Server) List(_ *struct{}, names *[]string) os.Error {
	srv.mu.Lock()
	defer srv.mu.Unlock()
	for name := range srv.clients {
		*names = append(*names, name)
	}
	return nil
}

The Read method looks up a client and invokes its Read operation. Here, we’re turning client to server RPC into server to client RPC.

type ReadReq struct {
	Client string
	Path   string
}
func (srv *Server) Read(req *ReadReq, data *[]byte) os.Error {
	srv.mu.Lock()
	client := srv.clients[req.Client]
	srv.mu.Unlock()
	if client == nil {
		return os.ErrorString("unknown client")
	}
	return client.Call("Client.Read", &req.Path, data)
}

The Publish method is to be called by a client to establish the initial reverse-RPC connection with the server. It allocates a new name which it returns to the client, starting a new goroutine listening for a netchan connection on that name. When the connection is accepted, it starts a new RPC client instance on it and adds it to the Server’s list of clients.

func (srv *Server) Publish(name *string, clientId *string) os.Error {
	srv.mu.Lock()
	defer srv.mu.Unlock()
	*clientId = fmt.Sprintf("client%d", srv.clientid)
	srv.clientid++
	listener, _ := ncnet.Listen(srv.exp, *clientId)
	go func() {
		// Accept a single connection, then close the listener.
		conn, _ := listener.Accept()
		listener.Close()
		client := rpc.NewClient(conn)
		srv.mu.Lock()
		srv.clients[*name] = client
		srv.mu.Unlock()
	}()
	return nil
}

To start a new server, we create a new netchan Exporter, a ncnet Listener on top of that (“ctl” is just a port name that we choose by convention), and export an RPC server instance on that. The network address is given in addr (e.g. myserver.com:8080).

func main() {
	addr := os.Args[1]
	exp := netchan.NewExporter()
	exp.Listen("tcp", addr)
	listener, _ := ncnet.Listen(exp, "ctl")
	srv := &Server{
		exp:     exp,
		clients: make(map[string]*rpc.Client),
	}
	rpcsrv := rpc.NewServer()
	rpcsrv.Register(srv)
	rpcsrv.Accept(listener)
	listener.Close()
}

All that remains is the client side. There’s no state for the client, so we’ll just use an empty struct for the Client RPC type. The Read method takes a file name and returns the data as a []byte.

type Client struct{}

func Client(Client) Read(file *string, data *[]byte) (err os.Error) {
	f, err := os.Open(*file, os.O_RDONLY, 0)
	if err != nil {
		return err
	}
	*data, err = ioutil.ReadAll(f)
	return
}

To establish the initial client connection, we dial the server, establish a netchan link to the server’s RPC interface. Then we Publish our name and export the Client RPC type on the resulting connection. We can then interact with the server and the user as we please – in this case we provide a little command line interface reading commands from the user and executing them.

func main() {
	clientName := os.Args[1]
	addr := os.Args[2]

	imp, _ := netchan.NewImporter("tcp", addr)
	srvconn, _ := ncnet.Dial(imp, "ctl")
	srv := rpc.NewClient(srvconn)

	var clientId string
	srv.Call("Server.Publish", clientName, &clientId)

	clientsrv := rpc.NewServer()
	clientsrv.Register(Client{}); err != nil {
		log.Fatal("clientsrv register failed: ", err)
	}

	clientconn, err := ncnet.Dial(imp, clientId)
	if err != nil {
		log.Fatalf("ncnet dial %q failed: %v", clientId, err)
	}

	go clientsrv.ServeConn(clientconn)
	interact(srv)
	clientconn.Close()
}

Summing up

We’ve created something with quite a bit of flexibility here. The example program has the bare bones, but there are many other ways we could use the components. For instance, it would be easy to provide direct client-to-client communication by simply gluing two pipes together at the server side. Or a client could negotiate for upstream or downstream streams of data of any type.

It shows, I think, just how well the Go interface-based model works.

To try it out, build it like this:

goinstall rog-go.googlecode.com/hg/cmd/share
cd $GOROOT/src/pkg/rog-go.googlecode.com/hg/cmd/share
make

Then follow the instructions in doc.go to run it. An example client session:

% share -name second-client localhost:8765
> list
["first-client" "second-client"]
> read first-client /tmp/x
hello world
> ^D
% 

Lenses and Listeners in Go.

September 20, 2010

A little while ago, I developed a small experimental graphics package for Go, with the aim of giving full rein to the concurrency of the language and keeping things as modular and “building block”y as possible.

I wanted to allow an application to be structured as many independent agents, each interacting freely with the graphics environment, with independent communication between them.

I haven’t got there yet with the API design (although I’m happy with the way that some things work), but I do particularly like a package I created to solve one problem. It mixes functional style, interfaces and channels in a way that is seems to me to be uniquely “Go-like”. Whether it’s nice or useful I’ll leave you to decide.

Scratches on the wall

Imagine that the screen is a physical blackboard with several people writing on it. Each person can independently make marks on the board, and they can talk to others to help them decide what to write. Others in the room attend to different tasks, writing letters to the outside world, talking amongst themselves and to the people at the blackboard.

With goroutines as people, and talking as communication over a channel, this is how I imagine the graphical environment could be in Go. There is one difficulty, however, and that’s the problem of deadlock. In graphical environments, there are often several elements that reflect one another – for instance, a slider bar might have a textual display of its value, and the value might be used to determine the graphical appearance of some other element.

If each element is represented with a goroutine, then the above setup implies a cyclical dependency. The slider bar can be dragged, the textual display may be edited, and the graphical appearance may be changed as a result of some other action – each of which implies a change to the other two elements.

Naively, we might model this system something like this:

	func SliderElement(setc <-chan float64, getc chan<- float64) {
		var currentValue float64
		for {
			select{
			case currentValue = <-setc:
	
			case m := <-MouseEventChannel:
				currentValue = mousePointToValue(m)
				getc <- currentValue
			}
			drawSlider(currentValue)
		}
	}
			
	func TextElement(setc chan<- string, getc <-chan string) {
		<i>... similar to SliderElement</i>
	}
	
	func GraphicalElement(setc chan <-float64, getc <-chan float64) {
		<i>... also similar to SliderElement</i>
	}
	
	func setup() {
		sliderSet := make(chan float64)
		sliderGet := make(chan float64)
		go SliderElement(sliderSet, sliderGet)
	
		textSet := make(chan string)
		textGet := make(chan string)
		go TextElement(textSet, textGet)
	
		graphicalSet := make(chan float64)
		graphicalGet := make(chan float64)
		go GraphicalElement(graphicalSet, graphicalGet)

		// communications goroutine
		go func() {
			select {
			case v := <-sliderGet:
				textSet <- fmt.Sprint(v)
				graphicalSet <- v
			case v := <-textGet:
				f, err := strconv.Atof(v)
				if err != nil {
					// what now?
					break
				}
				sliderSet <- f
				graphicalSet <- v
			case v := <-graphicalGet:
				textSet <- fmt.Sprint(v)
				sliderSet <- v
			}
		}()
	}

There is are a few problems with this approach, but one show-stopper: it can deadlock. If the communications goroutine happens to be sending on graphicalSet at the same time that the graphical element has decided that it will change, then the two will block forever trying to perform the operation, and the whole thing will hang up. This is fairly unlikely if all the “set” actions are triggered by user mouse events, but not impossible. When independent actions are added to the mix, for example when triggered by a network communication, then it becomes much more likely.

Other problems with the above structure:

  • The number of elements is hard-wired into the communications goroutine. How would we introduce a variable number of graphical elements depending
    on the value?

  • The update speed of one element is linked to the update speed of all of them. If one of the elements is slow to update (for example, the graphical element might be a picture that takes 200ms to redraw), then
    the slider will be sluggish to react.

  • There is no way for the text element to know that the user has entered some invalid text without building knowledge of the acceptable format into it.

The Value of Listening

As one possible solution to this issue, I implemented the “values” package (rog-go.googlecode.com/hg/values). Instead of elements talking directly to one another, they talk to a proxy, a Value, which acts as an intermediary. Any goroutine can set the contents of a value or ask to be notified when the value changes.
The Value type is an interface as follows:

	type Value interface {
		Set(val interface{}) os.Error
		Iter() <-chan interface{}
		Close()
		Type() reflect.Type
	}

To create a new Value, we use values.NewValue:

	v := values.NewValue(0.5)

This creates a new Value holding a single floating point number, 0.5. We can set its contents with Set:

	v.Set(0.6)

Despite the interface{} argument to Set, only a single type of value may be stored, fixed by the initial value passed to NewValue.

	v.Set("hello")		// runtime error

If we want, we can find out that type:

	fmt.Printf("%v\n", v.Type())		// prints "float64"

If we wish to observe changes to the value, we use Iter. This code will start a new goroutine that prints the value each time it changes:

	c := v.Iter()
	go func() {
		for x := range c {
			fmt.Printf("%v\n", x)
		}
	}()

One key property of Value is that Set never waits for the observers to receive the value – in fact, it never blocks. This means that two goroutines that are observing changes to a Value can call Set on that value without risk of deadlock.

There is, however, aother risk: calling Set in response to a value received on the Iter channel will lead to livelock:

	c := v.Iter()
	go func() {
		// this loops forever, printing the same value.
		for x := range c {
			fmt.Printf("%v\n", x)
			c.Set(x)
		}
	}()

As a result, we have a rule: do not call Set in response to a notification of a changed value.

With the primitives we now have, we can join the slider to the graphical element. The code for the Elements is still fairly similar, but the setup() glue becomes much simpler:

	func SliderElement(v Value) {
		c := v.Iter()
		currentValue := <-c.(float64)
		for {
			select{
			case currentValue = <-c:

			case m := <-MouseEventChannel:
				currentValue = mousePointToValue(m)
				v.Set(currentValue)
			}
			drawSlider(currentValue)
		}
	}

	func GraphicalElement(v Value) {
		... similar to SliderElement
	}

	func setup() {
		v := values.NewValue(0.5)
		go SliderElement(v)
		go GraphicalElement(v)
	}

Here we have linked two elements – each can be independently manipulated, but each one always shows the current value. We would like to link in the text element too, but there’s one problem: the text element uses strings, not float64s.

If we could create a value that mirrors the original value, but using a textual representation instead of the original float64 representation, then implementing TextElement would be simple. As it happens (ta da!) the values package makes that quite straightforward.

In focus

“Lens” is a term (probably misborrowed) from functional language terminology. The analogy is the way that light can pass through a glass lens in either direction, bending one way on the way in, and the other on the way out.

I represent a Lens as a pair of functions, one implementing the inverse transformation to the other.

	func stringToFloat(s string) (float64, os.Error) { return strconv.Atof(s) }
	
	func float64ToString(f float64) (string, os.Error)  { return fmt.Sprint(f), nil }
	
	var stringToFloatLens = values.NewLens(stringToFloat, float64ToString)

Despite the analogy, a Go Lens is actually unidirectional.

	x, _ := stringToFloat64Lens.Transform("1.34e99")
	fmt.Printf("%g\n", x.(float64))

To transform in the opposite direction, the Lens can be reversed:

	float64ToStringLens := stringToFloat64Lens.Reverse()
	x, _ := float64ToStringLens.Transform(1.34e99)
	fmt.Printf("%s\n", x.(string))

We can transform the type of a Value using values.Transform and a Lens.Values passed to Set are passed through the lens one way; values received from Iter are passed the other way. Thus we can complete our setup function by adding the third element:

	func setup() {
		v := values.NewValue(0.5)
		go SliderElement(v)
		go GraphicalElement(v)

		sv := values.Transform(v, float64ToStringLens)
		go TextElement(sv)
	}

One more piece completes the toolkit: Lenses can be combined. Suppose, for example, that the Slider value ranges between 0 and 1, whereas the value expected by the graphical element and displayed in the text element varies from -180 to +180.

The values package provides a lens which implements this kind of transformation: UnitFloat2RangedFloat.
Here’s how it might be used:

	func setup() {
		v := values.NewValue(0.5)
		unit2angle := values.UnitFloat2RangedFloat(-180, 180)
		go SliderElement(v)
		go GraphicalElement(v.Transform(unit2angle))
		go TextElement(v.Transform(unit2angle.Combine(float64ToStringLens)))
	}

The observant might note that there are actually two ways of doing this. Without using Combine, we could apply two successive transformations using values.Transform: The two forms are subtly different – values.Transform introduces a new goroutine, and one more buffer element into the channel, whereas Combine does not.

In a subsequent blog post, I plan to talk a little more about the kinds of structures that can be built with this primitive, how it is implemented and performance considerations.

Unlimited Buffering with Low Overhead

February 10, 2010

In Go, channels have a fixed length buffer. Sometimes it is useful to add a buffer of unlimited length to a channel (here is an example). The first question is what the interface should look like. I can think of three immediate possibilities (assume T is an arbitrary type – if Go had generics, this would be a generic function):

Given a channel, make sure that no writes to that channel will
block, and return a channel from which the buffered values can be read:

func Buffer(in <-chan T) <-chan T

Given a channel, return a channel that will buffer writes
to that channel:

func Buffer(out chan<- T) chan <-T

Given two channels, connect them via a buffering process:

func Buffer(in <-chan T, out chan<- T)

Of these possibilities, on balance I think I prefer the second, as no operations will be performed on the original channel except when a value is written on the returned channel.

I’d be interested in hearing arguments for or against the other possibilities.

Here is one simple, and relatively slow implementation. It uses the doubly-linked list implementation from the Go library. I timed it at 2076ns per item transferred on my machine. Note the code that runs before the select statement each time through the loop, which works out whether we want to be sending a value, and when it is time to finish. This relies on the fact that in a Go select statement, operations on nil channels are ignored.

import "container/list"
func BufferList(out chan<- T) chan<- T {
	in := make(chan T, 100)
	go func() {
		var buf = list.New()
		for {
			outc := out
			var v T
			n := buf.Len()
			if n == 0 {
				// buffer empty: don't try to send on output
				if in == nil {
					close(out)
					return
				}
				outc = nil
			}else{
				v = buf.Front().Value.(T)
			}
			select {
			case e := <-in:
				if closed(in) {
					in = nil
				} else {
					buf.PushBack(e)
				}
			case outc <- v:
				buf.Remove(buf.Front())
			}
		}
	}()
	return in
}

The above implementation allocates a new linked list item for every value transferred. Here’s an alternative implementation that uses an array as a circular buffer, amortising allocations over time by doubling the size of the buffer when it overflows, and shrinking it when there is too much space. Although the basic structure is similar, the code is more complex, and the time saving is modest – I timed it at 1729ns per item transferred, an improvement of 17%. Removing the code to shrink the buffer does not make it significantly faster.

func BufferRingOrig(out chan<- T) chan<- T {
	in := make(chan T, 100)
	go func() {
		var zero T
		var buf = make([]T, 10)
		var i = 0 // location of first value in buffer.
		var n = 0 // number of items in buffer.
		for {
			outc := out
			switch {
			case n == 0:
				// buffer empty: don't try to send on output
				if in == nil {
					close(out)
					return
				}
				outc = nil

			case n == len(buf):
				// buffer full: expand it
				b := make([]T, n*2)
				copy(b, buf[i:])
				copy(b[n-i:], buf[0:i])
				i = 0
				buf = b

			case len(buf) > 128 && n*3 < len(buf):
				// buffer too big, shrink it
				b := make([]T, len(buf) / 2)
				j := i + n
				if j > len(buf) {
					// wrap around
					k := j - len(buf)
					j = len(buf)
					copy(b, buf[i:j])
					copy(b[j - i:], buf[0:k])
				}else{
					// contiguous
					copy(b, buf[i:j])
				}
				i = 0
				buf = b
			}
			select {
			case e := <-in:
				if closed(in) {
					in = nil
				} else {
					j := i + n
					if j >= len(buf) {
						j -= len(buf)
					}
					buf[j] = e
					n++
				}
			case outc <- buf[i]:
				buf[i] = zero
				if i++; i == len(buf) {
					i = 0
				}
				n--
			}
		}
	}()
	return in
}

I wondered if the unnecessary tests before the select statement were making any significant difference to the time taken. Although it makes it easy to preserve the invariants, there is no need to test whether the buffer is empty when a value has just been placed in it, for example.

Here is a version that only does the tests when necessary. Interestingly, this change actually made the code run marginally slower (1704ns per item)

func BufferRing(out chan<- T) chan<- T {
	in := make(chan T, 100)
	go func() {
		var zero T
		var buf = make([]T, 10)
		var i = 0 // location of first value in buffer.
		var n = 0 // number of items in buffer.
		var outc chan<- T
		for {
			select {
			case e := <-in:
				if closed(in) {
					in = nil
					if n == 0 {
						close(out)
						return
					}
				} else {
					j := i + n
					if j >= len(buf) {
						j -= len(buf)
					}
					buf[j] = e
					n++
					if n == len(buf) {
						// buffer full: expand it
						b := make([]T, n*2)
						copy(b, buf[i:])
						copy(b[n-i:], buf[0:i])
						i = 0
						buf = b
					}
					outc = out
				}
			case outc <- buf[i]:
				buf[i] = zero
				if i++; i == len(buf) {
					i = 0
				}
				n--
				if n == 0 {
					// buffer empty: don't try to send on output
					if in == nil {
						close(out)
						return
					}
					outc = nil
				}
				if len(buf) > 128 && n*3 < len(buf) {
					// buffer too big, shrink it
					b := make([]T, len(buf) / 2)
					j := i + n
					if j > len(buf) {
						// wrap around
						k := j - len(buf)
						j = len(buf)
						copy(b, buf[i:j])
						copy(b[j - i:], buf[0:k])
					}else{
						// contiguous
						copy(b, buf[i:j])
					}
					i = 0
					buf = b
				}
			}
		}
	}()
	return in
}

Although the speed improvement from the above piece of code was disappointing, the change paves the way for a change that really does make a difference. A select statement in Go is significantly more costly than a regular channel operation. In the code below, we loop receiving or sending values as long as we can do so without blocking. Here’s a version of the list-based code that does this. I measured it at 752ns per item, an improvement of 63% over the original, or 2.7x faster.

func BufferListCont(out chan<- T) chan<- T {
	in := make(chan T, 100)
	go func() {
		var buf = list.New()
		var outc chan<- T
		var v T
		for {
			select {
			case e := <-in:
				if buf.Len() == 0 && !closed(in) {
					outc = out
					v = e
				}
				for {
					if closed(in) {
						in = nil
						if buf.Len() == 0 {
							close(out)
							return
						}
						break
					}
					buf.PushBack(e)
					var ok bool
					if e, ok = <-in; !ok {
						break
					}
				}
	
			case outc <- v:
				for {
					buf.Remove(buf.Front())
					if buf.Len() == 0 {
						// buffer empty: don't try to send on output
						if in == nil {
							close(out)
							return
						}
						outc = nil
						break
					}
					v = buf.Front().Value.(T)
					if ok := outc <- v; !ok {
						break
					}
				}
			}
		}
	}()
	return in
}

One objection to the above code is that in theory if there was a fast enough producer on another processor, the buffer process could spend forever feeding values into the buffer, without ever trying to write them out. Although I believe that in practice the risk is negligible, it’s easy to guard against anyway, by only adding a fixed maximum number of values before returning to the select statement.

Here’s my final implementation, using the looping technique and with the guard added in.

I timed it at 427ns per item transferred, an improvement of 79% over the original version, or almost 5x faster. Using a buffered channel directly is only 2.4x faster than this.

func BufferRingContCheck(out chan<- T) chan<- T {
	in := make(chan T, 100)
	go func() {
		var zero T
		var buf = make([]T, 10)
		var i = 0 // location of first value in buffer.
		var n = 0 // number of items in buffer.
		var outc chan<- T
		for {
			select {
			case e := <-in:
				for added := 0; added < 1000; added++ {
					if closed(in) {
						in = nil
						if n == 0 {
							close(out)
							return
						}
						break
					}
					j := i + n
					if j >= len(buf) {
						j -= len(buf)
					}
					buf[j] = e
					n++
					outc = out		// enable output
					if n == len(buf) {
						// buffer full: expand it
						b := make([]T, n*2)
						copy(b, buf[i:])
						copy(b[n-i:], buf[0:i])
						i = 0
						buf = b
					}
					var ok bool
					if e, ok = <-in; !ok {
						break
					}
				}
			case outc <- buf[i]:
				for {
					buf[i] = zero
					if i++; i == len(buf) {
						i = 0
					}
					n--
					if n == 0 {
						// buffer empty: don't try to send on output
						if in == nil {
							close(out)
							return
						}
						outc = nil
						break
					}
					if len(buf) > 128 && n*3 < len(buf) {
						// buffer too big, shrink it
						b := make([]T, len(buf) / 2)
						j := i + n
						if j > len(buf) {
							// wrap around
							k := j - len(buf)
							j = len(buf)
							copy(b, buf[i:j])
							copy(b[j - i:], buf[0:k])
						}else{
							// contiguous
							copy(b, buf[i:j])
						}
						i = 0
						buf = b
					}
					if ok := outc <- buf[i]; !ok {
						break
					}
				}
			}
		}
	}()
	return in
}

Obviously the final code is significantly bigger and more complex than the original. Which implementation should we choose? Lacking generics, this code cannot usefully be put into a library, as most channels are not of type chan interface{}.

Given this, in most instances, perhaps the first version is to be preferred, as it’s smaller to cut and paste, and easier to understand. In cases where performance is crucial, the final version can easily be substituted.

Perhaps there’s another faster technique that I haven’t found yet. I’d be interested to hear any ideas on the matter.

The code with all the tests and benchmarks can be found here.

Select functions for Go

January 4, 2010

In this thread, Rowan Davies wrote:

There’s no STM implementation for Go, as far as I know. STM doesn’t fit so well with message passing channels, which Go adopts as another quite good alternative to locks. When used well they can avoid the compositionality issues with locks, and have less of a performance cost compared to STM – but STM arguably scales better to complicated situations in terms of the ease of avoiding deadlocks and the like.

STM isn’t as expressive as channels – you can’t build channels within STM (although you can layer them on top of it, which then loses some of the compositionality benefits)

Channels have their own compositionality issues – if you want to be able to alt on something, you need to have access to the raw channel, but the interface might require other things to happen after the channel operation – these must be exposed by the interface, which is undesirable.

For example, in some example code I wrote in a previous post, there’s some code to read a value:

func (r *Receiver) Read() interface{} {
	b := <-r.C
	v := b.v
	r.C <- b
	r.C = b.c
	return v
}

It would be nice if we could have this Read as part of a select statement. Currently, the only way to do this is to make the channel publicly readable, and have a function that the user must remember to call with the value read from the channel:

func (r *Receiver) DoneRead(b broadcast) interface{} {
	v := b.v
	r.C <- b
	r.C = b.c
	return v
}

select {
case b := <-r.C:
    v := r.DoneRead(b)
    ...
....
}

This is error-prone – and more importantly, it breaks encapsulation by exposing the internal-only “broadcast” type.

For a nice way around these problems, I’ve been thinking that something like the select functions provided by the XC language might work well in Go.

A select function would be similar to a normal function except that its top level contains arms of a select statement:

selectfunc read(r *Receiver) interface{} {
    	case b := <-r.C:
	    v := b.v
	    r.C <- b
	    r.C = b.c
	    return v
}

Then you could do:

select {
case v := read(&r):
    .... do something with v
}

i.e. a select function can be used in place of a channel operation in any select arm. Using the select function in an expression would just block as normal.

Select functions seem enough like normal functions that one might ask whether they could be methods too. Given the current syntax. which doesn’t use the func keyword inside interface declarations, I’d say no. And they’re not strictly speaking necessary either. Something not too far from the original interface can be obtained by returning a selectfunc as a closure:

func (r *Receiver) Reader() (selectfunc () interface{}) {
 	return selectfunc() interface{} {
    		case b := <-r.C:
			v := b.v
			r.C <- b
			r.C = b.c
			return v
	}
}

Then you’d do:

select {
case v := r.Reader()() {
	...
}

Not entirely ideal, but quite feasible.

I think there are quite a few benefits to be gained from implementing something like this. And the implementation should be reasonably straightforward – the main implication, as far as I can see, is that the number of arms in an alt statement would not always be statically determinable.

What do people think?

Concurrent Idioms #1: Broadcasting values in Go with linked channels.

December 1, 2009

Channels in Go are powerful things, but it’s not always obvious how to get them to accomplish certain tasks. One of those tasks is one-to-many communication. Channels work very well if lots of writers are funneling values to a single reader, but it’s not immediately clear how multiple readers can all wait for values from a single writer.

Here’s what a Go API for doing this might look like:

type Broadcaster ...

func NewBroadcaster() Broadcaster
func (b Broadcaster) Write(v interface{})
func (b Broadcaster) Listen() chan interface{}

The broadcast channel is created with NewBroadcaster, and values can be written to it with Write. To listen on the channel, we call Listen, which gives us a new channel from which we can receive the values written.

The solution that immediately comes to mind (which I’ve used in the past) is to have an intermediate process that keeps a registry of all the reading processes. Each call to Listen adds a new channel to the registry, and the central process’s main loop might look something like this:

for {
        select {
        case v := <-inc:
                for _, c := range(listeners) {
                        c <- v
                }
        case c := <-registryc:
                listeners.push(c);
        }
}

This is the conventional way of doing things. It’s also probably the most sensible method. The process writing values will block until all the readers have read the previous value. An alternative might be to maintain an output buffer for each reader, which could either grow as necessary, or we could discard values when the buffer has filled up.

But this post isn’t about the sensible way of doing things. This post is about an implementation where the writer never blocks; a slow reader with a fast writer can fill all of memory if it goes on for long enough. And it’s not particularly efficient.

But I don’t care much, because I think this is cool. And I might find a genuine use for it some day.

Here’s the heart of it:

type broadcast struct {
	c	chan broadcast;
	v	interface{};
}

This is what I call a “linked channel” (analagously to a linked list). But even more than a linked list, it’s an Ouroboros data structure. That is, an instance of the structure can be sent down the channel that is inside itself.

Or the other way around. If I have a value of type chan broadcast around, then I can read a broadcast value b from it, giving me the arbitrary value b.v, and another value of the original type, b.c, allowing me to repeat the process.

The other part of the puzzle comes from the way that a buffered channel can used as a one-use one-to-many broadcast object. If I’ve got a buffered channel of some type T:

var c = make(chan T, 1)

then any process reading from it will block until a value is written.
When we want to broadcast a value, we simply write it to the channel. This value will only go to a single reader, however we observe the convention that if you read a value from the channel, you always put it back immediately.

func wait(c chan T) T {
	v := <-c
	c <- v;
	return v;
}

Putting the two pieces together, we can see that if the channel inside the broadcast struct is buffered in the above way, then we can get an endless stream of one-use one-to-many broadcast channels, each with an associated value.

Here’s the code:

package broadcast

type broadcast struct {
	c	chan broadcast;
	v	interface{};
}

type Broadcaster struct {
	// private fields:
	Listenc	chan chan (chan broadcast);
	Sendc	chan<- interface{};
}

type Receiver struct {
	// private fields:
	C chan broadcast;
}

// create a new broadcaster object.
func NewBroadcaster() Broadcaster {
	listenc := make(chan (chan (chan broadcast)));
	sendc := make(chan interface{});
	go func() {
		currc := make(chan broadcast, 1);
		for {
			select {
			case v := <-sendc:
				if v == nil {
					currc <- broadcast{};
					return;
				}
				c := make(chan broadcast, 1);
				b := broadcast{c: c, v: v};
				currc <- b;
				currc = c;
			case r := <-listenc:
				r <- currc
			}
		}
	}();
	return Broadcaster{
		Listenc: listenc,
		Sendc: sendc,
	};
}

// start listening to the broadcasts.
func (b Broadcaster) Listen() Receiver {
	c := make(chan chan broadcast, 0);
	b.Listenc <- c;
	return Receiver{<-c};
}

// broadcast a value to all listeners.
func (b Broadcaster) Write(v interface{})	{ b.Sendc <- v }

// read a value that has been broadcast,
// waiting until one is available if necessary.
func (r *Receiver) Read() interface{} {
	b := <-r.C;
	v := b.v;
	r.C <- b;
	r.C = b.c;
	return v;
}

This implementastion has the nice property that there’s no longer any need for a central registry of listeners. A Receiver value encapsulates a place in the stream of values and can be copied at will – each copy will receive an identical copy of the stream. There’s no need for an Unregister function either. Of course, if the readers don’t keep up with the writers, memory can be used indefinitely, but… isn’t this quite neat?

Here’s some example code using it:

package main

import (
	"fmt";
	"broadcast";
	"time";
)

var b = broadcast.NewBroadcaster();

func listen(r broadcast.Receiver) {
	for v := r.Read(); v != nil; v = r.Read() {
		go listen(r);
		fmt.Println(v);
	}
}

func main() {
	r := b.Listen();
	go listen(r);
	for i := 0; i  < 10; i++ {
		b.Write(i);
	}
	b.Write(nil);

	time.Sleep(3 * 1e9);
}

Exercise for the reader: without running the code, how many lines will this code print?