class Qpid::Proton::Connection
An AMQP connection.
Constants
- PROTON_METHOD_PREFIX
Attributes
@return [Container] the container managing this connection
@return [WorkQueue] work queue to execute code serialized correctly for this connection
Public Class Methods
@private
# File lib/core/connection.rb, line 34
# @private
# Build a connection around a native handle and reset its bookkeeping.
#
# The new object is handed to the class-level +store_instance+ under
# +:pn_connection_attachments+ (the same key {wrap} passes to
# +fetch_instance+).
#
# @param impl native connection handle; defaults to a fresh
#   +Cproton.pn_connection+
def initialize(impl = Cproton.pn_connection)
  super()
  @impl = impl
  @overrides = @session_policy = nil
  @link_prefix = ""
  @link_count = 0
  self.class.store_instance(self, :pn_connection_attachments)
end
@private
# File lib/core/connection.rb, line 28
# @private
# Return the Ruby object for a native connection handle.
#
# @param impl native connection handle, may be nil
# @return [Connection, nil] nil when +impl+ is nil; otherwise whatever
#   +fetch_instance+ finds under +:pn_connection_attachments+, or a new
#   {Connection} built around +impl+ when nothing is found
def self.wrap(impl)
  return nil if impl.nil?

  existing = fetch_instance(impl, :pn_connection_attachments)
  existing || Connection.new(impl)
end
Public Instance Methods
@private
# File lib/core/connection.rb, line 128
# @private
# Copy connection-level options onto the native connection.
#
# Reads +:container+, +:container_id+, +:user+, +:password+,
# +:virtual_host+, +:link_prefix+, +:offered_capabilities+,
# +:desired_capabilities+ and +:properties+ from +opts+.
#
# @param opts [Hash] options, indexed with +[]+
def apply(opts)
  # NOTE: Only connection options are set here.
  # Transport options must be applied with {Transport#apply}
  @container = opts[:container]

  # Container id precedence: explicit option, then the container's id,
  # then a random UUID.
  cid = opts[:container_id] || (@container && @container.id) || SecureRandom.uuid
  cid = cid.to_s if cid.is_a?(Symbol) # Allow symbols as container name
  Cproton.pn_connection_set_container(@impl, cid)

  # Credentials and virtual host are only pushed down when supplied.
  if opts[:user]
    Cproton.pn_connection_set_user(@impl, opts[:user])
  end
  if opts[:password]
    Cproton.pn_connection_set_password(@impl, opts[:password])
  end
  if opts[:virtual_host]
    Cproton.pn_connection_set_hostname(@impl, opts[:virtual_host])
  end

  # Generated link names are prefixed with this value (see #link_name).
  @link_prefix = opts[:link_prefix] || cid

  offered = Cproton.pn_connection_offered_capabilities(@impl)
  Codec::Data.from_object(offered, opts[:offered_capabilities])

  desired = Cproton.pn_connection_desired_capabilities(@impl)
  Codec::Data.from_object(desired, opts[:desired_capabilities])

  props = Cproton.pn_connection_properties(@impl)
  Codec::Data.from_object(props, opts[:properties])
end
Closes the local end of the connection. The remote end may or may not be closed. @param error [Condition] Optional error condition to send with the close.
# File lib/core/connection.rb, line 174
# Close the local end of the connection.
#
# The error (possibly nil) is first assigned onto the local condition via
# +Condition.assign+, then the native connection is closed.
#
# @param error [Condition] Optional error condition to send with the close.
def close(error=nil)
  local = _local_condition
  Condition.assign(local, error)
  Cproton.pn_connection_close(@impl)
end
@return [Connection] self
# File lib/core/connection.rb, line 65
# A connection is its own connection.
# @return [Connection] self
def connection
  self
end
@return AMQP container ID advertised by the remote peer. To get the local container ID use {#container} and {Container#id}
# File lib/core/connection.rb, line 72
# @return AMQP container ID advertised by the remote peer, as reported by
#   +Cproton.pn_connection_remote_container+.
#   To get the local container ID use {#container} and {Container#id}
def container_id
  Cproton.pn_connection_remote_container(@impl)
end
Returns the default session for this connection.
@return [Session] The session.
# File lib/core/connection.rb, line 196
# Return the default session for this connection.
#
# The session is opened with {#open_session} on first use and cached in
# +@session+; later calls return the cached object.
#
# @return [Session] The session.
def default_session
  @session = open_session unless @session
  @session
end
@return [Array<Symbol>] desired capabilities provided by the remote peer
# File lib/core/connection.rb, line 86
# @return [Array<Symbol>] desired capabilities provided by the remote peer
def desired_capabilities
  remote = Cproton.pn_connection_remote_desired_capabilities(@impl)
  # Provide capabilities consistently as an array, even if encoded as a single symbol
  Codec::Data.to_multiple(remote)
end
Get the links on this connection. @overload each_link
@yieldparam l [Link] pass each link to block
@overload each_link
@return [Enumerator] enumerator over links
# File lib/core/connection.rb, line 250
# Iterate over the links on this connection.
#
# @yieldparam l [Link] each link, wrapped with +Link.wrap+
# @return [Enumerator] when no block is given
# @return [Connection] self when a block is given
def each_link
  return enum_for(:each_link) unless block_given?

  upcoming = Cproton.pn_link_head(@impl, 0)
  while (current = upcoming)
    # get next before yield, in case yield closes the link and unlinks it
    upcoming = Cproton.pn_link_next(current, 0)
    yield Link.wrap(current)
  end
  self
end
Get the {Receiver} links - see {#each_link}
# File lib/core/connection.rb, line 268
# Iterate over the {Receiver} links - see {#each_link}
#
# @yieldparam l [Link] each link for which +receiver?+ is true
# @return [Enumerator] when no block is given
# @return [Array<Link>] when a block is given: the receivers for which the
#   block returned a truthy value (a consequence of filtering with +select+)
def each_receiver
  return enum_for(:each_receiver) unless block_given?

  each_link.select do |link|
    link.receiver? && yield(link)
  end
end
Get the {Sender} links - see {#each_link}
# File lib/core/connection.rb, line 262
# Iterate over the {Sender} links - see {#each_link}
#
# @yieldparam l [Link] each link for which +sender?+ is true
# @return [Enumerator] when no block is given
# @return [Array<Link>] when a block is given: the senders for which the
#   block returned a truthy value (a consequence of filtering with +select+)
def each_sender
  return enum_for(:each_sender) unless block_given?

  each_link.select do |link|
    link.sender? && yield(link)
  end
end
Get the sessions on this connection. @overload each_session
@yieldparam s [Session] pass each session to block
@overload each_session
@return [Enumerator] enumerator over sessions
# File lib/core/connection.rb, line 229
# Iterate over the sessions on this connection.
#
# The successor is fetched BEFORE yielding, mirroring {#each_link}, so the
# walk is not cut short if the block unlinks the session it was handed.
#
# @yieldparam s [Session] each session, wrapped with +Session.wrap+
# @return [Enumerator] when no block is given
# @return [Connection] self when a block is given
def each_session(&block)
  return enum_for(:each_session) unless block_given?

  s = Cproton.pn_session_head(@impl, 0)
  while s
    current = s
    # get next before yield, in case yield unlinks the current session
    s = Cproton.pn_session_next(s, 0)
    yield Session.wrap(current)
  end
  self
end
@deprecated use {#condition}
# File lib/core/connection.rb, line 280
# @deprecated use {#condition}
# Emits a deprecation notice, then returns the error code of the native
# connection's error object.
def error
  deprecated __method__, "#condition"
  native_error = Cproton.pn_connection_error(@impl)
  Cproton.pn_error_code(native_error)
end
Idle-timeout advertised by the remote peer, in seconds. @return [Numeric] Idle-timeout advertised by the remote peer, in seconds. @return [nil] if the peer does not advertise an idle time-out
# File lib/core/connection.rb, line 147
# Idle-timeout advertised by the remote peer, in seconds.
#
# The transport's +remote_idle_timeout+ is divided by 1000 to produce
# seconds.
#
# NOTE(review): only a nil/false timeout maps to nil here; a value of 0 is
# truthy in Ruby and comes back as Rational 0 - confirm what the transport
# reports when the peer advertises no timeout.
#
# @return [Numeric] Idle-timeout advertised by the remote peer, in seconds.
# @return [nil] if there is no transport or it reports no idle time-out
def idle_timeout
  millis = transport && transport.remote_idle_timeout
  return nil unless millis

  Rational(millis, 1000) # More precise than Float
end
@deprecated use {#each_link}
# File lib/core/connection.rb, line 240
# @deprecated use {#each_link}
# Emits a deprecation notice, then wraps the first native link matching
# +mask+.
# @param mask state mask passed through to +Cproton.pn_link_head+
def link_head(mask)
  deprecated __method__, "#each_link"
  head = Cproton.pn_link_head(@impl, mask)
  Link.wrap(head)
end
@private Generate a unique link name, internal use only.
# File lib/core/connection.rb, line 286
# @private
# Generate a unique link name, internal use only.
#
# Increments the per-connection link counter and appends it, in base 32, to
# the link prefix. Interpolation is used instead of String#+ so that a
# non-String prefix (e.g. a Symbol supplied as +:link_prefix+) is rendered
# rather than raising NoMethodError.
#
# @return [String] "<link_prefix>/<counter in base 32>"
def link_name
  @link_count += 1
  "#{@link_prefix}/#{@link_count.to_s(32)}"
end
Maximum frame size, in bytes, advertised by the remote peer. See {Connection#open :max_frame_size} @return [Integer] maximum frame size @return [nil] no limit
# File lib/core/connection.rb, line 166
# Maximum frame size, in bytes, advertised by the remote peer.
# See {Connection#open :max_frame_size}
#
# @return [Integer] maximum frame size
# @return [nil] no limit (the transport reports zero)
# @raise [StateError] if the connection is not bound to a transport
def max_frame_size
  raise StateError, "connection not bound to transport" unless transport

  limit = transport.remote_max_frame
  limit unless limit.zero?
end
Session limit advertised by the remote peer. See {Connection#open :max_sessions} @return [Integer] maximum number of sessions per connection allowed by remote peer. @return [nil] no specific limit is set.
# File lib/core/connection.rb, line 156
# Session limit advertised by the remote peer.
# See {Connection#open :max_sessions}
#
# @return [Integer] maximum number of sessions per connection allowed by
#   remote peer.
# @return [nil] no specific limit is set (the transport reports zero)
# @raise [StateError] if the connection is not bound to a transport
def max_sessions
  raise StateError, "connection not bound to transport" unless transport

  limit = transport.remote_channel_max
  limit unless limit.zero?
end
@return [Array<Symbol>] offered capabilities provided by the remote peer
# File lib/core/connection.rb, line 79
# @return [Array<Symbol>] offered capabilities provided by the remote peer
def offered_capabilities
  remote = Cproton.pn_connection_remote_offered_capabilities(@impl)
  # Provide capabilities consistently as an array, even if encoded as a single symbol
  Codec::Data.to_multiple(remote)
end
Open the local end of the connection.
@option opts [MessagingHandler] :handler handler for events related to this connection.
@option opts [String] :user User name for authentication @option opts [String] :password Authentication secret @option opts [String] :virtual_host Virtual host name @option opts [String] :container_id (provided by {Container}) override advertised container-id
@option opts [Hash<Symbol=>Object>] :properties Application-defined properties @option opts [Array<Symbol>] :offered_capabilities Extensions the endpoint supports @option opts [Array<Symbol>] :desired_capabilities Extensions the endpoint can use
@option opts [Numeric] :idle_timeout Seconds before closing an idle connection @option opts [Integer] :max_sessions Limit the number of active sessions @option opts [Integer] :max_frame_size Limit the size of AMQP frames
@option opts [Boolean] :sasl_enabled (false) Enable or disable SASL.
@option opts [Boolean] :sasl_allow_insecure_mechs (false) Allow mechanisms that send secrets in cleartext
@option opts [String] :sasl_allowed_mechs Specify the SASL mechanisms allowed for this connection. The value is a space-separated list of mechanism names. The mechanisms allowed by default are determined by your SASL library and system configuration, with two exceptions: GSSAPI and GSS-SPNEGO are disabled by default. To enable them, you must explicitly add them using this option. Clients must set the allowed mechanisms before the outgoing connection is attempted. Servers must set them before the listening connection is set up.
@option opts [SSLDomain] :ssl_domain SSL configuration domain.
# File lib/core/connection.rb, line 121
# Open the local end of the connection.
#
# Does nothing (and returns nil) when the local end is already active.
# Otherwise applies +opts+, if given, via {#apply} and opens the native
# connection.
#
# @param opts [Hash, nil] connection options, see {#apply}
def open(opts=nil)
  unless local_active?
    apply(opts) if opts
    Cproton.pn_connection_open(@impl)
  end
end
Open a receiver on the default_session
@option opts (see Session#open_receiver)
# File lib/core/connection.rb, line 216
# Open a receiver on the {#default_session}.
# @param opts options forwarded unchanged to the session's +open_receiver+
def open_receiver(opts=nil)
  session = default_session
  session.open_receiver(opts)
end
Open a sender on the default_session
@option opts (see Session#open_sender)
# File lib/core/connection.rb, line 212
# Open a sender on the {#default_session}.
# @param opts options forwarded unchanged to the session's +open_sender+
def open_sender(opts=nil)
  session = default_session
  session.open_sender(opts)
end
Open a new session on this connection.
# File lib/core/connection.rb, line 204
# Open a new session on this connection.
#
# Creates a native session on this connection, wraps it, calls +open+ on
# the wrapper and returns it.
#
# @return [Session] the newly opened session
def open_session
  Session.wrap(Cproton.pn_session(@impl)).tap { |session| session.open }
end
@deprecated no replacement
# File lib/core/connection.rb, line 59
# @deprecated no replacement
# Emits a deprecation notice.
# @return [false] always
def overrides?
  deprecated __method__
  false
end
@return [Hash] connection-properties provided by the remote peer
# File lib/core/connection.rb, line 93
# @return [Hash] connection-properties provided by the remote peer
def properties
  remote = Cproton.pn_connection_remote_properties(@impl)
  Codec::Data.to_object(remote)
end
@deprecated use {#each_session}
# File lib/core/connection.rb, line 219
# @deprecated use {#each_session}
# Emits a deprecation notice, then wraps the first native session matching
# +mask+.
# @param mask state mask passed through to +Cproton.pn_session_head+
def session_head(mask)
  deprecated __method__, "#each_session"
  head = Cproton.pn_session_head(@impl, mask)
  Session.wrap(head)
end
@deprecated no replacement
# File lib/core/connection.rb, line 62
# @deprecated no replacement
# Emits a deprecation notice.
# @return [false] always
def session_policy?
  deprecated __method__
  false
end
Gets the endpoint current state flags
@see Endpoint#LOCAL_UNINIT @see Endpoint#LOCAL_ACTIVE @see Endpoint#LOCAL_CLOSED @see Endpoint#LOCAL_MASK
@return [Integer] The state flags.
# File lib/core/connection.rb, line 188
# Gets the endpoint current state flags, as reported by
# +Cproton.pn_connection_state+.
#
# @see Endpoint#LOCAL_UNINIT
# @see Endpoint#LOCAL_ACTIVE
# @see Endpoint#LOCAL_CLOSED
# @see Endpoint#LOCAL_MASK
#
# @return [Integer] The state flags.
def state
  flags = Cproton.pn_connection_state(@impl)
  flags
end
@return [Transport, nil] transport bound to this connection, or nil if unbound.
# File lib/core/connection.rb, line 68
# @return [Transport, nil] transport bound to this connection, or nil if unbound.
def transport
  native = Cproton.pn_connection_transport(@impl)
  Transport.wrap(native)
end
@return [String] User name used for authentication (outgoing connection) or the authenticated user name (incoming connection)
# File lib/core/connection.rb, line 54
# The user name set on the connection is preferred; when none is set the
# transport's user is returned instead (nil if there is no transport).
#
# @return [String] User name used for authentication (outgoing connection)
#   or the authenticated user name (incoming connection)
def user
  name = Cproton.pn_connection_get_user(impl)
  return name if name

  connection.transport && connection.transport.user
end
@return [String] The AMQP hostname for the connection.
# File lib/core/connection.rb, line 45
# Read from +Cproton.pn_connection_remote_hostname+.
# @return [String] The AMQP hostname for the connection.
def virtual_host
  Cproton.pn_connection_remote_hostname(@impl)
end
@deprecated use {MessagingHandler} to handle work
# File lib/core/connection.rb, line 274
# @deprecated use {MessagingHandler} to handle work
# Emits a deprecation notice, then wraps the result of
# +Cproton.pn_work_head+ as a delivery.
def work_head
  deprecated __method__
  head = Cproton.pn_work_head(@impl)
  Delivery.wrap(head)
end
Protected Instance Methods
# File lib/core/connection.rb, line 295
# Native condition object for the local end of the connection
# ({#close} assigns its error onto this).
def _local_condition
  native = Cproton.pn_connection_condition(@impl)
  native
end
# File lib/core/connection.rb, line 299
# Native condition object for the remote end of the connection.
def _remote_condition
  native = Cproton.pn_connection_remote_condition(@impl)
  native
end