def connect(path, query_parameters = {}, &block)
method = query_parameters.delete(:method) || :get
delete_proc = query_parameters.delete(:delete) || self.on_delete
limit_proc = query_parameters.delete(:limit) || self.on_limit
error_proc = query_parameters.delete(:error) || self.on_error
reconnect_proc = query_parameters.delete(:reconnect) || self.on_reconnect
inited_proc = query_parameters.delete(:inited) || self.on_inited
direct_message_proc = query_parameters.delete(:direct_message) || self.on_direct_message
timeline_status_proc = query_parameters.delete(:timeline_status) || self.on_timeline_status
anything_proc = query_parameters.delete(:anything) || self.on_anything
params = normalize_filter_parameters(query_parameters)
extra_stream_parameters = query_parameters.delete(:extra_stream_parameters) || {}
uri = method == :get ? build_uri(path, params) : build_uri(path)
stream_params = {
:path => uri,
:method => method.to_s.upcase,
:user_agent => user_agent,
:on_inited => inited_proc,
:filters => params.delete(:track),
:params => params,
:ssl => true
}.merge(auth_params).merge(extra_stream_parameters)
if @on_interval_proc.is_a?(Proc)
interval = @on_interval_time || Configuration::DEFAULT_TIMER_INTERVAL
@timer = EventMachine.add_periodic_timer(interval) do
EventMachine.defer do
@on_interval_proc.call
end
end
end
@stream = Twitter::JSONStream.connect(stream_params)
@stream.each_item do |item|
begin
raw_hash = json_parser.decode(item)
rescue MultiJson::DecodeError
error_proc.call("MultiJson::DecodeError occured in stream: #{item}") if error_proc.is_a?(Proc)
next
end
unless raw_hash.is_a?(::Hash)
error_proc.call("Unexpected JSON object in stream: #{item}") if error_proc.is_a?(Proc)
next
end
hash = TweetStream::Hash.new(raw_hash)
if hash[:delete] && hash[:delete][:status]
delete_proc.call(hash[:delete][:status][:id], hash[:delete][:status][:user_id]) if delete_proc.is_a?(Proc)
elsif hash[:limit] && hash[:limit][:track]
limit_proc.call(hash[:limit][:track]) if limit_proc.is_a?(Proc)
elsif hash[:direct_message]
yield_message_to direct_message_proc, TweetStream::DirectMessage.new(hash[:direct_message])
elsif hash[:text] && hash[:user]
@last_status = TweetStream::Status.new(hash)
yield_message_to timeline_status_proc, @last_status
if block_given?
case block.arity
when 1
yield @last_status
when 2
yield @last_status, self
end
end
end
yield_message_to anything_proc, hash
end
@stream.on_error do |message|
error_proc.call(message) if error_proc.is_a?(Proc)
end
@stream.on_reconnect do |timeout, retries|
reconnect_proc.call(timeout, retries) if reconnect_proc.is_a?(Proc)
end
@stream.on_max_reconnects do |timeout, retries|
raise TweetStream::ReconnectError.new(timeout, retries)
end
@stream
end