Continuing from where we left off last time, this post explains how parent-child relationships are handled inside the task scheduler, and how streaming tasks can be split automatically by the scheduler.
If you’re new to this series, make sure to read the other articles first:
- Part 1: Explains the basics behind a task scheduler, work-stealing, load-balancing, etc., and explains a very simple scheduler implementation.
- Part 2: Discusses Molecule’s task model in detail.
Today, we will have a look at how parent-child relationships are implemented in Molecule’s task scheduler. Among the implementation’s requirements were the following:
- A task should be able to have N child tasks.
- Adding relationships should not make the code more complicated.
- Waiting for a task to be completed must properly synchronize across its children as well, i.e. a task is only considered completed once all its children and the task itself have finished executing.
Keeping track of relationships
A naive implementation would simply add a list of child tasks to a Task (e.g. stored in a std::vector<>), and check for completion whenever scheduler::Wait() is called on a task. Such an implementation should immediately set off your alarm bells:
- By adding a std::vector<> we lose the ability to relocate/memcpy a Task.
- Dynamic memory allocations. We care about memory!
Instead of adding an explicit list of all its children to a Task, we turn the problem on its head and store a parent in each Task instead. Furthermore, as long as we don’t need to identify each and every child, we can simply store the number of unfinished children in our Task.
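To make the relocation argument concrete, here is a small, self-contained C++17 sketch. NaiveTask, InvertedTask and Relocate() are hypothetical stand-ins, not Molecule's actual types: a task that owns a std::vector<> of children is not trivially copyable, whereas a task that only stores its parent's offset and an open-task counter is plain data and can be relocated with memcpy.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <type_traits>
#include <vector>

// Hypothetical "naive" task: owns a heap-allocated list of its children.
struct NaiveTask
{
    std::vector<NaiveTask*> children;
};

// Hypothetical "inverted" task: only knows its parent (as an offset into a
// task pool) and how many tasks are still open.
struct InvertedTask
{
    uint32_t parentOffset;
    int32_t openTasks;
};

// std::vector has non-trivial copy/destruction, so memcpy'ing a NaiveTask is undefined behaviour.
static_assert(!std::is_trivially_copyable<NaiveTask>::value, "NaiveTask must not be memcpy'd");

// InvertedTask is plain data and may be relocated byte-wise.
static_assert(std::is_trivially_copyable<InvertedTask>::value, "InvertedTask can be memcpy'd");

// Relocates a task by copying its bytes, which is only legal for trivially copyable types.
inline InvertedTask Relocate(const InvertedTask& source)
{
    InvertedTask destination;
    std::memcpy(&destination, &source, sizeof(InvertedTask));
    return destination;
}
```

No allocation happens anywhere in InvertedTask, which also takes care of the second bullet point above.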
Basic idea
The basic idea behind implementing parent-child relationships is now simply the following:
- Each Task stores the number of still open tasks – a task with N child tasks has N+1 open tasks (one for itself, N for the children). The N+1 bit is important!
- Each task that has finished executing tells its parent (recursively) that it has finished.
- A task can only be executed if the number of open tasks equals 1. This ensures that parents/children are automatically synchronized because no parent task is ever being worked on unless all of its children have finished already.
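The three rules above can be modelled in a few lines. The following is a minimal, single-threaded C++ sketch with hypothetical names (Node, AddChild, CanExecute, Finish). It uses std::atomic for the counter, but it is not Molecule's implementation and leaves out the queue entirely. Note that a parent is only notified once a node's own count has reached zero; that is what keeps a grandparent's count correct.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical model of a task: only the counter and the parent link.
struct Node
{
    std::atomic<int32_t> openTasks{1};  // 1 for the node itself
    Node* parent = nullptr;
};

// Registers child under parent: the parent gains one open task.
inline void AddChild(Node& parent, Node& child)
{
    child.parent = &parent;
    parent.openTasks.fetch_add(1);
}

// A node may only run once all of its children have finished.
inline bool CanExecute(const Node& node)
{
    return node.openTasks.load() == 1;
}

// Called after a node has run, or when one of its children has finished completely.
inline void Finish(Node& node)
{
    // fetch_sub returns the previous value, so "previous - 1" is the new count
    const int32_t remaining = node.openTasks.fetch_sub(1) - 1;
    if (remaining == 0 && node.parent != nullptr)
    {
        // this node and all its children are done: tell the parent
        Finish(*node.parent);
    }
}
```

With a grandparent GP, a parent P and three children, P starts at 4 and GP at 2; only after all three children and then P have finished does GP become runnable.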
While this sounds simple in theory, there are still some things we have to watch out for:
- As discussed, when pulling a task from the queue we must wait until all its children have finished executing. But we need to ensure that the thread that pulled the task from the queue does some work in the meantime, otherwise we either block the CPU (by busy looping), or never work on the task because it’s no longer contained in the queue.
- When waiting until a task has finished, we again should be doing work in the meantime.
Implementation
With the above in mind, we first add the needed members to our Task structure:
struct Task
{
    char unusedFreelistAlias[sizeof(void*)];
    AtomicInt m_openTasks;
    int32_t m_generation;
    TaskOffset m_parent;
    Kernel m_kernel;
    TaskData m_taskData;
};
As can be seen, m_openTasks is implemented as an atomic counter because we need to ensure that incrementing/decrementing the number of still open tasks is done in a thread-safe manner across multiple cores/threads. The implementation is quite simple, and will be discussed at the end of this post.
Separating all the different parts of the scheduler’s logic into small functions leads to surprisingly simple worker threads:
Thread::ReturnValue Work(const core::Thread& thread)
{
    while (thread.ShouldRun())
    {
        Task* task = WaitUntilTaskIsAvailable(thread);
        WorkOnTask(task);
    }

    return 0;
}
WaitUntilTaskIsAvailable() literally does what it says on the tin – it waits until a task has become available in the queue by means of a condition variable, as discussed in part 1. Because the function only returns if there’s something in the queue, we don’t need to check the returned task for being valid.
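A minimal sketch of such a queue, assuming a mutex-protected std::deque and std::condition_variable in place of Molecule's own primitives. TaskQueue and its member functions are hypothetical, and tasks are plain ints here. The blocking WaitUntilTaskIsAvailable() sleeps on the condition variable, while the non-blocking TryGetTask() is the kind of call a HelpWithWork() function needs.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>

// Hypothetical stand-in for the scheduler's queue; tasks are represented by ints.
class TaskQueue
{
public:
    void Push(int task)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_tasks.push_back(task);
        }
        // wake one sleeping worker, if any
        m_available.notify_one();
    }

    // Blocks until a task is available or Stop() was called (then returns nullopt).
    std::optional<int> WaitUntilTaskIsAvailable()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        // wait() releases the mutex while sleeping and re-acquires it on wake-up;
        // the predicate protects against spurious wake-ups
        m_available.wait(lock, [this] { return !m_tasks.empty() || m_stopped; });
        return PopLocked();
    }

    // Non-blocking variant: returns nullopt immediately if the queue is empty.
    std::optional<int> TryGetTask()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return PopLocked();
    }

    // Wakes all sleeping workers so that they can leave their loop.
    void Stop()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stopped = true;
        }
        m_available.notify_all();
    }

private:
    // Must be called with m_mutex held.
    std::optional<int> PopLocked()
    {
        if (m_tasks.empty())
            return std::nullopt;
        const int task = m_tasks.front();
        m_tasks.pop_front();
        return task;
    }

    std::mutex m_mutex;
    std::condition_variable m_available;
    std::deque<int> m_tasks;
    bool m_stopped = false;
};
```

In this sketch, Stop() plays the role of thread.ShouldRun() returning false.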
WorkOnTask() is simple as well:
void WorkOnTask(Task* task)
{
    while (!CanExecuteTask(task))
    {
        // the task cannot be executed at this time, work on another item
        HelpWithWork();
    }

    // execute the kernel and finish the task
    (task->m_kernel)(task->m_taskData);
    FinishTask(task);
}
As discussed above, a task can only be executed if m_openTasks == 1 (remember that a task having N children will always have N+1 open tasks). Because children notify their parent of completion, this automatically makes sure that a parent is only executed if all its children have finished (we will get to the implementation of FinishTask() in a minute).
Note that rather than calling WaitUntilTaskIsAvailable() whenever a task cannot be executed immediately, we call HelpWithWork(). This is crucial: whereas the former waits on the condition variable until a task is added to the queue, the latter simply checks whether there's work available right now, and yields CPU resources if not. If we were to wait on the condition variable instead, a worker thread could pull a task from the queue and then wait until any other task becomes available. If nobody adds another task, that thread would block indefinitely, and the task it pulled would never be executed.
After executing the task, the implementation calls FinishTask():
void FinishTask(Task* task)
{
    // IMPORTANT: store the result of the decrement in a local variable and use that for comparing against zero.
    // Reading task->m_openTasks again would be a data race, because other child tasks could change it in the meantime.
    // (This assumes that the decrement returns the value produced by the atomic operation itself.)
    const int32_t openTasks = (--task->m_openTasks);
    if (openTasks == 0)
    {
        // this task and all its children have finished completely
        if (task->m_parent != Task::NO_PARENT)
        {
            // tell our parent that we're finished
            Task* parent = GetTask(task->m_parent);
            FinishTask(parent);
        }

        // remove the task
        ReturnTask(task);
    }
}
This decrements the number of open tasks with a single atomic operation. Once m_openTasks hits zero, the task and all its children have finished completely, so we inform the parent (if any) about the task's completion, and return the task to the allocator (the problem of how to make sure that we don't access “old” Task members was discussed in part 2). Note that the parent must only be notified when the count reaches zero: notifying it on every decrement would also decrement the grandparent once for every grandchild, eventually driving its counter below zero.
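The warning about comparing a local copy against zero boils down to this: the value that is compared has to be the one produced by the atomic decrement itself. The following is a minimal C++ sketch using std::atomic<int32_t>::fetch_sub (which returns the previous value) in place of Molecule's AtomicInt; CountZeroObservations() is a hypothetical test helper. Several threads finish the children of one parent concurrently, and exactly one of them observes the counter reaching zero. If each thread instead decremented and then re-read the counter in a separate step, several threads could see zero and return the same task twice.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>
#include <vector>

// Hypothetical stress test: threadCount threads each finish perThread children of one
// parent whose counter starts at threadCount * perThread. Returns how many decrements
// observed the counter reaching zero.
inline int CountZeroObservations(int threadCount, int perThread)
{
    std::atomic<int32_t> openTasks{threadCount * perThread};
    std::atomic<int> zeroSeen{0};

    std::vector<std::thread> threads;
    for (int t = 0; t < threadCount; ++t)
    {
        threads.emplace_back([&] {
            for (int i = 0; i < perThread; ++i)
            {
                // compare the value produced by *this* decrement, never a fresh read
                const int32_t remaining = openTasks.fetch_sub(1) - 1;
                if (remaining == 0)
                    zeroSeen.fetch_add(1);
            }
        });
    }

    for (std::thread& thread : threads)
        thread.join();

    return zeroSeen.load();
}
```

Because each fetch_sub is a single read-modify-write, the values handed out to all threads are distinct, so only one of them can be zero.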
Like stated above, we should be doing work while waiting on a task to become completed:
void Wait(const TaskId& taskId)
{
    // wait until the task and all its children have completed
    while (!IsTaskFinished(taskId))
    {
        // help with working while the task isn't finished
        HelpWithWork();
    }
}
IsTaskFinished() was introduced in the last part of the series, so the only thing missing is HelpWithWork() which was briefly discussed above:
void HelpWithWork(void)
{
    Task* task = GetAvailableTask();
    if (task)
    {
        WorkOnTask(task);
    }
    else
    {
        // no item available now, yield CPU resources
        core::Thread::Yield();
    }
}
Regarding the scheduler implementation, that’s about it. It’s still simple and readable, with only a few dozen lines of actual source code as can be seen above. Parent-child relationships are handled automatically using the atomic m_openTasks counter, and synchronization is simple as well.
Automatically splitting streaming tasks
With the above in place, we can now turn our attention to automatically splitting streaming tasks into several subtasks without having to worry about correct synchronization, because it's done implicitly (as long as we set up our task data structures correctly).
Splitting is done based on simple heuristics, e.g. the scheduler knows about L1 and L2 cache sizes, and splits tasks accordingly. (In this case, heuristics is just a fancy term for choosing how many tasks we need to create based on e.g. L1 cache size – it’s not an exact science, and can be configured by the user).
Furthermore, splitting is done in linear fashion while we're still dealing with a non-load-balanced scheduler – a divide-and-conquer approach would be better suited when using work-stealing, and we'll handle that as well.
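Purely as an illustration of such a heuristic, here is a hypothetical DetermineNumberOfTasks() in C++. It takes strides and a cache budget instead of the streams used by the real function, and the budget (e.g. an assumed 32 KiB L1 data cache) is a parameter, so it could be configured by the user. It sizes each subtask so that its input and output data fit into the budget. Molecule's actual heuristics may look quite different.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical heuristic: choose the number of subtasks so that each subtask's
// input + output data roughly fits into cacheBudgetBytes.
inline uint32_t DetermineNumberOfTasks(uint32_t elementCount, uint32_t inputStride,
                                       uint32_t outputStride, uint32_t cacheBudgetBytes)
{
    if (elementCount == 0)
        return 1;

    const uint64_t bytesPerElement = std::max<uint64_t>(1, uint64_t(inputStride) + outputStride);

    // how many elements fit into the budget (at least one per task)
    const uint64_t elementsPerTask = std::max<uint64_t>(1, cacheBudgetBytes / bytesPerElement);

    // round up so that every element is covered, but never create more tasks than elements
    const uint64_t taskCount = (elementCount + elementsPerTask - 1) / elementsPerTask;
    return static_cast<uint32_t>(std::min<uint64_t>(taskCount, elementCount));
}
```

For example, 10000 elements with 48-byte input and output strides and a 32 KiB budget give 341 elements per task, i.e. 30 subtasks.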
For now, consider this simple linear splitting:
TaskId AddStreamingTask(const KernelData& kernelData, const InputStream& is0, const OutputStream& os0, uint32_t elementCount, Kernel kernel)
{
    const int N = DetermineNumberOfTasks(is0, os0, elementCount);

    // add a root task used for synchronization
    Task* root = ObtainTask();
    root->m_kernel = &EmptyKernel;
    root->m_parent = Task::NO_PARENT;
    root->m_openTasks = N + 1;

    // build the root's ID before queueing it: once all children and the root itself have finished,
    // the root is returned to the allocator, and its members must not be read anymore
    const TaskOffset rootOffset = GetTaskOffset(root);
    const TaskId rootId(rootOffset, root->m_generation);
    QueueTask(root);

    {
        // split the task into several subtasks, according to the size of the input/output streams
        const uint32_t perElementCount = elementCount / N;
        for (int i=0; i<N; ++i)
        {
            Task* task = ObtainTask();
            task->m_kernel = kernel;
            task->m_taskData.m_kernelData = kernelData.m_data;
            task->m_taskData.m_streamingData.m_elementCount = perElementCount;
            task->m_taskData.m_streamingData.m_inputStreams[0] = static_cast<char*>(is0.m_data) + i*is0.m_elementStride*perElementCount;
            task->m_taskData.m_streamingData.m_outputStreams[0] = static_cast<char*>(os0.m_data) + i*os0.m_elementStride*perElementCount;
            task->m_openTasks = 1;
            task->m_parent = rootOffset;
            QueueTask(task);
        }

        // queue leftover tasks here... (any such task must already be accounted for in root->m_openTasks before the root is queued)
    }

    return rootId;
}
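One way to avoid a separate leftover task is to spread the remainder over the N subtasks, so that N is known up front and a root count of N + 1 stays correct. The following C++ sketch shows that alternative; Chunk and MakeChunks() are hypothetical helpers, not part of Molecule.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical description of one subtask's range of elements.
struct Chunk
{
    uint32_t begin;  // index of the first element in this chunk
    uint32_t count;  // number of elements in this chunk
};

// Splits elementCount elements into exactly taskCount chunks; the first
// (elementCount % taskCount) chunks receive one extra element each.
inline std::vector<Chunk> MakeChunks(uint32_t elementCount, uint32_t taskCount)
{
    std::vector<Chunk> chunks;
    if (taskCount == 0)
        return chunks;

    const uint32_t base = elementCount / taskCount;
    const uint32_t remainder = elementCount % taskCount;

    uint32_t begin = 0;
    for (uint32_t i = 0; i < taskCount; ++i)
    {
        const uint32_t count = base + (i < remainder ? 1u : 0u);
        chunks.push_back(Chunk{begin, count});
        begin += count;
    }
    return chunks;
}
```

A subtask's input pointer would then be computed from chunks[i].begin * is0.m_elementStride instead of i*is0.m_elementStride*perElementCount.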
Note that we keep adding tasks to the system while some worker threads are probably racing away and want to start working on both the parent and child tasks. Again, this is implicitly handled without us having to worry about race conditions. On other architectures like the PS3, we could start DMA’ing the respective stream data to the SPUs, and start working on tasks while the PPU queues more tasks to the scheduler.
And that’s about it, quite simple really.
Performance
In order to do some performance comparisons, I’ve taken an easily parallelizable task as an example: character skinning. The performance figures on my i7 CPU with 4 cores and 8 logical threads (2 logical threads per core) are the following (averaged over a series of runs):
Serial: 683.98 ms
Parallel: 177.20 ms
That’s a 3.86x speed-up, which is not bad considering we haven’t even touched load-balancing yet. Other tasks have shown speed-ups of almost 5x – in some cases the logical threads help, in other cases they don’t. It all depends on the memory workload and the ALU usage of each task.
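For reference, the speed-up S and the resulting parallel efficiency E relative to the 4 physical cores work out as follows:

$$
S = \frac{T_{\text{serial}}}{T_{\text{parallel}}} = \frac{683.98\ \text{ms}}{177.20\ \text{ms}} \approx 3.86, \qquad E = \frac{S}{4} \approx 0.96
$$

Measured against all 8 logical threads instead, the efficiency would be about 0.48.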
Atomic variables
Finishing today’s post, I briefly want to discuss Molecule’s implementation of atomic variables because they have been talked about a lot in this series already.
In Molecule, an AtomicInt is nothing more than a simple class offering operators, as well as a conversion/cast operator so it can be more easily used as if it was a regular integer. The interface is the following:
ME_ALIGNED_SYMBOL(class AtomicInt, 4)
{
public:
    inline explicit AtomicInt(int32_t value);

    inline AtomicInt& operator+=(int32_t value);
    inline AtomicInt& operator++(void);
    inline AtomicInt& operator--(void);
    inline AtomicInt& operator=(int32_t value);

    inline operator int32_t() const;

private:
    volatile int32_t m_value;
};
Because atomic operations can only be carried out correctly for 32-bit aligned memory addresses, the class is declared to be aligned to 4 bytes (using __declspec(align(4)) on MSVC).
Internally, AtomicInt uses the (platform-dependent) atomic functions implemented elsewhere:
namespace atomic
{
    /// Atomically increments the integer at the memory location being pointed to
    inline __declspec(naked) void __fastcall Increment(volatile int32_t*)
    {
        __asm
        {
            lock inc dword ptr [ecx]
            ret
        }
    }

    /// Atomically decrements the integer at the memory location being pointed to
    inline __declspec(naked) void __fastcall Decrement(volatile int32_t*)
    {
        __asm
        {
            lock dec dword ptr [ecx]
            ret
        }
    }

    /// Atomically adds a value to the integer at the memory location being pointed to
    inline __declspec(naked) void __fastcall Add(volatile int32_t*, int32_t)
    {
        __asm
        {
            lock add dword ptr [ecx], edx
            ret
        }
    }
}
There are two things to note here:
- __declspec(naked): The compiler does not generate a function prolog/epilogue for naked functions, which allows us to write the whole function body directly in inline assembly.
- __fastcall: The fastcall calling convention makes sure to pass the first two arguments to the function in registers (ecx and edx, respectively). This allows us to directly use them in the inline assembly.
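For comparison, a portable AtomicInt can be sketched on top of std::atomic (PortableAtomicInt is a hypothetical name, not part of Molecule). The main design difference is that ++, -- and += return the value produced by the atomic operation itself rather than a reference, which is exactly what code like FinishTask() needs to compare against zero without a second read. On x86, such value-returning operations are typically compiled to lock xadd, which leaves the previous value in a register; std::atomic<int32_t> also takes care of the required alignment.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical portable counterpart of AtomicInt, built on std::atomic.
class PortableAtomicInt
{
public:
    explicit PortableAtomicInt(int32_t value) : m_value(value) {}

    // Each operator returns the new value, computed from the result of one atomic read-modify-write.
    int32_t operator+=(int32_t value) { return m_value.fetch_add(value) + value; }
    int32_t operator++() { return m_value.fetch_add(1) + 1; }
    int32_t operator--() { return m_value.fetch_sub(1) - 1; }

    PortableAtomicInt& operator=(int32_t value)
    {
        m_value.store(value);
        return *this;
    }

    // Plain atomic load, e.g. for CanExecuteTask()-style checks.
    operator int32_t() const { return m_value.load(); }

private:
    std::atomic<int32_t> m_value;
};
```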
And that concludes today’s post. Next time, we will take a look at how CPU caches can easily kill your multicore performance if you’re not careful.
Pls be quick with the next post, I hate waiting. And these are great
Interesting articles, thx for writing. One thing I was wondering about: your system can handle the case where Task C can only be run once Tasks A & B are completed, so A & B have C as a parent. But what if you want it the other way round: Tasks A & B both depend upon Task C. I can’t make A & B both parents of C?
No, but A can be the parent of B, and B can be the parent of C (or vice versa). With enough tasks in the queue, this should not introduce any significant stalls into the system.
Having only one parent (or dependency) does not limit the generality of the system in any way – one can always add empty tasks in between just for the purpose of grouping tasks together.
First off – thank you for taking the time to make this blog. I was advised to check it out yesterday and it’s already helped me a lot!
Regarding the scheduler, how do you handle repeating tasks? Are they children of a task that acts as a task factory, recreating itself and its children when it runs? Or do you have some other method – a separate event scheduling system perhaps?
Thanks for the kind words, Mark!
What do you mean by repeating tasks? Tasks that are run each frame? Those are simply spawned each and every frame they need to run.
Can you give me an example please?
You’re welcome. Yes, I meant tasks that run each frame. So do you have a task that represents the end of a frame that other tasks descend from (e.g. update task, render task…)? If so, is it simply creating a new copy of itself and its descendants?
I simply have an empty task that has appropriate child tasks, something like the following:
core::scheduler::TaskId done = core::scheduler::AddEmptyTask();
core::scheduler::AddChild(done, update);
core::scheduler::AddChild(done, render);
core::scheduler::RunTask(done);
core::scheduler::Wait(done);
Both “update” and “render” are tasks which have children and/or dependencies. The above waits until all children of “done” are finished, working on other tasks in the meantime, which work on specific update-tasks and render-tasks, which in turn work on tasks, and so on.
Hello Stefan!
Thanks for your posts! They are really good and enlightening!
I’m trying to implement a task-based parallel system and I’m following your posts to do so. There’s one thing I can’t wrap my mind around, though. It’s the Thread::ReturnValue Work(const core::Thread& thread) function.
This function accepts a Thread as parameter, then checks whether it has to continue running and after that I can’t follow the code.
The function WaitUntilTaskIsAvailable accepts a Thread& as parameter too. It is supposed to make the Thread wait on a condition variable, but I don’t know how you can do that.
As far as I know, when you create a Thread, you give it a function pointer. This is the function it will execute, so I’d expect this Work function to be the Thread’s function, instead of expecting a Thread as parameter.
Then, the function calls WorkOnTask(task). I guess this is supposed to be executed by the Thread passed as parameter, isn’t it?
I hope you understand what I mean.
Thank you again for sharing the knowledge! I can’t wait to see more posts, specially about work stealing!
Hi Marc,
The instances of the Thread class that are being passed around are wrappers around OS-native threads. Such a Thread creates the OS-native resources once, and then you can run different functions in it by simply calling Start(functionPointer), where the given function pointer needs to follow the signature “Thread::ReturnValue (const Thread&)”. Of course you also have methods for stopping, joining, destroying, etc. threads.
So the Work() function you see in the code is run by all available worker threads. Each of them waits until a task becomes available, and then works on that task. The thread instances are passed around so that these functions can stop doing their work once the threads have been “closed” from the outside (this is what the while-loop does, it just checks thread.ShouldRun()).
What WaitUntilTaskIsAvailable() does: it goes to sleep until something becomes available in the queue. Whenever something becomes available, the corresponding condition variable is signaled. The code in WaitUntilTaskIsAvailable() uses the condition variable to atomically release a critical section, put the thread to sleep and wait on the condition variable. If the condition variable is signaled, the OS wakes *one* thread (one that has been waiting on the condition variable), and atomically acquires the critical section again.
On Windows, I use InitializeConditionVariable, SleepConditionVariableCS and WakeConditionVariable for this.
Thank you for the quick answer! Now that makes much more sense!
Hi Stefan,
There’s still one thing I can’t clearly see. When I create a thread (CreateThread on Windows or pthread_create on Linux) it expects a function with a void* parameter. That’s what the thread will be executing until it ends.
Maybe there are some functions to create thread resources on Windows & Linux that I don’t know, but if you are also using the same functions it would mean that the Thread needs a function with the aforementioned signature to start.
I guess this function could be an infinite loop, checking on an internal queue for functions to execute added by the Start(const Thread&) function. If not, I can’t see how you make a Thread execute arbitrary functions from outside the Thread itself (e.g. the Scheduler calling thread.Start(Work)).
I am just starting to work with threads now, so there is a lot of knowledge I’m missing.
Thank you!
It’s quite simple, actually.
In Molecule, Thread::Create() creates the OS-internal resources for running a thread. This is rather costly and is therefore separated from Thread::Start() in order to be able to re-use threads once they are created.
On Windows, Thread::Create() uses _beginthreadex() to create a thread in suspended state. The thread function that is used internally is just a static function inside the Thread class implementation. Whenever Thread::Start() is called, I just resume the previously created thread. The thread function itself is simple:
unsigned int __stdcall Thread::ThreadFunction(void* threadInstance)
{
    Thread* thread = static_cast<Thread*>(threadInstance);
    const ReturnValue result = thread->m_function(*thread);
    thread->m_state = State::HAS_FINISHED;
    _endthreadex(result);
    return result;
}
As you can see, it calls the function which was given by the user as an argument to Thread::Start() – this is done via “thread->m_function(*thread)”. The surrounding stuff only sets internal states (used for asserting correct behaviour), properly releasing thread resources, etc.
As of now, the user has to make sure that the function which is given to Thread::Start() runs indefinitely (or until the thread has stopped) if he wants that behaviour. I’ve also thought about adding a Thread::StartInfinite() function which automatically does this for you until somebody calls Thread::Stop(). This way, you could use Start() for one-shot functions, and StartInfinite() for e.g. worker threads like the ones used by the task scheduler.
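Outside of Windows, a similar “create once, start many times” behaviour can be sketched with the C++ standard library. The thread is created up front and parks on a condition variable (playing the role of CREATE_SUSPENDED), and Start() hands it a function to run. ReusableThread and its member functions are hypothetical, not Molecule's Thread class, and Start() assumes the previously started function has already returned.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical reusable thread: the OS thread is created once and parks until Start() is called.
class ReusableThread
{
public:
    ReusableThread() : m_thread([this] { ThreadFunction(); }) {}

    ~ReusableThread()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_exit = true;
        }
        m_wake.notify_all();
        m_thread.join();
    }

    // Hands a function to the parked thread and wakes it up.
    // Assumption: the previously started function has already finished.
    void Start(std::function<void()> function)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_function = std::move(function);
            m_running = true;
        }
        m_wake.notify_all();
    }

    // Blocks until the most recently started function has returned.
    void WaitUntilFinished()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_wake.wait(lock, [this] { return !m_running; });
    }

private:
    void ThreadFunction()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        for (;;)
        {
            // sleep until there is a function to run or we are asked to exit
            m_wake.wait(lock, [this] { return m_exit || static_cast<bool>(m_function); });
            if (m_function)
            {
                std::function<void()> function = std::move(m_function);
                m_function = nullptr;
                lock.unlock();
                function();  // run the user's function without holding the lock
                lock.lock();
                m_running = false;
                m_wake.notify_all();
            }
            else
            {
                return;  // m_exit was set and no work is pending
            }
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_wake;
    std::function<void()> m_function;
    bool m_running = false;
    bool m_exit = false;
    std::thread m_thread;  // declared last, so all other members are initialized before it starts
};
```

An infinite worker loop, as discussed above, would simply be a function passed to Start() that keeps running until some stop flag is set.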
Oh! Of course, using _beginthreadex with CREATE_SUSPENDED allows you to store a function and call it later.
Thank you very much, Stefan! These are the bits I was missing.
Hello Stefan,
I have been following your musings for some time and I wanted to say thank you; they are very insightful and inspiring.
I have a question regarding the FinishTask function above. How do you handle a Parent -> Middle Parent -> Multiple Children relationship? I have implemented your scheduler as stated thus far and filled in the gaps, upon executing the example in Part 4 of this incredible series, I have run into an issue:
When the child tasks hit the FinishTask function, they are recognizing the parent task and then running FinishTask again. Then that parent task which was passed to FinishTask is in turn recognizing that it has a parent and passing that next parent up again to FinishTask. By the end of the test run, the root task’s counter equals the negative of the number of child tasks that were created/spawned below it. So now I am waiting on the root task to be finished, which will never happen because its counter is negative instead of zero.
I was wondering how you handled this type of Parent -> MiddleParent -> Children and deeper relationships of the same nature?
Thanks Timothy!
It is basically handled automatically, as stated in the post:
- Each Task stores the number of still open tasks – a task with N child tasks has N+1 still open tasks (one for himself, N for the children). The N+1 bit is important!
- Each task done executing tells its parent (recursively) that it has finished.
- A task can only be executed if the number of open tasks equals 1. This ensures that parents/children are automatically synchronized because no parent task is ever being worked on unless all of its children have finished already.
If you implemented everything correctly (which admittedly is harder than it sounds), you should never get a negative amount of open tasks.
Are you correctly updating “parentTask->m_openTasks” when adding children to a task?
Hello Stefan,
Thank you for the fast reply.
I am wondering if my understanding of N is wrong. Is N the number of direct descendants of the parent task or, is N the number of all descendants ( children, grandchildren, great grandchildren, etc )?
N is the number of direct child tasks.
Let us assume the following scenario: we’ve got a task GP (grandparent), P (parent), and 3x C (child). P is a child of GP, the 3x C are children of P.
After the parent-child relationships have been set up, P has an openTask-count of 4 – one for himself, three for his children. GP has an openTask-count of 2, and each of the children has an openTask-count of 1. This means that in the beginning, only the child tasks are allowed to run. Once every one of them has finished, P::openTasks becomes 1, so P is now allowed to run. Once P has finished, GP::openTasks becomes 1, so finally GP can be run and the chain is completed.
That’s how it should be, but I know that it is hard to get a system free of data-races. There are many pitfalls you have to watch out for.
Hello Stefan,
Thank you for the further explanation of the parent/child relationship.
For some reason, my FinishTask function did not like the way it was handling the grandparent/parent/child relationship with the recursion. Whenever a child task was finished, it would decrement the parent task and then also decrement the grandparent task due to the recursion. I reorganized the FinishTask to look like the following.
void FinishTask(Task* task)
{
    if( 0 == --task->m_openTasks )
    {
        if( Task::NO_PARENT != task->m_parent )
        {
            // tell our parent that we're finished
            Task* parent = GetTask( task->m_parent );
            FinishTask( parent );
        }
        ReturnTask( task );
    }
}
With this my Scheduler is now working without any hiccups.
What are your thoughts on the reorganization?
Also, in my previous post when I made mention to the example in part 4, I had taken the code where the results are summed together and put that into a task as well…just experimenting with the scheduler. That’s how I ran into the issue of Grandparent, Parent, Children relationship.
Ah, I can see why your new code works, and the old probably didn’t. The FinishTask function posted on the blog unfortunately had a subtle data race which I introduced because I simplified the code too much for the blog. I’ve updated the code for FinishTask() accordingly, it now is race-free.
The old code had a data race between decrementing task->m_openTasks and comparing it with zero a few lines later. If any other thread executed a child task in the meantime, task->m_openTasks would also get decremented, leading to wrong comparison results and subsequently wrongly returned tasks, basically breaking the system.
Even though you don’t use a local variable for storing the result in your new implementation, it now works because you use the return value of the decrement operator directly for the comparison. As I said, lots of pitfalls to watch out for.
Say that A is the parent of B, which is in turn the parent of C. Is it possible that WorkOnTask is called with task B? Finding that B isn’t ready, it calls HelpWithWork. HelpWithWork pulls A from the queue and calls WorkOnTask with A. Again, finding that A isn’t ready, it calls HelpWithWork once more. This time HelpWithWork pulls C from the queue. The subsequent call to WorkOnTask is able to complete C. It then returns to the most recent WorkOnTask, which is handling A. A is not finished because B is not finished. B has already been pulled out of the queue, but can’t be worked on until the WorkOnTask handling A has returned.
Is this an unforeseen problem? Or have I missed something that would prevent this from happening?
No, you didn’t miss anything. Quite the opposite!
How such situations are handled depends a bit on how HelpWithWork() (and GetAvailableTask()) are implemented. GetAvailableTask() could be implemented so that it only returns tasks which can be run right now.
In my implementation, I actually have an additional queue of what I call open tasks. Whenever a task gets queued which cannot be run immediately (because it has dependencies or other child tasks), it gets added to this queue. Other tasks are run right away. Additionally, Work() is modified so that after a task has been worked upon, the thread tries to queue any of the open tasks, because they might now be able to run. Furthermore, while waiting on a task to be finished, the thread also tries to queue any of the open tasks.
This automatically handles the situation you described. There must always be a thread that pulls the last remaining open task from the queue, even if it’s the one and only task all others depend upon. As soon as that thread has finished working on that task, all the others will be able to run automatically.
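The open-task idea from this reply can be sketched as a single-threaded C++ model. SimTask and SimScheduler are hypothetical names, the model has no synchronization at all, and it only shows why parking not-yet-runnable tasks avoids the A/B/C situation from the question: no thread ever holds on to a task that cannot run.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <string>
#include <vector>

// Hypothetical task for the model: a name, the open-task counter and a parent link.
struct SimTask
{
    std::string name;
    int32_t openTasks = 1;  // 1 for the task itself, +1 per unfinished child
    SimTask* parent = nullptr;
};

class SimScheduler
{
public:
    void Queue(SimTask* task) { m_ready.push_back(task); }

    // Returns a task that can run right now, parking the ones that can't; nullptr if none.
    SimTask* GetAvailableTask()
    {
        while (!m_ready.empty())
        {
            SimTask* task = m_ready.front();
            m_ready.pop_front();
            if (task->openTasks == 1)
                return task;
            m_open.push_back(task);  // still has unfinished children: park it
        }
        return nullptr;
    }

    // Runs tasks until nothing is runnable anymore; returns the execution order.
    std::vector<std::string> RunAll()
    {
        std::vector<std::string> order;
        while (SimTask* task = GetAvailableTask())
        {
            order.push_back(task->name);
            Finish(task);

            // parked tasks might be runnable now: move them back into the ready queue
            for (SimTask* open : m_open)
                m_ready.push_back(open);
            m_open.clear();
        }
        return order;
    }

private:
    static void Finish(SimTask* task)
    {
        // notify the parent only once this task and all its children are done
        if (--task->openTasks == 0 && task->parent != nullptr)
            Finish(task->parent);
    }

    std::deque<SimTask*> m_ready;
    std::vector<SimTask*> m_open;
};
```

Queueing B, A and C in that order (A parent of B, B parent of C) parks B and A, runs C, and then runs B and A as they become ready.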