flow_graph.h

Go to the documentation of this file.
00001 /*
00002     Copyright 2005-2012 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_flow_graph_H
00022 #define __TBB_flow_graph_H
00023 
00024 #include "tbb_stddef.h"
00025 #include "atomic.h"
00026 #include "spin_mutex.h"
00027 #include "null_mutex.h"
00028 #include "spin_rw_mutex.h"
00029 #include "null_rw_mutex.h"
00030 #include "task.h"
00031 #include "concurrent_vector.h"
00032 #include "internal/_aggregator_impl.h"
00033 
00034 // use the VC10 or gcc version of tuple if it is available.
00035 #if TBB_IMPLEMENT_CPP0X && (!defined(_MSC_VER) || _MSC_VER < 1600)
00036 #define TBB_PREVIEW_TUPLE 1
00037 #include "compat/tuple"
00038 #else
00039 #include <tuple>
00040 #endif
00041 
00042 #include<list>
00043 #include<queue>
00044 
00055 namespace tbb {
00056 namespace flow {
00057 
00059 enum concurrency { unlimited = 0, serial = 1 };
00060 
00061 namespace interface6 {
00062 
00064 class continue_msg {};
00065 
00066 template< typename T > class sender;
00067 template< typename T > class receiver;
00068 class continue_receiver;
00069 
00071 template< typename T >
00072 class sender {
00073 public:
00075     typedef T output_type;
00076 
00078     typedef receiver<T> successor_type;
00079 
00080     virtual ~sender() {}
00081 
00083     virtual bool register_successor( successor_type &r ) = 0;
00084 
00086     virtual bool remove_successor( successor_type &r ) = 0;
00087 
00089     virtual bool try_get( T & ) { return false; }
00090 
00092     virtual bool try_reserve( T & ) { return false; }
00093 
00095     virtual bool try_release( ) { return false; }
00096 
00098     virtual bool try_consume( ) { return false; }
00099 };
00100 
00102 template< typename T >
00103 class receiver {
00104 public:
00106     typedef T input_type;
00107 
00109     typedef sender<T> predecessor_type;
00110 
00112     virtual ~receiver() {}
00113 
00115     virtual bool try_put( const T& t ) = 0;
00116 
00118     virtual bool register_predecessor( predecessor_type & ) { return false; }
00119 
00121     virtual bool remove_predecessor( predecessor_type & ) { return false; }
00122 };
00123 
00125 
00126 class continue_receiver : public receiver< continue_msg > {
00127 public:
00128 
00130     typedef continue_msg input_type;
00131 
00133     typedef sender< continue_msg > predecessor_type;
00134 
00136     continue_receiver( int number_of_predecessors = 0 ) {
00137         my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
00138         my_current_count = 0;
00139     }
00140 
00142     continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
00143         my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
00144         my_current_count = 0;
00145     }
00146 
00148     virtual ~continue_receiver() { }
00149 
00151     /* override */ bool register_predecessor( predecessor_type & ) {
00152         spin_mutex::scoped_lock l(my_mutex);
00153         ++my_predecessor_count;
00154         return true;
00155     }
00156 
00158 
00161     /* override */ bool remove_predecessor( predecessor_type & ) {
00162         spin_mutex::scoped_lock l(my_mutex);
00163         --my_predecessor_count;
00164         return true;
00165     }
00166 
00168 
00170     /* override */ bool try_put( const input_type & ) {
00171         {
00172             spin_mutex::scoped_lock l(my_mutex);
00173             if ( ++my_current_count < my_predecessor_count )
00174                 return true;
00175             else
00176                 my_current_count = 0;
00177         }
00178         execute();
00179         return true;
00180     }
00181 
00182 protected:
00183     spin_mutex my_mutex;
00184     int my_predecessor_count;
00185     int my_current_count;
00186     int my_initial_predecessor_count;
00187 
00189 
00191     virtual void execute() = 0;
00192 };
00193 
00194 #include "internal/_flow_graph_impl.h"
00195 using namespace internal::graph_policy_namespace;
00196 
00197 class graph;
00198 class graph_node;
00199 
00200 template <typename GraphContainerType, typename GraphNodeType>
00201 class graph_iterator {
00202     friend class graph;
00203     friend class graph_node;
00204 public:
00205     typedef size_t size_type;
00206     typedef GraphNodeType value_type;
00207     typedef GraphNodeType* pointer;
00208     typedef GraphNodeType& reference;
00209     typedef const GraphNodeType& const_reference;
00210     typedef std::forward_iterator_tag iterator_category;
00211 
00213     graph_iterator() : my_graph(NULL), current_node(NULL) {}
00214 
00216     graph_iterator(const graph_iterator& other) :
00217         my_graph(other.my_graph), current_node(other.current_node)
00218     {}
00219 
00221     graph_iterator& operator=(const graph_iterator& other) {
00222         if (this != &other) {
00223             my_graph = other.my_graph;
00224             current_node = other.current_node;
00225         }
00226         return *this;
00227     }
00228 
00230     reference operator*() const;
00231 
00233     pointer operator->() const;
00234 
00236     bool operator==(const graph_iterator& other) const {
00237         return ((my_graph == other.my_graph) && (current_node == other.current_node));
00238     }
00239 
00241     bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
00242 
00244     graph_iterator& operator++() {
00245         internal_forward();
00246         return *this;
00247     }
00248 
00250     graph_iterator operator++(int) {
00251         graph_iterator result = *this;
00252         operator++();
00253         return result;
00254     }
00255 
00256 private:
00257     // the graph over which we are iterating
00258     GraphContainerType *my_graph;
00259     // pointer into my_graph's my_nodes list
00260     pointer current_node;
00261 
00263     graph_iterator(GraphContainerType *g, bool begin);
00264     void internal_forward();
00265 };
00266 
00268 
00269 class graph : tbb::internal::no_copy {
00270     friend class graph_node;
00271 
00272     template< typename Body >
00273     class run_task : public task {
00274     public:
00275         run_task( Body& body ) : my_body(body) {}
00276         task *execute() {
00277             my_body();
00278             return NULL;
00279         }
00280     private:
00281         Body my_body;
00282     };
00283 
00284     template< typename Receiver, typename Body >
00285     class run_and_put_task : public task {
00286     public:
00287         run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
00288         task *execute() {
00289             my_receiver.try_put( my_body() );
00290             return NULL;
00291         }
00292     private:
00293         Receiver &my_receiver;
00294         Body my_body;
00295     };
00296 
00297 public:
00299     explicit graph() : my_nodes(NULL), my_nodes_last(NULL)
00300     {
00301         own_context = true;
00302         my_context = new task_group_context();
00303         my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
00304         my_root_task->set_ref_count(1);
00305     }
00306 
00308     explicit graph(task_group_context& use_this_context) :
00309     my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL)
00310     {
00311         own_context = false;
00312         my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
00313         my_root_task->set_ref_count(1);
00314     }
00315 
00317 
00318     ~graph() {
00319         wait_for_all();
00320         my_root_task->set_ref_count(0);
00321         task::destroy( *my_root_task );
00322         if (own_context) delete my_context;
00323     }
00324 
00326 
00328     void increment_wait_count() {
00329         if (my_root_task)
00330             my_root_task->increment_ref_count();
00331     }
00332 
00334 
00336     void decrement_wait_count() {
00337         if (my_root_task)
00338             my_root_task->decrement_ref_count();
00339     }
00340 
00342 
00344     template< typename Receiver, typename Body >
00345         void run( Receiver &r, Body body ) {
00346        task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00347            run_and_put_task< Receiver, Body >( r, body ) );
00348     }
00349 
00351 
00353     template< typename Body >
00354     void run( Body body ) {
00355        task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00356            run_task< Body >( body ) );
00357     }
00358 
00360 
00361     void wait_for_all() {
00362         if (my_root_task)
00363             my_root_task->wait_for_all();
00364         my_root_task->set_ref_count(1);
00365     }
00366 
00368     task * root_task() {
00369         return my_root_task;
00370     }
00371 
00372     // ITERATORS
00373     template<typename C, typename N>
00374     friend class graph_iterator;
00375 
00376     // Graph iterator typedefs
00377     typedef graph_iterator<graph,graph_node> iterator;
00378     typedef graph_iterator<const graph,const graph_node> const_iterator;
00379 
00380     // Graph iterator constructors
00382     iterator begin() { return iterator(this, true); }
00384     iterator end() { return iterator(this, false); }
00386     const_iterator begin() const { return const_iterator(this, true); }
00388     const_iterator end() const { return const_iterator(this, false); }
00390     const_iterator cbegin() const { return const_iterator(this, true); }
00392     const_iterator cend() const { return const_iterator(this, false); }
00393 
00394 private:
00395     task *my_root_task;
00396     task_group_context *my_context;
00397     bool own_context;
00398 
00399     graph_node *my_nodes, *my_nodes_last;
00400 
00401     spin_mutex nodelist_mutex;
00402     void register_node(graph_node *n); 
00403     void remove_node(graph_node *n);
00404 };
00405 
00406 template <typename C, typename N>
00407 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
00408 {
00409     if (begin) current_node = my_graph->my_nodes;
00410     //else it is an end iterator by default
00411 }
00412 
00413 template <typename C, typename N>
00414 typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
00415     __TBB_ASSERT(current_node, "graph_iterator at end");
00416     return *operator->();
00417 }
00418 
00419 template <typename C, typename N>
00420 typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const { 
00421     return current_node;
00422 }
00423 
00424 
00425 template <typename C, typename N>
00426 void graph_iterator<C,N>::internal_forward() {
00427     if (current_node) current_node = current_node->next;
00428 }
00429 
00431 class graph_node : tbb::internal::no_assign {
00432     friend class graph;
00433     template<typename C, typename N>
00434     friend class graph_iterator;
00435 protected:
00436     graph& my_graph;
00437     graph_node *next, *prev;
00438 public:
00439     graph_node(graph& g) : my_graph(g) {
00440         my_graph.register_node(this);
00441     }
00442     virtual ~graph_node() {
00443         my_graph.remove_node(this);
00444     }
00445 };
00446 
00447 void graph::register_node(graph_node *n) {
00448     n->next = NULL;
00449     {
00450         spin_mutex::scoped_lock lock(nodelist_mutex);
00451         n->prev = my_nodes_last;
00452         if (my_nodes_last) my_nodes_last->next = n;
00453         my_nodes_last = n;
00454         if (!my_nodes) my_nodes = n;
00455     }
00456 }
00457 
00458 void graph::remove_node(graph_node *n) {
00459     {
00460         spin_mutex::scoped_lock lock(nodelist_mutex);
00461         __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
00462         if (n->prev) n->prev->next = n->next;
00463         if (n->next) n->next->prev = n->prev;
00464         if (my_nodes_last == n) my_nodes_last = n->prev;
00465         if (my_nodes == n) my_nodes = n->next;
00466     }
00467     n->prev = n->next = NULL;
00468 }
00469 
00470 #include "internal/_flow_graph_node_impl.h"
00471 
00473 template < typename Output >
00474 class source_node : public graph_node, public sender< Output > {
00475     using graph_node::my_graph;
00476 public:
00478     typedef Output output_type;
00479 
00481     typedef receiver< Output > successor_type;
00482 
00484     template< typename Body >
00485     source_node( graph &g, Body body, bool is_active = true )
00486         : graph_node(g), my_root_task(g.root_task()), my_active(is_active), init_my_active(is_active),
00487         my_body( new internal::source_body_leaf< output_type, Body>(body) ),
00488         my_reserved(false), my_has_cached_item(false)
00489     {
00490         my_successors.set_owner(this);
00491     }
00492 
00494     source_node( const source_node& src ) :
00495         graph_node(src.my_graph), sender<Output>(),
00496         my_root_task( src.my_root_task), my_active(src.init_my_active),
00497         init_my_active(src.init_my_active), my_body( src.my_body->clone() ),
00498         my_reserved(false), my_has_cached_item(false)
00499     {
00500         my_successors.set_owner(this);
00501     }
00502 
00504     ~source_node() { delete my_body; }
00505 
00507     /* override */ bool register_successor( receiver<output_type> &r ) {
00508         spin_mutex::scoped_lock lock(my_mutex);
00509         my_successors.register_successor(r);
00510         if ( my_active )
00511             spawn_put();
00512         return true;
00513     }
00514 
00516     /* override */ bool remove_successor( receiver<output_type> &r ) {
00517         spin_mutex::scoped_lock lock(my_mutex);
00518         my_successors.remove_successor(r);
00519         return true;
00520     }
00521 
00523     /*override */ bool try_get( output_type &v ) {
00524         spin_mutex::scoped_lock lock(my_mutex);
00525         if ( my_reserved )
00526             return false;
00527 
00528         if ( my_has_cached_item ) {
00529             v = my_cached_item;
00530             my_has_cached_item = false;
00531         } else if ( (*my_body)(v) == false ) {
00532             return false;
00533         }
00534         return true;
00535     }
00536 
00538     /* override */ bool try_reserve( output_type &v ) {
00539         spin_mutex::scoped_lock lock(my_mutex);
00540         if ( my_reserved ) {
00541             return false;
00542         }
00543 
00544         if ( !my_has_cached_item && (*my_body)(my_cached_item) )
00545             my_has_cached_item = true;
00546 
00547         if ( my_has_cached_item ) {
00548             v = my_cached_item;
00549             my_reserved = true;
00550             return true;
00551         } else {
00552             return false;
00553         }
00554     }
00555 
00557 
00558     /* override */ bool try_release( ) {
00559         spin_mutex::scoped_lock lock(my_mutex);
00560         __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
00561         my_reserved = false;
00562         if(!my_successors.empty())
00563             spawn_put();
00564         return true;
00565     }
00566 
00568     /* override */ bool try_consume( ) {
00569         spin_mutex::scoped_lock lock(my_mutex);
00570         __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
00571         my_reserved = false;
00572         my_has_cached_item = false;
00573         if ( !my_successors.empty() ) {
00574             spawn_put();
00575         }
00576         return true;
00577     }
00578 
00580     void activate() {
00581         spin_mutex::scoped_lock lock(my_mutex);
00582         my_active = true;
00583         if ( !my_successors.empty() )
00584             spawn_put();
00585     }
00586 
00587 private:
00588     task *my_root_task;
00589     spin_mutex my_mutex;
00590     bool my_active;
00591     bool init_my_active;
00592     internal::source_body<output_type> *my_body;
00593     internal::broadcast_cache< output_type > my_successors;
00594     bool my_reserved;
00595     bool my_has_cached_item;
00596     output_type my_cached_item;
00597 
00598     friend class internal::source_task< source_node< output_type > >;
00599 
00601     /* override */ void apply_body( ) {
00602         output_type v;
00603         if ( try_reserve(v) == false )
00604             return;
00605 
00606         if ( my_successors.try_put( v ) )
00607             try_consume();
00608         else
00609             try_release();
00610     }
00611 
00613     /* override */ void spawn_put( ) {
00614         task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00615            internal::source_task< source_node< output_type > >( *this ) );
00616     }
00617 };
00618 
00620 template < typename Input, typename Output = continue_msg, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
00621 class function_node : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
00622     using graph_node::my_graph;
00623 public:
00624     typedef Input input_type;
00625     typedef Output output_type;
00626     typedef sender< input_type > predecessor_type;
00627     typedef receiver< output_type > successor_type;
00628     typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
00629     typedef internal::function_output<output_type> fOutput_type;
00630 
00632     template< typename Body >
00633     function_node( graph &g, size_t concurrency, Body body ) :
00634         graph_node(g), internal::function_input<input_type,output_type,Allocator>(g, concurrency, body)
00635     {}
00636 
00638     function_node( const function_node& src ) :
00639         graph_node(src.my_graph), internal::function_input<input_type,output_type,Allocator>( src ),
00640         fOutput_type()
00641     {}
00642 
00643     bool try_put(const input_type &i) { return fInput_type::try_put(i); }
00644 
00645 protected:
00646     /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00647 };
00648 
00650 template < typename Input, typename Output, typename Allocator >
00651 class function_node<Input,Output,queueing,Allocator> : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
00652     using graph_node::my_graph;
00653 public:
00654     typedef Input input_type;
00655     typedef Output output_type;
00656     typedef sender< input_type > predecessor_type;
00657     typedef receiver< output_type > successor_type;
00658     typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
00659     typedef internal::function_input_queue<input_type, Allocator> queue_type;
00660     typedef internal::function_output<output_type> fOutput_type;
00661 
00663     template< typename Body >
00664     function_node( graph &g, size_t concurrency, Body body ) :
00665         graph_node(g), fInput_type( g, concurrency, body, new queue_type() )
00666     {}
00667 
00669     function_node( const function_node& src ) :
00670         graph_node(src.my_graph), fInput_type( src, new queue_type() ), fOutput_type()
00671     {}
00672 
00673     bool try_put(const input_type &i) { return fInput_type::try_put(i); }
00674 
00675 protected:
00676     /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00677 };
00678 
00679 #include "tbb/internal/_flow_graph_types_impl.h"
00680 
00682 // Output is a tuple of output types.
00683 template < typename Input, typename Output, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
00684 class multifunction_node :
00685     public graph_node,
00686     public internal::multifunction_input
00687     <
00688         Input,
00689         typename internal::wrap_tuple_elements<
00690             std::tuple_size<Output>::value,  // #elements in tuple
00691             internal::function_output,  // wrap this around each element
00692             Output // the tuple providing the types
00693         >::type,
00694         Allocator
00695     > {
00696     using graph_node::my_graph;
00697 private:
00698     static const int N = std::tuple_size<Output>::value;
00699 public:
00700     typedef Input input_type;
00701     typedef typename internal::wrap_tuple_elements<N,internal::function_output, Output>::type output_ports_type;
00702 private:
00703     typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
00704     typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
00705 public:
00706     template<typename Body>
00707     multifunction_node( graph &g, size_t concurrency, Body body ) :
00708         graph_node(g), base_type(g,concurrency, body)
00709     {}
00710     multifunction_node( const multifunction_node &other) :
00711         graph_node(other.my_graph), base_type(other)
00712     {}
00713     // all the guts are in multifunction_input...
00714 };  // multifunction_node
00715 
00716 template < typename Input, typename Output, typename Allocator >
00717 class multifunction_node<Input,Output,queueing,Allocator> : public graph_node, public internal::multifunction_input<Input,
00718     typename internal::wrap_tuple_elements<std::tuple_size<Output>::value, internal::function_output, Output>::type, Allocator> {
00719     using graph_node::my_graph;
00720     static const int N = std::tuple_size<Output>::value;
00721 public:
00722     typedef Input input_type;
00723     typedef typename internal::wrap_tuple_elements<N, internal::function_output, Output>::type output_ports_type;
00724 private:
00725     typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
00726     typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
00727 public:
00728     template<typename Body>
00729     multifunction_node( graph &g, size_t concurrency, Body body) :
00730         graph_node(g), base_type(g,concurrency, body, new queue_type())
00731     {}
00732     multifunction_node( const multifunction_node &other) :
00733         graph_node(other.my_graph), base_type(other, new queue_type())
00734     {}
00735 };  // multifunction_node
00736 
00738 //  successors.  The node has unlimited concurrency, so though it is marked as
00739 //  "rejecting" it does not reject inputs.
00740 template<typename TupleType, typename Allocator=cache_aligned_allocator<TupleType> >
00741 class split_node : public multifunction_node<TupleType, TupleType, rejecting, Allocator> {
00742     static const int N = std::tuple_size<TupleType>::value;
00743     typedef multifunction_node<TupleType,TupleType,rejecting,Allocator> base_type;
00744 public:
00745     typedef typename base_type::output_ports_type output_ports_type;
00746 private:
00747     struct splitting_body {
00748         void operator()(const TupleType& t, output_ports_type &p) {
00749             internal::emit_element<N>::emit_this(t, p);
00750         }
00751     };
00752 public:
00753     typedef TupleType input_type;
00754     typedef Allocator allocator_type;
00755     split_node(graph &g) : base_type(g, unlimited, splitting_body()) {}
00756     split_node( const split_node & other) : base_type(other) {}
00757 };
00758 
00760 template <typename Output>
00761 class continue_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
00762     using graph_node::my_graph;
00763 public:
00764     typedef continue_msg input_type;
00765     typedef Output output_type;
00766     typedef sender< input_type > predecessor_type;
00767     typedef receiver< output_type > successor_type;
00768     typedef internal::function_output<output_type> fOutput_type;
00769 
00771     template <typename Body >
00772     continue_node( graph &g, Body body ) :
00773         graph_node(g), internal::continue_input<output_type>( g, body )
00774     {}
00775 
00777     template <typename Body >
00778     continue_node( graph &g, int number_of_predecessors, Body body ) :
00779         graph_node(g), internal::continue_input<output_type>( g, number_of_predecessors, body )
00780     {}
00781 
00783     continue_node( const continue_node& src ) :
00784         graph_node(src.my_graph), internal::continue_input<output_type>(src),
00785         internal::function_output<Output>()
00786     {}
00787 
00788 protected:
00789     /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00790 };
00791 
00792 template< typename T >
00793 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
00794     using graph_node::my_graph;
00795 public:
00796     typedef T input_type;
00797     typedef T output_type;
00798     typedef sender< input_type > predecessor_type;
00799     typedef receiver< output_type > successor_type;
00800 
00801     overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
00802         my_successors.set_owner( this );
00803     }
00804 
00805     // Copy constructor; doesn't take anything from src; default won't work
00806     overwrite_node( const overwrite_node& src ) :
00807         graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
00808     {
00809         my_successors.set_owner( this );
00810     }
00811 
00812     ~overwrite_node() {}
00813 
00814     /* override */ bool register_successor( successor_type &s ) {
00815         spin_mutex::scoped_lock l( my_mutex );
00816         if ( my_buffer_is_valid ) {
00817             // We have a valid value that must be forwarded immediately.
00818             if ( s.try_put( my_buffer ) || !s.register_predecessor( *this  ) ) {
00819                 // We add the successor: it accepted our put or it rejected it but won't let use become a predecessor
00820                 my_successors.register_successor( s );
00821                 return true;
00822             } else {
00823                 // We don't add the successor: it rejected our put and we became its predecessor instead
00824                 return false;
00825             }
00826         } else {
00827             // No valid value yet, just add as successor
00828             my_successors.register_successor( s );
00829             return true;
00830         }
00831     }
00832 
00833     /* override */ bool remove_successor( successor_type &s ) {
00834         spin_mutex::scoped_lock l( my_mutex );
00835         my_successors.remove_successor(s);
00836         return true;
00837     }
00838 
00839     /* override */ bool try_put( const T &v ) {
00840         spin_mutex::scoped_lock l( my_mutex );
00841         my_buffer = v;
00842         my_buffer_is_valid = true;
00843         my_successors.try_put(v);
00844         return true;
00845     }
00846 
00847     /* override */ bool try_get( T &v ) {
00848         spin_mutex::scoped_lock l( my_mutex );
00849         if ( my_buffer_is_valid ) {
00850             v = my_buffer;
00851             return true;
00852         } else {
00853             return false;
00854         }
00855     }
00856 
00857     bool is_valid() {
00858        spin_mutex::scoped_lock l( my_mutex );
00859        return my_buffer_is_valid;
00860     }
00861 
00862     void clear() {
00863        spin_mutex::scoped_lock l( my_mutex );
00864        my_buffer_is_valid = false;
00865     }
00866 
00867 protected:
00868     spin_mutex my_mutex;
00869     internal::broadcast_cache< T, null_rw_mutex > my_successors;
00870     T my_buffer;
00871     bool my_buffer_is_valid;
00872 };
00873 
00874 template< typename T >
00875 class write_once_node : public overwrite_node<T> {
00876 public:
00877     typedef T input_type;
00878     typedef T output_type;
00879     typedef sender< input_type > predecessor_type;
00880     typedef receiver< output_type > successor_type;
00881 
00883     write_once_node(graph& g) : overwrite_node<T>(g) {}
00884 
00886     write_once_node( const write_once_node& src ) : overwrite_node<T>(src) {}
00887 
00888     /* override */ bool try_put( const T &v ) {
00889         spin_mutex::scoped_lock l( this->my_mutex );
00890         if ( this->my_buffer_is_valid ) {
00891             return false;
00892         } else {
00893             this->my_buffer = v;
00894             this->my_buffer_is_valid = true;
00895             this->my_successors.try_put(v);
00896             return true;
00897         }
00898     }
00899 };
00900 
00902 template <typename T>
00903 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
00904     using graph_node::my_graph;
00905     internal::broadcast_cache<T> my_successors;
00906 public:
00907     typedef T input_type;
00908     typedef T output_type;
00909     typedef sender< input_type > predecessor_type;
00910     typedef receiver< output_type > successor_type;
00911 
00912     broadcast_node(graph& g) : graph_node(g) {
00913         my_successors.set_owner( this );
00914     }
00915 
00916     // Copy constructor
00917     broadcast_node( const broadcast_node& src ) :
00918         graph_node(src.my_graph), receiver<T>(), sender<T>()
00919     {
00920         my_successors.set_owner( this );
00921     }
00922 
00924     virtual bool register_successor( receiver<T> &r ) {
00925         my_successors.register_successor( r );
00926         return true;
00927     }
00928 
00930     virtual bool remove_successor( receiver<T> &r ) {
00931         my_successors.remove_successor( r );
00932         return true;
00933     }
00934 
00935     /* override */ bool try_put( const T &t ) {
00936         my_successors.try_put(t);
00937         return true;
00938     }
00939 };
00940 
00941 #include "internal/_flow_graph_item_buffer_impl.h"
00942 
00944 template <typename T, typename A=cache_aligned_allocator<T> >
00945 class buffer_node : public graph_node, public reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
00946     using graph_node::my_graph;
00947 public:
00948     typedef T input_type;
00949     typedef T output_type;
00950     typedef sender< input_type > predecessor_type;
00951     typedef receiver< output_type > successor_type;
00952     typedef buffer_node<T, A> my_class;
00953 protected:
00954     typedef size_t size_type;
00955     internal::round_robin_cache< T, null_rw_mutex > my_successors;
00956 
00957     task *my_parent;
00958 
00959     friend class internal::forward_task< buffer_node< T, A > >;
00960 
00961     enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd};
00962     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
00963 
00964     // implements the aggregator_operation concept
00965     class buffer_operation : public internal::aggregated_operation< buffer_operation > {
00966     public:
00967         char type;
00968         T *elem;
00969         successor_type *r;
00970         buffer_operation(const T& e, op_type t) :
00971             type(char(t)), elem(const_cast<T*>(&e)), r(NULL) {}
00972         buffer_operation(op_type t) : type(char(t)), r(NULL) {}
00973     };
00974 
00975     bool forwarder_busy;
00976     typedef internal::aggregating_functor<my_class, buffer_operation> my_handler;
00977     friend class internal::aggregating_functor<my_class, buffer_operation>;
00978     internal::aggregator< my_handler, buffer_operation> my_aggregator;
00979 
00980     virtual void handle_operations(buffer_operation *op_list) {
00981         buffer_operation *tmp;
00982         bool try_forwarding=false;
00983         while (op_list) {
00984             tmp = op_list;
00985             op_list = op_list->next;
00986             switch (tmp->type) {
00987             case reg_succ: internal_reg_succ(tmp);  try_forwarding = true; break;
00988             case rem_succ: internal_rem_succ(tmp); break;
00989             case req_item: internal_pop(tmp); break;
00990             case res_item: internal_reserve(tmp); break;
00991             case rel_res:  internal_release(tmp);  try_forwarding = true; break;
00992             case con_res:  internal_consume(tmp);  try_forwarding = true; break;
00993             case put_item: internal_push(tmp);  try_forwarding = true; break;
00994             case try_fwd:  internal_forward(tmp); break;
00995             }
00996         }
00997         if (try_forwarding && !forwarder_busy) {
00998             forwarder_busy = true;
00999             task::enqueue(*new(task::allocate_additional_child_of(*my_parent)) internal::forward_task< buffer_node<input_type, A> >(*this));
01000         }
01001     }
01002 
01004     virtual void forward() {
01005         buffer_operation op_data(try_fwd);
01006         do {
01007             op_data.status = WAIT;
01008             my_aggregator.execute(&op_data);
01009         } while (op_data.status == SUCCEEDED);
01010     }
01011 
01013     virtual void internal_reg_succ(buffer_operation *op) {
01014         my_successors.register_successor(*(op->r));
01015         __TBB_store_with_release(op->status, SUCCEEDED);
01016     }
01017 
01019     virtual void internal_rem_succ(buffer_operation *op) {
01020         my_successors.remove_successor(*(op->r));
01021         __TBB_store_with_release(op->status, SUCCEEDED);
01022     }
01023 
01025     virtual void internal_forward(buffer_operation *op) {
01026         T i_copy;
01027         bool success = false; // flagged when a successor accepts
01028         size_type counter = my_successors.size();
01029         // Try forwarding, giving each successor a chance
01030         while (counter>0 && !this->buffer_empty() && this->item_valid(this->my_tail-1)) {
01031             this->fetch_back(i_copy);
01032             if( my_successors.try_put(i_copy) ) {
01033                 this->invalidate_back();
01034                 --(this->my_tail);
01035                 success = true; // found an accepting successor
01036             }
01037             --counter;
01038         }
01039         if (success && !counter)
01040             __TBB_store_with_release(op->status, SUCCEEDED);
01041         else {
01042             __TBB_store_with_release(op->status, FAILED);
01043             forwarder_busy = false;
01044         }
01045     }
01046 
01047     virtual void internal_push(buffer_operation *op) {
01048         this->push_back(*(op->elem));
01049         __TBB_store_with_release(op->status, SUCCEEDED);
01050     }
01051 
01052     virtual void internal_pop(buffer_operation *op) {
01053         if(this->pop_back(*(op->elem))) {
01054             __TBB_store_with_release(op->status, SUCCEEDED);
01055         }
01056         else {
01057             __TBB_store_with_release(op->status, FAILED);
01058         }
01059     }
01060 
01061     virtual void internal_reserve(buffer_operation *op) {
01062         if(this->reserve_front(*(op->elem))) {
01063             __TBB_store_with_release(op->status, SUCCEEDED);
01064         }
01065         else {
01066             __TBB_store_with_release(op->status, FAILED);
01067         }
01068     }
01069 
01070     virtual void internal_consume(buffer_operation *op) {
01071         this->consume_front();
01072         __TBB_store_with_release(op->status, SUCCEEDED);
01073     }
01074 
01075     virtual void internal_release(buffer_operation *op) {
01076         this->release_front();
01077         __TBB_store_with_release(op->status, SUCCEEDED);
01078     }
01079 
01080 public:
01082     buffer_node( graph &g ) : graph_node(g), reservable_item_buffer<T>(),
01083         my_parent( g.root_task() ), forwarder_busy(false) {
01084         my_successors.set_owner(this);
01085         my_aggregator.initialize_handler(my_handler(this));
01086     }
01087 
01089     buffer_node( const buffer_node& src ) : graph_node(src.my_graph),
01090         reservable_item_buffer<T>(), receiver<T>(), sender<T>(),
01091         my_parent( src.my_parent ) {
01092         forwarder_busy = false;
01093         my_successors.set_owner(this);
01094         my_aggregator.initialize_handler(my_handler(this));
01095     }
01096 
01097     virtual ~buffer_node() {}
01098 
01099     //
01100     // message sender implementation
01101     //
01102 
01104 
01105     /* override */ bool register_successor( receiver<output_type> &r ) {
01106         buffer_operation op_data(reg_succ);
01107         op_data.r = &r;
01108         my_aggregator.execute(&op_data);
01109         return true;
01110     }
01111 
01113 
01115     /* override */ bool remove_successor( receiver<output_type> &r ) {
01116         r.remove_predecessor(*this);
01117         buffer_operation op_data(rem_succ);
01118         op_data.r = &r;
01119         my_aggregator.execute(&op_data);
01120         return true;
01121     }
01122 
01124 
01126     /* override */ bool try_get( T &v ) {
01127         buffer_operation op_data(req_item);
01128         op_data.elem = &v;
01129         my_aggregator.execute(&op_data);
01130         return (op_data.status==SUCCEEDED);
01131     }
01132 
01134 
01136     /* override */ bool try_reserve( T &v ) {
01137         buffer_operation op_data(res_item);
01138         op_data.elem = &v;
01139         my_aggregator.execute(&op_data);
01140         return (op_data.status==SUCCEEDED);
01141     }
01142 
01144 
01145     /* override */ bool try_release() {
01146         buffer_operation op_data(rel_res);
01147         my_aggregator.execute(&op_data);
01148         return true;
01149     }
01150 
01152 
01153     /* override */ bool try_consume() {
01154         buffer_operation op_data(con_res);
01155         my_aggregator.execute(&op_data);
01156         return true;
01157     }
01158 
01160 
01161     /* override */ bool try_put(const T &t) {
01162         buffer_operation op_data(t, put_item);
01163         my_aggregator.execute(&op_data);
01164         return true;
01165     }
01166 };
01167 
01169 template <typename T, typename A=cache_aligned_allocator<T> >
01170 class queue_node : public buffer_node<T, A> {
01171 protected:
01172     typedef typename buffer_node<T, A>::size_type size_type;
01173     typedef typename buffer_node<T, A>::buffer_operation queue_operation;
01174 
01175     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01176 
01178     /* override */ void internal_forward(queue_operation *op) {
01179         T i_copy;
01180         bool success = false; // flagged when a successor accepts
01181         size_type counter = this->my_successors.size();
01182         if (this->my_reserved || !this->item_valid(this->my_head)) {
01183             __TBB_store_with_release(op->status, FAILED);
01184             this->forwarder_busy = false;
01185             return;
01186         }
01187         // Keep trying to send items while there is at least one accepting successor
01188         while (counter>0 && this->item_valid(this->my_head)) {
01189             this->fetch_front(i_copy);
01190             if(this->my_successors.try_put(i_copy)) {
01191                  this->invalidate_front();
01192                  ++(this->my_head);
01193                 success = true; // found an accepting successor
01194             }
01195             --counter;
01196         }
01197         if (success && !counter)
01198             __TBB_store_with_release(op->status, SUCCEEDED);
01199         else {
01200             __TBB_store_with_release(op->status, FAILED);
01201             this->forwarder_busy = false;
01202         }
01203     }
01204 
01205     /* override */ void internal_pop(queue_operation *op) {
01206         if ( this->my_reserved || !this->item_valid(this->my_head)){
01207             __TBB_store_with_release(op->status, FAILED);
01208         }
01209         else {
01210             this->pop_front(*(op->elem));
01211             __TBB_store_with_release(op->status, SUCCEEDED);
01212         }
01213     }
01214     /* override */ void internal_reserve(queue_operation *op) {
01215         if (this->my_reserved || !this->item_valid(this->my_head)) {
01216             __TBB_store_with_release(op->status, FAILED);
01217         }
01218         else {
01219             this->my_reserved = true;
01220             this->fetch_front(*(op->elem));
01221             this->invalidate_front();
01222             __TBB_store_with_release(op->status, SUCCEEDED);
01223         }
01224     }
01225     /* override */ void internal_consume(queue_operation *op) {
01226         this->consume_front();
01227         __TBB_store_with_release(op->status, SUCCEEDED);
01228     }
01229 
01230 public:
01231     typedef T input_type;
01232     typedef T output_type;
01233     typedef sender< input_type > predecessor_type;
01234     typedef receiver< output_type > successor_type;
01235 
01237     queue_node( graph &g ) : buffer_node<T, A>(g) {}
01238 
01240     queue_node( const queue_node& src) : buffer_node<T, A>(src) {}
01241 };
01242 
01244 template< typename T, typename A=cache_aligned_allocator<T> >
01245 class sequencer_node : public queue_node<T, A> {
01246     internal::function_body< T, size_t > *my_sequencer;
01247 public:
01248     typedef T input_type;
01249     typedef T output_type;
01250     typedef sender< input_type > predecessor_type;
01251     typedef receiver< output_type > successor_type;
01252 
01254     template< typename Sequencer >
01255     sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
01256         my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {}
01257 
01259     sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
01260         my_sequencer( src.my_sequencer->clone() ) {}
01261 
01263     ~sequencer_node() { delete my_sequencer; }
01264 protected:
01265     typedef typename buffer_node<T, A>::size_type size_type;
01266     typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;
01267 
01268     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01269 
01270 private:
01271     /* override */ void internal_push(sequencer_operation *op) {
01272         size_type tag = (*my_sequencer)(*(op->elem));
01273 
01274         this->my_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
01275 
01276         if(this->size() > this->capacity())
01277             this->grow_my_array(this->size());  // tail already has 1 added to it
01278         this->item(tag) = std::make_pair( *(op->elem), true );
01279         __TBB_store_with_release(op->status, SUCCEEDED);
01280     }
01281 };
01282 
01284 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
01285 class priority_queue_node : public buffer_node<T, A> {
01286 public:
01287     typedef T input_type;
01288     typedef T output_type;
01289     typedef sender< input_type > predecessor_type;
01290     typedef receiver< output_type > successor_type;
01291 
01293     priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {}
01294 
01296     priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {}
01297 
01298 protected:
01299     typedef typename buffer_node<T, A>::size_type size_type;
01300     typedef typename buffer_node<T, A>::item_type item_type;
01301     typedef typename buffer_node<T, A>::buffer_operation prio_operation;
01302 
01303     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01304 
01305     /* override */ void handle_operations(prio_operation *op_list) {
01306         prio_operation *tmp /*, *pop_list*/ ;
01307         bool try_forwarding=false;
01308         while (op_list) {
01309             tmp = op_list;
01310             op_list = op_list->next;
01311             switch (tmp->type) {
01312             case buffer_node<T, A>::reg_succ: this->internal_reg_succ(tmp); try_forwarding = true; break;
01313             case buffer_node<T, A>::rem_succ: this->internal_rem_succ(tmp); break;
01314             case buffer_node<T, A>::put_item: internal_push(tmp); try_forwarding = true; break;
01315             case buffer_node<T, A>::try_fwd: internal_forward(tmp); break;
01316             case buffer_node<T, A>::rel_res: internal_release(tmp); try_forwarding = true; break;
01317             case buffer_node<T, A>::con_res: internal_consume(tmp); try_forwarding = true; break;
01318             case buffer_node<T, A>::req_item: internal_pop(tmp); break;
01319             case buffer_node<T, A>::res_item: internal_reserve(tmp); break;
01320             }
01321         }
01322         // process pops!  for now, no special pop processing
01323         if (mark<this->my_tail) heapify();
01324         if (try_forwarding && !this->forwarder_busy) {
01325             this->forwarder_busy = true;
01326             task::enqueue(*new(task::allocate_additional_child_of(*(this->my_parent))) internal::forward_task< buffer_node<input_type, A> >(*this));
01327         }
01328     }
01329 
01331     /* override */ void internal_forward(prio_operation *op) {
01332         T i_copy;
01333         bool success = false; // flagged when a successor accepts
01334         size_type counter = this->my_successors.size();
01335 
01336         if (this->my_reserved || this->my_tail == 0) {
01337             __TBB_store_with_release(op->status, FAILED);
01338             this->forwarder_busy = false;
01339             return;
01340         }
01341         // Keep trying to send while there exists an accepting successor
01342         while (counter>0 && this->my_tail > 0) {
01343             i_copy = this->my_array[0].first;
01344             bool msg = this->my_successors.try_put(i_copy);
01345             if ( msg == true ) {
01346                  if (mark == this->my_tail) --mark;
01347                 --(this->my_tail);
01348                 this->my_array[0].first=this->my_array[this->my_tail].first;
01349                 if (this->my_tail > 1) // don't reheap for heap of size 1
01350                     reheap();
01351                 success = true; // found an accepting successor
01352             }
01353             --counter;
01354         }
01355         if (success && !counter)
01356             __TBB_store_with_release(op->status, SUCCEEDED);
01357         else {
01358             __TBB_store_with_release(op->status, FAILED);
01359             this->forwarder_busy = false;
01360         }
01361     }
01362 
01363     /* override */ void internal_push(prio_operation *op) {
01364         if ( this->my_tail >= this->my_array_size )
01365             this->grow_my_array( this->my_tail + 1 );
01366         this->my_array[this->my_tail] = std::make_pair( *(op->elem), true );
01367         ++(this->my_tail);
01368         __TBB_store_with_release(op->status, SUCCEEDED);
01369     }
01370     /* override */ void internal_pop(prio_operation *op) {
01371         if ( this->my_reserved == true || this->my_tail == 0 ) {
01372             __TBB_store_with_release(op->status, FAILED);
01373         }
01374         else {
01375             if (mark<this->my_tail &&
01376                 compare(this->my_array[0].first,
01377                         this->my_array[this->my_tail-1].first)) {
01378                 // there are newly pushed elems; last one higher than top
01379                 // copy the data
01380                 *(op->elem) = this->my_array[this->my_tail-1].first;
01381                 --(this->my_tail);
01382                 __TBB_store_with_release(op->status, SUCCEEDED);
01383             }
01384             else { // extract and push the last element down heap
01385                 *(op->elem) = this->my_array[0].first; // copy the data
01386                 if (mark == this->my_tail) --mark;
01387                 --(this->my_tail);
01388                 __TBB_store_with_release(op->status, SUCCEEDED);
01389                 this->my_array[0].first=this->my_array[this->my_tail].first;
01390                 if (this->my_tail > 1) // don't reheap for heap of size 1
01391                     reheap();
01392             }
01393         }
01394     }
01395     /* override */ void internal_reserve(prio_operation *op) {
01396         if (this->my_reserved == true || this->my_tail == 0) {
01397             __TBB_store_with_release(op->status, FAILED);
01398         }
01399         else {
01400             this->my_reserved = true;
01401             *(op->elem) = reserved_item = this->my_array[0].first;
01402             if (mark == this->my_tail) --mark;
01403             --(this->my_tail);
01404             __TBB_store_with_release(op->status, SUCCEEDED);
01405             this->my_array[0].first = this->my_array[this->my_tail].first;
01406             if (this->my_tail > 1) // don't reheap for heap of size 1
01407                 reheap();
01408         }
01409     }
01410     /* override */ void internal_consume(prio_operation *op) {
01411         this->my_reserved = false;
01412         __TBB_store_with_release(op->status, SUCCEEDED);
01413     }
01414     /* override */ void internal_release(prio_operation *op) {
01415         if (this->my_tail >= this->my_array_size)
01416             this->grow_my_array( this->my_tail + 1 );
01417         this->my_array[this->my_tail] = std::make_pair(reserved_item, true);
01418         ++(this->my_tail);
01419         this->my_reserved = false;
01420         __TBB_store_with_release(op->status, SUCCEEDED);
01421         heapify();
01422     }
01423 private:
01424     Compare compare;
01425     size_type mark;
01426     input_type reserved_item;
01427 
01428     void heapify() {
01429         if (!mark) mark = 1;
01430         for (; mark<this->my_tail; ++mark) { // for each unheaped element
01431             size_type cur_pos = mark;
01432             input_type to_place = this->my_array[mark].first;
01433             do { // push to_place up the heap
01434                 size_type parent = (cur_pos-1)>>1;
01435                 if (!compare(this->my_array[parent].first, to_place))
01436                     break;
01437                 this->my_array[cur_pos].first = this->my_array[parent].first;
01438                 cur_pos = parent;
01439             } while( cur_pos );
01440             this->my_array[cur_pos].first = to_place;
01441         }
01442     }
01443 
01444     void reheap() {
01445         size_type cur_pos=0, child=1;
01446         while (child < mark) {
01447             size_type target = child;
01448             if (child+1<mark &&
01449                 compare(this->my_array[child].first,
01450                         this->my_array[child+1].first))
01451                 ++target;
01452             // target now has the higher priority child
01453             if (compare(this->my_array[target].first,
01454                         this->my_array[this->my_tail].first))
01455                 break;
01456             this->my_array[cur_pos].first = this->my_array[target].first;
01457             cur_pos = target;
01458             child = (cur_pos<<1)+1;
01459         }
01460         this->my_array[cur_pos].first = this->my_array[this->my_tail].first;
01461     }
01462 };
01463 
01465 
01468 template< typename T >
01469 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
01470     using graph_node::my_graph;
01471 public:
01472     typedef T input_type;
01473     typedef T output_type;
01474     typedef sender< input_type > predecessor_type;
01475     typedef receiver< output_type > successor_type;
01476 
01477 private:
01478     task *my_root_task;
01479     size_t my_threshold;
01480     size_t my_count;
01481     internal::predecessor_cache< T > my_predecessors;
01482     spin_mutex my_mutex;
01483     internal::broadcast_cache< T > my_successors;
01484     int init_decrement_predecessors;
01485 
01486     friend class internal::forward_task< limiter_node<T> >;
01487 
01488     // Let decrementer call decrement_counter()
01489     friend class internal::decrementer< limiter_node<T> >;
01490 
01491     void decrement_counter() {
01492         input_type v;
01493 
01494         // If we can't get / put an item immediately then drop the count
01495         if ( my_predecessors.get_item( v ) == false
01496              || my_successors.try_put(v) == false ) {
01497             spin_mutex::scoped_lock lock(my_mutex);
01498             --my_count;
01499             if ( !my_predecessors.empty() )
01500                 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01501                             internal::forward_task< limiter_node<T> >( *this ) );
01502         }
01503     }
01504 
01505     void forward() {
01506         {
01507             spin_mutex::scoped_lock lock(my_mutex);
01508             if ( my_count < my_threshold )
01509                 ++my_count;
01510             else
01511                 return;
01512         }
01513         decrement_counter();
01514     }
01515 
01516 public:
01518     internal::decrementer< limiter_node<T> > decrement;
01519 
01521     limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0) :
01522         graph_node(g), my_root_task(g.root_task()), my_threshold(threshold), my_count(0),
01523         init_decrement_predecessors(num_decrement_predecessors),
01524         decrement(num_decrement_predecessors)
01525     {
01526         my_predecessors.set_owner(this);
01527         my_successors.set_owner(this);
01528         decrement.set_owner(this);
01529     }
01530 
01532     limiter_node( const limiter_node& src ) :
01533         graph_node(src.my_graph), receiver<T>(), sender<T>(),
01534         my_root_task(src.my_root_task), my_threshold(src.my_threshold), my_count(0),
01535         init_decrement_predecessors(src.init_decrement_predecessors),
01536         decrement(src.init_decrement_predecessors)
01537     {
01538         my_predecessors.set_owner(this);
01539         my_successors.set_owner(this);
01540         decrement.set_owner(this);
01541     }
01542 
01544     /* override */ bool register_successor( receiver<output_type> &r ) {
01545         my_successors.register_successor(r);
01546         return true;
01547     }
01548 
01550 
01551     /* override */ bool remove_successor( receiver<output_type> &r ) {
01552         r.remove_predecessor(*this);
01553         my_successors.remove_successor(r);
01554         return true;
01555     }
01556 
01558     /* override */ bool try_put( const T &t ) {
01559         {
01560             spin_mutex::scoped_lock lock(my_mutex);
01561             if ( my_count >= my_threshold )
01562                 return false;
01563             else
01564                 ++my_count;
01565         }
01566 
01567         bool msg = my_successors.try_put(t);
01568 
01569         if ( msg != true ) {
01570             spin_mutex::scoped_lock lock(my_mutex);
01571             --my_count;
01572             if ( !my_predecessors.empty() )
01573                 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01574                             internal::forward_task< limiter_node<T> >( *this ) );
01575         }
01576 
01577         return msg;
01578     }
01579 
01581     /* override */ bool register_predecessor( predecessor_type &src ) {
01582         spin_mutex::scoped_lock lock(my_mutex);
01583         my_predecessors.add( src );
01584         if ( my_count < my_threshold && !my_successors.empty() )
01585             task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01586                            internal::forward_task< limiter_node<T> >( *this ) );
01587         return true;
01588     }
01589 
01591     /* override */ bool remove_predecessor( predecessor_type &src ) {
01592         my_predecessors.remove( src );
01593         return true;
01594     }
01595 };
01596 
01597 #include "internal/_flow_graph_join_impl.h"
01598 
01599 using internal::reserving_port;
01600 using internal::queueing_port;
01601 using internal::tag_matching_port;
01602 using internal::input_port;
01603 using internal::tag_value;
01604 using internal::NO_TAG;
01605 
01606 template<typename OutputTuple, graph_buffer_policy JP=queueing> class join_node;
01607 
01608 template<typename OutputTuple>
01609 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
01610 private:
01611     static const int N = std::tuple_size<OutputTuple>::value;
01612     typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
01613 public:
01614     typedef OutputTuple output_type;
01615     typedef typename unfolded_type::input_ports_type input_ports_type;
01616     join_node(graph &g) : unfolded_type(g) { }
01617     join_node(const join_node &other) : unfolded_type(other) {}
01618 };
01619 
01620 template<typename OutputTuple>
01621 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
01622 private:
01623     static const int N = std::tuple_size<OutputTuple>::value;
01624     typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
01625 public:
01626     typedef OutputTuple output_type;
01627     typedef typename unfolded_type::input_ports_type input_ports_type;
01628     join_node(graph &g) : unfolded_type(g) { }
01629     join_node(const join_node &other) : unfolded_type(other) {}
01630 };
01631 
01632 // template for tag_matching join_node
01633 template<typename OutputTuple>
01634 class join_node<OutputTuple, tag_matching> : public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value,
01635       tag_matching_port, OutputTuple, tag_matching> {
01636 private:
01637     static const int N = std::tuple_size<OutputTuple>::value;
01638     typedef typename internal::unfolded_join_node<N, tag_matching_port, OutputTuple, tag_matching> unfolded_type;
01639 public:
01640     typedef OutputTuple output_type;
01641     typedef typename unfolded_type::input_ports_type input_ports_type;
01642     template<typename B0, typename B1>
01643     join_node(graph &g, B0 b0, B1 b1) : unfolded_type(g, b0, b1) { }
01644     template<typename B0, typename B1, typename B2>
01645     join_node(graph &g, B0 b0, B1 b1, B2 b2) : unfolded_type(g, b0, b1, b2) { }
01646     template<typename B0, typename B1, typename B2, typename B3>
01647     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3) : unfolded_type(g, b0, b1, b2, b3) { }
01648     template<typename B0, typename B1, typename B2, typename B3, typename B4>
01649     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4) : unfolded_type(g, b0, b1, b2, b3, b4) { }
01650     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5>
01651     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5) : unfolded_type(g, b0, b1, b2, b3, b4, b5) { }
01652     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6>
01653     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) { }
01654     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7>
01655     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) { }
01656     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8>
01657     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) { }
01658     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8, typename B9>
01659     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8, B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) { }
01660     join_node(const join_node &other) : unfolded_type(other) {}
01661 };
01662 
01663 #if TBB_PREVIEW_GRAPH_NODES
01664 // or node
01665 #include "internal/_flow_graph_or_impl.h"
01666 
01667 template<typename InputTuple>
01668 class or_node : public internal::unfolded_or_node<InputTuple> {
01669 private:
01670     static const int N = std::tuple_size<InputTuple>::value;
01671 public:
01672     typedef typename internal::or_output_type<InputTuple>::type output_type;
01673     typedef typename internal::unfolded_or_node<InputTuple> unfolded_type;
01674     or_node(graph& g) : unfolded_type(g) { }
01675     // Copy constructor
01676     or_node( const or_node& other ) : unfolded_type(other) { }
01677 };
01678 #endif  // TBB_PREVIEW_GRAPH_NODES
01679 
01681 template< typename T >
01682 inline void make_edge( sender<T> &p, receiver<T> &s ) {
01683     p.register_successor( s );
01684 }
01685 
01687 template< typename T >
01688 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
01689     p.remove_successor( s );
01690 }
01691 
01693 template< typename Body, typename Node >
01694 Body copy_body( Node &n ) {
01695     return n.template copy_function_object<Body>();
01696 }
01697 
01698 } // interface6
01699 
01700     using interface6::graph;
01701     using interface6::graph_node;
01702     using interface6::continue_msg;
01703     using interface6::sender;
01704     using interface6::receiver;
01705     using interface6::continue_receiver;
01706 
01707     using interface6::source_node;
01708     using interface6::function_node;
01709     using interface6::multifunction_node;
01710     using interface6::split_node;
01711     using interface6::internal::output_port;
01712 #if TBB_PREVIEW_GRAPH_NODES
01713     using interface6::or_node;
01714 #endif
01715     using interface6::continue_node;
01716     using interface6::overwrite_node;
01717     using interface6::write_once_node;
01718     using interface6::broadcast_node;
01719     using interface6::buffer_node;
01720     using interface6::queue_node;
01721     using interface6::sequencer_node;
01722     using interface6::priority_queue_node;
01723     using interface6::limiter_node;
01724     using namespace interface6::internal::graph_policy_namespace;
01725     using interface6::join_node;
01726     using interface6::input_port;
01727     using interface6::copy_body; 
01728     using interface6::make_edge; 
01729     using interface6::remove_edge; 
01730     using interface6::internal::NO_TAG;
01731     using interface6::internal::tag_value;
01732 
01733 } // flow
01734 } // tbb
01735 
01736 #endif // __TBB_flow_graph_H

Copyright © 2005-2012 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.