class Qpid::Proton::Handler::ReactorMessagingAdapter

Adapter to convert raw proton events for the old {Handler::MessagingHandler} used by the Reactor.

Public Class Methods

new(handler) click to toggle source
Calls superclass method Qpid::Proton::Handler::Adapter::new
# File lib/handler/reactor_messaging_adapter.rb, line 26
def initialize handler
  super
  @opts = (handler.options if handler.respond_to?(:options)) || {}
  @opts[:prefetch] ||= 10
  @opts[:peer_close_is_error] = false unless @opts.include? :peer_close_is_error
  [:auto_accept, :auto_settle, :auto_open, :auto_close].each do |k|
    @opts[k] = true unless @opts.include? k
  end
end
open_close(endpoint) click to toggle source

Define repetative on_xxx_open/close methods for each endpoint type

# File lib/handler/reactor_messaging_adapter.rb, line 55
def self.open_close(endpoint)
  on_opening = :"on_#{endpoint}_opening"
  on_opened = :"on_#{endpoint}_opened"
  on_closing = :"on_#{endpoint}_closing"
  on_closed = :"on_#{endpoint}_closed"
  on_error = :"on_#{endpoint}_error"

  Module.new do
    define_method(:"on_#{endpoint}_local_open") do |event|
      delegate(on_opened, event) if event.context.remote_open?
    end

    define_method(:"on_#{endpoint}_remote_open") do |event|
      if event.context.local_open?
        delegate(on_opened, event)
      elsif event.context.local_uninit?
        delegate(on_opening, event)
        event.context.open if @opts[:auto_open]
      end
    end

    define_method(:"on_#{endpoint}_local_close") do |event|
      delegate(on_closed, event) if event.context.remote_closed?
    end

    define_method(:"on_#{endpoint}_remote_close") do |event|
      if event.context.remote_condition
        delegate_error(on_error, event)
      elsif event.context.local_closed?
        delegate(on_closed, event)
      elsif @opts[:peer_close_is_error]
        Condition.assign(event.context.__send__(:_remote_condition), "unexpected peer close")
        delegate_error(on_error, event)
      else
        delegate(on_closing, event)
      end
      event.context.close if @opts[:auto_close]
    end
  end
end

Public Instance Methods

add_credit(event) click to toggle source
# File lib/handler/reactor_messaging_adapter.rb, line 149
def add_credit(event)
  r = event.receiver
  prefetch = @opts[:prefetch]
  if r && r.open? && (r.drained == 0) && prefetch && (prefetch > r.credit)
    r.flow(prefetch - r.credit)
  end
end
delegate(method, event) click to toggle source
# File lib/handler/reactor_messaging_adapter.rb, line 38
def delegate(method, event)
  event.method = method     # Update the event with the new method
  event.dispatch(@handler) or dispatch(:on_unhandled, event)
end
delegate_error(method, event) click to toggle source
# File lib/handler/reactor_messaging_adapter.rb, line 43
def delegate_error(method, event)
  event.method = method
  unless event.dispatch(@handler) || dispatch(:on_error, event)
    dispatch(:on_unhandled, event)
    event.connection.close(event.context.condition) if @opts[:auto_close]
  end
end
on_container_start(container) click to toggle source
# File lib/handler/reactor_messaging_adapter.rb, line 51
def on_container_start(container) delegate(:on_start, Event.new(nil, nil, container)); end
on_container_stop(container) click to toggle source
# File lib/handler/reactor_messaging_adapter.rb, line 52
def on_container_stop(container) delegate(:on_stop, Event.new(nil, nil, container)); end
on_delivery(event) click to toggle source
# File lib/handler/reactor_messaging_adapter.rb, line 106
def on_delivery(event)
  if event.link.receiver?       # Incoming message
    d = event.delivery
    if d.aborted?
      delegate(:on_aborted, event)
      d.settle
    elsif d.complete?
      if d.link.local_closed? && @opts[:auto_accept]
        d.release
      else
        begin
          delegate(:on_message, event)
          d.accept if @opts[:auto_accept] && !d.settled?
        rescue Qpid::Proton::Reject
          d.reject
        rescue Qpid::Proton::Release
          d.release(true)
        end
      end
    end
    delegate(:on_settled, event) if d.settled?
    add_credit(event)
  else                      # Outgoing message
    t = event.tracker
    if t.updated?
      case t.state
      when Qpid::Proton::Delivery::ACCEPTED then delegate(:on_accepted, event)
      when Qpid::Proton::Delivery::REJECTED then delegate(:on_rejected, event)
      when Qpid::Proton::Delivery::RELEASED then delegate(:on_released, event)
      when Qpid::Proton::Delivery::MODIFIED then delegate(:on_modified, event)
      end
      delegate(:on_settled, event) if t.settled?
      t.settle if @opts[:auto_settle]
    end
  end
end
on_transport_closed(event) click to toggle source
# File lib/handler/reactor_messaging_adapter.rb, line 99
def on_transport_closed(event) delegate(:on_transport_closed, event); end
on_transport_error(event) click to toggle source
# File lib/handler/reactor_messaging_adapter.rb, line 98
def on_transport_error(event) delegate_error(:on_transport_error, event); end