The Devver Blog

A Boulder startup improving the way developers work.

Ruby Beanstalkd distributed worker basics

At Devver we have a lot of jobs to do quickly, so we distribute our work out to a group of EC2 workers. We have tried and used a number of queuing solutions with Ruby, but in the end beanstalkd seemed to be the best solution for us at the time.

I have only seen a few posts about the basics of using beanstalkd with Ruby, so I decided to write two posts that evolve a simple Ruby beanstalkd example into a more complicated one. That way, people new to beanstalkd can see how easy it is to get up and running with distributed processing in Ruby, and people doing more advanced work with beanstalkd can see how we are using it here at Devver. It would also be great if more experienced beanstalkd warriors shared their thoughts, since there aren’t many examples out in the wild. The lack of examples makes it harder both to learn beanstalkd and to decide on best practices for working with its queues.

I have also shared two scripts we have found useful while working with beanstalkd. The first, beanstalk_monitor.rb, shows statistics about current usage for all queues, or lets you monitor a single queue you are interested in. The second, beanstalk_killer.rb, is useful if you want to see how your code reacts to beanstalkd getting backed up or stalling (in beanstalkd speak, “Putting on the brakes”). Pulling everything out of our code into a simple example was a little harder than I expected, and the example itself obviously doesn't do anything useful, but it should still show the basics of distributing jobs with beanstalkd.
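A monitor like beanstalk_monitor.rb mostly boils down to reading beanstalkd's per-tube statistics and printing them. The Ruby sketch below is not that script: format_tube_stats and backed_up? are hypothetical helpers that take a hash shaped like the response to beanstalkd's stats-tube command (keys such as current-jobs-ready and current-jobs-buried) and turn it into a one-line summary. Fetching the hash from a live server is left out, since the exact call depends on your client version, and the sample numbers and the threshold of 100 ready jobs are made up for illustration.

```ruby
# Hypothetical monitoring helpers; NOT the actual beanstalk_monitor.rb.
# They expect a hash shaped like beanstalkd's stats-tube response.
WATCHED_KEYS = %w[current-jobs-ready current-jobs-reserved
                  current-jobs-delayed current-jobs-buried total-jobs].freeze

# Builds a one-line summary such as "requests: ready=7 reserved=2 ...".
def format_tube_stats(stats)
  name = stats.fetch('name', 'unknown')
  fields = WATCHED_KEYS.map do |key|
    "#{key.sub('current-jobs-', '')}=#{stats.fetch(key, 0)}"
  end
  "#{name}: #{fields.join(' ')}"
end

# Treats a tube as backed up once ready jobs pass a threshold (an assumed policy).
def backed_up?(stats, threshold = 100)
  stats.fetch('current-jobs-ready', 0) > threshold
end

# Made-up sample data standing in for a real server response.
sample = {
  'name' => 'requests',
  'current-jobs-ready' => 7,
  'current-jobs-reserved' => 2,
  'current-jobs-delayed' => 0,
  'current-jobs-buried' => 1,
  'total-jobs' => 42
}
puts format_tube_stats(sample) # prints "requests: ready=7 reserved=2 delayed=0 buried=1 total-jobs=42"
puts backed_up?(sample)        # prints "false"
```

A monitoring loop could call helpers like these every few seconds and raise an alert when backed_up? turns true.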

For those new to beanstalkd, there are a few things you will need to know: how to get a queue object, how to put objects on the queue, how to take objects off the queue, and how to control which queue you are working with. For a higher-level overview or more detailed information, I recommend the beanstalkd FAQ. The full example code is below, but it may help to look at the basic snippets first.

#to work with beanstalk you need a client connection
#(SERVER_IP and DEFAULT_PORT are defined in the full example below)
queue = Beanstalk::Pool.new(["#{SERVER_IP}:#{DEFAULT_PORT}"])
#by default you will be working on the 'default' tube (beanstalkd's name for a queue)
#to work on a different tube, watch it (reserve from it), use it (put to it),
#and stop watching 'default'
queue.watch('test_queue')
queue.use('test_queue')
queue.ignore('default')
#to put a simple string on a queue
queue.put('hello queue world')
#to receive a simple string (reserve blocks until a job is available)
job = queue.reserve
puts job.body #prints 'hello queue world'
#if you don't delete the job before its TTR (time to run) expires, beanstalkd
#assumes the worker failed and puts the job back on the queue
job.delete
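To make the use/watch split and the delete-or-reappear rule concrete, here is a toy, in-memory stand-in for a beanstalkd server. It is not the beanstalk-client API: ToyBeanstalk, its method signatures, and the explicit now argument (a fake clock in seconds) are all hypothetical, and the real server also handles delays, burying, and blocking reserves. It models three rules: put goes to the tube you use, reserve hands out the most urgent ready job from the tubes you watch (lowest priority number first, oldest first among equals), and a reserved job that is not deleted within its TTR becomes ready again.

```ruby
# A toy, in-memory model of beanstalkd job semantics.
# NOT the beanstalk-client API: class and method names are hypothetical.
class ToyBeanstalk
  Job = Struct.new(:id, :tube, :body, :pri, :ttr, :deadline)

  def initialize
    @ready = []              # jobs waiting to be reserved
    @reserved = {}           # job id => job currently held by a worker
    @next_id = 0
    @using = 'default'       # tube that put writes to
    @watching = ['default']  # tubes that reserve reads from
  end

  def use(tube)
    @using = tube
  end

  def watch(tube)
    @watching << tube unless @watching.include?(tube)
  end

  # Like beanstalkd, refuse to ignore the last watched tube.
  def ignore(tube)
    @watching.delete(tube) if @watching.size > 1
  end

  # Adds a job to the tube in use and returns its id (defaults are illustrative).
  def put(body, pri = 65536, ttr = 120)
    @next_id += 1
    @ready << Job.new(@next_id, @using, body, pri, ttr, nil)
    @next_id
  end

  # Returns the most urgent ready job from a watched tube, or nil.
  # (A real reserve blocks instead of returning nil; `now` is a fake clock.)
  def reserve(now)
    release_expired(now)
    candidates = @ready.select { |job| @watching.include?(job.tube) }
    job = candidates.min_by { |j| [j.pri, j.id] } # lowest number first, then oldest
    return nil if job.nil?

    @ready.delete(job)
    job.deadline = now + job.ttr
    @reserved[job.id] = job
    job
  end

  # Deletes a reserved job (only reserved jobs in this toy); true if found.
  def delete(id)
    !@reserved.delete(id).nil?
  end

  private

  # A reserved job whose TTR has run out goes back to the ready list,
  # as if the worker holding it had died.
  def release_expired(now)
    expired = @reserved.values.select { |job| now >= job.deadline }
    expired.each do |job|
      @reserved.delete(job.id)
      job.deadline = nil
      @ready << job
    end
  end
end

queue = ToyBeanstalk.new
queue.watch('test_queue')
queue.use('test_queue')
queue.ignore('default')

queue.put('routine', 100)      # lower numbers are more urgent
queue.put('urgent', 0)

first = queue.reserve(0)
puts first.body                # prints "urgent"
queue.delete(first.id)         # finished, so it is gone for good

queue.reserve(0)               # "routine", held until t = 120 (its TTR)
# pretend this worker crashed and never called delete...
puts queue.reserve(60).inspect # prints "nil": the job is still reserved
again = queue.reserve(200)
puts again.body                # prints "routine": it came back after its TTR
```

The explicit clock keeps the sketch deterministic; against a real server you would simply wait out the TTR and see the job reappear.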

How to run this example (on OS X, with MacPorts installed; the & starts beanstalkd in the background so you can keep using the same terminal)

> sudo port install beanstalkd
> sudo gem install beanstalk-client
> beanstalkd &
> ruby beanstalk_tester.rb

Download: beanstalk_tester.rb

require 'rubygems'        #needed on Ruby 1.8 to load installed gems
require 'beanstalk-client'

DEFAULT_PORT = 11300
SERVER_IP = '127.0.0.1'
#beanstalkd orders the jobs in each tube by priority; jobs with the same
#priority come out FIFO. In a later example we will use the priority
#(lower numbers are more urgent: 0 is the highest priority)
DEFAULT_PRIORITY = 65536
#TTR (time to run) is how many seconds a worker may hold a reserved job.
#If a worker dies before completing its work and never calls job.delete,
#the job goes back on the queue once its TTR runs out
TTR = 3

class BeanBase


  #To work with multiple queues you must tell beanstalk which queues
  #you plan on writing to (use), and which queues you will reserve jobs from
  #(watch). In this case we also want to ignore the default queue
  def get_queue(queue_name)
    queue = Beanstalk::Pool.new(["#{SERVER_IP}:#{DEFAULT_PORT}"])
    queue.watch(queue_name)
    queue.use(queue_name)
    queue.ignore('default')
    queue
  end

end

class BeanDistributor < BeanBase

  def initialize(amount)
    @messages = amount
  end

  def start_distributor
    #put all the work on the request queue
    bean_queue = get_queue('requests')
    @messages.times do |num|
      msg = BeanRequest.new(1,num)
      #convert our ruby object to YAML and put it on the queue
      #(arguments: priority, delay in seconds, TTR in seconds)
      bean_queue.yput(msg, DEFAULT_PRIORITY, 0, TTR)
    end

    puts "distributor now getting results"
    #get all the results from the results queue
    bean_queue = get_queue('results')
    @messages.times do |num|
      result = take_msg(bean_queue)
      puts "result: #{result}"
    end

  end

  #this will take a result message off the queue and return its count
  def take_msg(queue)
    msg = queue.reserve
    #ybody returns the message body converted from YAML back into a Ruby object
    count = msg.ybody.count
    msg.delete
    return count
  end

end

class BeanWorker < BeanBase

  def initialize(amount)
    @messages = amount
    @received_msgs = 0
  end

  def start_worker
    results = []
    #get and process all the requests, on the requests queue
    bean_queue = get_queue('requests')
    @messages.times do |num|
      result = take_msg(bean_queue)
      results << result
      @received_msgs += 1
    end

    #return all of the results, by placing them on the separate results queue
    bean_queue = get_queue('results')
    results.each do |result|
      msg = BeanResult.new(1,result)
      bean_queue.yput(msg, DEFAULT_PRIORITY, 0, TTR) #priority, delay, ttr
    end

    #this is just to pass information out of the forked process:
    #we return the number of messages we received as our exit status
    #(exit statuses are limited to 0-255, so this only works for small counts)
    exit @received_msgs
  end

  #this will take a message off the queue, process it and return the result
  def take_msg(queue)
    msg = queue.reserve
    #ybody returns the message body converted from YAML back into a Ruby object
    count = msg.ybody.count
    result = count*count
    msg.delete
    return result
  end

end

############
# These are just simple message classes that we pass through the queue
# using beanstalk-client's YAML helpers (yput and ybody).
############
class BeanRequest
  attr_accessor :project_id, :count
  def initialize(project_id, count=0)
    @project_id = project_id
    @count = count
  end
end

class BeanResult
  attr_accessor :project_id, :count
  def initialize(project_id, count=0)
    @project_id = project_id
    @count = count
  end
end

#write X messages on the queue
numb = 10

recv_count = 0

# Most of the time the distributor and the worker would be two entirely separate
# programs, but to make this example easy to run we just fork and start the
# distributor and the worker as separate processes. We then wait for them to
# complete and check whether we received all the messages we expected.
puts "starting distributor"
server_pid = fork {
  BeanDistributor.new(numb).start_distributor
}

puts "starting worker"
client_pid = fork {
  BeanWorker.new(numb).start_worker
}

Process.wait(client_pid)
recv_count = $?.exitstatus
puts "worker finished, received #{recv_count} msgs"
if numb == recv_count
  puts "received the expected number of messages"
else
  puts "error: didn't receive the correct number of messages"
end

Process.wait(server_pid)
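The yput/ybody pair in the script works by serializing the message object to YAML on the way in and parsing it back on the way out, which is why plain classes like BeanRequest can travel through a tube. The sketch below shows that round trip with Ruby's standard yaml library and no server; encode_message and decode_message are hypothetical names, not the gem's code. One caveat: since Ruby 3.1 (Psych 4), YAML.load refuses to rebuild arbitrary Ruby objects by default, so the sketch lists the message class explicitly with permitted_classes (a keyword that needs Ruby 2.6 or later). A client that calls plain YAML.load may therefore fail to decode custom message objects on newer Rubies.

```ruby
require 'yaml'

# Same shape as the BeanRequest message class in the script above.
class BeanRequest
  attr_accessor :project_id, :count
  def initialize(project_id, count = 0)
    @project_id = project_id
    @count = count
  end
end

# Hypothetical stand-in for what a YAML-based put does before sending.
def encode_message(obj)
  YAML.dump(obj)
end

# Hypothetical stand-in for what a YAML-based body reader does.
# permitted_classes whitelists our class, since safe loading rejects unknown tags.
def decode_message(payload)
  YAML.safe_load(payload, permitted_classes: [BeanRequest])
end

payload = encode_message(BeanRequest.new(1, 7))
puts payload                        # a YAML document tagged !ruby/object:BeanRequest
request = decode_message(payload)
puts request.count * request.count  # the worker's "processing" step, prints "49"
```

Sending a plain hash with string keys instead of a custom class would avoid the whitelist entirely, at the cost of losing the named accessors.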

Written by DanM

October 28, 2008 at 2:35 pm

8 Responses


  1. Hi there, you might want to consider using our (Reevoo) beanstalk_messaging plugin which takes care of a lot of the grunt work when it comes to managing queue processes, connecting to and polling queues:

    http://labs.reevoo.com/plugins/beanstalk-messagin
    http://github.com/lukeredpath/beanstalk-messaging

    Luke Redpath

    October 29, 2008 at 9:42 am

  2. Thx for beanstalk_monitor.rb 🙂 Saves me a good night of sleep!

    Dirceu Jr.

    November 4, 2008 at 1:19 pm

  3. […] post is a follow up to Ruby beanstalkd basics, I will try to make the example code little more interesting and useful. I am calling this is a […]

  4. I've got a (small) collection of beanstalk tools on github that are useful for monitoring and inspect. If you feel they're lacking in some way, please let me know. 🙂

    dustin

    November 19, 2008 at 5:26 pm

  5. (sorry, apparently I can't edit posts here — beanstalk tools: http://github.com/dustin/beanstalk-tools )

    dustin

    November 19, 2008 at 5:29 pm

  6. thanks I should look at what all the default tools do before we develop any more home grown solutions

    danmayer

    November 19, 2008 at 7:26 pm

  7. […] Ruby Beanstalkd distributed worker basics (tags: ruby cluster programming ec2) […]

  8. […] Ruby Beanstalkd Distributed Worker Basics How to Create a Daemon Process Using Ruby and the Daemons RubyGem Err the Blog – About this blog Beanstalk Messaging Queue”> […]


