Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
market.cpp
1/*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#include "tbb/tbb_stddef.h"
18#include "tbb/global_control.h" // global_control::active_value
19
20#include "market.h"
21#include "tbb_main.h"
22#include "governor.h"
23#include "scheduler.h"
24#include "itt_notify.h"
25
26namespace tbb {
27namespace internal {
28
29void market::insert_arena_into_list ( arena& a ) {
30#if __TBB_TASK_PRIORITY
31 arena_list_type &arenas = my_priority_levels[a.my_top_priority].arenas;
32 arena *&next = my_priority_levels[a.my_top_priority].next_arena;
33#else /* !__TBB_TASK_PRIORITY */
34 arena_list_type &arenas = my_arenas;
35 arena *&next = my_next_arena;
36#endif /* !__TBB_TASK_PRIORITY */
37 arenas.push_front( a );
38 if ( arenas.size() == 1 )
39 next = &*arenas.begin();
40}
41
42void market::remove_arena_from_list ( arena& a ) {
43#if __TBB_TASK_PRIORITY
44 arena_list_type &arenas = my_priority_levels[a.my_top_priority].arenas;
45 arena *&next = my_priority_levels[a.my_top_priority].next_arena;
46#else /* !__TBB_TASK_PRIORITY */
47 arena_list_type &arenas = my_arenas;
48 arena *&next = my_next_arena;
49#endif /* !__TBB_TASK_PRIORITY */
50 arena_list_type::iterator it = next;
51 __TBB_ASSERT( it != arenas.end(), NULL );
52 if ( next == &a ) {
53 if ( ++it == arenas.end() && arenas.size() > 1 )
54 it = arenas.begin();
55 next = &*it;
56 }
57 arenas.remove( a );
58}
59
60//------------------------------------------------------------------------
61// market
62//------------------------------------------------------------------------
63
64market::market ( unsigned workers_soft_limit, unsigned workers_hard_limit, size_t stack_size )
65 : my_num_workers_hard_limit(workers_hard_limit)
66 , my_num_workers_soft_limit(workers_soft_limit)
67#if __TBB_TASK_PRIORITY
68 , my_global_top_priority(normalized_normal_priority)
69 , my_global_bottom_priority(normalized_normal_priority)
70#endif /* __TBB_TASK_PRIORITY */
71 , my_ref_count(1)
72 , my_stack_size(stack_size)
73 , my_workers_soft_limit_to_report(workers_soft_limit)
74{
75#if __TBB_TASK_PRIORITY
76 __TBB_ASSERT( my_global_reload_epoch == 0, NULL );
77 my_priority_levels[normalized_normal_priority].workers_available = my_num_workers_soft_limit;
78#endif /* __TBB_TASK_PRIORITY */
79
80 // Once created, the RML server will start initializing workers that will need
81 // the global market instance to get the worker stack size.
82 my_server = governor::create_rml_server( *this );
83 __TBB_ASSERT( my_server, "Failed to create RML server" );
84}
85
86static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit) {
87 if( int soft_limit = market::app_parallelism_limit() )
88 workers_soft_limit = soft_limit-1;
89 else // if user set no limits (yet), use market's parameter
90 workers_soft_limit = max( governor::default_num_threads() - 1, workers_soft_limit );
91 if( workers_soft_limit >= workers_hard_limit )
92 workers_soft_limit = workers_hard_limit-1;
93 return workers_soft_limit;
94}
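A worked example may help here. The helper below is an editorial restatement of calc_workers_soft_limit() with plain parameters (soft_limit_for, requested, hard_limit, app_limit and default_threads are illustrative names, not part of market.cpp); it shows how the user-set limit, the caller's request, and the hard limit interact on a hypothetical 8-core machine.

#include <algorithm>
#include <cstdio>

// Editorial restatement of calc_workers_soft_limit() with plain parameters.
// app_limit models market::app_parallelism_limit(); 0 means "no user limit set yet".
static unsigned soft_limit_for(unsigned requested, unsigned hard_limit,
                               unsigned app_limit, unsigned default_threads) {
    unsigned workers_soft_limit;
    if (app_limit)                                  // user set a limit (e.g. via global_control)
        workers_soft_limit = app_limit - 1;         // one slot belongs to the master thread
    else                                            // otherwise the larger of the request and P-1
        workers_soft_limit = std::max(default_threads - 1, requested);
    if (workers_soft_limit >= hard_limit)           // never reach the hard limit
        workers_soft_limit = hard_limit - 1;
    return workers_soft_limit;
}

int main() {
    std::printf("%u\n", soft_limit_for(2, 32, 0, 8));   // 7  : P-1 dominates a small request
    std::printf("%u\n", soft_limit_for(16, 32, 0, 8));  // 16 : a large request dominates P-1
    std::printf("%u\n", soft_limit_for(16, 32, 4, 8));  // 3  : an app limit of 4 overrides both
    return 0;
}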
95
96market& market::global_market ( bool is_public, unsigned workers_requested, size_t stack_size ) {
97 global_market_mutex_type::scoped_lock lock( theMarketMutex );
98 market *m = theMarket;
99 if( m ) {
100 ++m->my_ref_count;
101 const unsigned old_public_count = is_public? m->my_public_ref_count++ : /*any non-zero value*/1;
102 lock.release();
103 if( old_public_count==0 )
104 set_active_num_workers( calc_workers_soft_limit(workers_requested, m->my_num_workers_hard_limit) );
105
106 // do not warn if default number of workers is requested
107 if( workers_requested != governor::default_num_threads()-1 ) {
108 __TBB_ASSERT( skip_soft_limit_warning > workers_requested,
109 "skip_soft_limit_warning must be larger than any valid workers_requested" );
110 unsigned soft_limit_to_report = m->my_workers_soft_limit_to_report;
111 if( soft_limit_to_report < workers_requested ) {
112 runtime_warning( "The number of workers is currently limited to %u. "
113 "The request for %u workers is ignored. Further requests for more workers "
114 "will be silently ignored until the limit changes.\n",
115 soft_limit_to_report, workers_requested );
116 // The race is possible when multiple threads report warnings.
117 // We are OK with that, as there are just multiple warnings.
118 as_atomic(m->my_workers_soft_limit_to_report).
119 compare_and_swap(skip_soft_limit_warning, soft_limit_to_report);
120 }
121
122 }
123 if( m->my_stack_size < stack_size )
124 runtime_warning( "Thread stack size has been already set to %u. "
125 "The request for larger stack (%u) cannot be satisfied.\n",
126 m->my_stack_size, stack_size );
127 }
128 else {
129 // TODO: A lot is done under theMarketMutex locked. Can anything be moved out?
130 if( stack_size == 0 )
131 stack_size = global_control::active_value(global_control::thread_stack_size);
132 // Expecting that 4P is suitable for most applications.
133 // Limit to 2P for large thread number.
134 // TODO: ask RML for max concurrency and possibly correct hard_limit
135 const unsigned factor = governor::default_num_threads()<=128? 4 : 2;
136 // The requested number of threads is intentionally not considered in
137 // computation of the hard limit, in order to separate responsibilities
138 // and avoid complicated interactions between global_control and task_scheduler_init.
139 // The market guarantees that at least 256 threads might be created.
140 const unsigned workers_hard_limit = max(max(factor*governor::default_num_threads(), 256u), app_parallelism_limit());
141 const unsigned workers_soft_limit = calc_workers_soft_limit(workers_requested, workers_hard_limit);
142 // Create the global market instance
143 size_t size = sizeof(market);
144#if __TBB_TASK_GROUP_CONTEXT
145 __TBB_ASSERT( __TBB_offsetof(market, my_workers) + sizeof(generic_scheduler*) == sizeof(market),
146 "my_workers must be the last data field of the market class");
147 size += sizeof(generic_scheduler*) * (workers_hard_limit - 1);
148#endif /* __TBB_TASK_GROUP_CONTEXT */
149 __TBB_InitOnce::add_ref();
150 void* storage = NFS_Allocate(1, size, NULL);
151 memset( storage, 0, size );
152 // Initialize and publish global market
153 m = new (storage) market( workers_soft_limit, workers_hard_limit, stack_size );
154 if( is_public )
155 m->my_public_ref_count = 1;
156 theMarket = m;
157 // This check relies on the fact that for shared RML default_concurrency==max_concurrency
158 if ( !governor::UsePrivateRML && m->my_server->default_concurrency() < workers_soft_limit )
159 runtime_warning( "RML might limit the number of workers to %u while %u is requested.\n"
160 , m->my_server->default_concurrency(), workers_soft_limit );
161 }
162 return *m;
163}
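The hard limit computed above follows max(max(factor*P, 256), app_parallelism_limit()), with factor = 4 for P <= 128 and 2 otherwise. The standalone sketch below just replays that formula for a few machine sizes; hard_limit_for, P, and app_limit are illustrative stand-ins, not names from this file.

#include <algorithm>
#include <cstdio>

// Replays the hard-limit formula from global_market() for a few machine sizes.
// P stands for governor::default_num_threads(), app_limit for app_parallelism_limit().
static unsigned hard_limit_for(unsigned P, unsigned app_limit) {
    const unsigned factor = P <= 128 ? 4 : 2;        // 4P for small machines, 2P for large ones
    return std::max(std::max(factor * P, 256u), app_limit);
}

int main() {
    std::printf("%u\n", hard_limit_for(8, 0));    // 256 : the 256-thread floor dominates 4*8
    std::printf("%u\n", hard_limit_for(128, 0));  // 512 : 4*128
    std::printf("%u\n", hard_limit_for(256, 0));  // 512 : factor drops to 2 above 128 threads
    return 0;
}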
164
165void market::destroy () {
166#if __TBB_COUNT_TASK_NODES
167 if ( my_task_node_count )
168 runtime_warning( "Leaked %ld task objects\n", (long)my_task_node_count );
169#endif /* __TBB_COUNT_TASK_NODES */
170 this->market::~market(); // qualified to suppress warning
171 NFS_Free( this );
172 __TBB_InitOnce::remove_ref();
173}
174
175bool market::release ( bool is_public, bool blocking_terminate ) {
176 __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
177 bool do_release = false;
178 {
179 global_market_mutex_type::scoped_lock lock( theMarketMutex );
180 if ( blocking_terminate ) {
181 __TBB_ASSERT( is_public, "Only an object with a public reference can request the blocking terminate" );
182 while ( my_public_ref_count == 1 && my_ref_count > 1 ) {
183 lock.release();
184 // To guarantee that request_close_connection() is called by the last master, we need to wait till all
185 // references are released. Re-read my_public_ref_count to limit waiting if new masters are created.
186 // Theoretically, new private references to the market can be added during waiting making it potentially
187 // endless.
188 // TODO: revise why the weak scheduler needs market's pointer and try to remove this wait.
189 // Note that the market should know about its schedulers for cancellation/exception/priority propagation,
190 // see e.g. task_group_context::cancel_group_execution()
192 __TBB_Yield();
193 lock.acquire( theMarketMutex );
194 }
195 }
196 if ( is_public ) {
197 __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
198 __TBB_ASSERT( my_public_ref_count, NULL );
199 --my_public_ref_count;
200 }
201 if ( --my_ref_count == 0 ) {
202 __TBB_ASSERT( !my_public_ref_count, NULL );
203 do_release = true;
204 theMarket = NULL;
205 }
206 }
207 if( do_release ) {
208 __TBB_ASSERT( !__TBB_load_with_acquire(my_public_ref_count), "No public references remain if we remove the market." );
209 // inform RML that blocking termination is required
210 my_join_workers = blocking_terminate;
211 my_server->request_close_connection();
212 return blocking_terminate;
213 }
214 return false;
215}
216
217int market::update_workers_request () {
218 int old_request = my_num_workers_requested;
219 my_num_workers_requested = min(my_total_demand, (int)my_num_workers_soft_limit);
220#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
221 if (my_mandatory_num_requested > 0) {
222 __TBB_ASSERT(my_num_workers_soft_limit == 0, NULL);
223 my_num_workers_requested = 1;
224 }
225#endif
226#if __TBB_TASK_PRIORITY
227 my_priority_levels[my_global_top_priority].workers_available = my_num_workers_requested;
228 update_allotment(my_global_top_priority);
229#else
230 update_allotment( my_num_workers_requested );
231#endif
232 return my_num_workers_requested - old_request;
233}
234
235void market::set_active_num_workers ( unsigned soft_limit ) {
236 market *m;
237
238 {
239 global_market_mutex_type::scoped_lock lock( theMarketMutex );
240 if ( !theMarket )
241 return; // actual value will be used at market creation
242 m = theMarket;
243 if (m->my_num_workers_soft_limit == soft_limit)
244 return;
245 ++m->my_ref_count;
246 }
247 // have my_ref_count for market, use it safely
248
249 int delta = 0;
250 {
251 arenas_list_mutex_type::scoped_lock lock( m->my_arenas_list_mutex );
252 __TBB_ASSERT(soft_limit <= m->my_num_workers_hard_limit, NULL);
253
254#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
255#if __TBB_TASK_PRIORITY
256#define FOR_EACH_PRIORITY_LEVEL_BEGIN { \
257 for (int p = m->my_global_top_priority; p >= m->my_global_bottom_priority; --p) { \
258 priority_level_info& pl = m->my_priority_levels[p]; \
259 arena_list_type& arenas = pl.arenas;
260#else
261#define FOR_EACH_PRIORITY_LEVEL_BEGIN { { \
262 const int p = 0; \
263 tbb::internal::suppress_unused_warning(p); \
264 arena_list_type& arenas = m->my_arenas;
265#endif
266#define FOR_EACH_PRIORITY_LEVEL_END } }
267
268 if (m->my_num_workers_soft_limit == 0 && m->my_mandatory_num_requested > 0) {
269 FOR_EACH_PRIORITY_LEVEL_BEGIN
270 for (arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it)
271 if (it->my_global_concurrency_mode)
272 m->disable_mandatory_concurrency_impl(&*it);
273 FOR_EACH_PRIORITY_LEVEL_END
274 }
275 __TBB_ASSERT(m->my_mandatory_num_requested == 0, NULL);
276#endif
277
278 as_atomic(m->my_num_workers_soft_limit) = soft_limit;
279 // report only once after new soft limit value is set
280 m->my_workers_soft_limit_to_report = soft_limit;
281
282#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
283 if (m->my_num_workers_soft_limit == 0) {
284 FOR_EACH_PRIORITY_LEVEL_BEGIN
285 for (arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it) {
286 if (!it->my_task_stream.empty(p))
287 m->enable_mandatory_concurrency_impl(&*it);
288 }
289 FOR_EACH_PRIORITY_LEVEL_END
290 }
291#undef FOR_EACH_PRIORITY_LEVEL_BEGIN
292#undef FOR_EACH_PRIORITY_LEVEL_END
293#endif
294
295 delta = m->update_workers_request();
296 }
297 // adjust_job_count_estimate must be called outside of any locks
298 if( delta!=0 )
299 m->my_server->adjust_job_count_estimate( delta );
300 // release internal market reference to match ++m->my_ref_count above
301 m->release( /*is_public=*/false, /*blocking_terminate=*/false );
302}
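set_active_num_workers() is the entry point that reacts when the application changes the allowed parallelism. A typical way this path is driven from user code is through tbb::global_control; the snippet below is a hedged usage sketch of that public API, not code from this file.

#include <tbb/global_control.h>
#include <tbb/parallel_for.h>

int main() {
    // While this object is alive, at most 4 threads (master included) participate;
    // under the hood the market's soft limit is lowered to match.
    tbb::global_control limit(tbb::global_control::max_allowed_parallelism, 4);

    tbb::parallel_for(0, 1000, [](int) { /* work */ });
    return 0;
} // limit is destroyed here and the previous parallelism level is restored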
303
304bool governor::does_client_join_workers (const tbb::internal::rml::tbb_client &client) {
305 return ((const market&)client).must_join_workers();
306}
307
308arena* market::create_arena ( int num_slots, int num_reserved_slots, size_t stack_size ) {
309 __TBB_ASSERT( num_slots > 0, NULL );
310 __TBB_ASSERT( num_reserved_slots <= num_slots, NULL );
311 // Add public market reference for master thread/task_arena (that adds an internal reference in exchange).
312 market &m = global_market( /*is_public=*/true, num_slots-num_reserved_slots, stack_size );
313
314 arena& a = arena::allocate_arena( m, num_slots, num_reserved_slots );
315 // Add newly created arena into the existing market's list.
316 arenas_list_mutex_type::scoped_lock lock(m.my_arenas_list_mutex);
317 m.insert_arena_into_list(a);
318 return &a;
319}
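create_arena() is reached when user code constructs an explicit arena. The following is a hedged usage sketch with the public tbb::task_arena API (not part of market.cpp) showing how num_slots and num_reserved_slots map onto a typical construction.

#include <tbb/task_arena.h>
#include <tbb/parallel_for.h>

int main() {
    // 4 slots in total, 1 reserved for the calling (master) thread, so
    // create_arena() ends up requesting up to 3 workers for this arena.
    tbb::task_arena arena(/*max_concurrency=*/4, /*reserved_for_masters=*/1);
    arena.execute([] {
        tbb::parallel_for(0, 100, [](int) { /* work */ });
    });
    return 0;
}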
320
322void market::detach_arena ( arena& a ) {
323 __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
324 __TBB_ASSERT( !a.my_slots[0].my_scheduler, NULL );
325 if (a.my_global_concurrency_mode)
326 disable_mandatory_concurrency_impl(&a);
327
328 remove_arena_from_list(a);
329 if ( a.my_aba_epoch == my_arenas_aba_epoch )
330 ++my_arenas_aba_epoch;
331}
332
333void market::try_destroy_arena ( arena* a, uintptr_t aba_epoch ) {
334 bool locked = true;
335 __TBB_ASSERT( a, NULL );
336 // we hold reference to the market, so it cannot be destroyed at any moment here
337 __TBB_ASSERT( this == theMarket, NULL );
338 __TBB_ASSERT( my_ref_count!=0, NULL );
339
340 my_arenas_list_mutex.lock();
341#if __TBB_TASK_PRIORITY
342 // scan all priority levels, not only in [my_global_bottom_priority;my_global_top_priority]
343 // range, because arena to be destroyed can have no outstanding request for workers
344 for ( int p = num_priority_levels-1; p >= 0; --p ) {
345 priority_level_info &pl = my_priority_levels[p];
346 arena_list_type &my_arenas = pl.arenas;
347#endif /* __TBB_TASK_PRIORITY */
348 arena_list_type::iterator it = my_arenas.begin();
349 for ( ; it != my_arenas.end(); ++it ) {
350 if ( a == &*it ) {
351 if ( it->my_aba_epoch == aba_epoch ) {
352 // Arena is alive
353 if ( !a->my_num_workers_requested && !a->my_references ) {
354 __TBB_ASSERT( !a->my_num_workers_allotted && (a->my_pool_state == arena::SNAPSHOT_EMPTY || !a->my_max_num_workers), "Inconsistent arena state" );
355 // Arena is abandoned. Destroy it.
356 detach_arena( *a );
357 my_arenas_list_mutex.unlock();
358 locked = false;
359 a->free_arena();
360 }
361 }
362 if (locked)
363 my_arenas_list_mutex.unlock();
364 return;
365 }
366 }
367#if __TBB_TASK_PRIORITY
368 }
369#endif /* __TBB_TASK_PRIORITY */
370 my_arenas_list_mutex.unlock();
371}
372
374arena* market::arena_in_need ( arena_list_type &arenas, arena *hint ) {
375 if ( arenas.empty() )
376 return NULL;
377 arena_list_type::iterator it = hint;
378 __TBB_ASSERT( it != arenas.end(), NULL );
379 do {
380 arena& a = *it;
381 if ( ++it == arenas.end() )
382 it = arenas.begin();
383 if ( a.num_workers_active() < a.my_num_workers_allotted ) {
384 a.my_references += arena::ref_worker;
385 return &a;
386 }
387 } while ( it != hint );
388 return NULL;
389}
390
391int market::update_allotment ( arena_list_type& arenas, int workers_demand, int max_workers ) {
392 __TBB_ASSERT( workers_demand > 0, NULL );
393 max_workers = min(workers_demand, max_workers);
394 int assigned = 0;
395 int carry = 0;
396 for (arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it) {
397 arena& a = *it;
398 if (a.my_num_workers_requested <= 0) {
400 continue;
401 }
402 int allotted = 0;
403#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
404 if (my_num_workers_soft_limit == 0) {
405 __TBB_ASSERT(max_workers == 0 || max_workers == 1, NULL);
406 allotted = a.my_global_concurrency_mode && assigned < max_workers ? 1 : 0;
407 } else
408#endif
409 {
410 int tmp = a.my_num_workers_requested * max_workers + carry;
411 allotted = tmp / workers_demand;
412 carry = tmp % workers_demand;
413 // a.my_num_workers_requested may temporarily exceed a.my_max_num_workers
414 allotted = min(allotted, (int)a.my_max_num_workers);
415 }
416 a.my_num_workers_allotted = allotted;
417 assigned += allotted;
418 }
419 __TBB_ASSERT( 0 <= assigned && assigned <= max_workers, NULL );
420 return assigned;
421}
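The loop above splits max_workers across arenas in proportion to their demand, carrying the integer remainder from one arena to the next so that rounding never over- or under-assigns in total. The standalone sketch below replays that arithmetic with hypothetical demands {5, 3, 1} and 4 available workers; the clamp against my_max_num_workers and the enforced-concurrency branch are omitted.

#include <cstdio>
#include <vector>

int main() {
    // Hypothetical per-arena demands; replays the carry arithmetic of
    // market::update_allotment(arena_list_type&, int, int).
    std::vector<int> demand = {5, 3, 1};     // my_num_workers_requested per arena
    int workers_demand = 5 + 3 + 1;          // 9
    int max_workers = 4;                     // workers actually available
    int carry = 0, assigned = 0;
    for (int d : demand) {
        int tmp = d * max_workers + carry;
        int allotted = tmp / workers_demand; // floor of this arena's proportional share
        carry = tmp % workers_demand;        // remainder rolls over to the next arena
        assigned += allotted;
        std::printf("demand=%d -> allotted=%d (carry=%d)\n", d, allotted, carry);
    }
    std::printf("assigned %d of %d\n", assigned, max_workers); // 4 of 4: nothing lost to rounding
    return 0;
}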
422
424bool market::is_arena_in_list ( arena_list_type &arenas, arena *a ) {
425 if ( a ) {
426 for ( arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it )
427 if ( a == &*it )
428 return true;
429 }
430 return false;
431}
432
433#if __TBB_TASK_PRIORITY
434inline void market::update_global_top_priority ( intptr_t newPriority ) {
435 GATHER_STATISTIC( ++governor::local_scheduler_if_initialized()->my_counters.market_prio_switches );
436 my_global_top_priority = newPriority;
437 my_priority_levels[newPriority].workers_available =
438#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
439 my_mandatory_num_requested && !my_num_workers_soft_limit ? 1 :
440#endif
442 advance_global_reload_epoch();
443}
444
445inline void market::reset_global_priority () {
446 my_global_bottom_priority = normalized_normal_priority;
447 update_global_top_priority(normalized_normal_priority);
448}
449
450arena* market::arena_in_need ( arena* prev_arena ) {
451 if( as_atomic(my_total_demand) <= 0 )
452 return NULL;
453 arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex, /*is_writer=*/false);
455 int p = my_global_top_priority;
456 arena *a = NULL;
457
458 // Checks if arena is alive or not
459 if ( is_arena_in_list( my_priority_levels[p].arenas, prev_arena ) ) {
460 a = arena_in_need( my_priority_levels[p].arenas, prev_arena );
461 }
462
463 while ( !a && p >= my_global_bottom_priority ) {
464 priority_level_info &pl = my_priority_levels[p--];
465 a = arena_in_need( pl.arenas, pl.next_arena );
466 if ( a ) {
467 as_atomic(pl.next_arena) = a; // a subject for innocent data race under the reader lock
468 // TODO: rework global round robin policy to local or random to avoid this write
469 }
470 // TODO: When refactoring task priority code, take into consideration the
471 // __TBB_TRACK_PRIORITY_LEVEL_SATURATION sections from earlier versions of TBB
472 }
473 return a;
474}
475
476void market::update_allotment ( intptr_t highest_affected_priority ) {
477 intptr_t i = highest_affected_priority;
478 int available = my_priority_levels[i].workers_available;
479 for ( ; i >= my_global_bottom_priority; --i ) {
480 priority_level_info &pl = my_priority_levels[i];
481 pl.workers_available = available;
482 if ( pl.workers_requested ) {
483 available -= update_allotment( pl.arenas, pl.workers_requested, available );
484 if ( available <= 0 ) { // TODO: assertion?
485 available = 0;
486 break;
487 }
488 }
489 }
490 __TBB_ASSERT( i <= my_global_bottom_priority || !available, NULL );
491 for ( --i; i >= my_global_bottom_priority; --i ) {
492 priority_level_info &pl = my_priority_levels[i];
493 pl.workers_available = 0;
494 arena_list_type::iterator it = pl.arenas.begin();
495 for ( ; it != pl.arenas.end(); ++it ) {
496 __TBB_ASSERT( it->my_num_workers_requested >= 0 || !it->my_num_workers_allotted, NULL );
497 it->my_num_workers_allotted = 0;
498 }
499 }
500}
501#endif /* __TBB_TASK_PRIORITY */
502
503#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
504void market::enable_mandatory_concurrency_impl ( arena *a ) {
505 __TBB_ASSERT(!a->my_global_concurrency_mode, NULL);
507
508 a->my_global_concurrency_mode = true;
509 my_mandatory_num_requested++;
510}
511
512void market::enable_mandatory_concurrency ( arena *a ) {
513 int delta = 0;
514 {
515 arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
516 if (my_num_workers_soft_limit != 0 || a->my_global_concurrency_mode)
517 return;
518
519 enable_mandatory_concurrency_impl(a);
520 delta = update_workers_request();
521 }
522
523 if (delta != 0)
524 my_server->adjust_job_count_estimate(delta);
525}
526
527void market::disable_mandatory_concurrency_impl(arena* a) {
528 __TBB_ASSERT(a->my_global_concurrency_mode, NULL);
529 __TBB_ASSERT(my_mandatory_num_requested > 0, NULL);
530
531 a->my_global_concurrency_mode = false;
532 my_mandatory_num_requested--;
533}
534
535void market::mandatory_concurrency_disable ( arena *a ) {
536 int delta = 0;
537 {
538 arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
539 if (!a->my_global_concurrency_mode)
540 return;
541 // There is a racy window in advertise_new_work between enabling mandatory concurrency and
542 // setting SNAPSHOT_FULL. It gives a spawn request a chance to disable mandatory concurrency.
543 // Therefore, we double-check that there are no enqueued tasks.
544 if (a->has_enqueued_tasks())
545 return;
546
548 disable_mandatory_concurrency_impl(a);
549
550 delta = update_workers_request();
551 }
552 if (delta != 0)
553 my_server->adjust_job_count_estimate(delta);
554}
555#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
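The enforced-concurrency code above exists so that enqueued tasks still make progress when the soft limit is zero workers (for example, when parallelism is capped at 1). The snippet below is a hedged sketch of that observable behaviour using the public tbb::global_control and tbb::task_arena::enqueue APIs; it is illustrative, not code from this file.

#include <tbb/global_control.h>
#include <tbb/task_arena.h>
#include <atomic>
#include <thread>

int main() {
    // Parallelism is capped at 1, so the market's soft limit is 0 workers.
    tbb::global_control limit(tbb::global_control::max_allowed_parallelism, 1);

    std::atomic<bool> done(false);
    tbb::task_arena arena;
    // The enqueued task must still make progress: the market temporarily
    // requests one worker in "mandatory concurrency" mode to service it.
    arena.enqueue([&done] { done = true; });

    while (!done)                 // the master never enters the arena here
        std::this_thread::yield();
    return 0;
}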
556
557void market::adjust_demand ( arena& a, int delta ) {
558 __TBB_ASSERT( theMarket, "market instance was destroyed prematurely?" );
559 if ( !delta )
560 return;
561 my_arenas_list_mutex.lock();
562 int prev_req = a.my_num_workers_requested;
563 a.my_num_workers_requested += delta;
564 if ( a.my_num_workers_requested <= 0 ) {
565 a.my_num_workers_allotted = 0;
566 if ( prev_req <= 0 ) {
567 my_arenas_list_mutex.unlock();
568 return;
569 }
570 delta = -prev_req;
571 }
572 else if ( prev_req < 0 ) {
573 delta = a.my_num_workers_requested;
574 }
575 my_total_demand += delta;
576 unsigned effective_soft_limit = my_num_workers_soft_limit;
577 if (my_mandatory_num_requested > 0) {
578 __TBB_ASSERT(effective_soft_limit == 0, NULL);
579 effective_soft_limit = 1;
580 }
581#if !__TBB_TASK_PRIORITY
582 update_allotment(effective_soft_limit);
583#else /* !__TBB_TASK_PRIORITY */
584 intptr_t p = a.my_top_priority;
585 priority_level_info &pl = my_priority_levels[p];
586 pl.workers_requested += delta;
587 __TBB_ASSERT( pl.workers_requested >= 0, NULL );
588 if ( a.my_num_workers_requested <= 0 ) {
589 if ( a.my_top_priority != normalized_normal_priority ) {
590 GATHER_STATISTIC( ++governor::local_scheduler_if_initialized()->my_counters.arena_prio_resets );
591 update_arena_top_priority( a, normalized_normal_priority );
592 }
593 a.my_bottom_priority = normalized_normal_priority;
594 }
595 if ( p == my_global_top_priority ) {
596 if ( !pl.workers_requested ) {
597 while ( --p >= my_global_bottom_priority && !my_priority_levels[p].workers_requested )
598 continue;
599 if ( p < my_global_bottom_priority )
600 reset_global_priority();
601 else
602 update_global_top_priority(p);
603 }
604 my_priority_levels[my_global_top_priority].workers_available = effective_soft_limit;
605 update_allotment( my_global_top_priority );
606 }
607 else if ( p > my_global_top_priority ) {
608 __TBB_ASSERT( pl.workers_requested > 0, NULL );
609 // TODO: investigate if the following invariant is always valid
611 update_global_top_priority(p);
612 a.my_num_workers_allotted = min( (int)effective_soft_limit, a.my_num_workers_requested );
613 my_priority_levels[p - 1].workers_available = effective_soft_limit - a.my_num_workers_allotted;
614 update_allotment( p - 1 );
615 }
616 else if ( p == my_global_bottom_priority ) {
617 if ( !pl.workers_requested ) {
618 while ( ++p <= my_global_top_priority && !my_priority_levels[p].workers_requested )
619 continue;
620 if ( p > my_global_top_priority )
621 reset_global_priority();
622 else
623 my_global_bottom_priority = p;
624 }
625 else
626 update_allotment( p );
627 }
628 else if ( p < my_global_bottom_priority ) {
629 int prev_bottom = my_global_bottom_priority;
630 my_global_bottom_priority = p;
631 update_allotment( prev_bottom );
632 }
633 else {
634 __TBB_ASSERT( my_global_bottom_priority < p && p < my_global_top_priority, NULL );
635 update_allotment( p );
636 }
637 __TBB_ASSERT( my_global_top_priority >= a.my_top_priority || a.my_num_workers_requested<=0, NULL );
639#endif /* !__TBB_TASK_PRIORITY */
640 if ( delta > 0 ) {
641 // can't overflow soft_limit, but remember the values requested by arenas in
642 // my_total_demand so as not to prematurely release workers to RML
643 if ( my_num_workers_requested+delta > (int)effective_soft_limit)
644 delta = effective_soft_limit - my_num_workers_requested;
645 } else {
646 // the number of workers should not be decreased below my_total_demand
648 delta = min(my_total_demand, (int)effective_soft_limit) - my_num_workers_requested;
649 }
650 my_num_workers_requested += delta;
651 __TBB_ASSERT( my_num_workers_requested <= (int)effective_soft_limit, NULL );
652
653 my_arenas_list_mutex.unlock();
654 // Must be called outside of any locks
655 my_server->adjust_job_count_estimate( delta );
657}
658
659void market::process( job& j ) {
660 generic_scheduler& s = static_cast<generic_scheduler&>(j);
661 // s.my_arena can be dead. Don't access it until arena_in_need is called
662 arena *a = s.my_arena;
664
665 for (int i = 0; i < 2; ++i) {
666 while ( (a = arena_in_need(a)) ) {
667 a->process(s);
668 a = NULL; // to avoid double checks in arena_in_need(arena*) for the same priority level
669 }
670 // Workers leave the market because there is no arena in need. This can happen before
671 // adjust_job_count_estimate() decreases my_slack and before RML puts this thread to sleep.
672 // It might result in a busy loop checking for my_slack<0 and calling this method instantly.
673 // The yield refines this spinning.
674 if ( !i )
675 __TBB_Yield();
676 }
677
678 GATHER_STATISTIC( ++s.my_counters.market_roundtrips );
679}
680
681void market::cleanup( job& j ) {
682 __TBB_ASSERT( theMarket != this, NULL );
683 generic_scheduler& s = static_cast<generic_scheduler&>(j);
684 generic_scheduler* mine = governor::local_scheduler_if_initialized();
685 __TBB_ASSERT( !mine || mine->is_worker(), NULL );
686 if( mine!=&s ) {
687 governor::assume_scheduler( &s );
688 generic_scheduler::cleanup_worker( &s, mine!=NULL );
689 governor::assume_scheduler( mine );
690 } else {
691 generic_scheduler::cleanup_worker( &s, true );
692 }
693}
694
695void market::acknowledge_close_connection() {
696 destroy();
697}
698
699::rml::job* market::create_one_job () {
700 unsigned index = ++my_first_unused_worker_idx;
701 __TBB_ASSERT( index > 0, NULL );
702 ITT_THREAD_SET_NAME(_T("TBB Worker Thread"));
703 // index serves as a hint decreasing conflicts between workers when they migrate between arenas
704 generic_scheduler* s = generic_scheduler::create_worker( *this, index, /* genuine = */ true );
705#if __TBB_TASK_GROUP_CONTEXT
706 __TBB_ASSERT( index <= my_num_workers_hard_limit, NULL );
707 __TBB_ASSERT( !my_workers[index - 1], NULL );
708 my_workers[index - 1] = s;
709#endif /* __TBB_TASK_GROUP_CONTEXT */
710 return s;
711}
712
713#if __TBB_TASK_PRIORITY
714void market::update_arena_top_priority ( arena& a, intptr_t new_priority ) {
715 GATHER_STATISTIC( ++governor::local_scheduler_if_initialized()->my_counters.arena_prio_switches );
716 __TBB_ASSERT( a.my_top_priority != new_priority, NULL );
717 priority_level_info &prev_level = my_priority_levels[a.my_top_priority],
718 &new_level = my_priority_levels[new_priority];
719 remove_arena_from_list(a);
720 a.my_top_priority = new_priority;
721 insert_arena_into_list(a);
722 as_atomic( a.my_reload_epoch ).fetch_and_increment<tbb::release>(); // TODO: synch with global reload epoch in order to optimize usage of local reload epoch
723 prev_level.workers_requested -= a.my_num_workers_requested;
724 new_level.workers_requested += a.my_num_workers_requested;
725 __TBB_ASSERT( prev_level.workers_requested >= 0 && new_level.workers_requested >= 0, NULL );
726}
727
728bool market::lower_arena_priority ( arena& a, intptr_t new_priority, uintptr_t old_reload_epoch ) {
729 // TODO: replace the lock with a try_lock loop which performs a double check of the epoch
730 arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
731 if ( a.my_reload_epoch != old_reload_epoch ) {
733 return false;
734 }
735 __TBB_ASSERT( a.my_top_priority > new_priority, NULL );
736 __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
737
738 intptr_t p = a.my_top_priority;
739 update_arena_top_priority( a, new_priority );
740 if ( a.my_num_workers_requested > 0 ) {
741 if ( my_global_bottom_priority > new_priority ) {
742 my_global_bottom_priority = new_priority;
743 }
744 if ( p == my_global_top_priority && !my_priority_levels[p].workers_requested ) {
745 // Global top level became empty
746 for ( --p; p>my_global_bottom_priority && !my_priority_levels[p].workers_requested; --p ) continue;
747 update_global_top_priority(p);
748 }
749 update_allotment( p );
750 }
751
752 __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
754 return true;
755}
756
757bool market::update_arena_priority ( arena& a, intptr_t new_priority ) {
758 // TODO: do not acquire this global lock while checking arena's state.
759 arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
760
761 tbb::internal::assert_priority_valid(new_priority);
762 __TBB_ASSERT( my_global_top_priority >= a.my_top_priority || a.my_num_workers_requested <= 0, NULL );
764 if ( a.my_top_priority == new_priority ) {
765 return false;
766 }
767 else if ( a.my_top_priority > new_priority ) {
768 if ( a.my_bottom_priority > new_priority )
769 a.my_bottom_priority = new_priority;
770 return false;
771 }
772 else if ( a.my_num_workers_requested <= 0 ) {
773 return false;
774 }
775
776 __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
777
778 intptr_t p = a.my_top_priority;
779 intptr_t highest_affected_level = max(p, new_priority);
780 update_arena_top_priority( a, new_priority );
781
782 if ( my_global_top_priority < new_priority ) {
783 update_global_top_priority(new_priority);
784 }
785 else if ( my_global_top_priority == new_priority ) {
786 advance_global_reload_epoch();
787 }
788 else {
789 __TBB_ASSERT( new_priority < my_global_top_priority, NULL );
790 __TBB_ASSERT( new_priority > my_global_bottom_priority, NULL );
791 if ( p == my_global_top_priority && !my_priority_levels[p].workers_requested ) {
792 // Global top level became empty
793 __TBB_ASSERT( my_global_bottom_priority < p, NULL );
794 for ( --p; !my_priority_levels[p].workers_requested; --p ) continue;
795 __TBB_ASSERT( p >= new_priority, NULL );
796 update_global_top_priority(p);
797 highest_affected_level = p;
798 }
799 }
800 if ( p == my_global_bottom_priority ) {
801 // Arena priority was increased from the global bottom level.
802 __TBB_ASSERT( p < new_priority, NULL );
803 __TBB_ASSERT( new_priority <= my_global_top_priority, NULL );
804 while ( my_global_bottom_priority < my_global_top_priority
805 && !my_priority_levels[my_global_bottom_priority].workers_requested )
806 ++my_global_bottom_priority;
807 __TBB_ASSERT( my_global_bottom_priority <= new_priority, NULL );
808 __TBB_ASSERT( my_priority_levels[my_global_bottom_priority].workers_requested > 0, NULL );
809 }
810 update_allotment( highest_affected_level );
811
812 __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
814 return true;
815}
816#endif /* __TBB_TASK_PRIORITY */
817
818} // namespace internal
819} // namespace tbb
#define __TBB_Yield()
Definition: ibm_aix51.h:44
#define __TBB_TASK_PRIORITY
Definition: tbb_config.h:571
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
#define __TBB_offsetof(class_name, member_name)
Extended variant of the standard offsetof macro.
Definition: tbb_stddef.h:266
#define GATHER_STATISTIC(x)
#define _T(string_literal)
Standard Windows style macro to markup the string literals.
Definition: itt_notify.h:59
#define ITT_THREAD_SET_NAME(name)
Definition: itt_notify.h:113
void *__TBB_EXPORTED_FUNC NFS_Allocate(size_t n_element, size_t element_size, void *hint)
Allocate memory on cache/sector line boundary.
void __TBB_EXPORTED_FUNC NFS_Free(void *)
Free memory allocated by NFS_Allocate.
The graph class.
@ release
Release.
Definition: atomic.h:59
void __TBB_EXPORTED_FUNC runtime_warning(const char *format,...)
Report a runtime warning.
T __TBB_load_with_acquire(const volatile T &location)
Definition: tbb_machine.h:709
T max(const T &val1, const T &val2)
Utility template function returning greater of the two values.
Definition: tbb_misc.h:119
atomic< T > & as_atomic(T &t)
Definition: atomic.h:572
T min(const T &val1, const T &val2)
Utility template function returning lesser of the two values.
Definition: tbb_misc.h:110
static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit)
Definition: market.cpp:86
static const intptr_t num_priority_levels
static size_t active_value(parameter p)
void lock()
Acquire writer lock.
void unlock()
Release lock.
The scoped locking pattern.
Definition: spin_rw_mutex.h:86
uintptr_t my_aba_epoch
ABA prevention marker.
Definition: arena.h:235
tbb::atomic< uintptr_t > my_pool_state
Current task pool state and estimate of available tasks amount.
Definition: arena.h:195
unsigned my_num_workers_allotted
The number of workers that have been marked out by the resource manager to service the arena.
Definition: arena.h:147
unsigned my_max_num_workers
The number of workers requested by the master thread owning the arena.
Definition: arena.h:185
int my_num_workers_requested
The number of workers that are currently requested from the resource manager.
Definition: arena.h:188
atomic< unsigned > my_references
Reference counter for the arena.
Definition: arena.h:153
static const pool_state_t SNAPSHOT_EMPTY
No tasks to steal since last snapshot was taken.
Definition: arena.h:318
unsigned num_workers_active() const
The number of workers active in the arena.
Definition: arena.h:334
void free_arena()
Completes arena shutdown, destructs and deallocates it.
Definition: arena.cpp:296
arena_slot my_slots[1]
Definition: arena.h:390
static arena & allocate_arena(market &, unsigned num_slots, unsigned num_reserved_slots)
Allocate an instance of arena.
Definition: arena.cpp:285
static const unsigned ref_worker
Definition: arena.h:328
void process(generic_scheduler &)
Registers the worker with the arena and enters TBB scheduler dispatch loop.
Definition: arena.cpp:146
static bool is_set(generic_scheduler *s)
Used to check validity of the local scheduler TLS contents.
Definition: governor.cpp:120
static generic_scheduler * local_scheduler_if_initialized()
Definition: governor.h:139
static bool does_client_join_workers(const tbb::internal::rml::tbb_client &client)
Definition: market.cpp:304
static unsigned default_num_threads()
Definition: governor.h:84
static void assume_scheduler(generic_scheduler *s)
Temporarily set TLS slot to the given scheduler.
Definition: governor.cpp:116
static bool UsePrivateRML
Definition: governor.h:64
static rml::tbb_server * create_rml_server(rml::tbb_client &)
Definition: governor.cpp:92
uintptr_t my_arenas_aba_epoch
ABA prevention marker to assign to newly created arenas.
Definition: market.h:143
static market & global_market(bool is_public, unsigned max_num_workers=0, size_t stack_size=0)
Factory method creating new market object.
Definition: market.cpp:96
unsigned my_num_workers_hard_limit
Maximal number of workers allowed for use by the underlying resource manager.
Definition: market.h:74
static market * theMarket
Currently active global market.
Definition: market.h:58
void adjust_demand(arena &, int delta)
Request that arena's need in workers should be adjusted.
Definition: market.cpp:557
atomic< unsigned > my_first_unused_worker_idx
First unused index of worker.
Definition: market.h:86
bool my_join_workers
Shutdown mode.
Definition: market.h:155
void cleanup(job &j) __TBB_override
Definition: market.cpp:681
unsigned my_public_ref_count
Count of master threads attached.
Definition: market.h:149
static const unsigned skip_soft_limit_warning
The value indicating that the soft limit warning is unnecessary.
Definition: market.h:158
static void set_active_num_workers(unsigned w)
Set number of active workers.
Definition: market.cpp:235
void process(job &j) __TBB_override
Definition: market.cpp:659
void detach_arena(arena &)
Removes the arena from the market's list.
Definition: market.cpp:322
void destroy()
Destroys and deallocates market object created by market::create()
Definition: market.cpp:165
int update_workers_request()
Recalculates the number of workers requested from RML and updates the allotment.
Definition: market.cpp:217
bool release(bool is_public, bool blocking_terminate)
Decrements market's refcount and destroys it in the end.
Definition: market.cpp:175
void try_destroy_arena(arena *, uintptr_t aba_epoch)
Removes the arena from the market's list.
Definition: market.cpp:333
void insert_arena_into_list(arena &a)
Definition: market.cpp:29
static unsigned app_parallelism_limit()
Reports active parallelism level according to user's settings.
Definition: tbb_main.cpp:512
int my_num_workers_requested
Number of workers currently requested from RML.
Definition: market.h:81
arenas_list_mutex_type my_arenas_list_mutex
Definition: market.h:67
void remove_arena_from_list(arena &a)
Definition: market.cpp:42
static arena * create_arena(int num_slots, int num_reserved_slots, size_t stack_size)
Creates an arena object.
Definition: market.cpp:308
arena_list_type my_arenas
List of registered arenas.
Definition: market.h:135
bool is_arena_in_list(arena_list_type &arenas, arena *a)
Definition: market.cpp:424
void assert_market_valid() const
Definition: market.h:241
void acknowledge_close_connection() __TBB_override
Definition: market.cpp:695
unsigned my_workers_soft_limit_to_report
Either workers soft limit to be reported via runtime_warning() or skip_soft_limit_warning.
Definition: market.h:161
int my_total_demand
Number of workers that were requested by all arenas.
Definition: market.h:89
unsigned my_ref_count
Reference count controlling market object lifetime.
Definition: market.h:146
market(unsigned workers_soft_limit, unsigned workers_hard_limit, size_t stack_size)
Constructor.
Definition: market.cpp:64
size_t my_stack_size
Stack size of worker threads.
Definition: market.h:152
arena * arena_in_need(arena *prev_arena)
Returns next arena that needs more workers, or NULL.
Definition: market.h:221
friend class arena
Definition: market.h:47
void update_allotment(unsigned effective_soft_limit)
Recalculates the number of workers assigned to each arena in the list.
Definition: market.h:214
static global_market_mutex_type theMarketMutex
Mutex guarding creation/destruction of theMarket, insertions/deletions in my_arenas,...
Definition: market.h:63
job * create_one_job() __TBB_override
Definition: market.cpp:699
unsigned my_num_workers_soft_limit
Current application-imposed limit on the number of workers (see set_active_num_workers())
Definition: market.h:78
rml::tbb_server * my_server
Pointer to the RML server object that services this TBB instance.
Definition: market.h:70
arena * my_next_arena
The first arena to be checked when idle worker seeks for an arena to enter.
Definition: market.h:139
Work stealing task scheduler.
Definition: scheduler.h:140
bool is_worker() const
True if running on a worker thread, false otherwise.
Definition: scheduler.h:673
static generic_scheduler * create_worker(market &m, size_t index, bool genuine)
Initialize a scheduler for a worker thread.
Definition: scheduler.cpp:1273
static void cleanup_worker(void *arg, bool worker)
Perform necessary cleanup when a worker thread finishes.
Definition: scheduler.cpp:1331
generic_scheduler * my_scheduler
Scheduler of the thread attached to the slot.
static void remove_ref()
Remove reference to resources. If last reference removed, release the resources.
Definition: tbb_main.cpp:122
static void add_ref()
Add reference to resources. If first reference added, acquire the resources.
Definition: tbb_main.cpp:117
