# File lib/tweetstream/client.rb, line 323
    # Opens a streaming connection to +path+ and wires this client's
    # callbacks to the resulting stream.
    #
    # path             - the API path handed to build_uri.
    # query_parameters - request parameters plus optional control keys.
    #                    The control keys are stripped before the request
    #                    is built:
    #                      :method  - HTTP method, defaults to :get
    #                      :delete, :limit, :error, :reconnect, :inited,
    #                      :direct_message, :timeline_status, :anything
    #                               - per-call callbacks; each one falls back
    #                                 to the matching on_* value of the client
    #                      :extra_stream_parameters
    #                               - hash merged last into the options given
    #                                 to Twitter::JSONStream.connect, so it
    #                                 overrides the defaults built here
    #                    The caller's hash is left untouched; all stripping
    #                    happens on a shallow copy.
    # block            - optional; receives every status (arity 1) or the
    #                    status and this client (arity 2). Blocks of any
    #                    other arity are not invoked.
    #
    # Returns the stream returned by Twitter::JSONStream.connect.
    # The stream's on_max_reconnects handler raises
    # TweetStream::ReconnectError.
    def connect(path, query_parameters = {}, &block)
      # Work on a copy: the deletes below would otherwise strip callbacks
      # (and possibly filters) from a hash the caller may want to reuse.
      query_parameters = query_parameters.dup

      method = query_parameters.delete(:method) || :get
      delete_proc = query_parameters.delete(:delete) || self.on_delete
      limit_proc = query_parameters.delete(:limit) || self.on_limit
      error_proc = query_parameters.delete(:error) || self.on_error
      reconnect_proc = query_parameters.delete(:reconnect) || self.on_reconnect
      inited_proc = query_parameters.delete(:inited) || self.on_inited
      direct_message_proc = query_parameters.delete(:direct_message) || self.on_direct_message
      timeline_status_proc = query_parameters.delete(:timeline_status) || self.on_timeline_status
      anything_proc = query_parameters.delete(:anything) || self.on_anything

      # Taken out before normalization so that it can never end up among the
      # request parameters, whether or not the normalizer returns a new hash.
      extra_stream_parameters = query_parameters.delete(:extra_stream_parameters) || {}

      params = normalize_filter_parameters(query_parameters)

      # For GET the parameters (including :track) travel in the URI; for
      # any other method only the bare path is used.
      uri = method == :get ? build_uri(path, params) : build_uri(path)

      # :track is passed separately as :filters and therefore removed from
      # :params; this must happen after the URI has been built.
      filters = params.delete(:track)

      stream_params = {
        :path       => uri,
        :method     => method.to_s.upcase,
        :user_agent => user_agent,
        :on_inited  => inited_proc,
        :filters    => filters,
        :params     => params,
        :ssl        => true
      }.merge(auth_params).merge(extra_stream_parameters)

      # Run the interval callback, if one is set, on a periodic timer; the
      # call itself is handed to EventMachine.defer.
      if @on_interval_proc.is_a?(Proc)
        interval = @on_interval_time || Configuration::DEFAULT_TIMER_INTERVAL
        @timer = EventMachine.add_periodic_timer(interval) do
          EventMachine.defer do
            @on_interval_proc.call
          end
        end
      end

      @stream = Twitter::JSONStream.connect(stream_params)
      @stream.each_item do |item|
        begin
          raw_hash = json_parser.decode(item)
        rescue MultiJson::DecodeError
          error_proc.call("MultiJson::DecodeError occured in stream: #{item}") if error_proc.is_a?(Proc)
          next
        end

        # Only JSON objects are dispatched; anything else is reported.
        unless raw_hash.is_a?(::Hash)
          error_proc.call("Unexpected JSON object in stream: #{item}") if error_proc.is_a?(Proc)
          next
        end

        hash = TweetStream::Hash.new(raw_hash)
        if hash[:delete] && hash[:delete][:status]
          delete_proc.call(hash[:delete][:status][:id], hash[:delete][:status][:user_id]) if delete_proc.is_a?(Proc)
        elsif hash[:limit] && hash[:limit][:track]
          limit_proc.call(hash[:limit][:track]) if limit_proc.is_a?(Proc)
        elsif hash[:direct_message]
          yield_message_to direct_message_proc, TweetStream::DirectMessage.new(hash[:direct_message])
        elsif hash[:text] && hash[:user]
          @last_status = TweetStream::Status.new(hash)
          yield_message_to timeline_status_proc, @last_status

          if block_given?
            # Give the block the option to receive either one
            # or two arguments, depending on its arity.
            case block.arity
            when 1 then yield @last_status
            when 2 then yield @last_status, self
            end
          end
        end

        # The catch-all callback sees every decoded hash, including the
        # ones already dispatched above.
        yield_message_to anything_proc, hash
      end

      @stream.on_error do |message|
        error_proc.call(message) if error_proc.is_a?(Proc)
      end

      @stream.on_reconnect do |timeout, retries|
        reconnect_proc.call(timeout, retries) if reconnect_proc.is_a?(Proc)
      end

      @stream.on_max_reconnects do |timeout, retries|
        raise TweetStream::ReconnectError.new(timeout, retries)
      end

      @stream
    end