Job System 2.0: Lock-Free Work Stealing – Part 4: parallel_for

Continuing from where we left off last time, today we are going to discuss how to build high-level algorithms such as parallel_for using our job system.

Other posts in the series

Part 1: Describes the basics of the new job system, and summarizes work-stealing.
Part 2: Goes into detail about the thread-local allocation mechanism.
Part 3: Discusses the lock-free implementation of the work-stealing queue.

The basic idea

At the moment, all we can do is push jobs into the system, and let the system take care of balancing the load across all available cores. So far, the system is not aware of the fact that some jobs might be smaller, some might be larger, and all of them probably take a different amount of time to finish.

A very common scenario in game programming is to perform a certain task for a fixed number of elements, e.g. applying the same transformation to all elements of an array/vector/range. Transformations in this context could be culling 10,000 bounding boxes, updating the bone hierarchy for 1,000 characters, or animating 100,000 particles. No matter the task, there is probably some data parallelism we can (and should!) exploit.

This is where high-level algorithms such as parallel_for come into play.

Conceptually, the idea behind a parallel_for job is very simple. Given a range of elements, the job should split the range into (almost) equal parts, and distribute the individual workloads by spawning newly created jobs into the system. For each new range that is created, a user-supplied function should then be called that takes care of performing the actual work over the given range of elements.

As an example, consider how we would go about updating particles. In its simplest form, this could be nothing more than a free function accepting an array of data, along with the number of elements stored in the array:

void UpdateParticles(Particle* particles, unsigned int count);

Assuming that we have 100,000 particles to update, we could update all of them on the same core by spawning a job that simply does the following:

UpdateParticles(particles, 100000);

What do we do if we want to split the update into 4 equally-sized parts instead? Pretty simple with this kind of function:

UpdateParticles(particles, 25000);
UpdateParticles(particles + 25000, 25000);
UpdateParticles(particles + 50000, 25000);
UpdateParticles(particles + 75000, 25000);

Putting each call into a separate job will automatically distribute the workload to 4 cores, if available. Note that this is exceptionally easy due to the way we defined the UpdateParticles() function.

Of course, doing all of this by hand is tedious and error-prone, and should be the responsibility of the parallel_for job.

Implementation

In its most basic form, the parallel_for needs to accept the following parameters:

  • A range of data, consisting of a pointer to the data, and the number of elements stored in the array.
  • A function to call on newly spawned jobs/ranges.

Without further ado, the high-level implementation could look like the following:

// declared first so that parallel_for can use it
struct parallel_for_job_data
{
  Particle* data;
  unsigned int count;
  void (*function)(Particle*, unsigned int);
};

Job* parallel_for(Particle* data, unsigned int count, void (*function)(Particle*, unsigned int))
{
  const parallel_for_job_data jobData = { data, count, function };

  Job* job = jobSystem::CreateJob(&jobs::parallel_for_job, jobData);
  return job;
}

The implementation simply creates a new (root) job called jobs::parallel_for_job that takes care of dividing the range into smaller ranges, recursively. The returned job can then be run and waited upon by the user:

Job* job = parallel_for(g_particles, 100000, &UpdateParticles);
jobSystem::Run(job);
jobSystem::Wait(job);

The parallel_for_job itself is also only a handful of lines:

void parallel_for_job(Job* job, const void* jobData)
{
  const parallel_for_job_data* data = static_cast<const parallel_for_job_data*>(jobData);
	
  if (data->count > 256)
  {
    // split in two
    const unsigned int leftCount = data->count / 2u;
    const parallel_for_job_data leftData = { data->data, leftCount, data->function };
    Job* left = jobSystem::CreateJobAsChild(job, &jobs::parallel_for_job, leftData);
    jobSystem::Run(left);

    const unsigned int rightCount = data->count - leftCount;
    const parallel_for_job_data rightData = { data->data + leftCount, rightCount, data->function };
    Job* right = jobSystem::CreateJobAsChild(job, &jobs::parallel_for_job, rightData);
    jobSystem::Run(right);
  }
  else
  {
    // execute the function on the range of data
    (data->function)(data->data, data->count);
  }
}

As can be seen, the job splits the given range into two halves as long as there are more than 256 elements left in the range. For each newly created range, a job is spawned as a child of the given job, effectively cutting the initial range into smaller pieces using a divide-and-conquer strategy. Note that this interleaves the splitting of the range with the execution of these (and other) jobs, which is a bit more efficient than having one job doing all the splitting work.

Of course, there are at least two things that need to be changed in this implementation:

  • The parallel_for algorithm currently only accepts functions that deal exclusively with particles. This was intentional to make the code more readable, but needs to be fixed.
  • Ranges are always split as long as they contain more than 256 elements. Basing the splitting strategy solely on the number of elements might not be the best option available, and we certainly want to be able to at least configure the threshold on a per-job basis.

Unsurprisingly, C++ templates offer a nice way to deal with both problems in an efficient and type-safe manner.

A more generic implementation

Accepting ranges of any type can be achieved by introducing a template parameter for both the parallel_for function, as well as the job itself. Additionally, the parallel_for_job_data also needs to be able to hold pointers of arbitrary types.

Similarly, the splitting strategy can be fed into the parallel_for algorithm by an additional template argument supplied by the user. This allows the user to choose from different strategies, based on the job to be performed.

With the addition of the two above-mentioned arguments, the code then becomes the following:

template <typename T, typename S>
Job* parallel_for(T* data, unsigned int count, void (*function)(T*, unsigned int), const S& splitter)
{
  typedef parallel_for_job_data<T, S> JobData;
  const JobData jobData(data, count, function, splitter);

  Job* job = jobSystem::CreateJob(&jobs::parallel_for_job<JobData>, jobData);

  return job;
}

template <typename T, typename S>
struct parallel_for_job_data
{
  typedef T DataType;
  typedef S SplitterType;

  parallel_for_job_data(DataType* data, unsigned int count, void (*function)(DataType*, unsigned int), const SplitterType& splitter)
    : data(data)
    , count(count)
    , function(function)
    , splitter(splitter)
  {
  }

  DataType* data;
  unsigned int count;
  void (*function)(DataType*, unsigned int);
  SplitterType splitter;
};

template <typename JobData>
void parallel_for_job(Job* job, const void* jobData)
{
  const JobData* data = static_cast<const JobData*>(jobData);
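  // 'typename' and 'template' are required because these names depend on JobData
  const typename JobData::SplitterType& splitter = data->splitter;

  if (splitter.template Split<typename JobData::DataType>(data->count))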
  {
    // split in two
    const unsigned int leftCount = data->count / 2u;
    const JobData leftData(data->data, leftCount, data->function, splitter);
    Job* left = jobSystem::CreateJobAsChild(job, &jobs::parallel_for_job<JobData>, leftData);
    jobSystem::Run(left);

    const unsigned int rightCount = data->count - leftCount;
    const JobData rightData(data->data + leftCount, rightCount, data->function, splitter);
    Job* right = jobSystem::CreateJobAsChild(job, &jobs::parallel_for_job<JobData>, rightData);
    jobSystem::Run(right);
  }
  else
  {
    // execute the function on the range of data
    (data->function)(data->data, data->count);
  }
}

Now the parallel_for implementation is able to cope with ranges of arbitrary data, and accepts a splitting strategy as an additional parameter. A valid strategy implementation only needs to provide a Split() function that decides whether a given range should be split further or not, exemplified by the following two implementations:

class CountSplitter
{
public:
  explicit CountSplitter(unsigned int count)
    : m_count(count)
  {
  }

  template <typename T>
  inline bool Split(unsigned int count) const
  {
    return (count > m_count);
  }

private:
  unsigned int m_count;
};

class DataSizeSplitter
{
public:
  explicit DataSizeSplitter(unsigned int size)
    : m_size(size)
  {
  }

  template <typename T>
  inline bool Split(unsigned int count) const
  {
    return (count*sizeof(T) > m_size);
  }

private:
  unsigned int m_size;
};

CountSplitter simply splits a range based solely on the number of elements, as we’ve seen in the initial example.

On the other hand, DataSizeSplitter is a strategy that takes the size of the working set into account. For example, on a platform with an L1 cache size of 32KB, a DataSizeSplitter(32*1024) could be used to ensure that ranges keep being split until their working set fits into the L1 cache. The important feature here is that this works across jobs of all kinds, no matter whether they’re working on particle, animation, or culling data: the strategy derives the element size from the type automatically.

Invocation of the parallel_for is the same as before, except for an additional splitter argument:

Job* job = parallel_for(g_particles, 100000, &UpdateParticles, DataSizeSplitter(32*1024));
jobSystem::Run(job);
jobSystem::Wait(job);

Future work

As of now, the parallel_for implementation can only call free functions. It should be easy to upgrade the system to also make it accept member functions, lambdas, std::function, etc.

Outlook

The next part in this series is probably going to be the last one, and will go into detail about how to handle dependencies between jobs.

20 thoughts on “Job System 2.0: Lock-Free Work Stealing – Part 4: parallel_for”

  1. Pingback: 1 – Lock-Free Work Stealing, Part 4: Parallel_for - Exploding Ads

  2. Why create a binary tree of jobs rather than just a single parent with equally-sized children? As in, the parallel_for job would just add jobs to itself in a while loop, and those child jobs would execute all contained jobs with no further splitting.

    • In case of the parallel_for, I don’t care about ordering. However, in many other cases that’s not true, e.g. playing animations must be finished before collision detection can be performed. Such jobs have dependencies which must be taken care of.

  3. I have been following these blog posts with huge interest and I have been trying to follow along as much as possible to make sure that I actually understand the code, there is however one thing I have been failing at, over and over.

    How does your GetWorkerThreadQueue() work? The only implementation I could get to work is a static array of JobQueue’s and having GetWorkerThreadQueue(unsigned int id) return a specific jobqueue depending on the ID. This is extremely annoying since I seem to have to pass an id of which thread the code is being run on through a lot of functions. Is there a better solution?

  4. I have been following these posts and the same question pops in my mind: why not use some established package that does everything for you, such as Intel TBB or similar from university research groups?

    • Off the top of my head, there are several reasons for a custom solution:

      • No licensing troubles, especially in the case of TBB (GPL).
      • More light-weight, tightly integrated implementation. It is easier to make use of the engine things that are already there, e.g. debugging facilities, memory management and allocators. I don’t need to pull in cruft from other libraries.
      • Easier to change the underlying implementation, e.g. in order to use Fibers as demonstrated by Naughty Dog.
      • Easier to specialize the implementation for certain (console) platforms.
  5. Are you still planning a Part 5? I for one am very eager to see how you solved dependencies, and I can’t help but hope that you will discuss how (where?) you actually ended up implementing the job system in the engine, real world examples are great. 🙂

  6. How do you actually parallelize the creation and addition of tasks? From what I can tell, when I call parallel_for, everything that follows happens on the same thread and tasks get created & added to the thread local queue. So other threads can only Steal(), not Pop(). Am I missing something?

    • No, you’re not missing something.

      parallel_for splits the range into sub-ranges (and tasks), which pushes jobs into the thread local queue. While the recursive partitioning process itself isn’t parallelized (there wouldn’t be much to gain from doing this), splitting and running those new jobs *is* parallelized.

      Other threads that currently don’t have work to do can Steal() from the queue while the splitting process might still be going on. On the other hand, if no other thread can Steal() from our queue, there’s not much parallelization to be had to begin with, because all other threads were busy.

  7. So, I have been using this kind of job system intensively and I have implemented it both in a stateless renderer and a parallel ECS-system.

    One problem I was facing is that the worker threads never _ACTUALLY_ sleep, they are constantly listening for new jobs.

    My immediate thought was to do this in my worker threads update loop:

    while (true)
    {
      Job* job = FetchJob();
      if (job)
      {
        Execute(job);
      }
      else
      {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
      }
    }

    But I am just now starting to investigate if I could have a shared variable (Mutex? Atomic?) which governs sleepmode, a worker thread will sleep from the moment it can’t find any more jobs until this shared variable gets flipped.

    The main thread will flip the shared variable when it pushes something into the job queue.

    Any thoughts on these implementations?

    • Yes, you should definitely use some primitive for that in a production environment, rather than busy waiting or issuing a sleep.

      You could use an event, and wake up all worker threads at the same time whenever something is pushed into the queue. With only a few jobs in the queue this could lead to excessive sleeping/waking of worker threads, because not all of them will be able to steal a job.
      Another option would be to use a semaphore that gets signalled each time a new job is pushed into a queue. With a semaphore, you could optionally control how many worker threads wake up and are active at the same time.

      In any case, I think you’ll probably need to make sure that once a worker thread wakes up, it is able to steal new jobs immediately, rather than choosing a queue to steal from at random.

      • Hi Stefan, I’m still just starting to get into serious multithreaded programming, so the following might not make much sense.

        I’ve successfully (at least I hope so) implemented the Job system you kindly presented here, and I’ve been experimenting with introducing a wait condition to avoid busy looping. The idea is to wake up one worker when a job is pushed to any thread’s queue (in “run()”); but the problem is, I didn’t know where to put the “wait()”, because an awoken worker might not be able to steal the job (randomly) right away.

        So I came up with a modified counting Semaphore implementation, which seems to work – I put the “wait()” in the main worker loop, BUT this one does not decrement the counter like a Semaphore would. There’s another method to do that, and it’s called right before “execute()”, kind of like this:

        void JobSignal::wait() {
          m_lock.lock();
          while(m_count <= 0) m_condition.wait(m_lock);
          m_lock.unlock();
        }

        void JobSignal::accept() {
          m_lock.lock();
          m_count--;
          m_lock.unlock();
        }

        void JobSignal::notify() {
          m_lock.lock();
          int prevCount = m_count++;
          m_lock.unlock();

          if(prevCount == 0) m_condition.notifyOne();
        }

        Where "m_lock" is a critical section. Do you think this is feasible, or am I missing some awkward scenario / obvious mistake? I don't really know how to visualize what's happening (there IS a performance hit compared to the busy waiting version), but it seems to work fine.

      • I think the simplest solution is to let worker threads wait on a semaphore, where the semaphore gets signalled each time a new job is added to the system. The semaphore should be configured to let N (N = #worker threads) threads enter at the same time. In a busy frame with lots of jobs, worker threads will be busy executing jobs and shouldn’t have to go to sleep so often.

    • I’ve been using it for a software renderer. My worker-thread main loop uses two barriers (look in boost, but they are very simple). On startup, they wait for the start-frame barrier to be triggered, then do work until an end-of-frame flag is set. Then the main-thread waits for all threads to hit the end-frame barrier, and it loops and starts over.

      The Barrier uses a std::condition_variable, which is probably implemented on Windows similar to WaitForSingleObject(). They wake up in microseconds.

      Also, if you are making measurements, watch out for Core-Parking on mobile cpus… quite a head-scratcher for a while until I learned this term.

  8. Pingback: Job System 2.0: Lock-Free Work Stealing – Part 5: Dependencies | Molecular Musings
