RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
Classes | Functions | Variables
rxcpp::operators Namespace Reference

Classes

struct  is_operator
 
struct  operator_base
 
struct  tag_operator
 

Functions

template<class... AN>
auto all (AN &&... an) -> operator_factory< all_tag, AN... >
 
template<class... AN>
auto is_empty (AN &&... an) -> operator_factory< is_empty_tag, AN... >
 Returns an Observable that emits true if the source Observable is empty, otherwise false. More...
 
template<class... AN>
auto amb (AN &&... an) -> operator_factory< amb_tag, AN... >
 
template<class... AN>
auto any (AN &&... an) -> operator_factory< any_tag, AN... >
 
template<class... AN>
auto exists (AN &&... an) -> operator_factory< exists_tag, AN... >
 Returns an Observable that emits true if any item emitted by the source Observable satisfies a specified condition, otherwise false. Emits false if the source Observable terminates without emitting any item. More...
 
template<class... AN>
auto contains (AN &&... an) -> operator_factory< contains_tag, AN... >
 Returns an Observable that emits true if the source Observable emitted a specified item, otherwise false. Emits false if the source Observable terminates without emitting any item. More...
 
template<class... AN>
auto buffer (AN &&... an) -> operator_factory< buffer_count_tag, AN... >
 
template<class... AN>
auto buffer_with_time (AN &&... an) -> operator_factory< buffer_with_time_tag, AN... >
 
template<class... AN>
auto buffer_with_time_or_count (AN &&... an) -> operator_factory< buffer_with_time_or_count_tag, AN... >
 
template<class... AN>
auto combine_latest (AN &&... an) -> operator_factory< combine_latest_tag, AN... >
 
template<class... AN>
auto concat (AN &&... an) -> operator_factory< concat_tag, AN... >
 
template<class... AN>
auto concat_map (AN &&... an) -> operator_factory< concat_map_tag, AN... >
 
template<class... AN>
auto concat_transform (AN &&... an) -> operator_factory< concat_map_tag, AN... >
 
template<class... AN>
auto connect_forever (AN &&... an) -> operator_factory< connect_forever_tag, AN... >
 
template<class... AN>
auto debounce (AN &&... an) -> operator_factory< debounce_tag, AN... >
 
template<class... AN>
auto delay (AN &&... an) -> operator_factory< delay_tag, AN... >
 
template<class... AN>
auto distinct (AN &&... an) -> operator_factory< distinct_tag, AN... >
 
template<class... AN>
auto distinct_until_changed (AN &&... an) -> operator_factory< distinct_until_changed_tag, AN... >
 
template<class... AN>
auto element_at (AN &&... an) -> operator_factory< element_at_tag, AN... >
 
template<class... AN>
auto filter (AN &&... an) -> operator_factory< filter_tag, AN... >
 
template<class... AN>
auto finally (AN &&... an) -> operator_factory< finally_tag, AN... >
 
template<class... AN>
auto flat_map (AN &&... an) -> operator_factory< flat_map_tag, AN... >
 
template<class... AN>
auto merge_transform (AN &&... an) -> operator_factory< flat_map_tag, AN... >
 
template<class... AN>
auto group_by (AN &&... an) -> operator_factory< group_by_tag, AN... >
 
template<class... AN>
auto ignore_elements (AN &&... an) -> operator_factory< ignore_elements_tag, AN... >
 
template<class ResultType , class Operator >
auto lift (Operator &&op) -> detail::lift_factory< ResultType, Operator >
 
template<class... AN>
auto map (AN &&... an) -> operator_factory< map_tag, AN... >
 
template<class... AN>
auto transform (AN &&... an) -> operator_factory< map_tag, AN... >
 
template<class... AN>
auto merge (AN &&... an) -> operator_factory< merge_tag, AN... >
 
template<class... AN>
auto merge_delay_error (AN &&... an) -> operator_factory< merge_delay_error_tag, AN... >
 
template<class... AN>
auto multicast (AN &&... an) -> operator_factory< multicast_tag, AN... >
 
template<class... AN>
auto observe_on (AN &&... an) -> operator_factory< observe_on_tag, AN... >
 
template<class... AN>
auto on_error_resume_next (AN &&... an) -> operator_factory< on_error_resume_next_tag, AN... >
 
template<class... AN>
auto switch_on_error (AN &&... an) -> operator_factory< on_error_resume_next_tag, AN... >
 
template<class... AN>
auto pairwise (AN &&... an) -> operator_factory< pairwise_tag, AN... >
 
template<class... AN>
auto publish (AN &&... an) -> operator_factory< publish_tag, AN... >
 
template<class... AN>
auto publish_synchronized (AN &&... an) -> operator_factory< publish_synchronized_tag, AN... >
 Turn a cold observable hot and allow connections to the source to be independent of subscriptions. More...
 
template<class... AN>
auto reduce (AN &&... an) -> operator_factory< reduce_tag, AN... >
 
template<class... AN>
auto accumulate (AN &&... an) -> operator_factory< reduce_tag, AN... >
 
auto first () -> operator_factory< first_tag >
 For each item from this observable reduce it by sending only the first item. More...
 
auto last () -> operator_factory< last_tag >
 For each item from this observable reduce it by sending only the last item. More...
 
auto count () -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
 For each item from this observable reduce it by incrementing a count. More...
 
auto average () -> operator_factory< average_tag >
 For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end. More...
 
auto sum () -> operator_factory< sum_tag >
 For each item from this observable reduce it by adding to the previous items. More...
 
auto min () -> operator_factory< min_tag >
 For each item from this observable reduce it by taking the min value of the previous items. More...
 
auto max () -> operator_factory< max_tag >
 For each item from this observable reduce it by taking the max value of the previous items. More...
 
template<class... AN>
auto ref_count (AN &&... an) -> operator_factory< ref_count_tag, AN... >
 
template<class... AN>
auto repeat (AN &&... an) -> operator_factory< repeat_tag, AN... >
 
template<class... AN>
auto replay (AN &&... an) -> operator_factory< replay_tag, AN... >
 
template<class... AN>
auto retry (AN &&... an) -> operator_factory< retry_tag, AN... >
 
template<class... AN>
auto sample_with_time (AN &&... an) -> operator_factory< sample_with_time_tag, AN... >
 
template<class... AN>
auto scan (AN &&... an) -> operator_factory< scan_tag, AN... >
 
template<class... AN>
auto sequence_equal (AN &&... an) -> operator_factory< sequence_equal_tag, AN... >
 
template<class... AN>
auto skip (AN &&... an) -> operator_factory< skip_tag, AN... >
 
template<class... AN>
auto skip_last (AN &&... an) -> operator_factory< skip_last_tag, AN... >
 
template<class... AN>
auto skip_until (AN &&... an) -> operator_factory< skip_until_tag, AN... >
 
template<class... AN>
auto skip_while (AN &&... an) -> operator_factory< skip_while_tag, AN... >
 
template<class... AN>
auto start_with (AN &&... an) -> operator_factory< start_with_tag, AN... >
 
template<class T , class... ArgN>
auto subscribe (ArgN &&... an) -> detail::subscribe_factory< decltype(make_subscriber< T >(std::forward< ArgN >(an)...))>
 
auto as_dynamic () -> detail::dynamic_factory
 
auto as_blocking () -> detail::blocking_factory
 
template<class... AN>
auto subscribe_on (AN &&... an) -> operator_factory< subscribe_on_tag, AN... >
 
template<class... AN>
auto switch_if_empty (AN &&... an) -> operator_factory< switch_if_empty_tag, AN... >
 
template<class... AN>
auto default_if_empty (AN &&... an) -> operator_factory< default_if_empty_tag, AN... >
 If the source Observable terminates without emitting any items, emits a default item and completes. More...
 
template<class... AN>
auto switch_on_next (AN &&... an) -> operator_factory< switch_on_next_tag, AN... >
 
template<class... AN>
auto take (AN &&... an) -> operator_factory< take_tag, AN... >
 
template<class... AN>
auto take_last (AN &&... an) -> operator_factory< take_last_tag, AN... >
 
template<class... AN>
auto take_until (AN &&... an) -> operator_factory< take_until_tag, AN... >
 
template<class... AN>
auto take_while (AN &&... an) -> operator_factory< take_while_tag, AN... >
 
template<class... AN>
auto tap (AN &&... an) -> operator_factory< tap_tag, AN... >
 
template<class... AN>
auto time_interval (AN &&... an) -> operator_factory< time_interval_tag, AN... >
 
template<class... AN>
auto timeout (AN &&... an) -> operator_factory< timeout_tag, AN... >
 
template<class... AN>
auto timestamp (AN &&... an) -> operator_factory< timestamp_tag, AN... >
 
template<class... AN>
auto window (AN &&... an) -> operator_factory< window_tag, AN... >
 
template<class... AN>
auto window_with_time (AN &&... an) -> operator_factory< window_with_time_tag, AN... >
 
template<class... AN>
auto window_with_time_or_count (AN &&... an) -> operator_factory< window_with_time_or_count_tag, AN... >
 
template<class... AN>
auto window_toggle (AN &&... an) -> operator_factory< window_toggle_tag, AN... >
 
template<class... AN>
auto with_latest_from (AN &&... an) -> operator_factory< with_latest_from_tag, AN... >
 
template<class... AN>
auto zip (AN &&... an) -> operator_factory< zip_tag, AN... >
 

Variables

auto AN
 

Function Documentation

◆ accumulate()

template<class... AN>
auto rxcpp::operators::accumulate ( AN &&...  an) -> operator_factory<reduce_tag, AN...>

◆ all()

template<class... AN>
auto rxcpp::operators::all ( AN &&...  an) -> operator_factory<all_tag, AN...>

◆ amb()

template<class... AN>
auto rxcpp::operators::amb ( AN &&...  an) -> operator_factory<amb_tag, AN...>

◆ any()

template<class... AN>
auto rxcpp::operators::any ( AN &&...  an) -> operator_factory<any_tag, AN...>

◆ as_blocking()

auto rxcpp::operators::as_blocking ( ) -> detail::blocking_factory
inline

Return a new observable that contains the blocking methods for this observable.

Returns
An observable that contains the blocking methods for this observable.
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto values = rxcpp::observable<>::from(rxcpp::observe_on_new_thread(), 1, 2, 3).map([](int v){
printf("[thread %s] Emit value: %d\n", get_pid().c_str(), v);
return v;
});
values.
as_blocking().
subscribe(
[](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 3070107664] Start task
[thread 3015701568] Emit value: 1
[thread 3015701568] OnNext: 1
[thread 3015701568] Emit value: 2
[thread 3015701568] OnNext: 2
[thread 3015701568] Emit value: 3
[thread 3015701568] OnNext: 3
[thread 3015701568] OnCompleted
[thread 3070107664] Finish task

◆ as_dynamic()

auto rxcpp::operators::as_dynamic ( ) -> detail::dynamic_factory
inline

Return a new observable that performs type-forgetting conversion of this observable.

Returns
The source observable converted to observable<T>.
Note
This operator could be useful to workaround lambda deduction bug on msvc 2013.
Sample Code
auto o1 = rxcpp::observable<>::range(1, 3);
auto values = o1.concat(o2, o3);
printf("type of o1: %s\n", typeid(o1).name());
printf("type of o1.as_dynamic(): %s\n", typeid(o1.as_dynamic()).name());
printf("type of o2: %s\n", typeid(o2).name());
printf("type of o2.as_dynamic(): %s\n", typeid(o2.as_dynamic()).name());
printf("type of o3: %s\n", typeid(o3).name());
printf("type of o3.as_dynamic(): %s\n", typeid(o3.as_dynamic()).name());
printf("type of values: %s\n", typeid(values).name());
printf("type of values.as_dynamic(): %s\n", typeid(values.as_dynamic()).name());
type of o1: N5rxcpp10observableIiNS_7sources6detail5rangeIiNS_19identity_one_workerEEEEE
type of o1.as_dynamic(): N5rxcpp10observableIiNS_18dynamic_observableIiEEEE
type of o2: N5rxcpp10observableIiNS_7sources6detail7iterateISt5arrayIiLj1EENS_19identity_one_workerEEEEE
type of o2.as_dynamic(): N5rxcpp10observableIiNS_18dynamic_observableIiEEEE
type of o3: N5rxcpp10observableIiNS_7sources6detail7iterateISt16initializer_listIiENS_19identity_one_workerEEEEE
type of o3.as_dynamic(): N5rxcpp10observableIiNS_18dynamic_observableIiEEEE
type of values: N5rxcpp10observableIiNS_9operators6detail6concatINS0_IiNS_18dynamic_observableIiEEEENS0_IS6_NS4_IS6_EEEENS_19identity_one_workerEEEEE
type of values.as_dynamic(): N5rxcpp10observableIiNS_18dynamic_observableIiEEEE

◆ average()

auto rxcpp::operators::average ( ) -> operator_factory<average_tag>
inline

For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end.

Returns
An observable that emits a single item: the average of elements emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 4).average();
values.
subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 2.500000
OnCompleted
When the source observable completes without emitting any items:
values.
subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const rxcpp::empty_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: average() requires a stream with at least one value
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 4).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
average();
values.
subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::runtime_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Error from source

◆ buffer()

template<class... AN>
auto rxcpp::operators::buffer ( AN &&...  an) -> operator_factory<buffer_count_tag, AN...>

◆ buffer_with_time()

template<class... AN>
auto rxcpp::operators::buffer_with_time ( AN &&...  an) -> operator_factory<buffer_with_time_tag, AN...>

◆ buffer_with_time_or_count()

template<class... AN>
auto rxcpp::operators::buffer_with_time_or_count ( AN &&...  an) -> operator_factory<buffer_with_time_or_count_tag, AN...>

◆ combine_latest()

template<class... AN>
auto rxcpp::operators::combine_latest ( AN &&...  an) -> operator_factory<combine_latest_tag, AN...>

◆ concat()

template<class... AN>
auto rxcpp::operators::concat ( AN &&...  an) -> operator_factory<concat_tag, AN...>

◆ concat_map()

template<class... AN>
auto rxcpp::operators::concat_map ( AN &&...  an) -> operator_factory<concat_map_tag, AN...>

◆ concat_transform()

template<class... AN>
auto rxcpp::operators::concat_transform ( AN &&...  an) -> operator_factory<concat_map_tag, AN...>

◆ connect_forever()

template<class... AN>
auto rxcpp::operators::connect_forever ( AN &&...  an) -> operator_factory<connect_forever_tag, AN...>

◆ contains()

template<class... AN>
auto rxcpp::operators::contains ( AN &&...  an) -> operator_factory<contains_tag, AN...>

Returns an Observable that emits true if the source Observable emitted a specified item, otherwise false. Emits false if the source Observable terminates without emitting any item.

Template Parameters
T: the type of the item to search for.
Parameters
value: the item to search for.
Returns
An observable that emits true if the source Observable emitted a specified item, otherwise false.
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).contains(3);
values.
subscribe(
[](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); },
[]() { printf("OnCompleted\n"); });
OnNext: true
OnCompleted

◆ count()

auto rxcpp::operators::count ( ) -> operator_factory<reduce_tag, int, rxu::count, rxu::detail::take_at<0>>
inline

For each item from this observable reduce it by incrementing a count.

Returns
An observable that emits a single item: the number of elements emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).count();
values.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 3
OnCompleted
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 3).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
count();
values.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::runtime_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Error from source

◆ debounce()

template<class... AN>
auto rxcpp::operators::debounce ( AN &&...  an) -> operator_factory<debounce_tag, AN...>

◆ default_if_empty()

template<class... AN>
auto rxcpp::operators::default_if_empty ( AN &&...  an) -> operator_factory<default_if_empty_tag, AN...>

If the source Observable terminates without emitting any items, emits a default item and completes.

Template Parameters
Value: the type of the value to emit.
Parameters
v: the default value to emit.
Returns
Observable that emits the specified default item if the source observable is empty.
Sample Code
values.subscribe(
[](int v) { printf("OnNext: %d\n", v); },
[]() { printf("OnCompleted\n"); } );
OnNext: 42
OnCompleted

◆ delay()

template<class... AN>
auto rxcpp::operators::delay ( AN &&...  an) -> operator_factory<delay_tag, AN...>

◆ distinct()

template<class... AN>
auto rxcpp::operators::distinct ( AN &&...  an) -> operator_factory<distinct_tag, AN...>

◆ distinct_until_changed()

template<class... AN>
auto rxcpp::operators::distinct_until_changed ( AN &&...  an) -> operator_factory<distinct_until_changed_tag, AN...>

◆ element_at()

template<class... AN>
auto rxcpp::operators::element_at ( AN &&...  an) -> operator_factory<element_at_tag, AN...>

◆ exists()

template<class... AN>
auto rxcpp::operators::exists ( AN &&...  an) -> operator_factory<exists_tag, AN...>

Returns an Observable that emits true if any item emitted by the source Observable satisfies a specified condition, otherwise false. Emits false if the source Observable terminates without emitting any item.

Template Parameters
Predicate: the type of the test function.
Parameters
p: the test function to test items emitted by the source Observable.
Returns
An observable that emits true if any item emitted by the source observable satisfies a specified condition, otherwise false.
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).exists([](int n) { return n > 3; });
values.
subscribe(
[](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); },
[]() { printf("OnCompleted\n"); });
OnNext: true
OnCompleted

◆ filter()

template<class... AN>
auto rxcpp::operators::filter ( AN &&...  an) -> operator_factory<filter_tag, AN...>

◆ finally()

template<class... AN>
auto rxcpp::operators::finally ( AN &&...  an) -> operator_factory<finally_tag, AN...>

◆ first()

auto rxcpp::operators::first ( ) -> operator_factory<first_tag>
inline

For each item from this observable reduce it by sending only the first item.

Returns
An observable that emits only the very first item emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).first();
values.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnCompleted
When the source observable completes without emitting any items:
values.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const rxcpp::empty_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: first() requires a stream with at least one value

◆ flat_map()

template<class... AN>
auto rxcpp::operators::flat_map ( AN &&...  an) -> operator_factory<flat_map_tag, AN...>

◆ group_by()

template<class... AN>
auto rxcpp::operators::group_by ( AN &&...  an) -> operator_factory<group_by_tag, AN...>

◆ ignore_elements()

template<class... AN>
auto rxcpp::operators::ignore_elements ( AN &&...  an) -> operator_factory<ignore_elements_tag, AN...>

◆ is_empty()

template<class... AN>
auto rxcpp::operators::is_empty ( AN &&...  an) -> operator_factory<is_empty_tag, AN...>

Returns an Observable that emits true if the source Observable is empty, otherwise false.

Returns
An observable that emits a boolean value.
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).is_empty();
values.
subscribe(
[](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); },
[]() { printf("OnCompleted\n"); });

◆ last()

auto rxcpp::operators::last ( ) -> operator_factory<last_tag>
inline

For each item from this observable reduce it by sending only the last item.

Returns
An observable that emits only the very last item emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).last();
values.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 3
OnCompleted
When the source observable completes without emitting any items:
values.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const rxcpp::empty_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: last() requires a stream with at least one value

◆ lift()

template<class ResultType , class Operator >
auto rxcpp::operators::lift ( Operator &&  op) -> detail::lift_factory<ResultType, Operator>

◆ map()

template<class... AN>
auto rxcpp::operators::map ( AN &&...  an) -> operator_factory<map_tag, AN...>

◆ max()

auto rxcpp::operators::max ( ) -> operator_factory<max_tag>
inline

For each item from this observable reduce it by taking the max value of the previous items.

Returns
An observable that emits a single item: the max of elements emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 4).max();
values.
subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 4.000000
OnCompleted
When the source observable completes without emitting any items:
values.
subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const rxcpp::empty_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: max() requires a stream with at least one value
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 4).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
max();
values.
subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::runtime_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Error from source

◆ merge()

template<class... AN>
auto rxcpp::operators::merge ( AN &&...  an) -> operator_factory<merge_tag, AN...>

◆ merge_delay_error()

template<class... AN>
auto rxcpp::operators::merge_delay_error ( AN &&...  an) -> operator_factory<merge_delay_error_tag, AN...>

◆ merge_transform()

template<class... AN>
auto rxcpp::operators::merge_transform ( AN &&...  an) -> operator_factory<flat_map_tag, AN...>

◆ min()

auto rxcpp::operators::min ( ) -> operator_factory<min_tag>
inline

For each item from this observable reduce it by taking the min value of the previous items.

Returns
An observable that emits a single item: the min of elements emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 4).min();
values.
subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1.000000
OnCompleted
When the source observable completes without emitting any items:
values.
subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const rxcpp::empty_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: min() requires a stream with at least one value
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 4).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
min();
values.
subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::runtime_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Error from source

◆ multicast()

template<class... AN>
auto rxcpp::operators::multicast ( AN &&...  an) -> operator_factory<multicast_tag, AN...>

◆ observe_on()

template<class... AN>
auto rxcpp::operators::observe_on ( AN &&...  an) -> operator_factory<observe_on_tag, AN...>

◆ on_error_resume_next()

template<class... AN>
auto rxcpp::operators::on_error_resume_next ( AN &&...  an) -> operator_factory<on_error_resume_next_tag, AN...>

◆ pairwise()

template<class... AN>
auto rxcpp::operators::pairwise ( AN &&...  an) -> operator_factory<pairwise_tag, AN...>

◆ publish()

template<class... AN>
auto rxcpp::operators::publish ( AN &&...  an) -> operator_factory<publish_tag, AN...>

◆ publish_synchronized()

template<class... AN>
auto rxcpp::operators::publish_synchronized ( AN &&...  an) -> operator_factory<publish_synchronized_tag, AN...>

Turn a cold observable hot and allow connections to the source to be independent of subscriptions.

Template Parameters
Coordination: the type of the scheduler.
Parameters
cn: a scheduler all values are queued and delivered on.
cs: the subscription to control lifetime (optional).
Returns
rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers, on the specified scheduler.
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
take(5).
// Subscribe from the beginning
values.subscribe(
[](long v){printf("[1] OnNext: %ld\n", v);},
[](){printf("[1] OnCompleted\n");});
// Another subscription from the beginning
values.subscribe(
[](long v){printf("[2] OnNext: %ld\n", v);},
[](){printf("[2] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
values.subscribe(
[](long v){printf("[3] OnNext: %ld\n", v);},
[](){printf("[3] OnCompleted\n");});
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
[1] OnNext: 1
[2] OnNext: 1
[1] OnNext: 2
[2] OnNext: 2
[1] OnNext: 3
[2] OnNext: 3
[1] OnNext: 4
[2] OnNext: 4
[1] OnNext: 5
[2] OnNext: 5
[1] OnCompleted
[2] OnCompleted
[3] OnCompleted

◆ reduce()

template<class... AN>
auto rxcpp::operators::reduce ( AN &&...  an) -> operator_factory<reduce_tag, AN...>

◆ ref_count()

template<class... AN>
auto rxcpp::operators::ref_count ( AN &&...  an) -> operator_factory<ref_count_tag, AN...>

◆ repeat()

template<class... AN>
auto rxcpp::operators::repeat ( AN &&...  an) -> operator_factory<repeat_tag, AN...>

◆ replay()

template<class... AN>
auto rxcpp::operators::replay ( AN &&...  an) -> operator_factory<replay_tag, AN...>

◆ retry()

template<class... AN>
auto rxcpp::operators::retry ( AN &&...  an) -> operator_factory<retry_tag, AN...>

◆ sample_with_time()

template<class... AN>
auto rxcpp::operators::sample_with_time ( AN &&...  an) -> operator_factory<sample_with_time_tag, AN...>

◆ scan()

template<class... AN>
auto rxcpp::operators::scan ( AN &&...  an) -> operator_factory<scan_tag, AN...>

◆ sequence_equal()

template<class... AN>
auto rxcpp::operators::sequence_equal ( AN &&...  an) -> operator_factory<sequence_equal_tag, AN...>

◆ skip()

template<class... AN>
auto rxcpp::operators::skip ( AN &&...  an) -> operator_factory<skip_tag, AN...>

◆ skip_last()

template<class... AN>
auto rxcpp::operators::skip_last ( AN &&...  an) -> operator_factory<skip_last_tag, AN...>

◆ skip_until()

template<class... AN>
auto rxcpp::operators::skip_until ( AN &&...  an) -> operator_factory<skip_until_tag, AN...>

◆ skip_while()

template<class... AN>
auto rxcpp::operators::skip_while ( AN &&...  an) -> operator_factory<skip_while_tag, AN...>

◆ start_with()

template<class... AN>
auto rxcpp::operators::start_with ( AN &&...  an) -> operator_factory<start_with_tag, AN...>

◆ subscribe()

template<class T , class... ArgN>
auto rxcpp::operators::subscribe ( ArgN &&...  an) -> detail::subscribe_factory<decltype (make_subscriber<T>(std::forward<ArgN>(an)...))>

◆ subscribe_on()

template<class... AN>
auto rxcpp::operators::subscribe_on ( AN &&...  an) -> operator_factory<subscribe_on_tag, AN...>

◆ sum()

auto rxcpp::operators::sum ( ) -> operator_factory<sum_tag>
inline

For each item from this observable reduce it by adding to the previous items.

Returns
An observable that emits a single item: the sum of elements emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).sum();
values.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 6
OnCompleted
When the source observable completes without emitting any items:
values.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const rxcpp::empty_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: sum() requires a stream with at least one value
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 3).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
sum();
values.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::runtime_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Error from source

◆ switch_if_empty()

template<class... AN>
auto rxcpp::operators::switch_if_empty ( AN &&...  an) -> operator_factory<switch_if_empty_tag, AN...>

◆ switch_on_error()

template<class... AN>
auto rxcpp::operators::switch_on_error ( AN &&...  an) -> operator_factory<on_error_resume_next_tag, AN...>

◆ switch_on_next()

template<class... AN>
auto rxcpp::operators::switch_on_next ( AN &&...  an) -> operator_factory<switch_on_next_tag, AN...>

◆ take()

template<class... AN>
auto rxcpp::operators::take ( AN &&...  an) -> operator_factory<take_tag, AN...>

◆ take_last()

template<class... AN>
auto rxcpp::operators::take_last ( AN &&...  an) -> operator_factory<take_last_tag, AN...>

◆ take_until()

template<class... AN>
auto rxcpp::operators::take_until ( AN &&...  an) -> operator_factory<take_until_tag, AN...>

◆ take_while()

template<class... AN>
auto rxcpp::operators::take_while ( AN &&...  an) -> operator_factory<take_while_tag, AN...>

◆ tap()

template<class... AN>
auto rxcpp::operators::tap ( AN &&...  an) -> operator_factory<tap_tag, AN...>

◆ time_interval()

template<class... AN>
auto rxcpp::operators::time_interval ( AN &&...  an) -> operator_factory<time_interval_tag, AN...>

◆ timeout()

template<class... AN>
auto rxcpp::operators::timeout ( AN &&...  an) -> operator_factory<timeout_tag, AN...>

◆ timestamp()

template<class... AN>
auto rxcpp::operators::timestamp ( AN &&...  an) -> operator_factory<timestamp_tag, AN...>

◆ transform()

template<class... AN>
auto rxcpp::operators::transform ( AN &&...  an) -> operator_factory<map_tag, AN...>

◆ window()

template<class... AN>
auto rxcpp::operators::window ( AN &&...  an) -> operator_factory<window_tag, AN...>

◆ window_toggle()

template<class... AN>
auto rxcpp::operators::window_toggle ( AN &&...  an) -> operator_factory<window_toggle_tag, AN...>

◆ window_with_time()

template<class... AN>
auto rxcpp::operators::window_with_time ( AN &&...  an) -> operator_factory<window_with_time_tag, AN...>

◆ window_with_time_or_count()

template<class... AN>
auto rxcpp::operators::window_with_time_or_count ( AN &&...  an) -> operator_factory<window_with_time_or_count_tag, AN...>

◆ with_latest_from()

template<class... AN>
auto rxcpp::operators::with_latest_from ( AN &&...  an) -> operator_factory<with_latest_from_tag, AN...>

◆ zip()

template<class... AN>
auto rxcpp::operators::zip ( AN &&...  an) -> operator_factory<zip_tag, AN...>

Variable Documentation

◆ AN

auto rxcpp::operators::AN
Initial value:
{
return operator_factory<finally_tag, AN...>(std::make_tuple(std::forward<AN>(an)...))
rxcpp::operators::AN
auto AN
Definition: rx-finally.hpp:105
rxcpp::operators::max
auto max() -> operator_factory< max_tag >
For each item from this observable reduce it by taking the max value of the previous items.
Definition: rx-reduce.hpp:496
rxcpp::operators::min
auto min() -> operator_factory< min_tag >
For each item from this observable reduce it by taking the min value of the previous items.
Definition: rx-reduce.hpp:475
rxcpp::operators::concat
auto concat(AN &&... an) -> operator_factory< concat_tag, AN... >
Definition: rx-concat.hpp:235
rxcpp::operators::last
auto last() -> operator_factory< last_tag >
For each item from this observable reduce it by sending only the last item.
Definition: rx-reduce.hpp:395
rxcpp::sources::timer
auto timer(TimePointOrDuration when) -> typename std::enable_if< detail::defer_timer< TimePointOrDuration, identity_one_worker >::value, typename detail::defer_timer< TimePointOrDuration, identity_one_worker >::observable_type >::type
Definition: rx-timer.hpp:114
rxcpp::empty_error
Definition: rx-operators.hpp:289
rxcpp::sources::range
auto range(T first=0, T last=std::numeric_limits< T >::max(), std::ptrdiff_t step=1) -> observable< T, detail::range< T, identity_one_worker >>
Definition: rx-range.hpp:119
rxcpp::observe_on_new_thread
observe_on_one_worker observe_on_new_thread()
Definition: rx-observe_on.hpp:328
rxcpp::operators::first
auto first() -> operator_factory< first_tag >
For each item from this observable reduce it by sending only the first item.
Definition: rx-reduce.hpp:378
rxcpp::operators::average
auto average() -> operator_factory< average_tag >
For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end.
Definition: rx-reduce.hpp:433
rxcpp::observable::sum
auto sum(AN **...) const
For each item from this observable reduce it by adding to the previous items.
Definition: rx-observable.hpp:1254
rxcpp::operators::count
auto count() -> operator_factory< reduce_tag, int, rxu::count, rxu::detail::take_at< 0 >>
For each item from this observable reduce it by incrementing a count.
Definition: rx-reduce.hpp:412
cpplinq::from
linq_driver< iter_cursor< typename util::container_traits< TContainer >::iterator > > from(TContainer &c)
Definition: linq.hpp:556
rxcpp::operators::as_blocking
auto as_blocking() -> detail::blocking_factory
Definition: rx-subscribe.hpp:144
rxcpp::operators::take
auto take(AN &&... an) -> operator_factory< take_tag, AN... >
Definition: rx-take.hpp:133
rxcpp::observable::last
auto last(AN **...) const
For each item from this observable reduce it by sending only the last item.
Definition: rx-observable.hpp:1230
rxcpp::operators::subscribe
auto subscribe(ArgN &&... an) -> detail::subscribe_factory< decltype(make_subscriber< T >(std::forward< ArgN >(an)...))>
Definition: rx-subscribe.hpp:87
rxcpp::util::rethrow_exception
RXCPP_NORETURN void rethrow_exception(error_ptr e)
Definition: rx-util.hpp:902
rxcpp::sources::interval
auto interval(Duration period) -> typename std::enable_if< detail::defer_interval< Duration, identity_one_worker >::value, typename detail::defer_interval< Duration, identity_one_worker >::observable_type >::type
Definition: rx-interval.hpp:113
rxcpp::observable::max
auto max(AN **...) const
For each item from this observable reduce it by taking the max value of the previous items.
Definition: rx-observable.hpp:1278
rxcpp::operators::publish_synchronized
auto publish_synchronized(AN &&... an) -> operator_factory< publish_synchronized_tag, AN... >
Turn a cold observable hot and allow connections to the source to be independent of subscriptions.
Definition: rx-publish.hpp:72
rxcpp::observable::default_if_empty
auto default_if_empty(AN &&... an) const
If the source Observable terminates without emitting any items, emits a default item and completes.
Definition: rx-observable.hpp:722
rxcpp::observable::first
auto first(AN **...) const
For each item from this observable reduce it by sending only the first item.
Definition: rx-observable.hpp:1218
rxcpp::operators::sum
auto sum() -> operator_factory< sum_tag >
For each item from this observable reduce it by adding to the previous items.
Definition: rx-reduce.hpp:454
rxcpp::observable
a source of values. subscribe or use one of the operator methods that return a new observable, which uses this observable as a source.
Definition: rx-observable.hpp:478
rxcpp::observable::average
auto average(AN **...) const
For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end.
Definition: rx-observable.hpp:1266
rxcpp::sources::just
auto just(Value0 v0) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype(iterate(*(std::array< Value0, 1 > *) nullptr, identity_immediate()))>::type
Definition: rx-iterate.hpp:267
rxcpp::observable::min
auto min(AN **...) const
For each item from this observable reduce it by taking the min value of the previous items.
Definition: rx-observable.hpp:1290