Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
custom_scheduler.h
1/*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef _TBB_custom_scheduler_H
18#define _TBB_custom_scheduler_H
19
20#include "scheduler.h"
21#include "observer_proxy.h"
22#include "itt_notify.h"
23
24namespace tbb {
25namespace internal {
26
27//------------------------------------------------------------------------
28// Traits classes for scheduler
29//------------------------------------------------------------------------
30
31struct DefaultSchedulerTraits {
32 static const bool itt_possible = true;
33 static const bool has_slow_atomic = false;
34};
35
36struct IntelSchedulerTraits {
37 static const bool itt_possible = false;
38#if __TBB_x86_32||__TBB_x86_64
39 static const bool has_slow_atomic = true;
40#else
41 static const bool has_slow_atomic = false;
42#endif /* __TBB_x86_32||__TBB_x86_64 */
43};
44
45//------------------------------------------------------------------------
46// custom_scheduler
47//------------------------------------------------------------------------
48
49//! A scheduler with a customized evaluation loop.
50/** The customization can use SchedulerTraits to make decisions without needing a run-time check. */
51template<typename SchedulerTraits>
52class custom_scheduler: private generic_scheduler {
53 typedef custom_scheduler<SchedulerTraits> scheduler_type;
54
55 custom_scheduler( market& m, bool genuine ) : generic_scheduler(m, genuine) {}
56
57 //! Scheduler loop that dispatches tasks.
58 /** If child is non-NULL, it is dispatched first.
59 Then, until "parent" has a reference count of 1, other tasks are dispatched or stolen. */
60 void local_wait_for_all( task& parent, task* child ) __TBB_override;
61
62 //! Entry point from client code to the scheduler loop that dispatches tasks.
63 /** The correct *this object for the dispatch is not necessarily this one;
64 it is looked up in TLS. */
65 void wait_for_all( task& parent, task* child ) __TBB_override {
66 static_cast<custom_scheduler*>(governor::local_scheduler())->scheduler_type::local_wait_for_all( parent, child );
67 }
68
69 //! Decrements ref_count of a predecessor.
70 /** If it achieves 0, the predecessor is scheduled for execution.
71 When changing, remember that this is a hot path function. */
72 void tally_completion_of_predecessor( task& s, __TBB_ISOLATION_ARG( task*& bypass_slot, isolation_tag isolation ) ) {
73 task_prefix& p = s.prefix();
74 __TBB_ASSERT(p.ref_count > 0, NULL);
75 if( SchedulerTraits::itt_possible )
76 ITT_NOTIFY(sync_releasing, &p.ref_count);
77 if( SchedulerTraits::has_slow_atomic && p.ref_count==1 )
78 p.ref_count=0;
79 else {
80 reference_count old_ref_count = __TBB_FetchAndDecrementWrelease(&p.ref_count);
81#if __TBB_PREVIEW_RESUMABLE_TASKS
82 if (old_ref_count == internal::abandon_flag + 2) {
83 // Remove the abandon flag.
84 p.ref_count = 1;
85 // The wait has been completed. Spawn a resume task.
86 tbb::task::resume(p.abandoned_scheduler);
87 return;
88 }
89#endif
90 if (old_ref_count > 1) {
91 // more references exist
92 // '__TBB_cl_evict(&p)' degraded performance of parallel_preorder example
93 return;
94 }
95 }
96
97 // Ordering on p.ref_count (superfluous if SchedulerTraits::has_slow_atomic)
98 __TBB_control_consistency_helper();
99 __TBB_ASSERT(p.ref_count==0, "completion of task caused predecessor's reference count to underflow");
100 if( SchedulerTraits::itt_possible )
101 ITT_NOTIFY(sync_acquired, &p.ref_count);
102#if TBB_USE_ASSERT
103 p.extra_state &= ~es_ref_count_active;
104#endif /* TBB_USE_ASSERT */
105#if __TBB_TASK_ISOLATION
106 if ( isolation != no_isolation ) {
107 // The parent is allowed not to have isolation (even if a child has isolation) because it has never spawned.
108 __TBB_ASSERT(p.isolation == no_isolation || p.isolation == isolation, NULL);
109 p.isolation = isolation;
110 }
111#endif /* __TBB_TASK_ISOLATION */
112
113#if __TBB_RECYCLE_TO_ENQUEUE
114 if (p.state==task::to_enqueue) {
115 // related to __TBB_TASK_ARENA TODO: try keep priority of the task
116 // e.g. rework task_prefix to remember priority of received task and use here
117 my_arena->enqueue_task(s, 0, my_random );
118 } else
119#endif /*__TBB_RECYCLE_TO_ENQUEUE*/
120 if( bypass_slot==NULL )
121 bypass_slot = &s;
122#if __TBB_PREVIEW_CRITICAL_TASKS
123 else if( internal::is_critical( s ) ) {
124 local_spawn( bypass_slot, bypass_slot->prefix().next );
125 bypass_slot = &s;
126 }
127#endif /* __TBB_PREVIEW_CRITICAL_TASKS */
128 else
129 local_spawn( &s, s.prefix().next );
130 }
131
132 //! Implements the bypass loop of the dispatch loop (local_wait_for_all).
133 bool process_bypass_loop( context_guard_helper</*report_tasks=*/SchedulerTraits::itt_possible>& context_guard,
134 __TBB_ISOLATION_ARG(task* t, isolation_tag isolation) );
135
136public:
137 static generic_scheduler* allocate_scheduler( market& m, bool genuine ) {
138 void* p = NFS_Allocate(1, sizeof(scheduler_type), NULL);
139 std::memset(p, 0, sizeof(scheduler_type));
140 scheduler_type* s = new( p ) scheduler_type( m, genuine );
141 s->assert_task_pool_valid();
142 ITT_SYNC_CREATE(s, SyncType_Scheduler, SyncObj_TaskPoolSpinning);
143 return s;
144 }
145
146 //! Try getting a task from the mailbox or stealing from another scheduler.
147 /** Returns the stolen task or NULL if all attempts fail. */
148 task* receive_or_steal_task( __TBB_ISOLATION_ARG( __TBB_atomic reference_count& completion_ref_count, isolation_tag isolation ) ) __TBB_override;
149
150}; // class custom_scheduler<>
151
152//------------------------------------------------------------------------
153// custom_scheduler methods
154//------------------------------------------------------------------------
155template<typename SchedulerTraits>
156task* custom_scheduler<SchedulerTraits>::receive_or_steal_task( __TBB_ISOLATION_ARG( __TBB_atomic reference_count& completion_ref_count, isolation_tag isolation ) ) {
157 task* t = NULL;
158 bool outermost_worker_level = worker_outermost_level();
159 bool outermost_dispatch_level = outermost_worker_level || master_outermost_level();
160 bool can_steal_here = can_steal();
161 bool outermost_current_worker_level = outermost_worker_level;
162#if __TBB_PREVIEW_RESUMABLE_TASKS
163 outermost_current_worker_level &= my_properties.genuine;
164#endif
165 my_inbox.set_is_idle( true );
166#if __TBB_HOARD_NONLOCAL_TASKS
167 __TBB_ASSERT(!my_nonlocal_free_list, NULL);
168#endif
169#if __TBB_TASK_PRIORITY
170 if ( outermost_dispatch_level ) {
171 if ( intptr_t skipped_priority = my_arena->my_skipped_fifo_priority ) {
172 // This thread can dequeue FIFO tasks, and some priority levels of
173 // FIFO tasks have been bypassed (to prevent deadlock caused by
174 // dynamic priority changes in nested task group hierarchy).
175 if ( my_arena->my_skipped_fifo_priority.compare_and_swap(0, skipped_priority) == skipped_priority
176 && skipped_priority > my_arena->my_top_priority )
177 {
178 my_market->update_arena_priority( *my_arena, skipped_priority );
179 }
180 }
181 }
182#endif /* !__TBB_TASK_PRIORITY */
183 // TODO: Try to find a place to reset my_limit (under market's lock)
184 // The number of slots potentially used in the arena. Updated once in a while, as my_limit changes rarely.
185 size_t n = my_arena->my_limit-1;
186 int yield_count = 0;
187 // The state "failure_count==-1" is used only when itt_possible is true,
188 // and denotes that a sync_prepare has not yet been issued.
189 for( int failure_count = -static_cast<int>(SchedulerTraits::itt_possible);; ++failure_count) {
190 __TBB_ASSERT( my_arena->my_limit > 0, NULL );
191 __TBB_ASSERT( my_arena_index <= n, NULL );
192 if( completion_ref_count == 1 ) {
193 if( SchedulerTraits::itt_possible ) {
194 if( failure_count!=-1 ) {
195 ITT_NOTIFY(sync_prepare, &completion_ref_count);
196 // Notify Intel(R) Thread Profiler that thread has stopped spinning.
197 ITT_NOTIFY(sync_acquired, this);
198 }
199 ITT_NOTIFY(sync_acquired, &completion_ref_count);
200 }
201 __TBB_ASSERT( !t, NULL );
202 // A worker thread in its outermost dispatch loop (i.e. its execution stack is empty) should
203 // exit it either when there is no more work in the current arena, or when revoked by the market.
204 __TBB_ASSERT( !outermost_worker_level, NULL );
205 __TBB_control_consistency_helper(); // on ref_count
206 break; // exit stealing loop and return;
207 }
208 // Check if the resource manager requires our arena to relinquish some threads
209 if ( outermost_current_worker_level ) {
210 if ( ( my_arena->my_num_workers_allotted < my_arena->num_workers_active() ) ) {
211 if ( SchedulerTraits::itt_possible && failure_count != -1 )
212 ITT_NOTIFY(sync_cancel, this);
213 return NULL;
214 }
215 }
216#if __TBB_PREVIEW_RESUMABLE_TASKS
217 else if ( *my_arena_slot->my_scheduler_is_recalled ) {
218 // Original scheduler was requested, return from stealing loop and recall.
219 if ( my_inbox.is_idle_state(true) )
220 my_inbox.set_is_idle(false);
221 return NULL;
222 }
223#endif
224#if __TBB_TASK_PRIORITY
225 const int p = int(my_arena->my_top_priority);
226#else /* !__TBB_TASK_PRIORITY */
227 static const int p = 0;
228#endif
229 // Check if there are tasks mailed to this thread via task-to-thread affinity mechanism.
230 __TBB_ASSERT(my_affinity_id, NULL);
231 if ( n && !my_inbox.empty() ) {
232 t = get_mailbox_task( __TBB_ISOLATION_EXPR( isolation ) );
233#if __TBB_TASK_ISOLATION
234 // There is a race with a thread adding a new task (possibly with suitable isolation)
235 // to our mailbox, so the below conditions might result in a false positive.
236 // Then set_is_idle(false) allows that task to be stolen; it's OK.
237 if ( isolation != no_isolation && !t && !my_inbox.empty()
238 && my_inbox.is_idle_state( true ) ) {
239 // We have proxy tasks in our mailbox but the isolation blocks their execution.
240 // So publish the proxy tasks in mailbox to be available for stealing from owner's task pool.
241 my_inbox.set_is_idle( false );
242 }
243#endif /* __TBB_TASK_ISOLATION */
244 }
245 if ( t ) {
246 GATHER_STATISTIC( ++my_counters.mails_received );
247 }
248 // Check if there are tasks in starvation-resistant stream.
249 // Only allowed at the outermost dispatch level without isolation.
250 else if (__TBB_ISOLATION_EXPR(isolation == no_isolation &&) outermost_dispatch_level &&
251 !my_arena->my_task_stream.empty(p) && (
252#if __TBB_PREVIEW_CRITICAL_TASKS && __TBB_CPF_BUILD
253 t = my_arena->my_task_stream.pop( p, subsequent_lane_selector(my_arena_slot->hint_for_pop) )
254#else
255 t = my_arena->my_task_stream.pop( p, my_arena_slot->hint_for_pop )
256#endif
257 ) ) {
258 ITT_NOTIFY(sync_acquired, &my_arena->my_task_stream);
259 // just proceed with the obtained task
260 }
261#if __TBB_TASK_PRIORITY
262 // Check if any earlier offloaded non-top priority tasks become returned to the top level
263 else if ( my_offloaded_tasks && (t = reload_tasks( __TBB_ISOLATION_EXPR( isolation ) )) ) {
264 __TBB_ASSERT( !is_proxy(*t), "The proxy task cannot be offloaded" );
265 // just proceed with the obtained task
266 }
267#endif /* __TBB_TASK_PRIORITY */
268 else if ( can_steal_here && n && (t = steal_task( __TBB_ISOLATION_EXPR(isolation) )) ) {
269 // just proceed with the obtained task
270 }
271#if __TBB_PREVIEW_CRITICAL_TASKS
272 else if( (t = get_critical_task( __TBB_ISOLATION_EXPR(isolation) )) ) {
273 __TBB_ASSERT( internal::is_critical(*t), "Received task must be critical one" );
274 ITT_NOTIFY(sync_acquired, &my_arena->my_critical_task_stream);
275 // just proceed with the obtained task
276 }
277#endif // __TBB_PREVIEW_CRITICAL_TASKS
278 else
279 goto fail;
280 // A task was successfully obtained somewhere
281 __TBB_ASSERT(t,NULL);
282#if __TBB_ARENA_OBSERVER
283 my_arena->my_observers.notify_entry_observers( my_last_local_observer, is_worker() );
284#endif
285#if __TBB_SCHEDULER_OBSERVER
286 the_global_observer_list.notify_entry_observers( my_last_global_observer, is_worker() );
287#endif /* __TBB_SCHEDULER_OBSERVER */
288 if ( SchedulerTraits::itt_possible && failure_count != -1 ) {
289 // FIXME - might be victim, or might be selected from a mailbox
290 // Notify Intel(R) Thread Profiler that thread has stopped spinning.
291 ITT_NOTIFY(sync_acquired, this);
292 }
293 break; // exit stealing loop and return
294fail:
295 GATHER_STATISTIC( ++my_counters.steals_failed );
296 if( SchedulerTraits::itt_possible && failure_count==-1 ) {
297 // The first attempt to steal work failed, so notify Intel(R) Thread Profiler that
298 // the thread has started spinning. Ideally, we would do this notification
299 // *before* the first failed attempt to steal, but at that point we do not
300 // know that the steal will fail.
301 ITT_NOTIFY(sync_prepare, this);
302 failure_count = 0;
303 }
304 // Pause, even if we are going to yield, because the yield might return immediately.
305 prolonged_pause();
306 const int failure_threshold = 2*int(n+1);
307 if( failure_count>=failure_threshold ) {
308#if __TBB_YIELD2P
309 failure_count = 0;
310#else
311 failure_count = failure_threshold;
312#endif
313 __TBB_Yield();
314#if __TBB_TASK_PRIORITY
315 // Check if there are tasks abandoned by other workers
316 if ( my_arena->my_orphaned_tasks ) {
317 // Epoch must be advanced before seizing the list pointer
318 ++my_arena->my_abandonment_epoch;
319 task* orphans = (task*)__TBB_FetchAndStoreW( &my_arena->my_orphaned_tasks, 0 );
320 if ( orphans ) {
321 task** link = NULL;
322 // Get local counter out of the way (we've just brought in external tasks)
323 my_local_reload_epoch--;
324 t = reload_tasks( orphans, link, __TBB_ISOLATION_ARG( effective_reference_priority(), isolation ) );
325 if ( orphans ) {
326 *link = my_offloaded_tasks;
327 if ( !my_offloaded_tasks )
328 my_offloaded_task_list_tail_link = link;
329 my_offloaded_tasks = orphans;
330 }
331 __TBB_ASSERT( !my_offloaded_tasks == !my_offloaded_task_list_tail_link, NULL );
332 if ( t ) {
333 if( SchedulerTraits::itt_possible )
334 ITT_NOTIFY(sync_cancel, this);
335 __TBB_ASSERT( !is_proxy(*t), "The proxy task cannot be offloaded" );
336 break; // exit stealing loop and return
337 }
338 }
339 }
340#endif /* __TBB_TASK_PRIORITY */
341#if __APPLE__
342 // threshold value tuned separately for macOS due to high cost of sched_yield there
343 const int yield_threshold = 10;
344#else
345 const int yield_threshold = 100;
346#endif
347 if( yield_count++ >= yield_threshold ) {
348 // When a worker thread has nothing to do, return it to RML.
349 // For purposes of affinity support, the thread is considered idle while in RML.
350#if __TBB_TASK_PRIORITY
351 if( outermost_current_worker_level || my_arena->my_top_priority > my_arena->my_bottom_priority ) {
352 if ( my_arena->is_out_of_work() && outermost_current_worker_level ) {
353#else /* !__TBB_TASK_PRIORITY */
354 if ( outermost_current_worker_level && my_arena->is_out_of_work() ) {
355#endif /* !__TBB_TASK_PRIORITY */
356 if( SchedulerTraits::itt_possible )
357 ITT_NOTIFY(sync_cancel, this);
358 return NULL;
359 }
360#if __TBB_TASK_PRIORITY
361 }
362 if ( my_offloaded_tasks ) {
363 // Safeguard against any sloppiness in managing reload epoch
364 // counter (e.g. on the hot path because of performance reasons).
365 my_local_reload_epoch--;
366 // Break the deadlock caused by a higher priority dispatch loop
367 // stealing and offloading a lower priority task. Priority check
368 // at the stealing moment cannot completely preclude such cases
369 // because priorities can change dynamically.
370 if ( !outermost_worker_level && *my_ref_top_priority > my_arena->my_top_priority ) {
371 GATHER_STATISTIC( ++my_counters.prio_ref_fixups );
372 my_ref_top_priority = &my_arena->my_top_priority;
373 // it's expected that only outermost workers can use global reload epoch
374 __TBB_ASSERT(my_ref_reload_epoch == &my_arena->my_reload_epoch, NULL);
375 }
376 }
377#endif /* __TBB_TASK_PRIORITY */
378 } // end of arena snapshot branch
379 // If several attempts did not find work, re-read the arena limit.
380 n = my_arena->my_limit-1;
381 } // end of yielding branch
382 } // end of nonlocal task retrieval loop
383 if ( my_inbox.is_idle_state( true ) )
384 my_inbox.set_is_idle( false );
385 return t;
386}
387
388template<typename SchedulerTraits>
389bool custom_scheduler<SchedulerTraits>::process_bypass_loop(
390 context_guard_helper</*report_tasks=*/SchedulerTraits::itt_possible>& context_guard,
391 __TBB_ISOLATION_ARG(task* t, isolation_tag isolation) )
392{
393 while ( t ) {
394 __TBB_ASSERT( my_inbox.is_idle_state(false), NULL );
395 __TBB_ASSERT(!is_proxy(*t),"unexpected proxy");
396 __TBB_ASSERT( t->prefix().owner, NULL );
397#if __TBB_TASK_ISOLATION
398 __TBB_ASSERT_EX( isolation == no_isolation || isolation == t->prefix().isolation,
399 "A task from another isolated region is going to be executed" );
400#endif /* __TBB_TASK_ISOLATION */
402#if __TBB_TASK_GROUP_CONTEXT && TBB_USE_ASSERT
403 assert_context_valid(t->prefix().context);
404 if ( !t->prefix().context->my_cancellation_requested )
405#endif
406 // TODO: make the assert stronger by prohibiting allocated state.
407 __TBB_ASSERT( 1L<<t->state() & (1L<<task::allocated|1L<<task::ready|1L<<task::reexecute), NULL );
408 assert_task_pool_valid();
409#if __TBB_PREVIEW_CRITICAL_TASKS
410 // TODO: check performance and optimize if needed for added conditions on the
411 // hot-path.
412 if( !internal::is_critical(*t) && !t->is_enqueued_task() ) {
413 if( task* critical_task = get_critical_task( __TBB_ISOLATION_EXPR(isolation) ) ) {
414 __TBB_ASSERT( internal::is_critical(*critical_task),
415 "Received task must be critical one" );
416 ITT_NOTIFY(sync_acquired, &my_arena->my_critical_task_stream);
417 t->prefix().state = task::allocated;
418 my_innermost_running_task = t; // required during spawn to propagate isolation
419 local_spawn(t, t->prefix().next);
420 t = critical_task;
421 } else {
422#endif /* __TBB_PREVIEW_CRITICAL_TASKS */
423#if __TBB_TASK_PRIORITY
424 intptr_t p = priority(*t);
425 if ( p != *my_ref_top_priority
426 && !t->is_enqueued_task() ) {
427 assert_priority_valid(p);
428 if ( p != my_arena->my_top_priority ) {
429 my_market->update_arena_priority( *my_arena, p );
430 }
431 if ( p < effective_reference_priority() ) {
432 if ( !my_offloaded_tasks ) {
433 my_offloaded_task_list_tail_link = &t->prefix().next_offloaded;
434 // Erase possible reference to the owner scheduler
435 // (next_offloaded is a union member)
436 *my_offloaded_task_list_tail_link = NULL;
437 }
438 offload_task( *t, p );
439 t = NULL;
440 if ( is_task_pool_published() ) {
441 t = winnow_task_pool( __TBB_ISOLATION_EXPR( isolation ) );
442 if ( t )
443 continue;
444 } else {
445 // Mark arena as full to unlock arena priority level adjustment
446 // by arena::is_out_of_work(), and ensure worker's presence.
447 my_arena->advertise_new_work<arena::wakeup>();
448 }
449 break; /* exit bypass loop */
450 }
451 }
452#endif /* __TBB_TASK_PRIORITY */
453#if __TBB_PREVIEW_CRITICAL_TASKS
454 }
455 } // if is not critical
456#endif
457 task* t_next = NULL;
458 my_innermost_running_task = t;
459 t->prefix().owner = this;
460 t->prefix().state = task::executing;
461#if __TBB_TASK_GROUP_CONTEXT
462 context_guard.set_ctx( t->prefix().context );
464#endif
465 {
466 GATHER_STATISTIC( ++my_counters.tasks_executed );
467 GATHER_STATISTIC( my_counters.avg_arena_concurrency += my_arena->num_workers_active() );
468 GATHER_STATISTIC( my_counters.avg_assigned_workers += my_arena->my_num_workers_allotted );
469#if __TBB_TASK_PRIORITY
470 GATHER_STATISTIC( my_counters.avg_arena_prio += p );
471 GATHER_STATISTIC( my_counters.avg_market_prio += my_market->my_global_top_priority );
472#endif /* __TBB_TASK_PRIORITY */
473 ITT_STACK(SchedulerTraits::itt_possible, callee_enter, t->prefix().context->itt_caller);
474 t_next = t->execute();
475 ITT_STACK(SchedulerTraits::itt_possible, callee_leave, t->prefix().context->itt_caller);
476 if (t_next) {
477 assert_task_valid(t_next);
478 __TBB_ASSERT( t_next->state()==task::allocated,
479 "if task::execute() returns task, it must be marked as allocated" );
480 reset_extra_state(t_next);
481 __TBB_ISOLATION_EXPR( t_next->prefix().isolation = t->prefix().isolation );
482#if TBB_USE_ASSERT
483 affinity_id next_affinity=t_next->prefix().affinity;
484 if (next_affinity != 0 && next_affinity != my_affinity_id)
485 GATHER_STATISTIC( ++my_counters.affinity_ignored );
486#endif
487 } // if there is bypassed task
488 }
489 assert_task_pool_valid();
490 switch( t->state() ) {
491 case task::executing: {
492 task* s = t->parent();
493 __TBB_ASSERT( my_innermost_running_task==t, NULL );
494 __TBB_ASSERT( t->prefix().ref_count==0, "Task still has children after it has been executed" );
495 t->~task();
496 if( s )
497 tally_completion_of_predecessor( *s, __TBB_ISOLATION_ARG( t_next, t->prefix().isolation ) );
498 free_task<no_hint>( *t );
499 poison_pointer( my_innermost_running_task );
500 assert_task_pool_valid();
501 break;
502 }
503
504 case task::recycle: // set by recycle_as_safe_continuation()
505 t->prefix().state = task::allocated;
506#if __TBB_RECYCLE_TO_ENQUEUE
507 __TBB_fallthrough;
508 case task::to_enqueue: // set by recycle_to_enqueue()
509#endif
510 __TBB_ASSERT( t_next != t, "a task returned from method execute() can not be recycled in another way" );
511 reset_extra_state(t);
512 // for safe continuation, need atomically decrement ref_count;
513 tally_completion_of_predecessor(*t, __TBB_ISOLATION_ARG( t_next, t->prefix().isolation ) );
514 assert_task_pool_valid();
515 break;
516
517 case task::reexecute: // set by recycle_to_reexecute()
518 __TBB_ASSERT( t_next, "reexecution requires that method execute() return another task" );
519 __TBB_ASSERT( t_next != t, "a task returned from method execute() can not be recycled in another way" );
520 t->prefix().state = task::allocated;
521 reset_extra_state(t);
522 local_spawn( t, t->prefix().next );
523 assert_task_pool_valid();
524 break;
525 case task::allocated:
526 reset_extra_state(t);
527 break;
528#if __TBB_PREVIEW_RESUMABLE_TASKS
529 case task::to_resume:
530 __TBB_ASSERT(my_innermost_running_task == t, NULL);
531 __TBB_ASSERT(t->prefix().ref_count == 0, "Task still has children after it has been executed");
532 t->~task();
533 free_task<no_hint>(*t);
534 __TBB_ASSERT(!my_properties.genuine && my_properties.outermost,
535 "Only a coroutine on outermost level can be left.");
536 // Leave the outermost coroutine
537 return false;
538#endif
539#if TBB_USE_ASSERT
540 case task::ready:
541 __TBB_ASSERT( false, "task is in READY state upon return from method execute()" );
542 break;
543#endif
544 default:
545 __TBB_ASSERT( false, "illegal state" );
546 break;
547 }
548 GATHER_STATISTIC( t_next ? ++my_counters.spawns_bypassed : 0 );
549 t = t_next;
550 } // end of scheduler bypass loop
551 return true;
552}
553
554// TODO: Rename args 'parent' into 'controlling_task' and 'child' into 't' or consider introducing
555// a wait object (a la task_handle) to replace the 'parent' logic.
556template<typename SchedulerTraits>
557void custom_scheduler<SchedulerTraits>::local_wait_for_all( task& parent, task* child ) {
558 __TBB_ASSERT( governor::is_set(this), NULL );
559 __TBB_ASSERT( parent.ref_count() >= (child && child->parent() == &parent ? 2 : 1), "ref_count is too small" );
560 __TBB_ASSERT( my_innermost_running_task, NULL );
561#if __TBB_TASK_GROUP_CONTEXT
562 __TBB_ASSERT( parent.prefix().context, "parent task does not have context" );
563#endif /* __TBB_TASK_GROUP_CONTEXT */
564 assert_task_pool_valid();
565 // Using parent's refcount in sync_prepare (in the stealing loop below) is
566 // a workaround for TP. We need to name it here to display correctly in Ampl.
567 if( SchedulerTraits::itt_possible )
568 ITT_SYNC_CREATE(&parent.prefix().ref_count, SyncType_Scheduler, SyncObj_TaskStealingLoop);
569
570 // TODO: consider extending the "context" guard to a "dispatch loop" guard to additionally
571 // guard old_innermost_running_task and old_properties states.
572 context_guard_helper</*report_tasks=*/SchedulerTraits::itt_possible> context_guard;
573 task* old_innermost_running_task = my_innermost_running_task;
574 scheduler_properties old_properties = my_properties;
575
576 task* t = child;
577 bool cleanup = !is_worker() && &parent==my_dummy_task;
578 // Remove outermost property to indicate nested level.
579 __TBB_ASSERT(my_properties.outermost || my_innermost_running_task!=my_dummy_task, "The outermost property should be set out of a dispatch loop");
580 my_properties.outermost &= my_innermost_running_task==my_dummy_task;
581#if __TBB_PREVIEW_CRITICAL_TASKS
582 my_properties.has_taken_critical_task |= is_critical(*my_innermost_running_task);
583#endif
584#if __TBB_TASK_PRIORITY
585 __TBB_ASSERT( (uintptr_t)*my_ref_top_priority < (uintptr_t)num_priority_levels, NULL );
586 volatile intptr_t *old_ref_top_priority = my_ref_top_priority;
587 // When entering nested parallelism level market level counter
588 // must be replaced with the one local to this arena.
589 volatile uintptr_t *old_ref_reload_epoch = my_ref_reload_epoch;
590 if ( !outermost_level() ) {
591 // We are in a nested dispatch loop.
592 // Market or arena priority must not prevent child tasks from being
593 // executed, so that dynamic priority changes do not cause deadlock.
594 my_ref_top_priority = &parent.prefix().context->my_priority;
595 my_ref_reload_epoch = &my_arena->my_reload_epoch;
596 if (my_ref_reload_epoch != old_ref_reload_epoch)
597 my_local_reload_epoch = *my_ref_reload_epoch - 1;
598 }
599#endif /* __TBB_TASK_PRIORITY */
600#if __TBB_TASK_ISOLATION
601 isolation_tag isolation = my_innermost_running_task->prefix().isolation;
602 if (t && isolation != no_isolation) {
603 __TBB_ASSERT( t->prefix().isolation == no_isolation, NULL );
604 // Propagate the isolation to the task executed without spawn.
605 t->prefix().isolation = isolation;
606 }
607#endif /* __TBB_TASK_ISOLATION */
608#if __TBB_PREVIEW_RESUMABLE_TASKS
609 // The recall flag for the original owner of this scheduler.
610 // It is used only on outermost level of currently attached arena slot.
611 tbb::atomic<bool> recall_flag;
612 recall_flag = false;
613 if (outermost_level() && my_wait_task == NULL && my_properties.genuine) {
614 __TBB_ASSERT(my_arena_slot->my_scheduler == this, NULL);
615 __TBB_ASSERT(my_arena_slot->my_scheduler_is_recalled == NULL, NULL);
616 my_arena_slot->my_scheduler_is_recalled = &recall_flag;
617 my_current_is_recalled = &recall_flag;
618 }
619 __TBB_ASSERT(my_arena_slot->my_scheduler_is_recalled != NULL, NULL);
620 task* old_wait_task = my_wait_task;
621 my_wait_task = &parent;
622#endif
623#if TBB_USE_EXCEPTIONS
624 // Infinite safeguard EH loop
625 for (;;) {
626 try {
627#endif /* TBB_USE_EXCEPTIONS */
628 // Outer loop receives tasks from global environment (via mailbox, FIFO queue(s),
629 // and by stealing from other threads' task pools).
630 // All exit points from the dispatch loop are located in its immediate scope.
631 for(;;) {
632 // Middle loop retrieves tasks from the local task pool.
633 for(;;) {
634 // Inner loop evaluates tasks coming from nesting loops and those returned
635 // by just executed tasks (bypassing spawn or enqueue calls).
636 if ( !process_bypass_loop( context_guard, __TBB_ISOLATION_ARG(t, isolation) ) ) {
637#if __TBB_PREVIEW_RESUMABLE_TASKS
638 // Restore the old properties for the coroutine reusage (leave in a valid state)
639 my_innermost_running_task = old_innermost_running_task;
640 my_properties = old_properties;
641 my_wait_task = old_wait_task;
642#endif
643 return;
644 }
645
646 // Check "normal" exit condition when parent's work is done.
647 if ( parent.prefix().ref_count == 1 ) {
648 __TBB_ASSERT( !cleanup, NULL );
649 __TBB_control_consistency_helper(); // on ref_count
650 ITT_NOTIFY( sync_acquired, &parent.prefix().ref_count );
651 goto done;
652 }
653#if __TBB_PREVIEW_RESUMABLE_TASKS
654 // The thread may be outside of its original scheduler. Check the recall request.
655 if ( &recall_flag != my_arena_slot->my_scheduler_is_recalled ) {
656 __TBB_ASSERT( my_arena_slot->my_scheduler_is_recalled != NULL, "A broken recall flag" );
657 if ( *my_arena_slot->my_scheduler_is_recalled ) {
658 if ( !resume_original_scheduler() ) {
659 // We are requested to finish the current coroutine before the resume.
660 __TBB_ASSERT( !my_properties.genuine && my_properties.outermost,
661 "Only a coroutine on outermost level can be left." );
662 // Restore the old properties for the coroutine reusage (leave in a valid state)
663 my_innermost_running_task = old_innermost_running_task;
664 my_properties = old_properties;
665 my_wait_task = old_wait_task;
666 return;
667 }
668 }
669 }
670#endif
671 // Retrieve the task from local task pool.
672 __TBB_ASSERT( is_task_pool_published() || is_quiescent_local_task_pool_reset(), NULL );
673 t = is_task_pool_published() ? get_task( __TBB_ISOLATION_EXPR( isolation ) ) : NULL;
674 assert_task_pool_valid();
675
676 if ( !t ) // No tasks in the local task pool. Go to stealing loop.
677 break;
678 }; // end of local task pool retrieval loop
679
680#if __TBB_HOARD_NONLOCAL_TASKS
681 // before stealing, previously stolen task objects are returned
682 for (; my_nonlocal_free_list; my_nonlocal_free_list = t ) {
683 t = my_nonlocal_free_list->prefix().next;
684 free_nonlocal_small_task( *my_nonlocal_free_list );
685 }
686#endif
687 if ( cleanup ) {
688 __TBB_ASSERT( !is_task_pool_published() && is_quiescent_local_task_pool_reset(), NULL );
689 __TBB_ASSERT( !worker_outermost_level(), NULL );
690 my_innermost_running_task = old_innermost_running_task;
691 my_properties = old_properties;
692#if __TBB_TASK_PRIORITY
693 my_ref_top_priority = old_ref_top_priority;
694 if(my_ref_reload_epoch != old_ref_reload_epoch)
695 my_local_reload_epoch = *old_ref_reload_epoch-1;
696 my_ref_reload_epoch = old_ref_reload_epoch;
697#endif /* __TBB_TASK_PRIORITY */
698#if __TBB_PREVIEW_RESUMABLE_TASKS
699 if (&recall_flag != my_arena_slot->my_scheduler_is_recalled) {
700 // The recall point
701 __TBB_ASSERT(!recall_flag, NULL);
702 tbb::task::suspend(recall_functor(&recall_flag));
703 if (my_inbox.is_idle_state(true))
704 my_inbox.set_is_idle(false);
705 continue;
706 }
707 __TBB_ASSERT(&recall_flag == my_arena_slot->my_scheduler_is_recalled, NULL);
708 __TBB_ASSERT(!(my_wait_task->prefix().ref_count & internal::abandon_flag), NULL);
709 my_wait_task = old_wait_task;
710#endif
711 return;
712 }
713 t = receive_or_steal_task( __TBB_ISOLATION_ARG( parent.prefix().ref_count, isolation ) );
714 if ( !t ) {
715#if __TBB_PREVIEW_RESUMABLE_TASKS
716 if ( *my_arena_slot->my_scheduler_is_recalled )
717 continue;
718 // Done if either original thread enters or we are on the nested level or attached the same arena
719 if ( &recall_flag == my_arena_slot->my_scheduler_is_recalled || old_wait_task != NULL )
720 goto done;
721 // The recall point. Continue the dispatch loop because the recalled thread may have tasks in its task pool.
722 __TBB_ASSERT(!recall_flag, NULL);
723 tbb::task::suspend( recall_functor(&recall_flag) );
724 if ( my_inbox.is_idle_state(true) )
725 my_inbox.set_is_idle(false);
726#else
727 // Just exit dispatch loop
728 goto done;
729#endif
730 }
731 } // end of infinite stealing loop
732#if TBB_USE_EXCEPTIONS
733 __TBB_ASSERT( false, "Must never get here" );
734 } // end of try-block
735 TbbCatchAll( my_innermost_running_task->prefix().context );
736 t = my_innermost_running_task;
737 // Complete post-processing ...
738 if( t->state() == task::recycle
739#if __TBB_RECYCLE_TO_ENQUEUE
740 // TODO: the enqueue semantics gets lost below, consider reimplementing
741 || t->state() == task::to_enqueue
742#endif
743 ) {
744 // ... for recycled tasks to atomically decrement ref_count
745 t->prefix().state = task::allocated;
746 if( SchedulerTraits::itt_possible )
747 ITT_NOTIFY(sync_releasing, &t->prefix().ref_count);
748 if( __TBB_FetchAndDecrementWrelease(&t->prefix().ref_count)==1 ) {
749 if( SchedulerTraits::itt_possible )
750 ITT_NOTIFY(sync_acquired, &t->prefix().ref_count);
751 }else{
752 t = NULL;
753 }
754 }
755 } // end of infinite EH loop
756 __TBB_ASSERT( false, "Must never get here too" );
757#endif /* TBB_USE_EXCEPTIONS */
758done:
759#if __TBB_PREVIEW_RESUMABLE_TASKS
760 __TBB_ASSERT(!(parent.prefix().ref_count & internal::abandon_flag), NULL);
761 my_wait_task = old_wait_task;
762 if (my_wait_task == NULL) {
763 __TBB_ASSERT(outermost_level(), "my_wait_task could be NULL only on outermost level");
764 if (&recall_flag != my_arena_slot->my_scheduler_is_recalled) {
765 // The recall point.
766 __TBB_ASSERT(my_properties.genuine, NULL);
767 __TBB_ASSERT(!recall_flag, NULL);
768 tbb::task::suspend(recall_functor(&recall_flag));
769 if (my_inbox.is_idle_state(true))
770 my_inbox.set_is_idle(false);
771 }
772 __TBB_ASSERT(my_arena_slot->my_scheduler == this, NULL);
773 my_arena_slot->my_scheduler_is_recalled = NULL;
774 my_current_is_recalled = NULL;
775 }
776
777#endif /* __TBB_PREVIEW_RESUMABLE_TASKS */
778 my_innermost_running_task = old_innermost_running_task;
779 my_properties = old_properties;
780#if __TBB_TASK_PRIORITY
781 my_ref_top_priority = old_ref_top_priority;
782 if(my_ref_reload_epoch != old_ref_reload_epoch)
783 my_local_reload_epoch = *old_ref_reload_epoch-1;
784 my_ref_reload_epoch = old_ref_reload_epoch;
785#endif /* __TBB_TASK_PRIORITY */
786 if ( !ConcurrentWaitsEnabled(parent) ) {
787 if ( parent.prefix().ref_count != 1) {
788 // This is a worker that was revoked by the market.
789 __TBB_ASSERT( worker_outermost_level(),
790 "Worker thread exits nested dispatch loop prematurely" );
791 return;
792 }
793 parent.prefix().ref_count = 0;
794 }
795#if TBB_USE_ASSERT
796 parent.prefix().extra_state &= ~es_ref_count_active;
797#endif /* TBB_USE_ASSERT */
798#if __TBB_TASK_GROUP_CONTEXT
799 __TBB_ASSERT(parent.prefix().context && default_context(), NULL);
800 task_group_context* parent_ctx = parent.prefix().context;
801 if ( parent_ctx->my_cancellation_requested ) {
802 task_group_context::exception_container_type* pe = parent_ctx->my_exception;
803 if ( master_outermost_level() && parent_ctx == default_context() ) {
804 // We are in the outermost task dispatch loop of a master thread, and
805 // the whole task tree has been collapsed. So we may clear cancellation data.
806 parent_ctx->my_cancellation_requested = 0;
807 // TODO: Add assertion that master's dummy task context does not have children
808 parent_ctx->my_state &= ~(uintptr_t)task_group_context::may_have_children;
809 }
810 if ( pe ) {
811 // On Windows, FPU control settings changed in the helper destructor are not visible
812 // outside a catch block. So restore the default settings manually before rethrowing
813 // the exception.
814 context_guard.restore_default();
815 TbbRethrowException( pe );
816 }
817 }
818 __TBB_ASSERT(!is_worker() || !CancellationInfoPresent(*my_dummy_task),
819 "Worker's dummy task context modified");
820 __TBB_ASSERT(!master_outermost_level() || !CancellationInfoPresent(*my_dummy_task),
821 "Unexpected exception or cancellation data in the master's dummy task");
822#endif /* __TBB_TASK_GROUP_CONTEXT */
823 assert_task_pool_valid();
824}
825
826} // namespace internal
827} // namespace tbb
828
829#endif /* _TBB_custom_scheduler_H */
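
The listing above is the complete header. As a reading aid, here is a standalone sketch of the SchedulerTraits pattern it relies on; this is illustrative code only, not part of TBB, and the names DefaultTraits, NoToolsTraits and release_reference are invented. Because the traits members are compile-time constants, every test such as if( SchedulerTraits::itt_possible ) inside custom_scheduler is resolved during instantiation, so the DefaultSchedulerTraits and IntelSchedulerTraits specializations carry no run-time cost for the branch they do not use.

    // Standalone illustration of compile-time traits dispatch (not TBB code).
    #include <cstdio>

    struct DefaultTraits {                        // invented name
        static const bool itt_possible = true;    // tool notifications compiled in
        static const bool has_slow_atomic = false;
    };

    struct NoToolsTraits {                        // invented name
        static const bool itt_possible = false;   // tool notifications compiled out
        static const bool has_slow_atomic = true; // prefer the non-atomic fast path
    };

    template<typename Traits>
    void release_reference( int& ref_count ) {
        if( Traits::itt_possible )
            std::printf("notify tools: releasing\n"); // stands in for ITT_NOTIFY
        if( Traits::has_slow_atomic && ref_count == 1 )
            ref_count = 0;    // cheap path: no other thread can observe the count
        else
            --ref_count;      // the real scheduler uses an atomic fetch-and-decrement
    }

    int main() {
        int rc = 1;
        release_reference<DefaultTraits>(rc);   // instantiation keeps the tool branch
        rc = 1;
        release_reference<NoToolsTraits>(rc);   // instantiation folds the tool branch away
        return 0;
    }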
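A second standalone sketch, again with invented names and no TBB types, models the reference-count protocol that tally_completion_of_predecessor implements: every completing child decrements the predecessor's ref_count with release semantics, and only the child that brings the count to zero hands the predecessor back to the dispatch loop through the bypass slot instead of spawning it.

    // Standalone illustration of the "tally completion of predecessor" idea (not TBB code).
    #include <atomic>
    #include <cstdio>

    struct continuation {
        std::atomic<int> ref_count;            // number of outstanding children
        explicit continuation( int children ) : ref_count(children) {}
        void execute() { std::printf("continuation runs after the last child\n"); }
    };

    // Called by a child when it finishes; returns the continuation if this child
    // completed it (the caller runs it next, mimicking the bypass slot), else NULL.
    continuation* tally_completion( continuation& c ) {
        // acq_rel stands in for __TBB_FetchAndDecrementWrelease plus the
        // __TBB_control_consistency_helper() ordering used in the header above.
        if( c.ref_count.fetch_sub(1, std::memory_order_acq_rel) == 1 )
            return &c;
        return NULL;
    }

    int main() {
        continuation c(3);
        continuation* bypass = NULL;
        for( int child = 0; child < 3; ++child )
            if( continuation* ready = tally_completion(c) )
                bypass = ready;                // only the last child receives it
        if( bypass )
            bypass->execute();                 // the dispatch loop would run this next
        return 0;
    }

Handing the readied predecessor straight back through the bypass slot keeps it on the current thread instead of taking a round trip through the task pool, which is why the header flags tally_completion_of_predecessor as a hot-path function.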
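Finally, a small model of the back-off policy in receive_or_steal_task; the thresholds mirror the listing (failure_threshold = 2*(n+1), roughly 100 yields, 10 on macOS), while the helper name keep_spinning and the loop around it are invented for illustration.

    // Standalone illustration of the stealing-loop back-off policy (not TBB code).
    #include <cstdio>

    // Returns true while the worker should keep trying to steal: it pauses on each
    // failed attempt, starts yielding once failures reach 2*(n+1), and stops after
    // the yield threshold (the real code also requires an outermost worker whose
    // arena is out of work before it returns NULL and leaves).
    bool keep_spinning( int failed_attempts, int& yields, int arena_slots ) {
        const int failure_threshold = 2 * (arena_slots + 1);
        if( failed_attempts < failure_threshold )
            return true;                       // pause and retry
        const int yield_threshold = 100;       // 10 on macOS in the real code
        if( ++yields < yield_threshold )
            return true;                       // yield the CPU and retry
        return false;                          // out of work long enough: stop
    }

    int main() {
        int yields = 0;
        int attempts = 0;
        while( keep_spinning(attempts, yields, /*arena_slots=*/4) )
            ++attempts;
        std::printf("gave up after %d failed attempts and %d yields\n", attempts, yields);
        return 0;
    }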
