李峰峰博客

GCD 底层原理 4 - dispatch_async

2022-11-17

一、概述

dispatch_async 是用于将任务异步提交到指定调度队列的函数。它允许调用者线程继续执行,而不必等待任务的完成。

dispatch_async 特点:

  • 会开启新线程
    • dispatch_async 提交的任务不会阻塞调用者线程。调用者线程可以继续执行后续代码,而任务会被异步地添加到队列中。
    • 无论将任务提交到并发队列还是串行队列,都会开启新线程。
      • 主队列除外,提交到主队列的任务,会在主线程执行。
      • 串行队列在有任务需要执行时,只会开启一个线程去执行任务。
  • 任务顺序
    • 串行队列
      • 在串行队列上,dispatch_async 提交的任务会按照提交的顺序依次执行,确保任务之间的顺序性。
    • 并发队列
      • 在并发队列上,dispatch_async 提交的任务可以并行执行,具体执行顺序取决于系统的线程调度。

使用示例:

1
2
3
4
5
6
7
8
dispatch_queue_t queue = dispatch_queue_create("com.lixkit.concurrentQueue", DISPATCH_QUEUE_CONCURRENT);
dispatch_async(queue, ^{
// 任务 1
});

dispatch_async(queue, ^{
// 任务 2
});

二、dispatch_async

dispatch_async 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
// 1、新建或者从缓存中获取 dispatch continuation 对象
dispatch_continuation_t dc = _dispatch_continuation_alloc();

// 设置 continuation 的标志,表示这个 continuation 在执行完后应该被销毁
uintptr_t dc_flags = DC_FLAG_CONSUME;

// 用于存储服务质量(QoS)信息
dispatch_qos_t qos;

// 2、初始化 continuation 对象(为 continuation 中各成员赋值)
// 设置要执行的 block、优先级等
// 返回与这个任务相关的服务质量(QoS)级别
qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);

// 3、将初始化好的 continuation 异步提交到指定的队列
// 传入目标队列、continuation 对象、QoS 信息和标志
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

上述主要逻辑如下:

  • 获取 continuation 对象
    • _dispatch_continuation_alloc
  • 初始化 continuation 对象(为 continuation 中各成员赋值)
    • _dispatch_continuation_init
  • 提交 continuation 到指定的队列
    • _dispatch_continuation_async

接下来,基于源码分别看下这几部分逻辑。

三、 _dispatch_continuation_alloc

_dispatch_continuation_alloc 函数用于获取 continuation 对象,continuation 对象是对任务的包装,其中存储了任务、优先级、上下文等信息。

continuationdispatch_continuation_t 类型,dispatch_continuation_t 是一个指向 dispatch_continuation_s 结构体的指针,dispatch_continuation_s 的定义如下:

1
2
3
typedef struct dispatch_continuation_s {
DISPATCH_CONTINUATION_HEADER(continuation);
} *dispatch_continuation_t;

将其涉及的宏完全展开后,结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
typedef struct dispatch_continuation_s {
// 虚函数表指针 & 标志位
union {
const void *__ptrauth_objc_isa_pointer do_vtable; // 指向虚函数表的指针
uintptr_t dc_flags; // 用于存储各种标志位
};

// 优先级、缓存计数或填充
union {
pthread_priority_t dc_priority; // 任务的优先级
int dc_cache_cnt; // 缓存计数器
uintptr_t dc_pad; // 用于对齐的填充
};

// 指向下一个 continuation 的指针
struct dispatch_continuation_s *volatile do_next;

// 初始化当前 dispatch_continuation_s 的线程的上下文凭证(包含活动 ID 等元数据)
struct voucher_s *dc_voucher;

// 要执行的函数
dispatch_function_t dc_func;

// 函数的上下文或参数
void *dc_ctxt;

// 额外的数据,如组对象等
void *dc_data;

// 其他用途的指针
void *dc_other;
} *dispatch_continuation_t;

dispatch_continuation_s 结构可以看出,dispatch_continuation_s 是一个链表结构,其中的 do_next 则是指向下一个 continuation

_dispatch_continuation_alloc 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static inline dispatch_continuation_t
_dispatch_continuation_alloc(void)
{
// 尝试从缓存中读取一个 continuation 对象
dispatch_continuation_t dc =
_dispatch_continuation_alloc_cacheonly();

// 如果缓存分配失败(返回 NULL)
if (unlikely(!dc)) {
// 从堆上分配一个新的 continuation 对象
return _dispatch_continuation_alloc_from_heap();
}

return dc;
}

该函数主要逻辑为先从缓存中读取 continuation 对象,如果缓存不存在,再去新建一个 continuation 对象。

这里重点看下 continuation 缓存读取的逻辑,根据上述源码可知,缓存的读取是调用 _dispatch_continuation_alloc_cacheonly 函数获取的,该函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static inline dispatch_continuation_t
_dispatch_continuation_alloc_cacheonly(void)
{
// 从当前线程存储空间中获取一个缓存的 continuation 对象
dispatch_continuation_t dc = (dispatch_continuation_t)
_dispatch_thread_getspecific(dispatch_cache_key);

// 如果成功获取到缓存的 continuation 对象
if (likely(dc)) {
// 将线程存储空间中的 continuation 缓存更新为下一个对象
// 这样可以维护一个 continuation 对象的链表作为缓存
_dispatch_thread_setspecific(dispatch_cache_key, dc->do_next);
}

// 返回获取到的 continuation 对象(可能为 NULL 如果缓存为空)
return dc;
}

前面提到 continuation 是一个链表结构,而其中 do_next 则是指向当前节点的下一个节点。

可以看到,上述主要逻辑是先从线程从当前线程存储空间中获取一个缓存的 continuation。如果取到,则将缓存更新为 continuation 的下个节点对象 do_next

对于缓存不存在时新建的那个 continuation,也会在使用完释放时存入缓存:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static inline void
_dispatch_continuation_free(dispatch_continuation_t dc)
{
dc = _dispatch_continuation_free_cacheonly(dc);
// ...
}

static inline dispatch_continuation_t
_dispatch_continuation_free_cacheonly(dispatch_continuation_t dc)
{
// ...
// 将 continuation 放入线程缓存池
_dispatch_thread_setspecific(dispatch_cache_key, dc);
return NULL;
}

_dispatch_continuation_alloc 函数主要逻辑如下:

  • 从当前线程存储空间中获取一个缓存的 continuation 对象 dc
  • 如果读取成功,则将当前线程缓存的 continuation 对象设置为 dc 的下一个对象(dc->do_next)。
  • 如果缓存没有读取到,则创建新的 continuation
    • 这个新建的 continuation 在使用完成后,设置成线程缓存的 continuation

结合上述源码可以知道,每个线程都利用 continuation 链表实现了一个 continuation 缓存池,continuation 不再使用后将会放入缓存池待下次使用,这么设计有下面两个好处:

  • 通过重用这些对象,可以减少频繁创建和销毁对象的开销,提高性能。
  • 每个线程维护自己的缓存,可以减少多线程同时分配内存时的锁竞争。

四、_dispatch_continuation_init

_dispatch_continuation_init 函数用于初始化 continuation 对象,该函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
static inline dispatch_qos_t
_dispatch_continuation_init(dispatch_continuation_t dc,
dispatch_queue_class_t dqu, dispatch_block_t work,
dispatch_block_flags_t flags, uintptr_t dc_flags)
{
// 复制传入的 block 对象,确保其在异步执行时仍然有效
void *ctxt = _dispatch_Block_copy(work);

// 设置 continuation 的标志,表明它包含一个 block 且已被分配内存
dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;

// 检查 block 是否包含私有数据
if (unlikely(_dispatch_block_has_private_data(work))) {
// 如果包含私有数据,设置 continuation 的标志和上下文
dc->dc_flags = dc_flags;
dc->dc_ctxt = ctxt;
// 调用慢速初始化路径,它会处理所有私有数据相关的设置
// 注意:这个函数会初始化所有字段,但要求 dc_flags 和 dc_ctxt 已被设置
return _dispatch_continuation_init_slow(dc, dqu, flags);
}

// 获取 block 的执行函数
dispatch_function_t func = _dispatch_Block_invoke(work);

// 如果设置了 CONSUME 标志,使用特殊的函数来在执行后释放 block
if (dc_flags & DC_FLAG_CONSUME) {
func = _dispatch_call_block_and_release;
}

// 使用函数指针方式初始化 continuation
// 这个函数会设置 continuation 的函数、上下文、优先级等信息
return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
}

上述逻辑,会先调用 _dispatch_block_has_private_data 函数检查 block 是否包含私有数据,关于 _dispatch_block_has_private_data 函数,《GCD 底层原理 3 - dispatch_sync》 中有解释,我们常规使用传入的 block 是不包含私有数据的。所以最终会执行 _dispatch_continuation_init_f 函数。

_dispatch_continuation_init_f 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
static inline dispatch_qos_t
_dispatch_continuation_init_f(dispatch_continuation_t dc,
dispatch_queue_class_t dqu, void *ctxt, dispatch_function_t f,
dispatch_block_flags_t flags, uintptr_t dc_flags)
{
// 初始化优先级为 0
pthread_priority_t pp = 0;

// 设置 continuation 的标志,包括已分配标志和传入的额外标志
dc->dc_flags = dc_flags | DC_FLAG_ALLOCATED;

// 设置 continuation 的函数和上下文
dc->dc_func = f;
dc->dc_ctxt = ctxt;

/**
在这个上下文中,DISPATCH_BLOCK_HAS_PRIORITY 表示优先级不应该被传播,只有当处理程序有优先级时才从处理程序中获取。
如果没有设置 DISPATCH_BLOCK_HAS_PRIORITY 标志,则传播当前优先级
*/
if (!(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
pp = _dispatch_priority_propagate();
}

/**
为 continuation 的 dc_voucher 赋值,该函数内部会读取当前线程的 voucher,并赋值给 dc_voucher
voucher 中携带了各种上下文信息,包括但不限于活动 ID、重要性等元数据。
这些信息可以在不同的执行上下文(如线程、进程)之间传递,支持复杂的分布式系统中的上下文传递、活动追踪和资源管理。
*/
_dispatch_continuation_voucher_set(dc, flags);

// 为 continuation 的 dc_priority 赋值,并返回对应 QoS
return _dispatch_continuation_priority_set(dc, dqu, pp, flags);
}

可以看出,该函数主要是为 continuation 各成员进行赋值,例如 dc_flagsdc_funcdc_voucherdc_priority 等。

该函数中涉及到对 _dispatch_priority_propagate_dispatch_continuation_voucher_set_dispatch_continuation_priority_set 的函数调用,接下来分别分析下三个函数的实现逻辑。

1、_dispatch_priority_propagate

先看下 _dispatch_continuation_init_f 函数源码中优先级 pp 的赋值,从源码可知调用该函数时,传入的 flags 参数值是 0,所以最终会调用 _dispatch_priority_propagate 函数,该函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
 static inline pthread_priority_t
_dispatch_priority_propagate(void)
{
return _dispatch_priority_compute_propagated(0,
DISPATCH_PRIORITY_PROPAGATE_CURRENT);
}

static inline pthread_priority_t
_dispatch_priority_compute_propagated(pthread_priority_t pp,
unsigned int flags)
{
// 如果设置了 DISPATCH_PRIORITY_PROPAGATE_CURRENT 标志,则使用当前线程的优先级
if (flags & DISPATCH_PRIORITY_PROPAGATE_CURRENT) {
// 使用当前线程存储的优先级值
pp = _dispatch_get_priority();
}

// 移除优先级中的所有标志位,只保留纯粹的优先级值
pp = _pthread_priority_strip_all_flags(pp);

// 如果不是同步 IPC 调用,且优先级高于 USER_INITIATED 级别,则进行限制
if (!(flags & DISPATCH_PRIORITY_PROPAGATE_FOR_SYNC_IPC) &&
pp > _dispatch_qos_to_pp(DISPATCH_QOS_USER_INITIATED)) {
// 将 QOS 限制在用户发起级别 (USER_INITIATED)
// 这是为了解决 rdar://16681262 和 rdar://16998036 中提到的问题
return _dispatch_qos_to_pp(DISPATCH_QOS_USER_INITIATED);
}

// 返回计算得到的优先级
return pp;
}

static inline pthread_priority_t
_dispatch_get_priority(void)
{
// 从线程存储空间中获取当前线程优先级
// dispatch_priority_key 是一个线程特定数据的键,用于存储当前线程的优先级
// _dispatch_thread_getspecific 函数用于获取与该键关联的值
pthread_priority_t pp = (uintptr_t)
_dispatch_thread_getspecific(dispatch_priority_key);

return pp;
}

结合函数调用时的参数值,总结上述主要逻辑如下:

  • 从当前线程存储空间中获取当前线程优先级,并赋值给 pp
  • 移除优先级中的所有标志位,只保留纯粹的优先级值。
  • 如果优先级高于 USER_INITIATED 级别,则限制为 USER_INITIATED

这里的优先级 pppthread_priority_t 类型,它和 Qos 什么关系呢?
pthread_priority_tQoS (Quality of Service) 有密切的关系。在 GCD 中,它们用于表示任务的优先级和服务质量。它们之间的关系如下:

  • pthread_priority_t 是一个更底层的表示,它包含了 QoS 信息以及其他一些标志位。
  • QoS 是一个更高级的抽象,它定义了几个离散的优先级级别,如 USER_INTERACTIVEUSER_INITIATEDUTILITY 等。
  • pthread_priority_t 可以通过位操作来包含 QoS 信息。通常,QoS 值被编码在 pthread_priority_t 的高位字节中。
  • 可以使用一些宏或函数在 pthread_priority_tQoS 之间进行转换,例如:
    1
    2
    3
    4
    // 从 pthread_priority_t 提取 QoS
    dispatch_qos_t qos = _dispatch_qos_from_pp(pthread_priority);
    // 从 QoS 创建 pthread_priority_t
    pthread_priority_t pp = _dispatch_qos_to_pp(qos);
  • pthread_priority_t 除了包含 QoS 信息外,还可以包含其他标志,如相对优先级、覆盖标志等。
  • 在实际使用中,通常会使用 QoS 级别来设置任务优先级,而系统内部会将其转换为 pthread_priority_t 来进行更细粒度的调度控制。

可以通过一个简单示例,打印下默认 Qos

可以看到,默认 QosUSER_INITIATED

2、_dispatch_continuation_voucher_set

_dispatch_continuation_init_f 函数中,会调用 _dispatch_continuation_voucher_set 函数为 continuationdc_voucher 赋值,根据前述 dispatch_continuation_s 结构可知,dc_vouchervoucher_s 类型。

完全展开后的 voucher_s 结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
typedef struct voucher_s {
/**
* isa 指针,指向对象的类信息。
* 使用了 `__ptrauth_objc_isa_pointer` 进行指针认证(Pointer Authentication)。
* 这是一个防止指针篡改的安全机制。
*/
struct voucher_vtable_s *__ptrauth_objc_isa_pointer os_obj_isa;

/**
* 引用计数(Reference Count)。
* 用于管理对象的生命周期,表示当前有多少地方引用了该对象。
*/
int volatile os_obj_ref_cnt;

/**
* 交叉引用计数(Cross Reference Count)。
* 用于管理对象的交叉引用,通常在对象之间存在复杂依赖关系时使用。
*/
int volatile os_obj_xref_cnt;

/**
* voucher 的哈希表节点,用于将 voucher 对象加入到哈希表中。
* 包含双向链表的前后指针。
*/
struct voucher_hash_entry_s {
uintptr_t vhe_next; // 指向下一个节点
uintptr_t vhe_prev_ptr; // 指向前一个节点的指针
} v_list;

/**
* Mach voucher 端口。
*/
mach_voucher_t v_kvoucher;
mach_voucher_t v_ipc_kvoucher;

voucher_t v_kvbase;

/**
* Firehose 日志系统的活动 ID。
* 用于跟踪与该 voucher 相关的活动。
*/
firehose_activity_id_t v_activity;

/**
* 创建该活动的进程 ID。
*/
uint64_t v_activity_creator;

/**
* 父活动的 ID。
*/
firehose_activity_id_t v_parent_activity;

/**
* 布尔值,表示该 voucher 是否具有重要性标志。
*/
unsigned int v_kv_has_importance:1;

#if VOUCHER_ENABLE_RECIPE_OBJECTS
/**
* 配方数据的偏移量和大小(仅在启用 `VOUCHER_ENABLE_RECIPE_OBJECTS` 时存在)。
*/
size_t v_recipe_extra_offset;
mach_voucher_attr_recipe_size_t v_recipe_extra_size;
#endif
} voucher_s;

voucher_s 不是本次源码分析的重点,且涉及内容较多,不再逐个字段分析。

_dispatch_continuation_voucher_set 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static inline void
_dispatch_continuation_voucher_set(dispatch_continuation_t dc,
dispatch_block_flags_t flags)
{
// 用于存储 voucher 的指针,voucher_t 是一个指向 voucher_s 结构体的指针
voucher_t v = NULL;

/**
_dispatch_continuation_voucher_set 从不会被调用于带有私有数据或设置了 DISPATCH_BLOCK_HAS_VOUCHER 标志的块。
只有 _dispatch_continuation_init_slow 处理这个标志位。
这个断言确保传入的 flags 不包含 DISPATCH_BLOCK_HAS_VOUCHER 标志,因为带有该标志的块应该由 _dispatch_continuation_init_slow 函数处理。
*/
dispatch_assert(!(flags & DISPATCH_BLOCK_HAS_VOUCHER));

// 如果没有设置 DISPATCH_BLOCK_NO_VOUCHER 标志
if (!(flags & DISPATCH_BLOCK_NO_VOUCHER)) {
// 复制当前线程的 voucher
// _voucher_copy() 函数会增加 voucher 的引用计数
v = _voucher_copy();
}

// 将复制的 voucher 赋值给 continuation 的 dc_voucher 字段
dc->dc_voucher = v;

// 调试输出
_dispatch_voucher_debug("continuation[%p] set", dc->dc_voucher, dc);

// 记录 ktrace 日志
_dispatch_voucher_ktrace_dc_push(dc);
}

其中,_voucher_copy 函数用于复制当前线程的 voucher,其相关实现逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 复制当前线程关联的 voucher
static inline voucher_t
_voucher_copy(void)
{
// 获取当前线程关联的 voucher
voucher_t voucher = _voucher_get();

// 如果 voucher 存在,增加其引用计数
// _voucher_retain 函数会增加 voucher 的引用计数,确保 voucher 在使用期间不会被释放
if (voucher) _voucher_retain(voucher);

// 返回 voucher(可能是 NULL,如果当前线程没有关联的 voucher)
return voucher;
}

//获取当前线程关联的 voucher
static inline voucher_t
_voucher_get(void)
{
/**
从线程存储空间中,使用 dispatch_voucher_key 这个 Key 获取关联的 voucher。
这个函数实际上是在线程本地存储中查找与 dispatch_voucher_key 关联的值
*/
return _dispatch_thread_getspecific(dispatch_voucher_key);
}

根据源码可知,上述核心逻辑为读取当前线程的 voucher,并赋值给 dc_voucher

3、_dispatch_continuation_priority_set

该函数主要是为 continuationdc_priority 赋值,并返回对应 QoS,函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
static inline dispatch_qos_t
_dispatch_continuation_priority_set(dispatch_continuation_t dc,
dispatch_queue_class_t dqu,
pthread_priority_t pp, dispatch_block_flags_t flags)
{
// 初始化 QoS 为未指定状态
dispatch_qos_t qos = DISPATCH_QOS_UNSPECIFIED;

#if HAVE_PTHREAD_WORKQUEUE_QOS
// 获取实际的调度队列对象
dispatch_queue_t dq = dqu._dq;

if (likely(pp)) {
// 检查是否强制执行 QoS 类别
bool enforce = (flags & DISPATCH_BLOCK_ENFORCE_QOS_CLASS);
// 检查队列是否设置了优先级下限标志
bool is_floor = (dq->dq_priority & DISPATCH_PRIORITY_FLAG_FLOOR);
// 检查队列是否有请求的优先级
bool dq_has_qos = (dq->dq_priority & DISPATCH_PRIORITY_REQUESTED_MASK);

if (enforce) {
// 如果强制执行 QoS 类别,则在优先级中添加强制标志
pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
// 从优先级中提取 QoS 值
qos = _dispatch_qos_from_pp_unsafe(pp);
} else if (!is_floor && dq_has_qos) {
pp = 0;
} else {
// 否则,从优先级中提取 QoS 值
qos = _dispatch_qos_from_pp_unsafe(pp);
}
}

// 将计算后的优先级设置到 continuation 的 dc_priority 字段
dc->dc_priority = pp;
#else
(void)dc; (void)dqu; (void)pp; (void)flags;
#endif

return qos;
}

五、_dispatch_continuation_async

在前面完成 continuation 的初始化之后,将会调用 _dispatch_continuation_async 函数将初始化好的 continuation 异步提交到指定的队列。

_dispatch_continuation_async 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
// 如果启用了调试内省功能(DISPATCH_INTROSPECTION 宏定义)
#if DISPATCH_INTROSPECTION
// 检查是否设置了 DC_FLAG_NO_INTROSPECTION 标志
// 如果未设置该标志,则将当前 continuation 对象推送到调试追踪系统中
if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
// 调用 _dispatch_trace_item_push 函数,将 continuation 对象记录到调试追踪中
_dispatch_trace_item_push(dqu, dc);
}
#else
// 如果未启用调试内省功能,避免未使用的参数警告
(void)dc_flags;
#endif

// 将 continuation 对象推送到目标队列中
// dx_push 是一个虚函数调用,具体实现取决于队列的类型
// 参数说明:
// - dqu._dq: 目标队列
// - dc: continuation 对象
// - qos: 服务质量(QoS)级别
return dx_push(dqu._dq, dc, qos);
}

上述实现中,重点是对 dx_push 这个宏的调用,其他的都是日志、调试相关。

dx_push 宏定义如下:

1
#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)

可以知道,这里是访问队列虚表的 dq_push 函数进行调用。结合在另篇文章《GCD 底层原理 2 - dispatch_queue》 中的内容可知,串行队列、并发队列的虚表如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 串行队列的虚表
const struct dispatch_lane_vtable_s _dispatch_queue_serial_vtable = {
._os_obj_xref_dispose = _dispatch_xref_dispose,
._os_obj_dispose = _dispatch_dispose,
._os_obj_vtable = {
.do_kind = "queue_serial",
.do_type = DISPATCH_QUEUE_SERIAL_TYPE,
.do_dispose = _dispatch_lane_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,

.dq_activate = _dispatch_lane_activate,
.dq_wakeup = _dispatch_lane_wakeup,
.dq_push = _dispatch_lane_push,
},
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 并发队列的虚表
const struct dispatch_lane_vtable_s _dispatch_queue_concurrent_vtable = {
._os_obj_xref_dispose = _dispatch_xref_dispose,
._os_obj_dispose = _dispatch_dispose,
._os_obj_vtable = {
.do_kind = "queue_concurrent",
.do_type = DISPATCH_QUEUE_CONCURRENT_TYPE,
.do_dispose = _dispatch_lane_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,
.dq_activate = _dispatch_lane_activate,
.dq_wakeup = _dispatch_lane_wakeup,
.dq_push = _dispatch_lane_concurrent_push,
},
}

所以,前面对 dx_push 的调用:

  • 串行队列调用的是 _dispatch_lane_push 函数。
  • 并发队列调用的是 _dispatch_lane_concurrent_push 函数。

接下来,分别看下将任务提交到串行队列、并发队列的逻辑。

1、提交到串行队列

(1)_dispatch_lane_push

dispatch_async 将任务提交到串行队列时,调用 dx_push 实际调用的是 _dispatch_lane_push 函数,该函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
void
_dispatch_lane_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
// 初始化唤醒标志
dispatch_wakeup_flags_t flags = 0;
// 用于存储队列中的前一个对象
struct dispatch_object_s *prev;

// 检查是否为等待对象(例如 dispatch_sync 同步调用的上下文)
if (unlikely(_dispatch_object_is_waiter(dou))) {
return _dispatch_lane_push_waiter(dq, dou._dsc, qos);
}

// 确保队列不是全局对象
dispatch_assert(!_dispatch_object_is_global(dq));
// 计算并更新服务质量(QoS)
qos = _dispatch_queue_push_qos(dq, qos);

/**
注意:如果我们要调用 dx_wakeup(),必须在推送的项目可能被出队之前保留队列,这意味着:
- 如果我们需要覆盖,则在交换尾部之前
- 如果我们使队列变为非空,则在设置头部之前
否则,如果在这些操作和调用 dx_wakeup() 之间被抢占,当 _dispatch_lane_drain 调用队列中的块时,这些块可能会释放队列的最后一个引用。
参见 <rdar://problem/6932776>
*/

/**
将新元素 dou._do 添加到队列的尾部。
原子地更新队列的尾指针,使其指向新添加的元素。
返回之前的尾部元素 prev。
- 如果 prev 为 NULL,说明队列之前是空的,这个新元素就是第一个元素。
- 如果 prev 不为 NULL,说明队列中已经有元素,新元素被添加到最后。
*/
prev = os_mpsc_push_update_tail(os_mpsc(dq, dq_items), dou._do, do_next);
if (unlikely(os_mpsc_push_was_empty(prev))) {
// 如果之前队列为空,增加队列的引用计数
_dispatch_retain_2_unsafe(dq);
// 设置唤醒标志,表示需要消耗引用计数并标记队列为脏
flags = DISPATCH_WAKEUP_CONSUME_2 | DISPATCH_WAKEUP_MAKE_DIRTY;
} else if (unlikely(_dispatch_queue_need_override(dq, qos))) {

/**
* 这里存在一个竞争条件,_dispatch_queue_need_override 可能会读取到过时的 dq_state 值。
*
* 如果读取到的是同一轮 drain 操作中的过时值,由于最大 QoS 是单调递增的,
* 过时的读取只会导致不必要的覆盖尝试,这是无害的。
*
* 假设读取到上一轮 drain 操作中的过时值在实际中不会发生。
*/

// 增加队列的引用计数,防止队列被过早销毁
_dispatch_retain_2_unsafe(dq);
// 设置唤醒标志,表示需要消耗引用计数
flags = DISPATCH_WAKEUP_CONSUME_2;
}

/**
更新队列中新添加节点的前驱节点指针
如果 prev 不为 NULL(队列不为空):
- 将 prev 节点的 do_next 字段设置为新节点 dou._do。
- 这样就把新节点链接到了队列的末尾。
如果 prev 为 NULL(队列之前为空):
- 直接将队列的头指针设置为新节点 dou._do。
- 这处理了队列从空变为非空的情况。
*/
os_mpsc_push_update_prev(os_mpsc(dq, dq_items), prev, dou._do, do_next);

if (flags) {
// 如果设置了唤醒标志,调用 dx_wakeup 函数唤醒队列
return dx_wakeup(dq, qos, flags);
}
}

该函数的参数 dqdispatch_lane_t 类型,在另篇文章《GCD 底层原理 2 - dispatch_queue》 中分析了 dispatch_lane_t 结构:

1
2
3
4
5
6
7
8
9
struct dispatch_lane_s {
// ......

struct dispatch_object_s *dq_items_head; // 队列头
struct dispatch_object_s *dq_items_tail; // 队列尾
dispatch_unfair_lock_s dq_sidelock; // 辅助锁

// dispatch_queue_s 包含的其他字段 ......
};

其中,将源码中的:

1
prev = os_mpsc_push_update_tail(os_mpsc(dq, dq_items), dou._do, do_next);

宏完全展开后,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
prev = ({
// 声明一个临时变量 _tl 并将 dou._do 赋值给它
__typeof__(__c11_atomic_load((__typeof__(*(&(dq)->dq_items_head)) _Atomic *)(&(dq)->dq_items_head), memory_order_relaxed)) _tl = (dou._do);

// 将 _tl 的 do_next 指针设置为 NULL
__c11_atomic_store((__typeof__((*(_tl)).do_next) _Atomic *)(&(_tl)->do_next), NULL, memory_order_relaxed);

// 将队列的尾指针地址存储到线程特定数据中
_dispatch_thread_setspecific(dispatch_enqueue_key, (void *) (&(dq)->dq_items_tail));

// 原子地交换队列的尾指针,将新节点 _tl 设置为新的尾节点
// 返回交换前的旧尾节点作为 prev
atomic_exchange_explicit((__typeof__(*(&(dq)->dq_items_tail)) _Atomic *)(&(dq)->dq_items_tail), _tl, memory_order_release);
})

可以看出,这部分逻辑将任务作为节点添加到队列的尾部,并将 dq_items_tail 指向这个新节点(continuation),prev 为之前的尾部指针,可以理解为上一个节点。

将源码中的:

1
os_mpsc_push_update_prev(os_mpsc(dq, dq_items), prev, dou._do, do_next);

宏完全展开后,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
({
// 原子地加载队列的头部指针,使用 relaxed 内存序
__typeof__(atomic_load_explicit(_os_atomic_c11_atomic(&(dq)->dq_items_head), memory_order_relaxed)) _prev = (prev);

if (likely(_prev)) {
// 如果 _prev 不为 NULL,说明队列不为空
// 将新元素 dou._do 设置为 _prev 的下一个元素
// 使用 relaxed 内存序,因为这个操作不需要同步
(void)atomic_store_explicit(_os_atomic_c11_atomic(&(_prev)->do_next), dou._do, memory_order_relaxed);
} else {
// 如果 _prev 为 NULL,说明队列为空
// 将新元素 dou._do 设置为队列的头部
// 使用 relaxed 内存序,因为入队操作的同步已经在之前的 os_mpsc_push_update_tail 中完成
(void)atomic_store_explicit(_os_atomic_c11_atomic(&(dq)->dq_items_head), dou._do, memory_order_relaxed);
}

// 清除入队者的标识,可能是用于调试或跟踪目的
_dispatch_clear_enqueuer();
})

这部分逻辑是将上一步返回的 prevdo_next 指向当前任务节点。

这里入队的操作,为什么分成两部分去做呢?
从入队宏命名上,也可以看出这是 MPSC 队列,MPSC 是 “Multiple Producer, Single Consumer” 的缩写,表示多生产者单消费者队列。这是一种并发数据结构,允许多个生产者线程向队列中添加数据,而只有一个消费者线程从队列中提取数据。

MPSC 队列的特性:

  • 多生产者
    • 多个生产者线程可以同时向队列中添加数据。这需要在并发环境下确保数据的一致性和正确性。
  • 单消费者
    • 只有一个消费者线程从队列中提取数据。这简化了消费者端的同步问题,因为不需要处理多个消费者之间的竞争。

os_mpsc_push_update_tailos_mpsc_push_update_prev 是配合使用的,它们共同完成了将新节点添加到多生产者单消费者(MPSC)队列的操作。这两个宏的作用:

  • os_mpsc_push_update_tail
    • 这个函数主要负责更新队列的尾指针。
    • 它将新节点添加到队列的末尾,并返回之前的尾节点。
    • 但是,它并不会更新之前尾节点的 do_next 指针。
  • os_mpsc_push_update_prev
    • 这个函数负责更新前一个节点(之前的尾节点)的 do_next 指针,使其指向新节点。

将一个复杂的原子操作(同时更新尾指针和前一个节点的 do_next 指针)分解为两个简单的原子操作。简单的原子操作通常比复杂的原子操作更高效,尤其是在高并发情况下。

从这里也可以看出,管理任务的队列 dispatch_lane_t,是使用单向链表实现的。

总结 _dispatch_lane_push 函数核心逻辑:

  • 将任务作为节点添加到队列的尾部,并将 dq_items_tail 指向这个新节点(continuation),并返回 prev(之前的尾部节点)。
  • 其他关键参数配置。
  • prevdo_next 指向当前任务节点。
  • 调用 dx_wakeup 唤醒队列。

其中,dx_wakeup 宏定义如下:

1
#define dx_wakeup(x, y, z) dx_vtable(x)->dq_wakeup(x, y, z)

所以对于串行队列,这里实际调用的是 _dispatch_lane_wakeup 函数

(2)_dispatch_lane_wakeup

_dispatch_lane_wakeup 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
void
_dispatch_queue_wakeup(dispatch_queue_class_t dqu, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
{
dispatch_queue_t dq = dqu._dq;
uint64_t old_state, new_state, enqueue = DISPATCH_QUEUE_ENQUEUED;
// 确保目标不是等待事件状态
dispatch_assert(target != DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT);

// 如果有目标且不消耗引用计数,增加引用计数并设置消耗标志
if (target && !(flags & DISPATCH_WAKEUP_CONSUME_2)) {
_dispatch_retain_2(dq);
flags |= DISPATCH_WAKEUP_CONSUME_2;
}

// 处理屏障完成的情况
if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
/**
_dispatch_lane_class_barrier_complete() 处理常规队列和源需要评估的内容,但前者可能有同步切换需要执行,这是 _dispatch_lane_class_barrier_complete(),不处理的,只有 _dispatch_lane_barrier_complete() 处理。

_dispatch_lane_wakeup() 是为普通队列调用 _dispatch_lane_barrier_complete() 的函数,这里只针对非队列类型。
*/

// 断言确保当前对象是源类型
dispatch_assert(dx_metatype(dq) == _DISPATCH_SOURCE_TYPE);
// 获取唤醒的 QoS
qos = _dispatch_queue_wakeup_qos(dq, qos);
// 调用 _dispatch_lane_class_barrier_complete 处理屏障完成
return _dispatch_lane_class_barrier_complete(upcast(dq)._dl, qos,
flags, target, DISPATCH_QUEUE_SERIAL_DRAIN_OWNED);
}

// 处理有目标的情况
if (target) {
// 如果目标是管理队列,设置相应的入队标志
if (target == DISPATCH_QUEUE_WAKEUP_MGR) {
enqueue = DISPATCH_QUEUE_ENQUEUED_ON_MGR;
}
// 获取唤醒的 QoS
qos = _dispatch_queue_wakeup_qos(dq, qos);
// 原子地更新队列状态
os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
// 合并新的 QoS 到状态中
new_state = _dq_state_merge_qos(old_state, qos);
if (flags & DISPATCH_WAKEUP_CLEAR_ACTIVATING) {
// 当事件正在被传递到源,因为其 unote 在 ACTIVATING 状态有机会被清除之前就被注册了,我们不希望唤醒失败,这可能导致优先级反转。相反,允许这些唤醒完成挂起的激活。
if (_dq_state_is_activating(old_state)) {
new_state &= ~DISPATCH_QUEUE_ACTIVATING;
}
}
/**
检查是否需要设置入队标志

_dq_state_is_suspended:
用于判断队列是否处于挂起状态。
通过检查 DISPATCH_QUEUE_SUSPEND_BITS_MASK 位是否被设置来确定。
挂起状态通常表示队列暂时不能处理任务。

_dq_state_is_enqueued:
用于判断队列是否已被入队。
检查 DISPATCH_QUEUE_ENQUEUED 或 DISPATCH_QUEUE_ENQUEUED_ON_MGR 位是否被设置。
入队状态表示队列已被调度,等待执行。

_dq_state_drain_locked:
用于判断队列是否被锁定(正在被某个线程处理)。
检查 DISPATCH_QUEUE_DRAIN_OWNER_MASK 位是否被设置。
锁定状态表示队列当前正在被某个线程独占处理,其他线程需等待。
*/
if (likely(!_dq_state_is_suspended(new_state) &&
!_dq_state_is_enqueued(old_state) &&
(!_dq_state_drain_locked(old_state) ||
enqueue != DISPATCH_QUEUE_ENQUEUED_ON_MGR))) {
// 更新 new_state
new_state |= enqueue;
}
if (flags & DISPATCH_WAKEUP_MAKE_DIRTY) {
// 更新 new_state
new_state |= DISPATCH_QUEUE_DIRTY;
} else if (new_state == old_state) {
// 状态没有变化,放弃循环
os_atomic_rmw_loop_give_up(goto done);
}
});
#if HAVE_PTHREAD_WORKQUEUE_QOS
} else if (qos) {
// 有人试图覆盖队列的最后一个工作项

// 原子地更新队列状态
os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
// 避免在我们能应用覆盖之前项目被排空导致的虚假覆盖
if (!_dq_state_drain_locked(old_state) &&
!_dq_state_is_enqueued(old_state)) {
os_atomic_rmw_loop_give_up(goto done);
}
// 合并新的 QoS 到状态中
new_state = _dq_state_merge_qos(old_state, qos);
if (_dq_state_is_base_wlh(old_state) &&
!_dq_state_is_suspended(old_state) &&
/* <rdar://problem/63179930> */
!_dq_state_is_enqueued_on_manager(old_state)) {

/**
始终为层级结构中的所有队列上的异步入队操作设置已入队位 (rdar://62447289)

场景:
- Mach 通道 DM
- 目标是 TQ

线程 1:
- 持有 (TQ) 的锁,无争用同步
- 在 DM 上以低 QoS 触发唤醒,导致其具有:
max_qos = UT,已入队 = 1
- DM 到 TQ 的入队操作尚未发生。

线程 2:
- 一个传入的 IN IPC 正在服务线程上合并
- DM 的 QoS 为 UT,已入队 = 1,没有进一步的入队操作,
但我们需要一个额外的覆盖,并通过这段代码来处理 TQ。
- 这导致 TQ 被“暂存”,这需要设置已入队位,否则 try_lock_wlh()
会报错,并且唤醒引用计数会不正确。
*/
new_state |= enqueue;
}

if (new_state == old_state) {
// 如果状态没有变化,放弃循环
os_atomic_rmw_loop_give_up(goto done);
}
});

target = DISPATCH_QUEUE_WAKEUP_TARGET;
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
} else {
// 如果没有 QoS 和 target,直接完成
goto done;
}

/**
old_state ^ new_state 对旧状态和新状态进行异或操作,得到两者之间发生变化的位。
& enqueue 将异或结果与入队标志进行按位与操作,检查入队标志位是否发生了变化
*/
if (likely((old_state ^ new_state) & enqueue)) {
dispatch_queue_t tq;
if (target == DISPATCH_QUEUE_WAKEUP_TARGET) {
/**
上面的 rmw_loop 没有获取屏障,因为队列的最后一个块异步到该队列并不是一个不常见的模式,在这种情况下,获取屏障将完全无用。
因此,使用依赖顺序来读取 targetq 指针。
*/
os_atomic_thread_fence(dependency);
tq = os_atomic_load_with_dependency_on2o(dq, do_targetq,
(long)new_state);
} else {
tq = target;
}
// 断言确保新状态已入队
dispatch_assert(_dq_state_is_enqueued(new_state));
// 将队列推送到目标队列
return _dispatch_queue_push_queue(tq, dq, new_state);
}
#if HAVE_PTHREAD_WORKQUEUE_QOS
// 检查是否需要处理 QoS 覆盖
if (unlikely((old_state ^ new_state) & DISPATCH_QUEUE_MAX_QOS_MASK)) {
if (_dq_state_should_override(new_state)) {
// 如果需要覆盖,调用相应的函数处理
return _dispatch_queue_wakeup_with_override(dq, new_state,
flags);
}
}
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
done:
// 如果设置了消耗标志,释放引用计数
if (likely(flags & DISPATCH_WAKEUP_CONSUME_2)) {
return _dispatch_release_2_tailcall(dq);
}
}

将上述代码进行精简,最终会走该函数走进 _dispatch_queue_push_queue 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void
_dispatch_queue_wakeup(dispatch_queue_class_t dqu, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
{

uint64_t old_state, new_state, enqueue = DISPATCH_QUEUE_ENQUEUED;

// ......


// 处理有目标的情况
if (target) {
// ......

// 注意,这里会增加 DISPATCH_QUEUE_ENQUEUED 标志
new_state |= enqueue;

// ......
}

if (likely((old_state ^ new_state) & enqueue)) {
// ......

// 将队列推送到目标队列
return _dispatch_queue_push_queue(tq, dq, new_state);
}


// ......
}

_dispatch_queue_push_queue 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static inline void
_dispatch_queue_push_queue(dispatch_queue_t tq, dispatch_queue_class_t dq,
uint64_t dq_state, dispatch_wakeup_flags_t flags)
{
dispatch_assert(flags & DISPATCH_EVENT_LOOP_CONSUME_2);
#if DISPATCH_USE_KEVENT_WORKLOOP
if (likely(_dq_state_is_base_wlh(dq_state))) {
_dispatch_trace_runtime_event(worker_request, dq._dq, 1);
return _dispatch_event_loop_poke((dispatch_wlh_t)dq._dq, dq_state,DISPATCH_EVENT_LOOP_CONSUME_2);
}
#endif // DISPATCH_USE_KEVENT_WORKLOOP
_dispatch_trace_item_push(tq, dq);
return dx_push(tq, dq,_dq_state_max_qos(dq_state));
}

根据在另篇文章《GCD 底层原理 2 - dispatch_queue》 提到的串行队列 dq_state 为:

1
dq_state = (DISPATCH_QUEUE_STATE_INIT_VALUE(dq_width) | DISPATCH_QUEUE_ROLE_BASE_WLH)

所以这里会调用 _dispatch_event_loop_poke 函数,该函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void
_dispatch_event_loop_poke(dispatch_wlh_t wlh, uint64_t dq_state, uint32_t flags)
{
if (wlh == DISPATCH_WLH_MANAGER) {
dispatch_kevent_s ke = (dispatch_kevent_s){
.ident = 1,
.filter = EVFILT_USER,
.fflags = NOTE_TRIGGER,
.udata = (dispatch_kevent_udata_t)DISPATCH_WLH_MANAGER,
};
return _dispatch_kq_deferred_update(DISPATCH_WLH_ANON, &ke);
} else if (wlh && wlh != DISPATCH_WLH_ANON) {
#if DISPATCH_USE_KEVENT_WORKLOOP
dispatch_queue_t dq = (dispatch_queue_t)wlh;
dispatch_assert(_dq_state_is_base_wlh(dq_state));

if(unlikely(_dq_state_is_enqueued_on_manager(dq_state))) {
dispatch_assert(!(flags & DISPATCH_EVENT_LOOP_OVERRIDE));
dispatch_assert(flags & DISPATCH_EVENT_LOOP_CONSUME_2);
_dispatch_trace_item_push(&_dispatch_mgr_q, dq);
return dx_push(_dispatch_mgr_q._as_dq, dq, 0);
}
dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
if (ddi && ddi->ddi_wlh == wlh) {
return _dispatch_kevent_workloop_poke_self(ddi, dq_state, flags);
}
return _dispatch_kevent_workloop_poke(wlh, dq_state, flags);
#else
(void)dq_state; (void)flags;
#endif // DISPATCH_USE_KEVENT_WORKLOOP
}
DISPATCH_INTERNAL_CRASH(wlh, "Unsupported wlh configuration");
}

根据函数中判断条件可知,这里会执行 _dispatch_kevent_workloop_poke 函数。

_dispatch_kevent_workloop_poke 这个函数名中,有一个 workloop,这个 workloop 是什么呢?

接下来先看下什么是 WorkqueueWorkloop

(3)Workqueue 与 Workloop

GCD 通过 WorkqueueWorkloop 两种机制实现了灵活的任务管理。

Workqueue

Workqueue 是 XNU 内核提供的基于线程池的任务调度机制,旨在高效地管理并发任务。它是 GCD 的底层实现之一,能够动态调整线程池的大小以适应当前的任务负载。

Workqueue 特点:

  • 线程池管理
    • Workqueue 使用线程池来执行任务,线程可以被多个任务复用,从而减少线程创建和销毁的开销。
  • 并发任务调度
    • 适合处理大量并发任务,任务之间没有严格的顺序要求。
  • 动态扩展
    • 根据任务的数量和优先级,Workqueue 可以动态增加或减少线程池中的线程。

其中,线程池的管理是通过 pthread_workqueue 相关接口与内核进行交互的,pthread_workqueuepthread 的扩展,专门用于高效的任务调度和线程管理。GCD 通过调用 pthread_workqueue 接口,将任务分发到内核的 Workqueue,内核的 Workqueue 接收到任务后,会从线程池中分配线程来执行任务。

Workqueue 在 GCD 中一个很重要的应用场景就是并发队列的异步派发,关于这一点后续内容会分析。

Workloop

Workloop 是基于 Workqueue 构建的更高级抽象。Workqueue 提供了线程池和并发任务调度的基础设施,而 Workloop 在此基础上增加了任务顺序性和事件驱动支持,专注于管理任务的顺序性和事件驱动,能够更好地支持串行任务和基于事件的任务调度。

任务被提交到串行队列后,GCD 会将任务分配到 WorkloopWorkloop 会按照任务的提交顺序依次执行任务。

简单总结一下:

  • Workqueue

    • 基于线程池的任务调度机制,适合处理并发任务。
    • 通过动态调整线程池的大小和优先级,最大化利用系统资源。
    • 主要用于 GCD 的并发队列。
  • Workloop

    • 基于事件驱动的任务调度机制,专注于任务的顺序性和上下文切换。
    • 支持事件监听和优先级调度,适合处理串行任务和事件驱动任务。
    • 主要用于 GCD 的串行队列。

(4)_dispatch_kevent_workloop_poke

该函数源码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
static void
_dispatch_kevent_workloop_poke(dispatch_wlh_t wlh, uint64_t dq_state,
uint32_t flags)
{
uint32_t kev_flags = KEVENT_FLAG_IMMEDIATE | KEVENT_FLAG_ERROR_EVENTS;

// 定义一个 kevent 结构体,用于描述工作循环事件
dispatch_kevent_s ke;

int action;

// 断言
dispatch_assert(_dq_state_is_enqueued_on_target(dq_state));
dispatch_assert(!_dq_state_is_enqueued_on_manager(dq_state));

// 根据队列状态获取对应的操作类型
// 该函数会根据 `dq_state` 的值,返回一个与工作循环相关的操作标志
action = _dispatch_event_loop_get_action_for_state(dq_state);

// 填充 kevent 结构体
_dispatch_kq_fill_workloop_event(&ke, action, wlh, dq_state);

// 调用 `_dispatch_kq_poll` 函数,向内核提交 kevent 并等待处理结果
if (_dispatch_kq_poll(wlh, &ke, 1, &ke, 1, NULL, NULL, kev_flags)) {
// 如果 `_dispatch_kq_poll` 返回非零,表示发生错误
_dispatch_kevent_workloop_drain_error(&ke, 0);
__builtin_unreachable();
}

if (!(flags & DISPATCH_EVENT_LOOP_OVERRIDE)) {
return _dispatch_release_tailcall((dispatch_queue_t)wlh);
}

if (flags & DISPATCH_EVENT_LOOP_CONSUME_2) {
return _dispatch_release_2_tailcall((dispatch_queue_t)wlh);
}
}

这里又看到了熟悉的 _dispatch_kq_poll 函数,在《GCD 底层原理 3 - dispatch_sync》 中也针对这个函数做过分析,有需要可以参考这篇文章里 _dispatch_kq_poll 相关内容。

该函数内部调用了 _dispatch_kq_fill_workloop_event,从这一点也可以看出串行队列是基于 Workloop 的。

_dispatch_kq_fill_workloop_event 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
static void
_dispatch_kq_fill_workloop_event(dispatch_kevent_t ke, int which,
dispatch_wlh_t wlh, uint64_t dq_state)
{
dispatch_queue_t dq = (dispatch_queue_t)wlh;
dispatch_qos_t qos = _dq_state_max_qos(dq_state);
pthread_priority_t pp = 0;
uint32_t fflags = 0;
uint64_t mask = 0;
uint16_t action = 0;

switch (which) {
case DISPATCH_WORKLOOP_ASYNC:
case DISPATCH_WORKLOOP_ASYNC_FROM_SYNC:
case DISPATCH_WORKLOOP_ASYNC_QOS_UPDATE:
dispatch_assert(_dq_state_is_base_wlh(dq_state));
dispatch_assert(_dq_state_is_enqueued_on_target(dq_state));
action = EV_ADD | EV_ENABLE;
mask |= DISPATCH_QUEUE_ROLE_MASK;
mask |= DISPATCH_QUEUE_ENQUEUED;
mask |= DISPATCH_QUEUE_MAX_QOS_MASK;
fflags |= NOTE_WL_IGNORE_ESTALE;
fflags |= NOTE_WL_UPDATE_QOS;
if (_dq_state_in_uncontended_sync(dq_state)) {
fflags |= NOTE_WL_DISCOVER_OWNER;
mask |= DISPATCH_QUEUE_UNCONTENDED_SYNC;
}
pp = _dispatch_kevent_workloop_priority(dq, which, qos);
break;

case DISPATCH_WORKLOOP_ASYNC_LEAVE_FROM_SYNC:
case DISPATCH_WORKLOOP_ASYNC_LEAVE_FROM_TRANSFER:
fflags |= NOTE_WL_IGNORE_ESTALE;
/* FALLTHROUGH */
case DISPATCH_WORKLOOP_ASYNC_LEAVE:
dispatch_assert(!_dq_state_is_enqueued_on_target(dq_state));
action = EV_ADD | EV_DELETE | EV_ENABLE;
mask |= DISPATCH_QUEUE_ENQUEUED;
break;

case DISPATCH_WORKLOOP_RETARGET:
action = EV_ADD | EV_DELETE | EV_ENABLE;
fflags |= NOTE_WL_END_OWNERSHIP;
break;

default:
DISPATCH_INTERNAL_CRASH(which, "Invalid transition");
}

*ke = (dispatch_kevent_s){
.ident = (uintptr_t)wlh,
.filter = EVFILT_WORKLOOP,
.flags = action,
.fflags = fflags | NOTE_WL_THREAD_REQUEST,
.qos = (__typeof__(ke->qos))pp,
.udata = (uintptr_t)wlh,

.ext[EV_EXTIDX_WL_ADDR] = (uintptr_t)&dq->dq_state,
.ext[EV_EXTIDX_WL_MASK] = mask,
.ext[EV_EXTIDX_WL_VALUE] = dq_state,
};
_dispatch_kevent_wlh_debug(_dispatch_workloop_actions[which], ke);
}

该函数最终创建的 ke 中有个 NOTE_WL_THREAD_REQUEST 配置,该配置的作用是请求内核分配一个线程去处理当前任务。

再次进入 _dispatch_kq_poll 这个函数,其精简后的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static int
_dispatch_kq_poll(dispatch_wlh_t wlh, dispatch_kevent_t ke, int n,
dispatch_kevent_t ke_out, int n_out, void *buf, size_t *avail,
uint32_t flags)
{
// ......

// 使用 dispatch_once_f 确保 _dispatch_kq_init 仅被执行一次
dispatch_once_f(&_dispatch_kq_poll_pred, &kq_initialized, _dispatch_kq_init);

// ......


if (unlikely(wlh == NULL)) {
// ......
} else if (wlh == DISPATCH_WLH_ANON) {
// ......
} else {
// ......
r = kevent_id((uintptr_t)wlh, ke, n, ke_out, n_out, buf, avail, flags);
}

// ......

return r;
}

这部分主要逻辑如下:

  • 初始化,会调用一次 _dispatch_kq_init
  • 进行 kevent_id 系统调用。

其中,_dispatch_kq_init 的执行逻辑中,会调用到 _dispatch_root_queues_init_once 函数,执行路径如下:

1
2
3
4
5
6
7
_dispatch_kq_init
⬇️
_dispatch_kevent_workqueue_init
⬇️
_dispatch_root_queues_init
⬇️
_dispatch_root_queues_init_once

精简后的 _dispatch_root_queues_init_once 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{
// ......

// 工作队列配置。
struct pthread_workqueue_config cfg = {
.version = PTHREAD_WORKQUEUE_CONFIG_VERSION,
.flags = 0,
.workq_cb = 0, // workqueue 回调函数
.kevent_cb = 0, // kevent/kevent_id 事件回调函数
.workloop_cb = 0, // workloop 回调函数
.queue_serialno_offs = dispatch_queue_offsets.dqo_serialnum,
.queue_label_offs = dispatch_queue_offsets.dqo_label,
};

if (unlikely(!_dispatch_kevent_workqueue_enabled)) {
// ......
} else if (wq_supported & WORKQ_FEATURE_WORKLOOP) {
// 设置各回调函数。
cfg.workq_cb = _dispatch_worker_thread2;
cfg.kevent_cb = (pthread_workqueue_function_kevent_t) _dispatch_kevent_worker_thread;
cfg.workloop_cb = (pthread_workqueue_function_workloop_t) _dispatch_workloop_worker_thread;
// 调用 pthread_workqueue_setup 函数,设置工作循环。
r = pthread_workqueue_setup(&cfg, sizeof(cfg));
} else if (wq_supported & WORKQ_FEATURE_KEVENT) {
// ......
} else {
// 如果系统不支持任何工作队列特性,触发崩溃。
DISPATCH_INTERNAL_CRASH(wq_supported, "Missing Kevent WORKQ support");
}

// ......

}

上述关键逻辑如下:

  • 配置 workqueue 回调函数
    • workq_cb = _dispatch_worker_thread2
  • 配置 workloop 回调函数
    • workloop_cb = _dispatch_workloop_worker_thread
  • 调用 pthread_workqueue_setup 进行上述配置

前面已经分析过,串行队列是基于 Workloop,最终会执行 workloop 回调函数 workloop_cb

当前面 kevent_id 系统调用执行完成后,内核会进行线程的的创建和分配。之后将会从内核态切换到用户态的 start_wqthread 函数,start_wqthread 函数中,会继续调用 _pthread_wqthread 函数,_pthread_wqthread 负责管理线程生命周期,并从工作队列中提取任务交由上面配置回调函数执行。

由于串行队列是基于 Workloop 的,所以 _pthread_wqthread 会将任务交给 workloop 回调函数 workloop_cb 执行,即 _dispatch_workloop_worker_thread 函数。

_dispatch_workloop_worker_thread 函数中,会先调用 _dispatch_root_queue_drain_deferred_wlh 函数,再调用 _dispatch_lane_invoke 函数:

1
2
3
4
5
6
7
8
9
start_wqthread
⬇️
_pthread_wqthread
⬇️
_dispatch_workloop_worker_thread
⬇️
_dispatch_root_queue_drain_deferred_wlh
⬇️
_dispatch_lane_invoke

相关调用堆栈如下:

(5)_dispatch_root_queue_drain_deferred_wlh

该函数精简后的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static void
_dispatch_root_queue_drain_deferred_wlh(dispatch_deferred_items_t ddi
DISPATCH_PERF_MON_ARGS_PROTO)
{

// ......
dispatch_queue_t dq = ...;
dispatch_invoke_context_s dic = ...;
// 注意这里的标志位,后续关键逻辑还会用到
dispatch_invoke_flags_t flags = DISPATCH_INVOKE_WORKER_DRAIN |
DISPATCH_INVOKE_REDIRECTING_DRAIN | DISPATCH_INVOKE_WLH;

// ......

// 尝试获取 drain 锁
if (_dispatch_queue_drain_try_lock_wlh(dq, &dq_state)) {
// 获取 drain 锁成功,调用 _dispatch_lane_invoke
dx_invoke(dq, &dic, flags);

// ......

}

// ......
}

其中获取 drain 锁的的函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
static inline bool
_dispatch_queue_drain_try_lock_wlh(dispatch_queue_t dq, uint64_t *dq_state)
{
uint64_t old_state, new_state;

// 定义锁定位,包含当前线程的锁值、队列宽度已满标志以及屏障操作标志
/**
定义锁标志位
其中,_dispatch_lock_value_for_self 会获取当前线程 tid 并:
tid & DLOCK_OWNER_MASK
*/
uint64_t lock_bits = _dispatch_lock_value_for_self() | DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER;

// 使用原子读-修改-写循环尝试更新队列状态
os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
// 初始化新状态为旧状态
new_state = old_state;

// 如果队列处于挂起状态
if (unlikely(_dq_state_is_suspended(old_state))) {
/**
清除队列的入队标志
清除 DISPATCH_QUEUE_ENQUEUED 标志后,调度系统会认为该队列不需要被处理,因此不会将其分配给线程或工作循环。
这通常用于暂停队列的执行,直到队列被显式恢复(resume)。
*/
new_state &= ~DISPATCH_QUEUE_ENQUEUED;
}
// 如果队列已经被锁定
else if (unlikely(_dq_state_drain_locked(old_state))) {
// 如果队列处于非争用的同步状态
if (_dq_state_in_uncontended_sync(old_state)) {
// 设置接收到同步等待的标志
new_state |= DISPATCH_QUEUE_RECEIVED_SYNC_WAIT;
} else {
// 否则放弃更新并退出循环
os_atomic_rmw_loop_give_up(break);
}
}
// 如果队列未被锁定且未挂起
else {
// 增加标志位
new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
new_state |= lock_bits;
}
});

// 检查旧状态是否有效
if (unlikely(!_dq_state_is_base_wlh(old_state) || !_dq_state_is_enqueued_on_target(old_state) || _dq_state_is_enqueued_on_manager(old_state))) {
#if DISPATCH_SIZEOF_PTR == 4
// 如果是 32 位架构,右移 32 位以获取高位状态
old_state >>= 32;
#endif
// 如果状态无效,触发内部崩溃并记录状态
DISPATCH_INTERNAL_CRASH(old_state, "Invalid wlh state");
}

// 如果传入了 dq_state 指针,则更新其值为新状态
if (dq_state) *dq_state = new_state;

/**
_dispatch_queue_drain_try_unlock 会清除的标志位:
- DLOCK_OWNER_MASK:会使下面 _dq_state_drain_locked(old_state) 返回 false
- DISPATCH_QUEUE_RECEIVED_OVERRIDE
- DISPATCH_QUEUE_RECEIVED_SYNC_WAIT
- DLOCK_FAILED_TRYLOCK_BIT
*/

// 返回队列是否未挂起且未被锁定
return !_dq_state_is_suspended(old_state) && !_dq_state_drain_locked(old_state);
}

上述获取 drain 锁的主要逻辑如下:

  • 如果队列未被挂起,并且未被锁定,可以获取锁,返回值为 true
    • 此时会同时设置一系列标志位 lock_bits
      • tid & DLOCK_OWNER_MASK
        • 设置了该标志位后,_dq_state_drain_locked 会返回 true
      • DISPATCH_QUEUE_WIDTH_FULL_BIT
      • DISPATCH_QUEUE_IN_BARRIER
  • 如果队列被挂起,或者已被锁定,无法获取锁,返回值为 false

其中,函数的返回值是队列是否未挂起且未被锁定,上面已经提到,是否被锁定,取决于是否设置了 tid & DLOCK_OWNER_MASK 标志位。在后续调用的 _dispatch_queue_drain_try_unlock 函数(后面分析会提到)中,会清除锁定标志位,使队列重新变成未锁定状态。

而当我们显示调用了 GCD 的 dispatch_suspend 时候,会使队列变成挂起状态。dispatch_suspend 不是本次分析的重点,所以在获取 drain 锁的逻辑里,我们只需要关注队列是否被锁定即可。

所以,_dispatch_root_queue_drain_deferred_wlh 函数核心逻辑是先尝试获取锁,如果获取成功,则调用 dx_invoke 开始执行任务。如果获取锁失败,代表当前串行队列中的任务正在其他线程被执行。
根据 dx_invoke 宏的定义:

1
#define dx_invoke(x, y, z) dx_vtable(x)->do_invoke(x, y, z)

结合前面提到的串行队列虚表可知,这里实际是调用 _dispatch_lane_invoke 函数。

(6)_dispatch_lane_invoke

_dispatch_lane_invoke 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
void
_dispatch_lane_invoke(dispatch_lane_t dq, dispatch_invoke_context_t dic,
dispatch_invoke_flags_t flags)
{
// 注意这里的第 4 个参数 invoke 传的是 _dispatch_lane_invoke2
_dispatch_queue_class_invoke(dq, dic, flags, 0, _dispatch_lane_invoke2);
}

static inline void
_dispatch_queue_class_invoke(dispatch_queue_class_t dqu,
dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
dispatch_invoke_flags_t const_restrict_flags,
_dispatch_queue_class_invoke_handler_t invoke)
{
// ......

/**
对于串行队列,从 _dispatch_root_queue_drain_deferred_wlh 走进来的时候传了 DISPATCH_INVOKE_WLH
所以,不会走到 _dispatch_queue_drain_try_lock
*/
if (likely(flags & DISPATCH_INVOKE_WLH)) {
// 标记队列为已拥有并已入队
owned = DISPATCH_QUEUE_SERIAL_DRAIN_OWNED | DISPATCH_QUEUE_ENQUEUED;
} else {
// 尝试获取队列的 drain 锁,串行队列因为传了 DISPATCH_INVOKE_WLH 参数,所以不会走到这里
owned = _dispatch_queue_drain_try_lock(dq, flags);
}

// 如果成功获取了队列的 drain 锁
if (likely(owned)) {

// ......

// 执行任务
// invoke 传的是 _dispatch_lane_invoke2,所以这里执行的实际是 _dispatch_lane_invoke2 函数
tq = invoke(dq, dic, flags, &owned);


// ......

// 如果目标队列需要重新入队
if (unlikely(tq != DISPATCH_QUEUE_WAKEUP_NONE && tq != DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT)) {

// 解锁,这里调用了 _dispatch_queue_drain_try_unlock 函数,使其他线程可以重新获取 drain 锁
} else if (!_dispatch_queue_drain_try_unlock(dq, owned, tq == DISPATCH_QUEUE_WAKEUP_NONE)) {
// 解锁失败 ......
}

// ......

}

// 如果目标队列不为空
if (tq) {
// 完成队列的调用并返回
return _dispatch_queue_invoke_finish(dq, dic, tq, owned);
}

// 否则,释放队列并返回
return _dispatch_release_2_tailcall(dq);
}

上述主要逻辑如下:

  • 调用 _dispatch_lane_invoke2 函数执行任务。
  • 调用 _dispatch_queue_drain_try_unlock 解锁。
    • 这里解锁后,会使下次调用 _dispatch_root_queue_drain_deferred_wlh 时可以成功获取到 drain 解锁。

接下来看下 _dispatch_lane_invoke2 函数的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static inline dispatch_queue_wakeup_target_t
_dispatch_lane_invoke2(dispatch_lane_t dq, dispatch_invoke_context_t dic,
dispatch_invoke_flags_t flags, uint64_t *owned)
{
dispatch_queue_t otq = dq->do_targetq;
dispatch_queue_t cq = _dispatch_queue_get_current();

if (unlikely(cq != otq)) {
return otq;
}
if (dq->dq_width == 1) {
return _dispatch_lane_serial_drain(dq, dic, flags, owned);
}
return _dispatch_lane_concurrent_drain(dq, dic, flags, owned);
}

在之前的文章里已经提到,串行队列的 dq_width1,所以这里会继续调用 _dispatch_lane_serial_drain 函数。

(7)_dispatch_lane_serial_drain

_dispatch_lane_serial_drain 函数 函数实现如下:

1
2
3
4
5
6
7
8
dispatch_queue_wakeup_target_t
_dispatch_lane_serial_drain(dispatch_lane_class_t dqu,
dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
uint64_t *owned)
{
flags &= ~(dispatch_invoke_flags_t)DISPATCH_INVOKE_REDIRECTING_DRAIN;
return _dispatch_lane_drain(dqu._dl, dic, flags, owned, true);
}

可以看到,这里最继续执行 _dispatch_lane_drain 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
/**
* 队列的处理分为两种类型(串行/并发)和两种模式(重定向或非重定向)。
*
* 串行
* ~~~~~~
* 串行处理适用于串行队列(宽度 == 1)。它不支持重定向模式(因为没有意义),并将所有的任务视为屏障任务。
* 在串行模式下,记录操作非常少,大部分循环都被优化掉了。
*
* 当队列的宽度增长到大于 1 时,串行处理会停止。
* 通过串行处理可以防止任何递归的处理被重定向。
*
* 并发
* ~~~~~~~~~~
* 在非重定向模式下(即目标队列之一是串行队列),非屏障任务和屏障任务都会在处理线程的上下文中运行。
* 即使是较慢的非屏障任务也会被全部标记,以便它们可以向 `dispatch_sync()` 迈进,从而将它们全部串行化。
*
* 在重定向模式下,非屏障任务会被向下重定向。
*
* 当队列的宽度变为 1 时,并发处理会停止,从而队列处理切换到更高效的串行模式。
*/
DISPATCH_ALWAYS_INLINE
static dispatch_queue_wakeup_target_t
_dispatch_lane_drain(dispatch_lane_t dq, dispatch_invoke_context_t dic,
dispatch_invoke_flags_t flags, uint64_t *owned_ptr, bool serial_drain)
{
// 获取队列的目标队列(通常是父队列)
dispatch_queue_t orig_tq = dq->do_targetq;

// 初始化线程帧,用于保存当前线程的上下文信息
dispatch_thread_frame_s dtf;

// 定义当前任务和下一个任务的指针
struct dispatch_object_s *dc = NULL, *next_dc;

// 定义队列状态和拥有的宽度
uint64_t dq_state, owned = *owned_ptr;

// 如果队列为空(没有任务),直接返回 NULL
if (unlikely(!dq->dq_items_tail)) return NULL;

// 推入线程帧,保存当前队列的上下文
_dispatch_thread_frame_push(&dtf, dq);

// 如果是串行模式或当前任务是屏障任务
if (serial_drain || _dq_state_is_in_barrier(owned)) {
// 设置拥有的状态为屏障模式
owned = DISPATCH_QUEUE_IN_BARRIER;
} else {
// 否则,仅保留队列宽度相关的状态
owned &= DISPATCH_QUEUE_WIDTH_MASK;
}

// 获取队列的头部任务
dc = _dispatch_queue_get_head(dq);
goto first_iteration;

// 主循环,用于处理队列中的任务
for (;;) {
dispatch_assert(dic->dic_barrier_waiter == NULL);

// 将当前任务设置为下一个任务
dc = next_dc;

// 如果当前任务为空
if (unlikely(!dc)) {
// 如果队列为空,退出循环
if (!dq->dq_items_tail) {
break;
}
// 否则,获取队列的头部任务
dc = _dispatch_queue_get_head(dq);
}

// 如果需要返回内核,执行相关操作
if (unlikely(_dispatch_needs_to_return_to_kernel())) {
_dispatch_return_to_kernel();
}

// 如果串行模式与队列宽度不匹配,退出循环
if (unlikely(serial_drain != (dq->dq_width == 1))) {
break;
}

// 如果需要缩小队列宽度,退出循环
if (unlikely(!(flags & DISPATCH_INVOKE_DISABLED_NARROWING) &&
_dispatch_queue_drain_should_narrow(dic))) {
break;
}

// 如果是工作循环模式,检查队列的最大 QoS
if (likely(flags & DISPATCH_INVOKE_WORKLOOP_DRAIN)) {
dispatch_workloop_t dwl = (dispatch_workloop_t)_dispatch_get_wlh();
if (unlikely(_dispatch_queue_max_qos(dwl) > dwl->dwl_drained_qos)) {
break;
}
}

first_iteration:
// 加载队列的当前状态
dq_state = os_atomic_load(&dq->dq_state, relaxed);

// 如果队列被挂起,退出循环
if (unlikely(_dq_state_is_suspended(dq_state))) {
break;
}

// 如果目标队列发生变化,退出循环
if (unlikely(orig_tq != dq->do_targetq)) {
break;
}

// 如果是串行模式或当前任务是屏障任务
if (serial_drain || _dispatch_object_is_barrier(dc)) {
// 如果不是串行模式且未拥有屏障状态,尝试升级为屏障模式
if (!serial_drain && owned != DISPATCH_QUEUE_IN_BARRIER) {
if (!_dispatch_queue_try_upgrade_full_width(dq, owned)) {
goto out_with_no_width;
}
owned = DISPATCH_QUEUE_IN_BARRIER;
}

// 如果当前任务是同步等待者且未绑定线程,设置屏障等待者
if (_dispatch_object_is_sync_waiter(dc) &&
!(flags & DISPATCH_INVOKE_THREAD_BOUND)) {
dic->dic_barrier_waiter = dc;
goto out_with_barrier_waiter;
}

// 弹出队列头部任务
next_dc = _dispatch_queue_pop_head(dq, dc);
} else {
// 如果当前状态是屏障模式,释放屏障状态并更新宽度
if (owned == DISPATCH_QUEUE_IN_BARRIER) {
os_atomic_xor2o(dq, dq_state, owned, release);
owned = dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
} else if (unlikely(owned == 0)) {
// 如果未拥有宽度,尝试获取宽度
if (_dispatch_object_is_waiter(dc)) {
_dispatch_queue_reserve_sync_width(dq);
} else if (!_dispatch_queue_try_acquire_async(dq)) {
goto out_with_no_width;
}
owned = DISPATCH_QUEUE_WIDTH_INTERVAL;
}

// 弹出队列头部任务
next_dc = _dispatch_queue_pop_head(dq, dc);

// 如果当前任务是等待者,处理等待者逻辑
if (_dispatch_object_is_waiter(dc)) {
owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
_dispatch_non_barrier_waiter_redirect_or_wake(dq, dc);
continue;
}

// 如果是重定向模式,处理重定向逻辑
if (flags & DISPATCH_INVOKE_REDIRECTING_DRAIN) {
owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
_dispatch_continuation_redirect_push(dq, dc,
_dispatch_queue_max_qos(dq));
continue;
}
}

// 执行当前任务
_dispatch_continuation_pop_inline(dc, dic, flags, dq);
}

// 如果当前状态是屏障模式,更新宽度
if (owned == DISPATCH_QUEUE_IN_BARRIER) {
owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
}

// 如果还有未完成的任务,调整拥有的宽度
if (dc) {
owned = _dispatch_queue_adjust_owned(dq, owned, dc);
}

// 更新拥有的状态
*owned_ptr &= DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_ENQUEUED_ON_MGR;
*owned_ptr |= owned;

// 弹出线程帧,恢复上下文
_dispatch_thread_frame_pop(&dtf);

// 如果还有任务,返回目标队列;否则返回 NULL
return dc ? dq->do_targetq : NULL;

out_with_no_width:
// 如果没有宽度,更新状态并弹出线程帧
*owned_ptr &= DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_ENQUEUED_ON_MGR;
_dispatch_thread_frame_pop(&dtf);
return DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT;

out_with_barrier_waiter:
// 如果不允许同步等待者,触发崩溃
if (unlikely(flags & DISPATCH_INVOKE_DISALLOW_SYNC_WAITERS)) {
DISPATCH_INTERNAL_CRASH(0,
"Deferred continuation on source, mach channel or mgr");
}

// 弹出线程帧,恢复上下文
_dispatch_thread_frame_pop(&dtf);

// 返回目标队列
return dq->do_targetq;
}

_dispatch_lane_drain 函数主要逻辑就是按照先进先出的顺序,逐个执行串行队列中的各个任务。

2、提交到并发队列

再回头看下 _dispatch_continuation_async 函数:

1
2
3
4
5
6
7
8
9
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{

// ......

return dx_push(dqu._dq, dc, qos);
}

前面已经提到,对于并发队列,dx_push 实际调用的是 _dispatch_lane_concurrent_push 函数。

(1)_dispatch_lane_concurrent_push

该函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{

/**
模拟器的回退路径,用于协作队列
检查队列是否为协作队列,且任务对象是否支持协作队列
如果不支持,则触发崩溃,提示不支持将任务目标设置为协作根队列
*/
if (unlikely(_dispatch_queue_is_cooperative(dq) &&
!_dispatch_object_supported_on_cooperative_queue(dou))) {
DISPATCH_CLIENT_CRASH(dou._do,
"Cannot target the cooperative root queue - not implemented");
}


// <rdar://problem/24738102&24743140> 保留非屏障宽度
// 如果仅设置了 ENQUEUED 位(而不是屏障宽度等效位),则不会失败
// 因此需要检查当前线程是否在此调用之前已将任务入队,否则可能会破坏任务的顺序
if (dq->dq_items_tail == NULL &&
!_dispatch_object_is_waiter(dou) && // 检查任务是否为等待者
!_dispatch_object_is_barrier(dou) && // 检查任务是否为屏障任务
_dispatch_queue_try_acquire_async(dq)) { // 尝试异步获取队列
// 将任务重定向推送到队列中
return _dispatch_continuation_redirect_push(dq, dou, qos);
}

// 调用 _dispatch_lane_push
_dispatch_lane_push(dq, dou, qos);
}

在该函数的 if 判断条件里,dq->dq_items_tail 一定是 NULL(具体原因后续会分析)。并且根据其他几个判断条件可知,这里会调用 _dispatch_continuation_redirect_push 函数:

1
2
3
4
5
6
7
8
9
10
11
12
static void
_dispatch_continuation_redirect_push(dispatch_lane_t dl,
dispatch_object_t dou, dispatch_qos_t qos)
{
// .....

dispatch_queue_t dq = dl->do_targetq;
//......

// 注意,这里传入的 dq 是 do_targetq,不是当前 dp,do_targetq 是从根队列数组 _dispatch_root_queues 中取出的对应的根队列(root queue)
dx_push(dq, dou, qos);
}

可以发现,这里又调用了熟悉的 dx_push 宏,但是传入的并不是我们创建的并发队列,而是并发队列的 do_targetq

在之前的文章《GCD 底层原理 2 - dispatch_queue》 中已经分析了,结论是并发队列和串行队列都是从根队列数组 _dispatch_root_queues 中取出的对应的根队列(root queue)。创建并发队列时,返回的是 _dispatch_root_queues 数组 index = 9 的元素(DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS),即:

1
2
3
4
5
6
7
8
9
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {
DISPATCH_GLOBAL_OBJECT_HEADER(queue_global),
.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
.do_ctxt = _dispatch_root_queue_ctxt(DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS),
.dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL),
.dq_priority = _dispatch_priority_make_fallback(DISPATCH_QOS_DEFAULT),
.dq_label = "com.apple.root.default-qos",
.dq_serialnum = 13,
},

所以这里调用 dx_push 时传入的实际上是对应的 root queue。而对于 root queue,其 dx_push 对应的函数是 _dispatch_root_queue_push,所以对于后续往队列 push 任务,都是在对应的 root queue 上的,而不是在当然任务 dq 上的。这也是 _dispatch_lane_concurrent_push 里的判断条件 dq->dq_items_tailNULL 的原因。

(2)_dispatch_root_queue_push

该函数精简后的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
void
_dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
dispatch_qos_t qos)
{
// ......

if (_dispatch_root_queue_push_needs_override(rq, qos)) {
return _dispatch_root_queue_push_override(rq, dou, qos);
}

_dispatch_root_queue_push_inline(rq, dou, dou, 1);
}

其中 _dispatch_root_queue_push_needs_override_dispatch_root_queue_push_override 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// 判断是否需要覆盖当前队列的优先级
static inline bool
_dispatch_root_queue_push_needs_override(dispatch_queue_global_t rq,
dispatch_qos_t qos)
{
// 外部传进来的 qos 是 dq 的,而不是 dq->do_targetq 的。
// 而 rq 是 dq->do_targetq,即 root queue
dispatch_qos_t fallback = _dispatch_priority_fallback_qos(rq->dq_priority);
if (fallback) {
// 如果存在 QoS,且当 QoS 不等于回退 fallback,则需要覆盖
return qos && qos != fallback;
}

// 获取队列的当前 QoS
dispatch_qos_t rqos = _dispatch_priority_qos(rq->dq_priority);
// 如果当前 QoS 存在且小于传入的 QoS,则需要覆盖
return rqos && qos > rqos;
}

// 执行优先级覆盖操作
DISPATCH_NOINLINE
static void
_dispatch_root_queue_push_override(dispatch_queue_global_t orig_rq,
dispatch_object_t dou, dispatch_qos_t qos)
{
uintptr_t flags = 0;

// 检查队列是否设置了超额提交标志
if (orig_rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) {
// 如果设置了超额提交标志,添加 DISPATCH_QUEUE_OVERCOMMIT 标志
flags |= DISPATCH_QUEUE_OVERCOMMIT;
} else if (_dispatch_queue_is_cooperative(orig_rq)) {
// 如果是协作队列,添加 DISPATCH_QUEUE_COOPERATIVE 标志
flags |= DISPATCH_QUEUE_COOPERATIVE;
}

// 根据 QoS 和标志获取目标根队列
dispatch_queue_global_t rq = _dispatch_get_root_queue(qos, flags);
// 获取调度对象的 continuation
dispatch_continuation_t dc = dou._dc;

// 如果调度对象是重定向类型
if (_dispatch_object_is_redirection(dc)) {
// 不需要双重包装,直接设置原始队列为函数上下文
dc->dc_func = (void *)orig_rq;
} else {
// 分配一个新的 continuation
dc = _dispatch_continuation_alloc();
// 设置 continuation 的虚表为 OVERRIDE_OWNING 类型
dc->do_vtable = DC_VTABLE(OVERRIDE_OWNING);
、 dc->dc_ctxt = dc;
dc->dc_other = orig_rq;
dc->dc_data = dou._do;
dc->dc_priority = DISPATCH_NO_PRIORITY;
dc->dc_voucher = DISPATCH_NO_VOUCHER;
}

// 将 continuation 推送到目标根队列
_dispatch_root_queue_push_inline(rq, dc, dc, 1);
}

可以看到,最终会调用到 _dispatch_root_queue_push_inline 函数。

(3)_dispatch_root_queue_push_inline

该函数实现如下:

1
2
3
4
5
6
7
8
9
static inline void
_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
dispatch_object_t _head, dispatch_object_t _tail, int n)
{
struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
return _dispatch_root_queue_poke_and_wakeup(dq, n, 0);
}
}

将其中涉及到的宏展开后实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static inline void
_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
dispatch_object_t _head, dispatch_object_t _tail, int n)
{
// 提取任务链表的起始和结束节点,这两个节点表示需要处理的一组任务
struct dispatch_object_s *hd = _head._do, *tl = _tail._do;

// 定义一个本地变量 _token 来表示队列中新的尾部元素
__typeof__(atomic_load_explicit((__typeof__(*(&_os_mpsc_head(dq, dq_items))) _Atomic *)(&_os_mpsc_head(dq, dq_items)), memory_order_relaxed)) _token;

// 将 _tail 更新为队列的新尾部,并获取之前的尾部元素(即 _token)
_token = os_mpsc_push_update_tail((dq, dq_items), tl, do_next);

// 将链表的头部(hd)与新尾部(tl)链接在一起
os_mpsc_push_update_prev((dq, dq_items), _token, hd, do_next);

/**
检查如果队列在插入这个任务链前是空的,则需要分配线程去处理队列中的任务。
如果 _token == NULL,说明这是个新队列首次插入任务,或者队列中的任务已被处理完。
如果插入任务后,发现还没有分配线程处理这个队列中的任务,就需要分配线程去处理。
*/
if ((_token) == NULL) {
return _dispatch_root_queue_poke_and_wakeup(dq, n, 0);
}
}

这里又看到了前面总结过的 MPSC 队列操作。

总结该函数逻辑如下:

  • 通过 MPSC 队列操作将任务插入队列尾部。
  • 分配线程去处理队列中的任务。
    • 调用 _dispatch_root_queue_poke_and_wakeup 函数
      • 传参 n = 1
      • 传参 floor = 0

其中,_dispatch_root_queue_poke_and_wakeup 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void
_dispatch_root_queue_poke_and_wakeup(dispatch_queue_global_t dq, int n, int floor)
{
#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_POOL
if (likely(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE ||
dx_type(dq) == DISPATCH_QUEUE_COOPERATIVE_ROOT_TYPE))
#endif
{
int old_pending, new_pending;
os_atomic_rmw_loop2o(dq, dgq_pending, old_pending, new_pending, release, {
new_pending = old_pending ?: n;
});
if (old_pending > 0) {
_dispatch_root_queue_debug("worker thread request still pending "
"for global queue: %p", dq);
return;
}
}
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
return _dispatch_root_queue_poke_slow(dq, n, floor);
}

该函数主要逻辑是调用 _dispatch_root_queue_poke_slow 函数,且第二个参数 n = 1

(4) _dispatch_root_queue_poke_slow

精简后的 _dispatch_root_queue_poke_slow 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
// 参数 n = 1,所以 remaining = 1
int remaining = n;

// ......

_dispatch_root_queues_init();

// ......

if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
{
// ......

r = _pthread_workqueue_addthreads(remaining,
_dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));

return;
}
}

其中,_dispatch_root_queues_init 函数实现如下:

1
2
3
4
5
6
static inline void
_dispatch_root_queues_init(void)
{
dispatch_once_f(&_dispatch_root_queues_pred, NULL,
_dispatch_root_queues_init_once);
}

这里又调用了 _dispatch_root_queues_init_once 函数,关于该函数在前面串行队列部分已经分析过,函数中关键逻辑如下:

  • 配置 workqueue 回调函数
    • workq_cb = _dispatch_worker_thread2
  • 配置 workloop 回调函数
    • workloop_cb = _dispatch_workloop_worker_thread

而且前面也已经分析过,并发队列是基于 workqueue 的,从 _dispatch_root_queue_poke_slow 函数实现也可以看到,函数逻辑中没有涉及到像串行队列那样的 workloop 的配置。所以在 _dispatch_root_queue_poke_slow 函数中,调用 _pthread_workqueue_addthreads 完成线程的分配之后,会执行 workqueue 回调函数 _dispatch_worker_thread2

_pthread_workqueue_addthreads 函数有两个参数:

  • numthreads
    • 要请求的线程数量
  • priority
    • 线程优先级

而上面调用 _pthread_workqueue_addthreads 函数时,传入的 numthreads 参数为 1,表示请求分配一个线程去处理任务。

按照经验来看,dispatch_async + 并发队列,是会分配多个线程去处理各个任务的。为什么这一步却只请求一个线程去处理任务呢?后面会分析原因。

(5)_dispatch_worker_thread2

该函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static void
_dispatch_worker_thread2(pthread_priority_t pp)
{

// ......

dispatch_queue_global_t dq; // 全局队列。
dispatch_invoke_flags_t invoke_flags = 0; // 调用标志初始化。

uintptr_t rq_flags = 0; // 队列标志初始化。
if (cooperative) {
// 如果是合作线程,设置合作队列标志和调用标志。
rq_flags |= DISPATCH_QUEUE_COOPERATIVE;
invoke_flags |= DISPATCH_INVOKE_COOPERATIVE_DRAIN;
} else {
// 如果是超额提交线程,设置超额提交标志。
rq_flags |= (overcommit ? DISPATCH_QUEUE_OVERCOMMIT : 0);
}

// 根据优先级和标志获取对应的全局队列。
dq = _dispatch_get_root_queue(_dispatch_qos_from_pp(pp), rq_flags);

// ......

// 设置调用标志
invoke_flags |= DISPATCH_INVOKE_WORKER_DRAIN | DISPATCH_INVOKE_REDIRECTING_DRAIN;

// 从 root queue 中提取任务并执行。
_dispatch_root_queue_drain(dq, dq->dq_priority, invoke_flags);

// ......
}

在该函数中,主要是调用了 _dispatch_root_queue_drain 函数去执行具体任务。

(6)_dispatch_root_queue_drain

_dispatch_root_queue_drain 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static void
_dispatch_root_queue_drain(dispatch_queue_global_t dq,
dispatch_priority_t pri, dispatch_invoke_flags_t flags)
{

// ......

// 将当前线程与队列关联
_dispatch_queue_set_current(dq);

// ......

// 定义一个指向调度对象的指针 item,用于存储从队列中弹出的任务
struct dispatch_object_s *item;

// 初始化调度上下文 dic,用于存储调度过程中的相关信息
dispatch_invoke_context_s dic = { };

// ......

// 循环处理队列中的任务,直到队列为空
while (likely(item = _dispatch_root_queue_drain_one(dq))) {
// ......

// 弹出并执行队列中的任务
_dispatch_continuation_pop_inline(item, &dic, flags, dq);

// ......
}

// ......

// 将当前线程与队列取消关联
_dispatch_queue_set_current(NULL);
}

该函数核心逻辑如下:

  • 将当前线程与队列关联。
    • 确保任务执行期间,GCD 能正确识别当前线程所属队列,防止线程池中的线程后续任务调度混乱。
  • 使用 while 循环,按照 FIFO 顺序取出一个任务 item
    • 取任务调用的 _dispatch_root_queue_drain_one 函数。
  • while 循环中,弹出并执行的上一步取出的任务 item
    • 调用 _dispatch_continuation_pop_inline 函数。
  • 所有任务处理完成后,将当前线程与队列取消关联。

这里需要重点看下 _dispatch_root_queue_drain_one 函数,这是实现多线程并发执行的关键。

(7)_dispatch_root_queue_drain_one

该函数内部实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
static inline struct dispatch_object_s *
_dispatch_root_queue_drain_one(dispatch_queue_global_t dq)
{
struct dispatch_object_s *head, *next;

start:
// MEDIATOR 值同时充当“锁”和信号的作用。
// 将队列的头部指针交换为 DISPATCH_ROOT_QUEUE_MEDIATOR,表示当前线程正在处理队列。
head = os_atomic_xchg2o(dq, dq_items_head,
DISPATCH_ROOT_QUEUE_MEDIATOR, relaxed);

// 如果队列头部为空,检查是否与并发入队操作发生竞争。
if (unlikely(head == NULL)) {

// 第一次交换 tail 指针会告诉入队线程可以安全地写入 head 指针。
// 如果 CAS 操作失败,说明有并发入队操作使队列变为非空。
if (unlikely(!os_atomic_cmpxchg2o(dq, dq_items_head,
DISPATCH_ROOT_QUEUE_MEDIATOR, NULL, relaxed))) {
// 与并发入队线程竞争,重新尝 提取任务。
goto start;
}

// 如果 tail 指针不为空,说明有并发入队操作尚未完成。
if (unlikely(dq->dq_items_tail)) { // <rdar://problem/14416349>
// head 被设置为 MEDIATOR,表示之前 head 为空,但 tail 有值。
// 等待入队操作完成。
if (__DISPATCH_ROOT_QUEUE_CONTENDED_WAIT__(dq,
_dispatch_root_queue_head_tail_quiesced)) {
goto start;
}
}
// 如果队列仍然为空,记录调试信息并返回 NULL。
_dispatch_root_queue_debug("no work on global queue: %p", dq);
return NULL;
}

// 如果 head 是 MEDIATOR,说明与另一个线程竞争失败。
if (unlikely(head == DISPATCH_ROOT_QUEUE_MEDIATOR)) {
// 等待 MEDIATOR 被清除。
if (likely(__DISPATCH_ROOT_QUEUE_CONTENDED_WAIT__(dq,
_dispatch_root_queue_mediator_is_gone))) {
goto start;
}
return NULL;
}

// 恢复 head 指针为正常值。
// 如果 next 为空,说明当前任务可能是最后一个任务。
next = head->do_next;
if (unlikely(!next)) {

// 将队列头部指针设置为 NULL。
os_atomic_store2o(dq, dq_items_head, NULL, relaxed);
// 将 tail 指针也设置为 NULL,确保队列为空的状态一致性。
if (os_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL, release)) {
// head 和 tail 都为 NULL,队列为空。
goto out;
}
// 如果 CAS 失败,说明有新的任务被入队。
next = os_mpsc_get_next(head, do_next, &dq->dq_items_tail);
}

// 更新队列头部指针为下一个任务。
os_atomic_store2o(dq, dq_items_head, next, relaxed);


// ⚠️ 注意,这里调用了 _dispatch_root_queue_poke 函数,内部会再次调用 _dispatch_root_queue_poke_slow
_dispatch_root_queue_poke(dq, 1, 0);
out:
return head;
}

该函数核心逻辑有两部分:

  • 按照 FIFO 顺序取一个任务并返回,交由外面 _dispatch_continuation_pop_inline 函数取执行任务。
  • 调用 _dispatch_root_queue_poke 函数。
    • 该函数内部会再次调用 _dispatch_root_queue_poke_slow 函数。

_dispatch_root_queue_poke 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void
_dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor)
{
if (!_dispatch_queue_class_probe(dq)) {
return;
}
#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_POOL
if (likely(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE ||
dx_type(dq) == DISPATCH_QUEUE_COOPERATIVE_ROOT_TYPE))
#endif
{
if (unlikely(!os_atomic_cmpxchg2o(dq, dgq_pending, 0, n, release))) {
_dispatch_root_queue_debug("worker thread request still pending "
"for global queue: %p", dq);
return;
}
}
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE

// 调用了 _dispatch_root_queue_poke_slow 函数
return _dispatch_root_queue_poke_slow(dq, n, floor);
}

可以看到,这里再次调用了 _dispatch_root_queue_poke_slow 函数,上面已经提到 _dispatch_root_queue_poke_slow 会请求分配 1 个线程去执行任务。

这个“循环”的执行链路如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
_dispatch_root_queue_poke_slow
⬇️
_pthread_workqueue_addthreads // 申请分配 1 个线程去执行任务
⬇️
_dispatch_worker_thread2
⬇️
_dispatch_root_queue_drain // 调用 _dispatch_root_queue_drain_one 取出 1 个任务并执行
⬇️
_dispatch_root_queue_drain_one
⬇️
_dispatch_root_queue_poke_slow // 新一轮循环,申请 1 个线程执行 1 个任务
⬇️
......

这里就是 dispatch_async + 并发队列的多线程并发执行的关键逻辑了:每次申请 1 个线程去执行 1 个任务,在从队列取任务的同时,同时去申请新的线程执行下个任务,这样就达到了多个线程并发执行任务的目的。而具体哪个任务先执行,则取决于的线程调度先后。

这里用到了“任务窃取优化”:
新创建的线程立即进入 _dispatch_root_queue_drain 循环,尝试从队列中窃取任务执行。提高多核 CPU 利用率,减少任务排队时间。

(8)_dispatch_continuation_pop_inline

这里再回头简单看下上一步取出的任务,是如何执行的。执行时调用的是 _dispatch_continuation_pop_inline 函数,该函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static inline void
_dispatch_continuation_pop_inline(dispatch_object_t dou,
dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
dispatch_queue_class_t dqu)
{
dispatch_pthread_root_queue_observer_hooks_t observer_hooks =
_dispatch_get_pthread_root_queue_observer_hooks();
if (observer_hooks) observer_hooks->queue_will_execute(dqu._dq);
flags &= _DISPATCH_INVOKE_PROPAGATE_MASK;
if (_dispatch_object_has_vtable(dou)) {
if (dx_type(dou._do) == DISPATCH_SWIFT_JOB_TYPE) {
dx_invoke(dou._dsjc, NULL,
_dispatch_invoke_flags_to_swift_invoke_flags(flags));
} else {
dx_invoke(dou._dq, dic, flags);
}
} else {
_dispatch_continuation_invoke_inline(dou, flags, dqu);
}
if (observer_hooks) observer_hooks->queue_did_execute(dqu._dq);
}

上面主要逻辑是判断传入的任务是否有虚函数表,如果有的话调用 dx_invoke 宏执行任务,否则调用 _dispatch_continuation_invoke_inline 宏执行任务。

由于传进来的任务,是前面包装好的 continuation,是没有虚函数表的,所以这里会继续调用 _dispatch_continuation_invoke_inline 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
static inline void
_dispatch_continuation_invoke_inline(dispatch_object_t dou,
dispatch_invoke_flags_t flags, dispatch_queue_class_t dqu)
{
dispatch_continuation_t dc = dou._dc, dc1;
// 使用自动释放池包装调度项的执行,确保在执行过程中管理内存
dispatch_invoke_with_autoreleasepool(flags, {
uintptr_t dc_flags = dc->dc_flags;
// 将调度项重新放回缓存中,以便快速回收和复用,这样可以减少内存分配的开销,提高性能
_dispatch_continuation_voucher_adopt(dc, dc_flags);

if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
_dispatch_trace_item_pop(dqu, dou);
}

// 如果设置了 DC_FLAG_CONSUME 标志,则释放调度项的缓存
if (dc_flags & DC_FLAG_CONSUME) {
dc1 = _dispatch_continuation_free_cacheonly(dc);
} else {
dc1 = NULL;
}

// 如果设置了 DC_FLAG_GROUP_ASYNC 标志,表示这是一个异步任务组,调用专门的函数处理任务组的逻辑
if (unlikely(dc_flags & DC_FLAG_GROUP_ASYNC)) {
_dispatch_continuation_with_group_invoke(dc);
} else {
// 否则,调用调度项的回调函数
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
// 记录调度项的完成事件
_dispatch_trace_item_complete(dc);
}

// 如果缓存释放失败,则将调度项释放到缓存限制中
if (unlikely(dc1)) {
_dispatch_continuation_free_to_cache_limit(dc1);
}
});
// 增加性能监控的工作项计数
_dispatch_perfmon_workitem_inc();
}

上述核心逻辑如下:

  • 使用 autoreleasepool 包裹调度项的执行,确保在执行过程中管理内存。
  • 判断是否是 dispatch_group 任务组。
    • 如果是 dispatch_group 任务组,调用 _dispatch_continuation_with_group_invoke 执行。
    • 如果是普通任务,调用 _dispatch_client_callout 执行任务。
  • 释放 continuation,存到线程的 continuation 缓存池。

六、总结

可以用下图表示 dispatch_async + 串行队列/并发队列的逻辑:

Tags: 底层