class Qpid::Proton::WorkQueue

A thread-safe queue of work for multi-threaded programs.

A {Container} can have multiple threads calling {Container#run}. The container ensures that work associated with a single {Connection} or {Listener} is serialized: two threads will never concurrently call handlers associated with the same object.

To have your own code serialized in the same way, add a block to the connection's {WorkQueue}. The block will be invoked as soon as it is safe to do so.

A {Connection} and the objects associated with it ({Session}, {Sender}, {Receiver}, {Delivery}, {Tracker}) are not thread safe. If you have multiple threads calling {Container#run}, or if you want to affect objects managed by the container from non-container threads, you need to use the {WorkQueue}.
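The serialization idea described above can be illustrated without the library. The following is a minimal, self-contained sketch and not Proton's implementation: `TinyWorkQueue` and `totals` are hypothetical names introduced only for this example. Several threads add blocks from outside, and a single thread drains the queue, so the unsynchronized Hash is only ever touched by one thread.

```ruby
# Hypothetical stand-in for a work queue; NOT the Qpid::Proton class.
class TinyWorkQueue
  def initialize
    @lock = Mutex.new
    @blocks = []
  end

  # Safe to call from any thread: the array is only touched under the lock.
  def add(&block)
    raise ArgumentError, "no block" unless block
    @lock.synchronize { @blocks << block }
  end

  # Called by the single "container" thread: run queued blocks in order.
  def process
    while (work = @lock.synchronize { @blocks.shift })
      work.call
    end
  end
end

queue = TinyWorkQueue.new
totals = Hash.new(0) # plain Hash, not thread safe; only touched inside queued blocks

# Four producer threads hand their updates to the queue instead of touching totals.
producers = 4.times.map do |i|
  Thread.new do
    100.times { queue.add { totals[i] += 1 } }
  end
end
producers.each(&:join)

# One thread runs every block, so the updates to totals are serialized.
queue.process
```

The producers never read or write `totals` directly; they only describe the work. That is the same division of labour the {WorkQueue} provides for a {Connection} and its associated objects.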

Public Class Methods

new(container)

@private

# File lib/core/work_queue.rb, line 71
def initialize(container)
  @lock = Mutex.new
  @schedule = Schedule.new
  @container = container
  @closed = nil
end

Public Instance Methods

add(&block)

Add a block of code to be invoked in sequence.

@yield [ ] the block will be invoked with no parameters in the appropriate thread context
@note Thread Safe: may be called in any thread.
@return [void]
@raise [StoppedError] if the queue is closed and cannot accept more work

# File lib/core/work_queue.rb, line 49
def add(&block)
  schedule(0, &block)
end
clear()

@private

# File lib/core/work_queue.rb, line 95
def clear() @lock.synchronize { @schedule.clear } end
close()

@private

# File lib/core/work_queue.rb, line 79
def close() @lock.synchronize { @closed = StoppedError.new } end
empty?()

@private

# File lib/core/work_queue.rb, line 92
def empty?() @lock.synchronize { @schedule.empty? } end
next_tick()

@private

# File lib/core/work_queue.rb, line 89
def next_tick() @lock.synchronize { @schedule.next_tick } end
process(now)

@private

# File lib/core/work_queue.rb, line 82
def process(now)
  while p = @lock.synchronize { @schedule.pop(now) }
    p.call
  end
end
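
The loop above holds the lock only while popping the next block and calls the block after the lock has been released. One plausible reason is that Ruby's Mutex is not reentrant, so a block that adds more work would otherwise fail. The sketch below illustrates this with a hypothetical stand-in class (`TinyQueue` is not part of the library, and `process_holding_lock` is a deliberate counter-example).

```ruby
# Hypothetical stand-in that mirrors the shape of the process loop above.
class TinyQueue
  def initialize
    @lock = Mutex.new
    @blocks = []
  end

  def add(&block)
    @lock.synchronize { @blocks << block }
  end

  # Pop under the lock, call outside it.
  def process
    while (work = @lock.synchronize { @blocks.shift })
      work.call
    end
  end

  # Counter-example: calls each block while still holding the lock.
  def process_holding_lock
    @lock.synchronize do
      while (work = @blocks.shift)
        work.call
      end
    end
  end
end

log = []
q = TinyQueue.new
q.add do
  log << :first
  q.add { log << :follow_up } # adding more work from inside a block
end
q.process # the follow-up block is picked up by the same loop

error = nil
bad = TinyQueue.new
bad.add { bad.add { } }
begin
  bad.process_holding_lock
rescue ThreadError => e
  error = e # the nested add tries to re-lock a Mutex this thread already holds
end
```

With the lock released before each call, a queued block is free to queue follow-up work, which the same loop then picks up.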
schedule(at, &block)

Schedule a block to be invoked at a certain time.

@param at [Time] Invoke block as soon as possible after Time at
@param at [Numeric] Invoke block after a delay of at seconds from now
@yield [ ] (see add)
@note (see add)
@return (see add)
@raise (see add)

# File lib/core/work_queue.rb, line 61
def schedule(at, &block)
  raise ArgumentError, "no block" unless block_given?
  @lock.synchronize do
    raise @closed if @closed
    @schedule.insert(at, block)
  end
  @container.send :wake
end
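
The two forms of the `at` parameter (an absolute Time or a Numeric delay in seconds) can be illustrated with a small self-contained sketch. The `Schedule` class used by the library is not shown on this page, so `TinySchedule` below is a hypothetical stand-in written for illustration only. Its `insert` and `pop` methods borrow their names from the calls above, but the extra `now` argument and the sorting strategy are assumptions.

```ruby
# Hypothetical stand-in for the Schedule used above; the real class is not shown here.
class TinySchedule
  def initialize
    @entries = [] # [due_time, block] pairs, kept sorted by due time
  end

  # Accept either an absolute Time or a Numeric delay in seconds from `now`.
  def insert(at, block, now = Time.now)
    due = at.is_a?(Time) ? at : now + at
    # Insert before the first later entry, so equal times keep insertion order.
    index = @entries.index { |time, _| time > due } || @entries.size
    @entries.insert(index, [due, block])
  end

  # Return the next block that is due at `now`, or nil if none is due yet.
  def pop(now)
    return nil if @entries.empty? || @entries.first[0] > now
    @entries.shift[1]
  end
end

start = Time.at(1_000) # fixed clock so the example is deterministic
schedule = TinySchedule.new
ran = []
schedule.insert(5, -> { ran << :after_5s }, start)           # Numeric: delay in seconds
schedule.insert(start + 2, -> { ran << :at_plus_2s }, start) # Time: absolute
schedule.insert(0, -> { ran << :asap }, start)               # delay of 0, as add uses

# Drain everything that is due 3 seconds after the start.
while (work = schedule.pop(start + 3))
  work.call
end
```

After the loop, only the zero-delay block and the block scheduled for `start + 2` have run; the block with a five-second delay stays queued until a later `pop` is called with a time at or after its due time.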