HEX
Server: Apache
System: Linux pdx1-shared-a1-38 6.6.104-grsec-jammy+ #3 SMP Tue Sep 16 00:28:11 UTC 2025 x86_64
User: mmickelson (3396398)
PHP: 8.1.31
Disabled: NONE
Upload Files
File: //usr/share/rubygems-integration/all/gems/bunny-2.19.0/lib/bunny/reader_loop.rb
require "thread"

module Bunny
  # Network activity loop that reads and passes incoming AMQP 0.9.1 methods for
  # processing. They are dispatched further down the line in Bunny::Session and Bunny::Channel.
  # This loop uses a separate thread internally.
  #
  # This mimics the way RabbitMQ Java is designed quite closely.
  # @private
  class ReaderLoop

    def initialize(transport, session, session_error_handler)
      @transport             = transport
      @session               = session
      @session_error_handler = session_error_handler
      @logger                = @session.logger

      @mutex                 = Mutex.new

      @stopping              = false
      @stopped               = false
      @network_is_down       = false
    end


    def start
      @thread    = Thread.new(&method(:run_loop))
    end

    def resume
      start
    end


    def run_loop
      loop do
        begin
          break if @mutex.synchronize { @stopping || @stopped || @network_is_down }
          run_once
        rescue AMQ::Protocol::EmptyResponseError, IOError, SystemCallError, Timeout::Error,
               OpenSSL::OpenSSLError => e
          break if terminate? || @session.closing? || @session.closed?

          @network_is_down = true
          if @session.automatically_recover?
            log_exception(e, level: :warn)
            @session.handle_network_failure(e)
          else
            log_exception(e)
            @session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
          end
        rescue ShutdownSignal => _
          @mutex.synchronize { @stopping = true }
          break
        rescue Exception => e
          break if terminate?
          if !(@session.closing? || @session.closed?)
            log_exception(e)

            @network_is_down = true
            @session_error_handler.raise(Bunny::NetworkFailure.new("caught an unexpected exception in the network loop: #{e.message}", e))
          end
        rescue Errno::EBADF => _ebadf
          break if terminate?
          # ignored, happens when we loop after the transport has already been closed
          @mutex.synchronize { @stopping = true }
        end
      end

      @mutex.synchronize { @stopped = true }
    end

    def run_once
      frame = @transport.read_next_frame
      return if frame.is_a?(AMQ::Protocol::HeartbeatFrame)

      if !frame.final? || frame.method_class.has_content?
        header   = @transport.read_next_frame
        content  = ''

        if header.body_size > 0
          loop do
            body_frame = @transport.read_next_frame
            content << body_frame.decode_payload

            break if content.bytesize >= header.body_size
          end
        end

        @session.handle_frameset(frame.channel, [frame.decode_payload, header.decode_payload, content])
      else
        @session.handle_frame(frame.channel, frame.decode_payload)
      end
    end

    def stop
      @mutex.synchronize { @stopping = true }
    end

    def stopped?
      @mutex.synchronize { @stopped }
    end

    def stopping?
      @mutex.synchronize { @stopping }
    end

    def terminate_with(e)
      @mutex.synchronize { @stopping = true }

      self.raise(e)
    end

    def raise(e)
      @thread.raise(e) if @thread
    end

    def join
      # Thread#join can/would trigger a re-raise of an unhandled exception in this thread.
      # In addition, Thread.handle_interrupt can be used by other libraries or application code
      # that would make this join operation fail with an obscure exception.
      # So we try to save everyone some really unpleasant debugging time by introducing
      # this condition which typically would not evaluate to true anyway.
      #
      # See ruby-amqp/bunny#589 and ruby-amqp/bunny#590 for background.
      @thread.join if @thread && @thread != Thread.current
    end

    def kill
      if @thread
        @thread.kill
        @thread.join
      end
    end

    protected

    def log_exception(e, level: :error)
      if !(io_error?(e) && (@session.closing? || @session.closed?))
        @logger.send level, "Exception in the reader loop: #{e.class.name}: #{e.message}"
        @logger.send level, "Backtrace: "
        e.backtrace.each do |line|
          @logger.send level, "\t#{line}"
        end
      end
    end

    def io_error?(e)
      [AMQ::Protocol::EmptyResponseError, IOError, SystemCallError].any? do |klazz|
        e.is_a?(klazz)
      end
    end

    def terminate?
      @mutex.synchronize { @stopping || @stopped }
    end
  end
end