HEX
Server: Apache
System: Linux pdx1-shared-a1-38 6.6.104-grsec-jammy+ #3 SMP Tue Sep 16 00:28:11 UTC 2025 x86_64
User: mmickelson (3396398)
PHP: 8.1.31
Disabled: NONE
Upload Files
File: //usr/share/rubygems-integration/all/gems/bunny-2.19.0/lib/bunny/concurrent/continuation_queue.rb
require "thread"

module Bunny
  module Concurrent
    # Continuation queue implementation for MRI and Rubinius
    #
    # @private
    class ContinuationQueue
      def initialize
        @q    = []
        @lock = ::Mutex.new
        @cond = ::ConditionVariable.new
      end

      def push(item)
        @lock.synchronize do
          @q.push(item)
          @cond.signal
        end
      end
      alias << push

      def pop
        poll
      end

      def poll(timeout_in_ms = nil)
        timeout = timeout_in_ms ? timeout_in_ms / 1000.0 : nil

        @lock.synchronize do
          timeout_strikes_at = Time.now.utc + (timeout || 0)
          while @q.empty?
            wait = if timeout
                     timeout_strikes_at - Time.now.utc
                   else
                     nil
                   end
            @cond.wait(@lock, wait)
            raise ::Timeout::Error if wait && Time.now.utc >= timeout_strikes_at
          end
          item = @q.shift
          item
        end
      end

      def clear
        @lock.synchronize do
          @q.clear
        end
      end

      def empty?
        @q.empty?
      end

      def size
        @q.size
      end
      alias length size
    end
  end
end