class Qpid::Proton::Handler::ReactorMessagingAdapter
Adapter to convert raw proton events for the old {Handler::MessagingHandler} used by the Reactor.
Public Class Methods
new(handler)
click to toggle source
Calls superclass method
Qpid::Proton::Handler::Adapter::new
# File lib/handler/reactor_messaging_adapter.rb, line 26
# Wrap +handler+ and build the option table consulted by the event callbacks.
#
# The options hash is taken from +handler.options+ when the handler responds
# to +options+ and returns a truthy value; otherwise a new empty Hash is used.
# Note that a hash obtained from the handler is filled in with defaults in
# place (it is not copied).
#
# Defaults applied:
# * +:prefetch+ becomes 10 when missing, nil or false
# * +:peer_close_is_error+ becomes false when the key is absent
# * +:auto_accept+, +:auto_settle+, +:auto_open+, +:auto_close+ become true
#   when the key is absent
#
# @param handler the object events are delegated to
def initialize(handler)
  super
  supplied = handler.options if handler.respond_to?(:options)
  @opts = supplied || {}
  @opts[:prefetch] ||= 10
  {
    :peer_close_is_error => false,
    :auto_accept => true,
    :auto_settle => true,
    :auto_open => true,
    :auto_close => true
  }.each do |name, default|
    # Only fill in keys the handler did not mention at all, so an explicit
    # false/nil from the handler is preserved.
    @opts[name] = default unless @opts.include?(name)
  end
end
open_close(endpoint)
click to toggle source
Define repetitive on_xxx_open/close methods for each endpoint type
# File lib/handler/reactor_messaging_adapter.rb, line 55
# Build an anonymous Module containing the four raw open/close callbacks for
# one endpoint type:
#
#   on_<endpoint>_local_open, on_<endpoint>_remote_open,
#   on_<endpoint>_local_close, on_<endpoint>_remote_close
#
# Each callback inspects +event.context+ and delegates to the matching
# on_<endpoint>_opening/opened/closing/closed/error handler method.
#
# @param endpoint name interpolated into the generated method names
# @return [Module] a new module defining the four callbacks
def self.open_close(endpoint)
  callback = lambda { |suffix| :"on_#{endpoint}_#{suffix}" }
  opening_cb = callback.call("opening")
  opened_cb  = callback.call("opened")
  closing_cb = callback.call("closing")
  closed_cb  = callback.call("closed")
  error_cb   = callback.call("error")

  Module.new do
    # Local open completes the handshake only if the peer has already opened.
    define_method(callback.call("local_open")) do |event|
      delegate(opened_cb, event) if event.context.remote_open?
    end

    # Peer opened: either we were already open (now fully opened), or we have
    # not started yet (peer-initiated open, optionally auto-opened).
    define_method(callback.call("remote_open")) do |event|
      if event.context.local_open?
        delegate(opened_cb, event)
      elsif event.context.local_uninit?
        delegate(opening_cb, event)
        event.context.open if @opts[:auto_open]
      end
    end

    # Local close completes the close only if the peer has already closed.
    define_method(callback.call("local_close")) do |event|
      delegate(closed_cb, event) if event.context.remote_closed?
    end

    # Peer closed: report an error condition if there is one, a completed
    # close if we closed first, otherwise an (optionally erroneous) closing.
    define_method(callback.call("remote_close")) do |event|
      if event.context.remote_condition
        delegate_error(error_cb, event)
      elsif event.context.local_closed?
        delegate(closed_cb, event)
      elsif @opts[:peer_close_is_error]
        Condition.assign(event.context.__send__(:_remote_condition), "unexpected peer close")
        delegate_error(error_cb, event)
      else
        delegate(closing_cb, event)
      end
      # Runs after every branch above when auto_close is enabled.
      event.context.close if @opts[:auto_close]
    end
  end
end
Public Instance Methods
add_credit(event)
click to toggle source
# File lib/handler/reactor_messaging_adapter.rb, line 149
# Top up the credit on the event's receiver to the configured +:prefetch+
# value.
#
# Nothing happens unless the event has a receiver that is open and whose
# +drained+ count is 0, a prefetch value is configured, and the receiver
# currently holds less credit than the prefetch value.
#
# @param event the event whose +receiver+ should be replenished
# @return the result of +receiver.flow+ when credit was issued, otherwise nil
def add_credit(event)
  receiver = event.receiver
  window = @opts[:prefetch]
  return unless receiver && receiver.open? && (receiver.drained == 0)
  return unless window && (window > receiver.credit)

  # Issue only the shortfall, bringing the total back up to the window.
  receiver.flow(window - receiver.credit)
end
delegate(method, event)
click to toggle source
# File lib/handler/reactor_messaging_adapter.rb, line 38
# Re-label +event+ with +method+ and dispatch it to the wrapped handler.
# If that dispatch returns a falsy value, the event is dispatched as
# +:on_unhandled+ instead.
#
# @param method [Symbol] handler method name to assign to the event
# @param event the event to forward
# @return the truthy result of +event.dispatch+, else the result of the
#   +:on_unhandled+ dispatch
def delegate(method, event)
  # Update the event with the new method
  event.method = method
  handled = event.dispatch(@handler)
  handled || dispatch(:on_unhandled, event)
end
delegate_error(method, event)
click to toggle source
# File lib/handler/reactor_messaging_adapter.rb, line 43
# Re-label +event+ with +method+ and dispatch it to the wrapped handler as an
# error. Fallbacks, tried in order while each returns a falsy value:
# 1. +event.dispatch(@handler)+ for the specific error method
# 2. +dispatch(:on_error, event)+
# 3. +dispatch(:on_unhandled, event)+, after which the event's connection is
#    closed with the context's condition when +:auto_close+ is enabled
#
# @param method [Symbol] error handler method name to assign to the event
# @param event the event to forward
def delegate_error(method, event)
  event.method = method
  return if event.dispatch(@handler) || dispatch(:on_error, event)

  dispatch(:on_unhandled, event)
  event.connection.close(event.context.condition) if @opts[:auto_close]
end
on_container_start(container)
click to toggle source
# File lib/handler/reactor_messaging_adapter.rb, line 51
# Delegate +:on_start+ to the handler, using a new Event built with
# +container+ as its third argument.
#
# @param container the container that is starting
def on_container_start(container)
  start_event = Event.new(nil, nil, container)
  delegate(:on_start, start_event)
end
on_container_stop(container)
click to toggle source
# File lib/handler/reactor_messaging_adapter.rb, line 52
# Delegate +:on_stop+ to the handler, using a new Event built with
# +container+ as its third argument.
#
# @param container the container that is stopping
def on_container_stop(container)
  stop_event = Event.new(nil, nil, container)
  delegate(:on_stop, stop_event)
end
on_delivery(event)
click to toggle source
# File lib/handler/reactor_messaging_adapter.rb, line 106
# Translate a raw delivery event into messaging callbacks.
#
# Receiver link (incoming message):
# * aborted delivery: delegate +:on_aborted+, then settle it
# * complete delivery: release it if the link is locally closed and
#   +:auto_accept+ is set; otherwise delegate +:on_message+ and accept it when
#   +:auto_accept+ is set and it is not already settled. A
#   Qpid::Proton::Reject or Qpid::Proton::Release raised while doing so
#   rejects or releases (with +true+) the delivery respectively.
# * afterwards: delegate +:on_settled+ if the delivery is settled, then
#   replenish credit.
#
# Sender link (outgoing message), only when the tracker reports an update:
# * delegate +:on_accepted+ / +:on_rejected+ / +:on_released+ /
#   +:on_modified+ according to the tracker state
# * delegate +:on_settled+ if the tracker is settled
# * settle the tracker when +:auto_settle+ is set
#
# @param event the raw delivery event
def on_delivery(event)
  if event.link.receiver?
    # Incoming message
    delivery = event.delivery
    if delivery.aborted?
      delegate(:on_aborted, event)
      delivery.settle
    elsif delivery.complete?
      if delivery.link.local_closed? && @opts[:auto_accept]
        # The link was closed locally, so hand the message back to the peer.
        delivery.release
      else
        begin
          delegate(:on_message, event)
          delivery.accept if @opts[:auto_accept] && !delivery.settled?
        rescue Qpid::Proton::Reject
          delivery.reject
        rescue Qpid::Proton::Release
          delivery.release(true)
        end
      end
    end
    delegate(:on_settled, event) if delivery.settled?
    add_credit(event)
  else
    # Outgoing message
    tracker = event.tracker
    if tracker.updated?
      outcome =
        case tracker.state
        when Qpid::Proton::Delivery::ACCEPTED then :on_accepted
        when Qpid::Proton::Delivery::REJECTED then :on_rejected
        when Qpid::Proton::Delivery::RELEASED then :on_released
        when Qpid::Proton::Delivery::MODIFIED then :on_modified
        end
      delegate(outcome, event) if outcome
      delegate(:on_settled, event) if tracker.settled?
      tracker.settle if @opts[:auto_settle]
    end
  end
end
on_link_flow(event)
click to toggle source
# File lib/handler/reactor_messaging_adapter.rb, line 143
# Handle a link flow event: replenish receiver credit first, then delegate
# +:on_sendable+ when the event's link is an open sender with credit
# greater than zero.
#
# @param event the raw link flow event
def on_link_flow(event)
  add_credit(event)
  link = event.link
  return unless link.sender? && link.open? && link.credit > 0

  delegate(:on_sendable, event)
end
on_link_local_open(event)
click to toggle source
Add flow control for link opening events
Calls superclass method
# File lib/handler/reactor_messaging_adapter.rb, line 102
# Add flow control to the link local-open event: run the inherited
# callback, then replenish receiver credit for the event.
#
# @param event the raw link local-open event
def on_link_local_open(event)
  super
  add_credit(event)
end
on_link_remote_open(event)
click to toggle source
Calls superclass method
# File lib/handler/reactor_messaging_adapter.rb, line 103
# Add flow control to the link remote-open event: run the inherited
# callback, then replenish receiver credit for the event.
#
# @param event the raw link remote-open event
def on_link_remote_open(event)
  super
  add_credit(event)
end
on_transport_closed(event)
click to toggle source
# File lib/handler/reactor_messaging_adapter.rb, line 99
# Forward a transport-closed event to the handler as +:on_transport_closed+.
#
# @param event the raw transport closed event
def on_transport_closed(event)
  delegate(:on_transport_closed, event)
end
on_transport_error(event)
click to toggle source
# File lib/handler/reactor_messaging_adapter.rb, line 98
# Forward a transport error to the handler as +:on_transport_error+ through
# the error delegation path.
#
# @param event the raw transport error event
def on_transport_error(event)
  delegate_error(:on_transport_error, event)
end