Ruby, AMQP and RabbitMQ for Data Processing Win.

Overview

At work we needed to be able to process a huge amount of data: 13 individual areas, each taking about 2 days to process with only an 18-month backlog, and we were receiving new data nightly. Once we processed that backlog the volume of data would drop drastically, but the new system is constantly changing, which could mean re-processing the whole backlog to incorporate new changes or fixes. We needed something faster, but our current process was sequential (process users, then process clients 500 at a time until done, and so on for a couple more sections).

The order of processing the sections cannot be changed, but we could process individual records within each section in parallel. We looked at a couple of options, including Haskell and Erlang, but didn't have the time to spend on those rewrites. I decided to try a map/reduce-style approach that could still use our existing Rails models, reducing the time spent on trying something new. I settled on RabbitMQ as a broker where I could publish messages and have a pool of workers process them (yes, I'm partial to Erlang).

We now have a publisher class, a processer class and a reporter class. The publisher takes each section and publishes its legacy records to a message queue (RabbitMQ), and a pool of processers listens for messages, runs the conversion, then saves the result to a local database. The reporter simply lets me see how many pending messages there are and how many consumers (workers) are listening to the queue.

As a non-formal benchmark, we can now run through all 13 areas with a 12-month backlog in under 24 hours, which would have taken a little over 2 weeks with the old process.

The code that follows is the 'framework' I used for the new conversion process.


Setting Up

The first thing to do is set up RabbitMQ, and install Erlang if needed. For Erlang it's the usual configure, make and make install to get it on your system (Mac or Linux), or the Windows binary: http://erlang.org/download.html (I recommend the R12B-5 version)

RabbitMQ

From source:

$ cd your_preferred_work_directory
$ hg clone http://hg.rabbitmq.com/rabbitmq-codegen
$ hg clone http://hg.rabbitmq.com/rabbitmq-server
$ cd rabbitmq-server
$ make run

This gives you a quick RabbitMQ server up and running. If you wish to go the more formal route, you can grab a tarball/package and follow the appropriate instructions.

Ruby AMQP

You'll need the ruby amqp library.

$ sudo gem sources -a http://gems.github.com
$ sudo gem install tmm1-amqp

With RabbitMQ up and running you can do some of the examples from the tmm1-amqp gem to make sure everything is operating smoothly.
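
If you want a quick sanity check from Ruby, a minimal smoke test looks something like the following, assuming RabbitMQ is running locally with default settings (the queue name 'amqp.smoke.test' is just for illustration):

require 'rubygems'
require 'mq'

AMQP.start do
  queue = MQ.new.queue('amqp.smoke.test')

  queue.subscribe do |msg|
    puts "received: #{msg}"      # should print "received: hello rabbit"
    AMQP.stop { EM.stop }
  end

  queue.publish('hello rabbit')  # publishes via the default exchange
end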


Helper Class

Everyone loves helpers.

require 'rubygems'
require 'mq'
require 'benchmark'

class Integer
  # Groups 0...self into `mod` buckets by modulus, returning a hash of
  # arrays, e.g. 10.paged_collect(4) => {0 => [0, 4, 8], 1 => [1, 5, 9], ...}
  def paged_collect(mod)
    acc = Hash.new
    self.times do |num|
      index = num % mod
      (acc[index] ||= Array.new) << num
    end
    acc
  end
end

module MQHelper

  def log message
    puts "#{MQ.id}: #{message}"
    $stdout.flush
  end

  def logp *args
    print(*args)
    $stdout.flush
  end

  def graceful_death
    AMQP.stop{ EM.stop }
    exit(0)
  end

  protected

  def load_rails_environment
    mark = Benchmark.realtime do
      require 'config/environment'
    end
    log "loaded rails environment... #{mark} seconds"
  end

  def infrastructure(&block)
    mark = Benchmark.realtime do
      block.call
    end
    log "loading required infrastructure... #{mark} seconds"
  end

  def calc_pages(total, offset)
    if total < offset
      1
    else
      # round up so a final partial page is still covered
      (total.to_f / offset).ceil
    end
  end

  def calc_modulus(total_pages)
    if total_pages < 100
      3 + (total_pages/10).to_i
    else
      13 + (total_pages/10).to_i
    end
  end

  def serialize data
    Marshal.dump(data)
  end

  def unserialize data
    autoload_missing_constants do
      Marshal.load data
    end
  end

  # Marshal.load raises ArgumentError ("undefined class/module Foo") when the
  # dumped class hasn't been loaded yet. Calling constantize on the missing
  # name triggers Rails' autoloading; we then retry once per missing constant.
  def autoload_missing_constants
    yield
  rescue ArgumentError => error
    lazy_load ||= Hash.new {|hash, hash_key| hash[hash_key] = true; false}
    if error.to_s[/undefined class|referred/] && !lazy_load[error.to_s.split.last.constantize]
      retry
    else
      raise error
    end
  end

end

Integer#paged_collect takes an integer x and loops from 0 to x-1, grouping the numbers into buckets by their modulus with the parameter. The result is a hash of arrays of page numbers; its use will be made clearer in the Publisher class.
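
For example, with 10 pages and a modulus of 4 (the same numbers used later in the Publisher section):

10.paged_collect(4)
# => {0 => [0, 4, 8], 1 => [1, 5, 9], 2 => [2, 6], 3 => [3, 7]}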

#load_rails_environment and #infrastructure load required files; #infrastructure is a helper that also benchmarks how long loading those files took.

#calc_pages takes a total and an offset and calculates how many pages are needed to cover the total, rounding up so a final partial page isn't dropped.

#calc_modulus is a helper for #paged_collect that picks a decent-sized modulus based on the number of pages.

#serialize and #unserialize are helper methods to dump and load data. #autoload_missing_constants lets Marshal.load trigger Rails' constant autoloading so it can find the classes it needs instead of failing.
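
To see why the guard matters, here's roughly what happens without it, assuming a worker process that hasn't referenced Legacy::User yet:

payload = Marshal.dump(Legacy::User.first)   # dumped by the publisher

# In a fresh worker, Rails hasn't autoloaded the constant yet:
Marshal.load(payload)
# => ArgumentError: undefined class/module Legacy::User

# #autoload_missing_constants rescues the error, calls .constantize on the
# missing name (which triggers Rails' autoloading) and retries the load.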


Publisher Class

A few things before the code.

AMQP.start do
end

sets up the amqp gem and eventmachine for publishing and consuming messages. Your code will be wrapped inside this block, and it will not exit until you call EM.stop_event_loop or:

AMQP.stop do
  EM.stop
end

The above is a graceful way of telling the amqp gem and eventmachine to finish any remaining IO work (publishing, etc.) and then stop. We'll use these two code blocks a good bit to do 'burst' publishing. More details after the code:

#!/usr/bin/ruby

# Core
require 'date'

# Gems
require 'rubygems'
require 'mq'

# Helper
require 'import.mq.helper.rb'

# Shutdown AMQP gracefully so messages are published
Signal.trap('INT') { AMQP.stop{ EM.stop } }
Signal.trap('TERM'){ AMQP.stop{ EM.stop } }

class Publisher

  attr_reader :options

  include MQHelper

  # Need rails for our models
  def initialize env
    @options = env
    load_rails_environment
  end

  # entry point - ` publish.rb "users" ` will publish the users
  def publish
    section = ARGV[0]

    if section.nil?
      log "You must supply a section to publish (users, clients, etc..)."
      graceful_death
    else
      case section
      when "users"
        publish_users
      when "clients"
        publish_clients
      end
    end
  end

  protected

  def publish_users
    total = Legacy::User.count
    log "publishing #{total} users..."

    # We know the user count is small so we don't do
    # any pagination and instead load it all at once
    # and publish
    lusers = Legacy::User.all

    # start amqp
    AMQP.start do
      # create/grab our exchange 'conversion' which is a topic exchange
      topic = MQ.new.topic('conversion')

      # for each legacy user, publish its dump with the routing key 'import.users'
      lusers.each do |user|
        logp "publishing #{user.login}..."
        topic.publish(serialize(user), :key => 'import.users')
        log "OK!"
      end

      # We're done so tell amqp and eventmachine to finish publishing and then stop
      AMQP.stop do
        EM.stop
      end
    end
  end # end publish users

  def publish_clients
    # calculate the number of times (pages) we need to loop with a 100 row offset/limit
    total_count = Legacy::Client.count
    number_of_pages = calc_pages(total_count, 100)
    log "publishing #{total_count} clients in #{number_of_pages} pages..."

    # Publishes a set of pages, allows eventmachine/amqp to send them
    # then restarts the process with another set of pages.
    modulus = calc_modulus(number_of_pages)
    number_of_pages.paged_collect(modulus).each_value do |pages|
      AMQP.start do
        pages.each do |page|
          lclients = Legacy::Client.all(:limit => 100, :offset => page*100)

          topic = MQ.new.topic('conversion')
          lclients.map do |lclient|
            topic.publish(serialize(lclient), :key => 'import.clients')
          end
          log "published page #{page} of #{number_of_pages} for clients..."
        end
        AMQP.stop do
          EM.stop
        end
      end
    end
  end # end publish clients

end

Publisher.new(ENV).publish

You might want to read through the code comments for #publish_users to get a feel for how a simple legacy publish works. #publish_clients is a bit more complex in that we do 'burst' publishing. Burst publishing, in this context, means publishing a set amount of messages, waiting for them to be sent, then publishing the next set. This matters because if you sit in a tight loop publishing 100,000 records, your workers won't see a single message until the loop finishes (eventmachine only flushes its outgoing buffers when your code yields control back to the event loop). With 'burst' publishing we make sure 500 messages are sent, then another 500, and so on, so our workers have work to do while we churn through the 100,000 records.

So let's take a look at #publish_clients specifically the 'burst' part.

# Publishes a set of pages, allows eventmachine/amqp to send them
# then restarts the process with another set of pages.
modulus = calc_modulus(number_of_pages)
number_of_pages.paged_collect(modulus).each_value do |pages|
  AMQP.start do
    pages.each do |page|
      lclients = Legacy::Client.all(:limit => 100, :offset => page*100)

      topic = MQ.new.topic('conversion')
      lclients.map do |lclient|
        topic.publish(serialize(lclient), :key => 'import.clients')
      end
      log "published page #{page} of #{number_of_pages} for clients..."
    end
    AMQP.stop do
      EM.stop
    end
  end
end

The first two lines construct our sets of pages. If we had 10 pages, our sets would look like:

{0 => [0, 4, 8], 1 => [1, 5, 9], 2 => [2, 6], 3 => [3, 7]}

This means the first burst publishes pages 0, 4 and 8, the next burst publishes 1, 5 and 9, and so on. Now we get into our AMQP.start block. For each page we grab the legacy records and publish each one to our topic exchange 'conversion' with a routing key of 'import.clients'. The routing key is important so your workers know what type of data is coming to them. It also lets you have two sets of workers listening on different routing keys but on the same exchange.
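
As a minimal sketch of that last point (the queue names here are just for illustration), two queues bound to the same 'conversion' exchange with different keys each receive only their own messages:

require 'rubygems'
require 'mq'

AMQP.start do
  mq = MQ.new
  topic = mq.topic('conversion')

  # each queue only receives messages matching its binding key
  mq.queue('demo.users').bind(topic, :key => 'import.users').subscribe do |msg|
    puts "users queue got: #{msg}"
  end
  mq.queue('demo.clients').bind(topic, :key => 'import.clients').subscribe do |msg|
    puts "clients queue got: #{msg}"
  end

  topic.publish('a user',   :key => 'import.users')
  topic.publish('a client', :key => 'import.clients')
end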

After publishing each page we gracefully stop AMQP and eventmachine which allows our messages to be published so our workers can start processing. Then we move on to the next set of pages.


Processer Class

The Processer class is a bit more complex in that we use one script invocation to start a pool of workers. I'll list the entire code then break it down piece by piece.

#!/usr/bin/ruby

# Core
require 'date'

# Gems
require 'rubygems'
require 'mq'

# Helper
require 'import.mq.helper.rb'

# For ack to work appropriately you must shut down AMQP gracefully,
# otherwise all unacknowledged messages will be returned to the queue
Signal.trap('INT') {
  unless EM.forks.empty?
    EM.forks.each do |pid|
      Process.kill('KILL', pid)
    end
  end
  AMQP.stop{ EM.stop }
  exit(0)
}
Signal.trap('TERM') {
  unless EM.forks.empty?
    EM.forks.each do |pid|
      Process.kill('KILL', pid)
    end
  end
  AMQP.stop{ EM.stop }
  exit(0)
}

# spawn workers
workers = ARGV[1] ? (Integer(ARGV[1]) rescue 1) : 1
puts "workers: #{workers}"
EM.fork(workers) do
  AMQP.start do

    class Processer

      attr_reader :options

      include MQHelper

      def initialize env
        @options = env

        load_rails_environment
      end

      # entry point - `process.rb "users"`
      def process
        section = ARGV[0]

        if section.nil?
          log "You must supply a section to process (users, clients, etc..)"
          graceful_death
        else
          case section
          when "users"
            process_user
          when "clients"
            infrastructure do
              require 'client_loader'
            end
            process_client
          end
        end
      end # end process

      protected

      # update legacy users to new users model
      def process_user
        mq = MQ.new
        # open a queue (name can be anything)
        # and then bind it to the topic exchange 'conversion'
        # and routing key 'import.users'.
        queue = mq.queue('cltc.import.users').bind(mq.topic('conversion'), :key => 'import.users')

        run_process(queue) do |user|
          new_user = User.find_by_login(user.login) || User.new
          new_user.login = user.login
          new_user.password = new_user.password_confirmation = user.password

          new_user.save false
        end
      end # end process user

      # update legacy clients to new clients model
      def process_client
        mq = MQ.new
        queue = mq.queue('cltc.import.clients').bind(mq.topic('conversion'), :key => 'import.clients')

        run_process(queue) do |client|
          ClientLoader.new client
        end
      end # end process client

      def run_process(queue, &block)
        queue.subscribe(:ack => true) { |headers, payload|
          data = unserialize(payload)
          block.call(data)
          headers.ack
        }
      end # end run process

    end # end Processer

    Processer.new(ENV).process
  end # end AMQP.start
end # end EM.fork

# wait on forks
while !EM.forks.empty?
  sleep(5)
end

Let's first look at our Signal traps for INT and TERM. I'll only list one here as they have the same code for each trap.

Signal.trap('INT') {
  unless EM.forks.empty?
    EM.forks.each do |pid|
      Process.kill('KILL', pid)
    end
  end
  AMQP.stop{ EM.stop }
  exit(0)
}

Later on you'll see that we use eventmachine's fork method to create our pool of workers. When we send the 'master' process an INT or TERM we need to kill all of our forks so we don't leave orphans behind. Again we see AMQP#stop being called to let things finish. Our fork code is pretty simple since we wrap all of our main code inside EM#fork.

# spawn workers
workers = ARGV[1] ? (Integer(ARGV[1]) rescue 1) : 1
puts "workers: #{workers}"
EM.fork(workers) do
  AMQP.start do
    # main code here
  end
end

Here we grab the number of workers from the command line, so process.rb "users" 16 would start 16 workers on the users section. We then tell EM#fork how many workers we want, and each fork runs AMQP#start with our main code inside. Note that EM#fork is an extension provided by the tmm1-amqp library.

The next bit of code is pretty straightforward, so I won't re-paste it here. In short: declare our Processer class, load Rails for our models, and define our #process method to take the command line argument and process the appropriate section. Next up is our #process_user method.

# update legacy users to new users model
def process_user
  mq = MQ.new
  # open a queue (name can be anything)
  # and then bind it to the topic exchange 'conversion'
  # and routing key 'import.users'.
  queue = mq.queue('cltc.import.users').bind(mq.topic('conversion'), :key => 'import.users')

  run_process(queue) do |user|
    new_user = User.find_by_login(user.login) || User.new
    new_user.login = user.login
    new_user.password = new_user.password_confirmation = user.password

    new_user.save false
  end
end # end process user

A consumer/processer/listener of messages uses a queue on the message queue server to tell the server it wants to receive messages. A queue alone isn't enough, however, so we must bind our queue to the topic exchange we were publishing to earlier, which in this case was 'conversion'. We also pass a routing key because topic exchanges route messages to queues (listeners) based on that key. The other exchange types are direct (one-to-one) and fanout (unconditional one-to-many).
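
For comparison, here's roughly what the other exchange types look like with this gem (run inside an AMQP.start block; the names are just for illustration):

mq = MQ.new

# direct: a message is delivered to queues bound with the exact same key
mq.queue('errors').bind(mq.direct('logs'), :key => 'error')
mq.direct('logs').publish('disk full', :key => 'error')

# fanout: every bound queue gets a copy; the routing key is ignored
mq.queue('audit.a').bind(mq.fanout('events'))
mq.queue('audit.b').bind(mq.fanout('events'))
mq.fanout('events').publish('user logged in')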

After we bind to our queue we need to send a message to the server telling it we want to start consuming/processing messages. The #run_process method does just this and accepts a queue to use and a block which will be called for each message received.

Let's jump to the #run_process method.

def run_process(queue, &block)
  queue.subscribe(:ack => true) { |headers, payload|
    data = unserialize(payload)
    block.call(data)
    headers.ack
  }
end # end run process

This method takes a queue and a block to call for each message received. We subscribe to the queue, which is asynchronous: whenever the server has a message for us, the block given to subscribe is called. The ack option passed to subscribe tells the server that we will acknowledge each message once we've processed it. If the server doesn't receive an ack from us, it puts the message back into the queue. This is useful when a worker dies and you want its message to be 'recovered'.

The payload is the message body. Here we unserialize it, call our block with it for processing, then use headers.ack to tell the server we finished.
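
One way to lean on that recovery behavior is to ack only when the block succeeds. This is a variation on #run_process, not the code used above; note the broker redelivers unacked messages once the consumer's connection closes:

def run_process(queue, &block)
  queue.subscribe(:ack => true) { |headers, payload|
    begin
      block.call(unserialize(payload))
      headers.ack  # acknowledge only after a successful conversion
    rescue => e
      # no ack: the message stays unacknowledged and will be redelivered
      # once this consumer's connection closes
      log "processing failed: #{e.message}"
    end
  }
end

Next up is #process_client.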

# update legacy clients to new clients model
def process_client
  mq = MQ.new
  queue = mq.queue('cltc.import.clients').bind(mq.topic('conversion'), :key => 'import.clients')

  run_process(queue) do |client|
    ClientLoader.new client
  end
end # end process client

Again we create our queue and bind it to our topic exchange with the routing key that matches how the messages were published in our Publisher class. Then we call #run_process again, grab our legacy client in the block, and use another class to do the conversion. The block you pass to #run_process can do whatever it needs to process/convert the data; as long as it returns normally, the message will be acked.
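
ClientLoader itself isn't shown in this article. A hypothetical shape, with made-up column names (legacy_id, name), might look like:

class ClientLoader
  # map a legacy client onto the new model and save it
  def initialize(legacy_client)
    client = Client.find_by_legacy_id(legacy_client.id) || Client.new
    client.legacy_id = legacy_client.id
    client.name      = legacy_client.name
    client.save(false) # skip validations, like the users example
  end
end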

If you look at the bottom of the code, you'll see we keep our master process alive with a sleep loop for as long as there are active forks. This lets us use the master as a control process: we can send it an INT or TERM signal and have it kill the forks.


Reporter Class

All this processing needs some sort of reporting. You could put logging statements in the processing code, but logging takes time, which hurts performance, and you want all the performance you can get when you're doing a lot of data processing. Instead we use a separate class to report how many messages are left to be processed and how many consumers/workers are waiting for work.

#!/usr/bin/ruby

# Core
require 'date'
require 'benchmark'

# Gems
require 'rubygems'
require 'mq'

# Helper
require 'import.mq.helper.rb'

# For ack to work appropriately you must shut down AMQP gracefully,
# otherwise all unacknowledged messages will be returned to the queue
Signal.trap('INT') {
  AMQP.stop{ EM.stop }
  exit(0)
}
Signal.trap('TERM') {
  AMQP.stop{ EM.stop }
  exit(0)
}

AMQP.start do

  class Reporter

    attr_reader :options

    include MQHelper

    def initialize env
      @options = env

      load_rails_environment
    end

    def report
      section = ARGV[0]

      if section.nil?
        log "You must supply a section to report (users, clients, etc..)"
        graceful_death
      else
        case section
        when "users"
          report_user
        when "clients"
          report_client
        end
      end
    end # end report

    protected

    def report_user
      mq = MQ.new
      queue = mq.queue('cltc.import.users').bind(mq.topic('conversion'), :key => 'import.users')

      run_report(queue)
    end # end report user

    def report_client
      mq = MQ.new
      queue = mq.queue('cltc.import.clients').bind(mq.topic('conversion'), :key => 'import.clients')

      run_report(queue)
    end

    def run_report(queue)
      @timer = EM.add_periodic_timer(5) {
        queue.status {|num_messages, num_consumers|
          log "[#{Time::now.strftime('%m/%d - %H:%M:%S')}]  #{num_consumers} consumers have #{num_messages} messages left to process"
        }
      }
    end # end run report

  end # end Reporter

  Reporter.new(ENV).report
end # end AMQP.start

This should look familiar, as most of it follows the same pattern as the Publisher class. The new code is in the #run_report method. Here we use eventmachine's handy EM.add_periodic_timer to run the given block every 5 seconds. The timed block asks for the status of the given queue, and the status callback receives the number of messages waiting to be processed and the number of consumers (listeners). Because each queue is bound by a routing key, you will only see that routing key's consumers and pending messages. Alternatively, if you ran a report bound with a routing key of 'import.*' you would see the consumers and pending messages for both 'import.users' and 'import.clients' (* matches exactly one level).

With our status we simply log how many consumers and messages we have.

How to tell you've finished publishing and processing

You know you've finished publishing when your publisher process exits. But how do you know when you've finished processing everything? The key is knowing how many consumers were listening on a routing key. Say we had 16 consumers on 'import.clients'. When our publisher(s) finish for that key and our reporter states we have 0 messages left with 16 consumers listening, we know we're done. When consumers are busy processing they drop from the status report, so if all 16 consumers were busy and we had 1,000 pending messages, our reporter would show 0 consumers and 1,000 messages.
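
That heuristic is easy to automate with the same queue status call the Reporter uses. A sketch, assuming you know how many workers you started (16 here):

EXPECTED_WORKERS = 16

def wait_until_done(queue)
  EM.add_periodic_timer(5) {
    queue.status { |num_messages, num_consumers|
      # done when nothing is pending and every worker is idle and listening
      if num_messages == 0 && num_consumers == EXPECTED_WORKERS
        log "section complete"
        graceful_death
      end
    }
  }
end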

There may be an easier way to tell when we're done processing, but this method definitely works and can be used to switch your pool of workers to a new section and begin work there. Switching them over is left as an exercise for the reader.


Resources

Feel free to leave comments/questions, especially if I left something out or didn't explain a section very well.