class Qpid::Proton::ConnectionDriver
Associate an AMQP {Connection} and {Transport} with an {IO}
-
{#read} reads AMQP binary data from the {IO} and generates events
-
{#tick} generates timing-related events
-
{#event} gets events to be dispatched to {Handler::MessagingHandler}s
-
{#write} writes AMQP binary data to the {IO}
Thread safety: The {ConnectionDriver} is not thread safe but separate {ConnectionDriver} instances can be processed concurrently. The {Container} handles multiple connections concurrently in multiple threads.
Attributes
Time returned by the last call to {#tick}
Public Class Methods
Create a {Connection} and {Transport} associated with io
@param io [IO] An {IO} or {IO}-like object that responds
to {IO#read_nonblock} and {IO#write_nonblock}
# File lib/core/connection_driver.rb, line 37 def initialize(io) @impl = Cproton.pni_connection_driver or raise NoMemoryError @io = io @rbuf = "" # String for re-usable read buffer end
Public Instance Methods
@return [Bool] True if the driver can read more data
# File lib/core/connection_driver.rb, line 57 def can_read?() Cproton.pni_connection_driver_read_size(@impl) > 0; end
@return [Bool] True if the driver has data to write
# File lib/core/connection_driver.rb, line 60 def can_write?() Cproton.pni_connection_driver_write_size(@impl) > 0; end
Disconnect both sides of the transport sending/waiting for AMQP close frames. See comments on {#close_write}
# File lib/core/connection_driver.rb, line 158 def close error=nil close_write error close_read end
Disconnect the read side of the transport, without waiting for an AMQP close frame. See comments on {#close_write}
# File lib/core/connection_driver.rb, line 150 def close_read error=nil set_error error Cproton.pn_connection_driver_read_close(@impl) @io.close_read rescue nil # Allow double-close end
Disconnect the write side of the transport, without sending an AMQP close frame. To close politely, you should use {Connection#close}, the transport will close itself once the protocol close is complete.
# File lib/core/connection_driver.rb, line 136 def close_write error=nil set_error error Cproton.pn_connection_driver_write_close(@impl) @io.close_write rescue nil # Allow double-close end
@return [Connection]
# File lib/core/connection_driver.rb, line 44 def connection() @connection ||= Connection.wrap(Cproton.pni_connection_driver_connection(@impl)) end
Iterator for all available events
# File lib/core/connection_driver.rb, line 76 def each_event() while e = event yield e end end
Get the next event to dispatch, nil if no events available
# File lib/core/connection_driver.rb, line 67 def event() e = Cproton.pn_connection_driver_next_event(@impl) Event.new(e) if e end
True if {#event} will return non-nil
# File lib/core/connection_driver.rb, line 73 def event?() Cproton.pn_connection_driver_has_event(@impl); end
True if the ConnectionDriver
has nothing left to do: both sides of the transport are closed and there are no events to dispatch.
# File lib/core/connection_driver.rb, line 64 def finished?() Cproton.pn_connection_driver_finished(@impl); end
Non-blocking read from {#io}, generate events for {#event} IO errors are returned as transport errors by {#event}, not raised
# File lib/core/connection_driver.rb, line 84 def read size = Cproton.pni_connection_driver_read_size(@impl) return if size <= 0 @io.read_nonblock(size, @rbuf) # Use the same string rbuf for reading each time Cproton.pni_connection_driver_read_copy(@impl, @rbuf) unless @rbuf.empty? rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR # Try again later. rescue EOFError # EOF is not an error close_read rescue IOError, SystemCallError => e close e end
Is the read side of the driver closed?
# File lib/core/connection_driver.rb, line 143 def read_closed?() Cproton.pn_connection_driver_read_closed(@impl); end
Handle time-related work, for example idle-timeout events. May generate events for {#event} and change {#can_read?}, {#can_write?}
@param [Time] now the current time, defaults to {Time#now}.
@return [Time] time of the next scheduled event, or nil if there are no scheduled events. If non-nil you must call {#tick} again no later than this time.
# File lib/core/connection_driver.rb, line 118 def tick(now=Time.now) transport = Cproton.pni_connection_driver_transport(@impl) ms = Cproton.pn_transport_tick(transport, (now.to_r * 1000).to_i) @next_tick = ms.zero? ? nil : Time.at(ms.to_r / 1000); unless @next_tick idle = Cproton.pn_transport_get_idle_timeout(transport); @next_tick = now + (idle.to_r / 1000) unless idle.zero? end @next_tick end
@return [IO] Allows ConnectionDriver
to be passed directly to {IO#select}
# File lib/core/connection_driver.rb, line 54 def to_io() @io; end
@return [Transport]
# File lib/core/connection_driver.rb, line 49 def transport() @transport ||= Transport.wrap(Cproton.pni_connection_driver_transport(@impl)) end
Non-blocking write to {#io} IO errors are returned as transport errors by {#event}, not raised
# File lib/core/connection_driver.rb, line 99 def write data = Cproton.pn_connection_driver_write_buffer(@impl) return unless data && data.size > 0 n = @io.write_nonblock(data) Cproton.pn_connection_driver_write_done(@impl, n) if n > 0 rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR # Try again later. rescue IOError, SystemCallError => e close e end
Is the write side of the driver closed?
# File lib/core/connection_driver.rb, line 146 def write_closed?() Cproton.pn_connection_driver_read_closed(@impl); end
Private Instance Methods
# File lib/core/connection_driver.rb, line 165 def set_error err transport.condition ||= Condition.convert(err, "proton:io") if err end