The Devver Blog

A Boulder startup improving the way developers work.

Ruby Beanstalkd distributed worker intermediate lessons

This post is a follow-up to Ruby beanstalkd basics; this time I will try to make the example code a little more interesting and useful. Call it the Ruby beanstalkd intermediate write-up: it sets up a few workers and distributes jobs and receives results simultaneously. The code also resembles real code a bit more (using a queue cache and block passing). If there is enough interest in the Ruby/beanstalkd community, I will follow up with beanstalkd advanced lessons and go into how we deal with failure cases such as a worker dying mid-job, random jobs failing, processing multiple ‘projects’ at one time, using job priority settings, and using TTR/timeouts.

So in this example we are making an estimate of pi. Yes, I know there are far better approximations out there than my simple results, but this is what I came up with for an incredibly simple distributed computing problem. I based my example on the PI Calculation problem from An Introduction to Parallel Computing. The basic idea is that you can estimate pi by picking random points in a square and counting how many land inside the circle inscribed in that square (PI = 4 * points_in_circle / total_points).
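The estimate is easy to try standalone before any queues get involved; here is a minimal sketch in plain Ruby (no beanstalkd required, `estimate_pi` is just an illustrative name):

```ruby
# Monte Carlo estimate of pi: sample random points in the square
# [-1, 1] x [-1, 1] and count how many land inside the unit circle.
def estimate_pi(total_points)
  points_in_circle = 0
  total_points.times do
    x = rand * 2 - 1
    y = rand * 2 - 1
    points_in_circle += 1 if x * x + y * y <= 1.0
  end
  4.0 * points_in_circle / total_points
end

puts estimate_pi(100_000)  # something near 3.14
```

The distributed version below does exactly this, but splits the total number of points into chunks handed out to workers.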

I made a bunch of comments in the code that should help you follow but there are a few key sections worth pointing out.

In Ruby beanstalkd basics, both the server and the clients only used one queue at a time. Now, since we are sending on one queue while also listening on another, we need access to both queues at once. A small helper function with a queue cache makes getting and reusing multiple queues easy.

def get_queue(queue_name)
    @queue_cache ||= {}
    if @queue_cache.has_key?(queue_name)
      return @queue_cache[queue_name]
    else
      queue = Beanstalk::Pool.new(["#{SERVER_IP}:#{DEFAULT_PORT}"])
      queue.watch(queue_name)
      queue.use(queue_name)
      queue.ignore('default')
      @queue_cache[queue_name] = queue
      return queue
    end
  end
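As an aside, the cache lookup above is plain memoization by name; with Hash’s `||=` it collapses to a couple of lines. This is just a sketch of the same idea, where `make_queue` is a hypothetical stand-in for the Pool/watch/use/ignore setup:

```ruby
# Memoize one queue object per queue name; make_queue is only called
# the first time a given name is requested.
def get_queue(queue_name)
  @queue_cache ||= {}
  @queue_cache[queue_name] ||= make_queue(queue_name)
end
```

Behavior is the same as the longer if/else, as long as `make_queue` never returns nil or false (which `||=` would treat as a cache miss).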

In the basic example, each class had a function that got a job, did some work, and deleted the job. It is easy to imagine workers that have many different kinds of work to do on jobs, but in every case they are going to grab a job, work on it, and delete it. We decided to break that pattern out and make it easy to just pass a work block when a worker takes a job.

def take_msg(queue)
    msg = queue.reserve
    #by calling ybody we get the content of the message and convert it from yml
    body = msg.ybody
    if block_given?
      yield(body)
    end
    msg.delete
  end

#call take_msg like so
take_msg(queue) do |body|
  #work on body
end
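Because of the `block_given?` check, the work block is optional and `delete` always runs. A quick stub exercise shows both paths without a running beanstalkd (`StubQueue` and `StubJob` are made up purely for illustration):

```ruby
# Tiny stand-ins for a beanstalk queue and job, just to exercise take_msg.
class StubJob
  attr_reader :deleted
  def initialize(body)
    @body = body
    @deleted = false
  end
  def ybody; @body; end
  def delete; @deleted = true; end
end

class StubQueue
  def initialize(job); @job = job; end
  def reserve; @job; end
end

def take_msg(queue)
  msg = queue.reserve
  body = msg.ybody
  yield(body) if block_given?
  msg.delete
end

job = StubJob.new("payload")
seen = nil
take_msg(StubQueue.new(job)) { |body| seen = body }
puts seen          # payload
puts job.deleted   # true
```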

One other thing to keep an eye out for in the code below is checking whether a queue has any jobs. Often a worker will take jobs if any exist, and if there aren’t any, the process is free to do something else. I do that in this example: the server continually checks for incoming results and displays them immediately; if no results have arrived yet, it keeps sending out job requests as fast as it can. This matters because taking a job from beanstalkd is a blocking call. Support for non-blocking (timeout) reserves was added in beanstalkd 1.1, but I haven’t started using the newest version yet.

I think everything else should be pretty self-explanatory; feel free to ask me any questions. Running the code is the same as before: download beanstalk_intermediate.rb, start beanstalkd, and run the example with ruby.

$ beanstalkd &
$ ruby beanstalk_intermediate.rb
starting distributor
starting client(s)
distributor sending out 100 jobs
.......................................................
.............................................
received all the results our estimate for pi is: 3.142776
# of workers   time to complete
1              real 0m7.282s   user 0m4.114s   sys 0m0.978s
2              real 0m5.667s   user 0m2.736s   sys 0m0.670s
3              real 0m4.999s   user 0m2.014s   sys 0m0.515s
4              real 0m4.612s   user 0m1.608s   sys 0m0.442s
5              real 0m4.517s   user 0m1.474s   sys 0m0.416s
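Since a plain reserve blocks, the distributor in the full listing polls peek_ready before taking results. beanstalkd 1.1’s reserve-with-timeout offers another way: reserve(0) returns immediately when nothing is ready. I haven’t moved to 1.1 myself, so treat this as a sketch; the exception raised on timeout varies by client version, which is why the rescue here is deliberately broad:

```ruby
# Hypothetical non-blocking variant of take_msg for beanstalkd >= 1.1.
# reserve(0) asks for a job with a zero-second timeout; if nothing is
# ready the client raises, and we report that nothing was taken.
def take_msg_nonblocking(queue)
  msg = begin
          queue.reserve(0)
        rescue StandardError
          return nil          # no job ready right now
        end
  body = msg.ybody
  yield(body) if block_given?
  msg.delete
  body
end
```

The full listing below sticks with the blocking reserve plus peek_ready.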
require 'beanstalk-client.rb'

DEFAULT_PORT = 11300
SERVER_IP = '127.0.0.1'
#beanstalk will order the queues based on priority, with the same priority
#it acts FIFO, in a later example we will use the priority
#(higher numbers are higher priority)
DEFAULT_PRIORITY = 65536
#TTR is time for the job to reappear on the queue.
#Assuming a worker died before completing work and never called job.delete
#the same job would return back on the queue (in TTR seconds)
TTR = 3

class BeanBase

  #To work with multiple queues you must tell beanstalk which queues
  #you plan on writing to (use), and which queues you will reserve jobs from
  #(watch). In this case we also want to ignore the default queue
  #you need a different queue object for each tube you plan on using, or
  #you can keep switching which tube a single queue is watching and using;
  #we just keep a few queues open on the tubes we want.
  def get_queue(queue_name)
    @queue_cache ||= {}
    if @queue_cache.has_key?(queue_name)
      return @queue_cache[queue_name]
    else
      queue = Beanstalk::Pool.new(["#{SERVER_IP}:#{DEFAULT_PORT}"])
      queue.watch(queue_name)
      queue.use(queue_name)
      queue.ignore('default')
      @queue_cache[queue_name] = queue
      return queue
    end
  end

  #this will take a message off the queue, and process it with the block
  def take_msg(queue)
    msg = queue.reserve
    #by calling ybody we get the content of the message and convert it from yml
    body = msg.ybody
    if block_given?
      yield(body)
    end
    msg.delete
  end

  def results_ready?(queue)
    queue.peek_ready != nil
  end

end

class BeanDistributor < BeanBase

  def initialize(chunks,points_per_chunk)
    @chunks = chunks
    @points_per_chunk = points_per_chunk
    @messages_out = 0
    @circle_count = 0
  end

  def get_incoming_results(queue)
    if(results_ready?(queue))
      result = nil
      take_msg(queue) do |body|
        result = body.count
      end
      @messages_out -= 1
      print "." #display that we received another result
      @circle_count += result
    else
      #do nothing
    end
  end

  def start_distributor
    request_queue = get_queue('requests')
    results_queue = get_queue('results')
    #put all the work on the request queue
    puts "distributor sending out #{@chunks} jobs"
    @chunks.times do |num|
      msg = BeanRequest.new(1,@points_per_chunk)
      #Take our ruby object and convert it to yml and put it on the queue
      request_queue.yput(msg,pri=DEFAULT_PRIORITY, delay=0, ttr=TTR)
      @messages_out += 1
      #if there are results get them if not continue sending out work
      get_incoming_results(results_queue)
    end

    while @messages_out > 0
      get_incoming_results(results_queue)
    end
    npoints = @chunks * @points_per_chunk
    pi = 4.0*@circle_count/(npoints)
    puts "\nreceived all the results our estimate for pi is: #{pi}"
  end

end

class BeanWorker < BeanBase

  def initialize()
  end

  def write_result(queue, result)
    msg = BeanResult.new(1,result)
    queue.yput(msg,pri=DEFAULT_PRIORITY, delay=0, ttr=TTR)
  end

  def in_circle
    #generate 2 random numbers see if they are in the circle
    range = 1000000.0
    radius = range / 2
    xcord = rand(range) - radius
    ycord = rand(range) - radius
    if( (xcord**2) + (ycord**2) <= (radius**2) )
      return 1
    else
      return 0
    end
  end

  def start_worker
    request_queue = get_queue('requests')
    results_queue = get_queue('results')
    #get requests and do the work until the worker is killed
    while(true)
      result = 0
      take_msg(request_queue) do |body|
        chunks = body.count
        chunks.times { result += in_circle}
      end
      write_result(results_queue,result)
    end

  end

end

############
# These are just simple message classes that we serialize to and from
# YAML with beanstalk's yput and ybody functions.
############
class BeanRequest
  attr_accessor :project_id, :count
  def initialize(project_id, count=0)
    @project_id = project_id
    @count = count
  end
end

class BeanResult
  attr_accessor :project_id, :count
  def initialize(project_id, count=0)
    @project_id = project_id
    @count = count
  end
end

#how many different jobs we should do
chunks = 100
#how many points to calculate per chunk
points_per_chunk = 10000
#how many workers should we have
#(normally different machines, in our example fork them off)
workers = 5

# Most of the time you will have two entirely separate classes
# but to make it easy to run this example we will just fork and start our server
# and client separately. We will wait for them to complete and check
# if we received all the messages we expected.
puts "starting distributor"
server_pid = fork {
  BeanDistributor.new(chunks,points_per_chunk).start_distributor
}

puts "starting client(s)"
client_pids = []
workers.times do |num|
  client_pid = fork {
    BeanWorker.new.start_worker
  }
  client_pids << client_pid
end

Process.wait(server_pid)
#take down the clients
client_pids.each do |pid|
  Process.kill("HUP",pid)
end

Written by DanM

November 19, 2008 at 3:19 pm

Posted in Development, Hacking, Ruby

3 Responses


  1. Thanks for the tutorial! It would be great to see the more advanced one you mentioned if you have the time to write it up.

    Ted Benson

    January 25, 2009 at 2:36 pm

  2. Interesting tutorial, thanks for writing it. Please keep the series going with the advanced one, the things you plan to discuss in that one are those that interest me the most.

    Ugo Riboni

    January 29, 2009 at 5:16 am

  3. Hi. I've adapted this wonderful code to a little app of my own. I've noticed that with the beanstalkd before the latest version that's out right now, the worker processes would once in a while die on what I suspected was the blocked request queue. When I upgraded to the latest beanstalkd, all but one of the forked off processes would die almost immediately. So I thought I'd take advantage of the newish timeout feature on queue reserves in the take_msg method as follows:

    begin
      msg = queue.reserve(0)
    rescue
      sleep 1
      retry
    end

    I also implemented a little patch I found on the beanstalk-talk mailing list to the ruby beanstalk-client that addressed a reported bug in the timeout code. So far so good.

    bwts

    October 19, 2009 at 4:50 pm


