一、概述
dispatch_async
是用于将任务异步提交到指定调度队列的函数。它允许调用者线程继续执行,而不必等待任务的完成。
dispatch_async
特点:
- 会开启新线程
dispatch_async
提交的任务不会阻塞调用者线程。调用者线程可以继续执行后续代码,而任务会被异步地添加到队列中。
- 无论将任务提交到并发队列还是串行队列,都会开启新线程。
- 主队列除外,提交到主队列的任务,会在主线程执行。
- 串行队列在有任务需要执行时,只会开启一个线程去执行任务。
- 任务顺序
- 串行队列
- 在串行队列上,
dispatch_async
提交的任务会按照提交的顺序依次执行,确保任务之间的顺序性。
- 并发队列
- 在并发队列上,
dispatch_async
提交的任务可以并行执行,具体执行顺序取决于系统的线程调度。
使用示例:
1 2 3 4 5 6 7 8
| dispatch_queue_t queue = dispatch_queue_create("com.lixkit.concurrentQueue", DISPATCH_QUEUE_CONCURRENT); dispatch_async(queue, ^{ });
dispatch_async(queue, ^{ });
|
二、dispatch_async
dispatch_async
函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| void dispatch_async(dispatch_queue_t dq, dispatch_block_t work) { dispatch_continuation_t dc = _dispatch_continuation_alloc(); uintptr_t dc_flags = DC_FLAG_CONSUME; dispatch_qos_t qos;
qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags); }
|
上述主要逻辑如下:
- 获取
continuation
对象
_dispatch_continuation_alloc
- 初始化
continuation
对象(为 continuation
中各成员赋值)
_dispatch_continuation_init
- 提交
continuation
到指定的队列
_dispatch_continuation_async
接下来,基于源码分别看下这几部分逻辑。
三、 _dispatch_continuation_alloc
_dispatch_continuation_alloc
函数用于获取 continuation
对象,continuation
对象是对任务的包装,其中存储了任务、优先级、上下文等信息。
continuation
是 dispatch_continuation_t
类型,dispatch_continuation_t
是一个指向 dispatch_continuation_s
结构体的指针,dispatch_continuation_s
的定义如下:
1 2 3
| typedef struct dispatch_continuation_s { DISPATCH_CONTINUATION_HEADER(continuation); } *dispatch_continuation_t;
|
将其涉及的宏完全展开后,结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| typedef struct dispatch_continuation_s { union { const void *__ptrauth_objc_isa_pointer do_vtable; uintptr_t dc_flags; };
union { pthread_priority_t dc_priority; int dc_cache_cnt; uintptr_t dc_pad; };
struct dispatch_continuation_s *volatile do_next;
struct voucher_s *dc_voucher;
dispatch_function_t dc_func;
void *dc_ctxt;
void *dc_data;
void *dc_other; } *dispatch_continuation_t;
|
从 dispatch_continuation_s
结构可以看出,dispatch_continuation_s
是一个链表结构,其中的 do_next
则是指向下一个 continuation
。
_dispatch_continuation_alloc
函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| static inline dispatch_continuation_t _dispatch_continuation_alloc(void) { dispatch_continuation_t dc = _dispatch_continuation_alloc_cacheonly(); if (unlikely(!dc)) { return _dispatch_continuation_alloc_from_heap(); } return dc; }
|
该函数主要逻辑为先从缓存中读取 continuation
对象,如果缓存不存在,再去新建一个 continuation
对象。
这里重点看下 continuation
缓存读取的逻辑,根据上述源码可知,缓存的读取是调用 _dispatch_continuation_alloc_cacheonly
函数获取的,该函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| static inline dispatch_continuation_t _dispatch_continuation_alloc_cacheonly(void) { dispatch_continuation_t dc = (dispatch_continuation_t) _dispatch_thread_getspecific(dispatch_cache_key); if (likely(dc)) { _dispatch_thread_setspecific(dispatch_cache_key, dc->do_next); } return dc; }
|
前面提到 continuation
是一个链表结构,而其中 do_next
则是指向当前节点的下一个节点。
可以看到,上述主要逻辑是先从线程从当前线程存储空间中获取一个缓存的 continuation
。如果取到,则将缓存更新为 continuation
的下个节点对象 do_next
。
对于缓存不存在时新建的那个 continuation
,也会在使用完释放时存入缓存:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| static inline void _dispatch_continuation_free(dispatch_continuation_t dc) { dc = _dispatch_continuation_free_cacheonly(dc); }
static inline dispatch_continuation_t _dispatch_continuation_free_cacheonly(dispatch_continuation_t dc) { _dispatch_thread_setspecific(dispatch_cache_key, dc); return NULL; }
|
_dispatch_continuation_alloc
函数主要逻辑如下:
- 从当前线程存储空间中获取一个缓存的
continuation
对象 dc
。
- 如果读取成功,则将当前线程缓存的
continuation
对象设置为 dc
的下一个对象(dc->do_next
)。
- 如果缓存没有读取到,则创建新的
continuation
。
- 这个新建的
continuation
在使用完成后,设置成线程缓存的 continuation
。
结合上述源码可以知道,每个线程都利用 continuation
链表实现了一个 continuation
缓存池,continuation
不再使用后将会放入缓存池待下次使用,这么设计有下面两个好处:
- 通过重用这些对象,可以减少频繁创建和销毁对象的开销,提高性能。
- 每个线程维护自己的缓存,可以减少多线程同时分配内存时的锁竞争。
四、_dispatch_continuation_init
_dispatch_continuation_init
函数用于初始化 continuation
对象,该函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| static inline dispatch_qos_t _dispatch_continuation_init(dispatch_continuation_t dc, dispatch_queue_class_t dqu, dispatch_block_t work, dispatch_block_flags_t flags, uintptr_t dc_flags) { void *ctxt = _dispatch_Block_copy(work);
dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
if (unlikely(_dispatch_block_has_private_data(work))) { dc->dc_flags = dc_flags; dc->dc_ctxt = ctxt; return _dispatch_continuation_init_slow(dc, dqu, flags); }
dispatch_function_t func = _dispatch_Block_invoke(work);
if (dc_flags & DC_FLAG_CONSUME) { func = _dispatch_call_block_and_release; }
return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags); }
|
上述逻辑,会先调用 _dispatch_block_has_private_data
函数检查 block
是否包含私有数据,关于 _dispatch_block_has_private_data
函数,《GCD 底层原理 3 - dispatch_sync》 中有解释,我们常规使用传入的 block
是不包含私有数据的。所以最终会执行 _dispatch_continuation_init_f
函数。
_dispatch_continuation_init_f
函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| static inline dispatch_qos_t _dispatch_continuation_init_f(dispatch_continuation_t dc, dispatch_queue_class_t dqu, void *ctxt, dispatch_function_t f, dispatch_block_flags_t flags, uintptr_t dc_flags) { pthread_priority_t pp = 0; dc->dc_flags = dc_flags | DC_FLAG_ALLOCATED; dc->dc_func = f; dc->dc_ctxt = ctxt;
if (!(flags & DISPATCH_BLOCK_HAS_PRIORITY)) { pp = _dispatch_priority_propagate(); }
_dispatch_continuation_voucher_set(dc, flags); return _dispatch_continuation_priority_set(dc, dqu, pp, flags); }
|
可以看出,该函数主要是为 continuation
各成员进行赋值,例如 dc_flags
、dc_func
、dc_voucher
、dc_priority
等。
该函数中涉及到对 _dispatch_priority_propagate
、_dispatch_continuation_voucher_set
、_dispatch_continuation_priority_set
的函数调用,接下来分别分析下三个函数的实现逻辑。
1、_dispatch_priority_propagate
先看下 _dispatch_continuation_init_f
函数源码中优先级 pp
的赋值,从源码可知调用该函数时,传入的 flags
参数值是 0
,所以最终会调用 _dispatch_priority_propagate
函数,该函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| static inline pthread_priority_t _dispatch_priority_propagate(void) { return _dispatch_priority_compute_propagated(0, DISPATCH_PRIORITY_PROPAGATE_CURRENT); }
static inline pthread_priority_t _dispatch_priority_compute_propagated(pthread_priority_t pp, unsigned int flags) { if (flags & DISPATCH_PRIORITY_PROPAGATE_CURRENT) { pp = _dispatch_get_priority(); } pp = _pthread_priority_strip_all_flags(pp); if (!(flags & DISPATCH_PRIORITY_PROPAGATE_FOR_SYNC_IPC) && pp > _dispatch_qos_to_pp(DISPATCH_QOS_USER_INITIATED)) { return _dispatch_qos_to_pp(DISPATCH_QOS_USER_INITIATED); } return pp; }
static inline pthread_priority_t _dispatch_get_priority(void) { pthread_priority_t pp = (uintptr_t) _dispatch_thread_getspecific(dispatch_priority_key); return pp; }
|
结合函数调用时的参数值,总结上述主要逻辑如下:
- 从当前线程存储空间中获取当前线程优先级,并赋值给
pp
。
- 移除优先级中的所有标志位,只保留纯粹的优先级值。
- 如果优先级高于
USER_INITIATED
级别,则限制为 USER_INITIATED
。
这里的优先级 pp
是 pthread_priority_t
类型,它和 Qos
什么关系呢?
pthread_priority_t
和 QoS
(Quality of Service) 有密切的关系。在 GCD 中,它们用于表示任务的优先级和服务质量。它们之间的关系如下:
pthread_priority_t
是一个更底层的表示,它包含了 QoS
信息以及其他一些标志位。
QoS
是一个更高级的抽象,它定义了几个离散的优先级级别,如 USER_INTERACTIVE
、USER_INITIATED
、UTILITY
等。
pthread_priority_t
可以通过位操作来包含 QoS
信息。通常,QoS
值被编码在 pthread_priority_t
的高位字节中。
- 可以使用一些宏或函数在
pthread_priority_t
和 QoS
之间进行转换,例如:1 2 3 4
| dispatch_qos_t qos = _dispatch_qos_from_pp(pthread_priority);
pthread_priority_t pp = _dispatch_qos_to_pp(qos);
|
pthread_priority_t
除了包含 QoS
信息外,还可以包含其他标志,如相对优先级、覆盖标志等。
- 在实际使用中,通常会使用
QoS
级别来设置任务优先级,而系统内部会将其转换为 pthread_priority_t
来进行更细粒度的调度控制。
可以通过一个简单示例,打印下默认 Qos
:

可以看到,默认 Qos
是 USER_INITIATED
。
2、_dispatch_continuation_voucher_set
_dispatch_continuation_init_f
函数中,会调用 _dispatch_continuation_voucher_set
函数为 continuation
的 dc_voucher
赋值,根据前述 dispatch_continuation_s
结构可知,dc_voucher
是 voucher_s
类型。
完全展开后的 voucher_s
结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| typedef struct voucher_s {
struct voucher_vtable_s *__ptrauth_objc_isa_pointer os_obj_isa;
int volatile os_obj_ref_cnt;
int volatile os_obj_xref_cnt;
struct voucher_hash_entry_s { uintptr_t vhe_next; uintptr_t vhe_prev_ptr; } v_list;
mach_voucher_t v_kvoucher; mach_voucher_t v_ipc_kvoucher;
voucher_t v_kvbase;
firehose_activity_id_t v_activity;
uint64_t v_activity_creator;
firehose_activity_id_t v_parent_activity;
unsigned int v_kv_has_importance:1;
#if VOUCHER_ENABLE_RECIPE_OBJECTS
size_t v_recipe_extra_offset; mach_voucher_attr_recipe_size_t v_recipe_extra_size; #endif } voucher_s;
|
voucher_s
不是本次源码分析的重点,且涉及内容较多,不再逐个字段分析。
_dispatch_continuation_voucher_set
函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| static inline void _dispatch_continuation_voucher_set(dispatch_continuation_t dc, dispatch_block_flags_t flags) { voucher_t v = NULL;
dispatch_assert(!(flags & DISPATCH_BLOCK_HAS_VOUCHER));
if (!(flags & DISPATCH_BLOCK_NO_VOUCHER)) { v = _voucher_copy(); } dc->dc_voucher = v; _dispatch_voucher_debug("continuation[%p] set", dc->dc_voucher, dc); _dispatch_voucher_ktrace_dc_push(dc); }
|
其中,_voucher_copy
函数用于复制当前线程的 voucher
,其相关实现逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| static inline voucher_t _voucher_copy(void) { voucher_t voucher = _voucher_get(); if (voucher) _voucher_retain(voucher); return voucher; }
static inline voucher_t _voucher_get(void) {
return _dispatch_thread_getspecific(dispatch_voucher_key); }
|
根据源码可知,上述核心逻辑为读取当前线程的 voucher
,并赋值给 dc_voucher
。
3、_dispatch_continuation_priority_set
该函数主要是为 continuation
的 dc_priority
赋值,并返回对应 QoS
,函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| static inline dispatch_qos_t _dispatch_continuation_priority_set(dispatch_continuation_t dc, dispatch_queue_class_t dqu, pthread_priority_t pp, dispatch_block_flags_t flags) { dispatch_qos_t qos = DISPATCH_QOS_UNSPECIFIED;
#if HAVE_PTHREAD_WORKQUEUE_QOS dispatch_queue_t dq = dqu._dq;
if (likely(pp)) { bool enforce = (flags & DISPATCH_BLOCK_ENFORCE_QOS_CLASS); bool is_floor = (dq->dq_priority & DISPATCH_PRIORITY_FLAG_FLOOR); bool dq_has_qos = (dq->dq_priority & DISPATCH_PRIORITY_REQUESTED_MASK);
if (enforce) { pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG; qos = _dispatch_qos_from_pp_unsafe(pp); } else if (!is_floor && dq_has_qos) { pp = 0; } else { qos = _dispatch_qos_from_pp_unsafe(pp); } }
dc->dc_priority = pp; #else (void)dc; (void)dqu; (void)pp; (void)flags; #endif
return qos; }
|
五、_dispatch_continuation_async
在前面完成 continuation
的初始化之后,将会调用 _dispatch_continuation_async
函数将初始化好的 continuation
异步提交到指定的队列。
_dispatch_continuation_async
函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| static inline void _dispatch_continuation_async(dispatch_queue_class_t dqu, dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags) { #if DISPATCH_INTROSPECTION if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) { _dispatch_trace_item_push(dqu, dc); } #else (void)dc_flags; #endif
return dx_push(dqu._dq, dc, qos); }
|
上述实现中,重点是对 dx_push
这个宏的调用,其他的都是日志、调试相关。
dx_push
宏定义如下:
1
| #define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
|
可以知道,这里是访问队列虚表的 dq_push
函数进行调用。结合在另篇文章《GCD 底层原理 2 - dispatch_queue》 中的内容可知,串行队列、并发队列的虚表如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| const struct dispatch_lane_vtable_s _dispatch_queue_serial_vtable = { ._os_obj_xref_dispose = _dispatch_xref_dispose, ._os_obj_dispose = _dispatch_dispose, ._os_obj_vtable = { .do_kind = "queue_serial", .do_type = DISPATCH_QUEUE_SERIAL_TYPE, .do_dispose = _dispatch_lane_dispose, .do_debug = _dispatch_queue_debug, .do_invoke = _dispatch_lane_invoke,
.dq_activate = _dispatch_lane_activate, .dq_wakeup = _dispatch_lane_wakeup, .dq_push = _dispatch_lane_push, }, };
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| const struct dispatch_lane_vtable_s _dispatch_queue_concurrent_vtable = { ._os_obj_xref_dispose = _dispatch_xref_dispose, ._os_obj_dispose = _dispatch_dispose, ._os_obj_vtable = { .do_kind = "queue_concurrent", .do_type = DISPATCH_QUEUE_CONCURRENT_TYPE, .do_dispose = _dispatch_lane_dispose, .do_debug = _dispatch_queue_debug, .do_invoke = _dispatch_lane_invoke, .dq_activate = _dispatch_lane_activate, .dq_wakeup = _dispatch_lane_wakeup, .dq_push = _dispatch_lane_concurrent_push, }, }
|
所以,前面对 dx_push
的调用:
- 串行队列调用的是
_dispatch_lane_push
函数。
- 并发队列调用的是
_dispatch_lane_concurrent_push
函数。
接下来,分别看下将任务提交到串行队列、并发队列的逻辑。
1、提交到串行队列
(1)_dispatch_lane_push
dispatch_async
将任务提交到串行队列时,调用 dx_push
实际调用的是 _dispatch_lane_push
函数,该函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| void _dispatch_lane_push(dispatch_lane_t dq, dispatch_object_t dou, dispatch_qos_t qos) { dispatch_wakeup_flags_t flags = 0; struct dispatch_object_s *prev;
if (unlikely(_dispatch_object_is_waiter(dou))) { return _dispatch_lane_push_waiter(dq, dou._dsc, qos); }
dispatch_assert(!_dispatch_object_is_global(dq)); qos = _dispatch_queue_push_qos(dq, qos);
prev = os_mpsc_push_update_tail(os_mpsc(dq, dq_items), dou._do, do_next); if (unlikely(os_mpsc_push_was_empty(prev))) { _dispatch_retain_2_unsafe(dq); flags = DISPATCH_WAKEUP_CONSUME_2 | DISPATCH_WAKEUP_MAKE_DIRTY; } else if (unlikely(_dispatch_queue_need_override(dq, qos))) {
_dispatch_retain_2_unsafe(dq); flags = DISPATCH_WAKEUP_CONSUME_2; }
os_mpsc_push_update_prev(os_mpsc(dq, dq_items), prev, dou._do, do_next);
if (flags) { return dx_wakeup(dq, qos, flags); } }
|
该函数的参数 dq
是 dispatch_lane_t
类型,在另篇文章《GCD 底层原理 2 - dispatch_queue》 中分析了 dispatch_lane_t
结构:
1 2 3 4 5 6 7 8 9
| struct dispatch_lane_s {
struct dispatch_object_s *dq_items_head; struct dispatch_object_s *dq_items_tail; dispatch_unfair_lock_s dq_sidelock;
};
|
其中,将源码中的:
1
| prev = os_mpsc_push_update_tail(os_mpsc(dq, dq_items), dou._do, do_next);
|
宏完全展开后,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| prev = ({ __typeof__(__c11_atomic_load((__typeof__(*(&(dq)->dq_items_head)) _Atomic *)(&(dq)->dq_items_head), memory_order_relaxed)) _tl = (dou._do);
__c11_atomic_store((__typeof__((*(_tl)).do_next) _Atomic *)(&(_tl)->do_next), NULL, memory_order_relaxed);
_dispatch_thread_setspecific(dispatch_enqueue_key, (void *) (&(dq)->dq_items_tail));
atomic_exchange_explicit((__typeof__(*(&(dq)->dq_items_tail)) _Atomic *)(&(dq)->dq_items_tail), _tl, memory_order_release); })
|
可以看出,这部分逻辑将任务作为节点添加到队列的尾部,并将 dq_items_tail
指向这个新节点(continuation
),prev
为之前的尾部指针,可以理解为上一个节点。
将源码中的:
1
| os_mpsc_push_update_prev(os_mpsc(dq, dq_items), prev, dou._do, do_next);
|
宏完全展开后,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| ({ __typeof__(atomic_load_explicit(_os_atomic_c11_atomic(&(dq)->dq_items_head), memory_order_relaxed)) _prev = (prev); if (likely(_prev)) { (void)atomic_store_explicit(_os_atomic_c11_atomic(&(_prev)->do_next), dou._do, memory_order_relaxed); } else { (void)atomic_store_explicit(_os_atomic_c11_atomic(&(dq)->dq_items_head), dou._do, memory_order_relaxed); } _dispatch_clear_enqueuer(); })
|
这部分逻辑是将上一步返回的 prev
的 do_next
指向当前任务节点。
这里入队的操作,为什么分成两部分去做呢?
从入队宏命名上,也可以看出这是 MPSC
队列,MPSC
是 “Multiple Producer, Single Consumer” 的缩写,表示多生产者单消费者队列。这是一种并发数据结构,允许多个生产者线程向队列中添加数据,而只有一个消费者线程从队列中提取数据。
MPSC
队列的特性:
- 多生产者
- 多个生产者线程可以同时向队列中添加数据。这需要在并发环境下确保数据的一致性和正确性。
- 单消费者
- 只有一个消费者线程从队列中提取数据。这简化了消费者端的同步问题,因为不需要处理多个消费者之间的竞争。
os_mpsc_push_update_tail
和 os_mpsc_push_update_prev
是配合使用的,它们共同完成了将新节点添加到多生产者单消费者(MPSC
)队列的操作。这两个宏的作用:
os_mpsc_push_update_tail
- 这个函数主要负责更新队列的尾指针。
- 它将新节点添加到队列的末尾,并返回之前的尾节点。
- 但是,它并不会更新之前尾节点的
do_next
指针。
os_mpsc_push_update_prev
- 这个函数负责更新前一个节点(之前的尾节点)的
do_next
指针,使其指向新节点。

将一个复杂的原子操作(同时更新尾指针和前一个节点的 do_next 指针)分解为两个简单的原子操作。简单的原子操作通常比复杂的原子操作更高效,尤其是在高并发情况下。
从这里也可以看出,管理任务的队列 dispatch_lane_t
,是使用单向链表实现的。
总结 _dispatch_lane_push
函数核心逻辑:
- 将任务作为节点添加到队列的尾部,并将
dq_items_tail
指向这个新节点(continuation
),并返回 prev
(之前的尾部节点)。
- 其他关键参数配置。
prev
的 do_next
指向当前任务节点。
- 调用
dx_wakeup
唤醒队列。
其中,dx_wakeup
宏定义如下:
1
| #define dx_wakeup(x, y, z) dx_vtable(x)->dq_wakeup(x, y, z)
|
所以对于串行队列,这里实际调用的是 _dispatch_lane_wakeup
函数
(2)_dispatch_lane_wakeup
_dispatch_lane_wakeup
函数实现如下:

| void _dispatch_queue_wakeup(dispatch_queue_class_t dqu, dispatch_qos_t qos, dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target) { dispatch_queue_t dq = dqu._dq; uint64_t old_state, new_state, enqueue = DISPATCH_QUEUE_ENQUEUED; dispatch_assert(target != DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT);
if (target && !(flags & DISPATCH_WAKEUP_CONSUME_2)) { _dispatch_retain_2(dq); flags |= DISPATCH_WAKEUP_CONSUME_2; }
if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
dispatch_assert(dx_metatype(dq) == _DISPATCH_SOURCE_TYPE); qos = _dispatch_queue_wakeup_qos(dq, qos); return _dispatch_lane_class_barrier_complete(upcast(dq)._dl, qos, flags, target, DISPATCH_QUEUE_SERIAL_DRAIN_OWNED); }
if (target) { if (target == DISPATCH_QUEUE_WAKEUP_MGR) { enqueue = DISPATCH_QUEUE_ENQUEUED_ON_MGR; } qos = _dispatch_queue_wakeup_qos(dq, qos); os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, { new_state = _dq_state_merge_qos(old_state, qos); if (flags & DISPATCH_WAKEUP_CLEAR_ACTIVATING) { if (_dq_state_is_activating(old_state)) { new_state &= ~DISPATCH_QUEUE_ACTIVATING; } }
if (likely(!_dq_state_is_suspended(new_state) && !_dq_state_is_enqueued(old_state) && (!_dq_state_drain_locked(old_state) || enqueue != DISPATCH_QUEUE_ENQUEUED_ON_MGR))) { new_state |= enqueue; } if (flags & DISPATCH_WAKEUP_MAKE_DIRTY) { new_state |= DISPATCH_QUEUE_DIRTY; } else if (new_state == old_state) { os_atomic_rmw_loop_give_up(goto done); } }); #if HAVE_PTHREAD_WORKQUEUE_QOS } else if (qos) {
os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, { if (!_dq_state_drain_locked(old_state) && !_dq_state_is_enqueued(old_state)) { os_atomic_rmw_loop_give_up(goto done); } new_state = _dq_state_merge_qos(old_state, qos); if (_dq_state_is_base_wlh(old_state) && !_dq_state_is_suspended(old_state) && !_dq_state_is_enqueued_on_manager(old_state)) {
new_state |= enqueue; }
if (new_state == old_state) { os_atomic_rmw_loop_give_up(goto done); } });
target = DISPATCH_QUEUE_WAKEUP_TARGET; #endif } else { goto done; }
if (likely((old_state ^ new_state) & enqueue)) { dispatch_queue_t tq; if (target == DISPATCH_QUEUE_WAKEUP_TARGET) {
os_atomic_thread_fence(dependency); tq = os_atomic_load_with_dependency_on2o(dq, do_targetq, (long)new_state); } else { tq = target; } dispatch_assert(_dq_state_is_enqueued(new_state)); return _dispatch_queue_push_queue(tq, dq, new_state); } #if HAVE_PTHREAD_WORKQUEUE_QOS if (unlikely((old_state ^ new_state) & DISPATCH_QUEUE_MAX_QOS_MASK)) { if (_dq_state_should_override(new_state)) { return _dispatch_queue_wakeup_with_override(dq, new_state, flags); } } #endif done: if (likely(flags & DISPATCH_WAKEUP_CONSUME_2)) { return _dispatch_release_2_tailcall(dq); } }
|
将上述代码进行精简,最终会走该函数走进 _dispatch_queue_push_queue
函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| void _dispatch_queue_wakeup(dispatch_queue_class_t dqu, dispatch_qos_t qos, dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target) {
uint64_t old_state, new_state, enqueue = DISPATCH_QUEUE_ENQUEUED;
if (target) {
new_state |= enqueue;
}
if (likely((old_state ^ new_state) & enqueue)) {
return _dispatch_queue_push_queue(tq, dq, new_state); }
}
|
_dispatch_queue_push_queue
函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| static inline void _dispatch_queue_push_queue(dispatch_queue_t tq, dispatch_queue_class_t dq, uint64_t dq_state, dispatch_wakeup_flags_t flags) { dispatch_assert(flags & DISPATCH_EVENT_LOOP_CONSUME_2); #if DISPATCH_USE_KEVENT_WORKLOOP if (likely(_dq_state_is_base_wlh(dq_state))) { _dispatch_trace_runtime_event(worker_request, dq._dq, 1); return _dispatch_event_loop_poke((dispatch_wlh_t)dq._dq, dq_state,DISPATCH_EVENT_LOOP_CONSUME_2); } #endif _dispatch_trace_item_push(tq, dq); return dx_push(tq, dq,_dq_state_max_qos(dq_state)); }
|
根据在另篇文章《GCD 底层原理 2 - dispatch_queue》 提到的串行队列 dq_state
为:
1
| dq_state = (DISPATCH_QUEUE_STATE_INIT_VALUE(dq_width) | DISPATCH_QUEUE_ROLE_BASE_WLH)
|
所以这里会调用 _dispatch_event_loop_poke
函数,该函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| void _dispatch_event_loop_poke(dispatch_wlh_t wlh, uint64_t dq_state, uint32_t flags) { if (wlh == DISPATCH_WLH_MANAGER) { dispatch_kevent_s ke = (dispatch_kevent_s){ .ident = 1, .filter = EVFILT_USER, .fflags = NOTE_TRIGGER, .udata = (dispatch_kevent_udata_t)DISPATCH_WLH_MANAGER, }; return _dispatch_kq_deferred_update(DISPATCH_WLH_ANON, &ke); } else if (wlh && wlh != DISPATCH_WLH_ANON) { #if DISPATCH_USE_KEVENT_WORKLOOP dispatch_queue_t dq = (dispatch_queue_t)wlh; dispatch_assert(_dq_state_is_base_wlh(dq_state));
if(unlikely(_dq_state_is_enqueued_on_manager(dq_state))) { dispatch_assert(!(flags & DISPATCH_EVENT_LOOP_OVERRIDE)); dispatch_assert(flags & DISPATCH_EVENT_LOOP_CONSUME_2); _dispatch_trace_item_push(&_dispatch_mgr_q, dq); return dx_push(_dispatch_mgr_q._as_dq, dq, 0); } dispatch_deferred_items_t ddi = _dispatch_deferred_items_get(); if (ddi && ddi->ddi_wlh == wlh) { return _dispatch_kevent_workloop_poke_self(ddi, dq_state, flags); } return _dispatch_kevent_workloop_poke(wlh, dq_state, flags); #else (void)dq_state; (void)flags; #endif } DISPATCH_INTERNAL_CRASH(wlh, "Unsupported wlh configuration"); }
|
根据函数中判断条件可知,这里会执行 _dispatch_kevent_workloop_poke
函数。
_dispatch_kevent_workloop_poke
这个函数名中,有一个 workloop
,这个 workloop
是什么呢?
接下来先看下什么是 Workqueue
与 Workloop
。
(3)Workqueue 与 Workloop
GCD 通过 Workqueue
和 Workloop
两种机制实现了灵活的任务管理。
Workqueue
Workqueue
是 XNU 内核提供的基于线程池的任务调度机制,旨在高效地管理并发任务。它是 GCD 的底层实现之一,能够动态调整线程池的大小以适应当前的任务负载。
Workqueue 特点:
- 线程池管理
Workqueue
使用线程池来执行任务,线程可以被多个任务复用,从而减少线程创建和销毁的开销。
- 并发任务调度
- 适合处理大量并发任务,任务之间没有严格的顺序要求。
- 动态扩展
- 根据任务的数量和优先级,
Workqueue
可以动态增加或减少线程池中的线程。
其中,线程池的管理是通过 pthread_workqueue
相关接口与内核进行交互的,pthread_workqueue
是 pthread
的扩展,专门用于高效的任务调度和线程管理。GCD 通过调用 pthread_workqueue
接口,将任务分发到内核的 Workqueue
,内核的 Workqueue
接收到任务后,会从线程池中分配线程来执行任务。

Workqueue
在 GCD 中一个很重要的应用场景就是并发队列的异步派发,关于这一点后续内容会分析。
Workloop
Workloop
是基于 Workqueue
构建的更高级抽象。Workqueue
提供了线程池和并发任务调度的基础设施,而 Workloop
在此基础上增加了任务顺序性和事件驱动支持,专注于管理任务的顺序性和事件驱动,能够更好地支持串行任务和基于事件的任务调度。
任务被提交到串行队列后,GCD 会将任务分配到 Workloop
,Workloop
会按照任务的提交顺序依次执行任务。
简单总结一下:
Workqueue
- 基于线程池的任务调度机制,适合处理并发任务。
- 通过动态调整线程池的大小和优先级,最大化利用系统资源。
- 主要用于 GCD 的并发队列。
Workloop
- 基于事件驱动的任务调度机制,专注于任务的顺序性和上下文切换。
- 支持事件监听和优先级调度,适合处理串行任务和事件驱动任务。
- 主要用于 GCD 的串行队列。
(4)_dispatch_kevent_workloop_poke
该函数源码实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| static void _dispatch_kevent_workloop_poke(dispatch_wlh_t wlh, uint64_t dq_state, uint32_t flags) { uint32_t kev_flags = KEVENT_FLAG_IMMEDIATE | KEVENT_FLAG_ERROR_EVENTS;
dispatch_kevent_s ke;
int action;
dispatch_assert(_dq_state_is_enqueued_on_target(dq_state)); dispatch_assert(!_dq_state_is_enqueued_on_manager(dq_state));
action = _dispatch_event_loop_get_action_for_state(dq_state);
_dispatch_kq_fill_workloop_event(&ke, action, wlh, dq_state);
if (_dispatch_kq_poll(wlh, &ke, 1, &ke, 1, NULL, NULL, kev_flags)) { _dispatch_kevent_workloop_drain_error(&ke, 0); __builtin_unreachable(); }
if (!(flags & DISPATCH_EVENT_LOOP_OVERRIDE)) { return _dispatch_release_tailcall((dispatch_queue_t)wlh); }
if (flags & DISPATCH_EVENT_LOOP_CONSUME_2) { return _dispatch_release_2_tailcall((dispatch_queue_t)wlh); } }
|
这里又看到了熟悉的 _dispatch_kq_poll
函数,在《GCD 底层原理 3 - dispatch_sync》 中也针对这个函数做过分析,有需要可以参考这篇文章里 _dispatch_kq_poll
相关内容。
该函数内部调用了 _dispatch_kq_fill_workloop_event
,从这一点也可以看出串行队列是基于 Workloop
的。
_dispatch_kq_fill_workloop_event
函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| static void _dispatch_kq_fill_workloop_event(dispatch_kevent_t ke, int which, dispatch_wlh_t wlh, uint64_t dq_state) { dispatch_queue_t dq = (dispatch_queue_t)wlh; dispatch_qos_t qos = _dq_state_max_qos(dq_state); pthread_priority_t pp = 0; uint32_t fflags = 0; uint64_t mask = 0; uint16_t action = 0;
switch (which) { case DISPATCH_WORKLOOP_ASYNC: case DISPATCH_WORKLOOP_ASYNC_FROM_SYNC: case DISPATCH_WORKLOOP_ASYNC_QOS_UPDATE: dispatch_assert(_dq_state_is_base_wlh(dq_state)); dispatch_assert(_dq_state_is_enqueued_on_target(dq_state)); action = EV_ADD | EV_ENABLE; mask |= DISPATCH_QUEUE_ROLE_MASK; mask |= DISPATCH_QUEUE_ENQUEUED; mask |= DISPATCH_QUEUE_MAX_QOS_MASK; fflags |= NOTE_WL_IGNORE_ESTALE; fflags |= NOTE_WL_UPDATE_QOS; if (_dq_state_in_uncontended_sync(dq_state)) { fflags |= NOTE_WL_DISCOVER_OWNER; mask |= DISPATCH_QUEUE_UNCONTENDED_SYNC; } pp = _dispatch_kevent_workloop_priority(dq, which, qos); break;
case DISPATCH_WORKLOOP_ASYNC_LEAVE_FROM_SYNC: case DISPATCH_WORKLOOP_ASYNC_LEAVE_FROM_TRANSFER: fflags |= NOTE_WL_IGNORE_ESTALE; case DISPATCH_WORKLOOP_ASYNC_LEAVE: dispatch_assert(!_dq_state_is_enqueued_on_target(dq_state)); action = EV_ADD | EV_DELETE | EV_ENABLE; mask |= DISPATCH_QUEUE_ENQUEUED; break;
case DISPATCH_WORKLOOP_RETARGET: action = EV_ADD | EV_DELETE | EV_ENABLE; fflags |= NOTE_WL_END_OWNERSHIP; break;
default: DISPATCH_INTERNAL_CRASH(which, "Invalid transition"); }
*ke = (dispatch_kevent_s){ .ident = (uintptr_t)wlh, .filter = EVFILT_WORKLOOP, .flags = action, .fflags = fflags | NOTE_WL_THREAD_REQUEST, .qos = (__typeof__(ke->qos))pp, .udata = (uintptr_t)wlh,
.ext[EV_EXTIDX_WL_ADDR] = (uintptr_t)&dq->dq_state, .ext[EV_EXTIDX_WL_MASK] = mask, .ext[EV_EXTIDX_WL_VALUE] = dq_state, }; _dispatch_kevent_wlh_debug(_dispatch_workloop_actions[which], ke); }
|
该函数最终创建的 ke
中有个 NOTE_WL_THREAD_REQUEST
配置,该配置的作用是请求内核分配一个线程去处理当前任务。
再次进入 _dispatch_kq_poll
这个函数,其精简后的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| static int _dispatch_kq_poll(dispatch_wlh_t wlh, dispatch_kevent_t ke, int n, dispatch_kevent_t ke_out, int n_out, void *buf, size_t *avail, uint32_t flags) {
dispatch_once_f(&_dispatch_kq_poll_pred, &kq_initialized, _dispatch_kq_init);
if (unlikely(wlh == NULL)) { } else if (wlh == DISPATCH_WLH_ANON) { } else { r = kevent_id((uintptr_t)wlh, ke, n, ke_out, n_out, buf, avail, flags); }
return r; }
|
这部分主要逻辑如下:
- 初始化,会调用一次
_dispatch_kq_init
。
- 进行
kevent_id
系统调用。
其中,_dispatch_kq_init
的执行逻辑中,会调用到 _dispatch_root_queues_init_once
函数,执行路径如下:
1 2 3 4 5 6 7
| _dispatch_kq_init ⬇️ _dispatch_kevent_workqueue_init ⬇️ _dispatch_root_queues_init ⬇️ _dispatch_root_queues_init_once
|
精简后的 _dispatch_root_queues_init_once
实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| static void _dispatch_root_queues_init_once(void *context DISPATCH_UNUSED) {
struct pthread_workqueue_config cfg = { .version = PTHREAD_WORKQUEUE_CONFIG_VERSION, .flags = 0, .workq_cb = 0, .kevent_cb = 0, .workloop_cb = 0, .queue_serialno_offs = dispatch_queue_offsets.dqo_serialnum, .queue_label_offs = dispatch_queue_offsets.dqo_label, };
if (unlikely(!_dispatch_kevent_workqueue_enabled)) { } else if (wq_supported & WORKQ_FEATURE_WORKLOOP) { cfg.workq_cb = _dispatch_worker_thread2; cfg.kevent_cb = (pthread_workqueue_function_kevent_t) _dispatch_kevent_worker_thread; cfg.workloop_cb = (pthread_workqueue_function_workloop_t) _dispatch_workloop_worker_thread; r = pthread_workqueue_setup(&cfg, sizeof(cfg)); } else if (wq_supported & WORKQ_FEATURE_KEVENT) { } else { DISPATCH_INTERNAL_CRASH(wq_supported, "Missing Kevent WORKQ support"); }
}
|
上述关键逻辑如下:
- 配置
workqueue
回调函数
workq_cb = _dispatch_worker_thread2
- 配置
workloop
回调函数
workloop_cb = _dispatch_workloop_worker_thread
- 调用
pthread_workqueue_setup
进行上述配置
前面已经分析过,串行队列是基于 Workloop
,最终会执行 workloop
回调函数 workloop_cb
。
当前面 kevent_id
系统调用执行完成后,内核会进行线程的的创建和分配。之后将会从内核态切换到用户态的 start_wqthread
函数,start_wqthread
函数中,会继续调用 _pthread_wqthread
函数,_pthread_wqthread
负责管理线程生命周期,并从工作队列中提取任务交由上面配置回调函数执行。
由于串行队列是基于 Workloop
的,所以 _pthread_wqthread
会将任务交给 workloop
回调函数 workloop_cb
执行,即 _dispatch_workloop_worker_thread
函数。
_dispatch_workloop_worker_thread
函数中,会先调用 _dispatch_root_queue_drain_deferred_wlh
函数,再调用 _dispatch_lane_invoke
函数:
1 2 3 4 5 6 7 8 9
| start_wqthread ⬇️ _pthread_wqthread ⬇️ _dispatch_workloop_worker_thread ⬇️ _dispatch_root_queue_drain_deferred_wlh ⬇️ _dispatch_lane_invoke
|
相关调用堆栈如下:

(5)_dispatch_root_queue_drain_deferred_wlh
该函数精简后的逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| static void _dispatch_root_queue_drain_deferred_wlh(dispatch_deferred_items_t ddi DISPATCH_PERF_MON_ARGS_PROTO) { dispatch_queue_t dq = ...; dispatch_invoke_context_s dic = ...; dispatch_invoke_flags_t flags = DISPATCH_INVOKE_WORKER_DRAIN | DISPATCH_INVOKE_REDIRECTING_DRAIN | DISPATCH_INVOKE_WLH;
if (_dispatch_queue_drain_try_lock_wlh(dq, &dq_state)) { dx_invoke(dq, &dic, flags); }
}
|
其中获取 drain
锁的的函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| static inline bool _dispatch_queue_drain_try_lock_wlh(dispatch_queue_t dq, uint64_t *dq_state) { uint64_t old_state, new_state;
uint64_t lock_bits = _dispatch_lock_value_for_self() | DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER;
os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, { new_state = old_state;
if (unlikely(_dq_state_is_suspended(old_state))) {
new_state &= ~DISPATCH_QUEUE_ENQUEUED; } else if (unlikely(_dq_state_drain_locked(old_state))) { if (_dq_state_in_uncontended_sync(old_state)) { new_state |= DISPATCH_QUEUE_RECEIVED_SYNC_WAIT; } else { os_atomic_rmw_loop_give_up(break); } } else { new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK; new_state |= lock_bits; } });
if (unlikely(!_dq_state_is_base_wlh(old_state) || !_dq_state_is_enqueued_on_target(old_state) || _dq_state_is_enqueued_on_manager(old_state))) { #if DISPATCH_SIZEOF_PTR == 4 old_state >>= 32; #endif DISPATCH_INTERNAL_CRASH(old_state, "Invalid wlh state"); }
if (dq_state) *dq_state = new_state;
return !_dq_state_is_suspended(old_state) && !_dq_state_drain_locked(old_state); }
|
上述获取 drain
锁的主要逻辑如下:
- 如果队列未被挂起,并且未被锁定,可以获取锁,返回值为
true
。
- 此时会同时设置一系列标志位
lock_bits
:
tid & DLOCK_OWNER_MASK
- 设置了该标志位后,
_dq_state_drain_locked
会返回 true
DISPATCH_QUEUE_WIDTH_FULL_BIT
DISPATCH_QUEUE_IN_BARRIER
- 如果队列被挂起,或者已被锁定,无法获取锁,返回值为
false
。
其中,函数的返回值是队列是否未挂起且未被锁定,上面已经提到,是否被锁定,取决于是否设置了 tid & DLOCK_OWNER_MASK
标志位。在后续调用的 _dispatch_queue_drain_try_unlock
函数(后面分析会提到)中,会清除锁定标志位,使队列重新变成未锁定状态。
而当我们显示调用了 GCD 的 dispatch_suspend
时候,会使队列变成挂起状态。dispatch_suspend
不是本次分析的重点,所以在获取 drain
锁的逻辑里,我们只需要关注队列是否被锁定即可。
所以,_dispatch_root_queue_drain_deferred_wlh
函数核心逻辑是先尝试获取锁,如果获取成功,则调用 dx_invoke
开始执行任务。如果获取锁失败,代表当前串行队列中的任务正在其他线程被执行。
根据 dx_invoke
宏的定义:
1
| #define dx_invoke(x, y, z) dx_vtable(x)->do_invoke(x, y, z)
|
结合前面提到的串行队列虚表可知,这里实际是调用 _dispatch_lane_invoke
函数。
(6)_dispatch_lane_invoke
_dispatch_lane_invoke
函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| void _dispatch_lane_invoke(dispatch_lane_t dq, dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags) { _dispatch_queue_class_invoke(dq, dic, flags, 0, _dispatch_lane_invoke2); }
static inline void _dispatch_queue_class_invoke(dispatch_queue_class_t dqu, dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags, dispatch_invoke_flags_t const_restrict_flags, _dispatch_queue_class_invoke_handler_t invoke) {
if (likely(flags & DISPATCH_INVOKE_WLH)) { owned = DISPATCH_QUEUE_SERIAL_DRAIN_OWNED | DISPATCH_QUEUE_ENQUEUED; } else { owned = _dispatch_queue_drain_try_lock(dq, flags); }
if (likely(owned)) {
tq = invoke(dq, dic, flags, &owned);
if (unlikely(tq != DISPATCH_QUEUE_WAKEUP_NONE && tq != DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT)) { } else if (!_dispatch_queue_drain_try_unlock(dq, owned, tq == DISPATCH_QUEUE_WAKEUP_NONE)) { }
}
if (tq) { return _dispatch_queue_invoke_finish(dq, dic, tq, owned); } return _dispatch_release_2_tailcall(dq); }
|
上述主要逻辑如下:
- 调用
_dispatch_lane_invoke2
函数执行任务。
- 调用
_dispatch_queue_drain_try_unlock
解锁。
- 这里解锁后,会使下次调用
_dispatch_root_queue_drain_deferred_wlh
时可以成功获取到 drain
解锁。
接下来看下 _dispatch_lane_invoke2
函数的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| static inline dispatch_queue_wakeup_target_t _dispatch_lane_invoke2(dispatch_lane_t dq, dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags, uint64_t *owned) { dispatch_queue_t otq = dq->do_targetq; dispatch_queue_t cq = _dispatch_queue_get_current();
if (unlikely(cq != otq)) { return otq; } if (dq->dq_width == 1) { return _dispatch_lane_serial_drain(dq, dic, flags, owned); } return _dispatch_lane_concurrent_drain(dq, dic, flags, owned); }
|
在之前的文章里已经提到,串行队列的 dq_width
为 1
,所以这里会继续调用 _dispatch_lane_serial_drain
函数。
(7)_dispatch_lane_serial_drain
_dispatch_lane_serial_drain
函数 函数实现如下:
1 2 3 4 5 6 7 8
| dispatch_queue_wakeup_target_t _dispatch_lane_serial_drain(dispatch_lane_class_t dqu, dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags, uint64_t *owned) { flags &= ~(dispatch_invoke_flags_t)DISPATCH_INVOKE_REDIRECTING_DRAIN; return _dispatch_lane_drain(dqu._dl, dic, flags, owned, true); }
|
可以看到,这里最继续执行 _dispatch_lane_drain
函数:

|
DISPATCH_ALWAYS_INLINE static dispatch_queue_wakeup_target_t _dispatch_lane_drain(dispatch_lane_t dq, dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags, uint64_t *owned_ptr, bool serial_drain) { dispatch_queue_t orig_tq = dq->do_targetq;
dispatch_thread_frame_s dtf;
struct dispatch_object_s *dc = NULL, *next_dc;
uint64_t dq_state, owned = *owned_ptr;
if (unlikely(!dq->dq_items_tail)) return NULL;
_dispatch_thread_frame_push(&dtf, dq);
if (serial_drain || _dq_state_is_in_barrier(owned)) { owned = DISPATCH_QUEUE_IN_BARRIER; } else { owned &= DISPATCH_QUEUE_WIDTH_MASK; }
dc = _dispatch_queue_get_head(dq); goto first_iteration;
for (;;) { dispatch_assert(dic->dic_barrier_waiter == NULL);
dc = next_dc;
if (unlikely(!dc)) { if (!dq->dq_items_tail) { break; } dc = _dispatch_queue_get_head(dq); }
if (unlikely(_dispatch_needs_to_return_to_kernel())) { _dispatch_return_to_kernel(); }
if (unlikely(serial_drain != (dq->dq_width == 1))) { break; }
if (unlikely(!(flags & DISPATCH_INVOKE_DISABLED_NARROWING) && _dispatch_queue_drain_should_narrow(dic))) { break; }
if (likely(flags & DISPATCH_INVOKE_WORKLOOP_DRAIN)) { dispatch_workloop_t dwl = (dispatch_workloop_t)_dispatch_get_wlh(); if (unlikely(_dispatch_queue_max_qos(dwl) > dwl->dwl_drained_qos)) { break; } }
first_iteration: dq_state = os_atomic_load(&dq->dq_state, relaxed);
if (unlikely(_dq_state_is_suspended(dq_state))) { break; }
if (unlikely(orig_tq != dq->do_targetq)) { break; }
if (serial_drain || _dispatch_object_is_barrier(dc)) { if (!serial_drain && owned != DISPATCH_QUEUE_IN_BARRIER) { if (!_dispatch_queue_try_upgrade_full_width(dq, owned)) { goto out_with_no_width; } owned = DISPATCH_QUEUE_IN_BARRIER; }
if (_dispatch_object_is_sync_waiter(dc) && !(flags & DISPATCH_INVOKE_THREAD_BOUND)) { dic->dic_barrier_waiter = dc; goto out_with_barrier_waiter; }
next_dc = _dispatch_queue_pop_head(dq, dc); } else { if (owned == DISPATCH_QUEUE_IN_BARRIER) { os_atomic_xor2o(dq, dq_state, owned, release); owned = dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL; } else if (unlikely(owned == 0)) { if (_dispatch_object_is_waiter(dc)) { _dispatch_queue_reserve_sync_width(dq); } else if (!_dispatch_queue_try_acquire_async(dq)) { goto out_with_no_width; } owned = DISPATCH_QUEUE_WIDTH_INTERVAL; }
next_dc = _dispatch_queue_pop_head(dq, dc);
if (_dispatch_object_is_waiter(dc)) { owned -= DISPATCH_QUEUE_WIDTH_INTERVAL; _dispatch_non_barrier_waiter_redirect_or_wake(dq, dc); continue; }
if (flags & DISPATCH_INVOKE_REDIRECTING_DRAIN) { owned -= DISPATCH_QUEUE_WIDTH_INTERVAL; _dispatch_continuation_redirect_push(dq, dc, _dispatch_queue_max_qos(dq)); continue; } }
_dispatch_continuation_pop_inline(dc, dic, flags, dq); }
if (owned == DISPATCH_QUEUE_IN_BARRIER) { owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL; }
if (dc) { owned = _dispatch_queue_adjust_owned(dq, owned, dc); }
*owned_ptr &= DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_ENQUEUED_ON_MGR; *owned_ptr |= owned;
_dispatch_thread_frame_pop(&dtf);
return dc ? dq->do_targetq : NULL;
out_with_no_width: *owned_ptr &= DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_ENQUEUED_ON_MGR; _dispatch_thread_frame_pop(&dtf); return DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT;
out_with_barrier_waiter: if (unlikely(flags & DISPATCH_INVOKE_DISALLOW_SYNC_WAITERS)) { DISPATCH_INTERNAL_CRASH(0, "Deferred continuation on source, mach channel or mgr"); }
_dispatch_thread_frame_pop(&dtf);
return dq->do_targetq; }
|
_dispatch_lane_drain
函数主要逻辑就是按照先进先出的顺序,逐个执行串行队列中的各个任务。
2、提交到并发队列
再回头看下 _dispatch_continuation_async
函数:
1 2 3 4 5 6 7 8 9
| static inline void _dispatch_continuation_async(dispatch_queue_class_t dqu, dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags) {
return dx_push(dqu._dq, dc, qos); }
|
前面已经提到,对于并发队列,dx_push
实际调用的是 _dispatch_lane_concurrent_push
函数。
(1)_dispatch_lane_concurrent_push
该函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| void _dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou, dispatch_qos_t qos) {
if (unlikely(_dispatch_queue_is_cooperative(dq) && !_dispatch_object_supported_on_cooperative_queue(dou))) { DISPATCH_CLIENT_CRASH(dou._do, "Cannot target the cooperative root queue - not implemented"); }
if (dq->dq_items_tail == NULL && !_dispatch_object_is_waiter(dou) && !_dispatch_object_is_barrier(dou) && _dispatch_queue_try_acquire_async(dq)) { return _dispatch_continuation_redirect_push(dq, dou, qos); }
_dispatch_lane_push(dq, dou, qos); }
|
在该函数的 if
判断条件里,dq->dq_items_tail
一定是 NULL
(具体原因后续会分析)。并且根据其他几个判断条件可知,这里会调用 _dispatch_continuation_redirect_push
函数:
1 2 3 4 5 6 7 8 9 10 11 12
| static void _dispatch_continuation_redirect_push(dispatch_lane_t dl, dispatch_object_t dou, dispatch_qos_t qos) {
dispatch_queue_t dq = dl->do_targetq; dx_push(dq, dou, qos); }
|
可以发现,这里又调用了熟悉的 dx_push
宏,但是传入的并不是我们创建的并发队列,而是并发队列的 do_targetq
。
在之前的文章《GCD 底层原理 2 - dispatch_queue》 中已经分析了,结论是并发队列和串行队列都是从根队列数组 _dispatch_root_queues
中取出的对应的根队列(root queue
)。创建并发队列时,返回的是 _dispatch_root_queues
数组 index = 9
的元素(DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS
),即:
1 2 3 4 5 6 7 8 9
| [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = { DISPATCH_GLOBAL_OBJECT_HEADER(queue_global), .dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE, .do_ctxt = _dispatch_root_queue_ctxt(DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS), .dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL), .dq_priority = _dispatch_priority_make_fallback(DISPATCH_QOS_DEFAULT), .dq_label = "com.apple.root.default-qos", .dq_serialnum = 13, },
|
所以这里调用 dx_push
时传入的实际上是对应的 root queue
。而对于 root queue
,其 dx_push
对应的函数是 _dispatch_root_queue_push
,所以对于后续往队列 push
任务,都是在对应的 root queue
上的,而不是在当然任务 dq
上的。这也是 _dispatch_lane_concurrent_push
里的判断条件 dq->dq_items_tail
为 NULL
的原因。
(2)_dispatch_root_queue_push
该函数精简后的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12
| void _dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou, dispatch_qos_t qos) {
if (_dispatch_root_queue_push_needs_override(rq, qos)) { return _dispatch_root_queue_push_override(rq, dou, qos); }
_dispatch_root_queue_push_inline(rq, dou, dou, 1); }
|
其中 _dispatch_root_queue_push_needs_override
、_dispatch_root_queue_push_override
函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| static inline bool _dispatch_root_queue_push_needs_override(dispatch_queue_global_t rq, dispatch_qos_t qos) { dispatch_qos_t fallback = _dispatch_priority_fallback_qos(rq->dq_priority); if (fallback) { return qos && qos != fallback; }
dispatch_qos_t rqos = _dispatch_priority_qos(rq->dq_priority); return rqos && qos > rqos; }
DISPATCH_NOINLINE static void _dispatch_root_queue_push_override(dispatch_queue_global_t orig_rq, dispatch_object_t dou, dispatch_qos_t qos) { uintptr_t flags = 0;
if (orig_rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) { flags |= DISPATCH_QUEUE_OVERCOMMIT; } else if (_dispatch_queue_is_cooperative(orig_rq)) { flags |= DISPATCH_QUEUE_COOPERATIVE; }
dispatch_queue_global_t rq = _dispatch_get_root_queue(qos, flags); dispatch_continuation_t dc = dou._dc;
if (_dispatch_object_is_redirection(dc)) { dc->dc_func = (void *)orig_rq; } else { dc = _dispatch_continuation_alloc(); dc->do_vtable = DC_VTABLE(OVERRIDE_OWNING); 、 dc->dc_ctxt = dc; dc->dc_other = orig_rq; dc->dc_data = dou._do; dc->dc_priority = DISPATCH_NO_PRIORITY; dc->dc_voucher = DISPATCH_NO_VOUCHER; }
_dispatch_root_queue_push_inline(rq, dc, dc, 1); }
|
可以看到,最终会调用到 _dispatch_root_queue_push_inline
函数。
(3)_dispatch_root_queue_push_inline
该函数实现如下:
1 2 3 4 5 6 7 8 9
| static inline void _dispatch_root_queue_push_inline(dispatch_queue_global_t dq, dispatch_object_t _head, dispatch_object_t _tail, int n) { struct dispatch_object_s *hd = _head._do, *tl = _tail._do; if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) { return _dispatch_root_queue_poke_and_wakeup(dq, n, 0); } }
|
将其中涉及到的宏展开后实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| static inline void _dispatch_root_queue_push_inline(dispatch_queue_global_t dq, dispatch_object_t _head, dispatch_object_t _tail, int n) { struct dispatch_object_s *hd = _head._do, *tl = _tail._do; __typeof__(atomic_load_explicit((__typeof__(*(&_os_mpsc_head(dq, dq_items))) _Atomic *)(&_os_mpsc_head(dq, dq_items)), memory_order_relaxed)) _token;
_token = os_mpsc_push_update_tail((dq, dq_items), tl, do_next);
os_mpsc_push_update_prev((dq, dq_items), _token, hd, do_next);
if ((_token) == NULL) { return _dispatch_root_queue_poke_and_wakeup(dq, n, 0); } }
|
这里又看到了前面总结过的 MPSC
队列操作。
总结该函数逻辑如下:
- 通过
MPSC
队列操作将任务插入队列尾部。
- 分配线程去处理队列中的任务。
- 调用
_dispatch_root_queue_poke_and_wakeup
函数
其中,_dispatch_root_queue_poke_and_wakeup
函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| void _dispatch_root_queue_poke_and_wakeup(dispatch_queue_global_t dq, int n, int floor) { #if !DISPATCH_USE_INTERNAL_WORKQUEUE #if DISPATCH_USE_PTHREAD_POOL if (likely(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE || dx_type(dq) == DISPATCH_QUEUE_COOPERATIVE_ROOT_TYPE)) #endif { int old_pending, new_pending; os_atomic_rmw_loop2o(dq, dgq_pending, old_pending, new_pending, release, { new_pending = old_pending ?: n; }); if (old_pending > 0) { _dispatch_root_queue_debug("worker thread request still pending " "for global queue: %p", dq); return; } } #endif return _dispatch_root_queue_poke_slow(dq, n, floor); }
|
该函数主要逻辑是调用 _dispatch_root_queue_poke_slow
函数,且第二个参数 n = 1
。
(4) _dispatch_root_queue_poke_slow
精简后的 _dispatch_root_queue_poke_slow
函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| static void _dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor) { int remaining = n;
_dispatch_root_queues_init();
if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
r = _pthread_workqueue_addthreads(remaining, _dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
return; } }
|
其中,_dispatch_root_queues_init
函数实现如下:
1 2 3 4 5 6
| static inline void _dispatch_root_queues_init(void) { dispatch_once_f(&_dispatch_root_queues_pred, NULL, _dispatch_root_queues_init_once); }
|
这里又调用了 _dispatch_root_queues_init_once
函数,关于该函数在前面串行队列部分已经分析过,函数中关键逻辑如下:
- 配置
workqueue
回调函数
workq_cb = _dispatch_worker_thread2
- 配置
workloop
回调函数
workloop_cb = _dispatch_workloop_worker_thread
而且前面也已经分析过,并发队列是基于 workqueue
的,从 _dispatch_root_queue_poke_slow
函数实现也可以看到,函数逻辑中没有涉及到像串行队列那样的 workloop
的配置。所以在 _dispatch_root_queue_poke_slow
函数中,调用 _pthread_workqueue_addthreads
完成线程的分配之后,会执行 workqueue
回调函数 _dispatch_worker_thread2
。
_pthread_workqueue_addthreads
函数有两个参数:
而上面调用 _pthread_workqueue_addthreads
函数时,传入的 numthreads
参数为 1
,表示请求分配一个线程去处理任务。
按照经验来看,dispatch_async
+ 并发队列,是会分配多个线程去处理各个任务的。为什么这一步却只请求一个线程去处理任务呢?后面会分析原因。
(5)_dispatch_worker_thread2
该函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| static void _dispatch_worker_thread2(pthread_priority_t pp) {
dispatch_queue_global_t dq; dispatch_invoke_flags_t invoke_flags = 0;
uintptr_t rq_flags = 0; if (cooperative) { rq_flags |= DISPATCH_QUEUE_COOPERATIVE; invoke_flags |= DISPATCH_INVOKE_COOPERATIVE_DRAIN; } else { rq_flags |= (overcommit ? DISPATCH_QUEUE_OVERCOMMIT : 0); }
dq = _dispatch_get_root_queue(_dispatch_qos_from_pp(pp), rq_flags);
invoke_flags |= DISPATCH_INVOKE_WORKER_DRAIN | DISPATCH_INVOKE_REDIRECTING_DRAIN;
_dispatch_root_queue_drain(dq, dq->dq_priority, invoke_flags);
}
|
在该函数中,主要是调用了 _dispatch_root_queue_drain
函数去执行具体任务。
(6)_dispatch_root_queue_drain
_dispatch_root_queue_drain
函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| static void _dispatch_root_queue_drain(dispatch_queue_global_t dq, dispatch_priority_t pri, dispatch_invoke_flags_t flags) {
_dispatch_queue_set_current(dq);
struct dispatch_object_s *item;
dispatch_invoke_context_s dic = { };
while (likely(item = _dispatch_root_queue_drain_one(dq))) {
_dispatch_continuation_pop_inline(item, &dic, flags, dq);
}
_dispatch_queue_set_current(NULL); }
|
该函数核心逻辑如下:
- 将当前线程与队列关联。
- 确保任务执行期间,GCD 能正确识别当前线程所属队列,防止线程池中的线程后续任务调度混乱。
- 使用
while
循环,按照 FIFO
顺序取出一个任务 item
。
- 取任务调用的
_dispatch_root_queue_drain_one
函数。
while
循环中,弹出并执行的上一步取出的任务 item
。
- 调用
_dispatch_continuation_pop_inline
函数。
- 所有任务处理完成后,将当前线程与队列取消关联。
这里需要重点看下 _dispatch_root_queue_drain_one
函数,这是实现多线程并发执行的关键。
(7)_dispatch_root_queue_drain_one
该函数内部实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| static inline struct dispatch_object_s * _dispatch_root_queue_drain_one(dispatch_queue_global_t dq) { struct dispatch_object_s *head, *next;
start: head = os_atomic_xchg2o(dq, dq_items_head, DISPATCH_ROOT_QUEUE_MEDIATOR, relaxed);
if (unlikely(head == NULL)) {
if (unlikely(!os_atomic_cmpxchg2o(dq, dq_items_head, DISPATCH_ROOT_QUEUE_MEDIATOR, NULL, relaxed))) { goto start; }
if (unlikely(dq->dq_items_tail)) { if (__DISPATCH_ROOT_QUEUE_CONTENDED_WAIT__(dq, _dispatch_root_queue_head_tail_quiesced)) { goto start; } } _dispatch_root_queue_debug("no work on global queue: %p", dq); return NULL; }
if (unlikely(head == DISPATCH_ROOT_QUEUE_MEDIATOR)) { if (likely(__DISPATCH_ROOT_QUEUE_CONTENDED_WAIT__(dq, _dispatch_root_queue_mediator_is_gone))) { goto start; } return NULL; }
next = head->do_next; if (unlikely(!next)) {
os_atomic_store2o(dq, dq_items_head, NULL, relaxed); if (os_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL, release)) { goto out; } next = os_mpsc_get_next(head, do_next, &dq->dq_items_tail); }
os_atomic_store2o(dq, dq_items_head, next, relaxed);
_dispatch_root_queue_poke(dq, 1, 0); out: return head; }
|
该函数核心逻辑有两部分:
- 按照
FIFO
顺序取一个任务并返回,交由外面 _dispatch_continuation_pop_inline
函数取执行任务。
- 调用
_dispatch_root_queue_poke
函数。
- 该函数内部会再次调用
_dispatch_root_queue_poke_slow
函数。
_dispatch_root_queue_poke
函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| void _dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor) { if (!_dispatch_queue_class_probe(dq)) { return; } #if !DISPATCH_USE_INTERNAL_WORKQUEUE #if DISPATCH_USE_PTHREAD_POOL if (likely(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE || dx_type(dq) == DISPATCH_QUEUE_COOPERATIVE_ROOT_TYPE)) #endif { if (unlikely(!os_atomic_cmpxchg2o(dq, dgq_pending, 0, n, release))) { _dispatch_root_queue_debug("worker thread request still pending " "for global queue: %p", dq); return; } } #endif
return _dispatch_root_queue_poke_slow(dq, n, floor); }
|
可以看到,这里再次调用了 _dispatch_root_queue_poke_slow
函数,上面已经提到 _dispatch_root_queue_poke_slow
会请求分配 1 个线程去执行任务。
这个“循环”的执行链路如下:
1 2 3 4 5 6 7 8 9 10 11 12 13
| _dispatch_root_queue_poke_slow ⬇️ _pthread_workqueue_addthreads // 申请分配 1 个线程去执行任务 ⬇️ _dispatch_worker_thread2 ⬇️ _dispatch_root_queue_drain // 调用 _dispatch_root_queue_drain_one 取出 1 个任务并执行 ⬇️ _dispatch_root_queue_drain_one ⬇️ _dispatch_root_queue_poke_slow // 新一轮循环,申请 1 个线程执行 1 个任务 ⬇️ ......
|
这里就是 dispatch_async
+ 并发队列的多线程并发执行的关键逻辑了:每次申请 1
个线程去执行 1
个任务,在从队列取任务的同时,同时去申请新的线程执行下个任务,这样就达到了多个线程并发执行任务的目的。而具体哪个任务先执行,则取决于的线程调度先后。
这里用到了“任务窃取优化”:
新创建的线程立即进入 _dispatch_root_queue_drain
循环,尝试从队列中窃取任务执行。提高多核 CPU 利用率,减少任务排队时间。
(8)_dispatch_continuation_pop_inline
这里再回头简单看下上一步取出的任务,是如何执行的。执行时调用的是 _dispatch_continuation_pop_inline
函数,该函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| static inline void _dispatch_continuation_pop_inline(dispatch_object_t dou, dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags, dispatch_queue_class_t dqu) { dispatch_pthread_root_queue_observer_hooks_t observer_hooks = _dispatch_get_pthread_root_queue_observer_hooks(); if (observer_hooks) observer_hooks->queue_will_execute(dqu._dq); flags &= _DISPATCH_INVOKE_PROPAGATE_MASK; if (_dispatch_object_has_vtable(dou)) { if (dx_type(dou._do) == DISPATCH_SWIFT_JOB_TYPE) { dx_invoke(dou._dsjc, NULL, _dispatch_invoke_flags_to_swift_invoke_flags(flags)); } else { dx_invoke(dou._dq, dic, flags); } } else { _dispatch_continuation_invoke_inline(dou, flags, dqu); } if (observer_hooks) observer_hooks->queue_did_execute(dqu._dq); }
|
上面主要逻辑是判断传入的任务是否有虚函数表,如果有的话调用 dx_invoke
宏执行任务,否则调用 _dispatch_continuation_invoke_inline
宏执行任务。
由于传进来的任务,是前面包装好的 continuation
,是没有虚函数表的,所以这里会继续调用 _dispatch_continuation_invoke_inline
函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| static inline void _dispatch_continuation_invoke_inline(dispatch_object_t dou, dispatch_invoke_flags_t flags, dispatch_queue_class_t dqu) { dispatch_continuation_t dc = dou._dc, dc1; dispatch_invoke_with_autoreleasepool(flags, { uintptr_t dc_flags = dc->dc_flags; _dispatch_continuation_voucher_adopt(dc, dc_flags);
if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) { _dispatch_trace_item_pop(dqu, dou); }
if (dc_flags & DC_FLAG_CONSUME) { dc1 = _dispatch_continuation_free_cacheonly(dc); } else { dc1 = NULL; }
if (unlikely(dc_flags & DC_FLAG_GROUP_ASYNC)) { _dispatch_continuation_with_group_invoke(dc); } else { _dispatch_client_callout(dc->dc_ctxt, dc->dc_func); _dispatch_trace_item_complete(dc); }
if (unlikely(dc1)) { _dispatch_continuation_free_to_cache_limit(dc1); } }); _dispatch_perfmon_workitem_inc(); }
|
上述核心逻辑如下:
- 使用
autoreleasepool
包裹调度项的执行,确保在执行过程中管理内存。
- 判断是否是
dispatch_group
任务组。
- 如果是
dispatch_group
任务组,调用 _dispatch_continuation_with_group_invoke
执行。
- 如果是普通任务,调用
_dispatch_client_callout
执行任务。
- 释放
continuation
,存到线程的 continuation
缓存池。
六、总结
可以用下图表示 dispatch_async
+ 串行队列/并发队列的逻辑:
