Ruby, AMQP and RabbitMQ for Data Processing Win.
Overview
At work we needed to process a huge amount of data: 13 individual areas, each taking about 2 days to process with only an 18 month backlog, and new data arriving nightly. Once we process that backlog the amount of data drops drastically, but the new system is constantly changing, which could mean re-processing the backlog to incorporate new changes or fixes. We needed something faster, but our current process was sequential (process users, then process clients (500 at a time until done), and so on for a couple more sections).
The order of processing the sections cannot be changed but we could process individual records in each section in parallel. We looked at a couple of options including Haskell and Erlang but didn't have the time to spend on those rewrites. I decided to try a map/reduce approach that could still use our existing Rails models reducing the time spent on trying something new. I settled on RabbitMQ to be a broker where I could publish messages and have a pool of workers process those messages (yes I'm partial to Erlang).
We now have a publisher class, a processer class and a reporter class. The publisher takes each section and publishes the legacy record to a message queue (RabbitMQ) and a pool of processers will listen for messages, run the conversion then save the result to a local database. The reporter simply lets me see how many pending messages there are and how many consumers (workers) are listening to the queue.
As a non-formal benchmark, we are now able to run through all 13 areas with a 12 month backlog in under 24 hours which would have taken a little over 2 weeks with the old process.
The code that follows is the 'framework' I used for the new conversion process.
Setting Up
The first thing to do is set up RabbitMQ and install Erlang if needed. For Erlang it's a simple configure, make and make install to get it on your system (Mac or Linux), or you can grab the Windows binary: http://erlang.org/download.html (I recommend the R12B-5 version).
RabbitMQ
From source:
$ cd your_preferred_work_directory
$ hg clone http://hg.rabbitmq.com/rabbitmq-codegen
$ hg clone http://hg.rabbitmq.com/rabbitmq-server
$ cd rabbitmq-server
$ make run
This gives you a nice quick rabbitmq server up and running. If you wish to go the more formal route you can get a tarball/package and follow the appropriate instructions.
Ruby AMQP
You'll need the ruby amqp library.
$ sudo gem sources -a http://gems.github.com
$ sudo gem install tmm1-amqp
With RabbitMQ up and running you can do some of the examples from the tmm1-amqp gem to make sure everything is operating smoothly.
Helper Class
Everyone loves helpers.
require 'rubygems'
require 'mq'
require 'benchmark'

class Integer
  # Groups the numbers 0...self into a hash keyed by (number % mod)
  def paged_collect(mod)
    acc = Hash.new
    self.times do |num|
      index = num % mod
      acc[index] = Array.new unless acc[index]
      acc[index] = acc[index] << num
    end
    acc
  end
end

module MQHelper
  def log message
    puts "#{MQ.id}: #{message}"
    $stdout.flush
  end

  # print without a newline (used for "publishing x...OK!" style output)
  def logp *args
    print(*args)
    $stdout.flush
  end

  def graceful_death
    AMQP.stop{ EM.stop }
    exit(0)
  end

  protected

  def load_rails_environment
    mark = Benchmark.realtime do
      require 'config/environment'
    end
    log "loaded rails environment... #{mark} seconds"
  end

  def infrastructure(&block)
    mark = Benchmark.realtime do
      block.call
    end
    log "loading required infrastructure... #{mark} seconds"
  end

  def calc_pages(total, offset)
    if total < offset
      1
    else
      # round up so a final partial page is not skipped
      (total.to_f / offset).ceil
    end
  end

  def calc_modulus(total_pages)
    if total_pages < 100
      3 + (total_pages/10).to_i
    else
      13 + (total_pages/10).to_i
    end
  end

  def serialize data
    Marshal.dump(data)
  end

  def unserialize data
    autoload_missing_constants do
      Marshal.load data
    end
  end

  # Marshal.load fails on classes that have not been loaded yet;
  # constantize the missing class name (which autoloads it) and retry
  def autoload_missing_constants
    yield
  rescue ArgumentError => error
    lazy_load ||= Hash.new {|hash, hash_key| hash[hash_key] = true; false}
    if error.to_s[/undefined class|referred/] && !lazy_load[error.to_s.split.last.constantize]
      retry
    else
      raise error
    end
  end
end
Integer#paged_collect takes an integer (x) and loops over each number y from 0 to x - 1, collecting sets of pages keyed by y modulo the parameter. The purpose here is to give you a hash of arrays. Its use will be made clearer in the Publisher class.
#load_rails_environment and #infrastructure load required files. Infrastructure is a helper to benchmark how long it took to load those needed files.
#calc_pages takes a total and an offset and calculates how many pages are needed to cover the total amount.
#calc_modulus is a helper method for #paged_collect to give a decent sized integer for the modulus operation.
#serialize and #unserialize are helper methods to load and dump data. #autoload_missing_constants allows a Marshal.load to find the constants it needs and not fail.
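To make the paging helpers more concrete, here is a minimal standalone sketch with the Rails and AMQP parts stripped out. The method bodies are simplified re-statements of the helpers above, not the exact originals, and rounding up in calc_pages is an assumption about the intent (so a final partial page is still counted).

```ruby
# Standalone sketch of the paging helpers (no Rails, no AMQP).
class Integer
  # Groups 0...self into buckets keyed by (number % mod).
  def paged_collect(mod)
    (0...self).group_by { |num| num % mod }
  end
end

# Assumption: round up so a trailing partial page is included.
def calc_pages(total, per_page)
  pages = (total.to_f / per_page).ceil
  pages < 1 ? 1 : pages
end

# Same arithmetic as the helper above: a small base plus a tenth of the pages.
def calc_modulus(total_pages)
  base = total_pages < 100 ? 3 : 13
  base + total_pages / 10
end

total_pages = calc_pages(1000, 100)         # 10 pages of 100 records
modulus     = calc_modulus(total_pages)     # 3 + 10 / 10 = 4
page_sets   = total_pages.paged_collect(modulus)

p page_sets
```

For 1,000 records this yields four sets of pages: [0, 4, 8], [1, 5, 9], [2, 6] and [3, 7].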
Publisher Class
A few things before the code..
AMQP.start do
end
sets up the amqp gem and eventmachine for publishing and consuming messages. Your code will be wrapped inside this and the block will not exit until you call EM.stop_event_loop or:
AMQP.stop do
  EM.stop
end
The above is a graceful way of telling the amqp gem and eventmachine to finish any io work left (publishing etc) and then stop. We'll use the two above code blocks a good bit to do 'burst' publishing. More details after the code:
#!/usr/bin/ruby
# Core
require 'date'
# Gems
require 'rubygems'
require 'mq'
# Helper
require 'import.mq.helper.rb'

# Shutdown AMQP gracefully so messages are published
Signal.trap('INT') { AMQP.stop{ EM.stop } }
Signal.trap('TERM'){ AMQP.stop{ EM.stop } }

class Publisher
  attr_reader :options
  include MQHelper

  # Need rails for our models
  def initialize env
    @options = env
    load_rails_environment
  end

  # entry point - `publish.rb "users"` will publish the users
  def publish
    section = ARGV[0]
    if section.nil?
      log "You must supply a section to publish (users, clients, etc..)."
      graceful_death
    else
      case section
      when "users"
        publish_users
      when "clients"
        publish_clients
      end
    end
  end # end publish

  protected

  def publish_users
    total = Legacy::User.count
    log "publishing #{total} users..."

    # We know the user count is small so we don't do
    # any pagination and instead load it all at once
    # and publish
    lusers = Legacy::User.all

    # start amqp
    AMQP.start do
      # create/grab our exchange 'conversion' which is a topic exchange
      topic = MQ.new.topic('conversion')

      # for each legacy user, publish its dump with the routing key 'import.users'
      lusers.each do |user|
        logp "publishing #{user.login}..."
        topic.publish(serialize(user), :key => 'import.users')
        log "OK!"
      end

      # We're done so tell amqp and eventmachine to finish publishing and then stop
      AMQP.stop do
        EM.stop
      end
    end
  end # end publish users

  def publish_clients
    # calculate the number of times (pages) we need to loop with a 100 row offset/limit
    total_count = Legacy::Client.count
    number_of_pages = calc_pages(total_count, 100)
    log "publishing #{total_count} clients in #{number_of_pages} pages..."

    # Publishes a set of pages, allows eventmachine/amqp to send them
    # then restarts the process with another set of pages.
    modulus = calc_modulus(number_of_pages)
    number_of_pages.paged_collect(modulus).each_value do |pages|
      AMQP.start do
        pages.each do |page|
          lclients = Legacy::Client.all(:limit => 100, :offset => page*100)
          topic = MQ.new.topic('conversion')
          lclients.map do |lclient|
            topic.publish(serialize(lclient), :key => 'import.clients')
          end
          log "published page #{page} of #{number_of_pages} for clients..."
        end

        AMQP.stop do
          EM.stop
        end
      end
    end
  end # end publish clients
end

Publisher.new(ENV).publish
You might want to read through the code comments for the #publish_users to get a feel for how a simple legacy publish works. #publish_clients is a bit more complex in that we do 'burst' publishing. Burst publishing in this context is publishing a set amount of messages, waiting for that to finish, then publishing the next set. The reason why this is important is that if you are in a tight loop going through 100,000 records publishing them all then your workers won't see a single message until you're done with the 100,000 record loop. By doing the 'burst' publishing we can make sure 500 messages are sent, then do another 500, wait for those to be sent then continue on so we know our workers have work to do while we churn through 100,000 records.
So let's take a look at #publish_clients specifically the 'burst' part.
# Publishes a set of pages, allows eventmachine/amqp to send them
# then restarts the process with another set of pages.
modulus = calc_modulus(number_of_pages)
number_of_pages.paged_collect(modulus).each_value do |pages|
  AMQP.start do
    pages.each do |page|
      lclients = Legacy::Client.all(:limit => 100, :offset => page*100)
      topic = MQ.new.topic('conversion')
      lclients.map do |lclient|
        topic.publish(serialize(lclient), :key => 'import.clients')
      end
      log "published page #{page} of #{number_of_pages} for clients..."
    end

    AMQP.stop do
      EM.stop
    end
  end
end
The first two lines are constructing our sets of pages. If we had 10 pages our sets of pages would look like:
{0 => [0, 4, 8], 1 => [1, 5, 9], 2 => [2, 6], 3 => [3, 7]}
This means the first burst will publish pages 0, 4 and 8 and the next burst will publish 1, 5, and 9 and so on. Now we get into our 'AMQP.start' block. For each of the pages we grab the legacy records and then publish each one to our topic exchange 'conversion' with a routing key of 'import.clients'. The routing key is important so your workers know what type of data is coming to them. It also lets you have two sets of workers listening on different routing keys but on the same exchange.
After publishing each page we gracefully stop AMQP and eventmachine which allows our messages to be published so our workers can start processing. Then we move on to the next set of pages.
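The effect of burst publishing can be sketched without a broker. FakeBroker below is a hypothetical stand-in, not part of the amqp gem: it buffers messages while a burst runs and only makes them visible on flush, which plays the role of AMQP.stop { EM.stop } in the real publisher. The page sets are the ones from the 10 page example above.

```ruby
# FakeBroker is a hypothetical stand-in for the AMQP connection: messages are
# buffered during a burst and only become visible to workers on flush.
class FakeBroker
  attr_reader :delivered

  def initialize
    @buffer = []
    @delivered = []
  end

  # Queue a message for sending; workers cannot see it yet.
  def publish(message)
    @buffer << message
  end

  # Stand-in for AMQP.stop { EM.stop }: hand everything buffered to workers.
  def flush
    @delivered.concat(@buffer)
    @buffer.clear
  end
end

broker = FakeBroker.new
page_sets = { 0 => [0, 4, 8], 1 => [1, 5, 9], 2 => [2, 6], 3 => [3, 7] }
visible_after_burst = []

page_sets.each_value do |pages|
  pages.each { |page| broker.publish("page-#{page}") }
  broker.flush
  # Record how many messages workers can see once this burst is done.
  visible_after_burst << broker.delivered.size
end

p visible_after_burst
```

After each burst the workers can see 3, 6, 8 and then 10 pages, rather than nothing until the whole loop has finished.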
Processer Class
The Processer class is a bit more complex in that we use one script invocation to start a pool of workers. I'll list the entire code then break it down piece by piece.
#!/usr/bin/ruby
# Core
require 'date'
# Gems
require 'rubygems'
require 'mq'
# Helper
require 'import.mq.helper.rb'

# For ack to work appropriately you must shutdown AMQP gracefully,
# otherwise all items in your queue will be returned
Signal.trap('INT') {
  unless EM.forks.empty?
    EM.forks.each do |pid|
      Process.kill('KILL', pid)
    end
  end
  AMQP.stop{ EM.stop }
  exit(0)
}
Signal.trap('TERM') {
  unless EM.forks.empty?
    EM.forks.each do |pid|
      Process.kill('KILL', pid)
    end
  end
  AMQP.stop{ EM.stop }
  exit(0)
}

# spawn workers
workers = ARGV[1] ? (Integer(ARGV[1]) rescue 1) : 1
puts "workers: #{workers}"

EM.fork(workers) do
  AMQP.start do
    class Processer
      attr_reader :options
      include MQHelper

      def initialize env
        @options = env
        load_rails_environment
      end

      # entry point - `process.rb "users"`
      def process
        section = ARGV[0]
        if section.nil?
          log "You must supply a section to process (users, clients, etc..)"
          graceful_death
        else
          case section
          when "users"
            process_user
          when "clients"
            infrastructure do
              require 'client_loader'
            end
            process_client
          end
        end
      end # end process

      protected

      # update legacy users to new users model
      def process_user
        mq = MQ.new
        # open a queue (name can be anything)
        # and then bind it to the topic exchange 'conversion'
        # and routing key 'import.users'.
        queue = mq.queue('cltc.import.users').bind(mq.topic('conversion'), :key => 'import.users')
        run_process(queue) do |user|
          new_user = User.find_by_login(user.login) || User.new
          new_user.login = user.login
          new_user.password = new_user.password_confirmation = user.password
          new_user.save false
        end
      end # end process user

      # update legacy clients to new clients model
      def process_client
        mq = MQ.new
        # bind to the same 'conversion' exchange the Publisher publishes to
        queue = mq.queue('cltc.import.clients').bind(mq.topic('conversion'), :key => 'import.clients')
        run_process(queue) do |client|
          ClientLoader.new client
        end
      end # end process client

      def run_process(queue, &block)
        queue.subscribe(:ack => true) { |headers, payload|
          data = unserialize(payload)
          block.call(data)
          headers.ack
        }
      end # end run process
    end # end Processer

    Processer.new(ENV).process
  end # end AMQP.start
end # end EM.fork

# wait on forks
while !EM.forks.empty?
  sleep(5)
end
Let's first look at our Signal traps for INT and TERM. I'll only list one here as they have the same code for each trap.
Signal.trap('INT') {
  unless EM.forks.empty?
    EM.forks.each do |pid|
      Process.kill('KILL', pid)
    end
  end
  AMQP.stop{ EM.stop }
  exit(0)
}
Later on you'll see that we use eventmachine's fork method to create our pool of workers. When we send the 'master' process an INT or TERM we need to kill all of our forks so we don't leave any orphans. Again we see the AMQP#stop being called to let things finish. Our fork code is pretty simple, since we wrap all of our main code inside EM#fork.
# spawn workers
workers = ARGV[1] ? (Integer(ARGV[1]) rescue 1) : 1
puts "workers: #{workers}"

EM.fork(workers) do
  AMQP.start do
    # main code here
  end
end
Here we grab how many workers from the command line so process.rb "users" 16 would start 16 workers on the users section. We then tell EM#fork how many workers we want and then have AMQP#start and our main code after that. It should be noted that EM#fork is an extension provided by the tmm1-amqp library.
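The worker count parsing can be tried on its own. This is a small sketch of the same expression wrapped in a hypothetical worker_count method; clamping values below 1 up to 1 is an addition here and not something the original script does.

```ruby
# worker_count is a hypothetical wrapper around the ARGV parsing shown above.
# Anything missing or non-numeric falls back to a single worker.
def worker_count(arg)
  count = arg ? (Integer(arg) rescue 1) : 1
  # Assumption: never return fewer than one worker.
  count < 1 ? 1 : count
end

puts worker_count("16")
puts worker_count("lots")
puts worker_count(nil)
```

So `process.rb "users" 16` asks for 16 workers, while a missing or unparsable second argument falls back to 1.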
The next bit of code is pretty straight-forward so I won't re-paste it here. In short declare our Processer class, load rails for our models and define our #process method to take the command line argument and process the appropriate section. Next up is our #process_user method.
# update legacy users to new users model
def process_user
  mq = MQ.new
  # open a queue (name can be anything)
  # and then bind it to the topic exchange 'conversion'
  # and routing key 'import.users'.
  queue = mq.queue('cltc.import.users').bind(mq.topic('conversion'), :key => 'import.users')
  run_process(queue) do |user|
    new_user = User.find_by_login(user.login) || User.new
    new_user.login = user.login
    new_user.password = new_user.password_confirmation = user.password
    new_user.save false
  end
end # end process user
A consumer/processer/listener of messages uses a queue on the message queue server to tell the server it wants to receive messages. A queue alone isn't enough however so we must bind our queue to the topic exchange we were publishing to earlier which in this case was 'conversion'. We also pass a routing key because topic exchanges send messages to queues(listeners) based on that key. Other exchange types are direct (one to one) and fanout (unconditional one to many).
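As a rough illustration of how a topic exchange compares a binding key with a message's routing key, here is a plain Ruby sketch. It is not how RabbitMQ implements matching; topic_match? and match_words are hypothetical names, and the rules modelled are the usual AMQP ones: words are separated by dots, `*` stands for exactly one word and `#` for zero or more words.

```ruby
# Hypothetical helper: does a binding pattern match a routing key?
def topic_match?(pattern, key)
  match_words(pattern.split('.'), key.split('.'))
end

# Compare the pattern and the key word by word.
def match_words(pattern, words)
  return words.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when '#'
    # '#' may absorb zero or more words
    (0..words.length).any? { |n| match_words(rest, words.drop(n)) }
  when '*'
    # '*' must consume exactly one word
    !words.empty? && match_words(rest, words.drop(1))
  else
    # a literal word must match exactly
    words.first == head && match_words(rest, words.drop(1))
  end
end

puts topic_match?('import.users', 'import.users')
puts topic_match?('import.*', 'import.clients')
puts topic_match?('import.*', 'import.clients.archived')
puts topic_match?('import.#', 'import.clients.archived')
```

A queue bound with 'import.users' only receives user messages, which is why the publisher and the processer have to agree on both the exchange name and the routing key.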
After we bind to our queue we need to send a message to the server telling it we want to start consuming/processing messages. The #run_process method does just this and accepts a queue to use and a block which will be called for each message received.
Let's jump to the #run_process method.
def run_process(queue, &block)
  queue.subscribe(:ack => true) { |headers, payload|
    data = unserialize(payload)
    block.call(data)
    headers.ack
  }
end # end run process
This method takes a queue and a block to call for processing each message received. We subscribe to the queue which is asynchronous. This means whenever the server has a message it will let us know and the block given to subscribe will be called. The ack option we passed to subscribe is telling the server that we will let it know that we processed the message. If the server doesn't receive an ack from us it should put the message back into the queue. This is useful when a worker dies and you want that message to be 'recovered'.
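The ack behaviour described above can be modelled in a few lines of plain Ruby. AckQueueSketch is a hypothetical toy class, not RabbitMQ's implementation: a delivered message is held as "unacked" until the worker acknowledges it, and anything still unacked when the worker goes away is put back on the queue.

```ruby
# Toy model of a queue with acknowledgements (hypothetical, for illustration).
class AckQueueSketch
  def initialize(messages)
    @ready = messages.dup   # waiting to be delivered
    @unacked = {}           # delivered but not yet acknowledged
    @next_tag = 0
  end

  # Hand the next message to a worker and remember it until it is acked.
  def deliver
    return nil if @ready.empty?
    tag = (@next_tag += 1)
    @unacked[tag] = @ready.shift
    [tag, @unacked[tag]]
  end

  # The worker finished processing: forget the message for good.
  def ack(tag)
    @unacked.delete(tag)
  end

  # The worker went away: everything it had not acked is queued again.
  def consumer_died
    @ready = @unacked.values + @ready
    @unacked.clear
  end

  def pending
    @ready.size
  end
end

queue = AckQueueSketch.new(%w[user-1 user-2 user-3])
tag, _body = queue.deliver
queue.ack(tag)          # user-1 is done
queue.deliver           # user-2 is handed out but never acked
queue.consumer_died     # user-2 goes back on the queue
puts queue.pending
```

Here user-1 is gone for good, while user-2 is recovered and waits alongside user-3 for another worker.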
The payload is the message body. Here we unserialize it, call our block with it for processing, then use headers.ack to tell the server we finished processing. Next up is #process_client.
# update legacy clients to new clients model
def process_client
  mq = MQ.new
  # bind to the same 'conversion' exchange the Publisher publishes to
  queue = mq.queue('cltc.import.clients').bind(mq.topic('conversion'), :key => 'import.clients')
  run_process(queue) do |client|
    ClientLoader.new client
  end
end # end process client
Again we create our queue and bind it to our topic exchange with the routing key that matches how the messages were published in our Publisher class. Then we again call #run_process, grab our legacy client in the block and use another class to do our conversion. The block you pass to #run_process can do whatever it needs to do to process/convert the data. As long as it returns, the message will be acked.
If you look at the bottom of the code you see we let our master process live by doing a sleep loop as long as there are forks active. This allows us to use our master process as a control process. We can send an INT or TERM signal and have our master process kill the forks.
Reporter Class
All this processing needs some sort of reporting. You could put logging statements in the processing code but logging takes time which decreases performance and you want all the performance you can get if you are doing a lot of data processing. Instead we use a separate class to report on how many messages are left to be processed or how many consumers/workers are waiting for work.
#!/usr/bin/ruby
# Core
require 'date'
require 'benchmark'
# Gems
require 'rubygems'
require 'mq'
# Helper
require 'import.mq.helper.rb'

# For ack to work appropriately you must shutdown AMQP gracefully,
# otherwise all items in your queue will be returned
Signal.trap('INT') {
  AMQP.stop{ EM.stop }
  exit(0)
}
Signal.trap('TERM') {
  AMQP.stop{ EM.stop }
  exit(0)
}

AMQP.start do
  class Reporter
    attr_reader :options
    @timer = nil
    include MQHelper

    def initialize env
      @options = env
      load_rails_environment
    end

    def report
      section = ARGV[0]
      if section.nil?
        log "You must supply a section to report (users, clients, etc..)"
        graceful_death
      else
        case section
        when "users"
          report_user
        when "clients"
          report_client
        end
      end
    end # end report

    protected

    # declare the same queues and bindings the Processer uses
    # (topic exchange 'conversion') so we report on the same queue
    def report_user
      mq = MQ.new
      queue = mq.queue('cltc.import.users').bind(mq.topic('conversion'), :key => 'import.users')
      run_report(queue)
    end # end report user

    def report_client
      mq = MQ.new
      queue = mq.queue('cltc.import.clients').bind(mq.topic('conversion'), :key => 'import.clients')
      run_report(queue)
    end

    def run_report(queue)
      @timer = EM.add_periodic_timer(5) {
        queue.status {|num_messages, num_consumers|
          log "[#{Time::now.strftime('%m/%d - %H:%M:%S')}] #{num_consumers} consumers have #{num_messages} messages left to process"
        }
      }
    end # end run report
  end # end Reporter

  Reporter.new(ENV).report
end # end AMQP.start
This should look familiar as most of it is the same concept as the Publisher class. The new code to look at is in the #run_report method. Here we add a periodic timer that will run the given block every 5 seconds. We use eventmachine's handiness to do this. The timed block will ask the status of the given queue and call the given block with the number of consumers (listeners) and the number of messages waiting to be processed. Because each queue is bound by a routing key you will only see that routing key's consumers and pending messages. Alternatively if you ran a report with a routing key of 'import.*' you would see the consumers and pending messages on 'import.users' and 'import.clients' (* matches one level).
With our status we simply log how many consumers and messages we have.
How to tell you've finished publishing and processing
You know you've finished publishing when your publisher process exits. But how do you know when you've finished processing all the way? The key is to know how many consumers were listening on a routing key. So let's say we had 16 consumers on 'import.clients'. When our publisher(s) finish for that key and our reporter states we have 0 messages left with 16 consumers listening we know we're done. When consumers are busy processing they will drop from the status report. So if all 16 consumers were processing and we have 1,000 pending messages our reporter would tell us we had 0 consumers and 1,000 messages.
There may be an easier way to tell when we're done processing, but this method definitely works and can be used to switch your pool of workers to a new section and begin work there. Switching them over is an exercise for the reader.
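The completion check described above boils down to a small predicate. This is only a sketch: QueueStatus and finished? are hypothetical names, and the two numbers are assumed to be the ones the reporter logs (pending messages and idle consumers) once the publisher has exited.

```ruby
# Hypothetical container for the two numbers the reporter logs.
QueueStatus = Struct.new(:num_messages, :num_consumers)

# Done when nothing is pending and every worker we started is idle again.
def finished?(status, expected_consumers)
  status.num_messages.zero? && status.num_consumers == expected_consumers
end

puts finished?(QueueStatus.new(1_000, 0), 16)   # all workers busy, work pending
puts finished?(QueueStatus.new(0, 12), 16)      # queue empty, 4 workers still busy
puts finished?(QueueStatus.new(0, 16), 16)      # queue empty, all workers idle
```

Only the last case counts as finished: an empty queue alone is not enough, because busy workers drop out of the consumer count while they process their last messages.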
Resources
Feel free to leave comments/questions especially if I left something out or didn't explain a section very well.