00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_flow_graph_H
00022 #define __TBB_flow_graph_H
00023
00024 #include "tbb_stddef.h"
00025 #include "atomic.h"
00026 #include "spin_mutex.h"
00027 #include "null_mutex.h"
00028 #include "spin_rw_mutex.h"
00029 #include "null_rw_mutex.h"
00030 #include "task.h"
00031 #include "concurrent_vector.h"
00032 #include "internal/_aggregator_impl.h"
00033
00034
00035 #if TBB_IMPLEMENT_CPP0X && (!defined(_MSC_VER) || _MSC_VER < 1600)
00036 #define TBB_PREVIEW_TUPLE 1
00037 #include "compat/tuple"
00038 #else
00039 #include <tuple>
00040 #endif
00041
00042 #include<list>
00043 #include<queue>
00044
00055 namespace tbb {
00056 namespace flow {
00057
//! Symbolic concurrency limits.
/** Node constructors in this file take a size_t concurrency argument;
    split_node, for example, passes unlimited. */
enum concurrency {
    unlimited = 0,
    serial    = 1
};
00060
00061 namespace interface6 {
00062
//! Empty message type; it is the input type of continue_receiver.
class continue_msg {
};
00065
00066 template< typename T > class sender;
00067 template< typename T > class receiver;
00068 class continue_receiver;
00069
00071 template< typename T >
00072 class sender {
00073 public:
00075 typedef T output_type;
00076
00078 typedef receiver<T> successor_type;
00079
00080 virtual ~sender() {}
00081
00083 virtual bool register_successor( successor_type &r ) = 0;
00084
00086 virtual bool remove_successor( successor_type &r ) = 0;
00087
00089 virtual bool try_get( T & ) { return false; }
00090
00092 virtual bool try_reserve( T & ) { return false; }
00093
00095 virtual bool try_release( ) { return false; }
00096
00098 virtual bool try_consume( ) { return false; }
00099 };
00100
00102 template< typename T >
00103 class receiver {
00104 public:
00106 typedef T input_type;
00107
00109 typedef sender<T> predecessor_type;
00110
00112 virtual ~receiver() {}
00113
00115 virtual bool try_put( const T& t ) = 0;
00116
00118 virtual bool register_predecessor( predecessor_type & ) { return false; }
00119
00121 virtual bool remove_predecessor( predecessor_type & ) { return false; }
00122 };
00123
00125
00126 class continue_receiver : public receiver< continue_msg > {
00127 public:
00128
00130 typedef continue_msg input_type;
00131
00133 typedef sender< continue_msg > predecessor_type;
00134
00136 continue_receiver( int number_of_predecessors = 0 ) {
00137 my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
00138 my_current_count = 0;
00139 }
00140
00142 continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
00143 my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
00144 my_current_count = 0;
00145 }
00146
00148 virtual ~continue_receiver() { }
00149
00151 bool register_predecessor( predecessor_type & ) {
00152 spin_mutex::scoped_lock l(my_mutex);
00153 ++my_predecessor_count;
00154 return true;
00155 }
00156
00158
00161 bool remove_predecessor( predecessor_type & ) {
00162 spin_mutex::scoped_lock l(my_mutex);
00163 --my_predecessor_count;
00164 return true;
00165 }
00166
00168
00170 bool try_put( const input_type & ) {
00171 {
00172 spin_mutex::scoped_lock l(my_mutex);
00173 if ( ++my_current_count < my_predecessor_count )
00174 return true;
00175 else
00176 my_current_count = 0;
00177 }
00178 execute();
00179 return true;
00180 }
00181
00182 protected:
00183 spin_mutex my_mutex;
00184 int my_predecessor_count;
00185 int my_current_count;
00186 int my_initial_predecessor_count;
00187
00189
00191 virtual void execute() = 0;
00192 };
00193
00194 #include "internal/_flow_graph_impl.h"
00195 using namespace internal::graph_policy_namespace;
00196
00197 class graph;
00198 class graph_node;
00199
//! Forward iterator over the nodes registered with a graph container.
/** Holds a pointer to the container and a pointer to the current node; a NULL
    current node is the state the end iterator is constructed with. */
template <typename GraphContainerType, typename GraphNodeType>
class graph_iterator {
    friend class graph;
    friend class graph_node;
public:
    typedef size_t size_type;
    typedef GraphNodeType value_type;
    typedef GraphNodeType* pointer;
    typedef GraphNodeType& reference;
    typedef const GraphNodeType& const_reference;
    typedef std::forward_iterator_tag iterator_category;

    //! Default constructor: no container, no current node.
    graph_iterator() : my_graph(NULL), current_node(NULL) {}

    //! Copy constructor
    graph_iterator(const graph_iterator& other)
        : my_graph(other.my_graph), current_node(other.current_node)
    {}

    //! Assignment; copying two raw pointers is harmless on self-assignment.
    graph_iterator& operator=(const graph_iterator& other) {
        my_graph = other.my_graph;
        current_node = other.current_node;
        return *this;
    }

    //! Dereference (defined out of class)
    reference operator*() const;

    //! Member access (defined out of class)
    pointer operator->() const;

    //! Two iterators are equal when they refer to the same container and the same node.
    bool operator==(const graph_iterator& other) const {
        if (my_graph != other.my_graph)
            return false;
        return current_node == other.current_node;
    }

    //! Inequality
    bool operator!=(const graph_iterator& other) const { return !(*this == other); }

    //! Pre-increment
    graph_iterator& operator++() {
        internal_forward();
        return *this;
    }

    //! Post-increment: returns the iterator value held before advancing.
    graph_iterator operator++(int) {
        graph_iterator previous(*this);
        ++(*this);
        return previous;
    }

private:
    // the container being traversed
    GraphContainerType *my_graph;
    // the node currently referred to
    pointer current_node;

    //! Private constructor used to build begin (begin == true) and end iterators.
    graph_iterator(GraphContainerType *g, bool begin);
    void internal_forward();
};
00266
00268
00269 class graph : tbb::internal::no_copy {
00270 friend class graph_node;
00271
00272 template< typename Body >
00273 class run_task : public task {
00274 public:
00275 run_task( Body& body ) : my_body(body) {}
00276 task *execute() {
00277 my_body();
00278 return NULL;
00279 }
00280 private:
00281 Body my_body;
00282 };
00283
00284 template< typename Receiver, typename Body >
00285 class run_and_put_task : public task {
00286 public:
00287 run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
00288 task *execute() {
00289 my_receiver.try_put( my_body() );
00290 return NULL;
00291 }
00292 private:
00293 Receiver &my_receiver;
00294 Body my_body;
00295 };
00296
00297 public:
00299 explicit graph() : my_nodes(NULL), my_nodes_last(NULL)
00300 {
00301 own_context = true;
00302 my_context = new task_group_context();
00303 my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
00304 my_root_task->set_ref_count(1);
00305 }
00306
00308 explicit graph(task_group_context& use_this_context) :
00309 my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL)
00310 {
00311 own_context = false;
00312 my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
00313 my_root_task->set_ref_count(1);
00314 }
00315
00317
00318 ~graph() {
00319 wait_for_all();
00320 my_root_task->set_ref_count(0);
00321 task::destroy( *my_root_task );
00322 if (own_context) delete my_context;
00323 }
00324
00326
00328 void increment_wait_count() {
00329 if (my_root_task)
00330 my_root_task->increment_ref_count();
00331 }
00332
00334
00336 void decrement_wait_count() {
00337 if (my_root_task)
00338 my_root_task->decrement_ref_count();
00339 }
00340
00342
00344 template< typename Receiver, typename Body >
00345 void run( Receiver &r, Body body ) {
00346 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00347 run_and_put_task< Receiver, Body >( r, body ) );
00348 }
00349
00351
00353 template< typename Body >
00354 void run( Body body ) {
00355 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00356 run_task< Body >( body ) );
00357 }
00358
00360
00361 void wait_for_all() {
00362 if (my_root_task)
00363 my_root_task->wait_for_all();
00364 my_root_task->set_ref_count(1);
00365 }
00366
00368 task * root_task() {
00369 return my_root_task;
00370 }
00371
00372
00373 template<typename C, typename N>
00374 friend class graph_iterator;
00375
00376
00377 typedef graph_iterator<graph,graph_node> iterator;
00378 typedef graph_iterator<const graph,const graph_node> const_iterator;
00379
00380
00382 iterator begin() { return iterator(this, true); }
00384 iterator end() { return iterator(this, false); }
00386 const_iterator begin() const { return const_iterator(this, true); }
00388 const_iterator end() const { return const_iterator(this, false); }
00390 const_iterator cbegin() const { return const_iterator(this, true); }
00392 const_iterator cend() const { return const_iterator(this, false); }
00393
00394 private:
00395 task *my_root_task;
00396 task_group_context *my_context;
00397 bool own_context;
00398
00399 graph_node *my_nodes, *my_nodes_last;
00400
00401 spin_mutex nodelist_mutex;
00402 void register_node(graph_node *n);
00403 void remove_node(graph_node *n);
00404 };
00405
00406 template <typename C, typename N>
00407 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
00408 {
00409 if (begin) current_node = my_graph->my_nodes;
00410
00411 }
00412
00413 template <typename C, typename N>
00414 typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
00415 __TBB_ASSERT(current_node, "graph_iterator at end");
00416 return *operator->();
00417 }
00418
00419 template <typename C, typename N>
00420 typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
00421 return current_node;
00422 }
00423
00424
00425 template <typename C, typename N>
00426 void graph_iterator<C,N>::internal_forward() {
00427 if (current_node) current_node = current_node->next;
00428 }
00429
00431 class graph_node : tbb::internal::no_assign {
00432 friend class graph;
00433 template<typename C, typename N>
00434 friend class graph_iterator;
00435 protected:
00436 graph& my_graph;
00437 graph_node *next, *prev;
00438 public:
00439 graph_node(graph& g) : my_graph(g) {
00440 my_graph.register_node(this);
00441 }
00442 virtual ~graph_node() {
00443 my_graph.remove_node(this);
00444 }
00445 };
00446
00447 void graph::register_node(graph_node *n) {
00448 n->next = NULL;
00449 {
00450 spin_mutex::scoped_lock lock(nodelist_mutex);
00451 n->prev = my_nodes_last;
00452 if (my_nodes_last) my_nodes_last->next = n;
00453 my_nodes_last = n;
00454 if (!my_nodes) my_nodes = n;
00455 }
00456 }
00457
00458 void graph::remove_node(graph_node *n) {
00459 {
00460 spin_mutex::scoped_lock lock(nodelist_mutex);
00461 __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
00462 if (n->prev) n->prev->next = n->next;
00463 if (n->next) n->next->prev = n->prev;
00464 if (my_nodes_last == n) my_nodes_last = n->prev;
00465 if (my_nodes == n) my_nodes = n->next;
00466 }
00467 n->prev = n->next = NULL;
00468 }
00469
00470 #include "internal/_flow_graph_node_impl.h"
00471
00473 template < typename Output >
00474 class source_node : public graph_node, public sender< Output > {
00475 using graph_node::my_graph;
00476 public:
00478 typedef Output output_type;
00479
00481 typedef receiver< Output > successor_type;
00482
00484 template< typename Body >
00485 source_node( graph &g, Body body, bool is_active = true )
00486 : graph_node(g), my_root_task(g.root_task()), my_active(is_active), init_my_active(is_active),
00487 my_body( new internal::source_body_leaf< output_type, Body>(body) ),
00488 my_reserved(false), my_has_cached_item(false)
00489 {
00490 my_successors.set_owner(this);
00491 }
00492
00494 source_node( const source_node& src ) :
00495 graph_node(src.my_graph), sender<Output>(),
00496 my_root_task( src.my_root_task), my_active(src.init_my_active),
00497 init_my_active(src.init_my_active), my_body( src.my_body->clone() ),
00498 my_reserved(false), my_has_cached_item(false)
00499 {
00500 my_successors.set_owner(this);
00501 }
00502
00504 ~source_node() { delete my_body; }
00505
00507 bool register_successor( receiver<output_type> &r ) {
00508 spin_mutex::scoped_lock lock(my_mutex);
00509 my_successors.register_successor(r);
00510 if ( my_active )
00511 spawn_put();
00512 return true;
00513 }
00514
00516 bool remove_successor( receiver<output_type> &r ) {
00517 spin_mutex::scoped_lock lock(my_mutex);
00518 my_successors.remove_successor(r);
00519 return true;
00520 }
00521
00523 bool try_get( output_type &v ) {
00524 spin_mutex::scoped_lock lock(my_mutex);
00525 if ( my_reserved )
00526 return false;
00527
00528 if ( my_has_cached_item ) {
00529 v = my_cached_item;
00530 my_has_cached_item = false;
00531 } else if ( (*my_body)(v) == false ) {
00532 return false;
00533 }
00534 return true;
00535 }
00536
00538 bool try_reserve( output_type &v ) {
00539 spin_mutex::scoped_lock lock(my_mutex);
00540 if ( my_reserved ) {
00541 return false;
00542 }
00543
00544 if ( !my_has_cached_item && (*my_body)(my_cached_item) )
00545 my_has_cached_item = true;
00546
00547 if ( my_has_cached_item ) {
00548 v = my_cached_item;
00549 my_reserved = true;
00550 return true;
00551 } else {
00552 return false;
00553 }
00554 }
00555
00557
00558 bool try_release( ) {
00559 spin_mutex::scoped_lock lock(my_mutex);
00560 __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
00561 my_reserved = false;
00562 if(!my_successors.empty())
00563 spawn_put();
00564 return true;
00565 }
00566
00568 bool try_consume( ) {
00569 spin_mutex::scoped_lock lock(my_mutex);
00570 __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
00571 my_reserved = false;
00572 my_has_cached_item = false;
00573 if ( !my_successors.empty() ) {
00574 spawn_put();
00575 }
00576 return true;
00577 }
00578
00580 void activate() {
00581 spin_mutex::scoped_lock lock(my_mutex);
00582 my_active = true;
00583 if ( !my_successors.empty() )
00584 spawn_put();
00585 }
00586
00587 private:
00588 task *my_root_task;
00589 spin_mutex my_mutex;
00590 bool my_active;
00591 bool init_my_active;
00592 internal::source_body<output_type> *my_body;
00593 internal::broadcast_cache< output_type > my_successors;
00594 bool my_reserved;
00595 bool my_has_cached_item;
00596 output_type my_cached_item;
00597
00598 friend class internal::source_task< source_node< output_type > >;
00599
00601 void apply_body( ) {
00602 output_type v;
00603 if ( try_reserve(v) == false )
00604 return;
00605
00606 if ( my_successors.try_put( v ) )
00607 try_consume();
00608 else
00609 try_release();
00610 }
00611
00613 void spawn_put( ) {
00614 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00615 internal::source_task< source_node< output_type > >( *this ) );
00616 }
00617 };
00618
00620 template < typename Input, typename Output = continue_msg, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
00621 class function_node : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
00622 using graph_node::my_graph;
00623 public:
00624 typedef Input input_type;
00625 typedef Output output_type;
00626 typedef sender< input_type > predecessor_type;
00627 typedef receiver< output_type > successor_type;
00628 typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
00629 typedef internal::function_output<output_type> fOutput_type;
00630
00632 template< typename Body >
00633 function_node( graph &g, size_t concurrency, Body body ) :
00634 graph_node(g), internal::function_input<input_type,output_type,Allocator>(g, concurrency, body)
00635 {}
00636
00638 function_node( const function_node& src ) :
00639 graph_node(src.my_graph), internal::function_input<input_type,output_type,Allocator>( src ),
00640 fOutput_type()
00641 {}
00642
00643 bool try_put(const input_type &i) { return fInput_type::try_put(i); }
00644
00645 protected:
00646 internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00647 };
00648
00650 template < typename Input, typename Output, typename Allocator >
00651 class function_node<Input,Output,queueing,Allocator> : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
00652 using graph_node::my_graph;
00653 public:
00654 typedef Input input_type;
00655 typedef Output output_type;
00656 typedef sender< input_type > predecessor_type;
00657 typedef receiver< output_type > successor_type;
00658 typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
00659 typedef internal::function_input_queue<input_type, Allocator> queue_type;
00660 typedef internal::function_output<output_type> fOutput_type;
00661
00663 template< typename Body >
00664 function_node( graph &g, size_t concurrency, Body body ) :
00665 graph_node(g), fInput_type( g, concurrency, body, new queue_type() )
00666 {}
00667
00669 function_node( const function_node& src ) :
00670 graph_node(src.my_graph), fInput_type( src, new queue_type() ), fOutput_type()
00671 {}
00672
00673 bool try_put(const input_type &i) { return fInput_type::try_put(i); }
00674
00675 protected:
00676 internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00677 };
00678
00679 #include "tbb/internal/_flow_graph_types_impl.h"
00680
00682
00683 template < typename Input, typename Output, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
00684 class multifunction_node :
00685 public graph_node,
00686 public internal::multifunction_input
00687 <
00688 Input,
00689 typename internal::wrap_tuple_elements<
00690 std::tuple_size<Output>::value,
00691 internal::function_output,
00692 Output
00693 >::type,
00694 Allocator
00695 > {
00696 using graph_node::my_graph;
00697 private:
00698 static const int N = std::tuple_size<Output>::value;
00699 public:
00700 typedef Input input_type;
00701 typedef typename internal::wrap_tuple_elements<N,internal::function_output, Output>::type output_ports_type;
00702 private:
00703 typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
00704 typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
00705 public:
00706 template<typename Body>
00707 multifunction_node( graph &g, size_t concurrency, Body body ) :
00708 graph_node(g), base_type(g,concurrency, body)
00709 {}
00710 multifunction_node( const multifunction_node &other) :
00711 graph_node(other.my_graph), base_type(other)
00712 {}
00713
00714 };
00715
00716 template < typename Input, typename Output, typename Allocator >
00717 class multifunction_node<Input,Output,queueing,Allocator> : public graph_node, public internal::multifunction_input<Input,
00718 typename internal::wrap_tuple_elements<std::tuple_size<Output>::value, internal::function_output, Output>::type, Allocator> {
00719 using graph_node::my_graph;
00720 static const int N = std::tuple_size<Output>::value;
00721 public:
00722 typedef Input input_type;
00723 typedef typename internal::wrap_tuple_elements<N, internal::function_output, Output>::type output_ports_type;
00724 private:
00725 typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
00726 typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
00727 public:
00728 template<typename Body>
00729 multifunction_node( graph &g, size_t concurrency, Body body) :
00730 graph_node(g), base_type(g,concurrency, body, new queue_type())
00731 {}
00732 multifunction_node( const multifunction_node &other) :
00733 graph_node(other.my_graph), base_type(other, new queue_type())
00734 {}
00735 };
00736
00738
00739
00740 template<typename TupleType, typename Allocator=cache_aligned_allocator<TupleType> >
00741 class split_node : public multifunction_node<TupleType, TupleType, rejecting, Allocator> {
00742 static const int N = std::tuple_size<TupleType>::value;
00743 typedef multifunction_node<TupleType,TupleType,rejecting,Allocator> base_type;
00744 public:
00745 typedef typename base_type::output_ports_type output_ports_type;
00746 private:
00747 struct splitting_body {
00748 void operator()(const TupleType& t, output_ports_type &p) {
00749 internal::emit_element<N>::emit_this(t, p);
00750 }
00751 };
00752 public:
00753 typedef TupleType input_type;
00754 typedef Allocator allocator_type;
00755 split_node(graph &g) : base_type(g, unlimited, splitting_body()) {}
00756 split_node( const split_node & other) : base_type(other) {}
00757 };
00758
00760 template <typename Output>
00761 class continue_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
00762 using graph_node::my_graph;
00763 public:
00764 typedef continue_msg input_type;
00765 typedef Output output_type;
00766 typedef sender< input_type > predecessor_type;
00767 typedef receiver< output_type > successor_type;
00768 typedef internal::function_output<output_type> fOutput_type;
00769
00771 template <typename Body >
00772 continue_node( graph &g, Body body ) :
00773 graph_node(g), internal::continue_input<output_type>( g, body )
00774 {}
00775
00777 template <typename Body >
00778 continue_node( graph &g, int number_of_predecessors, Body body ) :
00779 graph_node(g), internal::continue_input<output_type>( g, number_of_predecessors, body )
00780 {}
00781
00783 continue_node( const continue_node& src ) :
00784 graph_node(src.my_graph), internal::continue_input<output_type>(src),
00785 internal::function_output<Output>()
00786 {}
00787
00788 protected:
00789 internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00790 };
00791
00792 template< typename T >
00793 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
00794 using graph_node::my_graph;
00795 public:
00796 typedef T input_type;
00797 typedef T output_type;
00798 typedef sender< input_type > predecessor_type;
00799 typedef receiver< output_type > successor_type;
00800
00801 overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
00802 my_successors.set_owner( this );
00803 }
00804
00805
00806 overwrite_node( const overwrite_node& src ) :
00807 graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
00808 {
00809 my_successors.set_owner( this );
00810 }
00811
00812 ~overwrite_node() {}
00813
00814 bool register_successor( successor_type &s ) {
00815 spin_mutex::scoped_lock l( my_mutex );
00816 if ( my_buffer_is_valid ) {
00817
00818 if ( s.try_put( my_buffer ) || !s.register_predecessor( *this ) ) {
00819
00820 my_successors.register_successor( s );
00821 return true;
00822 } else {
00823
00824 return false;
00825 }
00826 } else {
00827
00828 my_successors.register_successor( s );
00829 return true;
00830 }
00831 }
00832
00833 bool remove_successor( successor_type &s ) {
00834 spin_mutex::scoped_lock l( my_mutex );
00835 my_successors.remove_successor(s);
00836 return true;
00837 }
00838
00839 bool try_put( const T &v ) {
00840 spin_mutex::scoped_lock l( my_mutex );
00841 my_buffer = v;
00842 my_buffer_is_valid = true;
00843 my_successors.try_put(v);
00844 return true;
00845 }
00846
00847 bool try_get( T &v ) {
00848 spin_mutex::scoped_lock l( my_mutex );
00849 if ( my_buffer_is_valid ) {
00850 v = my_buffer;
00851 return true;
00852 } else {
00853 return false;
00854 }
00855 }
00856
00857 bool is_valid() {
00858 spin_mutex::scoped_lock l( my_mutex );
00859 return my_buffer_is_valid;
00860 }
00861
00862 void clear() {
00863 spin_mutex::scoped_lock l( my_mutex );
00864 my_buffer_is_valid = false;
00865 }
00866
00867 protected:
00868 spin_mutex my_mutex;
00869 internal::broadcast_cache< T, null_rw_mutex > my_successors;
00870 T my_buffer;
00871 bool my_buffer_is_valid;
00872 };
00873
00874 template< typename T >
00875 class write_once_node : public overwrite_node<T> {
00876 public:
00877 typedef T input_type;
00878 typedef T output_type;
00879 typedef sender< input_type > predecessor_type;
00880 typedef receiver< output_type > successor_type;
00881
00883 write_once_node(graph& g) : overwrite_node<T>(g) {}
00884
00886 write_once_node( const write_once_node& src ) : overwrite_node<T>(src) {}
00887
00888 bool try_put( const T &v ) {
00889 spin_mutex::scoped_lock l( this->my_mutex );
00890 if ( this->my_buffer_is_valid ) {
00891 return false;
00892 } else {
00893 this->my_buffer = v;
00894 this->my_buffer_is_valid = true;
00895 this->my_successors.try_put(v);
00896 return true;
00897 }
00898 }
00899 };
00900
00902 template <typename T>
00903 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
00904 using graph_node::my_graph;
00905 internal::broadcast_cache<T> my_successors;
00906 public:
00907 typedef T input_type;
00908 typedef T output_type;
00909 typedef sender< input_type > predecessor_type;
00910 typedef receiver< output_type > successor_type;
00911
00912 broadcast_node(graph& g) : graph_node(g) {
00913 my_successors.set_owner( this );
00914 }
00915
00916
00917 broadcast_node( const broadcast_node& src ) :
00918 graph_node(src.my_graph), receiver<T>(), sender<T>()
00919 {
00920 my_successors.set_owner( this );
00921 }
00922
00924 virtual bool register_successor( receiver<T> &r ) {
00925 my_successors.register_successor( r );
00926 return true;
00927 }
00928
00930 virtual bool remove_successor( receiver<T> &r ) {
00931 my_successors.remove_successor( r );
00932 return true;
00933 }
00934
00935 bool try_put( const T &t ) {
00936 my_successors.try_put(t);
00937 return true;
00938 }
00939 };
00940
00941 #include "internal/_flow_graph_item_buffer_impl.h"
00942
00944 template <typename T, typename A=cache_aligned_allocator<T> >
00945 class buffer_node : public graph_node, public reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
00946 using graph_node::my_graph;
00947 public:
00948 typedef T input_type;
00949 typedef T output_type;
00950 typedef sender< input_type > predecessor_type;
00951 typedef receiver< output_type > successor_type;
00952 typedef buffer_node<T, A> my_class;
00953 protected:
00954 typedef size_t size_type;
00955 internal::round_robin_cache< T, null_rw_mutex > my_successors;
00956
00957 task *my_parent;
00958
00959 friend class internal::forward_task< buffer_node< T, A > >;
00960
00961 enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd};
00962 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
00963
00964
00965 class buffer_operation : public internal::aggregated_operation< buffer_operation > {
00966 public:
00967 char type;
00968 T *elem;
00969 successor_type *r;
00970 buffer_operation(const T& e, op_type t) :
00971 type(char(t)), elem(const_cast<T*>(&e)), r(NULL) {}
00972 buffer_operation(op_type t) : type(char(t)), r(NULL) {}
00973 };
00974
00975 bool forwarder_busy;
00976 typedef internal::aggregating_functor<my_class, buffer_operation> my_handler;
00977 friend class internal::aggregating_functor<my_class, buffer_operation>;
00978 internal::aggregator< my_handler, buffer_operation> my_aggregator;
00979
00980 virtual void handle_operations(buffer_operation *op_list) {
00981 buffer_operation *tmp;
00982 bool try_forwarding=false;
00983 while (op_list) {
00984 tmp = op_list;
00985 op_list = op_list->next;
00986 switch (tmp->type) {
00987 case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
00988 case rem_succ: internal_rem_succ(tmp); break;
00989 case req_item: internal_pop(tmp); break;
00990 case res_item: internal_reserve(tmp); break;
00991 case rel_res: internal_release(tmp); try_forwarding = true; break;
00992 case con_res: internal_consume(tmp); try_forwarding = true; break;
00993 case put_item: internal_push(tmp); try_forwarding = true; break;
00994 case try_fwd: internal_forward(tmp); break;
00995 }
00996 }
00997 if (try_forwarding && !forwarder_busy) {
00998 forwarder_busy = true;
00999 task::enqueue(*new(task::allocate_additional_child_of(*my_parent)) internal::forward_task< buffer_node<input_type, A> >(*this));
01000 }
01001 }
01002
01004 virtual void forward() {
01005 buffer_operation op_data(try_fwd);
01006 do {
01007 op_data.status = WAIT;
01008 my_aggregator.execute(&op_data);
01009 } while (op_data.status == SUCCEEDED);
01010 }
01011
01013 virtual void internal_reg_succ(buffer_operation *op) {
01014 my_successors.register_successor(*(op->r));
01015 __TBB_store_with_release(op->status, SUCCEEDED);
01016 }
01017
01019 virtual void internal_rem_succ(buffer_operation *op) {
01020 my_successors.remove_successor(*(op->r));
01021 __TBB_store_with_release(op->status, SUCCEEDED);
01022 }
01023
01025 virtual void internal_forward(buffer_operation *op) {
01026 T i_copy;
01027 bool success = false;
01028 size_type counter = my_successors.size();
01029
01030 while (counter>0 && !this->buffer_empty() && this->item_valid(this->my_tail-1)) {
01031 this->fetch_back(i_copy);
01032 if( my_successors.try_put(i_copy) ) {
01033 this->invalidate_back();
01034 --(this->my_tail);
01035 success = true;
01036 }
01037 --counter;
01038 }
01039 if (success && !counter)
01040 __TBB_store_with_release(op->status, SUCCEEDED);
01041 else {
01042 __TBB_store_with_release(op->status, FAILED);
01043 forwarder_busy = false;
01044 }
01045 }
01046
01047 virtual void internal_push(buffer_operation *op) {
01048 this->push_back(*(op->elem));
01049 __TBB_store_with_release(op->status, SUCCEEDED);
01050 }
01051
01052 virtual void internal_pop(buffer_operation *op) {
01053 if(this->pop_back(*(op->elem))) {
01054 __TBB_store_with_release(op->status, SUCCEEDED);
01055 }
01056 else {
01057 __TBB_store_with_release(op->status, FAILED);
01058 }
01059 }
01060
01061 virtual void internal_reserve(buffer_operation *op) {
01062 if(this->reserve_front(*(op->elem))) {
01063 __TBB_store_with_release(op->status, SUCCEEDED);
01064 }
01065 else {
01066 __TBB_store_with_release(op->status, FAILED);
01067 }
01068 }
01069
01070 virtual void internal_consume(buffer_operation *op) {
01071 this->consume_front();
01072 __TBB_store_with_release(op->status, SUCCEEDED);
01073 }
01074
01075 virtual void internal_release(buffer_operation *op) {
01076 this->release_front();
01077 __TBB_store_with_release(op->status, SUCCEEDED);
01078 }
01079
01080 public:
01082 buffer_node( graph &g ) : graph_node(g), reservable_item_buffer<T>(),
01083 my_parent( g.root_task() ), forwarder_busy(false) {
01084 my_successors.set_owner(this);
01085 my_aggregator.initialize_handler(my_handler(this));
01086 }
01087
01089 buffer_node( const buffer_node& src ) : graph_node(src.my_graph),
01090 reservable_item_buffer<T>(), receiver<T>(), sender<T>(),
01091 my_parent( src.my_parent ) {
01092 forwarder_busy = false;
01093 my_successors.set_owner(this);
01094 my_aggregator.initialize_handler(my_handler(this));
01095 }
01096
01097 virtual ~buffer_node() {}
01098
01099
01100
01101
01102
01104
01105 bool register_successor( receiver<output_type> &r ) {
01106 buffer_operation op_data(reg_succ);
01107 op_data.r = &r;
01108 my_aggregator.execute(&op_data);
01109 return true;
01110 }
01111
01113
01115 bool remove_successor( receiver<output_type> &r ) {
01116 r.remove_predecessor(*this);
01117 buffer_operation op_data(rem_succ);
01118 op_data.r = &r;
01119 my_aggregator.execute(&op_data);
01120 return true;
01121 }
01122
01124
01126 bool try_get( T &v ) {
01127 buffer_operation op_data(req_item);
01128 op_data.elem = &v;
01129 my_aggregator.execute(&op_data);
01130 return (op_data.status==SUCCEEDED);
01131 }
01132
01134
01136 bool try_reserve( T &v ) {
01137 buffer_operation op_data(res_item);
01138 op_data.elem = &v;
01139 my_aggregator.execute(&op_data);
01140 return (op_data.status==SUCCEEDED);
01141 }
01142
01144
01145 bool try_release() {
01146 buffer_operation op_data(rel_res);
01147 my_aggregator.execute(&op_data);
01148 return true;
01149 }
01150
01152
01153 bool try_consume() {
01154 buffer_operation op_data(con_res);
01155 my_aggregator.execute(&op_data);
01156 return true;
01157 }
01158
01160
01161 bool try_put(const T &t) {
01162 buffer_operation op_data(t, put_item);
01163 my_aggregator.execute(&op_data);
01164 return true;
01165 }
01166 };
01167
01169 template <typename T, typename A=cache_aligned_allocator<T> >
01170 class queue_node : public buffer_node<T, A> {
01171 protected:
01172 typedef typename buffer_node<T, A>::size_type size_type;
01173 typedef typename buffer_node<T, A>::buffer_operation queue_operation;
01174
01175 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01176
01178 void internal_forward(queue_operation *op) {
01179 T i_copy;
01180 bool success = false;
01181 size_type counter = this->my_successors.size();
01182 if (this->my_reserved || !this->item_valid(this->my_head)) {
01183 __TBB_store_with_release(op->status, FAILED);
01184 this->forwarder_busy = false;
01185 return;
01186 }
01187
01188 while (counter>0 && this->item_valid(this->my_head)) {
01189 this->fetch_front(i_copy);
01190 if(this->my_successors.try_put(i_copy)) {
01191 this->invalidate_front();
01192 ++(this->my_head);
01193 success = true;
01194 }
01195 --counter;
01196 }
01197 if (success && !counter)
01198 __TBB_store_with_release(op->status, SUCCEEDED);
01199 else {
01200 __TBB_store_with_release(op->status, FAILED);
01201 this->forwarder_busy = false;
01202 }
01203 }
01204
01205 void internal_pop(queue_operation *op) {
01206 if ( this->my_reserved || !this->item_valid(this->my_head)){
01207 __TBB_store_with_release(op->status, FAILED);
01208 }
01209 else {
01210 this->pop_front(*(op->elem));
01211 __TBB_store_with_release(op->status, SUCCEEDED);
01212 }
01213 }
01214 void internal_reserve(queue_operation *op) {
01215 if (this->my_reserved || !this->item_valid(this->my_head)) {
01216 __TBB_store_with_release(op->status, FAILED);
01217 }
01218 else {
01219 this->my_reserved = true;
01220 this->fetch_front(*(op->elem));
01221 this->invalidate_front();
01222 __TBB_store_with_release(op->status, SUCCEEDED);
01223 }
01224 }
01225 void internal_consume(queue_operation *op) {
01226 this->consume_front();
01227 __TBB_store_with_release(op->status, SUCCEEDED);
01228 }
01229
01230 public:
01231 typedef T input_type;
01232 typedef T output_type;
01233 typedef sender< input_type > predecessor_type;
01234 typedef receiver< output_type > successor_type;
01235
01237 queue_node( graph &g ) : buffer_node<T, A>(g) {}
01238
01240 queue_node( const queue_node& src) : buffer_node<T, A>(src) {}
01241 };
01242
01244 template< typename T, typename A=cache_aligned_allocator<T> >
01245 class sequencer_node : public queue_node<T, A> {
01246 internal::function_body< T, size_t > *my_sequencer;
01247 public:
01248 typedef T input_type;
01249 typedef T output_type;
01250 typedef sender< input_type > predecessor_type;
01251 typedef receiver< output_type > successor_type;
01252
01254 template< typename Sequencer >
01255 sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
01256 my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {}
01257
01259 sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
01260 my_sequencer( src.my_sequencer->clone() ) {}
01261
01263 ~sequencer_node() { delete my_sequencer; }
01264 protected:
01265 typedef typename buffer_node<T, A>::size_type size_type;
01266 typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;
01267
01268 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01269
01270 private:
01271 void internal_push(sequencer_operation *op) {
01272 size_type tag = (*my_sequencer)(*(op->elem));
01273
01274 this->my_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
01275
01276 if(this->size() > this->capacity())
01277 this->grow_my_array(this->size());
01278 this->item(tag) = std::make_pair( *(op->elem), true );
01279 __TBB_store_with_release(op->status, SUCCEEDED);
01280 }
01281 };
01282
01284 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
01285 class priority_queue_node : public buffer_node<T, A> {
01286 public:
01287 typedef T input_type;
01288 typedef T output_type;
01289 typedef sender< input_type > predecessor_type;
01290 typedef receiver< output_type > successor_type;
01291
01293 priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {}
01294
01296 priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {}
01297
01298 protected:
01299 typedef typename buffer_node<T, A>::size_type size_type;
01300 typedef typename buffer_node<T, A>::item_type item_type;
01301 typedef typename buffer_node<T, A>::buffer_operation prio_operation;
01302
01303 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01304
01305 void handle_operations(prio_operation *op_list) {
01306 prio_operation *tmp ;
01307 bool try_forwarding=false;
01308 while (op_list) {
01309 tmp = op_list;
01310 op_list = op_list->next;
01311 switch (tmp->type) {
01312 case buffer_node<T, A>::reg_succ: this->internal_reg_succ(tmp); try_forwarding = true; break;
01313 case buffer_node<T, A>::rem_succ: this->internal_rem_succ(tmp); break;
01314 case buffer_node<T, A>::put_item: internal_push(tmp); try_forwarding = true; break;
01315 case buffer_node<T, A>::try_fwd: internal_forward(tmp); break;
01316 case buffer_node<T, A>::rel_res: internal_release(tmp); try_forwarding = true; break;
01317 case buffer_node<T, A>::con_res: internal_consume(tmp); try_forwarding = true; break;
01318 case buffer_node<T, A>::req_item: internal_pop(tmp); break;
01319 case buffer_node<T, A>::res_item: internal_reserve(tmp); break;
01320 }
01321 }
01322
01323 if (mark<this->my_tail) heapify();
01324 if (try_forwarding && !this->forwarder_busy) {
01325 this->forwarder_busy = true;
01326 task::enqueue(*new(task::allocate_additional_child_of(*(this->my_parent))) internal::forward_task< buffer_node<input_type, A> >(*this));
01327 }
01328 }
01329
01331 void internal_forward(prio_operation *op) {
01332 T i_copy;
01333 bool success = false;
01334 size_type counter = this->my_successors.size();
01335
01336 if (this->my_reserved || this->my_tail == 0) {
01337 __TBB_store_with_release(op->status, FAILED);
01338 this->forwarder_busy = false;
01339 return;
01340 }
01341
01342 while (counter>0 && this->my_tail > 0) {
01343 i_copy = this->my_array[0].first;
01344 bool msg = this->my_successors.try_put(i_copy);
01345 if ( msg == true ) {
01346 if (mark == this->my_tail) --mark;
01347 --(this->my_tail);
01348 this->my_array[0].first=this->my_array[this->my_tail].first;
01349 if (this->my_tail > 1)
01350 reheap();
01351 success = true;
01352 }
01353 --counter;
01354 }
01355 if (success && !counter)
01356 __TBB_store_with_release(op->status, SUCCEEDED);
01357 else {
01358 __TBB_store_with_release(op->status, FAILED);
01359 this->forwarder_busy = false;
01360 }
01361 }
01362
01363 void internal_push(prio_operation *op) {
01364 if ( this->my_tail >= this->my_array_size )
01365 this->grow_my_array( this->my_tail + 1 );
01366 this->my_array[this->my_tail] = std::make_pair( *(op->elem), true );
01367 ++(this->my_tail);
01368 __TBB_store_with_release(op->status, SUCCEEDED);
01369 }
01370 void internal_pop(prio_operation *op) {
01371 if ( this->my_reserved == true || this->my_tail == 0 ) {
01372 __TBB_store_with_release(op->status, FAILED);
01373 }
01374 else {
01375 if (mark<this->my_tail &&
01376 compare(this->my_array[0].first,
01377 this->my_array[this->my_tail-1].first)) {
01378
01379
01380 *(op->elem) = this->my_array[this->my_tail-1].first;
01381 --(this->my_tail);
01382 __TBB_store_with_release(op->status, SUCCEEDED);
01383 }
01384 else {
01385 *(op->elem) = this->my_array[0].first;
01386 if (mark == this->my_tail) --mark;
01387 --(this->my_tail);
01388 __TBB_store_with_release(op->status, SUCCEEDED);
01389 this->my_array[0].first=this->my_array[this->my_tail].first;
01390 if (this->my_tail > 1)
01391 reheap();
01392 }
01393 }
01394 }
01395 void internal_reserve(prio_operation *op) {
01396 if (this->my_reserved == true || this->my_tail == 0) {
01397 __TBB_store_with_release(op->status, FAILED);
01398 }
01399 else {
01400 this->my_reserved = true;
01401 *(op->elem) = reserved_item = this->my_array[0].first;
01402 if (mark == this->my_tail) --mark;
01403 --(this->my_tail);
01404 __TBB_store_with_release(op->status, SUCCEEDED);
01405 this->my_array[0].first = this->my_array[this->my_tail].first;
01406 if (this->my_tail > 1)
01407 reheap();
01408 }
01409 }
01410 void internal_consume(prio_operation *op) {
01411 this->my_reserved = false;
01412 __TBB_store_with_release(op->status, SUCCEEDED);
01413 }
01414 void internal_release(prio_operation *op) {
01415 if (this->my_tail >= this->my_array_size)
01416 this->grow_my_array( this->my_tail + 1 );
01417 this->my_array[this->my_tail] = std::make_pair(reserved_item, true);
01418 ++(this->my_tail);
01419 this->my_reserved = false;
01420 __TBB_store_with_release(op->status, SUCCEEDED);
01421 heapify();
01422 }
01423 private:
01424 Compare compare;
01425 size_type mark;
01426 input_type reserved_item;
01427
01428 void heapify() {
01429 if (!mark) mark = 1;
01430 for (; mark<this->my_tail; ++mark) {
01431 size_type cur_pos = mark;
01432 input_type to_place = this->my_array[mark].first;
01433 do {
01434 size_type parent = (cur_pos-1)>>1;
01435 if (!compare(this->my_array[parent].first, to_place))
01436 break;
01437 this->my_array[cur_pos].first = this->my_array[parent].first;
01438 cur_pos = parent;
01439 } while( cur_pos );
01440 this->my_array[cur_pos].first = to_place;
01441 }
01442 }
01443
01444 void reheap() {
01445 size_type cur_pos=0, child=1;
01446 while (child < mark) {
01447 size_type target = child;
01448 if (child+1<mark &&
01449 compare(this->my_array[child].first,
01450 this->my_array[child+1].first))
01451 ++target;
01452
01453 if (compare(this->my_array[target].first,
01454 this->my_array[this->my_tail].first))
01455 break;
01456 this->my_array[cur_pos].first = this->my_array[target].first;
01457 cur_pos = target;
01458 child = (cur_pos<<1)+1;
01459 }
01460 this->my_array[cur_pos].first = this->my_array[this->my_tail].first;
01461 }
01462 };
01463
01465
01468 template< typename T >
01469 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
01470 using graph_node::my_graph;
01471 public:
01472 typedef T input_type;
01473 typedef T output_type;
01474 typedef sender< input_type > predecessor_type;
01475 typedef receiver< output_type > successor_type;
01476
01477 private:
01478 task *my_root_task;
01479 size_t my_threshold;
01480 size_t my_count;
01481 internal::predecessor_cache< T > my_predecessors;
01482 spin_mutex my_mutex;
01483 internal::broadcast_cache< T > my_successors;
01484 int init_decrement_predecessors;
01485
01486 friend class internal::forward_task< limiter_node<T> >;
01487
01488
01489 friend class internal::decrementer< limiter_node<T> >;
01490
01491 void decrement_counter() {
01492 input_type v;
01493
01494
01495 if ( my_predecessors.get_item( v ) == false
01496 || my_successors.try_put(v) == false ) {
01497 spin_mutex::scoped_lock lock(my_mutex);
01498 --my_count;
01499 if ( !my_predecessors.empty() )
01500 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01501 internal::forward_task< limiter_node<T> >( *this ) );
01502 }
01503 }
01504
01505 void forward() {
01506 {
01507 spin_mutex::scoped_lock lock(my_mutex);
01508 if ( my_count < my_threshold )
01509 ++my_count;
01510 else
01511 return;
01512 }
01513 decrement_counter();
01514 }
01515
01516 public:
01518 internal::decrementer< limiter_node<T> > decrement;
01519
01521 limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0) :
01522 graph_node(g), my_root_task(g.root_task()), my_threshold(threshold), my_count(0),
01523 init_decrement_predecessors(num_decrement_predecessors),
01524 decrement(num_decrement_predecessors)
01525 {
01526 my_predecessors.set_owner(this);
01527 my_successors.set_owner(this);
01528 decrement.set_owner(this);
01529 }
01530
01532 limiter_node( const limiter_node& src ) :
01533 graph_node(src.my_graph), receiver<T>(), sender<T>(),
01534 my_root_task(src.my_root_task), my_threshold(src.my_threshold), my_count(0),
01535 init_decrement_predecessors(src.init_decrement_predecessors),
01536 decrement(src.init_decrement_predecessors)
01537 {
01538 my_predecessors.set_owner(this);
01539 my_successors.set_owner(this);
01540 decrement.set_owner(this);
01541 }
01542
01544 bool register_successor( receiver<output_type> &r ) {
01545 my_successors.register_successor(r);
01546 return true;
01547 }
01548
01550
01551 bool remove_successor( receiver<output_type> &r ) {
01552 r.remove_predecessor(*this);
01553 my_successors.remove_successor(r);
01554 return true;
01555 }
01556
01558 bool try_put( const T &t ) {
01559 {
01560 spin_mutex::scoped_lock lock(my_mutex);
01561 if ( my_count >= my_threshold )
01562 return false;
01563 else
01564 ++my_count;
01565 }
01566
01567 bool msg = my_successors.try_put(t);
01568
01569 if ( msg != true ) {
01570 spin_mutex::scoped_lock lock(my_mutex);
01571 --my_count;
01572 if ( !my_predecessors.empty() )
01573 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01574 internal::forward_task< limiter_node<T> >( *this ) );
01575 }
01576
01577 return msg;
01578 }
01579
01581 bool register_predecessor( predecessor_type &src ) {
01582 spin_mutex::scoped_lock lock(my_mutex);
01583 my_predecessors.add( src );
01584 if ( my_count < my_threshold && !my_successors.empty() )
01585 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01586 internal::forward_task< limiter_node<T> >( *this ) );
01587 return true;
01588 }
01589
01591 bool remove_predecessor( predecessor_type &src ) {
01592 my_predecessors.remove( src );
01593 return true;
01594 }
01595 };
01596
01597 #include "internal/_flow_graph_join_impl.h"
01598
01599 using internal::reserving_port;
01600 using internal::queueing_port;
01601 using internal::tag_matching_port;
01602 using internal::input_port;
01603 using internal::tag_value;
01604 using internal::NO_TAG;
01605
01606 template<typename OutputTuple, graph_buffer_policy JP=queueing> class join_node;
01607
01608 template<typename OutputTuple>
01609 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
01610 private:
01611 static const int N = std::tuple_size<OutputTuple>::value;
01612 typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
01613 public:
01614 typedef OutputTuple output_type;
01615 typedef typename unfolded_type::input_ports_type input_ports_type;
01616 join_node(graph &g) : unfolded_type(g) { }
01617 join_node(const join_node &other) : unfolded_type(other) {}
01618 };
01619
01620 template<typename OutputTuple>
01621 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
01622 private:
01623 static const int N = std::tuple_size<OutputTuple>::value;
01624 typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
01625 public:
01626 typedef OutputTuple output_type;
01627 typedef typename unfolded_type::input_ports_type input_ports_type;
01628 join_node(graph &g) : unfolded_type(g) { }
01629 join_node(const join_node &other) : unfolded_type(other) {}
01630 };
01631
01632
01633 template<typename OutputTuple>
01634 class join_node<OutputTuple, tag_matching> : public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value,
01635 tag_matching_port, OutputTuple, tag_matching> {
01636 private:
01637 static const int N = std::tuple_size<OutputTuple>::value;
01638 typedef typename internal::unfolded_join_node<N, tag_matching_port, OutputTuple, tag_matching> unfolded_type;
01639 public:
01640 typedef OutputTuple output_type;
01641 typedef typename unfolded_type::input_ports_type input_ports_type;
01642 template<typename B0, typename B1>
01643 join_node(graph &g, B0 b0, B1 b1) : unfolded_type(g, b0, b1) { }
01644 template<typename B0, typename B1, typename B2>
01645 join_node(graph &g, B0 b0, B1 b1, B2 b2) : unfolded_type(g, b0, b1, b2) { }
01646 template<typename B0, typename B1, typename B2, typename B3>
01647 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3) : unfolded_type(g, b0, b1, b2, b3) { }
01648 template<typename B0, typename B1, typename B2, typename B3, typename B4>
01649 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4) : unfolded_type(g, b0, b1, b2, b3, b4) { }
01650 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5>
01651 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5) : unfolded_type(g, b0, b1, b2, b3, b4, b5) { }
01652 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6>
01653 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) { }
01654 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7>
01655 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) { }
01656 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8>
01657 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) { }
01658 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8, typename B9>
01659 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8, B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) { }
01660 join_node(const join_node &other) : unfolded_type(other) {}
01661 };
01662
#if TBB_PREVIEW_GRAPH_NODES

#include "internal/_flow_graph_or_impl.h"

//! Node built on internal::unfolded_or_node; compiled only when TBB_PREVIEW_GRAPH_NODES is set.
template<typename InputTuple>
class or_node : public internal::unfolded_or_node<InputTuple> {
private:
    // Number of elements in the input tuple
    static const int N = std::tuple_size<InputTuple>::value;

public:
    typedef typename internal::unfolded_or_node<InputTuple> unfolded_type;
    typedef typename internal::or_output_type<InputTuple>::type output_type;

    //! Constructor; forwards g to the base
    or_node( graph& g )
        : unfolded_type( g )
    { }

    //! Copy constructor; forwards other to the base
    or_node( const or_node& other )
        : unfolded_type( other )
    { }
};
#endif // TBB_PREVIEW_GRAPH_NODES
01679
01681 template< typename T >
01682 inline void make_edge( sender<T> &p, receiver<T> &s ) {
01683 p.register_successor( s );
01684 }
01685
01687 template< typename T >
01688 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
01689 p.remove_successor( s );
01690 }
01691
//! Returns a copy of the body held by a node.
/** Body must be named explicitly by the caller; the result is whatever
    n.copy_function_object<Body>() returns. */
template< typename Body, typename Node >
Body copy_body( Node &n )
{
    return n.template copy_function_object<Body>();
}
01697
01698 }
01699
01700 using interface6::graph;
01701 using interface6::graph_node;
01702 using interface6::continue_msg;
01703 using interface6::sender;
01704 using interface6::receiver;
01705 using interface6::continue_receiver;
01706
01707 using interface6::source_node;
01708 using interface6::function_node;
01709 using interface6::multifunction_node;
01710 using interface6::split_node;
01711 using interface6::internal::output_port;
01712 #if TBB_PREVIEW_GRAPH_NODES
01713 using interface6::or_node;
01714 #endif
01715 using interface6::continue_node;
01716 using interface6::overwrite_node;
01717 using interface6::write_once_node;
01718 using interface6::broadcast_node;
01719 using interface6::buffer_node;
01720 using interface6::queue_node;
01721 using interface6::sequencer_node;
01722 using interface6::priority_queue_node;
01723 using interface6::limiter_node;
01724 using namespace interface6::internal::graph_policy_namespace;
01725 using interface6::join_node;
01726 using interface6::input_port;
01727 using interface6::copy_body;
01728 using interface6::make_edge;
01729 using interface6::remove_edge;
01730 using interface6::internal::NO_TAG;
01731 using interface6::internal::tag_value;
01732
01733 }
01734 }
01735
01736 #endif // __TBB_flow_graph_H