5 #if !defined(RXCPP_SOURCES_RX_RANGE_HPP)
6 #define RXCPP_SOURCES_RX_RANGE_HPP
8 #include "../rx-includes.hpp"
// Observable source that pushes a stepped sequence of T values to a subscriber.
// T is the value type handed to on_next; Coordination supplies the coordinator
// (and through it the worker) on which the producing action is scheduled.
// NOTE(review): only part of this struct is visible in this excerpt; comments
// below describe the visible statements and hedge anything declared elsewhere.
39 template<
class T,
class Coordination>
40 struct range :
public source_base<T>
// Coordination with rxu::decay_t applied, so it can be stored by value.
42 typedef rxu::decay_t<Coordination> coordination_type;
// Coordinator type published by the coordination.
43 typedef typename coordination_type::coordinator_type coordinator_type;
// Bundle of the values the producer works from, plus the coordination.
45 struct range_state_type
// f, l and s are the constructor's value/value/step arguments; cn is taken by
// value and moved into the `coordination` member.
// NOTE(review): the producer reads `next`, `last` and `step` from this state;
// presumably they are initialised from f, l and s - confirm against the
// initialiser list, which is not visible here.
47 range_state_type(T f, T l, std::ptrdiff_t s, coordination_type cn)
51 , coordination(std::move(cn))
57 coordination_type coordination;
// State captured at construction time from the constructor arguments.
59 range_state_type initial;
// Forwards all four arguments into `initial`; cn is moved, not copied again.
60 range(T f, T l, std::ptrdiff_t s, coordination_type cn)
61 : initial(f, l, s, std::move(cn))
// Called with the subscriber `o` that should receive the values. The method is
// const, so it does not reassign members of this source.
64 template<
class Subscriber>
65 void on_subscribe(Subscriber o)
const {
// The coordinator is created from the stored coordination and is given the
// subscriber's subscription.
69 auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
// Worker on which the producer is scheduled at the end of this method.
71 auto controller = coordinator.get_worker();
// One producer invocation handles one value. Everything is captured by copy.
// NOTE(review): `state` and `dest` are declared on lines not visible here;
// presumably a copy of `initial` and a reference to the subscriber - confirm.
75 auto producer = [=](
const rxsc::schedulable&
self){
// Check the subscription before emitting anything.
77 if (!dest.is_subscribed()) {
// Emit the current value.
83 dest.on_next(state.next);
// Check the subscription again, since it may have changed during on_next.
84 if (!dest.is_subscribed()) {
// True when the distance between `next` and `last` is smaller than the step
// magnitude. max/min keep the subtraction non-negative for either direction.
89 if (
std::max(state.last, state.next) -
std::min(state.last, state.next) < std::abs(state.step)) {
// `last` is still emitted as a final value unless it equals the value that was
// just sent, so `last` is delivered even when the step does not land on it.
90 if (state.last != state.next) {
91 dest.on_next(state.last);
// Advance by one step; the sum is cast back to T.
97 state.next =
static_cast<T
>(state.step + state.next);
// coordinator.act(producer) is evaluated inside a by-reference lambda.
// NOTE(review): the call receiving this lambda is not visible; presumably it
// routes exceptions thrown by act() to the subscriber - confirm.
104 [&](){
return coordinator.act(producer);},
// An empty result is special-cased (branch body not visible here).
106 if (selectedProducer.empty()) {
// Hand the adapted producer to the worker to start emission.
110 controller.schedule(selectedProducer.get());
// Factory overload with an explicit step: builds a detail::range<T, Coordination>
// from first, last and step, and moves the coordination `cn` into it.
// NOTE(review): the function signature and the expression wrapping this
// construction are not visible in this excerpt.
126 template<
class T,
class Coordination>
130 detail::range<T, Coordination>(
first,
last, step, std::move(cn)));
// Factory overload without a step argument: builds a detail::range<T, Coordination>
// from first and last with a fixed step of 1, and moves the coordination `cn`
// into it.
// NOTE(review): the parameter list is not visible in this excerpt.
134 template<
class T,
class Coordination>
// The trailing return type is gated on is_coordination<Coordination>::value,
// so this overload only participates when Coordination is a coordination.
136 ->
typename std::enable_if<is_coordination<Coordination>::value,
139 detail::range<T, Coordination>(
first,
last, 1, std::move(cn)));
143 template<
class T,
class Coordination>
145 ->
typename std::enable_if<is_coordination<Coordination>::value,