The Devver Blog

A Boulder startup improving the way developers work.

Sending Files with EventMachine

Devver has to keep the developer’s environment synchronized with our servers. To do this, the Devver client sends all of the project files to our servers. We currently have an EventMachine client that transfers files over SSL to an EventMachine server. We went through several stages and methods of sending files with EventMachine before finding a good solution. On smaller projects we didn’t even realize how poor our performance was; only after bringing up some larger projects did we realize we needed to look more closely at file transfer performance. Since I couldn’t find much on the web about this, I thought sharing some examples of how we set up our EventMachine clients and servers to send files might be useful to someone else out there.
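The test client in the listing below fakes its file list by repeating one file TIMES times. For a real project, a sync client would walk the project directory and pair each file with its modification time. A minimal sketch of that step, assuming a hypothetical project_file_list helper (it is not part of the Devver client), producing the same [path, mtime] pairs that send_files queues up:

```ruby
require 'tmpdir'

# Hypothetical helper (not part of the Devver client): collect every regular
# file under +root+ with its modification time, in the same [path, mtime]
# shape that the test client's send_files queues up.
# Dir.glob skips dotfiles unless File::FNM_DOTMATCH is passed.
def project_file_list(root)
  Dir.glob(File.join(File.expand_path(root), '**', '*'))
     .select { |path| File.file?(path) }
     .sort
     .map { |path| [path, File.mtime(path).to_s] }
end

# Usage: build a throwaway project directory and list its files.
Dir.mktmpdir do |dir|
  File.write(File.join(dir, 'app.rb'), "puts 'hi'\n")
  Dir.mkdir(File.join(dir, 'lib'))
  File.write(File.join(dir, 'lib', 'util.rb'), "# util\n")
  project_file_list(dir).each { |path, mtime| puts "#{path}  #{mtime}" }
end
```

Sorting the paths keeps the send order stable between runs, which makes transfer logs easier to compare.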

I got some help from people on the EventMachine mailing list. Here is the thread discussing sending large files with EventMachine.

Since I was already experimenting with a few of our options, I decided to compare EM.send_data, EM.stream_file_data, an alternative buffer recommended by James Tucker, and the crappy buffer we had been using before we discovered EventMachine’s default BufferedTokenizer, with and without compression layered on top of each method. We had hacked together our own buffer tokenizer rather quickly, and it always performed well enough in our initial testing, but the results show why performance tests are worth a little effort. The benchmarks for the various setups are below. They were all run on localhost; compression helps much more between remote servers.
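One difference between the two sending styles in the listing is how the file is read: the send_data clients load each whole file with File.read and pass it to send_data in a single call, while stream_file_data lets EventMachine read the file and push it out in pieces. As a rough, EventMachine-free illustration of the reading side only, here is a sketch that walks a file in fixed-size chunks. The each_file_chunk name and the 16 KB CHUNK_SIZE are illustrative assumptions, not EventMachine’s implementation or settings.

```ruby
require 'tempfile'

# Arbitrary chunk size chosen for illustration; not an EventMachine setting.
CHUNK_SIZE = 16 * 1024

# Yield the file at +path+ in pieces of at most +chunk_size+ bytes instead of
# loading it all at once with File.read. IO#read(n) returns nil at EOF.
def each_file_chunk(path, chunk_size = CHUNK_SIZE)
  File.open(path, 'rb') do |f|
    while (chunk = f.read(chunk_size))
      yield chunk
    end
  end
end

# Usage: a 40,000-byte file comes back as two full chunks plus a remainder.
Tempfile.create('chunk_demo') do |tmp|
  tmp.binmode
  tmp.write('x' * 40_000)
  tmp.flush
  sizes = []
  each_file_chunk(tmp.path) { |chunk| sizes << chunk.bytesize }
  p sizes # => [16384, 16384, 7232]
end
```

Only one chunk is held in memory at a time, which is the property that matters for very large files.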

Sending log file with compression turned off (5 times)
OurBadBufferedTokenizer: 10.40 s
Standard EM BufferedTokenizer: 0.93 s
Tucker’s BufferedTokenizer: 0.92 s
stream_file_data w/ EM BufferedTokenizer: 0.98 s

Sending log file with compression turned on (5 times)
OurBadBufferedTokenizer: 1.02 s
Standard EM BufferedTokenizer: 0.99 s
Tucker’s BufferedTokenizer: 1.04 s
stream_file_data w/ EM BufferedTokenizer: N/A (stream_file_data can’t be used with on-the-fly compression)
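The compressed variants in the listing deflate each file’s full contents in memory before sending, and the server inflates the content only after the whole message has been reassembled. With stream_file_data the file goes from disk to the connection without passing through the client code, so there is no point at which the client could deflate it. A stand-alone sketch of the same Zlib round trip at the BEST_SPEED level the client uses, with random bytes standing in (as an assumption) for already-compressed data such as an MP3:

```ruby
require 'zlib'

# Same calls the test client and server use: deflate at BEST_SPEED before
# sending, inflate after the whole message has been received.
def compress(contents)
  Zlib::Deflate.deflate(contents, Zlib::BEST_SPEED)
end

def decompress(data)
  Zlib::Inflate.inflate(data)
end

text   = "GET /index 200 OK\n" * 5_000   # repetitive, log-like text (90,000 bytes)
random = Random.new(42).bytes(90_000)    # stand-in for already-compressed data

[['log-like text', text], ['random bytes', random]].each do |label, data|
  packed = compress(data)
  raise 'round trip failed' unless decompress(packed) == data
  puts format('%-14s %7d -> %7d bytes', label, data.bytesize, packed.bytesize)
end
```

The text shrinks dramatically while the random bytes do not shrink at all, which matches the observation that compressing MP3s buys nothing.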

Sending compressed MP3 file with compression turned off (5 times)
OurBadBufferedTokenizer: 18.55 s
Standard EM BufferedTokenizer: 1.09 s
Tucker’s BufferedTokenizer: 1.10 s
stream_file_data w/ EM BufferedTokenizer: 1.22 s

Adding compression to already-compressed files like MP3s doesn’t change the times significantly. The run below is longer, to show how the times vary over a larger test. I also tested on full projects, and the relative differences seemed to hold.

Sending compressed MP3 file with compression turned off (25 times)
OurBadBufferedTokenizer: N/A, takes too long
Standard EM BufferedTokenizer: 5.70 s
Tucker’s BufferedTokenizer: 4.38 s
stream_file_data w/ EM BufferedTokenizer: 4.82 s
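The large gap for OurBadBufferedTokenizer most likely comes from its receive path: DataBuffer#grab in the listing splits the entire @unprocessed string on every call, and append builds a new string each time, so a big file arriving in many small reads is copied and rescanned over and over. A tokenizer that searches only newly arrived bytes avoids that. The sketch below is a hypothetical, simplified SimpleTokenizer with an extract method shaped like the one the listing calls on BufferedTokenizer; it is not EventMachine’s code and has no size limit.

```ruby
# Hypothetical, simplified delimiter tokenizer (not EventMachine's code).
# extract(data) returns every token completed by this chunk and keeps the
# trailing partial token buffered. Each call searches only bytes it has not
# searched before (plus a small overlap), instead of re-splitting the whole
# buffer the way DataBuffer#grab does.
class SimpleTokenizer
  def initialize(delimiter)
    @delimiter = delimiter.b
    @buffer = String.new   # unconsumed bytes, ASCII-8BIT
    @scanned = 0           # bytes of @buffer already searched without a match
  end

  def extract(data)
    @buffer << data.b
    tokens = []
    pos = 0
    # Back up by delimiter length - 1 so a delimiter split across two
    # reads is still found.
    search_from = [@scanned - @delimiter.bytesize + 1, 0].max
    while (idx = @buffer.index(@delimiter, search_from))
      tokens << @buffer[pos...idx]
      pos = idx + @delimiter.bytesize
      search_from = pos
    end
    @buffer = @buffer[pos..-1] if pos > 0   # drop consumed bytes once per call
    @scanned = @buffer.bytesize
    tokens
  end
end

# Usage: the delimiter is split across the two reads.
tokenizer = SimpleTokenizer.new('|DEFAULTDELIMITED|')
['send_file a, content:abc|DEFAULT', 'DELIMITED|quit|DEFAULTDELIMITED|'].each do |chunk|
  tokenizer.extract(chunk).each { |msg| puts "got: #{msg}" }
end
```

Handling a delimiter that straddles two reads matters here because TOKEN is 18 bytes long, so a TCP read boundary can easily fall inside it.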

Below are the small tests and examples I was working with. You won’t have the same files on your system, or Tucker’s buffer, so I packed everything up as a zip file. To try everything out, download the EventMachine sending files tests zip, extract it, and run ‘ruby em_send_file_test.rb’. Any thoughts or feedback are welcome; I am still learning the ins and outs of EventMachine, so feel free to send me any tips.

dir = File.expand_path(File.dirname(__FILE__))
unless($LOAD_PATH.member?(dir))
  $LOAD_PATH.unshift(dir)
end

require 'test/unit'
require 'eventmachine'
require 'socket'   #Socket.unpack_sockaddr_in is used by the server
require 'zlib'
require 'yaml'
require 'buffered_tokenizer_pastie'
require 'benchmark'

Thread.abort_on_exception = true

SERVER_PORT = 7999
SERVER_IP = '127.0.0.1'
TOKEN = "|DEFAULTDELIMITED|"
#try different file types: results vary a lot between plain text
#(like a log file) and already-compressed data (like an mp3)
FILE_NAME = '~/development.log'
#FILE_NAME = '~/Blue.mp3'
COMPRESS = false
#COMPRESS = true

TIMES = 5

class EmClientExample < EventMachine::Connection

  def unbind
    puts "client connection has terminated"
  end

  def process(data)
    puts "client got data: #{data}"
    send_files() if data=="success"
    send(prepare("some_msg")) if data=="filesDone"
    send(prepare("quit")) if data=="ack"
    if(data=="goodbye")
      puts "Client successfully sent all data shutting down!!!!"
      EventMachine::stop_event_loop
    end
  end

  def send_files()
    puts "sending files"
    @files = Array.new(TIMES,[FILE_NAME, Time.now.to_s])
    send_files_loop
  end

  def send_files_loop
    if @files && @files.length > 0
      file = @files.shift
      EM.next_tick do
        send_file(file[0],file[1])
        send_files_loop
      end
    else
      puts "done syncing files"
      send(prepare("files_completed"))
    end
  end

  def send_file(path,mtime)
    puts "Syncing "+path
    contents = File.read(File.expand_path(path))
    contents = Zlib::Deflate.deflate(contents,Zlib::BEST_SPEED) if COMPRESS
    send(prepare("send_file #{path}, #{mtime}, content:#{contents}"))
  end

  def send(str)
    #puts "sending: #{str}"
    send_data str
  end

  def prepare(str)
    str+TOKEN
  end

  def self.push_start()
    EventMachine.connect(SERVER_IP,SERVER_PORT,self) do |c|
      c.send_files()
    end
  end

end

class EmClientExampleBadBuffer < EmClientExample

  attr_accessor :buffer

  def initialize(*args)
    super
    @buffer = DataBuffer.new
  end

  def receive_data(data)
    @buffer.append(data)
    while(command = @buffer.grab)
      process(command)
    end
  end

  def prepare(str)
    @buffer.prepare(str)
  end

end

class EmClientExampleBuffToken < EmClientExample

  def initialize(*args)
    super
    @recv_buffer = BufferedTokenizer.new(TOKEN)
  end

  def receive_data(data)
    @recv_buffer.extract(data).each do |m|
      process(m)
    end
  end

end

class EmClientExampleStreamBuffToken < EmClientExample

  def initialize(*args)
    super
    @recv_buffer = BufferedTokenizer.new(TOKEN)
  end

  def send_files_loop
    if @files && @files.length > 0
      file = @files.shift
      EM.next_tick do
        send_file(file[0],file[1])
      end
    else
      puts "done syncing files"
      send(prepare("files_completed"))
    end
  end

  def send_file(path,mtime)
    puts "Syncing "+path
    send("send_file #{path}, #{mtime}, content:")

    EM::Deferrable.future( stream_file_data(File.expand_path(path)) ) {
      send(prepare(""))
      send_files_loop
    }
  end

  def receive_data(data)
    @recv_buffer.extract(data).each do |m|
      process(m)
    end
  end

end

class EmClientExamplePastie < EmClientExample

  def initialize(*args)
    super
    @recv_buffer = BufferedTokenizerPastie.new(TOKEN)
  end

  def receive_data(data)
    @recv_buffer.extract(data).each do |m|
      process(m)
    end
  end

end

class EmServerExample < EventMachine::Connection

  def post_init
    if(@signature)
      client = Socket.unpack_sockaddr_in(get_peername)
      puts "Received a new connection from #{client.last}:#{client.first}"
    end
  end

  def unbind
    puts "server connection has terminated\n"
  end

  def process(data)
    #puts "server: #{data[0..15]}"
    send(prepare("success")) if data=="login"
    send(prepare("filesDone")) if data=="files_completed"
    send(prepare("ack")) if data=="some_msg"
    if data.match(/^send_file/)
      #puts data[0..40]
      puts "received file"
      start = data.index(", content:") + ", content:".length
      #everything after the marker is the file body
      contents = data[start..-1]
      contents = Zlib::Inflate.inflate(contents) if COMPRESS
      #read as binary so the comparison also works for mp3s and other binary files
      file_contents = File.binread(File.expand_path(FILE_NAME))
      if contents != file_contents
        puts "file was corrupted"
        puts "received length: #{contents.bytesize} file length: #{file_contents.bytesize}"
        #File.open(File.expand_path("~/copy.file"),"w") do |f|
        #  f << contents
        #end
      end
    end
    if data=="quit"
      send(prepare("goodbye"))
      close_connection_after_writing
    end
  end

  def prepare(str)
    str+TOKEN
  end

  def send(msg)
    #puts "server sent: #{msg}"
    send_data msg
  end

end

class EmServerExampleBadBuffer < EmServerExample

  def initialize(*args)
    super
    @buffer = DataBuffer.new
  end

  def receive_data(data)
    @buffer.append(data)
    while(command = @buffer.grab)
      process(command)
    end
  end

  def prepare(str)
    @buffer.prepare(str)
  end

end

class EmServerExampleBuffToken < EmServerExample

  def initialize(*args)
    super
    @recv_buffer = BufferedTokenizer.new(TOKEN)
  end

  def receive_data(data)
    @recv_buffer.extract(data).each do |m|
      process(m)
    end
  end

end

class EmServerExamplePastie < EmServerExample

  def initialize(*args)
    super
    @recv_buffer = BufferedTokenizerPastie.new(TOKEN)
  end

  def receive_data(data)
    @recv_buffer.extract(data).each do |m|
      process(m)
    end
  end

end

class DataBuffer
  FRONT_DELIMITER = "0x5b".hex.chr # '['
  BACK_DELIMITER = "0x5d".hex.chr # ']'
  DELIMITER = "|#{FRONT_DELIMITER}#{FRONT_DELIMITER}#{FRONT_DELIMITER}GT_DELIM#{BACK_DELIMITER}#{BACK_DELIMITER}#{BACK_DELIMITER}#{BACK_DELIMITER}|"
  DELIM_ESCAPE = /#{Regexp.escape(DELIMITER)}/
  DELIM_ESCAPE_END = /#{Regexp.escape(DELIMITER)}\Z/

  def initialize
    @unprocessed = ""
    @commands = []
  end

  def grab
    new_messages = @unprocessed.split(DELIM_ESCAPE)
    while new_messages.length > 1
      @commands << new_messages.shift
    end
    msg_length = new_messages.length
    if msg_length > 0
      if msg_length == 1 && (@unprocessed=~DELIM_ESCAPE_END)
        @commands.push(new_messages.shift)
        @unprocessed = ""
      else
        #put the rest of the last statement back into the buffer
        while(cut=@unprocessed.index(DELIM_ESCAPE))
          @unprocessed = (@unprocessed[cut..@unprocessed.length]).sub(DELIMITER,"")
        end
      end
    end
    if @commands.length > 0
      return @commands.shift
    else
      return nil
    end
  end

  def prepare(str)
    str.to_s+DELIMITER
  end

  def append(data)
    @unprocessed = @unprocessed + data
  end

end

class EmSendFileTest < Test::Unit::TestCase

  def test_placeholder
    assert true
  end

  def start_server(server_type)
    server_pid = fork {
      EventMachine::run do
        EventMachine::start_server SERVER_IP, SERVER_PORT, server_type
        puts "Server now accepting requests..."
      end
    }
    server_pid
  end

  def start_client(client_type)
    client_pid = fork {
      EventMachine::run { client_type.push_start() }
    }
    client_pid
  end

  def run_against_server_client(client_example, server_example)
    assert_nothing_raised do
      #note: both sleeps are inside the timed block, so every result
      #includes about 0.4 s of fixed overhead
      puts Benchmark.realtime {
        server_pid = start_server(server_example)
        #make sure server is up for client to connect to
        sleep(0.2)
        client_pid = start_client(client_example)
        sleep(0.2)

        Process.wait(client_pid)
        puts "client finished"

        #I don't know a clean way to stop the server's event loop from here, so kill the process
        Process.kill('KILL',server_pid)
        Process.waitall
      }
      puts "##############################################################"
    end
  end

  def test_em_send_files_with_em_buffered_tokenizer
    puts "send files test with em buffered tokenizer"
    client_example = EmClientExampleBuffToken
    server_example = EmServerExampleBuffToken
    run_against_server_client(client_example, server_example)
  end

  def test_em_stream_files_with_em_buffered_tokenizer
    puts "stream_file_data test with em buffered tokenizer"
    if COMPRESS == true
      puts "stream_file_data can't be used with on the fly compression"
    else
      client_example = EmClientExampleStreamBuffToken
      server_example = EmServerExampleBuffToken
      run_against_server_client(client_example, server_example)
    end
  end

  def test_em_send_files_with_bad_tokenizer
    puts "send files test with our bad buffered tokenizer"
    client_example = EmClientExampleBadBuffer
    server_example = EmServerExampleBadBuffer
    run_against_server_client(client_example, server_example)
  end

  def test_em_send_files_with_pastie_tokenizer
    puts "send files test with the pastied tokenizer"
    client_example = EmClientExamplePastie
    server_example = EmServerExamplePastie
    run_against_server_client(client_example, server_example)
  end

end
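The parsing inside EmServerExample#process can be pulled out into a small function that is testable without starting EventMachine. A sketch, using the hypothetical names parse_send_file and CONTENT_MARKER; unlike the listing it also splits out the path and mtime, which the test server ignores, and it assumes neither contains the marker text and the mtime contains no ", ".

```ruby
require 'zlib'

# Marker that ends the header of a send_file message in the test client.
CONTENT_MARKER = ', content:'.freeze

# Hypothetical helper: split "send_file <path>, <mtime>, content:<bytes>"
# (the format the test client builds) into its parts. The header comes first,
# so the first marker ends it; the file body may contain the marker text.
def parse_send_file(message, compressed: false)
  header_end = message.index(CONTENT_MARKER)
  unless header_end && message.start_with?('send_file ')
    raise ArgumentError, 'not a send_file message'
  end

  header = message[0, header_end].delete_prefix('send_file ')
  path, _sep, mtime = header.rpartition(', ')   # mtime is assumed to have no ", "
  contents = message[(header_end + CONTENT_MARKER.length)..-1]
  contents = Zlib::Inflate.inflate(contents) if compressed
  { path: path, mtime: mtime, contents: contents }
end

# Usage with a message shaped like the one the client sends:
msg = "send_file ~/development.log, 2008-10-08 08:22:00 -0600, content:hello"
p parse_send_file(msg)
```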

Written by DanM

October 8, 2008 at 8:22 am

Posted in Uncategorized

5 Responses


  1. There is one important thing to keep in mind with stream_file_data versus just dropping a huge chunk of data onto send_data. stream_file_data sends a bite sized chunk of data, then waits until the next reactor tick to send more, allowing other events to be handled. On the other hand, if you drop a very large chunk of data onto send_data, you can starve other connections while that big chunk is being dealt with.

    Kirk Haines

    October 8, 2008 at 2:55 am

  2. Very interesting. Just what I had in mind for some Wednesday night 'light' reading…

    krista51951

    October 8, 2008 at 1:56 pm

  3. Haha yeah, we keep the blog pretty technical, since that seems to be the most interesting stuff to people following what we are up to.

    danmayer

    October 8, 2008 at 3:02 pm

  4. […] Sending Files with EventMachine (tags: ruby eventmachine programming multimedia) […]

  5. […] Sending Files with EventMachine. How to send large files with EventMachine […]

