cocoa_chen

深入浅出GCD之dispatch_queue

2018-03-05

概述

dispatch_queue可以说是GCD编程中使用频率最高的API,本篇文章主要讲一下queue的相关用法和原理,关于queue的数据结构和常用定义见上篇文章。

使用篇

当我们处理耗时操作时,比如读取数据库、请求网络数据,为了避免这些耗时操作卡住UI,可将耗时任务放到子线程中,执行完成后再通知主线程更新UI,代码示例如下:

1
2
3
4
5
6
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
//耗时操作
dispatch_async(dispatch_get_main_queue(), ^{
//更新UI
});
});

当多线程并发读写同一个资源时,为了保证资源读写的正确性,可以用Barrier Block解决该问题。
Dispatch Barrier会确保队列中先于Barrier Block提交的任务都完成后再执行它,并且执行时队列不会同步执行其它任务,等Barrier Block执行完成后再开始执行其他任务。代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//创建自定义并行队列
dispatch_queue_t queue = dispatch_queue_create("com.gcdTest.queue", DISPATCH_QUEUE_CONCURRENT);
dispatch_async(queue, ^{
//读操作
NSLog(@"work1");
});
dispatch_barrier_async(queue, ^{
//barrier block,可用于写操作
//确保资源更新过程中不会有其他线程读取
NSLog(@"work2");
sleep(1);
});
dispatch_async(queue, ^{
//读操作
NSLog(@"work3");
});

这里有个需要注意也是官方文档上提到的一点,如果我们调用dispatch_barrier_async时将Barrier blocks提交到一个global queue,barrier blocks执行效果与dispatch_async()一致;只有将Barrier blocks提交到使用DISPATCH_QUEUE_CONCURRENT属性创建的并行queue时它才会表现的如同预期。详细原因见后续源码分析。

原理篇

dispatch_get_global_queue

dispatch_get_global_queue用于获取一个全局队列,先看一下它的源码:

1
2
3
4
5
6
7
8
9
dispatch_queue_t dispatch_get_global_queue(long priority, unsigned long flags)
{
if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
return NULL;
}
//封装调用_dispatch_get_root_queue函数
return _dispatch_get_root_queue(priority,
flags & DISPATCH_QUEUE_OVERCOMMIT);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static inline dispatch_queue_t _dispatch_get_root_queue(long priority, bool overcommit)
{
if (overcommit) switch (priority) {
case DISPATCH_QUEUE_PRIORITY_BACKGROUND:
return &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_LOW:
case DISPATCH_QUEUE_PRIORITY_NON_INTERACTIVE:
return &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_DEFAULT:
return &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_HIGH:
return &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY];
}
switch (priority) {
case DISPATCH_QUEUE_PRIORITY_BACKGROUND:
return &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_LOW:
case DISPATCH_QUEUE_PRIORITY_NON_INTERACTIVE:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_DEFAULT:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_HIGH:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY];
default:
return NULL;
}
}

队列优先级有八个,分别为低、默认、高、后台以及对应的overcommit。枚举定义如下:

1
2
3
4
5
6
7
8
9
10
enum {
DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY = 0, //低优先级
DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY, //低优先级+overcommit
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY, //默认优先级
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY, //默认优先级+overcommit
DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY, //高优先级
DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY, //高优先级+overcommit
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY, //后台
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY, //后台+overcomit
};

_dispatch_get_root_queue从_dispatch_root_queues结构体中获取对应优先级的队列。最后1bit为1的代表overcommit,带有overcommit标记的队列会在任务提交时新创建一个线程处理它。

_dispatch_root_queues取出的dispatch_queue_s队列的do_ctxt字段表示queue的线程池,定义于_dispatch_root_queue_contexts结构体中,每个线程池的最大线程数限制是255。

下面看一下global queue的do_vtable结构体,它比较重要的是do_probe的调用函数_dispatch_root_queue_probe,这个函数在后续的分析中会用到。结构体定义如下:

1
2
3
4
5
6
7
8
//global queue的vtable定义
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_root, queue,
.do_type = DISPATCH_QUEUE_ROOT_TYPE,
.do_kind = "global-queue",
.do_dispose = _dispatch_pthread_root_queue_dispose, //销毁时调用
.do_probe = _dispatch_root_queue_probe, //重要,唤醒队列时调用
.do_debug = dispatch_queue_debug, //debug回调
);

dispatch_get_main_queue

该API的使用主要是在更新UI时获取dispatch_get_main_queue()并把任务提交到主队列中。它的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//宏定义,返回到是_dispatch_main_q
#define dispatch_get_main_queue() \
DISPATCH_GLOBAL_OBJECT(dispatch_queue_t, _dispatch_main_q)

//main_queue结构体定义
struct dispatch_queue_s _dispatch_main_q = {
.do_vtable = DISPATCH_VTABLE(queue),
.do_targetq = &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY], //目标队列
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.dq_label = "com.apple.main-thread", //队列名
.dq_running = 1,
.dq_width = 1, //最大并发数是1,串行队列
.dq_is_thread_bound = 1, //线程绑定
.dq_serialnum = 1, //序列号为1
};

main queue设置了并发数为1,即串行队列,并且将targetq指向com.apple.root.default-overcommit-priority队列。

dispatch_queue_create

dispatch_queue_create主要用来创建自定义的队列,流程图和源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
dispatch_queue_t dispatch_queue_create(const char *label, dispatch_queue_attr_t attr) {
//调用dispatch_queue_create_with_target
return dispatch_queue_create_with_target(label, attr,
DISPATCH_TARGET_QUEUE_DEFAULT);
}
//dispatch_queue_create具体实现函数
dispatch_queue_t dispatch_queue_create_with_target(const char *label,
dispatch_queue_attr_t attr, dispatch_queue_t tq) {
dispatch_queue_t dq;
//申请内存空间
dq = _dispatch_alloc(DISPATCH_VTABLE(queue),
sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
//初始化,设置自定义队列的基本属性,方法实现见下面
_dispatch_queue_init(dq);
if (label) {
//设置队列名
dq->dq_label = strdup(label);
}
if (attr == DISPATCH_QUEUE_CONCURRENT) {
//并行队列设置dq_width为UINT32_MAX
dq->dq_width = UINT32_MAX;
if (!tq) {
//默认targetq,优先级为DISPATCH_QUEUE_PRIORITY_DEFAULT
tq = _dispatch_get_root_queue(0, false);
}
} else {
if (!tq) {
//默认targetq,优先级为DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY
// Default target queue is overcommit!
tq = _dispatch_get_root_queue(0, true);
}
}
//设置自定义队列的目标队列,dq队列的任务会放到目标队列执行
dq->do_targetq = tq;
return _dispatch_introspection_queue_create(dq);
}
//队列初始化方法
static inline void _dispatch_queue_init(dispatch_queue_t dq)
{
dq->do_next = (struct dispatch_queue_s *)DISPATCH_OBJECT_LISTLESS;
dq->dq_running = 0; //队列当前运行时初始为0
dq->dq_width = 1; //队列并发数默认为1,串行队列
dq->dq_serialnum = dispatch_atomic_inc_orig(&_dispatch_queue_serial_numbers,
relaxed); //序列号,在_dispatch_queue_serial_numbers基础上原子性加1
}

上面的代码介绍了自定义队列是如何创建的,初始化时会将dq_width默认设置为1,即串行队列。如果外部设置attr为DISPATCH_QUEUE_CONCURRENT,将并发数改为UINT32_MAX;
自定义队列的serialnum是在_dispatch_queue_serial_numbers基础上原子性加一,即从12开始累加。1到11被保留的序列号定义如下(后续版本有改动,自定义序列从16开始累加):

1
2
3
4
5
6
7
// skip zero        //跳过0
// 1 - main_q //主队列
// 2 - mgr_q //管理队列
// 3 - mgr_root_q //管理队列的目标队列
// 4,5,6,7,8,9,10,11 - global queues //全局队列
// we use 'xadd' on Intel, so the initial value == next assigned
unsigned long volatile _dispatch_queue_serial_numbers = 12;

同时还会设置队列的target_queue,向队列提交的任务,都会被放到它的目标队列来执行。串行队列的target_queue是一个支持overcommit的全局队列,而全局队列的底层则是一个线程池。

借用一张队列的图片:

dispatch_async

dispatch_async用来异步执行任务,它的代码比较复杂,我们可以分成三个阶段来看,第一阶段是更新队列链表,第二部分是从队列取任务,第三部分则是执行任务。每个阶段都有一张流程图表示,觉得代码多的话可以直接看每个阶段对应的流程图。

首先看一下dispatch_async的入口函数:

1
2
3
4
void dispatch_async(dispatch_queue_t dq, void (^work)(void)) {
dispatch_async_f(dq, _dispatch_Block_copy(work),
_dispatch_call_block_and_release);
}

dispatch_async封装调用了dispatch_async_f函数,先将block拷贝到堆上,避免block执行前被销毁,同时传入_dispatch_call_block_and_release来保证block执行后会执行Block_release。下面看一下dispatch_async_f的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {
dispatch_continuation_t dc;
if (dq->dq_width == 1) {
//如果是串行队列,执行dispatch_barrier_async_f,和当前函数的不同点在于
//.do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT)
return dispatch_barrier_async_f(dq, ctxt, func);
}
//将任务封装到dispatch_continuation_t结构体中
dc = fastpath(_dispatch_continuation_alloc_cacheonly());
if (!dc) {
return _dispatch_async_f_slow(dq, ctxt, func);
}
dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT; //将vtable设置为ASYNC标志位
dc->dc_func = func;
dc->dc_ctxt = ctxt;
if (dq->do_targetq) {
//如果有do_targetq,将任务放到目标队列执行
return _dispatch_async_f2(dq, dc);
}
//将任务压入队列(FIFO)
_dispatch_queue_push(dq, dc);
}

接下来分析一下_dispatch_queue_push,这是一个宏定义,展开后的调用栈如下:

1
2
3
_dispatch_queue_push
└──_dispatch_trace_queue_push
└──_dispatch_queue_push

看一下_dispatch_queue_push的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static inline void _dispatch_queue_push(dispatch_queue_t dq, dispatch_object_t _tail) {
struct dispatch_object_s *tail = _tail._do;
//判断链表中是否已经存在节点,有的话返回YES,否则返回NO
if (!fastpath(_dispatch_queue_push_list2(dq, tail, tail))) {
//将任务放到链表头部
_dispatch_queue_push_slow(dq, tail);
}
}
//判断链表中是否已经存在节点
static inline bool _dispatch_queue_push_list2(dispatch_queue_t dq, struct dispatch_object_s *head,
struct dispatch_object_s *tail) {
struct dispatch_object_s *prev;
tail->do_next = NULL;
//将tail原子性赋值给dq->dq_items_tail,同时返回之前的值并赋给prev
prev = dispatch_atomic_xchg2o(dq, dq_items_tail, tail, release);
if (fastpath(prev)) {
//如果prev不等于NULL,直接在链表尾部添加节点
prev->do_next = head;
}
//链表中之前有元素返回YES,否则返回NO
return (prev != NULL);
}
//将节点放到链表开头
void _dispatch_queue_push_slow(dispatch_queue_t dq,
struct dispatch_object_s *obj)
{
if (dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE && !dq->dq_is_thread_bound) {
//原子性的将head存储到链表头部
dispatch_atomic_store2o(dq, dq_items_head, obj, relaxed);
//唤醒global queue队列
return _dispatch_queue_wakeup_global(dq);
}
//将obj放到链表头部并执行_dispatch_wakeup函数里的dx_probe()函数
_dispatch_queue_push_list_slow2(dq, obj);
}

由上面的代码可以看出_dispatch_queue_push分为两种情况:
1、如果队列的链表不为空,将节点添加到链表尾部,即dq->dq_item_tail=dc。然后队列会按先进先出(FIFO)来处理任务。
2、如果队列此时为空,进入到_dispatch_queue_push_slow函数。如果队列是全局队列会进入if分支,原子性的将节点添加到队列开头,并执行_dispatch_queue_wakeup_global唤醒全局队列;如果队列是主队列或自定义串行队列if分支判断不成立,执行_dispatch_queue_push_list_slow2函数,它会将节点添加到队列开头并执行_dispatch_wakeup函数唤醒队列。

dispatch_async第一阶段的工作主要是封装外部任务并添加到队列的链表中,可以用下图来表示:
WX20180403-082846

接着来看队列唤醒的逻辑,主要分成主队列和全局队列的唤醒和任务执行逻辑:
1、如果是主队列,会先调用_dispatch_wakeup唤醒队列,然后执行_dispatch_main_queue_wakeup函数来唤醒主线程的Runloop,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
dispatch_queue_t _dispatch_wakeup(dispatch_object_t dou) {
if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
return NULL;
}
//_dispatch_queue_probe判断dq_items_tail是否为空,if分支不成立
if (!dx_probe(dou._do)) {
return NULL;
}
//如果dou._do->do_suspend_cnt==0,返回YES,否则返回NO;
//同时将DISPATCH_OBJECT_SUSPEND_LOCK赋值给dou._do->do_suspend_cnt
if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0,
DISPATCH_OBJECT_SUSPEND_LOCK, release)) {
//因为主线程do_suspend_cnt非0,所以主线程if分支判断成功
#if DISPATCH_COCOA_COMPAT
if (dou._dq == &_dispatch_main_q) {
//主队列的任务执行和Runloop关联,唤醒主队列
return _dispatch_main_queue_wakeup();
}
#endif
return NULL;
}
//放到目标队列中,重新走_dispatch_queue_push方法
_dispatch_retain(dou._do);
dispatch_queue_t tq = dou._do->do_targetq;
_dispatch_queue_push(tq, dou._do);
return tq;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//唤醒主线程Runloop
static dispatch_queue_t _dispatch_main_queue_wakeup(void) {
dispatch_queue_t dq = &_dispatch_main_q;
if (!dq->dq_is_thread_bound) {
return NULL;
}
//只初始化一次mach_port_t
dispatch_once_f(&_dispatch_main_q_port_pred, dq,
_dispatch_runloop_queue_port_init);
_dispatch_runloop_queue_wakeup_thread(dq);
return NULL;
}
//唤醒runloop
static inline void _dispatch_runloop_queue_wakeup_thread(dispatch_queue_t dq) {
mach_port_t mp = (mach_port_t)dq->do_ctxt;
if (!mp) {
return;
}
//唤醒主线程的runloop
kern_return_t kr = _dispatch_send_wakeup_runloop_thread(mp, 0);
switch (kr) {
case MACH_SEND_TIMEOUT:
case MACH_SEND_TIMED_OUT:
case MACH_SEND_INVALID_DEST:
break;
default:
(void)dispatch_assume_zero(kr);
break;
}
}

当我们调用 dispatch_async(dispatch_get_main_queue(), block) 时,libDispatch 向主线程的 RunLoop 发送消息,RunLoop会被唤醒,并从消息中取得这个 block,并在回调 CFRUNLOOP_IS_SERVICING_THE_MAIN_DISPATCH_QUEUE() 里执行这个 block。用Xcode在block处打断点就会看到下图中的调用栈:

20180309162515

2、如果是全局队列,调用_dispatch_queue_wakeup_global函数,它封装调用了核心函数_dispatch_queue_wakeup_global_slow,调用栈和核心代码如下:

1
2
3
_dispatch_queue_wakeup_global_slow
└──_dispatch_queue_wakeup_global2
└──_dispatch_queue_wakeup_global_slow
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
static void _dispatch_queue_wakeup_global_slow(dispatch_queue_t dq, unsigned int n) {
static dispatch_once_t pred;
dispatch_root_queue_context_t qc = dq->do_ctxt;
uint32_t i = n;
int r;

_dispatch_debug_root_queue(dq, __func__);
//初始化dispatch_root_queue_context_s
dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);

#if DISPATCH_USE_PTHREAD_POOL
//为了防止有些timer每隔一分钟调用,线程执行任务后会有65s的超时用来等待signal唤醒
//降低线程频繁创建销毁的性能消耗
if (fastpath(qc->dgq_thread_mediator)) {
while (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
if (!--i) {
return;
}
}
}
//检测线程池可用大小,如果还有,则将线程池减一
uint32_t j, t_count = qc->dgq_thread_pool_size;
do {
if (!t_count) {
//线程池已达到最大使用量
_dispatch_root_queue_debug("pthread pool is full for root queue: "
"%p", dq);
return;
}
j = i > t_count ? t_count : i;
} while (!dispatch_atomic_cmpxchgvw2o(qc, dgq_thread_pool_size, t_count,
t_count - j, &t_count, relaxed));
//创建新的线程,入口函数是_dispatch_worker_thread
do {
_dispatch_retain(dq);
while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
_dispatch_temporary_resource_shortage();
}
if (!attr) {
r = pthread_detach(*pthr);
(void)dispatch_assume_zero(r);
}
} while (--j);
#endif // DISPATCH_USE_PTHREAD_POOL
}

创建新的线程后执行_dispatch_worker_thread函数,代码简化后如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static void * _dispatch_worker_thread(void *context) {
const int64_t timeout = (pqc ? 5ull : 65ull) * NSEC_PER_SEC;
//为了防止有些timer每隔一分钟调用,线程执行任务后会有65s的超时用来等待signal唤醒
//降低线程频繁创建销毁的性能消耗
do {
//取出一个任务并执行
_dispatch_root_queue_drain(dq);
} while (dispatch_semaphore_wait(qc->dgq_thread_mediator,
dispatch_time(0, timeout)) == 0);
//将线程池加一
(void)dispatch_atomic_inc2o(qc, dgq_thread_pool_size, relaxed);
_dispatch_queue_wakeup_global(dq);
_dispatch_release(dq);

return NULL;
}

从队列取任务的入口是_dispatch_root_queue_drain函数,简化的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static void _dispatch_root_queue_drain(dispatch_queue_t dq) {
_dispatch_thread_setspecific(dispatch_queue_key, dq);

#if DISPATCH_COCOA_COMPAT
// ensure that high-level memory management techniques do not leak/crash
if (dispatch_begin_thread_4GC) {
dispatch_begin_thread_4GC();
}
//autoreleasepool的push操作
void *pool = _dispatch_autorelease_pool_push();
#endif // DISPATCH_COCOA_COMPAT

_dispatch_perfmon_start();
struct dispatch_object_s *item;
//取出队列的头部节点(FIFO)
while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
//对取出的内容进行处理,核心函数
_dispatch_continuation_pop(item);
}
_dispatch_perfmon_end();

#if DISPATCH_COCOA_COMPAT
//autoreleasepool的pop操作
_dispatch_autorelease_pool_pop(pool);
if (dispatch_end_thread_4GC) {
dispatch_end_thread_4GC();
}
#endif // DISPATCH_COCOA_COMPAT

_dispatch_thread_setspecific(dispatch_queue_key, NULL);
}

队列唤醒后的工作主要是用线程池(全局队列)或者唤醒Runloop(主队列)的方式从队列的链表中依次取出要执行的任务,流程图如下:
WX20180403-085535

队列的任务取出之后就是核心的执行逻辑了,也就是_dispatch_continuation_pop函数的逻辑,代码和流程图如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
static inline void _dispatch_continuation_pop(dispatch_object_t dou) {
dispatch_continuation_t dc = dou._dc, dc1;
dispatch_group_t dg;

_dispatch_trace_continuation_pop(_dispatch_queue_get_current(), dou);
//判断传入的内容是不是队列,如果是的话执行_dispatch_queue_invoke函数,否的话就是block型的
//任务,直接执行block即可
//dispatch_barrier_async到自定义并行队列时,dou._do是用户创建的自定义queue,此时会执行
//_dispatch_queue_invoke,并且用信号量保证barrier的任务不会和其他任务同时执行,后续分析
if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
return dx_invoke(dou._do);
}
//判断是否带有DISPATCH_OBJ_ASYNC_BIT标志位
if ((long)dc->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
dc1 = _dispatch_continuation_free_cacheonly(dc);
} else {
dc1 = NULL;
}
//判断是否是group
if ((long)dc->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
dg = dc->dc_data;
} else {
dg = NULL;
}
//dispatch_continuation_t结构体,执行dc->dc_func(dc->ctxt)
//本质是调用Block_layout结构体的invoke执行block的实现代码
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
if (dg) {
//如果是群组执行dispatch_group_leave
dispatch_group_leave(dg);
_dispatch_release(dg);
}
_dispatch_introspection_queue_item_complete(dou);
if (slowpath(dc1)) {
_dispatch_continuation_free_to_cache_limit(dc1);
}
}

WX20180403-090039

总结一下:dispatch_async的流程是用链表保存所有提交的block,然后在底层线程池中,依次取出block并执行;而向主队列提交block则会向主线程的Runloop发送消息并唤醒Runloop,接着会在回调函数中取出block并执行。

dispatch_sync

了解了dispatch_async的逻辑后,再来看下dispatch_sync的实现和流程。dispatch_sync主要封装调用了dispatch_sync_f函数,看一下具体代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {
if (fastpath(dq->dq_width == 1)) {
//串行队列执行同步方法
return dispatch_barrier_sync_f(dq, ctxt, func);
}
if (slowpath(!dq->do_targetq)) {
//global queue不要求执行顺序,直接执行具体的block
// the global concurrent queues do not need strict ordering
(void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
return _dispatch_sync_f_invoke(dq, ctxt, func);
}
//并发队列压入同步方法
_dispatch_sync_f2(dq, ctxt, func);
}

由上面的代码可以看出,后续逻辑主要分为两种情况:

1、向串行队列提交同步任务,执行dispatch_barrier_sync_f函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
void dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func) {
if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
}
if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1, acquire))) {
return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
}
if (slowpath(dq->do_targetq->do_targetq)) {
return _dispatch_barrier_sync_f_recurse(dq, ctxt, func);
}
_dispatch_barrier_sync_f_invoke(dq, ctxt, func);
}

如果队列无任务执行,调用_dispatch_barrier_sync_f_invoke执行任务。_dispatch_barrier_sync_f_invoke代码逻辑展开后如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static void _dispatch_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func) {
//任务执行核心逻辑,将当前线程的dispatch_queue_key设置为dq,然后执行block,
//执行完之后再恢复到之前的old_dq
dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
_dispatch_thread_setspecific(dispatch_queue_key, dq);
_dispatch_client_callout(ctxt, func);
_dispatch_perfmon_workitem_inc();
_dispatch_thread_setspecific(dispatch_queue_key, old_dq);

//如果队列中存在其他任务,用信号量的方法唤醒,然后继续执行下一个任务
if (slowpath(dq->dq_items_tail)) {
return _dispatch_barrier_sync_f2(dq);
}
if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
_dispatch_wakeup(dq);
}
}

如果队列存在其他任务或者被挂起,调用_dispatch_barrier_sync_f_slow函数,等待该队列的任务执行完之后用信号量通知队列继续执行任务。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static void _dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func) {
_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
struct dispatch_continuation_s dc = {
.dc_data = dq,
.dc_func = func,
.dc_ctxt = ctxt,
.dc_other = (void*)sema,
};
struct dispatch_continuation_s dbss = {
.do_vtable = (void *)(DISPATCH_OBJ_BARRIER_BIT |
DISPATCH_OBJ_SYNC_SLOW_BIT),
.dc_func = _dispatch_barrier_sync_f_slow_invoke,
.dc_ctxt = &dc,
#if DISPATCH_INTROSPECTION
.dc_data = (void*)_dispatch_thread_self(),
#endif
};
//使用信号量等待其他任务执行完成
_dispatch_queue_push(dq, &dbss);
_dispatch_thread_semaphore_wait(sema); // acquire
_dispatch_put_thread_semaphore(sema);
//收到signal信号,继续执行当前任务
if (slowpath(dq->do_targetq->do_targetq)) {
_dispatch_function_recurse(dq, ctxt, func);
} else {
_dispatch_function_invoke(dq, ctxt, func);
}
}

2、向并发队列提交同步任务,执行_dispatch_sync_f2函数。如果队列存在其他任务,或者队列被挂起,或者有正在执行的任务,则调用_dispatch_sync_f_slow函数,使用信号量等待,否则直接调用_dispatch_sync_f_invoke执行任务。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
static inline void _dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {
if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
return _dispatch_sync_f_slow(dq, ctxt, func, false);
}
uint32_t running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
// re-check suspension after barrier check <rdar://problem/15242126>
if (slowpath(running & 1) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
return _dispatch_sync_f_slow(dq, ctxt, func, running == 0);
}
if (slowpath(dq->do_targetq->do_targetq)) {
return _dispatch_sync_f_recurse(dq, ctxt, func);
}
_dispatch_sync_f_invoke(dq, ctxt, func);
}
//队列存在其他任务|队列被挂起|有正在执行的任务,信号等待
static void _dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
bool wakeup) {
_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
struct dispatch_continuation_s dss = {
.do_vtable = (void*)DISPATCH_OBJ_SYNC_SLOW_BIT,
.dc_func = func,
.dc_ctxt = ctxt,
.dc_data = (void*)_dispatch_thread_self(),
.dc_other = (void*)sema,
};
_dispatch_queue_push_wakeup(dq, &dss, wakeup);
//信号等待
_dispatch_thread_semaphore_wait(sema);
_dispatch_put_thread_semaphore(sema);
//信号唤醒,执行同步任务
if (slowpath(dq->do_targetq->do_targetq)) {
_dispatch_function_recurse(dq, ctxt, func);
} else {
_dispatch_function_invoke(dq, ctxt, func);
}
if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) {
_dispatch_wakeup(dq);
}
}

dispatch_sync的逻辑主要是将任务放入队列,并用线程专属信号量做等待,保证每次只会有一个block在执行。流程图如下:

WX20180403-094052

dispatch_barrier_async

dispatch_barrier_async是开发中解决多线程读写同一个资源比较好的方案,接下来看一下它的实现。
该函数封装调用了dispatch_barrier_async_f,它和dispatch_async_f类似,不同点在于vtable多了DISPATCH_OBJ_BARRIER_BIT标志位。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func) {
dispatch_continuation_t dc;
dc = fastpath(_dispatch_continuation_alloc_cacheonly());
if (!dc) {
return _dispatch_barrier_async_f_slow(dq, ctxt, func);
}
//设置do_vtable的标志位,从队列中取任务时会用到
dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
dc->dc_func = func;
dc->dc_ctxt = ctxt;

_dispatch_queue_push(dq, dc);
}

dispatch_barrier_async如果传入的是global queue,在唤醒队列时会执行_dispatch_queue_wakeup_global函数,故执行效果同dispatch_async一致,验证了使用篇中的备注内容;
dispatch_barrier_async传的queue为自定义队列时,_dispatch_continuation_pop参数是自定义的queue,然后在_dispatch_continuation_pop中执行自定义队列的dx_invoke函数,即dispatch_queue_invoke。它的调用栈是:

1
2
3
4
_dispatch_queue_invoke
└──_dispatch_queue_class_invoke
└──dispatch_queue_invoke2
└──_dispatch_queue_drain

重点看一下_dispatch_queue_drain函数,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
_dispatch_thread_semaphore_t _dispatch_queue_drain(dispatch_object_t dou) {
dispatch_queue_t dq = dou._dq, orig_tq, old_dq;
old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
struct dispatch_object_s *dc, *next_dc;
_dispatch_thread_semaphore_t sema = 0;
orig_tq = dq->do_targetq;
_dispatch_thread_setspecific(dispatch_queue_key, dq);

while (dq->dq_items_tail) {
dc = _dispatch_queue_head(dq);
do {
if (DISPATCH_OBJECT_SUSPENDED(dq)) {
//barrier block执行时修改了do_suspend_cnt导致此时为YES
//保证barrier block执行时其他block不会同时执行
goto out;
}
if (dq->dq_running > dq->dq_width) {
goto out;
}
bool redirect = false;
if (!fastpath(dq->dq_width == 1)) {
if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
(long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
if (dq->dq_running > 1) {
goto out;
}
} else {
redirect = true;
}
}
next_dc = _dispatch_queue_next(dq, dc);
if (redirect) {
_dispatch_continuation_redirect(dq, dc);
continue;
}
//barrier block之前的block已经执行完,开始执行barrier block
if ((sema = _dispatch_barrier_sync_f_pop(dq, dc, true))) {
goto out;
}
_dispatch_continuation_pop(dc);
_dispatch_perfmon_workitem_inc();
} while ((dc = next_dc));
}
out:
_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
return sema;
}

在while循环中依次取出任务并调用_dispatch_continuation_redirect函数,使得block并发执行。当遇到DISPATCH_OBJ_BARRIER_BIT标记时,会修改do_suspend_cnt标志以保证后续while循环时直接goto out。barrier block的任务执行完之后_dispatch_queue_class_invoke会将do_suspend_cnt重置回去,所以barrier block之后的任务会继续执行。

dispatch_barrier_async的流程见下图:
WX20180403-102645

总结篇

dispatch_async将任务添加到队列的链表中并唤醒队列,全局队列唤醒时中会从线程池里取出可用线程,如果没有则会新建线程,然后在线程中执行队列取出的任务;主队列会唤醒主线程的Runloop,然后在Runloop循环中通知GCD执行主队列提交的任务。

dispatch_sync一般都在当前线程执行,如果是主队列的任务还是会切换到主线程执行。它使用线程信号量来实现串行执行的功能。

如果我们调用dispatch_barrier_async时将Barrier blocks提交到一个global queue,barrier blocks执行效果与dispatch_async()一致;只有将Barrier blocks提交到使用DISPATCH_QUEUE_CONCURRENT属性创建的queue时它才会表现的如同预期。

队列的流程比较复杂,上述分析难免有遗漏或者理解不到位的地方,请指正学习。

Tags: GCD
使用支付宝打赏
使用微信打赏

若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏

扫描二维码,分享此文章