# File lib/raindrops/aggregate/pmq.rb, line 65
  def initialize(params = {})
    opts = {
      :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops",
      :worker_interval => 10,
      :master_interval => 5,
      :lossy => false,
      :mq_attr => nil,
      :mq_umask => 0666,
      :aggregate => Aggregate.new,
    }.merge! params
    @master_interval = opts[:master_interval]
    @worker_interval = opts[:worker_interval]
    @aggregate = opts[:aggregate]
    @worker_queue = @worker_interval ? [] : nil
    @mutex = Mutex.new

    @mq_name = opts[:queue]
    mq = POSIX_MQ.new @mq_name, :w, opts[:mq_umask], opts[:mq_attr]
    Tempfile.open("raindrops_pmq") do |t|
      @wr = File.open(t.path, "wb")
      @rd = File.open(t.path, "rb")
    end
    @cached_aggregate = @aggregate
    flush_master
    @mq_send = if opts[:lossy]
      @nr_dropped = 0
      mq.nonblock = true
      mq.method :trysend
    else
      mq.method :send
    end
  end