class Qpid::Proton::Handler::MessagingAdapter

Adapt raw proton events to {MessagingHandler} events.

Public Class Methods

open_close(endpoint) click to toggle source

Define the repetitive on_xxx_open/on_xxx_close methods for session and connection

# File lib/handler/messaging_adapter.rb, line 42
def self.open_close(endpoint)
  Module.new do
    define_method(:"on_#{endpoint}_remote_open") do |event|
      begin
        delegate(:"on_#{endpoint}_open", event.context)
        event.context.open if event.context.local_uninit?
      rescue StopAutoResponse
      end
    end

    define_method(:"on_#{endpoint}_remote_close") do |event|
      delegate_error(:"on_#{endpoint}_error", event.context) if event.context.condition
      begin
        delegate(:"on_#{endpoint}_close", event.context)
        event.context.close if event.context.local_active?
      rescue StopAutoResponse
      end
    end
  end
end

Public Instance Methods

add_credit(event) click to toggle source
# File lib/handler/messaging_adapter.rb, line 140
def add_credit(event)
  return unless (r = event.receiver)
  if r.open? && (r.drained == 0) && r.credit_window && (r.credit_window > r.credit)
    r.flow(r.credit_window - r.credit)
  end
end
delegate(method, *args) click to toggle source
# File lib/handler/messaging_adapter.rb, line 26
def delegate(method, *args)
  forward(method, *args) or forward(:on_unhandled, method, *args)
end
delegate_error(method, context) click to toggle source
# File lib/handler/messaging_adapter.rb, line 30
def delegate_error(method, context)
  unless forward(method, context) || forward(:on_error, context.condition)
    forward(:on_unhandled, method, context)
    # Close the whole connection on an un-handled error
    context.connection.close(context.condition)
  end
end
on_container_start(container) click to toggle source
# File lib/handler/messaging_adapter.rb, line 38
def on_container_start(container) delegate(:on_container_start, container); end
on_container_stop(container) click to toggle source
# File lib/handler/messaging_adapter.rb, line 39
def on_container_stop(container) delegate(:on_container_stop, container); end
on_delivery(event) click to toggle source
# File lib/handler/messaging_adapter.rb, line 98
def on_delivery(event)
  if event.link.receiver?       # Incoming message
    d = event.delivery
    if d.aborted?
      delegate(:on_delivery_abort, d)
    elsif d.complete?
      if d.link.local_closed? && d.receiver.auto_accept
        d.release         # Auto release after close
      else
        begin
          delegate(:on_message, d, d.message)
          d.accept if d.receiver.auto_accept && d.local_state == 0
        rescue Reject
          d.reject
        rescue Release
          d.release
        end
      end
    end
    delegate(:on_delivery_settle, d) if d.settled?
    add_credit(event)
  else                      # Outgoing message
    t = event.tracker
    case t.state
    when Delivery::ACCEPTED then delegate(:on_tracker_accept, t)
    when Delivery::REJECTED then delegate(:on_tracker_reject, t)
    when Delivery::RELEASED then delegate(:on_tracker_release, t)
    when Delivery::MODIFIED then delegate(:on_tracker_modify, t)
    end
    if t.settled?
      delegate(:on_tracker_settle, t)
      t.settle if t.sender.auto_settle
    end
  end
end
on_transport_closed(event) click to toggle source
# File lib/handler/messaging_adapter.rb, line 91
def on_transport_closed(event)
  delegate(:on_transport_close, event.context) rescue StopAutoResponse
end
on_transport_error(event) click to toggle source
# File lib/handler/messaging_adapter.rb, line 87
def on_transport_error(event)
  delegate_error(:on_transport_error, event.context)
end