Using a channel as a queue

I try to write a Go article every Monday, but the holiday season has disrupted me a bit. So here’s a little something I dug out of my stocking to get things going again.

I thought an interesting way to play with the exp/draw package, the http client, and the image decoder would be a program to animate the tiles from Google Maps, showing the world scrolling by, as though you were in an airplane looking straight down and flying in a straight line. It came to me in one of those flying dreams… (not really).

The general idea is simple: fetch a tile and display it, animating it by sliding it to the left while fetching the next tile to the east. You must either have the next tile in hand before the last one scrolls off, or stop the scrolling. That’s a perfect job for goroutines: one fetches, and the other draws and blocks if the fetcher is not done. And they should talk via a channel, like all well-behaved goroutines do. Because channels are ordered and can be buffered, the channel between the fetcher and the displayer can play the part of a queue in this producer/consumer system. Simple and elegant, just how we like it to be in Go-land.
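Stripped of tiles and windows, the queue idea can be sketched in a few lines. This is only an illustration in present-day Go, not the program itself; the names produce and consume are made up for the example.

```go
package main

import "fmt"

// produce sends n numbered items into the queue and closes it when done.
func produce(n int, queue chan<- int) {
	for i := 0; i < n; i++ {
		queue <- i // blocks while the buffer is full
	}
	close(queue)
}

// consume drains the queue and returns the items in the order received.
func consume(queue <-chan int) []int {
	var got []int
	for item := range queue {
		got = append(got, item)
	}
	return got
}

func main() {
	queue := make(chan int, 4) // buffered: up to 4 items can wait in line
	go produce(10, queue)
	fmt.Println(consume(queue)) // items come out in the order they went in
}
```

The producer can run ahead of the consumer by at most the buffer size, and the consumer simply blocks when the queue is empty.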

Here’s the program. You can also get it by running “goinstall jra-go.googlecode.com/hg”, then go look in your $GOROOT/src/pkg/jra-go.googlecode.com/hg/cmd/mapscroll directory.

package main

import (
  "log"
  "fmt"
  "time"
  "http"
  "exp/draw"
  "exp/draw/x11"
  "image"
  "image/png"
  "image/jpeg"
)

func imagePuller(urls chan string, imgs chan *image.Image) {
  for url := range urls {
    r, _, err := http.Get(url)
    if err == nil {
      log.Print("Fetched ", url)
      ctype, found := r.Header["Content-Type"]
      if found {
        switch {
        default:
          log.Print("For ", url, ", unknown type: ", ctype)
        case ctype == "image/png":
          img, err := png.Decode(r.Body)
          if err == nil {
            imgs <- &img
          } else {
            log.Print("For ", url, ", decode error: ", err)
          }
        case ctype == "image/jpeg" || ctype == "image/jpg":
          img, err := jpeg.Decode(r.Body)
          if err == nil {
            imgs <- &img
          } else {
            log.Print("For ", url, ", decode error: ", err)
          }
        }
      } else {
        log.Print("For ", url, ", no content type.")
      }
      r.Body.Close()
    } else {
      log.Print("Error fetching ", url, ": ", err)
    }
  }
}

func urlGen(urls chan string) {
  x := 33981
  y := 23179
  for {
    url := fmt.Sprintf("http://khm1.google.com/kh/v=74&x=%d&s=&y=%d&z=16&s=Ga", x, y)
    urls <- url
    x++
  }
}

func processEvent(ch <-chan interface{}) {
  for {
    if closed(ch) {
      log.Exit("X display closed.")
    }
    ev, ok := <-ch
    if !ok {
      // no events, return
      return
    }

    switch ev.(type) {
    case draw.ErrEvent:
      log.Exit("X11 err: ", ev.(draw.ErrEvent).Err)
    }
  }
}

func main() {
  urls := make(chan string, 4)
  imgReady := make(chan *image.Image, 4)

  go imagePuller(urls, imgReady)
  go urlGen(urls)

  xdisp, err := x11.NewWindow()
  if err != nil {
    log.Exit("X11 error: ", err)
  }
  screen := xdisp.Screen()

  tileHeight := 256
  tileWidth := 256
  numTiles := screen.Bounds().Dx()/tileWidth + 2
  tileStrip := image.NewRGBA(numTiles*tileWidth, tileHeight)

  // pre-load the tile strip
  for i := 0; i < numTiles; i++ {
    iptr := <-imgReady
    img := *iptr
    draw.Draw(tileStrip, image.Rect(i*tileWidth, 0, i*tileWidth+tileWidth, tileHeight), img, image.ZP)
  }

  topBlack := (screen.Bounds().Dy() - tileHeight) / 2
  for {
    for x := 0; x < tileWidth; x += 15 {
      then := time.Nanoseconds()
      draw.Draw(screen, image.Rect(0, topBlack, screen.Bounds().Dx(), topBlack+tileHeight), tileStrip, image.Pt(x, 0))
      now := time.Nanoseconds()
      frameTime := now - then

      // a flush is just a write on a channel, so it takes negligible time
      xdisp.FlushImage()

      toSleep := 0.1*1e9 - frameTime
      //      log.Print("Took ", frameTime, " ns to draw, will sleep ", toSleep, " ns")
      time.Sleep(toSleep)
      processEvent(xdisp.EventChan())
    }

    // shift tiles in tileStrip and put in new last one
    draw.Draw(tileStrip, image.Rect(0, 0, (numTiles-1)*tileWidth, tileHeight), tileStrip, image.Pt(tileWidth, 0))
    iptr := <-imgReady
    img := *iptr
    draw.Draw(tileStrip, image.Rect((numTiles-1)*tileWidth, 0, numTiles*tileWidth, tileHeight), img, image.ZP)
  }
}

Some little details to point out:

Note how the goroutine that fetches tiles reads its URLs from a chan string instead of making them up itself. This compartmentalizes the logic, and has nothing to do with parallelism. A good practice in Go is to use channels to improve clarity, not only as part of a scheme to increase parallelism. Your goal is to express the intrinsic "this hands to this" nature of your program, and if that results in a speedup with multiple cores, fine. If it just results in clearer code, even better!
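One way to see the benefit of that separation is to swap the producer without touching the consumer. The sketch below uses present-day Go; eastward, fixed and pull are hypothetical names, and the strings stand in for real tile URLs.

```go
package main

import "fmt"

// eastward emits n tile coordinates starting at x and moving east.
func eastward(x, y, n int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			out <- fmt.Sprintf("x=%d&y=%d", x+i, y)
		}
	}()
	return out
}

// fixed emits a canned list, which is handy for a test.
func fixed(items ...string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, s := range items {
			out <- s
		}
	}()
	return out
}

// pull consumes whatever arrives; it has no idea who produced it.
func pull(in <-chan string) []string {
	var seen []string
	for s := range in {
		seen = append(seen, "fetched "+s)
	}
	return seen
}

func main() {
	fmt.Println(pull(eastward(33981, 23179, 2)))
	fmt.Println(pull(fixed("a", "b")))
}
```

Because pull only sees a channel, changing the flight path or feeding it test data needs no change to the fetching code.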

The imgReady channel is buffered, 4 deep. That means up to 4 images can be queued up and ready. When the head is popped, there are 3 left. If each tile took 1 second to scroll by, then the maximum delay for any one fetch would be 3 seconds, and the fetches should average under one second. For this little app, that's overthinking it, but the same analysis would hold for other asynchronous producer/consumer systems of this type.
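The depth arithmetic can be checked with len and cap on a buffered channel. This is a small sketch in present-day Go; tryEnqueue is a hypothetical helper that uses a non-blocking send so the full-buffer case is visible instead of hanging.

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send and reports whether it succeeded.
func tryEnqueue(q chan string, tile string) bool {
	select {
	case q <- tile:
		return true
	default:
		return false // buffer full; a plain send would block here
	}
}

func main() {
	q := make(chan string, 4) // same depth as imgReady
	for i := 0; i < 5; i++ {
		ok := tryEnqueue(q, fmt.Sprintf("tile-%d", i))
		fmt.Println("sent:", ok, "queued:", len(q), "of", cap(q))
	}
	<-q // pop the head
	fmt.Println("after one receive, queued:", len(q))
}
```

The fifth send is refused because the buffer holds only four items, and one receive leaves three queued, which is the slack the fetcher has to work with.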

The images themselves are not sent through the channel, only pointers to them. This is to reduce the amount of copying that the runtime has to do. For this application the difference would probably be almost undetectable, just a minor increase in CPU time (the dominant user of CPU time is the draw.Draw which moves the strip across the screen). But it seemed like a perfect place to be sending pointers instead of the objects themselves; in Go we share memory by communicating, not communicate by sharing memory. A good compromise is where one goroutine says to the other "I'm done with the thing we can both access that is at the end of this pointer, you take it".
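The hand-off convention can be sketched as follows, in present-day Go. Tile is a hypothetical struct standing in for a decoded image (image.Image itself is an interface type, so its values are already small; a struct makes the point more visibly). Nothing in the language enforces the hand-off; it is a discipline the two goroutines agree on.

```go
package main

import "fmt"

// Tile is a hypothetical stand-in for a decoded image.
type Tile struct {
	X   int
	Pix []byte
}

// produce builds each tile, sends its pointer, and never touches it again.
func produce(n int, out chan<- *Tile) {
	for i := 0; i < n; i++ {
		t := &Tile{X: i, Pix: make([]byte, 256*256*4)}
		out <- t // ownership passes to the receiver here
	}
	close(out)
}

func main() {
	tiles := make(chan *Tile, 4)
	go produce(3, tiles)
	for t := range tiles {
		t.Pix[0] = 0xff // safe: the producer no longer uses t
		fmt.Println("got tile", t.X, "with", len(t.Pix), "bytes")
	}
}
```

Only the pointer crosses the channel, so the pixel buffer is never copied, and because the sender drops its reference there is never more than one goroutine working on a tile.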

The animation in this beast is definitely not the smoothest and most beautiful. The way exp/draw/x11 puts stuff on the screen is pretty costly, and it is too far removed from the X server itself to be able to synchronize updates and avoid flashing artifacts. Remember that the "exp" in exp/draw/x11 means "experimental" and it probably won't stay around for the long haul. Perhaps linking against an OpenGL library using cgo would let me make the animation smoother. Perhaps I'll look into that one day...

Take a look at that unassuming processEvent hidden down in the middle of the inner loop. It is important to get things to close down nicely, and even to keep your window running without blocking on unhandled events. Without it, if you close the window, the system keeps fetching and updating the in-memory copy of the screen image. Every time through, processEvent empties the event channel, ignoring stuff it doesn't care about. This is important, because if the event channel fills up, the X11 window hangs because the sends to the event channel are blocking (see $GOROOT/src/pkg/exp/draw/x11/conn.go, search for eventc). Even if you have no UI, you still need to empty that sucker, because just moving the mouse over the window creates events. (I can't reproduce the theoretical hang I'm talking about now, so I could be wrong on this part. But empty the channel, just to be safe.)
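The program above relies on a receive that returns immediately when nothing is waiting (hence the "no events, return" comment). In present-day Go a plain receive blocks, so the same draining would need a select with a default case. Here is a minimal sketch under that assumption; drainEvents is a hypothetical name and the string events are placeholders.

```go
package main

import "fmt"

// drainEvents empties ch without blocking. It returns the number of events
// discarded and whether the channel is still open.
func drainEvents(ch <-chan interface{}) (n int, open bool) {
	for {
		select {
		case _, ok := <-ch:
			if !ok {
				return n, false // channel closed: the window went away
			}
			n++ // an event we do not care about; drop it
		default:
			return n, true // nothing pending right now
		}
	}
}

func main() {
	events := make(chan interface{}, 8)
	events <- "mouse move"
	events <- "mouse move"
	events <- "key press"
	fmt.Println(drainEvents(events)) // 3 true

	close(events)
	fmt.Println(drainEvents(events)) // 0 false
}
```

Calling something like this once per frame keeps the sender from ever finding the buffer full, and the closed case gives the animation loop a clean signal to exit.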

Update: Andrew tells me that not bailing early is non-idiomatic (see imagePuller, "if found") and results in more indents than necessary. I tend to agree, but obviously I'm not quite yet in the habit of doing things the Go way. Thanks for the feedback, Andrew.
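For what it's worth, here is roughly what bailing early could look like for the decoding step. It is a sketch in present-day Go (the standard image/png and image/jpeg packages), not a drop-in replacement for imagePuller; decodeTile and tinyPNG are hypothetical helpers.

```go
package main

import (
	"bytes"
	"fmt"
	"image"
	"image/jpeg"
	"image/png"
	"io"
	"log"
)

// decodeTile picks a decoder from the content type. Each case returns
// immediately, so the unknown-type error needs no else branch.
func decodeTile(ctype string, body io.Reader) (image.Image, error) {
	switch ctype {
	case "image/png":
		return png.Decode(body)
	case "image/jpeg", "image/jpg":
		return jpeg.Decode(body)
	}
	return nil, fmt.Errorf("unknown content type %q", ctype)
}

// tinyPNG returns an encoded 2x2 image, standing in for a response body.
func tinyPNG() *bytes.Buffer {
	var buf bytes.Buffer
	if err := png.Encode(&buf, image.NewRGBA(image.Rect(0, 0, 2, 2))); err != nil {
		log.Fatal(err)
	}
	return &buf
}

func main() {
	for _, ctype := range []string{"image/png", "text/html"} {
		img, err := decodeTile(ctype, tinyPNG())
		if err != nil {
			log.Print("skipping tile: ", err)
			continue // bail early; the happy path stays unindented
		}
		fmt.Println("decoded", img.Bounds())
	}
}
```

Each failure is handled and dismissed on the spot, so the success path reads straight down the left margin instead of drifting rightwards.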

PS: The starting location is Lausanne, my beautiful city. The x++ causes it to fly due east from Lausanne. If you wait until you get to 180 degrees east, it will likely fail, because I didn't handle global wraparound right. 🙂
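A possible fix for the wraparound, sketched in present-day Go. It assumes the usual web-map tiling where zoom level z has 2^z tile columns, which I have not verified against this particular tile server; nextEast is a hypothetical helper that would replace the bare x++.

```go
package main

import "fmt"

// nextEast returns the x index of the tile east of x at zoom z, wrapping
// around the globe. Assumes 2^z tile columns per zoom level.
func nextEast(x, z int) int {
	columns := 1 << uint(z)
	return (x + 1) % columns
}

func main() {
	fmt.Println(nextEast(33981, 16)) // 33982: one step east of the start
	fmt.Println(nextEast(65535, 16)) // 0: wrapped past the last column
}
```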

