Bidirectional RPC with netchan

Today’s firewall-riddled network makes it hard to set up ad hoc topologies of communicating programs. Opening up a single incoming port for a server to listen on is usually possible. If you want arbitrary numbers of ports, then it’s harder, and as for calling a client back, well, forget it.

Remote Procedure Call (RPC) is a common paradigm that is easy to use in this scenario. A client connects to the server and sends it some data; the server operates on that data and sends back the results.

Say I want the control flow to go the other way? That’s OK too: the client makes a connection to the server, then acts in the RPC server role, reading data from the “server”, operating on it, and sending back the results.
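As a rough illustration of this role reversal, here is a minimal sketch using the Go 1 net/rpc package over a loopback TCP connection (not the pre-Go 1 API used in the rest of this article). The Worker type, its Upper method and the reversedCall helper are hypothetical names invented for the example.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
	"strings"
)

// Worker is a hypothetical service exported by the side that dials.
type Worker struct{}

// Upper stores the upper-cased argument in reply.
func (Worker) Upper(s string, reply *string) error {
	*reply = strings.ToUpper(s)
	return nil
}

// reversedCall listens on a loopback port, dials it, and then lets the
// listening side issue an RPC that the dialing side serves.
func reversedCall(arg string) (string, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	defer ln.Close()

	// The "client": it makes the connection, then acts as the RPC server.
	dialed, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		return "", err
	}
	srv := rpc.NewServer()
	if err := srv.Register(Worker{}); err != nil {
		return "", err
	}
	go srv.ServeConn(dialed)

	// The "server": it accepts the connection, then acts as the RPC client.
	accepted, err := ln.Accept()
	if err != nil {
		return "", err
	}
	caller := rpc.NewClient(accepted)
	defer caller.Close()

	var reply string
	err = caller.Call("Worker.Upper", arg, &reply)
	return reply, err
}

func main() {
	fmt.Println(reversedCall("hello"))
}
```

Only the direction of the initial connection matters to the firewall; once the byte stream exists, either end can play either RPC role.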

Harder is when I want control flow to go both ways – for the client to be able to make requests of the server and for the server to make requests of clients. If I can manage two connections per client, it’s easy enough, but for the purposes of this article I assume that is not possible.

Luckily Go’s flexible interface-based APIs make this kind of thing quite straightforward. We can layer the Go RPC package on top of the Go netchan package. This article explains how, and uses the functionality to build a flexible client-server application where clients make their local file system available to the server, and any client can access any other client’s file system.

If you want to skip ahead and just read the example code, it is the share command whose build instructions appear at the end of this article.

The RPC package can run neatly on top of any byte stream – all we need is an io.ReadWriteCloser at each end. Server.ServeConn will start an RPC server; NewClient will start the client end. We just need a way of layering a client to server byte stream onto netchan.
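To make the "any byte stream" point concrete, here is a small sketch that runs an RPC server and client over the two ends of an in-memory pipe. It assumes the Go 1 net/rpc package and net.Pipe rather than the pre-Go 1 packages this article was written against; Arith, Double and callDouble are hypothetical names.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// Arith is a hypothetical RPC service used only for this illustration.
type Arith struct{}

// Double stores twice the argument in reply.
func (Arith) Double(n int, reply *int) error {
	*reply = n * 2
	return nil
}

// callDouble serves Arith on one end of an in-memory pipe, attaches an
// RPC client to the other end, and makes a single call.
func callDouble(n int) (int, error) {
	serverEnd, clientEnd := net.Pipe() // each end is an io.ReadWriteCloser

	srv := rpc.NewServer()
	if err := srv.Register(Arith{}); err != nil {
		return 0, err
	}
	go srv.ServeConn(serverEnd)

	client := rpc.NewClient(clientEnd)
	defer client.Close()

	var reply int
	err := client.Call("Arith.Double", n, &reply)
	return reply, err
}

func main() {
	fmt.Println(callDouble(21))
}
```

Nothing in the RPC layer knows or cares what carries the bytes, which is what lets us substitute a netchan-backed stream later on.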

From netchan to Listener

Using the netchan package, we can send and receive data from several channels on the same underlying connection. If we use one channel of []byte in each direction, it is straightforward to make them appear as an io.ReadWriteCloser.

First the Read half. When we get a read request, we satisfy it by reading the channel. If the channel gives us too much data, we save it and return it in subsequent Read requests.

type chanReader struct {
	buf []byte
	c   <-chan []byte
}

func newChanReader(c <-chan []byte) *chanReader {
	return &chanReader{c: c}
}

func (r *chanReader) Read(buf []byte) (int, os.Error) {
	for len(r.buf) == 0 {
		r.buf = <-r.c
		if closed(r.c) {
			return 0, os.EOF
		}
	}
	n := copy(buf, r.buf)
	r.buf = r.buf[n:]
	return n, nil
}

Next we implement the write half. When we get a Write request, we write the data down the channel. Note that we need to copy the data because the function calling Write is likely to re-use the same buffer.

type chanWriter struct {
	c chan<- []byte
}

func newChanWriter(c chan<- []byte) *chanWriter {
	return &chanWriter{c: c}
}

func (w *chanWriter) Write(buf []byte) (n int, err os.Error) {
	b := make([]byte, len(buf))
	copy(b, buf)
	w.c <- b
	return len(buf), nil
}
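For readers on Go 1, where os.Error and the closed built-in no longer exist, the same two adapters might look roughly like this. This is a sketch rather than the ncnet code: it uses the two-value receive to detect a closed channel, and roundTrip is a hypothetical helper that pushes a few messages through a local channel to show the reader reassembling them.

```go
package main

import (
	"fmt"
	"io"
)

// chanReader presents a channel of byte slices as an io.Reader.
type chanReader struct {
	buf []byte
	c   <-chan []byte
}

func (r *chanReader) Read(p []byte) (int, error) {
	for len(r.buf) == 0 {
		b, ok := <-r.c
		if !ok {
			return 0, io.EOF // channel closed: end of stream
		}
		r.buf = b
	}
	n := copy(p, r.buf)
	r.buf = r.buf[n:] // keep any surplus for the next Read
	return n, nil
}

// chanWriter presents a channel of byte slices as an io.Writer.
type chanWriter struct {
	c chan<- []byte
}

func (w *chanWriter) Write(p []byte) (int, error) {
	b := make([]byte, len(p)) // copy: the caller may reuse p
	copy(b, p)
	w.c <- b
	return len(p), nil
}

// roundTrip writes each message to a chanWriter and reads the whole
// stream back through a chanReader on the same channel.
func roundTrip(msgs ...string) (string, error) {
	c := make(chan []byte)
	go func() {
		w := &chanWriter{c: c}
		for _, m := range msgs {
			w.Write([]byte(m))
		}
		close(c)
	}()
	data, err := io.ReadAll(&chanReader{c: c})
	return string(data), err
}

func main() {
	fmt.Println(roundTrip("hello, ", "world"))
}
```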

Now we have built the first layer, but if we want to run an RPC service over this, then we need more. In fact, it would be nice if we could have an implementation of net.Listener. Then we could simply pass it to the RPC server’s Accept method.

Recall that the Listener interface looks like this:

type Listener interface {
    // Accept waits for and returns the next connection to the listener.
    Accept() (c net.Conn, err os.Error)

    // Close closes the listener.
    // The error returned is an os.Error to satisfy io.Closer.
    Close() os.Error

    // Addr returns the listener's network address.
    Addr() net.Addr
}

Each new connection must implement net.Conn, which requires more than just Read, Write and Close:

type Conn interface {
    Read(b []byte) (n int, err os.Error)
    Write(b []byte) (n int, err os.Error)
    Close() os.Error

    LocalAddr() net.Addr
    RemoteAddr() net.Addr
    SetTimeout(nsec int64) os.Error
    SetReadTimeout(nsec int64) os.Error
    SetWriteTimeout(nsec int64) os.Error
}

Luckily we can easily provide dummy implementations of all the extra methods because none of their functionality is used by the rpc package. For details, see netchanConn.
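The following is a sketch of what such dummy methods can look like, written against the Go 1 net.Conn interface (which has SetDeadline, SetReadDeadline and SetWriteDeadline in place of the SetTimeout family shown above). It is not the actual netchanConn; rwcConn, dummyAddr, bufCloser and demo are hypothetical names.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net"
	"time"
)

// dummyAddr is a placeholder address that just carries a name.
type dummyAddr string

func (a dummyAddr) Network() string { return "netchan" }
func (a dummyAddr) String() string  { return string(a) }

// rwcConn adapts any io.ReadWriteCloser to net.Conn. The address and
// deadline methods do nothing useful.
type rwcConn struct {
	io.ReadWriteCloser
	name string
}

func (c *rwcConn) LocalAddr() net.Addr              { return dummyAddr(c.name) }
func (c *rwcConn) RemoteAddr() net.Addr             { return dummyAddr(c.name) }
func (c *rwcConn) SetDeadline(time.Time) error      { return nil }
func (c *rwcConn) SetReadDeadline(time.Time) error  { return nil }
func (c *rwcConn) SetWriteDeadline(time.Time) error { return nil }

// Compile-time check that the adapter satisfies net.Conn.
var _ net.Conn = (*rwcConn)(nil)

// bufCloser is an in-memory stream used only to exercise the adapter.
type bufCloser struct{ bytes.Buffer }

func (*bufCloser) Close() error { return nil }

// demo writes to the adapted connection, reads the bytes back, and
// returns them along with the dummy remote address.
func demo() (string, string, error) {
	var conn net.Conn = &rwcConn{ReadWriteCloser: &bufCloser{}, name: "ctl.0"}
	if _, err := conn.Write([]byte("ping")); err != nil {
		return "", "", err
	}
	buf := make([]byte, 4)
	if _, err := io.ReadFull(conn, buf); err != nil {
		return "", "", err
	}
	if err := conn.SetDeadline(time.Now().Add(time.Second)); err != nil {
		return "", "", err
	}
	return string(buf), conn.RemoteAddr().String(), conn.Close()
}

func main() {
	fmt.Println(demo())
}
```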

Now comes an awkward bridge to cross. We need a way to establish a unique pair of channels between the netchan server and a given client. Netchan does not make it easy, but it is possible.

We do it by getting the server to generate an endless set of channel pairs and export each pair using a unique name. Then we use an auxiliary channel to send these names from the server to any client that wants a connection.

Here’s an excerpt from the paraphrased code that creates the Listener. The service variable holds a name that we are going to use as the “port name” for clients to connect to.

[Much of the code in this article has been paraphrased to omit the parts dealing with error checking and teardown, to make the main control flow more obvious.]

func Listen(exp *netchan.Exporter, service string) (net.Listener, os.Error) {
	r := &netchanListener{
		exp:   exp,
		name:  service,
		conns: make(chan net.Conn),
	}
	// Create the auxiliary channel and export it.
	clientNames := make(chan string)
	exp.Export(service, clientNames, netchan.Send)
	go func() {
		for i := 0; ; i++ {
			clientName := fmt.Sprintf("%s.%d", service, i)
			r.exporter(clientName)
			select {
			case clientNames <- clientName:
			case <-r.closed:
				// Listener has been closed, so stop exporting channels.
				return
			}
		}
	}()
	return r, nil
}
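Stripped of netchan, the name-allocation loop in Listen is an ordinary generator goroutine. Here is a local sketch of just that pattern in Go 1; nameServer and takeNames are hypothetical helpers, and a plain unbuffered channel stands in for the exported auxiliary channel.

```go
package main

import "fmt"

// nameServer hands out unique names of the form "service.N" on the
// returned channel until stop is closed.
func nameServer(service string, stop <-chan struct{}) <-chan string {
	names := make(chan string) // unbuffered: at most one name is prepared ahead
	go func() {
		defer close(names)
		for i := 0; ; i++ {
			name := fmt.Sprintf("%s.%d", service, i)
			select {
			case names <- name:
				// A client took the name; prepare the next one.
			case <-stop:
				// The listener has been closed, so stop generating names.
				return
			}
		}
	}()
	return names
}

// takeNames plays the part of n clients, each asking for one name.
func takeNames(service string, n int) []string {
	stop := make(chan struct{})
	defer close(stop)
	names := nameServer(service, stop)
	out := make([]string, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, <-names)
	}
	return out
}

func main() {
	fmt.Println(takeNames("ctl", 3))
}
```

Because the channel is unbuffered, the generator only ever runs one name ahead of demand, which mirrors the way Listen exports one channel pair in advance of the next client.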

The exporter function creates two channels and exports them, spawns a goroutine to handle them, and returns.

func (r *netchanListener) exporter(clientName string) {
	req := make(chan []byte)
	reply := make(chan []byte)
	r.exp.Export(clientName+".req", req, netchan.Recv)
	r.exp.Export(clientName+".reply", reply, netchan.Send)

	go func() {
		c := &netchanConn{
			chanReader: newChanReader(req),
			chanWriter: newChanWriter(reply),
		}
		r.conns <- c
	}()
}

Now the Accept function is trivial (omitting teardown code again):

func (r *netchanListener) Accept() (c net.Conn, err os.Error) {
	return <-r.conns, nil
}
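For completeness, here is one way the omitted teardown could be handled, sketched as a self-contained channel-backed net.Listener in Go 1. This is an assumption about the shape of the code, not the ncnet implementation: chanListener, chanAddr and acceptThenClose are hypothetical, and net.ErrClosed requires Go 1.16 or later.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"sync"
)

// chanAddr is a placeholder address carrying the service name.
type chanAddr string

func (a chanAddr) Network() string { return "chan" }
func (a chanAddr) String() string  { return string(a) }

// chanListener is a net.Listener whose connections arrive on a channel.
type chanListener struct {
	name   string
	conns  chan net.Conn
	closed chan struct{}
	once   sync.Once
}

func newChanListener(name string) *chanListener {
	return &chanListener{
		name:   name,
		conns:  make(chan net.Conn),
		closed: make(chan struct{}),
	}
}

// Accept returns the next delivered connection, or net.ErrClosed once
// the listener has been closed.
func (l *chanListener) Accept() (net.Conn, error) {
	select {
	case c := <-l.conns:
		return c, nil
	case <-l.closed:
		return nil, net.ErrClosed
	}
}

// Close is safe to call more than once.
func (l *chanListener) Close() error {
	l.once.Do(func() { close(l.closed) })
	return nil
}

func (l *chanListener) Addr() net.Addr { return chanAddr(l.name) }

var _ net.Listener = (*chanListener)(nil)

// acceptThenClose delivers one connection, accepts it, closes the
// listener, and reports what each Accept call observed.
func acceptThenClose() (gotConn, closedErr bool) {
	l := newChanListener("ctl")
	a, b := net.Pipe()
	defer a.Close()
	defer b.Close()

	go func() { l.conns <- a }()
	c, err := l.Accept()
	gotConn = err == nil && c == a

	l.Close()
	_, err = l.Accept()
	closedErr = errors.Is(err, net.ErrClosed)
	return gotConn, closedErr
}

func main() {
	fmt.Println(acceptThenClose())
}
```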

We can then define the client side. Note that we import only a single value from the auxiliary channel – this ensures that the server does not create many unnecessary channel pairs only for their names to be lost in the channel buffer.

func Dial(imp *netchan.Importer, service string) (net.Conn, os.Error) {
	// Import the auxiliary channel and take a single name from it.
	cnames := make(chan string)
	imp.ImportNValues(service, cnames, netchan.Recv, 1, 1)
	clientName := <-cnames

	// Import the channel pair.
	req := make(chan []byte)
	imp.Import(clientName+".req", req, netchan.Send, 200)
	reply := make(chan []byte)
	imp.Import(clientName+".reply", reply, netchan.Recv, 200)

	return &netchanConn{
		chanReader: &chanReader{c: reply},
		chanWriter: &chanWriter{c: req},
	}, nil
}

Between them, Listen and Dial provide the functionality we need to set up bidirectional RPC. I have made them available as the package at rog-go.googlecode.com/hg/ncnet.

Putting it together

Now that we have the Dial and Listen functionality over a netchan connection, we can use it to create a little application. It works like this:

  • Each client connects to a central server.
  • Each client exports an RPC interface to the server.
  • The server exports an RPC interface to the clients.

For this example, we want the client to export one method only: Read, which allows the server to ask the client for the contents of a file. The server will export two methods to the clients: List, to ask for a list of all currently connected clients, and Read, to ask for the contents of a file on a given client.

We will start by defining the Server RPC type. It has one method in addition to those listed above: Publish. This will be called by a client to tell the server that it is there, and to set up the server to client RPC link. Access to fields in the Server type is guarded by a mutex because RPC methods can be called concurrently.

type Server struct {
	mu       sync.Mutex
	clients  map[string]*rpc.Client // All current clients.
	exp      *netchan.Exporter
	clientid int                    // Unique client id generator.
}

The List method simply iterates over the map of clients and returns their names. Note that the RPC package requires that both the argument and return types are represented as pointers. List takes no arguments, so we define it as taking a pointer to an empty struct.

func (srv *Server) List(_ *struct{}, names *[]string) os.Error {
	srv.mu.Lock()
	defer srv.mu.Unlock()
	for name := range srv.clients {
		*names = append(*names, name)
	}
	return nil
}

The Read method looks up a client and invokes its Read operation. Here, we’re turning client to server RPC into server to client RPC.

type ReadReq struct {
	Client string
	Path   string
}

func (srv *Server) Read(req *ReadReq, data *[]byte) os.Error {
	srv.mu.Lock()
	client := srv.clients[req.Client]
	srv.mu.Unlock()
	if client == nil {
		return os.ErrorString("unknown client")
	}
	return client.Call("Client.Read", &req.Path, data)
}
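To see the forwarding in isolation, here is a sketch in Go 1 that wires the two hops together with in-memory pipes instead of netchan. It rests on several assumptions: the Client here serves "files" from a map rather than the real file system, and the serve and forwardedRead helpers, the file contents and the client name are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/rpc"
	"sync"
)

// Client stands in for the article's Client type; it serves contents
// from an in-memory map instead of the real file system.
type Client struct {
	files map[string][]byte
}

func (c *Client) Read(path string, data *[]byte) error {
	b, ok := c.files[path]
	if !ok {
		return errors.New("no such file: " + path)
	}
	*data = b
	return nil
}

type ReadReq struct {
	Client string
	Path   string
}

type Server struct {
	mu      sync.Mutex
	clients map[string]*rpc.Client
}

// Read looks up the named client and forwards the request to it.
func (srv *Server) Read(req *ReadReq, data *[]byte) error {
	srv.mu.Lock()
	client := srv.clients[req.Client]
	srv.mu.Unlock()
	if client == nil {
		return errors.New("unknown client")
	}
	return client.Call("Client.Read", req.Path, data)
}

// serve registers rcvr on a fresh RPC server at one end of an in-memory
// pipe and returns an RPC client attached to the other end.
func serve(rcvr interface{}) (*rpc.Client, error) {
	a, b := net.Pipe()
	s := rpc.NewServer()
	if err := s.Register(rcvr); err != nil {
		return nil, err
	}
	go s.ServeConn(a)
	return rpc.NewClient(b), nil
}

// forwardedRead asks the server for a file held by the named client.
func forwardedRead(owner, path string) (string, error) {
	// Server-to-client link: the server holds this rpc.Client.
	back, err := serve(&Client{files: map[string][]byte{"/tmp/x": []byte("hello world")}})
	if err != nil {
		return "", err
	}
	defer back.Close()

	// Client-to-server link: a second client talks to the server.
	front, err := serve(&Server{clients: map[string]*rpc.Client{"first-client": back}})
	if err != nil {
		return "", err
	}
	defer front.Close()

	var data []byte
	err = front.Call("Server.Read", &ReadReq{Client: owner, Path: path}, &data)
	return string(data), err
}

func main() {
	fmt.Println(forwardedRead("first-client", "/tmp/x"))
}
```

An error raised at either hop travels back to the original caller as an ordinary RPC error, so the requesting client cannot tell that its call was relayed.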

The Publish method is to be called by a client to establish the initial reverse-RPC connection with the server. It allocates a new name which it returns to the client, starting a new goroutine listening for a netchan connection on that name. When the connection is accepted, it starts a new RPC client instance on it and adds it to the Server’s list of clients.

func (srv *Server) Publish(name *string, clientId *string) os.Error {
	srv.mu.Lock()
	defer srv.mu.Unlock()
	*clientId = fmt.Sprintf("client%d", srv.clientid)
	srv.clientid++
	listener, _ := ncnet.Listen(srv.exp, *clientId)
	go func() {
		// Accept a single connection, then close the listener.
		conn, _ := listener.Accept()
		listener.Close()
		client := rpc.NewClient(conn)
		srv.mu.Lock()
		srv.clients[*name] = client
		srv.mu.Unlock()
	}()
	return nil
}

To start a new server, we create a new netchan Exporter, an ncnet Listener on top of that (“ctl” is just a port name that we choose by convention), and export an RPC server instance on that. The network address is given in addr (e.g. myserver.com:8080).

func main() {
	addr := os.Args[1]
	exp := netchan.NewExporter()
	exp.Listen("tcp", addr)
	listener, _ := ncnet.Listen(exp, "ctl")
	srv := &Server{
		exp:     exp,
		clients: make(map[string]*rpc.Client),
	}
	rpcsrv := rpc.NewServer()
	rpcsrv.Register(srv)
	rpcsrv.Accept(listener)
	listener.Close()
}

All that remains is the client side. There’s no state for the client, so we’ll just use an empty struct for the Client RPC type. The Read method takes a file name and returns the data as a []byte.

type Client struct{}

func (Client) Read(file *string, data *[]byte) (err os.Error) {
	f, err := os.Open(*file, os.O_RDONLY, 0)
	if err != nil {
		return err
	}
	defer f.Close() // release the file once its contents have been read
	*data, err = ioutil.ReadAll(f)
	return
}

To establish the initial client connection, we dial the server and establish a netchan link to the server’s RPC interface. Then we Publish our name and export the Client RPC type on the resulting connection. We can then interact with the server and the user as we please – in this case we provide a little command line interface reading commands from the user and executing them.

func main() {
	clientName := os.Args[1]
	addr := os.Args[2]

	imp, _ := netchan.NewImporter("tcp", addr)
	srvconn, _ := ncnet.Dial(imp, "ctl")
	srv := rpc.NewClient(srvconn)

	var clientId string
	srv.Call("Server.Publish", &clientName, &clientId)

	clientsrv := rpc.NewServer()
	if err := clientsrv.Register(Client{}); err != nil {
		log.Fatal("clientsrv register failed: ", err)
	}

	clientconn, err := ncnet.Dial(imp, clientId)
	if err != nil {
		log.Fatalf("ncnet dial %q failed: %v", clientId, err)
	}

	go clientsrv.ServeConn(clientconn)
	interact(srv)
	clientconn.Close()
}

Summing up

We’ve created something with quite a bit of flexibility here. The example program has the bare bones, but there are many other ways we could use the components. For instance, it would be easy to provide direct client-to-client communication by simply gluing two pipes together at the server side. Or a client could negotiate for upstream or downstream streams of data of any type.
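As a sketch of the "gluing two pipes together" idea, the server only needs to copy bytes in both directions between two connections. The example below uses Go 1's io.Copy and net.Pipe to stand in for two client links; glue and relay are hypothetical helpers, not part of the share program.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// glue copies bytes in both directions between a and b, closing each
// side when the stream feeding it ends.
func glue(a, b io.ReadWriteCloser) {
	go func() {
		io.Copy(a, b) // b -> a
		a.Close()
	}()
	go func() {
		io.Copy(b, a) // a -> b
		b.Close()
	}()
}

// relay sends msg from one client to another through a pair of
// server-side connections that have been glued together.
func relay(msg string) (string, error) {
	c1, s1 := net.Pipe() // first client <-> server
	c2, s2 := net.Pipe() // second client <-> server
	defer c1.Close()
	defer c2.Close()

	glue(s1, s2) // the server joins the two links

	if _, err := c1.Write([]byte(msg)); err != nil {
		return "", err
	}
	buf := make([]byte, len(msg))
	if _, err := io.ReadFull(c2, buf); err != nil {
		return "", err
	}
	return string(buf), nil
}

func main() {
	fmt.Println(relay("ping"))
}
```

Once two links are joined like this, the clients could run RPC directly over the combined stream without the server interpreting any of it.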

It shows, I think, just how well the Go interface-based model works.

To try it out, build it like this:

goinstall rog-go.googlecode.com/hg/cmd/share
cd $GOROOT/src/pkg/rog-go.googlecode.com/hg/cmd/share
make

Then follow the instructions in doc.go to run it. An example client session:

% share -name second-client localhost:8765
> list
["first-client" "second-client"]
> read first-client /tmp/x
hello world
> ^D
% 

4 Responses to “Bidirectional RPC with netchan”

  1. ovi Says:

    netchan is no longer supported in Go v1. Can you please update your code to Go v1?

  2. rogpeppe Says:

    @ovi Not easily, I’m afraid, as the code relies quite heavily on netchan. Some of the techniques are still useful though.

    • ovi Says:

      You’re right, but I’d sure like to see a working bidirectional RPC for Go v1. Maybe this should be posted on the golang-nuts mailing list…

  3. René Kistl Says:

    I have written a Bidirectional RPC library for Go, see: https://github.com/pcdummy/go-bidirpc
