class Qpid::Proton::Container

An AMQP container manages a set of {Listener}s and {Connection}s which contain {#Sender} and {#Receiver} links to transfer messages. Usually, each AMQP client or server process has a single container for all of its connections and links.

One or more threads can call {#run}. Events generated by all the listeners and connections are dispatched in the {#run} threads.

Attributes

auto_stop[RW]

Auto-stop flag.

True (the default) means that the container will stop automatically, as if {#stop} had been called, when the last listener or connection closes.

False means {#run} will not return unless {#stop} is called.

@return [Bool] auto-stop state

handler[R]

@return [MessagingHandler] The container-wide handler

id[R]

@return [String] unique identifier for this container

stopped[RW]

True if the container has been stopped and can no longer be used.

@return [Bool] stopped state

Public Class Methods

new(*args)

Create a new Container.

@overload initialize(id=nil)

@param id [String,Symbol] A unique ID for this container. A random UUID is used if nil.

@overload initialize(handler=nil, id=nil)

@param id [String,Symbol] A unique ID for this container. A random UUID is used if nil.
@param handler [MessagingHandler] Optional default handler for connections
 that do not have their own handler (see {#connect} and {#listen})

 *Note*: For multi-threaded code, it is recommended to use a separate
 handler instance for each connection, as a shared handler may be called
 concurrently.
# File lib/core/container.rb, line 55
def initialize(*args)
  @handler, @id = nil
  case args.size
  when 2 then @handler, @id = args
  when 1 then
    @id = String.try_convert(args[0]) || (args[0].to_s if args[0].is_a? Symbol)
    @handler = args[0] unless @id
  when 0 then
  else raise ArgumentError, "wrong number of arguments (given #{args.size}, expected 0..2)"
  end
  # Use an empty messaging adapter to give default behaviour if there's no global handler.
  @adapter = Handler::Adapter.adapt(@handler) || Handler::MessagingAdapter.new(nil)
  @id = (@id || SecureRandom.uuid).freeze

  # Threading and implementation notes: see comment on #run_one
  @work = Queue.new
  @work << :start
  @work << :select
  @wake = SelectWaker.new   # Wakes #run thread in IO.select
  @auto_stop = true         # Stop when @active drops to 0
  @work_queue = WorkQueue.new(self)  # work scheduled by other threads for :select context

  # Following instance variables protected by lock
  @lock = Mutex.new
  @active = 0               # All active tasks, in @selectable, @work or being processed
  @selectable = Set.new     # Tasks ready to block in IO.select
  @running = 0              # Count of #run threads
  @stopped = false          # #stop called
  @stop_err = nil           # Optional error to pass to tasks, from #stop
  @panic = nil              # Exception caught in a run thread, to be raised by all run threads
end
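
The one-argument case above decides between an id and a handler by type. As a minimal sketch of that dispatch, using only the standard library: `split_container_args` and `fake_handler` are hypothetical names for illustration and are not part of the library.

```ruby
# Hypothetical helper mirroring the argument handling shown in #initialize.
# Returns [handler, id]; id stays nil when none is given
# (the real constructor then substitutes a random UUID).
def split_container_args(*args)
  handler = id = nil
  case args.size
  when 2 then handler, id = args
  when 1
    arg = args[0]
    # Strings (or objects with #to_str) and Symbols are treated as an id.
    id = String.try_convert(arg) || (arg.to_s if arg.is_a?(Symbol))
    # Anything else is assumed to be a handler.
    handler = arg unless id
  when 0 then nil
  else
    raise ArgumentError, "wrong number of arguments (given #{args.size}, expected 0..2)"
  end
  [handler, id]
end

fake_handler = Object.new   # stand-in for a MessagingHandler instance

id_only      = split_container_args("my-id")
symbol_id    = split_container_args(:my_id)
handler_only = split_container_args(fake_handler)
both         = split_container_args(fake_handler, "my-id")
```

A consequence of this design is that a handler object that responds to `to_str` would be taken for an id, so the two-argument form is the unambiguous one.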

Public Instance Methods

connect(url, opts=nil)

Open an AMQP connection.

@param url [String, URI] Open a {TCPSocket} to url.host, url.port. url.scheme must be “amqp” or “amqps”; url.scheme.nil? is treated as “amqp”. url.user and url.password are used as defaults if opts[:user] and opts[:password] are nil.

@option (see Connection#open)

@return [Connection] The new AMQP connection

# File lib/core/container.rb, line 121
def connect(url, opts=nil)
  not_stopped
  url = Qpid::Proton::uri url
  opts ||= {}
  if url.user ||  url.password
    opts[:user] ||= url.user
    opts[:password] ||= url.password
  end
  opts[:ssl_domain] ||= SSLDomain.new(SSLDomain::MODE_CLIENT) if url.scheme == "amqps"
  connect_io(TCPSocket.new(url.host, url.port), opts)
end
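
The credential defaulting in `connect` can be tried in isolation. This is a sketch under stated assumptions: `apply_url_credentials` is a hypothetical helper, it parses with the standard `URI` library rather than `Qpid::Proton::uri`, and unlike `connect` it copies the options hash instead of modifying the caller's. The host names are placeholders.

```ruby
require 'uri'

# Hypothetical helper: take :user and :password from the URL
# unless the caller already supplied them in opts.
def apply_url_credentials(url, opts = nil)
  url = URI(url)
  opts = (opts || {}).dup          # copy; connect itself updates opts in place
  if url.user || url.password
    opts[:user] ||= url.user
    opts[:password] ||= url.password
  end
  opts
end

from_url = apply_url_credentials("amqp://guest:secret@broker.example.com:5672")
explicit = apply_url_credentials("amqp://guest:secret@broker.example.com:5672",
                                 { user: "admin" })
anonymous = apply_url_credentials("amqp://broker.example.com:5672")
```

Because `||=` is used, an explicit option always wins over the value embedded in the URL.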
connect_io(io, opts=nil)

Open an AMQP protocol connection on an existing {IO} object.

@param io [IO] An existing {IO} object, e.g. a {TCPSocket}

@option (see Connection#open)

# File lib/core/container.rb, line 136
def connect_io(io, opts=nil)
  not_stopped
  cd = connection_driver(io, opts)
  cd.connection.open()
  add(cd)
  cd.connection
end
inspect()
# File lib/core/container.rb, line 94
def inspect() to_s; end
listen(url, handler=Listener::Handler.new)

Listen for incoming AMQP connections.

@param url [String,URI] Listen on host:port of the AMQP URL

@param handler [Listener::Handler] A {Listener::Handler} object that will be called with events for this listener and can generate a new set of options for each one.

@return [Listener] The AMQP listener.

# File lib/core/container.rb, line 151
def listen(url, handler=Listener::Handler.new)
  not_stopped
  url = Qpid::Proton::uri url
  # TODO aconway 2017-11-01: amqps, SSL
  listen_io(TCPServer.new(url.host, url.port), handler)
end
listen_io(io, handler=Listener::Handler.new)

Listen for incoming AMQP connections on an existing server socket.

@param io A server socket, for example a {TCPServer}

@param handler [Listener::Handler] Handler for events from this listener

# File lib/core/container.rb, line 162
def listen_io(io, handler=Listener::Handler.new)
  not_stopped
  l = ListenTask.new(io, handler, self)
  add(l)
  l.listener
end
run()

Run the container: wait for IO activity, dispatch events to handlers.

Multi-threading: More than one thread can call {#run} concurrently; the container will use all {#run} threads as a thread pool. Calls to {MessagingHandler} or {Listener::Handler} methods are serialized for each connection or listener. See {WorkQueue} for coordinating with other threads.

Exceptions: If any handler method raises an exception it will stop the container, and the exception will be raised by all calls to {#run}. For single threaded code this is often desirable. Multi-threaded server applications should normally rescue exceptions in the handler and deal with them in another way: logging, closing the connection with an error condition, signalling another thread etc.

@return [void] Returns when the container stops, see {#stop} and {#auto_stop}

@raise [StoppedError] If the container has already been stopped when {#run} was called.

@raise [Exception] If any {MessagingHandler} or {Listener::Handler} managed by the container raises an exception, that exception will be raised by {#run}
# File lib/core/container.rb, line 191
def run
  @lock.synchronize do
    @running += 1        # Note: ensure clause below will decrement @running
    raise StoppedError if @stopped
  end
  while task = @work.pop
    run_one(task, Time.now)
  end
  @lock.synchronize { raise @panic if @panic }
ensure
  @lock.synchronize do
    if (@running -= 1) > 0
      work_wake nil         # Signal the next thread
    else
      # This is the last thread, no need to do maybe_panic around this final handler call.
      @adapter.on_container_stop(self) if @adapter.respond_to? :on_container_stop
    end
  end
end
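
The shutdown hand-off in `run` (pop tasks until `nil`, then pass `nil` on to the next thread in the `ensure` clause) can be sketched with a plain `Queue`. This is an illustration, not the container's code: the doubling task is made up, and the thread count is fixed up front to keep the sketch free of races, whereas the container counts threads as they enter `run` and rejects late joiners with `StoppedError`.

```ruby
work    = Queue.new
results = Queue.new
lock    = Mutex.new
pool_size = 3
running   = pool_size      # counted up front in this sketch

threads = pool_size.times.map do
  Thread.new do
    begin
      # Queue#pop blocks until an item arrives; a nil item ends the loop.
      while (task = work.pop)
        results << task * 2
      end
    ensure
      lock.synchronize do
        running -= 1
        # Pass the stop signal on so the next waiting thread also exits.
        work << nil if running > 0
      end
    end
  end
end

[1, 2, 3, 4].each { |n| work << n }
work << nil                # a single nil stops the whole pool
threads.each(&:join)

doubled = []
doubled << results.pop until results.empty?
puts doubled.sort.inspect
```

Only one `nil` is ever queued by the stopper; each exiting thread re-queues it for the next, so the pool drains without knowing its own size in advance.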
running()

Number of threads in {#run}.

@return [Integer] {#run} thread count

# File lib/core/container.rb, line 112
def running() @lock.synchronize { @running }; end
schedule(at, &block)

(see WorkQueue#schedule)

# File lib/core/container.rb, line 249
def schedule(at, &block) @work_queue.schedule(at, &block) end
stop(error=nil, panic=nil)

Stop the container.

Close all listeners and abort all connections without doing AMQP protocol close.

{#stop} returns immediately; calls to {#run} will return when all activity is finished.

The container can no longer be used once stopped: using a stopped container raises {StoppedError}. Create a new container if you want to resume activity.

@param error [Condition] Optional error condition passed to {MessagingHandler#on_transport_error} for each connection and {Listener::Handler#on_error} for each listener.

@param panic [Exception] Optional exception to raise from all calls to run()

# File lib/core/container.rb, line 227
def stop(error=nil, panic=nil)
  @lock.synchronize do
    return if @stopped
    @stop_err = Condition.convert(error)
    @panic = panic
    @stopped = true
    check_stop_lh
    # NOTE: @stopped =>
    # - no new run threads can join
    # - no more select calls after next wakeup
    # - once @active == 0, all threads will be stopped with nil
  end
  wake
end
to_s()
# File lib/core/container.rb, line 93
def to_s() "#<#{self.class} id=#{id.inspect}>"; end
work_queue()

Get the {WorkQueue} that can be used to schedule code to be run by the container.

Note: to run code that affects a {Connection} or its associated objects, use {Connection#work_queue}

# File lib/core/container.rb, line 246
def work_queue() @work_queue; end

Private Instance Methods

add(task)

All new tasks are added here

# File lib/core/container.rb, line 454
def add task
  @lock.synchronize do
    @active += 1
    task.close @stop_err if @stopped
  end
  work_wake task
end
check_stop_lh()
# File lib/core/container.rb, line 477
def check_stop_lh
  if @active.zero? && (@auto_stop || @stopped) && @work_queue.empty?
    @stopped = true
    work_wake nil          # Signal threads to stop
    true
  end
end
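
The condition tested by `check_stop_lh` can be read as a small truth table. The sketch below restates it as a pure predicate; `should_stop?` and its keyword arguments are hypothetical names standing in for the instance variables used above.

```ruby
# Hypothetical restatement of the stop condition as a pure function.
def should_stop?(active:, auto_stop:, stopped:, queue_empty:)
  active.zero? && (auto_stop || stopped) && queue_empty
end

# Last connection closed with auto_stop on: the container stops itself.
idle_auto   = should_stop?(active: 0, auto_stop: true,  stopped: false, queue_empty: true)
# auto_stop off and #stop not called: keep running even with nothing active.
idle_manual = should_stop?(active: 0, auto_stop: false, stopped: false, queue_empty: true)
# #stop was called but a task is still active: wait for it to finish.
draining    = should_stop?(active: 1, auto_stop: true,  stopped: true,  queue_empty: true)
# Scheduled work is still pending: do not stop yet.
pending     = should_stop?(active: 0, auto_stop: true,  stopped: false, queue_empty: false)
```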
connection_driver(io, opts=nil, server=false)
# File lib/core/container.rb, line 446
def connection_driver(io, opts=nil, server=false)
  opts ||= {}
  opts[:container] = self
  opts[:handler] ||= @adapter
  ConnectionTask.new(self, io, opts, server)
end
maybe_panic() { || ... }

Rescue any exception raised by the block and stop the container.

# File lib/core/container.rb, line 430
def maybe_panic
  begin
    yield
  rescue Exception => e
    stop(nil, e)
    nil
  end
end
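
The effect of `maybe_panic` together with `run` is that an exception raised in one place is stored and re-raised later by the run threads. A minimal sketch of that capture-and-re-raise pattern follows; `PanicBox` and `raise_panic` are hypothetical names, and keeping only the first exception is an assumption made here to mirror the early return in `stop`.

```ruby
# Hypothetical stand-in for the container's panic handling.
class PanicBox
  def initialize
    @lock = Mutex.new
    @panic = nil
  end

  # Run the block; on any exception, remember it and return nil.
  def maybe_panic
    yield
  rescue Exception => e
    @lock.synchronize { @panic ||= e }   # keep the first exception only
    nil
  end

  # Called where a run thread would exit: re-raise the stored exception.
  def raise_panic
    @lock.synchronize { raise @panic if @panic }
  end
end

box = PanicBox.new
ok  = box.maybe_panic { 21 * 2 }
bad = box.maybe_panic { raise ArgumentError, "handler failed" }

reraised = begin
  box.raise_panic
  nil
rescue ArgumentError => e
  e.message
end
```

Rescuing `Exception` rather than `StandardError` is deliberate in the original: any failure in a handler should reach the threads calling `run` instead of silently killing one worker.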
not_stopped()
# File lib/core/container.rb, line 485
def not_stopped() raise StoppedError if @lock.synchronize { @stopped }; end
rearm(task)
# File lib/core/container.rb, line 462
def rearm task
  @lock.synchronize do
    if task.finished?
      @active -= 1
      check_stop_lh
    elsif @stopped
      task.close @stop_err
      work_wake task
    else
      @selectable << task
    end
  end
  @wake.wake
end
run_one(task, now)

Handle a single item from the @work queue; this is the heart of the run loop. Take one task from @work, process it, and rearm for select. Tasks are: ConnectionTask, ListenTask, :start, :select

  • ConnectionTask/ListenTask have can_read, can_write, next_tick to set up IO.select and process to run handlers and process relevant work_queue

  • nil means exit from the run thread (handled by run)

  • :select does IO.select and processes Container#work_queue

# File lib/core/container.rb, line 371
def run_one(task, now)
  case task

  when :start
    maybe_panic { @adapter.on_container_start(self) } if @adapter.respond_to? :on_container_start

  when :select
    # Compute read/write select sets and minimum next_tick for select timeout
    r, w = [@wake], []
    next_tick = @work_queue.next_tick
    @lock.synchronize do
      @selectable.each do |s|
        r << s if s.can_read?
        w << s if s.can_write?
        next_tick = earliest(s.next_tick, next_tick)
      end
    end

    timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
    r, w = IO.select(r, w, nil, timeout)
    @wake.reset if r && r.delete(@wake)
    now = Time.now unless timeout == 0 # Update now if we may have blocked

    # selected is a Set to eliminate duplicates between r, w and next_tick due.
    selected = Set.new
    selected.merge(r) if r
    selected.merge(w) if w
    stopped = @lock.synchronize do
      if @stopped           # close everything
        @selectable.each { |s| s.close @stop_err; @work << s }
        @selectable.clear
        @work_queue.close
        @wake.close
      else
        @selectable -= selected # Remove already-selected tasks from @selectable
        # Also select and remove items with next_tick before now
        @selectable.delete_if { |s| before_eq(s.next_tick, now) and selected << s }
      end
      @stopped
    end
    selected.each { |s| @work << s } # Queue up tasks needing #process
    maybe_panic { @work_queue.process(now) } # Process current work queue items
    @work_queue.clear if stopped
    @lock.synchronize { check_stop_lh } if @work_queue.empty?

    @work << :select  unless stopped # Enable next select

  when ConnectionTask then
    maybe_panic { task.process now }
    rearm task

  when ListenTask then
    io, opts = maybe_panic { task.process }
    add(connection_driver(io, opts, true)) if io
    rearm task
  end
end
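
The `:select` branch reduces all pending deadlines to one timeout for `IO.select`. The sketch below isolates that arithmetic. The `earliest` helper is not shown in this file, so the version here is an assumption (it treats `nil` as "no deadline"), and `select_timeout` is a hypothetical name for the inline expression.

```ruby
# Assumed behaviour of the earliest helper: nil means "no deadline".
def earliest(a, b)
  return b if a.nil?
  return a if b.nil?
  a < b ? a : b
end

# nil => block indefinitely, 0 => poll, otherwise seconds until next_tick.
def select_timeout(next_tick, now)
  return nil unless next_tick
  next_tick > now ? next_tick - now : 0
end

now   = Time.at(1_000)
ticks = [nil, Time.at(1_005), Time.at(1_002)]   # per-task next_tick values

next_tick = ticks.reduce(nil) { |acc, t| earliest(t, acc) }
timeout   = select_timeout(next_tick, now)      # seconds until Time.at(1_002)
overdue   = select_timeout(Time.at(990), now)   # deadline already passed
forever   = select_timeout(nil, now)            # no deadlines at all
```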
wake()
# File lib/core/container.rb, line 253
def wake() @wake.wake; end
work_wake(task)

Normally if we add work we need to set a wakeup to ensure a single run thread doesn't get stuck in select while there is other work on the queue.

# File lib/core/container.rb, line 441
def work_wake(task)
  @work << task
  @wake.wake
end
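
Waking a thread that is blocked in `IO.select` is commonly done with a pipe: the select set includes the read end, and writing a byte to the write end makes it readable. The sketch below shows that technique. `PipeWaker` is a hypothetical class written for illustration; the implementation of `SelectWaker` is not shown in this file, so this is not a claim about how it works internally.

```ruby
# Hypothetical waker built on a pipe (the "self-pipe" technique).
class PipeWaker
  attr_reader :io            # read end, to be included in the select read set

  def initialize
    @io, @wr = IO.pipe
  end

  # Make the read end readable; ignore a full pipe, one pending byte is enough.
  def wake
    @wr.write_nonblock('x', exception: false)
  end

  # Drain pending wake bytes so the next select blocks again.
  def reset
    nil while @io.read_nonblock(256, exception: false).is_a?(String)
  end

  def close
    @io.close
    @wr.close
  end
end

waker = PipeWaker.new
idle  = IO.select([waker.io], nil, nil, 0)       # nothing written yet

sleeper = Thread.new { IO.select([waker.io]) }   # blocks with no timeout
waker.wake                                       # unblocks the sleeper
ready, = sleeper.value                           # read set returned by select

waker.reset
drained = IO.select([waker.io], nil, nil, 0)     # quiet again after reset
waker.close
```

This is why `work_wake` pairs every push onto the work queue with a wake: a lone thread sitting in select would otherwise not notice the new item until some socket became ready.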