class Qpid::Proton::Connection

An AMQP connection.

Constants

PROTON_METHOD_PREFIX

Attributes

container[R]

@return [Container] the container managing this connection

work_queue[R]

@return [WorkQueue] work queue to execute code serialized correctly for this connection

Public Class Methods

new(impl = Cproton.pn_connection) click to toggle source

@private

Calls superclass method
# File lib/core/connection.rb, line 34
# @private
# Build a Connection around a C connection object.
#
# @param impl the underlying pn_connection; defaults to a new one from
#   Cproton.pn_connection
def initialize(impl = Cproton.pn_connection)
  super()
  @impl = impl
  @overrides, @session_policy = nil, nil
  @link_count, @link_prefix = 0, ""
  # Stored under the same key that the class-level wrap passes to fetch_instance.
  self.class.store_instance(self, :pn_connection_attachments)
end
wrap(impl) click to toggle source

@private

# File lib/core/connection.rb, line 28
# @private
# Get the Connection for a C connection object.
#
# @param impl the underlying pn_connection, may be nil
# @return [Connection, nil] nil when impl is nil; otherwise the instance
#   returned by fetch_instance, or a new Connection around impl if none
def self.wrap(impl)
  return if impl.nil?
  existing = self.fetch_instance(impl, :pn_connection_attachments)
  existing || Connection.new(impl)
end

Public Instance Methods

apply(opts) click to toggle source

@private

# File lib/core/connection.rb, line 128
# @private
# Apply connection options to the underlying C connection.
#
# Only connection options are set here; transport options must be applied
# with {Transport#apply}.
#
# @param opts [Hash] options; keys read here are :container, :container_id,
#   :user, :password, :virtual_host, :link_prefix, :offered_capabilities,
#   :desired_capabilities and :properties
def apply(opts)
  @container = opts[:container]

  # Container ID precedence: explicit option, then the container's own id,
  # then a random UUID.
  name = opts[:container_id] || (@container && @container.id) || SecureRandom.uuid
  name = name.to_s if name.is_a?(Symbol) # Allow symbols as container name
  Cproton.pn_connection_set_container(@impl, name)

  if (user = opts[:user])
    Cproton.pn_connection_set_user(@impl, user)
  end
  if (password = opts[:password])
    Cproton.pn_connection_set_password(@impl, password)
  end
  if (vhost = opts[:virtual_host])
    Cproton.pn_connection_set_hostname(@impl, vhost)
  end

  @link_prefix = opts[:link_prefix] || name

  offered = Cproton.pn_connection_offered_capabilities(@impl)
  Codec::Data.from_object(offered, opts[:offered_capabilities])
  desired = Cproton.pn_connection_desired_capabilities(@impl)
  Codec::Data.from_object(desired, opts[:desired_capabilities])
  props = Cproton.pn_connection_properties(@impl)
  Codec::Data.from_object(props, opts[:properties])
end
close(error=nil) click to toggle source

Closes the local end of the connection. The remote end may or may not be closed. @param error [Condition] Optional error condition to send with the close.

# File lib/core/connection.rb, line 174
# Closes the local end of the connection. The remote end may or may not be closed.
#
# @param error [Condition] Optional error condition to send with the close.
def close(error = nil)
  condition = _local_condition
  Condition.assign(condition, error)
  Cproton.pn_connection_close(@impl)
end
connection() click to toggle source

@return [Connection] self

# File lib/core/connection.rb, line 65
# @return [Connection] self
def connection
  self
end
container_id() click to toggle source

@return AMQP container ID advertised by the remote peer. To get the local container ID use {#container} and {Container#id}

# File lib/core/connection.rb, line 72
# @return AMQP container ID advertised by the remote peer.
#   To get the local container ID use {#container} and {Container#id}
def container_id
  Cproton.pn_connection_remote_container(@impl)
end
default_session() click to toggle source

Returns the default session for this connection.

@return [Session] The session.

# File lib/core/connection.rb, line 196
# Returns the default session for this connection, opening it with
# {#open_session} on first use and reusing it afterwards.
#
# @return [Session] The session.
def default_session
  @session = open_session unless @session
  @session
end
desired_capabilities() click to toggle source

@return [Array<Symbol>] desired capabilities provided by the remote peer

# File lib/core/connection.rb, line 86
# @return [Array<Symbol>] desired capabilities provided by the remote peer
def desired_capabilities
  remote = Cproton.pn_connection_remote_desired_capabilities(@impl)
  # to_multiple gives a consistent array even if encoded as a single symbol
  Codec::Data.to_multiple(remote)
end
each_receiver() { |l| ... } click to toggle source

Get the {Receiver} links - see {#each_link}

# File lib/core/connection.rb, line 268
# Get the {Receiver} links - see {#each_link}
#
# Yields every link from each_link whose receiver? is truthy. With a block,
# the return value is the result of select: those receiver links for which
# the block itself returned a truthy value.
#
# @return [Enumerator] when called without a block
def each_receiver
  return to_enum(:each_receiver) unless block_given?
  each_link.select do |link|
    link.receiver? && yield(link)
  end
end
each_sender() { |l| ... } click to toggle source

Get the {Sender} links - see {#each_link}

# File lib/core/connection.rb, line 262
# Get the {Sender} links - see {#each_link}
#
# Yields every link from each_link whose sender? is truthy. With a block,
# the return value is the result of select: those sender links for which
# the block itself returned a truthy value.
#
# @return [Enumerator] when called without a block
def each_sender
  return to_enum(:each_sender) unless block_given?
  each_link.select do |link|
    link.sender? && yield(link)
  end
end
each_session() { |wrap| ... } click to toggle source

Get the sessions on this connection. @overload each_session

@yieldparam s [Session] pass each session to block

@overload each_session

@return [Enumerator] enumerator over sessions
# File lib/core/connection.rb, line 229
# Get the sessions on this connection.
#
# @overload each_session
#   @yieldparam s [Session] pass each session to block
#   @return [Connection] self
# @overload each_session
#   @return [Enumerator] enumerator over sessions
def each_session(&block)
  return to_enum(:each_session) unless block
  # Walk the C-level session list from its head until there is no next entry.
  cursor = Cproton.pn_session_head(@impl, 0)
  while cursor
    yield Session.wrap(cursor)
    cursor = Cproton.pn_session_next(cursor, 0)
  end
  self
end
error() click to toggle source

@deprecated use {#condition}

# File lib/core/connection.rb, line 280
# @deprecated use {#condition}
#
# @return the result of pn_error_code for this connection's pn_connection_error
def error
  deprecated __method__, "#condition"
  connection_error = Cproton.pn_connection_error(@impl)
  Cproton.pn_error_code(connection_error)
end
idle_timeout() click to toggle source

Idle-timeout advertised by the remote peer, in seconds. @return [Numeric] Idle-timeout advertised by the remote peer, in seconds. @return [nil] if the peer does not advertise an idle time-out

# File lib/core/connection.rb, line 147
# Idle-timeout advertised by the remote peer, in seconds.
#
# @return [Numeric] Idle-timeout advertised by the remote peer, in seconds.
# @return [nil] if there is no transport or the peer does not advertise an
#   idle time-out
def idle_timeout
  return nil unless transport
  millis = transport.remote_idle_timeout
  return nil unless millis
  Rational(millis, 1000) # More precise than Float
end
max_frame_size() click to toggle source

Maximum frame size, in bytes, advertised by the remote peer. See {Connection#open :max_frame_size} @return [Integer] maximum frame size @return [nil] no limit

# File lib/core/connection.rb, line 166
# Maximum frame size, in bytes, advertised by the remote peer.
# See {Connection#open :max_frame_size}
#
# @return [Integer] maximum frame size
# @return [nil] no limit (the transport reports zero)
# @raise [StateError] if the connection is not bound to a transport
def max_frame_size
  raise StateError, "connection not bound to transport" unless transport
  limit = transport.remote_max_frame
  limit unless limit.zero?
end
max_sessions() click to toggle source

Session limit advertised by the remote peer. See {Connection#open :max_sessions} @return [Integer] maximum number of sessions per connection allowed by remote peer. @return [nil] no specific limit is set.

# File lib/core/connection.rb, line 156
# Session limit advertised by the remote peer.
# See {Connection#open :max_sessions}
#
# @return [Integer] maximum number of sessions per connection allowed by remote peer.
# @return [nil] no specific limit is set (the transport reports zero).
# @raise [StateError] if the connection is not bound to a transport
def max_sessions
  raise StateError, "connection not bound to transport" unless transport
  channel_max = transport.remote_channel_max
  if channel_max.zero?
    nil
  else
    channel_max
  end
end
offered_capabilities() click to toggle source

@return [Array<Symbol>] offered capabilities provided by the remote peer

# File lib/core/connection.rb, line 79
# @return [Array<Symbol>] offered capabilities provided by the remote peer
def offered_capabilities
  remote = Cproton.pn_connection_remote_offered_capabilities(@impl)
  # to_multiple gives a consistent array even if encoded as a single symbol
  Codec::Data.to_multiple(remote)
end
open(opts=nil) click to toggle source

Open the local end of the connection.

@option opts [MessagingHandler] :handler handler for events related to this connection.

@option opts [String] :user User name for authentication @option opts [String] :password Authentication secret @option opts [String] :virtual_host Virtual host name @option opts [String] :container_id (provided by {Container}) override advertised container-id

@option opts [Hash<Symbol=>Object>] :properties Application-defined properties @option opts [Array<Symbol>] :offered_capabilities Extensions the endpoint supports @option opts [Array<Symbol>] :desired_capabilities Extensions the endpoint can use

@option opts [Numeric] :idle_timeout Seconds before closing an idle connection @option opts [Integer] :max_sessions Limit the number of active sessions @option opts [Integer] :max_frame_size Limit the size of AMQP frames

@option opts [Boolean] :sasl_enabled (false) Enable or disable SASL. @option opts [Boolean] :sasl_allow_insecure_mechs (false) Allow mechanisms that send secrets in cleartext @option opts [String] :sasl_allowed_mechs Specify the SASL mechanisms allowed for this connection. The value is a space-separated list of mechanism names. The mechanisms allowed by default are determined by your SASL library and system configuration, with two exceptions: GSSAPI and GSS-SPNEGO are disabled by default. To enable them, you must explicitly add them using this option. Clients must set the allowed mechanisms before the outgoing connection is attempted. Servers must set them before the listening connection is set up.

@option opts [SSLDomain] :ssl_domain SSL configuration domain.

# File lib/core/connection.rb, line 121
# Open the local end of the connection.
#
# Does nothing when the local end is already active. Otherwise applies
# opts (when given) via {#apply} and then opens the C connection.
#
# @param opts [Hash, nil] connection options, see {#apply}
def open(opts = nil)
  unless local_active?
    apply(opts) if opts
    Cproton.pn_connection_open(@impl)
  end
end
open_receiver(opts=nil) click to toggle source

Open a receiver on the default_session @option opts (see Session#open_receiver)

# File lib/core/connection.rb, line 216
# Open a receiver on the default_session
#
# @option opts (see Session#open_receiver)
def open_receiver(opts = nil)
  default_session.open_receiver(opts)
end
open_sender(opts=nil) click to toggle source

Open a sender on the default_session @option opts (see Session#open_sender)

# File lib/core/connection.rb, line 212
# Open a sender on the default_session
#
# @option opts (see Session#open_sender)
def open_sender(opts = nil)
  default_session.open_sender(opts)
end
open_session() click to toggle source

Open a new session on this connection.

# File lib/core/connection.rb, line 204
# Open a new session on this connection.
#
# @return [Session] the newly created session, after open has been called on it
def open_session
  Session.wrap(Cproton.pn_session(@impl)).tap { |session| session.open }
end
overrides?() click to toggle source

@deprecated no replacement

# File lib/core/connection.rb, line 59
# @deprecated no replacement
#
# @return [false] always
def overrides?
  deprecated __method__
  false
end
properties() click to toggle source

@return [Hash] connection-properties provided by the remote peer

# File lib/core/connection.rb, line 93
# @return [Hash] connection-properties provided by the remote peer
def properties
  remote = Cproton.pn_connection_remote_properties(@impl)
  Codec::Data.to_object(remote)
end
session_head(mask) click to toggle source

@deprecated use {#each_session}

# File lib/core/connection.rb, line 219
# @deprecated use {#each_session}
#
# @param mask state mask passed through to pn_session_head
# @return the result of Session.wrap on the session found by pn_session_head
def session_head(mask)
  deprecated __method__, "#each_session"
  head = Cproton.pn_session_head(@impl, mask)
  Session.wrap(head)
end
session_policy?() click to toggle source

@deprecated no replacement

# File lib/core/connection.rb, line 62
# @deprecated no replacement
#
# @return [false] always
def session_policy?
  deprecated __method__
  false
end
state() click to toggle source

Gets the endpoint current state flags

@see Endpoint#LOCAL_UNINIT @see Endpoint#LOCAL_ACTIVE @see Endpoint#LOCAL_CLOSED @see Endpoint#LOCAL_MASK

@return [Integer] The state flags.

# File lib/core/connection.rb, line 188
# Gets the endpoint current state flags
#
# @see Endpoint#LOCAL_UNINIT
# @see Endpoint#LOCAL_ACTIVE
# @see Endpoint#LOCAL_CLOSED
# @see Endpoint#LOCAL_MASK
#
# @return [Integer] The state flags.
def state
  flags = Cproton.pn_connection_state(@impl)
  flags
end
transport() click to toggle source

@return [Transport, nil] transport bound to this connection, or nil if unbound.

# File lib/core/connection.rb, line 68
# @return [Transport, nil] transport bound to this connection, or nil if unbound.
def transport
  bound = Cproton.pn_connection_transport(@impl)
  Transport.wrap(bound)
end
user() click to toggle source

@return [String] User name used for authentication (outgoing connection) or the authenticated user name (incoming connection)

# File lib/core/connection.rb, line 54
# @return [String] User name used for authentication (outgoing connection)
#   or the authenticated user name (incoming connection)
def user
  # Prefer the user name set on the connection itself. Reads @impl directly,
  # like every other method here, rather than going through an accessor.
  local_user = Cproton.pn_connection_get_user(@impl)
  return local_user if local_user

  # Otherwise fall back to the user reported by the bound transport, if any.
  bound = transport
  bound && bound.user
end
virtual_host() click to toggle source

@return [String] The AMQP hostname for the connection.

# File lib/core/connection.rb, line 45
# @return [String] The AMQP hostname for the connection, as returned by
#   pn_connection_remote_hostname.
def virtual_host
  Cproton.pn_connection_remote_hostname(@impl)
end
work_head() click to toggle source

@deprecated use {#MessagingHandler} to handle work

# File lib/core/connection.rb, line 274
# @deprecated use {#MessagingHandler} to handle work
#
# @return the result of Delivery.wrap on pn_work_head for this connection
def work_head
  deprecated __method__
  head = Cproton.pn_work_head(@impl)
  Delivery.wrap(head)
end

Protected Instance Methods

_local_condition() click to toggle source
# File lib/core/connection.rb, line 295
# @return the C-level condition object from pn_connection_condition;
#   {#close} passes it to Condition.assign
def _local_condition
  local = Cproton.pn_connection_condition(@impl)
  local
end
_remote_condition() click to toggle source
# File lib/core/connection.rb, line 299
# @return the C-level condition object from pn_connection_remote_condition
def _remote_condition
  remote = Cproton.pn_connection_remote_condition(@impl)
  remote
end