Job System 2.0: Lock-Free Work Stealing – Part 1: Basics

Back in 2012, I wrote about the task scheduler implementation in Molecule. Three years have passed since then, and now it’s time to give the old system a long-overdue facelift.

Requirements for the new job system were the following:

  • The base implementation needs to be simpler. Jobs can be quite stupid on their own, but it should be possible to build high-level algorithms such as parallel_for on top of the implementation.
  • The job system needs to implement automatic load balancing.
  • Performance improvements should be made by gradually replacing parts of the system with lock-free alternatives, where applicable.
  • The system needs to support “dynamic parallelism”: it must be possible to alter parent-child relationships and add dependencies to a job while it is still running. This is needed to allow high-level primitives such as parallel_for to dynamically split the given workload into smaller jobs.

Today, we will look at the base implementation of the new job system, using locks/critical sections. Even when using locks, there are a few pitfalls that I would like to point out before going lock-free.

The very basics

Similar to the old task scheduler, our new job system basically works as follows:

  • We have N worker threads that continually grab a job from a queue, and execute it.
  • For N cores, we create N-1 worker threads.
  • The main thread is also considered a worker thread, and can help with executing jobs.

This time around, there is one major difference though: our job system now implements a concept known as work stealing. Rather than using one global queue into which all jobs are pushed, each worker thread has its own job queue. A single global job queue creates a lot of contention, because all threads push to and pop from the same data structure.
Work stealing is a simple, but effective, concept:

  • New jobs are always pushed into the queue of the calling thread.
  • Whenever a worker thread wants to work on a job, it tries to pop a job from its own queue first. If there is no job in the queue, the thread tries to steal a job from one of the other worker threads’ queues.
  • The operations Push() and Pop() are only called by the worker thread that owns the queue.
  • The operation Steal() is only called by worker threads that do not own the queue.

The last two items are important, and lead to the following observations:

  • Push() and Pop() can work on one end of the queue (the private end), while Steal() works on the other end (the public end).
  • The private end can work in LIFO fashion for better utilization of the cache, while the public end works in FIFO fashion for better work balancing.

When talking about work stealing, such a double-ended data structure is often called a work-stealing queue/deque. One very important benefit of such a work-stealing queue is the fact that it is possible to implement it in a lock-free manner.

In C++, a basic implementation could look as follows:

// main function of each worker thread
while (workerThreadActive)
{
  Job* job = GetJob();
  if (job)
  {
    Execute(job);
  }
}

Job* GetJob(void)
{
  WorkStealingQueue* queue = GetWorkerThreadQueue();

  Job* job = queue->Pop();
  if (IsEmptyJob(job))
  {
    // this is not a valid job because our own queue is empty, so try stealing from some other queue
    unsigned int randomIndex = GenerateRandomNumber(0, g_workerThreadCount+1);
    WorkStealingQueue* stealQueue = g_jobQueues[randomIndex];
    if (stealQueue == queue)
    {
      // don't try to steal from ourselves
      Yield();
      return nullptr;
    }

    Job* stolenJob = stealQueue->Steal();
    if (IsEmptyJob(stolenJob))
    {
      // we couldn't steal a job from the other queue either, so we just yield our time slice for now
      Yield();
      return nullptr;
    }

    return stolenJob;
  }

  return job;
}

void Execute(Job* job)
{
  (job->function)(job, job->data);
  Finish(job);
}

I’ve deliberately left out details for the Finish() function at this point – it will be discussed later.
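
The WorkStealingQueue used by GetJob() is not shown in this post. As a minimal sketch of the lock-based variant described above, assuming C++11's std::mutex and std::deque (Molecule's actual implementation may differ), Push() and Pop() work on the back of the deque while Steal() takes from the front. In this sketch an empty queue is signalled by nullptr, which is an assumption about what IsEmptyJob() checks:

```cpp
#include <cassert>
#include <deque>
#include <mutex>

// Placeholder so the sketch is self-contained; the real Job struct is shown later.
struct Job {};

// Hypothetical lock-based deque: the owner works on the back (LIFO),
// thieves work on the front (FIFO).
class WorkStealingQueue
{
public:
  // called only by the thread that owns the queue
  void Push(Job* job)
  {
    assert(job != nullptr);
    std::lock_guard<std::mutex> lock(m_mutex);
    m_jobs.push_back(job);
  }

  // called only by the thread that owns the queue; nullptr means "empty"
  Job* Pop()
  {
    std::lock_guard<std::mutex> lock(m_mutex);
    if (m_jobs.empty())
      return nullptr;

    Job* job = m_jobs.back();
    m_jobs.pop_back();
    return job;
  }

  // called by threads that do not own the queue; nullptr means "empty"
  Job* Steal()
  {
    std::lock_guard<std::mutex> lock(m_mutex);
    if (m_jobs.empty())
      return nullptr;

    Job* job = m_jobs.front();
    m_jobs.pop_front();
    return job;
  }

private:
  std::mutex m_mutex;
  std::deque<Job*> m_jobs;
};
```

One mutex guarding both ends is the simplest correct choice here; it serializes the owner and the thieves, which is exactly the cost a lock-free deque is meant to remove.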

What is a job?

Obeying the “keep it simple” requirement, a job needs to store at least two things: a pointer to the function being executed, and an optional parent job.
Additionally, we need a counter that lets us keep track of the number of unfinished jobs for handling parent/child-relationships. And in order to avoid False Sharing, we add padding to ensure that a Job object occupies at least one whole cache line:

struct Job
{
  JobFunction function;
  Job* parent;
  int32_t unfinishedJobs; // atomic
  char padding[];
};

Note that the unfinishedJobs member is marked as being atomic. In Molecule, it is altered by using any of the Interlocked* functions on Windows. Using C++11, you could use a std::atomic type. I also left out the size of the padding array, because it’s different for 32-bit and 64-bit and clutters the code with insignificant complexity due to several sizeof() operators being involved.
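
For readers who want to see the omitted padding size spelled out, here is a sketch using std::atomic. It assumes a 64-byte cache line (typical for current x86 CPUs, but an assumption nonetheless); the constants kCacheLineSize and kJobMemberSize are hypothetical names that do not appear in Molecule:

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

struct Job;
typedef void (*JobFunction)(Job*, const void*);

// Assumption: cache lines are 64 bytes wide.
constexpr std::size_t kCacheLineSize = 64;

// Bytes occupied by the members that are not padding.
constexpr std::size_t kJobMemberSize =
  sizeof(JobFunction) + sizeof(Job*) + sizeof(std::atomic<std::int32_t>);

// alignas makes every Job start on a cache line boundary,
// the padding array makes it fill the rest of that line.
struct alignas(kCacheLineSize) Job
{
  JobFunction function;
  Job* parent;
  std::atomic<std::int32_t> unfinishedJobs;
  char padding[kCacheLineSize - kJobMemberSize];
};

static_assert(sizeof(Job) == kCacheLineSize, "Job should occupy exactly one cache line");
```

Note that over-aligned types such as this one are only handled by plain new from C++17 onwards; with older compilers an aligned allocation function is needed instead.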

The job function associated with a job accepts two parameters: The job it belongs to, and the data associated with the job.

typedef void (*JobFunction)(Job*, const void*);

Associating data with a job

One thing I didn’t like about the old task scheduler was the fact that the user had to hold on to a job’s data until the job was finished. This is not a problem when job data can be stored on the stack, but it sometimes led to unnecessary heap allocations, which I wanted to get rid of in the new system.
Fortunately, there is a simple solution for storing data that belongs to a job: We can store it in-place in our Job struct!

The padding array is a perfect candidate for storing job data. The array is unused, we need it anyway, so why not put it to good use? In Molecule, data associated with a job is memcpy-ied into the padding array as long as the given data fits – which is ensured by a compile-time check. If the data is too big to be stored in-place, the user can always allocate the data from the heap, and hand just a pointer to the data to the job system.
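
The in-place storage with its compile-time check could be sketched as follows. The helpers StoreJobData and LoadJobData, the example type ParticleRange, and the padding size of 44 bytes are all hypothetical and only serve this illustration:

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <type_traits>

struct Job;
typedef void (*JobFunction)(Job*, const void*);

struct Job
{
  JobFunction function;
  Job* parent;
  std::int32_t unfinishedJobs; // atomic in the real system
  char padding[44];            // size assumed for this sketch only
};

// Hypothetical example of data a job might carry.
struct ParticleRange
{
  unsigned int begin;
  unsigned int end;
};

// Copies 'data' into the padding array; fails to compile if it does not fit.
template <typename T>
void StoreJobData(Job* job, const T& data)
{
  static_assert(std::is_trivially_copyable<T>::value, "job data is copied with memcpy");
  static_assert(sizeof(T) <= sizeof(Job::padding), "job data does not fit into the padding array");
  assert(job != nullptr);
  std::memcpy(job->padding, &data, sizeof(T));
}

// Reads the data back, e.g. inside a job function that receives the padding array.
template <typename T>
T LoadJobData(const void* jobData)
{
  T data;
  std::memcpy(&data, jobData, sizeof(T));
  return data;
}
```

Going through memcpy on both sides avoids alignment and aliasing problems that could arise from casting the char array directly to T*.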

Adding jobs

Pushing jobs into the system is always done in two steps: first, a job is created. Second, the job is added to the system. Splitting this operation into two parts allows us to implement dynamic parallelism, which was one of the requirements mentioned earlier.

In C++, jobs are created using either of the following functions:

Job* CreateJob(JobFunction function)
{
  Job* job = AllocateJob();
  job->function = function;
  job->parent = nullptr;
  job->unfinishedJobs = 1;

  return job;
}

Job* CreateJobAsChild(Job* parent, JobFunction function)
{
  atomic::Increment(&parent->unfinishedJobs);

  Job* job = AllocateJob();
  job->function = function;
  job->parent = parent;
  job->unfinishedJobs = 1;

  return job;
}

Note that I’ve left out the functions accepting additional data that is memcpy-ied into the padding array.

For now, AllocateJob() simply allocates and returns a new Job object by calling new.
As can be seen, when creating a job as a child of an already existing job, the parent’s unfinishedJobs member is atomically incremented. This needs to be done atomically because other threads could be adding different jobs as children to the same job, leading to data races.
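
To illustrate why the increment has to be atomic, here is a small sketch using std::atomic and std::thread instead of Molecule's atomic::Increment. RegisterChild and RegisterChildrenConcurrently are hypothetical stand-ins that only perform the counter update of CreateJobAsChild():

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>
#include <vector>

// Only the counter matters for this sketch.
struct Job
{
  std::atomic<std::int32_t> unfinishedJobs{1};
};

// Stand-in for the counter update performed by CreateJobAsChild().
void RegisterChild(Job* parent)
{
  assert(parent != nullptr);
  parent->unfinishedJobs.fetch_add(1);
}

// Starts 'threadCount' threads that each register 'childrenPerThread' children
// on the same parent, and returns the parent's counter afterwards.
std::int32_t RegisterChildrenConcurrently(Job* parent, int threadCount, int childrenPerThread)
{
  std::vector<std::thread> threads;
  for (int t = 0; t < threadCount; ++t)
  {
    threads.emplace_back([parent, childrenPerThread]()
    {
      for (int i = 0; i < childrenPerThread; ++i)
      {
        RegisterChild(parent);
      }
    });
  }

  for (std::thread& thread : threads)
  {
    thread.join();
  }

  return parent->unfinishedJobs.load();
}
```

With an atomic counter the result is always 1 plus the number of registered children. With a plain int32_t and ++, concurrent read-modify-write sequences could overwrite each other, and the parent might be considered finished while children are still running.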

Adding a newly created job to the system is done by a call to Run():

void Run(Job* job)
{
  WorkStealingQueue* queue = GetWorkerThreadQueue();
  queue->Push(job);
}

Waiting for a job

Of course, once we’ve added a few jobs to the system, we need to be able to check if they are finished, and do something meaningful in the meantime. This is accomplished by calling Wait():

void Wait(const Job* job)
{
  // wait until the job has completed. in the meantime, work on any other job.
  while (!HasJobCompleted(job))
  {
    Job* nextJob = GetJob();
    if (nextJob)
    {
      Execute(nextJob);
    }
  }
}

Determining whether a job has completed can be done by comparing unfinishedJobs with 0. If the counter is greater than 0, either the job itself or any of its child jobs hasn’t finished so far. If the counter is zero, all associated jobs have finished.

The system in practice

The following simple example creates a bunch of single, empty jobs which are added to the system:

void empty_job(Job*, const void*)
{
}

for (unsigned int i=0; i < N; ++i)
{
  Job* job = jobSystem::CreateJob(&empty_job);
  jobSystem::Run(job);
  jobSystem::Wait(job);
}

Of course, this is inefficient because we create, run, and wait for each job in isolation. Still, this serves as a good test for measuring the job system’s overhead for creating, adding, and running jobs.

Another example creates single jobs again, but runs them as children of one root job:

Job* root = jobSystem::CreateJob(&empty_job);
for (unsigned int i=0; i < N; ++i)
{
  Job* job = jobSystem::CreateJobAsChild(root, &empty_job);
  jobSystem::Run(job);
}
jobSystem::Run(root);
jobSystem::Wait(root);

This is much more efficient, because creating and running jobs is now done in parallel to executing jobs already in the system.

Finishing and deleting jobs

We’re almost done. We still need to properly finish jobs by telling their parent that execution has finished. And we need to delete all jobs that we allocated.
You might be tempted to write the Finish() function like this:

void Finish(Job* job)
{
  const int32_t unfinishedJobs = atomic::Decrement(&job->unfinishedJobs);
  if (unfinishedJobs == 0)
  {
    if (job->parent)
    {
      Finish(job->parent);
    }

    delete job;
  }
}

We first atomically decrement our counter of unfinishedJobs. As mentioned earlier, as soon as this counter reaches 0, this job and all its children have completed, so we need to tell our parent about this. Afterwards, we can delete the job because we no longer need it. However, there is a not-so-subtle bug in there – can you spot it?

The problem is that we are not allowed to delete the job at this point. There could still be threads waiting on this exact job, calling HasJobCompleted() to check whether this job has finished already. Such a thread would then access memory that has already been freed, either causing an access violation or reading garbage values.
One solution is to defer deletion of jobs to a later point in time, but you still have to be careful about this:

void Finish(Job* job)
{
  const int32_t unfinishedJobs = atomic::Decrement(&job->unfinishedJobs);
  if (unfinishedJobs == 0)
  {
    const int32_t index = atomic::Increment(&g_jobToDeleteCount);
    g_jobsToDelete[index-1] = job;

    if (job->parent)
    {
      Finish(job->parent);
    }
  }
}

I’ve inserted code that stores the jobs to be deleted in a global array, but it is still wrong. The reason for that is that as soon as the thread finishing the job decrements the unfinishedJobs member, the thread could get pre-empted. If you’re unlucky and this particular job was the root job in the example above, it would be disastrous to go ahead and try to delete all jobs stored in the array.

Of course, there is a way to do all this in a safe manner:

void Finish(Job* job)
{
  const int32_t unfinishedJobs = atomic::Decrement(&job->unfinishedJobs);
  if (unfinishedJobs == 0)
  {
    const int32_t index = atomic::Increment(&g_jobToDeleteCount);
    g_jobsToDelete[index-1] = job;

    if (job->parent)
    {
      Finish(job->parent);
    }

    atomic::Decrement(&job->unfinishedJobs);
  }
}

Note that this code decrements the counter one more time after the job has been added to the global array and the parent has been notified. Completion of a job is now signalled by unfinishedJobs being -1, not 0. After the root job has finished executing, it is safe to delete all jobs that have been allocated for this frame.

In this particular case, it would be safe to set unfinishedJobs to -1 without using an atomic instruction, but the code would need an additional compiler barrier (and memory barrier on other platforms) in order to be correct.
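
For platforms other than Windows/x86, the same logic could be sketched with std::atomic and its default sequentially consistent ordering. The capacity kMaxJobsPerFrame is an assumption made for this sketch, and the Job struct is reduced to the members Finish() needs:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

struct Job
{
  Job* parent = nullptr;
  std::atomic<std::int32_t> unfinishedJobs{1};
};

// Assumed capacity, for this sketch only.
constexpr std::int32_t kMaxJobsPerFrame = 4096;

std::atomic<std::int32_t> g_jobToDeleteCount{0};
Job* g_jobsToDelete[kMaxJobsPerFrame];

void Finish(Job* job)
{
  // fetch_sub returns the previous value, so subtract 1 to get the new one
  const std::int32_t unfinishedJobs = job->unfinishedJobs.fetch_sub(1) - 1;
  if (unfinishedJobs == 0)
  {
    // fetch_add returns the previous value, which is the free slot
    const std::int32_t index = g_jobToDeleteCount.fetch_add(1);
    assert(index < kMaxJobsPerFrame);
    g_jobsToDelete[index] = job;

    if (job->parent)
    {
      Finish(job->parent);
    }

    // second decrement: going from 0 to -1 marks the job as completed
    job->unfinishedJobs.fetch_sub(1);
  }
}

bool HasJobCompleted(const Job* job)
{
  return job->unfinishedJobs.load() == -1;
}
```

Because HasJobCompleted() only reports true once the counter is -1, a waiting thread cannot move on to deleting jobs before the finishing thread has stored the job in the array and notified the parent.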

Implementation details

A few notes on how to implement some of the things mentioned in this post:

  • Accessing worker thread queues can most easily be done by using a thread-local index. On Windows/MSVC, this can be accomplished by either using __declspec(thread) or TlsAlloc.
  • Yielding a thread’s time slice can be done by using Sleep(1), Sleep(0), or other variants. _mm_pause is often used as well, although it is only a hint for spin-wait loops and does not actually give up the time slice. However, you should always make sure that worker threads do not consume 100% CPU time when there is nothing to do. An Event, Semaphore or Condition Variable can be used for that.
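
Both notes could be sketched in portable C++11 as follows, using thread_local in place of __declspec(thread) and std::condition_variable in place of a Windows Event. The names t_workerIndex and WorkSignal are hypothetical, and this is only one of several ways to put idle workers to sleep:

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical thread-local index into the global array of job queues.
// Each worker thread would set this once at startup.
thread_local unsigned int t_workerIndex = 0;

// Hypothetical helper that lets idle workers sleep until a push is announced.
class WorkSignal
{
public:
  // to be called after a job has been pushed into a queue
  void NotifyJobPushed()
  {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      ++m_pendingJobs;
    }
    m_condition.notify_one();
  }

  // blocks until at least one push has been announced, then consumes it
  void WaitForJob()
  {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_condition.wait(lock, [this]() { return m_pendingJobs > 0; });
    --m_pendingJobs;
  }

  // number of announced pushes that no worker has consumed yet
  unsigned int PendingJobs()
  {
    std::lock_guard<std::mutex> lock(m_mutex);
    return m_pendingJobs;
  }

private:
  std::mutex m_mutex;
  std::condition_variable m_condition;
  unsigned int m_pendingJobs = 0;
};
```

A worker woken by WaitForJob() still has to call GetJob() and handle a nullptr result, because another thread (for example one inside Wait()) may have taken the job in the meantime.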

Performance

Using the job system described above, I ran two tests to measure the performance and overhead of the system. The first test creates 65000 single, empty jobs that are run in isolation, as shown in the first example above. The second test also creates 65000 jobs, but by using several parallel_for loops that recursively split their work into smaller jobs.

Performance was measured on an Intel Core i7-2600K CPU clocked at 3.4 GHz, having 4 physical cores with Hyperthreading (= 8 logical cores).

The running times are as follows:
Single jobs: 18.5 ms
parallel_for: 5.3 ms

There are a few things worth noting:

  • Using parallel_for is much more representative of practical work loads, because creating and adding jobs can be done in parallel.
  • The job system uses new and delete for allocating jobs, which is not really efficient.
  • The implementation of the work-stealing queue uses locks.

Outlook

Next time, we will look at how to get rid of new and delete, simplifying our Finish() function in the process. After that, we will tackle the lock-free implementation of the work-stealing queue. Last but not least, we will take a look at how to implement high-level algorithms such as parallel_for using this job system.

I promise we’ll be cutting the running time down to only a fraction of what we have right now.

Disclaimer

The post assumes an x86 architecture and a strong memory model. If you are not aware of the underlying implications, you are better off using C++11 and std::atomic with sequential consistency when working on other platforms.

44 thoughts on “Job System 2.0: Lock-Free Work Stealing – Part 1: Basics”

  1. Hey!

    thanks for your blog and the great content and insights you share here! Regarding your new job system I have two questions:

    1. If I am not totally mistaken, by giving the user of your system a pointer to a job there is the possibility that the data associated with the job is modified after a job has been submitted to the system. Is this correct and do you worry about that?

    2. In your system is it possible to execute jobs only on specific threads (so for example do all OS event processing on the main thread)? If yes, how do you handle this?

    Thanks and I am looking forward to the next post.

    • Thanks Bastian!

      Regarding your first question, yes, this is a possibility with the solution I presented. The user could pretty much change the job’s state, data, parent, etc. at will, which is not something I would want. I did not want to clutter the post with such details, but I would suggest only giving JobIDs to the user. A JobID can be as simple as an integer (e.g. an index into a pre-allocated array of jobs), or something similar with a 1:1 mapping between jobs and their IDs. Going from a Job* to a JobID (and vice versa) should always be O(1).

      Answering your second question: no, this is not possible in this system. I want jobs to execute & finish as fast as possible, distributing them to all available threads automatically. If you really want to do all OS event processing on a particular thread, then I would dedicate one thread to that, and not use the job system.

  2. Very insightful article, thanks!
    There are a few things concerning the job-deletion which I have not fully understood yet:

    To delete the jobs in the g_jobsToDelete list you need to wait if there are no jobs active anymore, right? So it is not possible to run a job across multiple frames?

    On the other hand you described the Finish method like the g_jobsToDelete list could be cleared at any moment, which is why job->unfinishedJobs is set to -1 to ensure that it is ready to be deleted (please correct me if wrong).
    This would mean that as soon as the g_jobToDeleteCount was incremented, the list might be cleared. At this moment g_jobsToDelete[index-1] would be nullptr, so the clearing would need to wait until a job with (job->unfinishedJobs == -1) has registered itself at g_jobsToDelete[index-1]… correct?
    I am also wondering why you put the register-to-delete before the recursive finish call, and not after it.

    I also do not understand how being able to clear the list while jobs are still running (or finishing) is compatible with avoiding the problem with calls to HasJobCompleted you mentioned earlier.

    • To delete the jobs in the g_jobsToDelete list you need to wait if there are no jobs active anymore, right? So it is not possible to run a job across multiple frames?

      Correct. The idea is that each frame the engine runs several hundred jobs. In between, the engine sometimes has to wait for a particular job at a certain synchronization point. Such a sync point could be e.g. waiting for all render jobs to finish to kick off rendering, or waiting until all jobs from this frame have finished. At the very end of a frame, you want to delete all jobs that have been allocated.

      This means that a frame looks roughly like the following:


      SpawnJobs();
      RunJobs();
      ...
      WaitForRootJob();
      DeleteAllJobs();

      On the other hand you described the Finish method like the g_jobsToDelete list could be cleared at any moment, which is why job->unfinishedJobs is set to -1 to ensure that it is ready to be deleted (please correct me if wrong).
      This would mean that as soon as the g_jobToDeleteCount was incremented, the list might be cleared. At this moment g_jobsToDelete[index-1] would be nullptr, so the clearing would need to wait until a job with (job->unfinishedJobs == -1) has registered itself at g_jobsToDelete[index-1]… correct?

      Not quite. The g_jobsToDelete list can only be cleared at the end of a frame, but you still have to be careful how you decide that a job can be deleted. Taking the example from above, consider the following:


      // end of a frame, called from the main thread
      Wait(root);
      DeleteAllJobs();

      Remember that waiting for a job to be finished means that we execute the following loop:


      void Wait(const Job* job)
      {
        // wait until the job has completed. in the meantime, work on any other job.
        while (!HasJobCompleted(job))
        {
          Job* nextJob = GetJob();
          if (nextJob)
          {
            Execute(nextJob);
          }
        }
      }

      Also remember that the Execute() function first executes the job, and then calls Finish(). Now consider the following order of events:

      • Wait(root) is called on the main thread.
      • Wait(root) determines that the root job hasn’t finished yet, so it tries to get a job, but GetJob() returns nullptr because no other job could be found.
      • Thread 2 goes ahead and executes the last remaining job (= the root).
      • Thread 2 calls Finish(), decrements unfinishedJobs, and immediately gets pre-empted.
      • The main thread calls HasJobCompleted(), sees that the job has finished, and goes ahead with calling DeleteAllJobs().
      • While the main thread tries to delete all jobs from the array, it gets pre-empted, and Thread 2 tries to access the global g_jobsToDelete array, adding a new entry. Havoc ensues.

      I am also wondering why you put the register-to-delete before the recursive finish call, and not after it.

      It doesn’t really matter, you could also do it the other way around. With the situation I described above, you need to wait until a job’s unfinishedJobs counter hits -1 anyway, before you can delete it. The crucial thing is that a job can only be deleted once it has been added to the global array, and the recursive Finish() call has executed. Both need to be done entirely before we can go ahead and deem a job to be completed.

  3. Pingback: Job System 2.0: Lock-Free Work Stealing – Part 2: A specialized allocator | Molecular Musings

  4. Hi, Stefan!
    I have one question. Parent-children relation is only for grouping jobs and doesn’t determine execution order. Did I understand correctly?

  5. Pingback: Job System 2.0: Lock-Free Work Stealing – Part 3: Going lock-free | Molecular Musings

  6. Stefan,

    Thank you for this great series of articles. I was wondering how you initially start feeding the workers. You mention that Push() can only be called by the thread that owns the job queue. Does this mean that you rely on job stealing to feed other workers off of the main thread’s queue?

    • Yes, exactly.
      Once a few very high-level tasks (such as Simulate() and Render()) have been submitted from the main thread, the system takes off automatically. Jobs then “avalanche” to the worker threads automatically. Keep in mind that as soon as e.g. the main thread submits the Simulate() job, a worker thread will almost immediately steal it from the queue, and start working on it. The main thread keeps on submitting high-level stuff in the meantime.

      • Thank you for the quick reply. I wasn’t sure it was the case because I intuitively thought that using a RNG for that purpose could sometimes fail to properly feed the workers. I now realize that the workers only yield and don’t rely on a wait/notify system so they will quickly get a random number that matches the main thread’s queue.

      • I’m pretty sure the approach of choosing a random queue to steal from can be improved, and in practice you’d want to use events or condition variables in order to not busy wait and yield the whole time. Once everything is fully threaded using the job system, there should be work to do anyway, and the RNG could be improved by trying to steal from the main queue first, or trying the queue with the largest number of items, etc.

        This has to be experimented with, and depends on other circumstances as well I would imagine.

  7. Pingback: Job System 2.0: Lock-Free Work Stealing – Part 4: parallel_for | Molecular Musings

  8. Pingback: TECNOLOGÍA » Job System 2.0: Lock-Free Work Stealing – Part 4: parallel_for

  9. Pingback: SergeyMakeev/TaskScheduler – GITROOM

  10. Hi, thanks for this awesome article! I’ve got a question, regarding this part of code:

    Job* root = jobSystem::CreateJob(&empty_job);
    for (unsigned int i=0; i < N; ++i)
    {
      Job* job = jobSystem::CreateJobAsChild(root, &empty_job);
      jobSystem::Run(job);
    }
    jobSystem::Wait(root);

    Shouldn't you add the root job first before calling Wait() on it?

  11. Pingback: Job System 2.0: Lock-Free Work Stealing – Part 5: Dependencies | Molecular Musings

  12. When you were profiling how long it took, did you batch up the 65,000 jobs first and then kick off the job queue, or were they executed while they were being inserted?

  13. Hello Stefan and thanks for the article !

    I tried to implement this job system and managed to make it work. Or so I thought. Because if I try to execute 65 000 jobs, everything works fine. But if I try to execute 2000 jobs, the program ends up stuck in the wait function. After some investigation, I found that the main thread’s working queue was empty (top == bottom) but not all the jobs had been executed (only one remains most of the time). After some more investigation, I started to believe that the problem comes from the way I share the queues among the threads. Each thread has its own queue in TLS but I also have a global array of WorkStealingQueue where I put the address of each thread’s queue and I believe this is where the problem is. Do you have any advice you could give me, or did you maybe encounter the same problem? Any answer will be greatly appreciated.

    PS: I must tell you that I followed part 1,2 and 3 of your series on Job Systems so I have implemented the Lock-Free Deque and I use the special allocator.

    • Hmm, what platform are you working on? Do you use fibers? Be aware of TLS-specific optimizations that might break when using TLS from within fibers.

      And could you maybe post some of your code if possible? It sounds like a race condition to me which could be triggered by not setting up the chain/queue of jobs correctly.

      • Hello Stefan, thanks for the reply!

        So, as for your questions, I’m working on Windows 7 with Visual Studio 2015, I am not using fibers, I use Win32 threads created with the function CreateThread.

        Concerning the code I can post it but I think it would be better if you could tell me which function you would like to see in particular as it represents several hundreds of lines currently. Even though I use a different naming convention, my functions are pretty much named the same way as yours.

      • Can you post the code where you run the jobs and wait for them to finish? And can you show me how you set up the array of queues globally and using TLS?

  14. OK here it is. Everything starting by “ARC_” is a function or a type of my engine. As you will see, most of the code is close to yours.

    // Main.cpp
    const U32 N = 2000;
    Timer timer;
    bool bContinue = true;

    ARC_JobSystem_CreateThreads();
    while(bContinue)
    {
      timer.Start();
      ARC_Job* pRoot = ARC_JobSystem_CreateJob(&EmptyJob);
      for(U32 i = 0; i < N; ++i)
      {
        ARC_Job* pJob = ARC_JobSystem_CreateJobAsChild(pRoot, &EmptyJob);
        ARC_JobSystem_Run(pJob);
      }

      ARC_JobSystem_Run(pRoot);
      ARC_JobSystem_Wait(pRoot);

      timer.Stop();

      double dt = timer.GetElapsedTimeInMilliseconds();
      printf("Elapsed time: %lf\n", timer.GetElapsedTimeInMilliseconds());

      if(::GetAsyncKeyState(VK_ESCAPE))
        bContinue = false;
    }

    ARC_JobSystem_DestroyThreads();

    // JobSystem.cpp

    static bool g_bIsWorkerThreadActive = true;
    static HANDLE* g_pThreads = nullptr;
    static U32 g_uNumThreads = 0U;

    static const U32 ARC_MAX_JOB = 1 << 12;

    static ARC_WorkStealingQueue** g_ppJobQueues = nullptr;

    __declspec(thread) static ARC_WorkStealingQueue* g_pWorkQueue = nullptr;
    __declspec(thread) static U32 g_uNumAllocatedJobs = 0;
    __declspec(thread) static ARC_Job g_pJobAllocator[ARC_MAX_JOB] = {nullptr};

    //------------------------------------------------------------
    DWORD ARC_JobSystem_ThreadFunc(void* pData)
    {
      g_pWorkQueue = new ARC_WorkStealingQueue;

      ARC_WorkerThreadData* data = static_cast<ARC_WorkerThreadData*>(pData);
      g_ppJobQueues[data->m_uThreadNumber] = g_pWorkQueue;

      Sleep(5);

      while(g_bIsWorkerThreadActive)
      {
        ARC_Job* job = ARC_JobSystem_GetJob();
        if(job)
        {
          ARC_JobSystem_ExecuteJob(job);
        }
      }

      delete g_pWorkQueue;

      return 0;
    }

    //------------------------------------------------------------
    bool ARC_JobSystem_CreateThreads()
    {
      SYSTEM_INFO sysInfo;
      GetSystemInfo(&sysInfo);
      g_uNumThreads = sysInfo.dwNumberOfProcessors;

      srand((unsigned) (time(nullptr)));

      // Create Handles
      g_pThreads = (HANDLE*) malloc(sizeof(HANDLE) * g_uNumThreads);
      if(g_pThreads == nullptr)
      {
        assert(0 && "Failed to create threads.");
        return false;
      }

      // Create Work Queues.
      g_ppJobQueues = new ARC_WorkStealingQueue*[g_uNumThreads];
      if(g_ppJobQueues == nullptr)
      {
        assert(0 && "Failed to allocate Job Queues.");
        return false;
      }

      ARC_WorkerThreadData* pDatas = (ARC_WorkerThreadData*) alloca(sizeof(ARC_WorkerThreadData) * g_uNumThreads - 1);

      // Create Threads.
      DWORD id;
      for(U32 i = 0; i < g_uNumThreads - 1; ++i)
      {
        pDatas[i].m_uThreadNumber = i;
        g_pThreads[i] = CreateThread(nullptr, 0, (LPTHREAD_START_ROUTINE) ARC_JobSystem_ThreadFunc, &pDatas[i], CREATE_SUSPENDED, &id);

        if(!g_pThreads[i])
        {
          assert(0 && "Failed to create a thread.");
          return false;
        }
      }

    for(U32 i = 0; i Pop();
      if(ARC_JobSystem_IsEmptyJob(job))
      {
        // This is not a valid job because our own queue is empty, so try stealing from other queue.
        unsigned int uRandomIndex = ARC_GenerateRandomNumber(0, g_uNumThreads-1);
        ARC_WorkStealingQueue* pStealQueue = g_ppJobQueues[uRandomIndex];
        if(pStealQueue == pQueue)
        {
          // Don't try to steal from ourselves.
          ARC_JobSystem_Yield();
          return nullptr;
        }

        ARC_Job* pStolenJob = pStealQueue->Steal();
        if(ARC_JobSystem_IsEmptyJob(pStolenJob))
        {
          // We couldn't steal a job from the other queue either, so we just yield our time slice for now.
          ARC_JobSystem_Yield();
          return nullptr;
        }

        return pStolenJob;
      }

      return job;
    }

    • From glancing over it, I can’t find a real error in the code you posted. The global g_bIsWorkerThreadActive is a bit fishy and not what I would recommend, but it’s certainly not what’s causing problems.

      I fear the problem lies in the implementation of the deque (or maybe the allocator). Does the bug always occur, each frame? Or randomly?

      • The bug occurs randomly, but the more jobs I give to the job system, the longer it takes the bug to appear. Also, I tried modifying the GetJob function in order to force it to steal a job from the main queue before choosing a random queue and in this case, the problem occurs during the first frame.

      • Hello Stefan, many thanks for the articles !

        I have some questions regarding your job system, if you don’t mind to answer them of course.

        – How should a worker thread wait for a job in its main loop (ThreadProc in winapi world)? The code chillonmathieu posted shows a ‘while’ loop with a global boolean and you said this is not what you would recommend. What would you use? An event or a condition variable? And moreover, why do you consider it bad practice to use a global boolean?
        – Would your job system benefit from using fibers? And if yes, how would you use them?

        The last two question may be a bit vague, I hope you’ll get what I mean.
        Once again, thanks a lot for your articles !

    • You shouldn’t have to if the allocator’s ring-buffer behaviour is implemented correctly.
      What do you use for atomic operations? What for compiler & memory barriers? Can you post your deque implementation?

  15. I post this message to tell everything got sorted out. The implementation on the site is correct, my problem was related to how I used the _InterlockedIncrement function.

  16. Your site is very nice and we getting such a best articles and ideas and etc. I read this post article thanks for sharing this post. I always try to keep touch with you, Because I liked your site thank you all

  17. Pingback: Job System and ParallelFor | Krzysztof Narkowicz

  18. What does IsEmptyJob() do? Is it just a null check? I thought the queue returned null if there is no job. Is there some way a job can be non null but empty? How would this be checked?

  19. Hello Stefan!

    Many thanks for these great articles about your new job system.
    I have two questions about some points of your job system, if you don’t mind answering:
    – Inside the thread procedure of each worker (ThreadProc callback in winapi world), what did you use to be able to stop the threads nicely? (I was thinking about an event in a while loop but that seems overkill to me)
    – You’re using TLS in your job system, but would it be good to use fibers instead if I wanted to implement your job system today? I don’t really see what fibers offer in a job system (I know you didn’t talk about that in your articles, answer only if you want to!)

    Thanks again for your articles, I love your blog 🙂

    • Thanks for the kind words, glad you like the blog!

      To answer your questions:

      Inside the thread procedure of each worker (ThreadProc callback in winapi world), what did you use to be able to stop the threads nicely?

      A condition variable. This allows you to put the threads to sleep and wake them up once a job has been pushed into the queue.

      And moreover, why do you consider it bad practice to use a global boolean?

      Because of two things: you have to make sure the write to the boolean cannot be seen by other cores before other writes (strong vs. weak memory model). This can be done by e.g. using std::atomic and memory fences, but still leaves you with the fact that you have to busy-wait to check whether the boolean state changes, which unnecessarily drains power and battery life. Your CPU is essentially sitting there, doing no real work, but using all its power.

      Regarding fibers, they have a few advantages over threads: Switching fibers is much faster than switching threads. Whenever a fiber-based task cannot be executed yet, you can store the fiber and its state somewhere, get a different fiber to execute, and then swap back to the original fiber. This cannot really be done easily with threads. With threads, you often need unbounded stack and have to wait for child threads to finish before the stack can unwind all the way to the parent – this is inefficient in terms of parallelization.
      Of course, fibers have disadvantages as well: they need to be supported by the OS, some compiler-optimizations are not fiber-aware, thread-local storage can break.

      Check out Christian Gyrling’s talk on the topic.
