class Qpid::Proton::Container
An AMQP container manages a set of {Listener}s and {Connection}s which contain {#Sender} and {#Receiver} links to transfer messages. Usually, each AMQP client or server process has a single container for all of its connections and links.
One or more threads can call {#run}. Events generated by all the listeners and connections are dispatched in the {#run} threads.
Attributes
Auto-stop flag.
True (the default) means that the container will stop automatically, as if {#stop} had been called, when the last listener or connection closes.
False means {#run} will not return unless {#stop} is called.
@return [Bool] auto-stop state
@return [MessagingHandler] The container-wide handler
@return [String] unique identifier for this container
True if the container has been stopped and can no longer be used.
@return [Bool] stopped state
Public Class Methods
Create a new Container
@overload initialize(id=nil)
@param id [String,Symbol] A unique ID for this container, use random UUID if nil.
@overload initialize(handler=nil, id=nil)
@param id [String,Symbol] A unique ID for this container, use random UUID if nil.
@param handler [MessagingHandler] Optional default handler for connections that do not have their own handler (see {#connect} and {#listen}).
*Note*: For multi-threaded code, it is recommended to use a separate handler instance for each connection, as a shared handler may be called concurrently.
# File lib/core/container.rb, line 55
def initialize(*args)
  @handler, @id = nil
  case args.size
  when 2 then @handler, @id = args
  when 1 then
    @id = String.try_convert(args[0]) || (args[0].to_s if args[0].is_a? Symbol)
    @handler = args[0] unless @id
  when 0 then
  else raise ArgumentError, "wrong number of arguments (given #{args.size}, expected 0..2)"
  end
  # Use an empty messaging adapter to give default behaviour if there's no global handler.
  @adapter = Handler::Adapter.adapt(@handler) || Handler::MessagingAdapter.new(nil)
  @id = (@id || SecureRandom.uuid).freeze

  # Threading and implementation notes: see comment on #run_one
  @work = Queue.new
  @work << :start
  @work << :select
  @wake = SelectWaker.new   # Wakes #run thread in IO.select
  @auto_stop = true         # Stop when @active drops to 0
  @work_queue = WorkQueue.new(self)  # work scheduled by other threads for :select context

  # Following instance variables protected by lock
  @lock = Mutex.new
  @active = 0               # All active tasks, in @selectable, @work or being processed
  @selectable = Set.new     # Tasks ready to block in IO.select
  @running = 0              # Count of #run threads
  @stopped = false          # #stop called
  @stop_err = nil           # Optional error to pass to tasks, from #stop
  @panic = nil              # Exception caught in a run thread, to be raised by all run threads
end
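The way `initialize` tells a lone `id` apart from a lone `handler` is easy to miss. The following is a minimal standalone sketch of that rule only. The helper name `parse_container_args` is hypothetical and not part of Qpid::Proton, and the sketch leaves out all other container state.

```ruby
require 'securerandom'

# Hypothetical helper (not part of Qpid::Proton) that mirrors the argument
# handling of Container#initialize. Returns [handler, id].
def parse_container_args(*args)
  handler = id = nil
  case args.size
  when 2
    handler, id = args
  when 1
    # A String or Symbol is taken as the id; anything else is the handler.
    id = String.try_convert(args[0]) || (args[0].to_s if args[0].is_a?(Symbol))
    handler = args[0] unless id
  when 0
    # Nothing supplied: no handler, random id assigned below.
  else
    raise ArgumentError, "wrong number of arguments (given #{args.size}, expected 0..2)"
  end
  [handler, (id || SecureRandom.uuid).freeze]
end
```

Under this rule a single String or Symbol argument is always read as the id, so a handler must be some other kind of object to be recognised in the one-argument form.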
Public Instance Methods
Open an AMQP connection.
@param url [String, URI] Open a {TCPSocket} to url.host, url.port. url.scheme must be "amqp" or "amqps"; url.scheme.nil? is treated as "amqp". url.user and url.password are used as defaults if opts[:user] and opts[:password] are nil.
@option (see Connection#open)
@return [Connection] The new AMQP connection
# File lib/core/container.rb, line 121
def connect(url, opts=nil)
  not_stopped
  url = Qpid::Proton::uri url
  opts ||= {}
  if url.user || url.password
    opts[:user] ||= url.user
    opts[:password] ||= url.password
  end
  opts[:ssl_domain] ||= SSLDomain.new(SSLDomain::MODE_CLIENT) if url.scheme == "amqps"
  connect_io(TCPSocket.new(url.host, url.port), opts)
end
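The credential defaulting in `connect` can be illustrated in isolation. This sketch assumes the standard library `URI` in place of `Qpid::Proton::uri`, and the helper name `connect_opts_from_url` is hypothetical. It shows only that URL credentials fill in missing options and never override ones the caller supplied.

```ruby
require 'uri'

# Hypothetical helper (not part of Qpid::Proton): take :user and :password
# from the URL unless the caller already set them.
def connect_opts_from_url(url, opts = nil)
  uri = URI(url.to_s)
  opts = (opts || {}).dup          # do not modify the caller's hash in this sketch
  if uri.user || uri.password
    opts[:user] ||= uri.user
    opts[:password] ||= uri.password
  end
  opts
end

url = "amqp://guest:secret@broker.example.com:5672"   # example URL, an assumption
from_url = connect_opts_from_url(url)                  # both values come from the URL
mixed    = connect_opts_from_url(url, { user: "admin" }) # :user kept, :password from URL
```

Note that, unlike this sketch, `connect` itself writes the defaults into the `opts` hash it was given.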
Open an AMQP protocol connection on an existing {IO} object.
@param io [IO] An existing {IO} object, e.g. a {TCPSocket}
@option (see Connection#open)
# File lib/core/container.rb, line 136
def connect_io(io, opts=nil)
  not_stopped
  cd = connection_driver(io, opts)
  cd.connection.open()
  add(cd)
  cd.connection
end
# File lib/core/container.rb, line 94 def inspect() to_s; end
Listen for incoming AMQP connections
@param url [String,URI] Listen on host:port of the AMQP URL
@param handler [Listener::Handler] A {Listener::Handler} object that will be called with events for this listener and can generate a new set of options for each one.
@return [Listener] The AMQP listener.
# File lib/core/container.rb, line 151
def listen(url, handler=Listener::Handler.new)
  not_stopped
  url = Qpid::Proton::uri url
  # TODO aconway 2017-11-01: amqps, SSL
  listen_io(TCPServer.new(url.host, url.port), handler)
end
Listen for incoming AMQP connections on an existing server socket.
@param io A server socket, for example a {TCPServer}
@param handler [Listener::Handler] Handler for events from this listener
# File lib/core/container.rb, line 162
def listen_io(io, handler=Listener::Handler.new)
  not_stopped
  l = ListenTask.new(io, handler, self)
  add(l)
  l.listener
end
Run the container: wait for IO activity, dispatch events to handlers.
Multi-threading: More than one thread can call {#run} concurrently; the container will use all {#run} threads as a thread pool. Calls to {MessagingHandler} or {Listener::Handler} methods are serialized for each connection or listener. See {WorkQueue} for coordinating with other threads.
Exceptions: If any handler method raises an exception it will stop the container, and the exception will be raised by all calls to {#run}. For single threaded code this is often desirable. Multi-threaded server applications should normally rescue exceptions in the handler and deal with them in another way: logging, closing the connection with an error condition, signalling another thread etc.
@return [void] Returns when the container stops, see {#stop} and {#auto_stop}
@raise [StoppedError] If the container has already been stopped when {#run} was called.
@raise [Exception] If any {MessagingHandler} or {Listener::Handler} managed by
the container raises an exception, that exception will be raised by {#run}
# File lib/core/container.rb, line 191
def run
  @lock.synchronize do
    @running += 1        # Note: ensure clause below will decrement @running
    raise StoppedError if @stopped
  end
  while task = @work.pop
    run_one(task, Time.now)
  end
  @lock.synchronize { raise @panic if @panic }
ensure
  @lock.synchronize do
    if (@running -= 1) > 0
      work_wake nil         # Signal the next thread
    else
      # This is the last thread, no need to do maybe_panic around this final handler call.
      @adapter.on_container_stop(self) if @adapter.respond_to? :on_container_stop
    end
  end
end
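The shutdown hand-off in `run` (each exiting thread re-queues `nil` for the next one) can be modelled with a plain `Queue`. This is a simplified sketch, not the library's code: the doubling "task" is a placeholder, and the thread count is fixed up front so that a late-starting thread cannot miss the stop signal, which the real container handles through `@stopped` instead.

```ruby
# Simplified model of the #run thread pool; all names here are illustrative.
pool_size = 3
work      = Queue.new
results   = Queue.new
lock      = Mutex.new
running   = pool_size            # counted up front (assumption of this sketch)

run = lambda do
  begin
    while (task = work.pop)      # nil ends the loop
      results << task * 2        # placeholder for run_one(task, now)
    end
  ensure
    # Pass the stop signal on while other threads are still running.
    lock.synchronize { work << nil if (running -= 1) > 0 }
  end
end

threads = pool_size.times.map { Thread.new(&run) }
5.times { |i| work << i + 1 }
work << nil                      # a single nil stops the whole pool
threads.each(&:join)
```

A single `nil` is enough to stop every thread because each thread that sees it puts another one back unless it is the last to leave.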
Number of threads in {#run}
@return [Integer] {#run} thread count
# File lib/core/container.rb, line 112 def running() @lock.synchronize { @running }; end
(see WorkQueue#schedule)
# File lib/core/container.rb, line 249
def schedule(at, &block)
  @work_queue.schedule(at, &block)
end
Stop the container.
Close all listeners and abort all connections without doing AMQP protocol close.
{#stop} returns immediately; calls to {#run} return when all activity is finished.
The container can no longer be used after it is stopped. Using a stopped container raises {StoppedError}. Create a new container if you want to resume activity.
@param error [Condition] Optional error condition passed to {MessagingHandler#on_transport_error} for each connection and {Listener::Handler#on_error} for each listener.
@param panic [Exception] Optional exception to raise from all calls to {#run}
# File lib/core/container.rb, line 227
def stop(error=nil, panic=nil)
  @lock.synchronize do
    return if @stopped
    @stop_err = Condition.convert(error)
    @panic = panic
    @stopped = true
    check_stop_lh
    # NOTE: @stopped =>
    # - no new run threads can join
    # - no more select calls after next wakeup
    # - once @active == 0, all threads will be stopped with nil
  end
  wake
end
# File lib/core/container.rb, line 93 def to_s() "#<#{self.class} id=#{id.inspect}>"; end
Get the {WorkQueue} that can be used to schedule code to be run by the container.
Note: to run code that affects a {Connection} or its associated objects, use {Connection#work_queue}
# File lib/core/container.rb, line 246 def work_queue() @work_queue; end
Private Instance Methods
All new tasks are added here
# File lib/core/container.rb, line 454
def add task
  @lock.synchronize do
    @active += 1
    task.close @stop_err if @stopped
  end
  work_wake task
end
# File lib/core/container.rb, line 477
def check_stop_lh
  if @active.zero? && (@auto_stop || @stopped) && @work_queue.empty?
    @stopped = true
    work_wake nil          # Signal threads to stop
    true
  end
end
# File lib/core/container.rb, line 446
def connection_driver(io, opts=nil, server=false)
  opts ||= {}
  opts[:container] = self
  opts[:handler] ||= @adapter
  ConnectionTask.new(self, io, opts, server)
end
Rescue any exception raised by the block and stop the container.
# File lib/core/container.rb, line 430
def maybe_panic
  begin
    yield
  rescue Exception => e
    stop(nil, e)
    nil
  end
end
# File lib/core/container.rb, line 485 def not_stopped() raise StoppedError if @lock.synchronize { @stopped }; end
# File lib/core/container.rb, line 462
def rearm task
  @lock.synchronize do
    if task.finished?
      @active -= 1
      check_stop_lh
    elsif @stopped
      task.close @stop_err
      work_wake task
    else
      @selectable << task
    end
  end
  @wake.wake
end
Handle a single item from the @work queue. This is the heart of the run loop. Take one task from @work, process it, and rearm for select.
Tasks are: ConnectionTask, ListenTask, :start, :select
- ConnectionTask/ListenTask have can_read, can_write, next_tick to set up IO.select, and process to run handlers and process the relevant work_queue
- :select does IO.select and processes Container#work_queue
# File lib/core/container.rb, line 371
def run_one(task, now)
  case task

  when :start
    maybe_panic { @adapter.on_container_start(self) } if @adapter.respond_to? :on_container_start

  when :select
    # Compute read/write select sets and minimum next_tick for select timeout
    r, w = [@wake], []
    next_tick = @work_queue.next_tick
    @lock.synchronize do
      @selectable.each do |s|
        r << s if s.can_read?
        w << s if s.can_write?
        next_tick = earliest(s.next_tick, next_tick)
      end
    end

    timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
    r, w = IO.select(r, w, nil, timeout)
    @wake.reset if r && r.delete(@wake)
    now = Time.now unless timeout == 0 # Update now if we may have blocked

    # selected is a Set to eliminate duplicates between r, w and next_tick due.
    selected = Set.new
    selected.merge(r) if r
    selected.merge(w) if w
    stopped = @lock.synchronize do
      if @stopped           # close everything
        @selectable.each { |s| s.close @stop_err; @work << s }
        @selectable.clear
        @work_queue.close
        @wake.close
      else
        @selectable -= selected # Remove already-selected tasks from @selectable
        # Also select and remove items with next_tick before now
        @selectable.delete_if { |s| before_eq(s.next_tick, now) and selected << s }
      end
      @stopped
    end
    selected.each { |s| @work << s } # Queue up tasks needing #process
    maybe_panic { @work_queue.process(now) } # Process current work queue items
    @work_queue.clear if stopped
    @lock.synchronize { check_stop_lh } if @work_queue.empty?

    @work << :select unless stopped # Enable next select

  when ConnectionTask then
    maybe_panic { task.process now }
    rearm task

  when ListenTask then
    io, opts = maybe_panic { task.process }
    add(connection_driver(io, opts, true)) if io
    rearm task
  end
end
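The timeout passed to `IO.select` in the `:select` branch depends on the nearest `next_tick` deadline. The sketch below isolates that calculation. `earliest` here is a hypothetical stand-in (the library's own helper is not shown in this section) that is assumed to treat `nil` as "no deadline", and `select_timeout` is an illustrative name.

```ruby
# Hypothetical stand-in for the earliest helper used by run_one:
# returns the sooner of two times, treating nil as "no deadline".
def earliest(a, b)
  return b unless a
  return a unless b
  a < b ? a : b
end

# nil means "block until there is IO"; 0 means "poll without blocking".
def select_timeout(next_tick, now)
  return nil unless next_tick
  next_tick > now ? next_tick - now : 0
end

now  = Time.now
tick = earliest(now + 5, earliest(nil, now + 2))  # nearest deadline is now + 2
wait = select_timeout(tick, now)                  # about 2 seconds
```

A deadline that has already passed gives a timeout of 0, so `IO.select` returns at once and the overdue task is picked up through the `before_eq` check.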
# File lib/core/container.rb, line 253 def wake() @wake.wake; end
Normally if we add work we need to set a wakeup to ensure a single run
thread doesn't get stuck in select while there is other work on the queue.
# File lib/core/container.rb, line 441
def work_wake(task)
  @work << task
  @wake.wake
end
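The wake-up that keeps a thread from getting stuck in select is commonly done with the self-pipe technique. The class below is a hypothetical stand-in for `SelectWaker`, whose actual implementation is not shown in this section. It only illustrates how a write from another thread can interrupt a blocking `IO.select`.

```ruby
# Hypothetical stand-in for SelectWaker, built on a pipe.
class PipeWaker
  def initialize
    @rd, @wr = IO.pipe
  end

  # IO.select accepts any object that responds to #to_io.
  def to_io
    @rd
  end

  # Safe to call from any thread; makes the read end readable.
  def wake
    @wr.write_nonblock("x")
  rescue IO::WaitWritable
    # Pipe is full, so a wake-up is already pending.
  end

  # Drain pending wake-ups so that the next select blocks again.
  def reset
    loop { @rd.read_nonblock(64) }
  rescue IO::WaitReadable
    nil
  end

  def close
    @rd.close
    @wr.close
  end
end

waker = PipeWaker.new
Thread.new { sleep 0.1; waker.wake }      # e.g. another thread queued work
ready, = IO.select([waker], nil, nil, 5)  # returns well before the 5 second timeout
waker.reset
```

Several wake-ups before a reset collapse into one readable event, which is why calling `wake` after every queue push is harmless.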