class Qpid::Proton::WorkQueue
A thread-safe queue of work for multi-threaded programs.
A {Container} can have multiple threads calling {Container#run}. The container ensures that work associated with a single {Connection} or {Listener} is serialized: two threads will never concurrently call handlers associated with the same object.
To have your own code serialized in the same way, add a block to the connection's {WorkQueue}. The block will be invoked as soon as it is safe to do so.
A {Connection} and the objects associated with it ({Session}, {Sender}, {Receiver}, {Delivery}, {Tracker}) are not thread safe. If multiple threads call {Container#run}, or if you want to affect objects managed by the container from non-container threads, you need to use the {WorkQueue}.
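The serialization idea described above can be sketched with a small stand-in class. This is only an illustration of the pattern, not the Qpid Proton implementation: `TinyWorkQueue` and the `log` array (which plays the role of a non-thread-safe object such as a {Sender}) are hypothetical names introduced for this example.

```ruby
# Simplified stand-in for the idea behind WorkQueue.
# Hypothetical class; NOT the Qpid Proton implementation.
class TinyWorkQueue
  def initialize
    @lock = Mutex.new
    @blocks = []
  end

  # Thread safe: may be called from any thread.
  def add(&block)
    raise ArgumentError, "no block" unless block
    @lock.synchronize { @blocks << block }
  end

  # Called only by the one thread that owns the non-thread-safe object.
  # The lock is held while taking a block, not while running it.
  def process
    while (blk = @lock.synchronize { @blocks.shift })
      blk.call
    end
  end
end

queue = TinyWorkQueue.new
log = [] # stands in for a non-thread-safe object

# Several threads hand work to the queue instead of touching `log` directly.
producers = 4.times.map do |i|
  Thread.new { 25.times { |j| queue.add { log << [i, j] } } }
end
producers.each(&:join)

# Every queued block runs here, one at a time, in this thread.
queue.process
```

Because the blocks only run inside `process`, the shared object is touched by a single thread even though four threads produced the work.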
Public Class Methods
@private
# File lib/core/work_queue.rb, line 71
def initialize(container)
  @lock = Mutex.new
  @schedule = Schedule.new
  @container = container
  @closed = nil
end
Public Instance Methods
Add a block of code to be invoked in sequence.
@yield [ ] the block will be invoked with no parameters in the appropriate thread context
@note Thread Safe: may be called in any thread.
@return [void]
@raise [StoppedError] if the queue is closed and cannot accept more work
# File lib/core/work_queue.rb, line 49
def add(&block)
  schedule(0, &block)
end
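The @raise behaviour documented for add can be sketched as follows. This is a minimal stand-in under stated assumptions: `ClosableQueue` and the locally defined `StoppedError` are hypothetical names for this example and are not the library's classes. It mirrors the pattern of storing an exception at close time and raising it on later attempts to add work.

```ruby
# Hypothetical stand-in error class, defined locally for this sketch.
class StoppedError < StandardError; end

# Hypothetical queue that rejects work once closed.
class ClosableQueue
  def initialize
    @lock = Mutex.new
    @blocks = []
    @closed = nil
  end

  def add(&block)
    raise ArgumentError, "no block" unless block
    @lock.synchronize do
      raise @closed if @closed # refuse new work after close
      @blocks << block
    end
  end

  def close
    @lock.synchronize { @closed = StoppedError.new("queue closed") }
  end

  def size
    @lock.synchronize { @blocks.size }
  end
end

q = ClosableQueue.new
q.add { :accepted } # accepted while the queue is open
q.close

rejected = begin
  q.add { :too_late }
  false
rescue StoppedError
  true
end
```

A caller that may race with shutdown would typically rescue the error, as above, rather than check for closure beforehand.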
@private
# File lib/core/work_queue.rb, line 95
def clear()
  @lock.synchronize { @schedule.clear }
end
@private
# File lib/core/work_queue.rb, line 79
def close()
  @lock.synchronize { @closed = StoppedError.new }
end
@private
# File lib/core/work_queue.rb, line 92
def empty?()
  @lock.synchronize { @schedule.empty? }
end
@private
# File lib/core/work_queue.rb, line 89
def next_tick()
  @lock.synchronize { @schedule.next_tick }
end
@private
# File lib/core/work_queue.rb, line 82
def process(now)
  while p = @lock.synchronize { @schedule.pop(now) }
    p.call
  end
end
Schedule a block to be invoked at a certain time.
@param at [Time] Invoke block as soon as possible after Time at
@param at [Numeric] Invoke block after a delay of at seconds from now
@yield [ ] (see add)
@note (see add)
@return (see add)
@raise (see add)
# File lib/core/work_queue.rb, line 61
def schedule(at, &block)
  raise ArgumentError, "no block" unless block_given?
  @lock.synchronize do
    raise @closed if @closed
    @schedule.insert(at, block)
  end
  @container.send :wake
end
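The two forms of the at parameter can be illustrated with a small time-ordered schedule. The Schedule class used above is not shown on this page, so `TinySchedule` below is a hypothetical stand-in that only borrows the `insert` and `pop` method names. The extra `now` argument to `insert` is an assumption added to keep the example deterministic.

```ruby
# Hypothetical stand-in for a time-ordered schedule; the real Schedule
# class used by WorkQueue may be implemented differently.
class TinySchedule
  Entry = Struct.new(:time, :seq, :block)

  def initialize
    @entries = []
    @seq = 0
  end

  # at: a Numeric delay in seconds from `now`, or an absolute Time.
  def insert(at, block, now = Time.now)
    time = at.is_a?(Time) ? at : now + at
    @entries << Entry.new(time, @seq += 1, block)
    # Order by due time, then insertion order, so equal times stay FIFO.
    @entries.sort_by! { |e| [e.time, e.seq] }
  end

  # Return the next block due at `now`, or nil if nothing is due yet.
  def pop(now)
    @entries.shift.block if @entries.first && @entries.first.time <= now
  end
end

t0 = Time.at(1_000) # fixed reference time for the example
sched = TinySchedule.new
ran = []

sched.insert(5, -> { ran << :delayed }, t0)       # Numeric: due at t0 + 5
sched.insert(t0 + 2, -> { ran << :absolute }, t0) # Time: due at t0 + 2
sched.insert(0, -> { ran << :immediate }, t0)     # zero delay: due at t0

# Same shape as the process loop: run everything that is due at `now`.
drain = lambda do |now|
  while (blk = sched.pop(now))
    blk.call
  end
end

drain.call(t0)      # only the zero-delay block is due
drain.call(t0 + 3)  # the absolute-time block is now due
drain.call(t0 + 10) # the 5-second delay has elapsed
```

A zero delay corresponds to what add does via schedule(0, &block): the block becomes due immediately and runs on the next processing pass.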