def self.aggregate(current_bucket)
db = ::Mongo::Connection.new(hostname).db(database)
retentions.sort_by! {|r| r['seconds']}
docs = []
fine_stats_collection = db.collection(retentions.first['name'])
retentions[1..-1].each_with_index do |retention,index|
coarse_stats_collection = db.collection(retention['name'])
puts "Aggregating #{retention['name']}"
step = retention['seconds']
current_coarse_bucket = current_bucket / step * step - step
previous_coarse_bucket = current_coarse_bucket - step
puts "#{Time.at(previous_coarse_bucket)}..#{Time.at(current_coarse_bucket)}"
if coarse_stats_collection.find({:ts => previous_coarse_bucket}).count == 0
print '.'
stats_to_aggregate = fine_stats_collection.find(
{:ts => {"$gte" => previous_coarse_bucket, "$lt" => current_coarse_bucket}})
rows = stats_to_aggregate.to_a
count = rows.count
rows.group_by {|r| r["stat"] }.each_pair do |name,stats|
case stats.first['type']
when 'timer'
mean = stats.collect {|stat| stat['values']['mean'] }.inject( 0 ) { |s,x| s+x } / stats.count
max = stats.collect {|stat| stat['values']['max'] }.max
min = stats.collect {|stat| stat['values']['min'] }.min
upper_key = stats.first['values'].keys.find{|k| k =~ /upper_/}
max_at_threshold = stats.collect {|stat| stat['values'][upper_key] }.max
total_stats = stats.collect {|stat| stat['values']['count'] }.inject( 0 ) { |s,x| s+x }
doc = { :stat => name,
:values => {
:mean => mean,
:max => max,
:min => min,
upper_key.to_sym => max_at_threshold,
:count => total_stats
},
:type => "timer",
:ts => previous_coarse_bucket
}
when 'counter'
doc = {:stat => name,
:value => stats.collect {|stat| stat['value'] }.inject( 0 ) { |s,x| s+x },
:ts => previous_coarse_bucket,
:type => "counter"
}
else
raise "unknown type #{stats.first['type']}"
end
docs.push(doc)
end
coarse_stats_collection.insert(docs) unless docs.empty?
end
end
end