In this part of the series, we will discuss Molecule’s task model in detail, and have a look at the underlying C++ code and some subtleties we need to watch out for, as well as some unique optimization opportunities.
Before we start fleshing out details about tasks and the underlying task model, let us identify what common game engine tasks look like:
- Simple tasks: They need a bunch of non-mutable data, and simply carry out their task.
- Streaming tasks: They need non-mutable data, as well as input and output streams, having a 1:1 mapping.
- Reduce/Expand tasks: Again, they need non-mutable data, but the input/output ratio is not 1:1, but rather N:1 or 1:N. As an example, consider a task that calculates the average of e.g. every 32 elements, and outputs a single value for them – for every 32 input elements, there’s 1 output element.
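To make the N:1 case concrete, here is a minimal sketch of the averaging example. The function name AverageGroups and the use of std::vector are assumptions made for illustration only; a real kernel would work on raw input and output streams.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical N:1 reduce step: averages every `groupSize` input elements
// into a single output element. A trailing partial group is ignored to keep
// the sketch short.
std::vector<float> AverageGroups(const std::vector<float>& input, std::size_t groupSize)
{
    assert(groupSize > 0);

    const std::size_t groupCount = input.size() / groupSize;
    std::vector<float> output;
    output.reserve(groupCount);

    for (std::size_t g = 0; g < groupCount; ++g)
    {
        float sum = 0.0f;
        for (std::size_t i = 0; i < groupSize; ++i)
            sum += input[g * groupSize + i];

        // one output element per group of input elements
        output.push_back(sum / static_cast<float>(groupSize));
    }
    return output;
}
```

With a group size of 32, 64 input elements produce 2 output elements, which is exactly the ratio a scheduler would need to know about when splitting such a task.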
The task model
Therefore, in Molecule parlance, each Task consists of the following:
- A Kernel which acts on the data given.
- Non-mutable data, called KernelData.
- (Optional) streaming data, consisting of InputStreams and OutputStreams.
- (Optional) other data needed for e.g. reduce/expand tasks, or other, as yet unidentified, future tasks.
With the above in mind, one possible by-the-book implementation could look like the following:
// simple task, acts as a base class
class Task
{
public:
explicit Task(const KernelData& kernelData);
/// Executes the task, works on kernel data
virtual void Execute(void);
private:
KernelData m_kernelData;
};
// streaming task
class StreamingTask : public Task
{
public:
StreamingTask(const KernelData& kernelData, const InputStream& inputStream, const OutputStream& outputStream);
/// Executes the task, works on kernel and streaming data
virtual void Execute(void);
private:
KernelData m_kernelData;
InputStream m_input;
OutputStream m_output;
};
Each task can do whatever it wants by means of a virtual Execute method, and different tasks take different data. All is nice, except that it’s not:
- Sooner or later, tasks need to be allocated from somewhere. Having tasks of different sizes doesn’t play nice with keeping memory fragmentation down.
- The virtual Execute method doesn’t play nice with SPUs on the PS3.
We can do much better than that, and here’s the proposed implementation used in Molecule:
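// forward declaration, needed because Kernel refers to TaskData before its definition
struct TaskData;
typedef core::traits::Function<void (const TaskData&)>::Pointer Kernel;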
struct TaskData
{
void* m_kernelData;
union
{
// for streaming tasks
struct StreamingData
{
uint32_t m_elementCount;
void* m_inputStreams[4];
void* m_outputStreams[4];
} m_streamingData;
};
};
As you can see, a Kernel is nothing more than a function pointer to a function accepting a TaskData argument. TaskData itself consists of kernel data, and optional data needed for different tasks.
Note that StreamingData is put inside a union, which means that whenever I add optional data, I can simply put it into a different struct inside the union without changing the size of TaskData (as long as the size of the new structs is not greater than StreamingData).
The rationale behind this is that a task can be a simple task, a streaming task, or a reduce/expand task, but never several at the same time. The user knows exactly which members to access, having been the one who submitted the task to the scheduler in the first place. This allows us to make Task have a fixed size, which is useful for optimization later:
struct Task
{
Kernel m_kernel;
TaskData m_taskData;
// more members here, explained later
};
Unfortunately, some programmers tend to forget about things like unions as soon as they start stuffing everything down the inheritance tree.
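The size argument can be checked at compile time. The following sketch is not Molecule's code: ReduceData and both TaskData variants are hypothetical names, and the payload structs are declared outside the union because some compilers reject type definitions inside an anonymous union.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Payload for streaming tasks, mirroring the struct shown in the article.
struct StreamingData
{
    uint32_t m_elementCount;
    void* m_inputStreams[4];
    void* m_outputStreams[4];
};

// Hypothetical payload for reduce/expand tasks (an assumption for this sketch).
struct ReduceData
{
    uint32_t m_inputCount;
    uint32_t m_outputCount;
    void* m_input;
    void* m_output;
};

struct TaskDataStreamingOnly
{
    void* m_kernelData;
    union
    {
        StreamingData m_streamingData;
    };
};

struct TaskDataWithReduce
{
    void* m_kernelData;
    union
    {
        StreamingData m_streamingData;
        ReduceData m_reduceData;   // shares storage with m_streamingData
    };
};

// Adding a union member that is no larger than the biggest existing one
// leaves the overall size untouched.
static_assert(sizeof(ReduceData) <= sizeof(StreamingData), "new payload must fit");
static_assert(sizeof(TaskDataWithReduce) == sizeof(TaskDataStreamingOnly), "size unchanged");

// Fills in the streaming member. The caller has to remember which union
// member is active, just as described in the text.
inline TaskDataWithReduce MakeStreamingData(void* kernelData, uint32_t elementCount)
{
    assert(elementCount > 0);
    TaskDataWithReduce data = {};
    data.m_kernelData = kernelData;
    data.m_streamingData.m_elementCount = elementCount;
    return data;
}
```

If a future payload ever grows beyond StreamingData, the first static_assert fails at compile time instead of silently changing the size of every task.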
Scheduling tasks
Having explained the underlying task model, how does work actually get handed to the scheduler? There are simply different overloads for handing tasks to the scheduler, each of which prepares a Task that is then worked on by the worker threads:
namespace scheduler
{
TaskId AddTask(const KernelData& kernelData, Kernel kernel);
TaskId AddStreamingTask(const KernelData& kernelData, const InputStream& is0, const OutputStream& os0, uint32_t elementCount, Kernel kernel);
// more overloads taking 2-4 streams
};
Let us take a closer look at the AddTask function:
TaskId AddTask(const KernelData& kernelData, Kernel kernel)
{
Task* task = ObtainTask();
task->m_kernel = kernel;
task->m_taskData.m_kernelData = kernelData.m_data;
QueueTask(task);
return GetTaskId(task);
}
The function simply obtains a new Task, sets the proper members, adds it to the global queue of tasks, and returns a TaskId which is used by the caller to identify this specific task. We don’t want to give users direct access to internal Task members, hence we need some kind of identification which can be used to map a Task to a TaskId, and vice versa. Note that a std::map is not what we’re thinking of!
Allocating and identifying tasks
Before we discuss how to generate a TaskId for a Task, let us take a closer look at how a Task is actually allocated. Remember that we wanted to make sure that a Task has a certain size? The reason for this is that we then can use a free list for allocating and freeing tasks, which gives us O(1) behaviour and no fragmentation. The free list implementation in Molecule takes a memory range to be used by the free list, and stores the pointer to the next element directly in each entry – this means that each entry is something like a “union in memory”.
An entry is in one of two states. In the allocated state, the data in memory is interpreted as e.g. a Task. In the freed state, the first few bytes are interpreted as a pointer to the next free entry.
The implementation is as follows:
class Freelist
{
typedef Freelist Self;
public:
Freelist(void* start, void* end, size_t elementSize);
void* Obtain(void);
void Return(void* ptr);
private:
Self* m_next;
};
Freelist::Freelist(void* start, void* end, size_t elementSize)
: m_next(nullptr)
{
union
{
void* as_void;
char* as_char;
Self* as_self;
};
as_void = start;
m_next = as_self;
const size_t numElements = (static_cast<char*>(end) - as_char) / elementSize;
as_char += elementSize;
// initialize the free list - make every m_next at each element point to the next element in the list
Self* runner = m_next;
for (size_t i=1; i<numElements; ++i)
{
runner->m_next = as_self;
runner = runner->m_next;
as_char += elementSize;
}
runner->m_next = nullptr;
}
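void* Freelist::Obtain(void)
{
// an exhausted free list has no head left, so there is nothing to hand out
if (m_next == nullptr)
return nullptr;
// obtain one element from the head of the free list
Self* head = m_next;
m_next = head->m_next;
return head;
}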
void Freelist::Return(void* ptr)
{
// put the returned element at the head of the free list
Self* head = static_cast<Self*>(ptr);
head->m_next = m_next;
m_next = head;
}
A free list holding 1024 tasks could then simply be constructed as follows:
const unsigned int TASK_POOL_SIZE = sizeof(Task)*1024;
char taskPoolMemory[TASK_POOL_SIZE];
Freelist taskPool(taskPoolMemory, taskPoolMemory + TASK_POOL_SIZE, sizeof(Task));
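The O(1) obtain/return behaviour is easy to try out in isolation. The sketch below is a compact variant written for illustration, not the class shown above: SimpleFreelist, DemoTask and DemoFreelistReuse are hypothetical names, the constructor takes an element count instead of an end pointer, and the pool is explicitly aligned with alignas.

```cpp
#include <cassert>
#include <cstddef>
#include <new>

// Compact intrusive free list: each free slot stores the pointer to the
// next free slot in its own first bytes.
class SimpleFreelist
{
public:
    SimpleFreelist(void* start, std::size_t elementSize, std::size_t elementCount)
        : m_head(nullptr)
    {
        assert(elementSize >= sizeof(Node));
        // Push the slots back to front, so the first Obtain() hands out the
        // first slot of the memory range.
        char* memory = static_cast<char*>(start);
        for (std::size_t i = elementCount; i > 0; --i)
            Return(memory + (i - 1) * elementSize);
    }

    // O(1): pops the head of the list, or returns nullptr if it is empty.
    void* Obtain()
    {
        Node* head = m_head;
        if (head != nullptr)
            m_head = head->m_next;
        return head;
    }

    // O(1): the returned slot becomes the new head of the list.
    void Return(void* ptr)
    {
        m_head = new (ptr) Node{m_head};
    }

private:
    struct Node { Node* m_next; };
    Node* m_head;
};

// Stand-in for a fixed-size task.
struct DemoTask { void* m_kernel; int m_payload[6]; };

// Shows that a returned slot is the next one handed out again.
inline bool DemoFreelistReuse()
{
    alignas(DemoTask) static char pool[sizeof(DemoTask) * 4];
    SimpleFreelist freelist(pool, sizeof(DemoTask), 4);

    void* a = freelist.Obtain();   // first slot
    void* b = freelist.Obtain();   // second slot
    freelist.Return(a);
    void* c = freelist.Obtain();   // same memory as a

    return a == pool && b == pool + sizeof(DemoTask) && c == a;
}
```

Note that the slot returned last is handed out first. This immediate reuse of memory is convenient for caches, and it is also what makes stale identifiers a real concern.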
The question that still remains is how do we identify a Task by means of a TaskId? Because each Task is allocated from the free list, and the free list allocates from a contiguous region of memory, we can simply turn each Task* into an offset from the start of the memory range (e.g. taskPoolMemory above), like the following functions do:
inline TaskId GetTaskId(Task* task)
{
return task - reinterpret_cast<Task*>(taskPoolMemory);
}
inline Task* GetTask(TaskId taskId)
{
return reinterpret_cast<Task*>(taskPoolMemory) + taskId;
}
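As a small sanity check of the mapping, the following sketch performs the round trip with bounds checks added. PoolTask, PoolTaskId, g_poolMemory and the two function names are stand-ins chosen for this example. Note that because the subtraction is done on typed pointers, the resulting "offset" is an element index rather than a byte count.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Stand-in for the real Task struct.
struct PoolTask { void* m_kernel; int32_t m_payload; };

typedef int32_t PoolTaskId;

const std::size_t POOL_TASK_COUNT = 1024;
alignas(PoolTask) static char g_poolMemory[sizeof(PoolTask) * POOL_TASK_COUNT];

// Turns a pointer into the index of its slot within the pool.
inline PoolTaskId ToTaskId(const PoolTask* task)
{
    const PoolTask* first = reinterpret_cast<const PoolTask*>(g_poolMemory);
    const std::ptrdiff_t index = task - first;
    assert(index >= 0 && static_cast<std::size_t>(index) < POOL_TASK_COUNT);
    return static_cast<PoolTaskId>(index);
}

// Turns a slot index back into a pointer.
inline PoolTask* ToTask(PoolTaskId id)
{
    assert(id >= 0 && static_cast<std::size_t>(id) < POOL_TASK_COUNT);
    return reinterpret_cast<PoolTask*>(g_poolMemory) + id;
}
```

Both directions are a single pointer addition or subtraction, which is why no lookup structure such as a std::map is needed.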
Client code
With the above functions in place, we can add tasks to the scheduler like in the following example:
// setup data for streaming skinning task
scheduler::KernelData transformationData(transformations, NUM_JOINTS*sizeof(matrix_4x4));
scheduler::InputStream weightedVertexData(weightedVertices, sizeof(WeightedVertex));
scheduler::OutputStream transformedVertexData(transformedVertices, sizeof(TransformedVertex));
// schedule skinning task
scheduler::TaskId skinningTask = scheduler::AddStreamingTask(transformationData, weightedVertexData, transformedVertexData, NUM_VERTICES, &SkinningKernel);
scheduler::Wait(skinningTask);
// setup data and schedule checksum task
scheduler::KernelData checksumData(&data, sizeof(ChecksumKernelData));
scheduler::TaskId checksumTask = scheduler::AddTask(checksumData, &CheckSumKernel);
scheduler::Wait(checksumTask);
Note that KernelData, InputStream and OutputStream not only take the address of the data as arguments, but need additional parameters like the size of the data and the stride of the streams. This is necessary so that the scheduler knows how much data needs to be DMAed between the PPU and SPUs on the PS3.
The TaskId returned by each scheduler::Add*() function can be used to wait until the respective task has completed by calling scheduler::Wait().
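To illustrate why the size and stride parameters matter, here is a rough sketch of what such wrappers might look like. Everything in it is an assumption: the names KernelDataSketch, InputStreamSketch and StreamTransferSize are invented for this example, and Molecule's actual classes may well differ.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical wrapper for non-mutable kernel data: address plus total size.
struct KernelDataSketch
{
    KernelDataSketch(const void* data, std::size_t size)
        : m_data(data), m_size(size) {}

    const void* m_data;
    std::size_t m_size;     // total size in bytes
};

// Hypothetical wrapper for an input stream: address plus per-element stride.
struct InputStreamSketch
{
    InputStreamSketch(const void* data, std::size_t stride)
        : m_data(data), m_stride(stride) {}

    const void* m_data;
    std::size_t m_stride;   // size of one element in bytes
};

// Number of bytes that would have to be transferred for `elementCount`
// elements of the given stream.
inline std::size_t StreamTransferSize(const InputStreamSketch& stream, uint32_t elementCount)
{
    assert(stream.m_stride > 0);
    return stream.m_stride * elementCount;
}
```

With the stride and the element count known, the amount of data to move for any sub-range of a stream follows from a single multiplication.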
Synchronizing with tasks
But how does waiting on a task actually work? We could simply add a flag to the Task class, indicating whether a task has finished or not. This works, but we need to concern ourselves with how and when Tasks are freed.
The way our scheduler works at the moment is the following:
- Each new Task gets added to a global queue via a call to QueueTask().
- Each worker thread waits on a condition variable until a task is available in the queue. The condition variable gets signaled in QueueTask(), so one of the worker threads wakes up, grabs a task from the queue, and executes it.
- After a Task has executed, we can free it.
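The three steps above can be sketched with standard C++ threading primitives. This is not Molecule's scheduler: WorkQueueSketch and RunCountingTasks are hypothetical names, std::function stands in for the Kernel/TaskData pair, and the shutdown flag is an addition so that the example terminates cleanly.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

class WorkQueueSketch
{
public:
    explicit WorkQueueSketch(unsigned int workerCount) : m_shutdown(false)
    {
        assert(workerCount > 0);
        for (unsigned int i = 0; i < workerCount; ++i)
            m_workers.emplace_back([this] { WorkerLoop(); });
    }

    // Lets the workers drain the remaining tasks, then joins them.
    ~WorkQueueSketch()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_shutdown = true;
        }
        m_condition.notify_all();
        for (std::thread& worker : m_workers)
            worker.join();
    }

    // Adds a task to the global queue and wakes up one waiting worker.
    void QueueTask(std::function<void()> task)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_tasks.push_back(std::move(task));
        }
        m_condition.notify_one();
    }

private:
    void WorkerLoop()
    {
        for (;;)
        {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                // sleep until there is work or we are asked to shut down
                m_condition.wait(lock, [this] { return m_shutdown || !m_tasks.empty(); });
                if (m_tasks.empty())
                    return;   // shutdown requested and nothing left to do
                task = std::move(m_tasks.front());
                m_tasks.pop_front();
            }
            task();           // executed outside the lock
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_condition;
    std::deque<std::function<void()>> m_tasks;
    bool m_shutdown;
    std::vector<std::thread> m_workers;   // declared last, started last
};

// Queues `taskCount` trivial tasks and returns how many were executed.
inline int RunCountingTasks(int taskCount)
{
    std::atomic<int> executed(0);
    {
        WorkQueueSketch queue(2);
        for (int i = 0; i < taskCount; ++i)
            queue.QueueTask([&executed] { ++executed; });
    }   // the destructor waits for all queued tasks to finish
    return executed.load();
}
```

In this sketch a task simply goes away once it has been executed, which corresponds to the third step. Nothing here allows a caller to ask whether one specific task has finished.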
There are several reasons for freeing a Task as soon as it has finished:
- Freeing a Task only in a call to Wait() leaks memory whenever a user never waits for that task.
- Adding a Task to a separate list of tasks-to-be-recycled requires additional memory, as well as additional synchronization points.
Unfortunately, freeing a Task after it has finished introduces a subtle race condition: As soon as AddTask() is called, one of the worker threads races away, picks up the task, and works on it. If the task is finished (and hence freed) before the call to Wait(), another call to AddTask() from either the main thread or one of the worker threads might have allocated a different task in the meantime, but at the same memory location our old task was sitting at. If we then were to access the task’s flag, we would read stale data.
We can work around this problem by identifying a task via its offset from the start of memory, and a unique counter which monotonically increases on each allocation of a Task, turning our TaskId into the following:
typedef int32_t TaskOffset;
struct TaskId
{
TaskOffset m_offset;
int32_t m_generation;
TaskId(TaskOffset offset, int32_t generation)
: m_offset(offset)
, m_generation(generation)
{
}
};
inline TaskOffset GetTaskOffset(Task* task)
{
return task - reinterpret_cast<Task*>(g_taskPoolMemory);
}
inline Task* GetTask(TaskOffset taskOffset)
{
return reinterpret_cast<Task*>(g_taskPoolMemory) + taskOffset;
}
Constructing a TaskId is now simple:
return TaskId(GetTaskOffset(task), task->m_generation);
All we have to do is add another member to our Task class which stores the generation in which the allocation has been made:
inline Task* ObtainTask(void)
{
void* memory = nullptr;
{
SynchronizationPrimitive::ScopedLock lock(g_taskPoolSP);
memory = g_futureTaskPool.Obtain();
}
Task* task = core::memory::Construct<Task>(memory);
// the generation is a unique ID which allows us to distinguish between proper tasks and deleted ones
task->m_generation = ++g_generation;
return task;
}
inline void ReturnTask(Task* task)
{
task->m_generation = ++g_generation;
core::memory::Destruct(task);
{
SynchronizationPrimitive::ScopedLock lock(g_taskPoolSP);
g_futureTaskPool.Return(task);
}
}
Whenever we obtain a task, we allocate it from the free list, call the constructor in-place using placement new, and set its generation (g_generation is an atomic counter which can be increased in a thread-safe manner). Note that we also increase and set the task’s generation before freeing it. This allows us to do the following:
struct Task
{
char unusedFreelistAlias[sizeof(void*)];
int32_t m_generation;
Kernel m_kernel;
TaskData m_taskData;
// more members here, explained later
};
inline bool IsTaskFinished(const TaskId& taskId)
{
Task* task = GetTask(taskId.m_offset);
if (task->m_generation != taskId.m_generation)
{
// task is from an older generation and has been recycled again, so it's been finished already
return true;
}
else
{
if (task->m_openTasks == 0)
return true;
}
return false;
}
Note the member unusedFreelistAlias! Because we know exactly how memory allocation via the free list works, we add an unused member to our Task struct. It allows us to access m_generation after a Task has already been returned to our free list, because returning a Task to the free list only overwrites the first sizeof(void*) bytes.
This in turn allows us to identify whether a task is still in progress (m_generation == taskId.m_generation), or if it has been freed already, possibly with its memory being reused by a completely different task (m_generation != taskId.m_generation). This is the kind of trick you can only pull off in C/C++, allowing for that extra bit of performance.
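The effect of the generation check can be reproduced in a single-threaded sketch. The names GenTask, GenTaskId, StartTask, FinishAndReturnTask and IsFinished are made up for this example, a plain array replaces the free list, and the slot index is passed in explicitly so that slot reuse can be forced.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

struct GenTask
{
    char unusedFreelistAlias[sizeof(void*)];   // overwritten while the slot is free
    int32_t m_generation;
    int32_t m_openTasks;
};

struct GenTaskId
{
    int32_t m_offset;
    int32_t m_generation;
};

static GenTask g_slots[4];
static std::atomic<int32_t> g_generationCounter(0);

// Stands in for ObtainTask(): stamps the slot with a fresh generation.
inline GenTaskId StartTask(int32_t slot)
{
    assert(slot >= 0 && slot < 4);
    GenTask& task = g_slots[slot];
    task.m_generation = ++g_generationCounter;
    task.m_openTasks = 1;
    return GenTaskId{slot, task.m_generation};
}

// Stands in for finishing a task followed by ReturnTask(): the generation is
// bumped again, so identifiers handed out earlier no longer match.
inline void FinishAndReturnTask(int32_t slot)
{
    assert(slot >= 0 && slot < 4);
    g_slots[slot].m_openTasks = 0;
    g_slots[slot].m_generation = ++g_generationCounter;
}

inline bool IsFinished(const GenTaskId& id)
{
    const GenTask& task = g_slots[id.m_offset];
    if (task.m_generation != id.m_generation)
        return true;                  // slot was recycled, so our task is done
    return task.m_openTasks == 0;
}
```

Starting a task in slot 0, returning it, and starting a second task in the same slot leaves the first identifier reporting "finished" while the second one reports "in progress", even though both refer to the same memory. One caveat of this sketch is that a 32-bit counter can eventually wrap around.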
In the next post in the series, we will have a look at how streaming tasks are divided into subtasks, and how parent-child relationships are handled, showing a bit more code of the internal workings of the scheduler.