Building a load-balanced task scheduler – Part 2: Task model

In this part of the series, we will discuss Molecule’s task model in detail, and have a look at the underlying C++ code and some subtleties we need to watch out for, as well as some unique optimization opportunities.

Before we start fleshing out details about tasks and the underlying task model, let us identify what common game engine tasks look like:

  • Simple tasks: They need a bunch of non-mutable data, and simply carry out their task.
  • Streaming tasks: They need non-mutable data, as well as input and output streams, with a 1:1 mapping between input and output elements.
  • Reduce/Expand tasks: Again, they need non-mutable data, but the input/output ratio is not 1:1, but rather N:1 or 1:N. As an example, consider a task that calculates the average of e.g. every 32 elements, and outputs a single value for them – for every 32 input elements, there’s 1 output element.

The task model

Therefore, in Molecule parlance, each Task consists of the following:

  • A Kernel which acts on the data given.
  • Non-mutable data, called KernelData.
  • (Optional) streaming data, consisting of InputStreams and OutputStreams.
  • (Optional) other data needed for e.g. reduce/expand tasks, or other, yet unidentified, future tasks.

With the above in mind, one possible by-the-book implementation could look like the following:

// simple task, acts as a base class
class Task
{
public:
  explicit Task(const KernelData& kernelData);

  /// Executes the task, works on kernel data
  virtual void Execute(void);

private:
  KernelData m_kernelData;
};

// streaming task
class StreamingTask : public Task
{
public:
  StreamingTask(const KernelData& kernelData, const InputStream& inputStream, const OutputStream& outputStream);

  /// Executes the task, works on kernel and streaming data
  virtual void Execute(void);

private:
  KernelData m_kernelData;
  InputStream m_input;
  OutputStream m_output;
};

Each task can do whatever it wants by means of its virtual Execute method, and different tasks take different data. All is nice, except that it’s not:

  • Sooner or later, tasks need to be allocated from somewhere. Having tasks of different sizes doesn’t play nice with keeping memory fragmentation down.
  • The virtual Execute method doesn’t play nice with SPUs on the PS3.

We can do much better than that, and here’s the proposed implementation used in Molecule:

typedef core::traits::Function<void (const TaskData&)>::Pointer Kernel;

struct TaskData
{
  void* m_kernelData;

  union
  {
    // for streaming tasks
    struct StreamingData
    {
      uint32_t m_elementCount;
      void* m_inputStreams[4];
      void* m_outputStreams[4];
    } m_streamingData;
  };
};

As you can see, a Kernel is nothing more than a function pointer to a function accepting a TaskData argument. TaskData itself consists of kernel data, and optional data needed for different tasks.
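
To make the calling convention concrete, here is a minimal sketch of what a user-written streaming kernel could look like. MyKernelData, the scale factor and the float streams are purely hypothetical and only serve to illustrate the Kernel signature:

// hypothetical kernel data, not part of the scheduler
struct MyKernelData
{
  float m_scale;
};

// matches the Kernel signature: void (*)(const TaskData&)
void ExampleStreamingKernel(const TaskData& taskData)
{
  const MyKernelData* kernelData = static_cast<const MyKernelData*>(taskData.m_kernelData);
  const float* input = static_cast<const float*>(taskData.m_streamingData.m_inputStreams[0]);
  float* output = static_cast<float*>(taskData.m_streamingData.m_outputStreams[0]);

  // work through the 1:1 mapped streaming data
  for (uint32_t i = 0; i < taskData.m_streamingData.m_elementCount; ++i)
  {
    output[i] = input[i] * kernelData->m_scale;
  }
}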

Note that StreamingData is put inside a union, which means that whenever I add optional data, I can simply put it into a different struct inside the union without changing the size of TaskData (as long as the new struct is not larger than StreamingData). An illustration of this follows below.
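
As an illustration only, optional data for e.g. reduce/expand tasks could be added as another struct inside the union. ReduceData below is hypothetical and not part of Molecule:

union
{
  // for streaming tasks
  struct StreamingData
  {
    uint32_t m_elementCount;
    void* m_inputStreams[4];
    void* m_outputStreams[4];
  } m_streamingData;

  // hypothetical addition for reduce/expand tasks, fine as long as
  // it is not larger than StreamingData
  struct ReduceData
  {
    uint32_t m_inputCount;
    void* m_inputStream;
    void* m_outputStream;
  } m_reduceData;
};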

The rationale behind this is that a task is either a simple task, a streaming task, or a reduce/expand task, but never several at the same time. The user knows exactly which members he needs to access because he was the one submitting the task to the scheduler in the first place. This allows us to give Task a fixed size, which is useful for optimization later:

struct Task
{
  Kernel m_kernel;
  TaskData m_taskData;
  // more members here, explained later
};

Unfortunately, some programmers tend to forget about things like unions as soon as they start stuffing everything down the inheritance tree.

Scheduling tasks

Having explained the underlying task model, how does work actually get handed to the scheduler? There are simply different overloads for handing tasks to the scheduler, with each one taking care of preparing a Task which is then worked on by the different worker threads:

namespace scheduler
{
  TaskId AddTask(const KernelData& kernelData, Kernel kernel);
  TaskId AddStreamingTask(const KernelData& kernelData, const InputStream& is0, const OutputStream& os0, uint32_t elementCount, Kernel kernel);
  // more overloads taking 2-4 streams
};

Let us take a closer look at the AddTask function:

TaskId AddTask(const KernelData& kernelData, Kernel kernel)
{
  Task* task = ObtainTask();
  task->m_kernel = kernel;
  task->m_taskData.m_kernelData = kernelData.m_data;

  QueueTask(task);

  return GetTaskId(task);
}

The function simply obtains a new Task, sets the proper members, adds it to the global queue of tasks, and returns a TaskId which is used by the caller to identify this specific task. We don’t want to give users direct access to internal Task members, hence we need some kind of identification which can be used to map a Task to a TaskId, and vice versa. Note that a std::map is not what we’re thinking of!
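
The streaming overloads work analogously. Their implementation is not shown in this post, so the following is only a sketch of how AddStreamingTask could look under the model described above (the m_data members of InputStream and OutputStream are assumptions):

TaskId AddStreamingTask(const KernelData& kernelData, const InputStream& is0, const OutputStream& os0, uint32_t elementCount, Kernel kernel)
{
  Task* task = ObtainTask();
  task->m_kernel = kernel;
  task->m_taskData.m_kernelData = kernelData.m_data;

  // fill in the streaming part of the TaskData union (stream members are assumptions)
  task->m_taskData.m_streamingData.m_elementCount = elementCount;
  task->m_taskData.m_streamingData.m_inputStreams[0] = is0.m_data;
  task->m_taskData.m_streamingData.m_outputStreams[0] = os0.m_data;

  QueueTask(task);

  return GetTaskId(task);
}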

Allocating and identifying tasks

Before we discuss how to generate a TaskId for a Task, let us take a closer look at how a Task is actually allocated. Remember that we wanted to make sure that a Task has a fixed size? The reason for this is that we can then use a free list for allocating and freeing tasks, which gives us O(1) behaviour and no fragmentation. The free list implementation in Molecule takes a memory range to be used by the free list, and stores the pointer to the next element directly in each entry – this means that each entry is something like a “union in memory”.

An entry is either in the allocated state, in which case the data in memory is interpreted as e.g. a Task, or in the freed state, in which case the first few bytes are interpreted as a pointer to the next free entry.

The implementation is as follows:

class Freelist
{
  typedef Freelist Self;

public:
  Freelist(void* start, void* end, size_t elementSize);

  void* Obtain(void);

  void Return(void* ptr);

private:
  Self* m_next;
};

Freelist::Freelist(void* start, void* end, size_t elementSize)
  : m_next(nullptr)
{
  union
  {
    void* as_void;
    char* as_char;
    Self* as_self;
  };

  as_void = start;
  m_next = as_self;

  const size_t numElements = (static_cast<char*>(end) - as_char) / elementSize;

  as_char += elementSize;

  // initialize the free list - make every m_next at each element point to the next element in the list
  Self* runner = m_next;
  for (size_t i=1; i<numElements; ++i)
  {
    runner->m_next = as_self;
    runner = runner->m_next;

    as_char += elementSize;
  }

  runner->m_next = nullptr;
}

void* Freelist::Obtain(void)
{
  // obtain one element from the head of the free list
  Self* head = m_next;
  m_next = head->m_next;
  return head;
}

void Freelist::Return(void* ptr)
{
  // put the returned element at the head of the free list
  Self* head = static_cast<Self*>(ptr);
  head->m_next = m_next;
  m_next = head;
}

A free list holding 1024 tasks could then simply be constructed as follows:

const unsigned int TASK_POOL_SIZE = sizeof(Task)*1024;
char taskPoolMemory[TASK_POOL_SIZE];
Freelist taskPool(taskPoolMemory, taskPoolMemory + TASK_POOL_SIZE, sizeof(Task));

The question that still remains is: how do we identify a Task by means of a TaskId? Because each Task is allocated from the free list, and the free list allocates from a contiguous region of memory, we can simply turn each Task* into an offset from the start of the memory range (e.g. taskPoolMemory above), like the following functions do:

inline TaskId GetTaskId(Task* task)
{
  return task - reinterpret_cast<Task*>(taskPoolMemory);
}

inline Task* GetTask(TaskId taskId)
{
  return reinterpret_cast<Task*>(taskPoolMemory) + taskId;
}

Client code

With the above functions in place, we can add tasks to the scheduler like in the following example:

// setup data for streaming skinning task
scheduler::KernelData transformationData(transformations, NUM_JOINTS*sizeof(matrix_4x4));
scheduler::InputStream weightedVertexData(weightedVertices, sizeof(WeightedVertex));
scheduler::OutputStream transformedVertexData(transformedVertices, sizeof(TransformedVertex));

// schedule skinning task
scheduler::TaskId skinningTask = scheduler::AddStreamingTask(transformationData, weightedVertexData, transformedVertexData, NUM_VERTICES, &SkinningKernel);
scheduler::Wait(skinningTask);

// setup data and schedule checksum task
scheduler::KernelData checksumData(&data, sizeof(ChecksumKernelData));
scheduler::TaskId checksumTask = scheduler::AddTask(checksumData, &CheckSumKernel);
scheduler::Wait(checksumTask);

Note that KernelData, InputStream and OutputStream not only take the address of the data as arguments, but need additional parameters like the size of the data and the stride of the streams. This is necessary so that the scheduler knows how much data needs to be DMAed between the PPU and SPUs on the PS3.
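
The definitions of these helper structs are not shown in the post, but based on the constructor arguments used above they could look roughly like the following sketch (member names and types are assumptions):

struct KernelData
{
  KernelData(void* data, size_t size) : m_data(data), m_size(size) {}

  void* m_data;   // address of the non-mutable kernel data
  size_t m_size;  // size in bytes, needed e.g. for DMA transfers
};

struct InputStream
{
  InputStream(void* data, size_t stride) : m_data(data), m_stride(stride) {}

  void* m_data;    // start of the input stream
  size_t m_stride; // size of one element in the stream
};

struct OutputStream
{
  OutputStream(void* data, size_t stride) : m_data(data), m_stride(stride) {}

  void* m_data;    // start of the output stream
  size_t m_stride; // size of one element in the stream
};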

The TaskId returned by each scheduler::Add*() function can be used to wait until the respective task has completed by calling scheduler::Wait().

Synchronizing with tasks

But how does waiting on a task actually work? We could simply add a flag to the Task class, indicating whether a task has finished or not. This works, but we need to concern ourselves with how and when Tasks are freed.

The way our scheduler works at the moment is the following:

  • Each new Task gets added to a global queue via a call to QueueTask().
  • Each worker thread waits on a condition variable until a task is available in the queue. The condition variable gets signaled in QueueTask(), so one of the worker threads wakes up, grabs a task from the queue, and executes it.
  • After a Task has executed, we can free it.

The reasons for freeing a Task after it has finished are several:

  • Freeing a Task only in a call to Wait() introduces memory leaks whenever a user doesn’t need to wait for a task to finish, and hence never calls Wait().
  • Adding a Task to a separate list of tasks-to-be-recycled requires additional memory, as well as additional synchronization points.

Unfortunately, freeing a Task after it has finished introduces a subtle race condition: As soon as AddTask() is called, one of the worker threads races away, picks up the task, and works on it. If the task is finished (and hence freed) before the call to Wait(), another call to AddTask() from either the main thread or one of the worker threads might have allocated a different task in the meantime, but at the same memory location our old task was sitting at. If we then were to access the task’s flag, we would read stale data.

We can work around this problem by identifying a task via its offset from the start of memory, and a unique counter which monotonically increases on each allocation of a Task, turning our TaskId into the following:

typedef int32_t TaskOffset;

struct TaskId
{
  TaskOffset m_offset;
  int32_t m_generation;

  TaskId(TaskOffset offset, int32_t generation)
    : m_offset(offset)
    , m_generation(generation)
  {
  }
};

inline TaskOffset GetTaskOffset(Task* task)
{
  return task - reinterpret_cast<Task*>(g_taskPoolMemory);
}

inline Task* GetTask(TaskOffset taskOffset)
{
  return reinterpret_cast<Task*>(g_taskPoolMemory) + taskOffset;
}

Constructing a TaskId is now simple:

return TaskId(GetTaskOffset(task), task->m_generation);

All we have to do is add another member to our Task class which stores the generation in which the allocation has been made:

inline Task* ObtainTask(void)
{
  void* memory = nullptr;
  {
    SynchronizationPrimitive::ScopedLock lock(g_taskPoolSP);
    memory = g_futureTaskPool.Obtain();
  }

  Task* task = core::memory::Construct<Task>(memory);

  // the generation is a unique ID which allows us to distinguish between proper tasks and deleted ones
  task->m_generation = ++g_generation;
  return task;
}

inline void ReturnTask(Task* task)
{
  task->m_generation = ++g_generation;
  core::memory::Destruct(task);
  {
    SynchronizationPrimitive::ScopedLock lock(g_taskPoolSP);
    g_futureTaskPool.Return(task);
  }
}

Whenever we obtain a task, we allocate it from the free list, call the constructor in-place using placement new, and set its generation (g_generation is an atomic counter which can be increased in a thread-safe manner). Note that we also increase and set the task’s generation before freeing it. This allows us to do the following:

struct Task
{
  char unusedFreelistAlias[sizeof(void*)];
  int32_t m_generation;
  Kernel m_kernel;
  TaskData m_taskData;
  // more members here, explained later
};

inline bool IsTaskFinished(const TaskId& taskId)
{
  Task* task = GetTask(taskId.m_offset);
  if (task->m_generation != taskId.m_generation)
  {
    // task is from an older generation and has been recycled again, so it's been finished already
    return true;
  }
  else
  {
    if (task->m_openTasks == 0)
      return true;
  }

  return false;
}

Note the member unusedFreelistAlias! Because we know exactly how memory allocation via the free list works, we add an unused member to our Task struct, which allows us to access m_generation even after a Task has been returned to our free list, because returning a Task to the free list only invalidates the first sizeof(void*) bytes.

This in turn allows us to identify whether a task is still in progress (m_generation == taskId.m_generation), or if it has been freed already with its memory being reused by a completely different task (m_generation != taskId.m_generation). These are the kinds of tricks you can only pull off in C/C++, allowing for that extra bit of performance.
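
Wait() itself is not shown in this post, but with IsTaskFinished() in place, a minimal busy-waiting sketch could look like the following. A real implementation would rather help execute queued tasks or yield the time slice instead of spinning; Yield() below is just a hypothetical helper:

void Wait(const TaskId& taskId)
{
  while (!IsTaskFinished(taskId))
  {
    // a real scheduler would grab and execute other queued tasks here,
    // or at least yield instead of burning cycles
    Yield();  // hypothetical
  }
}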

In the next post in the series, we will have a look at how streaming tasks are divided into subtasks, and how parent-child relationships are handled, showing a bit more code of the internal workings of the scheduler.

15 thoughts on “Building a load-balanced task scheduler – Part 2: Task model”

  1. Pingback: Building a load-balanced task scheduler – Part 3: Parent-child relationships | Molecular Musings

  2. Pingback: Building a load-balanced task scheduler – Part 4: False sharing | Molecular Musings

  3. Not exactly about the issue at hand, but since I’m browsing this post now…

    I’ve downloaded the evaluation SDK and after taking a look at the code, and also after many reads of your (quite interesting) posts, there’s one thing that I’ve noticed. Molecule makes heavy use of unions (at least the Core module), but in a bad way. It’s common to see code like this:

    union
    {
      char* as_char;
      void* as_void;
    };

    as_char = /* whatever */;
    /* whatever */ = as_void;

    As I’m sure you are aware of, this engenders undefined behavior. However, I did notice that Molecule is (at the time) targeted at Win32 platforms, and perhaps even at Visual Studio, so this issue is not relevant once you know the code generated by the compiler in this situation.

    Is code like this part of the engine because you are targeting a single platform/compiler?

    • Yes, Molecule uses these local anonymous unions whenever the same memory address has to be interpreted as several different types. 99% of the time, you will only find code like this in the allocator implementations.

      I’m aware that this is strictly speaking undefined behavior per the standard, but in practice I have used this on several console & PC compilers (Visual Studio, PC, Xbox360, Wii, PS3, handhelds). Even though the standard does not allow it, I would be surprised to find a compiler that didn’t allow it – it would break many, many implementations I would say. People used this before even the first C standard was around.

      Additionally, it is the only supported way to write to a type-punned pointer on certain platforms which follow the strict aliasing rule, most notably PS3. If you reinterpret_cast a pointer-type into a non-compatible pointer-type (per the strict aliasing rule) on such a platform, dereferencing such a pointer will break the rule, and create incorrect code in the process. This means that if I used reinterpret_cast in some of the allocator code, it would actually *not* work on PS3 (or other aliasing rule-aware compilers like GCC), whereas this method works.
      The only *really* portable, well-defined ways to read memory as type A and write it as type B are:
      a) use memcpy
      b) copy single bytes by aliasing with a char*

      I’d rather not do either of them in low-level allocator code, hence Molecule uses the union solution, even if this is frowned upon by the standard. I don’t like (ab)using the language in such ways and steer clear of all kinds of undefined, unspecified and implementation-defined behavior as much as possible, but in this case the implementation has to be practical in order to be fast.
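
      For comparison, this is what the strictly conforming memcpy variant of type punning looks like in general (a generic sketch, not code taken from Molecule):

      #include <cstring>
      #include <stdint.h>

      // classic type-punning example: read the bits of a float as a uint32_t
      // via memcpy, the strictly conforming alternative to a union
      uint32_t FloatBits(float value)
      {
        uint32_t bits = 0;
        std::memcpy(&bits, &value, sizeof(bits));
        return bits;
      }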

      Here’s a link regarding the strict aliasing issue.

      A final note: in the code you posted (accessing void* and char*) it would be perfectly valid to cast the pointer types though, because char* is allowed to alias any other pointer type, so I could change that code accordingly. Thanks for pointing it out, I will change those occurrences to use regular casts, and not unions in one of the next versions.

  4. You’re right, char* can be used to access any “kind” of memory, I simply typed what came to my mind in order to make my point. As you mentioned, std::memcpy is the “legal” way to go, according to the Standard. However, you said you avoid using this function. Care to explain why? I assume your answer would imply something along the lines of “It’s slow”; have you measured its performance? A reasonable implementation would involve straight assembly, I suppose.

    • I assume your answer would imply something along the lines of “It’s slow”; have you measured its performance? A reasonable implementation would involve straight assembly, I suppose.

      My answer is “it’s slower than the operation needs to be”. The implementation in MSVC 2010 is very reasonable and smart, able to turn such a std::memcpy into a simple write to memory (have a look at the disassembly with optimizations cranked to the max). Other implementations, however, are not so smart. I had to deal with platforms where the implementation was first trying to be “helpful” by figuring out whether the given source and destination memory ranges would actually overlap – if so, it would call memmove, otherwise do a simple copy. On top of that, the implementation also had “fast” copy modes if pointers were aligned to proper boundaries, and chose the corresponding path.

      So instead of just writing a simple integer (4 bytes) to memory, calling memcpy (or std::memcpy for that matter) would introduce the overhead of at least one or even two branches, and loop overhead for copying the bytes. For such a simple operation, that’s quite a lot of overhead for my taste.

      It’s one of these fundamental problems we have in C++. The Standard dictates stuff like what functions an implementation must provide, what the function must do, and what the outcome shall be. Same with the STL: it is clear what an implementation must offer in terms of containers and algorithms, but nobody dictates how the implementations should perform (or be written). It’s one of the many reasons many people stay away from using the STL in the engine runtime for multi-platform projects. It doesn’t behave the same on all platforms, there are shitty implementations out there, containers and their members are not compatible. The latter matters as soon as you start using middleware which was built with a different STL implementation.

      Ranting about C++ aside, I guess my main point is: I know the implications, I know what the code does on all the platforms I currently care about (= the ones I have worked on, and plan to work on), and I know that the generated disassembly simply generates writes into memory.

  5. I have a couple of questions about tasking system:
    1. Your task freelist is global and therefore requires locking
    when you take/return tasks from the list.

    What if you had a task free list per pool worker thread?
    I am wondering if your TaskId trick would also work in
    this case. I guess the tricky bit is keeping track of
    generation since now generation would be per
    thread, but task can be finished in a different
    thread (if stolen)…

    Is this worth the hassle? Has this global freelist lock
    showed up in profiles?

    2. When you wait on a task to finish (using Wait(Task* task)),
    must this be called from the main thread only or do you
    allow this call from any worker thread?

    • First of all, sorry for the late reply, I was away for a few days!

      1. Your task freelist is global and therefore requires locking
      when you take/return tasks from the list.

      What if you had a task free list per pool worker thread?
      I am wondering if your TaskId trick would also work in
      this case. I guess the tricky bit is keeping track of
      generation since now generation would be per
      thread, but task can be finished in a different
      thread (if stolen)…

      Having thread-local task pools is a good idea, and is also what work stealing uses. The generation can still be global though, just use a monotonically increasing atomic integer. No locking needed, and each task can be uniquely identified.

      Is this worth the hassle? Has this global freelist lock
      showed up in profiles?

      If you have tons of tasks, and adding/removing creates a lot of contention, it could show up in a profile. I haven’t gotten around to implementing work-stealing yet, but I might in the near future.

      2. When you wait on a task to finish (using Wait(Task* task)),
      must this be called from the main thread only or do you
      allow this call from any worker thread?

      This can be called from any thread.

  6. Thanks for the explanation.

    > The generation can still be global though, just use a monotonically increasing
    > atomic integer.

    If you have lots of worker threads, atomic add on generation count may have
    a lot of contention. I don’t know if this would be the case here, but atomics
    are certainly not free.

    If you have a task freelist per worker thread, and if there is work stealing in your
    scheduler, it’s hard to know which freelist to return the task to. You could
    a) have a small integer in TaskId to track which worker thread it belongs to
    b) when task is stolen, keep track of which worker it was stolen from

    b) is ugly and complicated and a) requires extra space

    Regarding dependencies:
    – dependencies between tasks are static. Is there a need for dynamic dependencies
    in a game? Namely, can you create a dynamic dependency graph while executing tasks?
    If yes, under what practical circumstances would this be needed?

    • If you have lots of worker threads, atomic add on generation count may have
      a lot of contention. I don’t know if this would be the case here, but atomics
      are certainly not free.

      If you have a task freelist per worker thread, and if there is work stealing in your
      scheduler, it’s hard to know which freelist to return the task to. You could
      a) have a small integer in TaskId to track which worker thread it belongs to
      b) when task is stolen, keep track of which worker it was stolen from

      b) is ugly and complicated and a) requires extra space

      I agree, and would definitely vote for a). The amount of extra space required is not that high, and the number of tasks the system can handle has an upper bound anyway (at least in my implementation).
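
      As a purely hypothetical sketch of option a), the worker index could be squeezed into a few bits of the TaskId without growing it much; the field widths below are arbitrary and not part of Molecule:

      struct TaskId
      {
        uint32_t m_offset : 20;      // offset into the owning worker thread's task pool
        uint32_t m_workerIndex : 4;  // which worker thread's freelist the task came from
        uint32_t m_generation : 8;   // generation counter, as before
      };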

      Regarding dependencies:
      – dependencies between tasks are static. Is there a need for dynamic dependencies
      in a game? Namely, can you create a dynamic dependency graph while executing tasks?
      If yes, under what practical circumstances would this be needed?

      I hope I understood your question correctly, and I would say you pretty much need that all the time. Many of the high-level tasks like ‘update animations’, ‘update particles’, etc. will probably want to spawn new fine-grained tasks to maximize data parallelism. You need to support dependencies for these tasks in order to be able to correctly wait until all of them are completed.

      • > Many of the high-level tasks like ‘update animations’, ‘update particles’, etc.
        > will probably want to spawn new fine-grained tasks to maximize data
        > parallelism.

        Yes, this is what I meant. So, if I understand correctly, you can change
        child/parent task dependencies dynamically while they are already
        running?

        For example, say you have a task A (that has no dependencies). When the scheduler starts executing it, task A creates two new tasks B and C. Now,
        task A depends on B and C and must wait until both B and C finish.

        How would task A look like?

        void TaskA() {
          TaskId B = queue_task(taskB);
          TaskId C = queue_task(taskC);
          wait_on_task(B);
          wait_on_task(C);
        }

        Or would you also set some dependencies explicitly to tell
        A that there are now two dependent tasks?

      • Yes, this is what I meant. So, if I understand correctly, you can change
        child/parent task dependencies dynamically while they are already
        running?

        Yes.
        With my scheduler, you would have to set dependencies explicitly, but you wouldn’t have to wait for B and C. Just waiting for Task A would suffice, because B and C are now children of A.
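
        As a purely hypothetical sketch (GetCurrentTaskId, AddChildTask and the kernel data arguments are placeholders; the real API is covered in the next part), task A could then look like this, with no explicit waits on B and C:

        void TaskAKernel(const TaskData& taskData)
        {
          // hypothetical calls: register B and C as children of the running task A,
          // so that waiting on A implicitly waits on B and C as well
          scheduler::TaskId a = scheduler::GetCurrentTaskId();        // hypothetical
          scheduler::AddChildTask(a, kernelDataB, &TaskBKernel);      // hypothetical
          scheduler::AddChildTask(a, kernelDataC, &TaskCKernel);      // hypothetical
        }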

  7. Hi Stefan,

    How might one go about building higher level constructs like ParallelFor() and ParallelReduce() using your tasking system? What’s still missing?

    Thanks.

    • Hi,

      One simple solution to do e.g. a parallel_for is to first make a template function that accepts a so-called range of elements. A range could be two iterators, or an array + count, or something similar. That is completely up to you, and can be pretty generic by using templates.

      Second, the template function itself can be implemented by making use of the underlying task system. The template function could recursively spawn two tasks – one for the first half of the range, one for the second. The function goes on splitting ranges until some pre-defined limit is met (number of items in the range, amount of data stored in the range, etc.).

      If you’re careful with setting up the tasks and dependencies, all the created tasks “unwind” themselves properly, so that you only have to wait for the root task to finish, ensuring that all the child tasks will have finished as well.

      If I remember correctly, this is the strategy implemented by Intel’s TBB, but it might have changed in the last years. Here’s some information: https://www.threadingbuildingblocks.org/docs/help/tbb_userguide/parallel_for.htm
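
      Here is a rough sketch of that recursive splitting on top of the interface described in this post. The Range type, the grain size and the explicit waits on the two halves are all assumptions made for illustration; with the parent/child “unwinding” mentioned above, those waits would disappear:

      struct Range
      {
        float* m_data;
        uint32_t m_count;
      };

      void ParallelForKernel(const TaskData& taskData)
      {
        Range* range = static_cast<Range*>(taskData.m_kernelData);
        if (range->m_count <= 1024)  // arbitrary grain size
        {
          // small enough: work on the range directly
          for (uint32_t i = 0; i < range->m_count; ++i)
            range->m_data[i] *= 2.0f;
          return;
        }

        // split the range in half and spawn a task for each half
        Range left = { range->m_data, range->m_count / 2 };
        Range right = { range->m_data + left.m_count, range->m_count - left.m_count };

        scheduler::TaskId leftTask = scheduler::AddTask(scheduler::KernelData(&left, sizeof(Range)), &ParallelForKernel);
        scheduler::TaskId rightTask = scheduler::AddTask(scheduler::KernelData(&right, sizeof(Range)), &ParallelForKernel);

        scheduler::Wait(leftTask);
        scheduler::Wait(rightTask);
      }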
