Gobs on the wire

This week, I want to talk about how to use Go to write a client/server system with both synchronous requests and asynchronous event notifications.

To help learn Go, I wrote a clone of the Conserver console server. Now, of course, the world didn’t need another console server, but it turned out to be an interesting experiment, because what a console server does is well suited to the tools Go gives a programmer. A console server’s job is to gather the output from one or more serial cables (or, when there’s a terminal server in the mix, one or more TCP connections implementing the reverse telnet “protocol”). It logs that output to files, and makes the output available in real time to zero or more watchers. It allows one and only one of the watchers to be read-write, to actually control the device.

The Go version of the console server is called gocons. Go have a look at it, and then come back here. As usual, I want to point out some things I learned about Go while writing it.

My first attempt at making this thing work was with netchan. I thought it would be cool if the client could send in a request to the server asking to be notified on a certain channel when events happened on a certain console. But that crashed and burned because netchan can’t send channels. I think it would be theoretically possible to put the right pieces together so that netchan could send channels, but it would be really hard, and the semantics of the channels would never really be the same. So, moving right along…

The next idea was to use the rpc package to implement the protocol between the client and the server. But that ran into another problem: RPC, by its very definition, is synchronous. There is no clear way to use the rpc package to let the server send down asynchronous events to the client, like “these bytes arrived”. What I needed was to write my own wire protocol, one that could do both synchronous calls like “Can I listen to this? Yes.” and also asynchronous events like “the console you are watching just output these bytes”. I’ve written protocols before, and it’s not hard, just a bit finicky. But Go provides the gob package, and that’s all you need to make a protocol.

So what you do is define types that represent the protocol messages and then let gob take responsibility for marshalling and unmarshalling, as well as figuring out the message delimiters. In our case, the types are called connReq and connReply. In a perfect world, these would be public types in some library, and the client and the server would both use them. In gocons, I got lazy and just copied and pasted them. The server does a gob.Decode on the net.TCPConn, and the result has to be a connReq (if it isn’t, something went wrong, and the server could either kill the connection, or try to decode the next thing on the connection). Because Go doesn’t have unions (they aren’t type safe), the connReq and connReply have to have all the fields in them that could be needed, even if a given protocol message doesn’t use them all. I haven’t calculated the protocol overhead of this, but because the potentially unused fields are things like byte slices and strings, it can’t be much; an empty byte slice is encoded as nil, not as a bunch of zeros for the whole underlying byte buffer.
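
To make that concrete, here is roughly what the pieces look like, as a sketch in today’s Go. The field names and the package are hypothetical, not gocons’s actual definitions; also note that today’s gob only transmits exported fields, so the sketch capitalizes them, while the era’s snippets below use lowercase ones.

    package protocol // hypothetical; gocons copies and pastes instead

    import (
      "encoding/gob"
      "net"
    )

    // One struct per direction, with a kind plus every field any
    // message might need; unused fields cost almost nothing on the wire.
    type connReq struct {
      Kind    int    // which request this is
      Console string // used by "listen" requests, empty otherwise
      Data    []byte // used by write requests, nil otherwise
    }

    type connReply struct {
      Code int
      Data []byte // nil unless this reply carries console output
    }

    // Server side: decode requests straight off the connection and
    // encode replies back onto it; gob does all the framing.
    func serve(c net.Conn) {
      dec := gob.NewDecoder(c)
      enc := gob.NewEncoder(c)
      for {
        var req connReq
        if err := dec.Decode(&req); err != nil {
          return // a garbled gob, or a closed connection
        }
        // ... act on req, then acknowledge:
        if err := enc.Encode(connReply{Code: 0}); err != nil {
          return
        }
      }
    }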

A more sophisticated version of this idea might be to make a hierarchy of types, the base one being the simplest (with only an int in it to indicate the type) and the more complicated ones embedding the base one. But it’s hard to know what type you’d give to gob.Decode to write into; it seems like you’d have to separate out the two halves of the protocol message and send them with two calls to gob.Encode. The first would be the int telling what kind of message it is, and the second gob would be precisely the right “shape” for the data. In any case, I didn’t do it that way for gocons. Simpler is better!
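
Still, a sketch of what that two-gob variant might look like (the kinds and message types are hypothetical, and again, this is not how gocons works):

    // assumes: import ("encoding/gob"; "errors")

    // First gob: an int saying which kind of message follows.
    // Second gob: a struct of exactly the right shape.
    const (
      kindListen = iota
      kindData
    )

    type listenMsg struct{ Console string }
    type dataMsg struct{ Bytes []byte }

    func decodeNext(dec *gob.Decoder) (interface{}, error) {
      var kind int
      if err := dec.Decode(&kind); err != nil {
        return nil, err
      }
      switch kind {
      case kindListen:
        var m listenMsg
        err := dec.Decode(&m)
        return m, err
      case kindData:
        var m dataMsg
        err := dec.Decode(&m)
        return m, err
      }
      return nil, errors.New("unknown message kind")
    }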

In the server, there are two pieces of code that are kind of interesting. One is the use of JSON for the config file format. The other is how incoming data is sent out to all the listeners.

First the simple one. It’s just an example of how to grab data out of a JSON file without using json.Unmarshal. I didn’t understand json.Unmarshal at the time, so I played with json.Decode until I got this working, and just left it at that. I don’t claim this is nice or pretty, but it works, and it might be useful to other people looking for examples of how to read JSON in Go.

The expected input is something like this:

{
    "consoles": {
        "firewall": "ts.company.com:2070",
        "web01": "ts.company.com:2071",
        "web02": "ts.company.com:2072",
        "web03": "ts.company.com:2073"
    }
}

The goal is to call addConsole once for each of the key/value pairs in the consoles hash.

Here’s how, if you don’t want to (or know how to) use json.Unmarshal:

  r, err := os.Open(*config, os.O_RDONLY, 0)
  if err != nil {
    log.Exitf("Cannot read config file %v: %v", *config, err)
  }
  dec := json.NewDecoder(r)
  var conf interface{}
  err = dec.Decode(&conf)
  if err != nil {
    log.Exit("JSON decode: ", err)
  }
  hash, ok := conf.(map[string]interface{})
  if !ok {
    log.Exit("JSON format error: got %T", conf)
  }
  consoles, ok := hash["consoles"]
  if !ok {
    log.Exit("JSON format error: key consoles not found")
  }
  c2, ok := consoles.(map[string]interface{})
  if !ok {
    log.Exitf("JSON format error: consoles key wrong type, %T", consoles)
  }
  for k, v := range c2 {
    s, ok := v.(string)
    if ok {
      addConsole(k, s)
    } else {
      log.Exit("Dial string for console %v is not a string.", k)
    }
  }

The general pattern here is that json.Decode gives you an interface{}, and you need to use type assertions to work your way down the structure and actually get the stuff you expect to be there out of it.

An infinitely prettier way to do this is to use json.Unmarshal. It’s a bit hard to understand from the docs, but this blog post makes it crystal clear.
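
For comparison, here is roughly what the json.Unmarshal version of the code above looks like in today’s Go (the era’s API differed a little):

    // assumes: import ("encoding/json"; "os")

    type config struct {
      Consoles map[string]string `json:"consoles"`
    }

    func readConfig(path string) (*config, error) {
      buf, err := os.ReadFile(path)
      if err != nil {
        return nil, err
      }
      var c config
      if err := json.Unmarshal(buf, &c); err != nil {
        return nil, err
      }
      return &c, nil
    }

    // and then in the caller:
    //   for name, dial := range conf.Consoles {
    //     addConsole(name, dial)
    //   }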

The server is made up of a series of goroutines, each running in a blocking loop to handle its bit of i/o. Each console that is monitored has a read goroutine and a write one. The reader brings bytes in from the console and dispatches them to all the gocons clients who are listening. It maintains the list of clients in a linked list, but another data structure would work as well. The key is that the clients are not stored in the list as net.TCPConns, but as channels. What’s watching the other end of those channels for new data is the client’s proxy goroutine. Each time a client connects, a pair of goroutines is created, one for read, one for write. This allows us to do blocking reads on the input (search for dec.Decode for an example) without worrying about blocking other jobs in the server.
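
In sketch form, the listener list looks something like this (the node layout is my guess, not necessarily gocons’s exact one):

    type consoleEvent struct {
      data []byte
    }

    // Each node holds the channel a client proxy goroutine reads from,
    // not the client's net.TCPConn itself.
    type listener struct {
      ch   chan consoleEvent
      next *listener
    }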

By keeping one single goroutine in charge of writing over the TCP connection, you don’t have to do any locking. You could, in principle, have multiple console managers all saying, “I’ve got something that should be multiplexed onto that TCP connection!” but not worry about them stomping on each other while writing to the connection. (The current implementation only listens to one console at a time.)
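
Sketched out, the writer side might look like this, reusing the hypothetical connReply from the sketch above (the names are mine; the point is that only this goroutine ever touches the encoder):

    // Everything destined for this client funnels through out; no
    // mutex is needed because only this goroutine writes to c.
    func clientWriter(c net.Conn, out chan connReply) {
      enc := gob.NewEncoder(c)
      for reply := range out {
        if err := enc.Encode(reply); err != nil {
          return // client went away; the read side will clean up
        }
      }
    }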

Here’s a snippet showing how we package up the notification that something new arrived and send it on to all the console watchers:

    select {
      // a bunch of other channels to monitor here...
      case data := <-m.dataCh:
        if closed(m.dataCh) {
          break L
        }
        // multicast the data to the listeners
        for l := listeners; l != nil; l = l.next {
          if closed(l.ch) {
            // TODO: need to remove this node from the list, not just mark nil
            l.ch = nil
            log.Print("Marking listener ", l, " no longer active.")
          }
          if l.ch != nil {
            ok := l.ch <- consoleEvent{data}
            if !ok {
              log.Print("Listener ", l, " lost an event.")
            }
          }
        }

So, we make a new consoleEvent and send it to each listener. This is a bit wasteful: it makes a lot of garbage, which means the garbage collector will need to work harder. It might have been possible to make one consoleEvent, then send the same one to all the listeners. But if you are going to share memory like that, it is up to the programmer to ensure that the receivers of the shared memory either treat it as read only, or use a mutex to control access to it. In our case, it is used in a read-only way, but far away in the code:

    // new event arrived from the console manager, so send it down the TCP connection
    case ev := <-evCh:
      reply.code = Data
      reply.data = ev.data
      err := enc.Encode(reply)
      if err != nil {
        log.Print("connection ", c.rw, ", failed to send data: ", err)
        break L
      }

This model, of two goroutines per thing that needs reading and writing, is magic. It dramatically reduced the code needed to implement gocons. The original console server has hundreds of lines of complicated code worrying about setting up the select mask, waiting on select, deciding if the signaled fd needs an accept(), or a read() or whatever (and finding the right data structures that correspond to the fd that just became usable). In gocons, and other idiomatic Go programs like the HTTP server implemented in package http, you use blocking reads, and let the Go runtime arrange for the system as a whole not to block.

It is interesting to think about what would happen if writing to a client TCP connection were to block, though. Because blocking is transitive in this system as written, it would eventually back up and block the thing that's supposed to be reading from the console, blocking all other clients. To protect against that, you need to put some firewalls in place; shared resources should not let individuals block them. You'd need to put short queues on the channels between the client proxy goroutines and the console manager's reading goroutine, then have it do non-blocking writes on the channel, and if one would block, do something about it. For example, it could close the channel, in order to say, "hey, you're not draining things, and you need to clean up your act and come back to me later".
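
In today's Go, that firewall is a buffered channel plus a select with a default case. A sketch, reusing the hypothetical listener type from above, and assuming each l.ch was made with a small buffer (say, make(chan consoleEvent, 8)):

    // The console reader never blocks on a laggard; it cuts the
    // laggard off instead.
    func multicast(head *listener, ev consoleEvent) {
      for l := head; l != nil; l = l.next {
        if l.ch == nil {
          continue // already cut off
        }
        select {
        case l.ch <- ev:
          // delivered without blocking
        default:
          // buffer full: this client is not draining its events
          close(l.ch) // tells its proxy goroutine to clean up
          l.ch = nil
        }
      }
    }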

Writing and debugging this server in Go made me learn a bunch of things. I've still got lots to learn; the code still has some mysteries in it, like why I need a runtime.Gosched() to keep things from blocking, and how to deal with the fact that a closed channel causes a select to always fire on it. There's another workaround for a mystery hidden in setOwner: my first approach uncovered a bug (in the Go runtime, or in my understanding of it) that I had to work around with a "pump goroutine" that forwards data from one place to another.

The impatient producer

Last week I showed how a channel could link a producer and a consumer, and I idly speculated on how you’d set the depth of the queue, relative to the rate and variability of the producer and the consumer’s need for new input.

This weekend, I got to thinking about my next interesting Go project. It will be an HTTP proxy that can do some nifty tricks to offload a slow Internet link. One of the features I thought I might add would be something to send all the new HTML it sees through a secondary processing system in order to find extra assets it might want to cache. This is a case of a producer that wants to act totally independently of the consumer. If the secondary processing goroutine is slow, or hung inserting something into a database, or whatever, the producer (the HTTP reply) can’t hang — he needs to reply to the client.

The solution is to hook up a dispatcher between the producer and one or more consumers. If you just wanted several consumers working, they could all read from the same channel and safely take the next item. But if you want to give some kind of guarantee to the producer that it won’t hang, you need to put the dispatcher in the loop. (Of course, the producer can also check for itself that it’s not going to hang by doing a non-blocking write on the channel, then throw away the work unit if it would block. But if you don’t want things getting thrown away, the dispatcher solution is better.) The dispatcher is written so that it can’t block. It looks for a consumer ready to take the work unit, and if it doesn’t find one, it makes a new one and sends the work unit to the new consumer. If the consumers are too slow, then instead of making ever more of them (risking collapse of the whole system) it panics. Not nice, but better to fail fast, and clearly.
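
Here is the dispatcher idea in sketch form. This is my reconstruction, not autocon’s actual code, and the real NewAutoConsumer’s signature may differ:

    // The producer sees only the returned channel; a send on it never
    // blocks for longer than it takes to start one goroutine.
    func NewAutoConsumer(max int, consume func(work int)) chan<- int {
      in := make(chan int)
      jobs := make(chan int) // every consumer receives from this
      go func() {
        n := 0
        for w := range in {
          select {
          case jobs <- w:
            // an idle consumer was already waiting
          default:
            if n >= max {
              panic("autoconsumer: consumers cannot keep up")
            }
            n++
            go func() {
              for j := range jobs {
                consume(j)
              }
            }()
            jobs <- w // the new consumer takes it
          }
        }
        close(jobs)
      }()
      return in
    }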

The result is in the jra-go.googlecode.com/hg repository as a module named autocon. Have a look.

As usual, I want to point out some things I liked/disliked about how this turned out.

First, take a look at the autoConsumer type: it’s private. Why? Because the interface this thing gives to the outside world (the producer) is just a channel. The producer doesn’t get a pointer to an object, it just gets the channel it will be talking to. If you wanted to tune or monitor the dispatcher, this wouldn’t work. But I decided to start out with the simplest thing, and this is it. In fact, when it came time to write the test cases in autocon_test.go I had a problem: I wanted to know what was happening inside, but I didn’t even have a pointer to the autoConsumer to go dig into, because once NewAutoConsumer returns, the only thing in the entire system that has a pointer to the autoConsumer is run(). I solved that problem with lexically bound channels, so that my workers had access to the same channel that the test routine did. Then the test routine empties the channel, looking to see if the right amount of garbage comes out of it (nothing more, nothing less).
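
The trick in sketch form, using the hypothetical NewAutoConsumer above: the channel is lexically bound into the consume closure, so the test can read from the very same channel the workers write to.

    // assumes: import "testing"
    func TestAutoConsumer(t *testing.T) {
      results := make(chan int, 100)
      consume := func(w int) { results <- 2 * w }
      in := NewAutoConsumer(10, consume)
      for i := 0; i < 5; i++ {
        in <- i
      }
      // drain results, checking that the right amount of garbage
      // comes out of it: nothing more, nothing less
      for i := 0; i < 5; i++ {
        if v := <-results; v%2 != 0 {
          t.Errorf("unexpected result %d", v)
        }
      }
    }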

One thing that’s not too nice is that I had to add a runtime.Gosched in the test routines to get them to work. And as usual with scheduling problems, it’s sensitive. When I was debugging scheduling problems using fmt.Print, I got different behavior than when I debugged using println. Why? Because the fmt package uses a channel to implement a leaky bucket of unused pp structures, and the scheduling decisions are disrupted by popping and pushing them inside of fmt.Println. Lesson: beware of fmt when debugging scheduler problems.

A better solution than finessing the scheduler with runtime.Gosched would be to arrange for a positive signal to come back that the workers had run, done their job, and were ready to have the work checked. But because channels don’t have a “wait until 10 are outstanding” kind of primitive, I would have instead needed to implement some kind of condition variable. It didn’t seem like the right place, down in a test routine, to be doing that!
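
For what it’s worth, today’s sync package has a ready-made answer, sync.WaitGroup, which provides exactly that positive signal:

    // assumes: import "sync"
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
      wg.Add(1)
      go func(n int) {
        defer wg.Done()
        doWork(n) // hypothetical worker
      }(i)
    }
    wg.Wait() // returns only after all ten workers have called Done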

Another thing I really don’t like about Go, but don’t have a solution for, is the boilerplate “if closed()” after “for x := range ch”. When a goroutine is blocked on a read on a channel, and another goroutine closes it, the channel becomes readable. The result it returns is nil. If you don’t want to litter your code with a bunch of “if x != nil”, you need to catch that case early and bail out. Luckily, the closed() primitive tells you what you need to know. I think the behavior of closed channels is just right in Go as it is; it is the unavoidable reality check you get when you go from beautiful theory into ugly practice.

And while I am beating up on myself, let me point out that I’m using a label and a labeled break. Using labels always seems to me like some kind of failure; like it’s just four little characters short of a goto. But the need is undeniable; when you call an unlabeled break inside of nested for loops, the language spec says that only the innermost loop will be broken. In this case, I don’t strictly need it, but it somehow improves readability, in that the label explains exactly which loop we are exiting and why. I am deeply conflicted about loop labels, and I suppose the only answer is to keep using them until I get over it, or until I break the habit.

Using a channel as a queue

I try to write a Go article every Monday, but the holiday season has disrupted me a bit. So here’s a little something I dug out of my stocking to get things going again.

I thought an interesting way to play with the exp/draw package, the http client, and the image decoder would be a program to animate the tiles from Google Maps, showing the world scrolling by, as though you were in an airplane looking straight down and flying in a straight line. It came to me in one of those flying dreams… (not really).

The general idea is simple: fetch a tile, and display it, animating it by sliding it to the left while fetching the next one to the east and ensuring that you either get it before the last one scrolled off, or stop the scrolling. That’s a perfect job for goroutines; one should be fetching and one should be in charge of drawing, and should block if the other one is not done. And they should talk via a channel, like all well behaved goroutines do. Because channels are ordered and can be buffered, the channel between the fetcher and the displayer can play the part of a queue in this producer/consumer system. Simple and elegant, just how we like it to be in Go-land.

Here’s the program. You can also get it by running “goinstall jra-go.googlecode.com/hg”, then go look in your $GOROOT/src/pkg/jra-go.googlecode.com/hg/cmd/mapscroll directory.

package main

import (
  "log"
  "fmt"
  "time"
  "http"
  "exp/draw"
  "exp/draw/x11"
  "image"
  "image/png"
  "image/jpeg"
)

func imagePuller(urls chan string, imgs chan *image.Image) {
  for url := range urls {
    r, _, err := http.Get(url)
    if err == nil {
      log.Print("Fetched ", url)
      ctype, found := r.Header["Content-Type"]
      if found {
        switch {
        default:
          log.Print("For ", url, ", unknown type: ", ctype)
        case ctype == "image/png":
          img, err := png.Decode(r.Body)
          if err == nil {
            imgs <- &img
          } else {
            log.Print("For ", url, ", decode error: ", err)
          }
        case ctype == "image/jpeg" || ctype == "image/jpg":
          img, err := jpeg.Decode(r.Body)
          if err == nil {
            imgs <- &img
          } else {
            log.Print("For ", url, ", decode error: ", err)
          }
        }
      } else {
        log.Print("For ", url, ", no content type.")
      }
      r.Body.Close()
    } else {
      log.Print("Error fetching ", url, ": ", err)
    }
  }
}

func urlGen(urls chan string) {
  x := 33981
  y := 23179
  for {
    url := fmt.Sprintf("http://khm1.google.com/kh/v=74&x=%d&s=&y=%d&z=16&s=Ga", x, y)
    urls <- url
    x++
  }
}

func processEvent(ch <-chan interface{}) {
  for {
    if closed(ch) {
      log.Exit("X display closed.")
    }
    ev, ok := <-ch
    if !ok {
      // no events, return
      return
    }

    switch ev.(type) {
    case draw.ErrEvent:
      log.Exit("X11 err: ", ev.(draw.ErrEvent).Err)
    }
  }
}

func main() {
  urls := make(chan string, 4)
  imgReady := make(chan *image.Image, 4)

  go imagePuller(urls, imgReady)
  go urlGen(urls)

  xdisp, err := x11.NewWindow()
  if err != nil {
    log.Exit("X11 error: ", err)
  }
  screen := xdisp.Screen()

  tileHeight := 256
  tileWidth := 256
  numTiles := screen.Bounds().Dx()/tileWidth + 2
  tileStrip := image.NewRGBA(numTiles*tileWidth, tileHeight)

  // pre-load the tile strip
  for i := 0; i < numTiles; i++ {
    iptr := <-imgReady
    img := *iptr
    draw.Draw(tileStrip, image.Rect(i*tileWidth, 0, i*tileWidth+tileWidth, tileHeight), img, image.ZP)
  }

  topBlack := (screen.Bounds().Dy() - tileHeight) / 2
  for {
    for x := 0; x < tileWidth; x += 15 {
      then := time.Nanoseconds()
      draw.Draw(screen, image.Rect(0, topBlack, screen.Bounds().Dx(), topBlack+tileHeight), tileStrip, image.Pt(x, 0))
      now := time.Nanoseconds()
      frameTime := now - then

      // a flush is just a write on a channel, so it takes negligible time
      xdisp.FlushImage()

      toSleep := 0.1*1e9 - frameTime
      //      log.Print("Took ", frameTime, " ns to draw, will sleep ", toSleep, " ns")
      time.Sleep(toSleep)
      processEvent(xdisp.EventChan())
    }

    // shift tiles in tileStrip and put in new last one
    draw.Draw(tileStrip, image.Rect(0, 0, (numTiles-1)*tileWidth, tileHeight), tileStrip, image.Pt(tileWidth, 0))
    iptr := <-imgReady
    img := *iptr
    draw.Draw(tileStrip, image.Rect((numTiles-1)*tileWidth, 0, numTiles*tileWidth, tileHeight), img, image.ZP)
  }
}

Some little details to point out:

Note how I made the goroutine that does the fetching read from a chan string to get its URLs, instead of making them up itself. This compartmentalizes the logic, and has nothing to do with parallelism. A good practice in Go is to use channels to improve clarity, not only as part of a scheme to increase parallelism. Your goal is to express the intrinsic "this hands to this" nature of your program, and if that results in a speedup with multiple cores, fine. If it just results in clearer code, even better!

The imgReady channel is buffered, 4 deep. That means I'll have 4 images queued up and ready at all times. When the head is popped, there are 3 left. If each tile took 1 second to scroll by, then the maximum delay for any one fetch would be 3 seconds, and the fetches should average under one second. For this little app, it's overthinking things; but the same analysis would hold for other asynchronous producer/consumer systems of this type.

The images themselves are not sent through the channel, only pointers to them. This is to reduce the amount of copying that the runtime has to do. For this application the difference would probably be almost undetectable, just a minor increase in CPU time (the dominant user of CPU time is the draw.Draw which moves the strip across the screen). But it seemed like a perfect place to be sending pointers instead of the objects themselves. In Go we share memory by communicating, we don't communicate by sharing; a good compromise is where one goroutine says to the other, "I'm done with the thing we can both access that is at the end of this pointer, you take it".

The animation in this beast is definitely not the smoothest and most beautiful. The way exp/draw/x11 puts stuff on the screen is pretty costly, and it is too far removed from the X server itself to be able to synchronize updates and avoid flashing artifacts. Remember that the "exp" in exp/draw/x11 means "experimental" and it probably won't stay around for the long haul. Perhaps linking against an OpenGL library using cgo would let me make the animation smoother. Perhaps I'll look into that one day...

Take a look at that unassuming processEvent hidden down in the middle of the inner loop. It is important for getting things to close down nicely, and even for keeping your window running without blocking on unhandled events. Without it, if you close the window, the system keeps fetching and updating the in-memory copy of the screen image. Every time through, processEvent empties the event channel, ignoring stuff it doesn't care about. This is important, because if the event channel fills up, the X11 window hangs, because the sends to the event channel are blocking (see $GOROOT/src/pkg/exp/draw/x11/conn.go, search for eventc). Even if you have no UI, you still need to empty that sucker, because just moving the mouse over the window creates events. (I can't reproduce the theoretical hang I'm talking about now, so I could be wrong on this part. But empty the channel, just to be safe.)

Update: Andrew tells me that not bailing early is non-idiomatic (see imagePuller, "if found") and results in more indents than necessary. I tend to agree, but obviously I'm not quite yet in the habit of doing things the Go way. Thanks for the feedback, Andrew.

PS: The starting location is Lausanne, my beautiful city. The x++ causes it to fly due east from Lausanne. If you wait until you get to 180 degrees east, it will likely fail, because I didn't handle global wraparound right. 🙂