Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
Loading...
Searching...
No Matches
_flow_graph_streaming_node.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef __TBB_flow_graph_streaming_H
18#define __TBB_flow_graph_streaming_H
19
20#ifndef __TBB_flow_graph_H
21#error Do not #include this internal file directly; use public TBB headers instead.
22#endif
23
24#if __TBB_PREVIEW_STREAMING_NODE
25
26// Included in namespace tbb::flow::interfaceX (in flow_graph.h)
27
28namespace internal {
29
30template <int N1, int N2>
32 // "+1" since the port_ref range is a closed interval (includes its endpoints).
33 static const int size = N2 - N1 + 1;
34};
35
36} // internal
37
38// The purpose of the port_ref_impl is the pretty syntax: the deduction of a compile-time constant is processed from the return type.
39// So it is possible to use this helper without parentheses, e.g. "port_ref<0>".
40template <int N1, int N2 = N1>
43};
44
45namespace internal {
46
47template <typename T>
49 static const int value = 1;
50};
51
52template <int N1, int N2>
53struct num_arguments<port_ref_impl<N1,N2>(*)()> {
55};
56
57template <int N1, int N2>
60};
61
// Accepts and discards any number of arguments of any type.
// Lets a parameter pack of calls, e.g. make_Nth_edge<S + 1>()..., be expanded
// purely for the calls' side effects, with their return values thrown away.
template <typename... Values>
void ignore_return_values( Values&&... /*discarded*/ ) {
}
64
// Combines a non-empty pack of values with bitwise OR, right to left:
// or_return_values(a, b, c) == a | (b | c).
// It is used to merge the bool results of several try_put() calls. Every argument
// is evaluated (as a function argument) before combining, so there is no short-circuit.
// The return type is the decayed type of the first argument. This lets lvalue
// arguments be passed too; with a plain `T` return type, an lvalue first argument
// deduces T as `U&` and the OR result could not bind to it.
template <typename T>
typename std::decay<T>::type or_return_values( T&& t ) { return t; }
template <typename T, typename... Rest>
typename std::decay<T>::type or_return_values( T&& t, Rest&&... rest ) {
    return t | or_return_values( std::forward<Rest>(rest)... );
}
71
72template<typename JP>
74 typedef size_t type;
75 typedef std::false_type is_key_matching;
76};
77
78template<typename Key>
80 typedef Key type;
81 typedef std::true_type is_key_matching;
82};
83
84template<typename Key>
86 typedef const Key &type;
87 typedef std::true_type is_key_matching;
88};
89
90template<typename Device, typename Key>
92 Device my_device;
93 typename std::decay<Key>::type my_key;
94public:
95 // TODO: investigate why default constructor is required
97 streaming_device_with_key( const Device& d, Key k ) : my_device( d ), my_key( k ) {}
98 Key key() const { return my_key; }
99 const Device& device() const { return my_device; }
100};
101
102// --------- Kernel argument helpers --------- //
103template <typename T>
105 typedef std::false_type type;
106};
107
108template <int N1, int N2>
110 typedef std::true_type type;
111};
112
113template <int N1, int N2>
114struct is_port_ref_impl< port_ref_impl<N1, N2>( * )() > {
115 typedef std::true_type type;
116};
117
118template <typename T>
121};
122
123template <typename ...Args1>
125
// Recursive worker that rewrites a kernel-argument list before handing it to a functor.
// A1 is the next unprocessed argument and Args1... the ones after it. Args2... accumulates
// the arguments already processed. Each step moves one argument from the front of the list
// to the back:
// - a plain argument is appended unchanged;
// - a port reference port_ref<N1, N2> is expanded into the tuple elements
//   std::get<N1 + my_delta>(t) ... std::get<N2 + my_delta>(t).
// The full specialization that follows (template <>) ends the recursion by calling f(args2...).
126template <typename A1, typename ...Args1>
127struct convert_and_call_impl<A1, Args1...> {
128 static const size_t my_delta = 1; // Index 0 contains device
129
 // One recursion step: dispatch on whether A1 is a port reference, using the
 // std::true_type / std::false_type tag from is_port_ref<A1>::type.
130 template <typename F, typename Tuple, typename ...Args2>
131 static void doit(F& f, Tuple& t, A1& a1, Args1&... args1, Args2&... args2) {
132 convert_and_call_impl<A1, Args1...>::doit_impl(typename is_port_ref<A1>::type(), f, t, a1, args1..., args2...);
133 }
 // Plain argument: append a1 unchanged after the processed arguments and continue with Args1...
134 template <typename F, typename Tuple, typename ...Args2>
135 static void doit_impl(std::false_type, F& f, Tuple& t, A1& a1, Args1&... args1, Args2&... args2) {
136 convert_and_call_impl<Args1...>::doit(f, t, args1..., args2..., a1);
137 }
 // Port range [N1, N2]: append tuple element N1 (shifted by my_delta past the device) and
 // keep the shrunken range port_ref<N1 + 1, N2> at the front of the list for the next step.
138 template <typename F, typename Tuple, int N1, int N2, typename ...Args2>
139 static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N1, N2>, Args1&... args1, Args2&... args2) {
140 convert_and_call_impl<port_ref_impl<N1 + 1,N2>, Args1...>::doit_impl(x, f, t, port_ref<N1 + 1, N2>(), args1...,
141 args2..., std::get<N1 + my_delta>(t));
142 }
 // Single port N (also the last element of a range): append tuple element N and continue
 // with Args1... When N1 == N2, partial ordering picks this more specialized overload over
 // the range overload above, which terminates the range expansion.
143 template <typename F, typename Tuple, int N, typename ...Args2>
144 static void doit_impl(std::true_type, F& f, Tuple& t, port_ref_impl<N, N>, Args1&... args1, Args2&... args2) {
145 convert_and_call_impl<Args1...>::doit(f, t, args1..., args2..., std::get<N + my_delta>(t));
146 }
147
 // A1 is the port_ref function itself (port_ref written without parentheses): call it to
 // obtain the port_ref_impl value and re-dispatch to the overloads above.
148 template <typename F, typename Tuple, int N1, int N2, typename ...Args2>
149 static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N1, N2>(* fn)(), Args1&... args1, Args2&... args2) {
150 doit_impl(x, f, t, fn(), args1..., args2...);
151 }
 // Same as above for a single-port function pointer port_ref<N, N>.
152 template <typename F, typename Tuple, int N, typename ...Args2>
153 static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N, N>(* fn)(), Args1&... args1, Args2&... args2) {
154 doit_impl(x, f, t, fn(), args1..., args2...);
155 }
156};
157
158template <>
160 template <typename F, typename Tuple, typename ...Args2>
161 static void doit(F& f, Tuple&, Args2&... args2) {
162 f(args2...);
163 }
164};
165// ------------------------------------------- //
166
167template<typename JP, typename StreamFactory, typename... Ports>
169 // Do not use 'using' instead of 'struct' because Microsoft Visual C++ 12.0 fails to compile.
170 template <typename T>
172 typedef typename StreamFactory::template async_msg_type<T> type;
173 };
174
175 typedef tuple< typename async_msg_type<Ports>::type... > input_tuple;
177 typedef tuple< streaming_device_with_key< typename StreamFactory::device_type, typename key_from_policy<JP>::type >,
179
180 // indexer_node parameters pack expansion workaround for VS2013 for streaming_node
181 typedef indexer_node< typename async_msg_type<Ports>::type... > indexer_node_type;
182};
183
184// Default empty implementation
185template<typename StreamFactory, typename KernelInputTuple, typename = void>
187 typedef typename StreamFactory::device_type device_type;
188 typedef typename StreamFactory::kernel_type kernel_type;
189 typedef KernelInputTuple kernel_input_tuple;
190protected:
191 template <typename ...Args>
192 void enqueue_kernel_impl( kernel_input_tuple&, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
193 factory.send_kernel( device, kernel, args... );
194 }
195};
196
197// Implementation for StreamFactory supporting range
198template<typename StreamFactory, typename KernelInputTuple>
199class kernel_executor_helper<StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type > {
200 typedef typename StreamFactory::device_type device_type;
201 typedef typename StreamFactory::kernel_type kernel_type;
202 typedef KernelInputTuple kernel_input_tuple;
203
204 typedef typename StreamFactory::range_type range_type;
205
206 // Container for the kernel range. It holds either port references or an actual range value.
207 struct range_wrapper {
208 virtual range_type get_range( const kernel_input_tuple &ip ) const = 0;
209 virtual range_wrapper *clone() const = 0;
210 virtual ~range_wrapper() {}
211 };
212
213 struct range_value : public range_wrapper {
214 range_value( const range_type& value ) : my_value(value) {}
215
216 range_value( range_type&& value ) : my_value(std::move(value)) {}
217
219 return my_value;
220 }
221
222 range_wrapper *clone() const __TBB_override {
223 return new range_value(my_value);
224 }
225 private:
227 };
228
229 template <int N>
230 struct range_mapper : public range_wrapper {
232
234 // "+1" since get<0>(ip) is StreamFactory::device.
235 return get<N + 1>(ip).data(false);
236 }
237
238 range_wrapper *clone() const __TBB_override {
239 return new range_mapper<N>;
240 }
241 };
242
243protected:
244 template <typename ...Args>
245 void enqueue_kernel_impl( kernel_input_tuple& ip, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
246 __TBB_ASSERT(my_range_wrapper, "Range is not set. Call set_range() before running streaming_node.");
247 factory.send_kernel( device, kernel, my_range_wrapper->get_range(ip), args... );
248 }
249
250public:
251 kernel_executor_helper() : my_range_wrapper(NULL) {}
252
253 kernel_executor_helper(const kernel_executor_helper& executor) : my_range_wrapper(executor.my_range_wrapper ? executor.my_range_wrapper->clone() : NULL) {}
254
255 kernel_executor_helper(kernel_executor_helper&& executor) : my_range_wrapper(executor.my_range_wrapper) {
256 // Reset the moved-from executor's range wrapper to NULL to prevent double deallocation
257 executor.my_range_wrapper = NULL;
258 }
259
261 if (my_range_wrapper) delete my_range_wrapper;
262 }
263
264 void set_range(const range_type& work_size) {
265 my_range_wrapper = new range_value(work_size);
266 }
267
268 void set_range(range_type&& work_size) {
269 my_range_wrapper = new range_value(std::move(work_size));
270 }
271
272 template <int N>
274 my_range_wrapper = new range_mapper<N>;
275 }
276
277 template <int N>
279 my_range_wrapper = new range_mapper<N>;
280 }
281
282private:
283 range_wrapper* my_range_wrapper;
284};
285
286} // internal
287
288/*
289/---------------------------------------- streaming_node ------------------------------------\
290| |
291| /--------------\ /----------------------\ /-----------\ /----------------------\ |
292| | | | (device_with_key) O---O | | | |
293| | | | | | | | | |
294O---O indexer_node O---O device_selector_node O---O join_node O---O kernel_node O---O
295| | | | (multifunction_node) | | | | (multifunction_node) | |
296O---O | | O---O | | O---O
297| \--------------/ \----------------------/ \-----------/ \----------------------/ |
298| |
299\--------------------------------------------------------------------------------------------/
300*/
301template<typename... Args>
303
304template<typename... Ports, typename JP, typename StreamFactory>
306streaming_node< tuple<Ports...>, JP, StreamFactory >
307 : public composite_node < typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::input_tuple,
308 typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::output_tuple >
309 , public internal::kernel_executor_helper< StreamFactory, typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::kernel_input_tuple >
310{
311 typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::input_tuple input_tuple;
312 typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::output_tuple output_tuple;
314protected:
315 typedef typename StreamFactory::device_type device_type;
316 typedef typename StreamFactory::kernel_type kernel_type;
317private:
319 typedef composite_node<input_tuple, output_tuple> base_type;
320 static const size_t NUM_INPUTS = tuple_size<input_tuple>::value;
321 static const size_t NUM_OUTPUTS = tuple_size<output_tuple>::value;
322
325
326 typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::indexer_node_type indexer_node_type;
327 typedef typename indexer_node_type::output_type indexer_node_output_type;
328 typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::kernel_input_tuple kernel_input_tuple;
329 typedef multifunction_node<indexer_node_output_type, kernel_input_tuple> device_selector_node;
330 typedef multifunction_node<kernel_input_tuple, output_tuple> kernel_multifunction_node;
331
332 template <int... S>
333 typename base_type::input_ports_type get_input_ports( internal::sequence<S...> ) {
334 return std::tie( internal::input_port<S>( my_indexer_node )... );
335 }
336
337 template <int... S>
338 typename base_type::output_ports_type get_output_ports( internal::sequence<S...> ) {
339 return std::tie( internal::output_port<S>( my_kernel_node )... );
340 }
341
342 typename base_type::input_ports_type get_input_ports() {
343 return get_input_ports( input_sequence() );
344 }
345
346 typename base_type::output_ports_type get_output_ports() {
347 return get_output_ports( output_sequence() );
348 }
349
350 template <int N>
352 make_edge( internal::output_port<N>( my_device_selector_node ), internal::input_port<N>( my_join_node ) );
353 return 0;
354 }
355
356 template <int... S>
358 make_edge( my_indexer_node, my_device_selector_node );
359 make_edge( my_device_selector_node, my_join_node );
360 internal::ignore_return_values( make_Nth_edge<S + 1>()... );
361 make_edge( my_join_node, my_kernel_node );
362 }
363
364 void make_edges() {
365 make_edges( input_sequence() );
366 }
367
368 class device_selector_base {
369 public:
370 virtual void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) = 0;
371 virtual device_selector_base *clone( streaming_node &n ) const = 0;
373 };
374
375 template <typename UserFunctor>
376 class device_selector : public device_selector_base, tbb::internal::no_assign {
377 public:
378 device_selector( UserFunctor uf, streaming_node &n, StreamFactory &f )
379 : my_dispatch_funcs( create_dispatch_funcs( input_sequence() ) )
380 , my_user_functor( uf ), my_node(n), my_factory( f )
381 {
382 my_port_epoches.fill( 0 );
383 }
384
385 void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) __TBB_override {
386 (this->*my_dispatch_funcs[ v.tag() ])( my_port_epoches[ v.tag() ], v, op );
388 || my_port_epoches[v.tag()] == 0, "Epoch is changed when key matching is requested" );
389 }
390
391 device_selector_base *clone( streaming_node &n ) const __TBB_override {
392 return new device_selector( my_user_functor, n, my_factory );
393 }
394 private:
395 typedef void(device_selector<UserFunctor>::*send_and_put_fn_type)(size_t &, const indexer_node_output_type &, typename device_selector_node::output_ports_type &);
396 typedef std::array < send_and_put_fn_type, NUM_INPUTS > dispatch_funcs_type;
397
398 template <int... S>
400 dispatch_funcs_type dispatch = { { &device_selector<UserFunctor>::send_and_put_impl<S>... } };
401 return dispatch;
402 }
403
404 template <typename T>
405 key_type get_key( std::false_type, const T &, size_t &epoch ) {
407 return epoch++;
408 }
409
410 template <typename T>
411 key_type get_key( std::true_type, const T &t, size_t &/*epoch*/ ) {
413 return key_from_message<key_type>( t );
414 }
415
416 template <int N>
417 void send_and_put_impl( size_t &epoch, const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) {
418 typedef typename tuple_element<N + 1, typename device_selector_node::output_ports_type>::type::output_type elem_type;
419 elem_type e = internal::cast_to<elem_type>( v );
420 device_type device = get_device( get_key( typename internal::key_from_policy<JP>::is_key_matching(), e, epoch ), get<0>( op ) );
421 my_factory.send_data( device, e );
422 get<N + 1>( op ).try_put( e );
423 }
424
425 template< typename DevicePort >
426 device_type get_device( key_type key, DevicePort& dp ) {
427 typename std::unordered_map<typename std::decay<key_type>::type, epoch_desc>::iterator it = my_devices.find( key );
428 if ( it == my_devices.end() ) {
429 device_type d = my_user_functor( my_factory );
430 std::tie( it, std::ignore ) = my_devices.insert( std::make_pair( key, d ) );
431 bool res = dp.try_put( device_with_key_type( d, key ) );
432 __TBB_ASSERT_EX( res, NULL );
433 my_node.notify_new_device( d );
434 }
435 epoch_desc &e = it->second;
436 device_type d = e.my_device;
437 if ( ++e.my_request_number == NUM_INPUTS ) my_devices.erase( it );
438 return d;
439 }
440
441 struct epoch_desc {
442 epoch_desc(device_type d ) : my_device( d ), my_request_number( 0 ) {}
445 };
446
447 std::unordered_map<typename std::decay<key_type>::type, epoch_desc> my_devices;
448 std::array<size_t, NUM_INPUTS> my_port_epoches;
450 UserFunctor my_user_functor;
452 StreamFactory &my_factory;
453 };
454
455 class device_selector_body {
456 public:
457 device_selector_body( device_selector_base *d ) : my_device_selector( d ) {}
458
459 void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) {
460 (*my_device_selector)(v, op);
461 }
462 private:
463 device_selector_base *my_device_selector;
464 };
465
466 // TODO: investigate why copy-construction is disallowed
467 class args_storage_base : tbb::internal::no_copy {
468 public:
469 typedef typename kernel_multifunction_node::output_ports_type output_ports_type;
470
471 virtual void enqueue( kernel_input_tuple &ip, output_ports_type &op, const streaming_node &n ) = 0;
472 virtual void send( device_type d ) = 0;
473 virtual args_storage_base *clone() const = 0;
474 virtual ~args_storage_base () {}
475
476 protected:
477 args_storage_base( const kernel_type& kernel, StreamFactory &f )
478 : my_kernel( kernel ), my_factory( f )
479 {}
480
481 args_storage_base( const args_storage_base &k )
482 : tbb::internal::no_copy(), my_kernel( k.my_kernel ), my_factory( k.my_factory )
483 {}
484
486 StreamFactory &my_factory;
487 };
488
489 template <typename... Args>
490 class args_storage : public args_storage_base {
492
493 // ---------- Update events helpers ---------- //
494 template <int N>
495 bool do_try_put( const kernel_input_tuple& ip, output_ports_type &op ) const {
496 const auto& t = get<N + 1>( ip );
497 auto &port = get<N>( op );
498 return port.try_put( t );
499 }
500
501 template <int... S>
503 return internal::or_return_values( do_try_put<S>( ip, op )... );
504 }
505
506 // ------------------------------------------- //
507 class run_kernel_func : tbb::internal::no_assign {
508 public:
509 run_kernel_func( kernel_input_tuple &ip, const streaming_node &node, const args_storage& storage )
510 : my_kernel_func( ip, node, storage, get<0>(ip).device() ) {}
511
512 // It is impossible to use Args... because a function pointer cannot be cast to a function reference implicitly.
513 // Allow the compiler to deduce types for function pointers automatically.
514 template <typename... FnArgs>
515 void operator()( FnArgs&... args ) {
516 internal::convert_and_call_impl<FnArgs...>::doit( my_kernel_func, my_kernel_func.my_ip, args... );
517 }
518 private:
519 struct kernel_func : tbb::internal::no_copy {
522 const args_storage& my_storage;
524
525 kernel_func( kernel_input_tuple &ip, const streaming_node &node, const args_storage& storage, device_type device )
526 : my_ip( ip ), my_node( node ), my_storage( storage ), my_device( device )
527 {}
528
529 template <typename... FnArgs>
530 void operator()( FnArgs&... args ) {
531 my_node.enqueue_kernel( my_ip, my_storage.my_factory, my_device, my_storage.my_kernel, args... );
532 }
533 } my_kernel_func;
534 };
535
536 template<typename FinalizeFn>
537 class run_finalize_func : tbb::internal::no_assign {
538 public:
539 run_finalize_func( kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn )
540 : my_ip( ip ), my_finalize_func( factory, get<0>(ip).device(), fn ) {}
541
542 // It is impossible to use Args... because a function pointer cannot be cast to a function reference implicitly.
543 // Allow the compiler to deduce types for function pointers automatically.
544 template <typename... FnArgs>
545 void operator()( FnArgs&... args ) {
546 internal::convert_and_call_impl<FnArgs...>::doit( my_finalize_func, my_ip, args... );
547 }
548 private:
550
551 struct finalize_func : tbb::internal::no_assign {
552 StreamFactory &my_factory;
554 FinalizeFn my_fn;
555
556 finalize_func( StreamFactory &factory, device_type device, FinalizeFn fn )
557 : my_factory(factory), my_device(device), my_fn(fn) {}
558
559 template <typename... FnArgs>
560 void operator()( FnArgs&... args ) {
561 my_factory.finalize( my_device, my_fn, args... );
562 }
563 } my_finalize_func;
564 };
565
566 template<typename FinalizeFn>
567 static run_finalize_func<FinalizeFn> make_run_finalize_func( kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn ) {
568 return run_finalize_func<FinalizeFn>( ip, factory, fn );
569 }
570
571 class send_func : tbb::internal::no_assign {
572 public:
573 send_func( StreamFactory &factory, device_type d )
574 : my_factory(factory), my_device( d ) {}
575
576 template <typename... FnArgs>
577 void operator()( FnArgs&... args ) {
578 my_factory.send_data( my_device, args... );
579 }
580 private:
581 StreamFactory &my_factory;
583 };
584
585 public:
586 args_storage( const kernel_type& kernel, StreamFactory &f, Args&&... args )
587 : args_storage_base( kernel, f )
588 , my_args_pack( std::forward<Args>(args)... )
589 {}
590
591 args_storage( const args_storage &k ) : args_storage_base( k ), my_args_pack( k.my_args_pack ) {}
592
593 args_storage( const args_storage_base &k, Args&&... args ) : args_storage_base( k ), my_args_pack( std::forward<Args>(args)... ) {}
594
595 void enqueue( kernel_input_tuple &ip, output_ports_type &op, const streaming_node &n ) __TBB_override {
596 // Make const qualified args_pack (from non-const)
597 const args_pack_type& const_args_pack = my_args_pack;
598 // factory.enqure_kernel() gets
599 // - 'ip' tuple elements by reference and updates it (and 'ip') with dependencies
600 // - arguments (from my_args_pack) by const-reference via const_args_pack
601 tbb::internal::call( run_kernel_func( ip, n, *this ), const_args_pack );
602
603 if (! do_try_put( ip, op, input_sequence() ) ) {
604 graph& g = n.my_graph;
605 // No one message was passed to successors so set a callback to extend the graph lifetime until the kernel completion.
606 g.increment_wait_count();
607
608 // factory.finalize() gets
609 // - 'ip' tuple elements by reference, so 'ip' might be changed
610 // - arguments (from my_args_pack) by const-reference via const_args_pack
611 tbb::internal::call( make_run_finalize_func(ip, this->my_factory, [&g] {
612 g.decrement_wait_count();
613 }), const_args_pack );
614 }
615 }
616
618 // factory.send() gets arguments by reference and updates these arguments with dependencies
619 // (it gets but usually ignores port_ref-s)
620 tbb::internal::call( send_func( this->my_factory, d ), my_args_pack );
621 }
622
623 args_storage_base *clone() const __TBB_override {
624 // Create new args_storage with copying constructor.
625 return new args_storage<Args...>( *this );
626 }
627
628 private:
631 };
632
633 // Body for kernel_multifunction_node.
634 class kernel_body : tbb::internal::no_assign {
635 public:
636 kernel_body( const streaming_node &node ) : my_node( node ) {}
637
639 __TBB_ASSERT( (my_node.my_args_storage != NULL), "No arguments storage" );
640 // 'ip' is passed by value to create local copy for updating inside enqueue_kernel()
641 my_node.my_args_storage->enqueue( ip, op, my_node );
642 }
643 private:
645 };
646
647 template <typename T, typename U = typename internal::is_port_ref<T>::type >
648 struct wrap_to_async {
649 typedef T type; // Keep port_ref as it is
650 };
651
652 template <typename T>
653 struct wrap_to_async<T, std::false_type> {
654 typedef typename StreamFactory::template async_msg_type< typename tbb::internal::strip<T>::type > type;
655 };
656
657 template <typename... Args>
658 args_storage_base *make_args_storage(const args_storage_base& storage, Args&&... args) const {
659 // In this variadic template convert all simple types 'T' into 'async_msg_type<T>'
660 return new args_storage<Args...>(storage, std::forward<Args>(args)...);
661 }
662
664 my_args_storage->send( d );
665 }
666
667 template <typename ...Args>
668 void enqueue_kernel( kernel_input_tuple& ip, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
669 this->enqueue_kernel_impl( ip, factory, device, kernel, args... );
670 }
671
672public:
673 template <typename DeviceSelector>
674 streaming_node( graph &g, const kernel_type& kernel, DeviceSelector d, StreamFactory &f )
675 : base_type( g )
676 , my_indexer_node( g )
677 , my_device_selector( new device_selector<DeviceSelector>( d, *this, f ) )
678 , my_device_selector_node( g, serial, device_selector_body( my_device_selector ) )
679 , my_join_node( g )
680 , my_kernel_node( g, serial, kernel_body( *this ) )
681 // By default, streaming_node maps all its ports to the kernel arguments on a one-to-one basis.
682 , my_args_storage( make_args_storage( args_storage<>(kernel, f), port_ref<0, NUM_INPUTS - 1>() ) )
683 {
684 base_type::set_external_ports( get_input_ports(), get_output_ports() );
685 make_edges();
686 }
687
688 streaming_node( const streaming_node &node )
689 : base_type( node.my_graph )
690 , my_indexer_node( node.my_indexer_node )
691 , my_device_selector( node.my_device_selector->clone( *this ) )
692 , my_device_selector_node( node.my_graph, serial, device_selector_body( my_device_selector ) )
693 , my_join_node( node.my_join_node )
694 , my_kernel_node( node.my_graph, serial, kernel_body( *this ) )
695 , my_args_storage( node.my_args_storage->clone() )
696 {
697 base_type::set_external_ports( get_input_ports(), get_output_ports() );
698 make_edges();
699 }
700
701 streaming_node( streaming_node &&node )
702 : base_type( node.my_graph )
703 , my_indexer_node( std::move( node.my_indexer_node ) )
704 , my_device_selector( node.my_device_selector->clone(*this) )
705 , my_device_selector_node( node.my_graph, serial, device_selector_body( my_device_selector ) )
706 , my_join_node( std::move( node.my_join_node ) )
707 , my_kernel_node( node.my_graph, serial, kernel_body( *this ) )
708 , my_args_storage( node.my_args_storage )
709 {
710 base_type::set_external_ports( get_input_ports(), get_output_ports() );
711 make_edges();
712 // Reset the moved-from node's args storage pointer to NULL to prevent double deallocation.
713 node.my_args_storage = NULL;
714 }
715
717 if ( my_args_storage ) delete my_args_storage;
718 if ( my_device_selector ) delete my_device_selector;
719 }
720
721 template <typename... Args>
722 void set_args( Args&&... args ) {
723 // Copy the base class of args_storage and create new storage for "Args...".
724 args_storage_base * const new_args_storage = make_args_storage( *my_args_storage, typename wrap_to_async<Args>::type(std::forward<Args>(args))...);
725 delete my_args_storage;
726 my_args_storage = new_args_storage;
727 }
728
729protected:
730 void reset_node( reset_flags = rf_reset_protocol ) __TBB_override { __TBB_ASSERT( false, "Not implemented yet" ); }
731
732private:
734 device_selector_base *my_device_selector;
736 join_node<kernel_input_tuple, JP> my_join_node;
738
739 args_storage_base *my_args_storage;
740};
741
742#endif // __TBB_PREVIEW_STREAMING_NODE
743#endif // __TBB_flow_graph_streaming_H
__TBB_DEPRECATED internal::port_ref_impl< N1, N2 > port_ref()
class __TBB_DEPRECATED streaming_node
#define __TBB_ASSERT_EX(predicate, comment)
"Extended" version is useful to suppress warnings if a variable is only used with an assert
Definition: tbb_stddef.h:167
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
#define __TBB_override
Definition: tbb_stddef.h:240
#define __TBB_STATIC_ASSERT(condition, msg)
Definition: tbb_stddef.h:553
#define __TBB_DEPRECATED
Definition: tbb_config.h:636
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark S
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark d __itt_event ITT_FORMAT __itt_group_mark d void const wchar_t const wchar_t int ITT_FORMAT __itt_group_sync __itt_group_fsync x void const wchar_t int const wchar_t int int ITT_FORMAT __itt_group_sync __itt_group_fsync x void ITT_FORMAT __itt_group_sync __itt_group_fsync p void ITT_FORMAT __itt_group_sync __itt_group_fsync p void size_t ITT_FORMAT lu no args __itt_obj_prop_t 
__itt_obj_state_t ITT_FORMAT d const char ITT_FORMAT s const char ITT_FORMAT s __itt_frame ITT_FORMAT p __itt_counter ITT_FORMAT p __itt_counter unsigned long long ITT_FORMAT lu __itt_counter unsigned long long ITT_FORMAT lu __itt_counter __itt_clock_domain unsigned long long void ITT_FORMAT p const wchar_t ITT_FORMAT S __itt_mark_type const wchar_t ITT_FORMAT S __itt_mark_type const char ITT_FORMAT s __itt_mark_type ITT_FORMAT d __itt_caller ITT_FORMAT p __itt_caller ITT_FORMAT p no args const __itt_domain __itt_clock_domain unsigned long long __itt_id ITT_FORMAT lu const __itt_domain __itt_clock_domain unsigned long long __itt_id __itt_id void * fn
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle * key
STL namespace.
The graph class.
void call(F &&f, Pack &&p)
Calls the given function with arguments taken from a stored_pack.
K key_from_message(const T &t)
Definition: flow_graph.h:721
void ignore_return_values(Args &&...)
field of type K being used for matching.
is_port_ref_impl< typenametbb::internal::strip< T >::type >::type type
static void doit_impl(std::true_type x, F &f, Tuple &t, port_ref_impl< N, N >(*fn)(), Args1 &... args1, Args2 &... args2)
static void doit_impl(std::true_type x, F &f, Tuple &t, port_ref_impl< N1, N2 >, Args1 &... args1, Args2 &... args2)
static void doit_impl(std::true_type, F &f, Tuple &t, port_ref_impl< N, N >, Args1 &... args1, Args2 &... args2)
static void doit_impl(std::true_type x, F &f, Tuple &t, port_ref_impl< N1, N2 >(*fn)(), Args1 &... args1, Args2 &... args2)
static void doit_impl(std::false_type, F &f, Tuple &t, A1 &a1, Args1 &... args1, Args2 &... args2)
static void doit(F &f, Tuple &t, A1 &a1, Args1 &... args1, Args2 &... args2)
static void doit(F &f, Tuple &, Args2 &... args2)
tuple< streaming_device_with_key< typename StreamFactory::device_type, typename key_from_policy< JP >::type >, typename async_msg_type< Ports >::type... > kernel_input_tuple
tuple< typename async_msg_type< Ports >::type... > input_tuple
indexer_node< typename async_msg_type< Ports >::type... > indexer_node_type
StreamFactory::template async_msg_type< T > type
void enqueue_kernel_impl(kernel_input_tuple &, StreamFactory &factory, device_type device, const kernel_type &kernel, Args &... args) const
void enqueue_kernel_impl(kernel_input_tuple &ip, StreamFactory &factory, device_type device, const kernel_type &kernel, Args &... args) const
internal::streaming_node_traits< JP, StreamFactory, Ports... >::input_tuple input_tuple
internal::make_sequence< NUM_OUTPUTS >::type output_sequence
base_type::output_ports_type get_output_ports(internal::sequence< S... >)
internal::streaming_node_traits< JP, StreamFactory, Ports... >::indexer_node_type indexer_node_type
multifunction_node< kernel_input_tuple, output_tuple > kernel_multifunction_node
base_type::input_ports_type get_input_ports(internal::sequence< S... >)
internal::streaming_node_traits< JP, StreamFactory, Ports... >::output_tuple output_tuple
args_storage_base * make_args_storage(const args_storage_base &storage, Args &&... args) const
void reset_node(reset_flags=rf_reset_protocol) __TBB_override
internal::streaming_device_with_key< device_type, key_type > device_with_key_type
void enqueue_kernel(kernel_input_tuple &ip, StreamFactory &factory, device_type device, const kernel_type &kernel, Args &... args) const
multifunction_node< indexer_node_output_type, kernel_input_tuple > device_selector_node
internal::make_sequence< NUM_INPUTS >::type input_sequence
streaming_node(graph &g, const kernel_type &kernel, DeviceSelector d, StreamFactory &f)
internal::streaming_node_traits< JP, StreamFactory, Ports... >::kernel_input_tuple kernel_input_tuple
composite_node< input_tuple, output_tuple > base_type
virtual void operator()(const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op)=0
virtual device_selector_base * clone(streaming_node &n) const =0
device_selector(UserFunctor uf, streaming_node &n, StreamFactory &f)
std::unordered_map< typename std::decay< key_type >::type, epoch_desc > my_devices
device_selector_base * clone(streaming_node &n) const __TBB_override
void send_and_put_impl(size_t &epoch, const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op)
void operator()(const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op) __TBB_override
static dispatch_funcs_type create_dispatch_funcs(internal::sequence< S... >)
void operator()(const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op)
virtual void enqueue(kernel_input_tuple &ip, output_ports_type &op, const streaming_node &n)=0
static run_finalize_func< FinalizeFn > make_run_finalize_func(kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn)
bool do_try_put(const kernel_input_tuple &ip, output_ports_type &op, internal::sequence< S... >) const
args_storage(const kernel_type &kernel, StreamFactory &f, Args &&... args)
bool do_try_put(const kernel_input_tuple &ip, output_ports_type &op) const
void enqueue(kernel_input_tuple &ip, output_ports_type &op, const streaming_node &n) __TBB_override
run_kernel_func(kernel_input_tuple &ip, const streaming_node &node, const args_storage &storage)
kernel_func(kernel_input_tuple &ip, const streaming_node &node, const args_storage &storage, device_type device)
run_finalize_func(kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn)
void operator()(kernel_input_tuple ip, typename args_storage_base::output_ports_type &op)
StreamFactory::template async_msg_type< typename tbb::internal::strip< T >::type > type
Detects whether two given types are the same.
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:330

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.