5 #if !defined(RXCPP_RX_BEHAVIOR_HPP)
6 #define RXCPP_RX_BEHAVIOR_HPP
8 #include "../rx-includes.hpp"
17 class behavior_observer :
public detail::multicast_observer<T>
19 typedef behavior_observer<T> this_type;
20 typedef detail::multicast_observer<T> base_type;
22 class behavior_observer_state :
public std::enable_shared_from_this<behavior_observer_state>
24 mutable std::mutex lock;
28 behavior_observer_state(T
first)
33 void reset(T v)
const {
34 std::unique_lock<std::mutex> guard(lock);
38 std::unique_lock<std::mutex> guard(lock);
43 std::shared_ptr<behavior_observer_state> state;
46 behavior_observer(T f, composite_subscription l)
48 , state(std::make_shared<behavior_observer_state>(std::move(f)))
52 subscriber<T> get_subscriber()
const {
53 return make_subscriber<T>(this->get_id(), this->get_subscription(), observer<T, detail::behavior_observer<T>>(*
this)).as_dynamic();
61 void on_next(V v)
const {
63 base_type::on_next(std::move(v));
72 detail::behavior_observer<T> s;
81 return s.has_observers();
89 return s.get_subscriber();
95 if (keepAlive.get_subscription().is_subscribed()) {
96 o.on_next(get_value());
98 keepAlive.add(s.get_subscriber(), std::move(o));