humancode.us

GCD Concurrent Queues

August 6, 2014

This is the third post in a series about Grand Central Dispatch.

If serial queues are a better replacement for mutexes (and more), then concurrent queues are a better replacement for threads (and more).
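As a refresher from the previous post, here is the kind of mutex-guarded critical section that a serial queue replaces, sketched in plain POSIX C (the counter and function names are mine, for illustration only):

```c
#include <pthread.h>

// A counter guarded the old-fashioned way, with an explicit mutex.
static pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER;
static int counter = 0;

// Every access must remember to take and release the lock.
int increment_counter(void) {
    pthread_mutex_lock(&counter_lock);
    int result = ++counter;
    pthread_mutex_unlock(&counter_lock);
    return result;
}
```

With a serial queue, the lock and unlock calls disappear: you simply enqueue the increment as a block, and the queue serializes access for you.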

A concurrent queue allows you to enqueue blocks that will start running without waiting for the previous blocks to finish.

Run the following program several times:

#import <Foundation/Foundation.h>

void print(int number) {
    for (int count = 0; count < 10; ++count) {
        NSLog(@"%d", number);
    }
}

int main(int argc, const char * argv[]) {
    dispatch_queue_t queue = dispatch_queue_create("My concurrent queue", DISPATCH_QUEUE_CONCURRENT);

    @autoreleasepool {
        for (int index = 0; index < 5; ++index) {
            dispatch_async(queue, ^{
                print(index);
            });
        }
    }
    dispatch_main();
    return 0;
}

dispatch_async() tells GCD to enqueue a block, but not to wait until the block is done before moving on. This allows us to quickly “toss” five blocks onto the concurrent queue we just created.

When the first block is enqueued, the queue is empty, so it begins running right away, just as it would on a serial queue. But when the second block is enqueued, it too starts running, even though the first block hasn't finished. The same goes for the third, fourth, and fifth blocks: they can all run at the same time.

Each enqueued block captures the value of index when the block was created, and logs it 10 times. Does the program’s output match your expectations? Why does the output change each time you run the program?

How would the program’s output be different if we swapped out the concurrent queue for a serial queue? Try it out! Change DISPATCH_QUEUE_CONCURRENT to DISPATCH_QUEUE_SERIAL and run the program again.

Queues, not Threads

You may have missed it, but the program above effortlessly created five threads of execution without touching pthread_create() or NSThread even once. Because blocks on a concurrent queue are allowed to run simultaneously, GCD automatically created (or reused) a thread to run each one. Once a block is done, its thread is either destroyed or returned to a thread pool. With GCD, you focus on queues, and let the library worry about threads.
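For contrast, here is roughly the bookkeeping GCD spares you, sketched with raw POSIX threads (this is an illustration of manual thread management, not how libdispatch is actually implemented; the tally is mine, added so we can see every thread finish):

```c
#include <pthread.h>
#include <stdio.h>

#define WORKER_COUNT 5
#define PRINTS_PER_WORKER 10

// A shared tally so we can confirm every thread ran to completion.
static int total_prints = 0;
static pthread_mutex_t tally_lock = PTHREAD_MUTEX_INITIALIZER;

// Each "block" becomes a function plus a manually managed thread.
static void *print_number(void *arg) {
    int number = (int)(long)arg;
    for (int count = 0; count < PRINTS_PER_WORKER; ++count) {
        printf("%d\n", number);
        pthread_mutex_lock(&tally_lock);
        ++total_prints;
        pthread_mutex_unlock(&tally_lock);
    }
    return NULL;
}

// Spawn one thread per task, wait for all of them, return the tally.
int run_workers(void) {
    pthread_t threads[WORKER_COUNT];
    for (long index = 0; index < WORKER_COUNT; ++index) {
        pthread_create(&threads[index], NULL, print_number, (void *)index);
    }
    for (int index = 0; index < WORKER_COUNT; ++index) {
        pthread_join(threads[index], NULL);
    }
    return total_prints;
}
```

Creating, tracking, and joining each thread by hand is exactly the chore that dispatch_async() makes disappear.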

Just because you don’t have to manage threads manually doesn’t mean you can ignore limits on threading. GCD’s thread pool is finite: if you enqueue blocks that sit blocked (waiting on I/O, locks, or each other), GCD will keep spawning threads until it hits its limit, after which remaining blocks simply wait. If every pooled thread ends up blocked on work that hasn’t run yet, your program deadlocks, often silently.

Barriers

The natural question to ask at this point is: if concurrent queues allow all blocks to run, then why are they called “queues” at all? Aren’t they more like a heap onto which you can throw concurrent blocks?

The queue-like behavior of concurrent queues comes to light when you consider barriers. Blocks that you enqueue with dispatch_barrier_sync() or dispatch_barrier_async() cause something interesting to happen: the block is enqueued, but does not execute until all previously enqueued blocks have finished executing. In addition, no block enqueued after the barrier block will execute until the barrier block itself has finished executing. Think of barrier blocks as “choke points” that enforce serialization in an otherwise parallel set of operations.

To demonstrate barriers, take a look at the following program:

#import <Foundation/Foundation.h>

void print(int number) {
    for (int count = 0; count < 10; ++count) {
        NSLog(@"%d", number);
    }
}

int main(int argc, const char * argv[]) {
    dispatch_queue_t queue = dispatch_queue_create("My concurrent queue", DISPATCH_QUEUE_CONCURRENT);
    dispatch_suspend(queue); // Suspend the queue so blocks are enqueued, but not executed

    @autoreleasepool {
        // Enqueue five blocks
        for (int index = 0; index < 5; ++index) {
            dispatch_async(queue, ^{
                print(index);
            });
        }

        // Enqueue a barrier
        dispatch_barrier_async(queue, ^{
            NSLog(@"--- This is a barrier ---");
        });

        // Enqueue five more blocks
        for (int index = 5; index < 10; ++index) {
            dispatch_async(queue, ^{
                print(index);
            });
        }
    }
    dispatch_resume(queue); // Go!
    dispatch_main();
    return 0;
}

Run the program. Notice that prior to the barrier, only block indexes 0 through 4 are allowed to run to completion, and after the barrier, only block indexes 5 through 9 get to execute. However, within each side of the barrier, each group of five blocks is allowed to execute at the same time.

Readers-Writer Locks

In my previous blog post, I described how you can protect a set of variables from race conditions using serial queues. With that technique, only one thread can have access to the variables at a time, guaranteeing atomic behavior.

But really, we don’t have to protect the data structures against all simultaneous access: we only need to protect them against concurrent modification. It’s fine (and desirable, for performance reasons) to let multiple threads read the data, as long as nobody is changing it at the same time.

What we need is a readers-writer lock, where we serialize access only during writes, but allow simultaneous reads to occur.

We can easily implement a readers-writer lock using a concurrent queue and barrier blocks. Here is an example:

#import <Foundation/Foundation.h>

dispatch_queue_t queue;

NSString *he = @"Luke";
NSString *she = @"Megan";

void printAndRepeat() {
    NSLog(@"%@ likes %@!", he, she);
    dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC)), queue, 
    // This block is dispatch_async'd to the concurrent queue after 1 second
    ^{
        printAndRepeat();
    });
}

int main(int argc, const char * argv[]) {
    @autoreleasepool {
        queue = dispatch_queue_create("Reader-writer queue", DISPATCH_QUEUE_CONCURRENT);

        // Create readers
        for (int index = 0; index < 5; ++index) {
            dispatch_async(queue, ^{
                printAndRepeat();
            });
        }

        // Change the variables after 5 seconds
        dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(5 * NSEC_PER_SEC)), dispatch_get_main_queue(), 
        // This block is enqueued onto the main queue after 5 seconds.
        ^{
            dispatch_barrier_async(queue, ^{
                he = @"Don";
                she = @"Alice";
            });
        });
    }
    dispatch_main();
    return 0;
}

You can ignore the calls to dispatch_after() for now—they simply tell GCD to enqueue a block after a period of time has elapsed.

In this example, the barrier block (the one that changes he and she to “Don” and “Alice”) is guaranteed to perform the change atomically. Because the barrier block runs only after every in-flight reader block has finished, and no reader runs again until it is done, you’ll never see “Luke likes Alice!” printed to your console.

Congratulations! You now know all about concurrent queues: how they can replace threads, and how they can build efficient readers-writer locks. In the next post we will examine the Global Concurrent Queues, and queue targets. See you then!