当前位置: 首页 > news >正文

网站你应该明白我的意思吗免费建站系统个人

网站你应该明白我的意思吗,免费建站系统个人,网络推广培训资料,xampp wordpress linux1. 引言通常我们的说的同步其实有两个层面的意思#xff1a;一个是线程间的同步#xff0c;主要是为了按照编程者指定的特定顺序执行#xff1b;另外一个是数据的同步#xff0c;主要是为了保存数据。为了高效解决同步问题#xff0c;前人抽象出同步原语供开发者使用。不仅… 1. 引言通常我们的说的同步其实有两个层面的意思一个是线程间的同步主要是为了按照编程者指定的特定顺序执行另外一个是数据的同步主要是为了保存数据。为了高效解决同步问题前人抽象出同步原语供开发者使用。不仅多核单核也需要同步原语核心的问题要保证共享资源访问的正确性。如果共享资源划分不合理同步原语的开销会制约多核性能。常见的同步原语有互斥锁、条件变量、信号量、读写锁、RCU等。本文主要聚焦于互斥锁对应linux的spinlock我们试图沿着时间的脉络去梳理spinlock的不断改进的进程如果涉及CPU体系结构我们主要关注ARM体系结构的实现。如果后续有时间我们会继续分析其他同步原语的演进和优化历程。2. 演化进程linux内核spinlock是互斥机制的最底层基础设施它的性能直接关系到内核的性能主要分为这么几个阶段linux-2.6.25之前我们称之为原始spinlock。对锁的实现是使用原子操作去无序竞争全局的锁资源。这个阶段对锁的争用处于无序竞争的状态。如果CPU核心数不多资源相对充裕好比我们去银行柜台办理业务一共就1-2个人无非你在前还是我在前的问题公平性的问题并不突出性能也没什么大的影响但是一旦cpu核心数和锁的竞争者相对较多时就会出现有些人因为某些优势如CPU算力强锁正好落在当前CPU的cacheline中等总是能抢到锁其他人总是抢不到的情况出现。linux-2.6.25在x86下实现了ticket spinlock替换原始spinlock。随着CPU核数的增多以及对共享资源的争用越发激烈公平性问题就显现出来了。保证公平一个很自然的思路就是大家都来排队。如果对锁的争用比较激烈再加上如果此时核比较多此时一旦释放锁其实只有1个CPU能抢到锁但是因为大家观察都是全局的一个锁那其他CPU的cacheline会因此失效会有相当程度的性能损耗。还是就以去银行柜台办理业务为例它的实现相当于去银行取号、排队、等叫号这么一个过程问题在于叫号相当是一个广播机制所有人都要侦听还是有点浪费时间精力。linux-3.5ARM体系结构用ticket spinlock替换了原始spinlock。过了几个版本ARM才替换原因作者没有去考证不得而知。linux-3.15实现了mcs_spinlock但未替换ticket spinlock。它把对全局锁转换成per-cpu的锁减少争用的损耗然后各个锁之间通过链表串起来排队解决了公平性和性能损耗的问题。然后它却没有替代ticket spinlock成为内核默认实现因为spinlock太底层了已经嵌入了内核的各种关键数据结构中它的数据结构要比spinlock大这是内核锁不能接受的但是最终它还是合入了内核只是没有人去用它。但是它的存在为后一步的优化仍然起到了非常重要的作用。它的实现思路是把ticket spinlock的广播机制转变为击鼓传花也就是实际上可以我并需要侦听广播主要在我前面排队的人在使用完锁以后告诉我可以了。linux-4.2实现了queued spinlock简称qspinlock替换了ticket spinlock。它首先肯定要解决mcs_spinlock占用大小实际上它结合了ticket spinlock和mcs_spinlock的优点大小保持一致如果只有1个或2个CPU试图获取锁只用这个新的spinlock当有3个以上的CPU试图获取锁才需要mcs_spinlock。它的数据结构有表示当前锁的持有情况、是否有还有一个竞争者已经需要快速找到对应CPU的per-cpu结构的mcs_spinlock节点这3个大的域被塞在ticket spinlock同样大小数据结构中。这种遵守原先架构约束的情况而做出的改进非常值得我们学习。linux-4.19ARM体系结构将queued spinlock替换成默认实现。原因是什么作者同样没去考证。3. 原始spinlock实现3.1 关键数据结构和公共代码typedef struct {volatile unsigned int lock; } raw_spinlock_t;typedef struct {raw_spinlock_t raw_lock; } spinlock_t;#define spin_lock(lock) _spin_lock(lock)void __lockfunc _spin_lock(spinlock_t *lock) {preempt_disable();_raw_spin_lock(lock); } # define _raw_spin_lock(lock) __raw_spin_lock((lock)-raw_lock)3.2 ARM体系结构的加锁实现//arm32那时候还没arm64的实现这个时期的内核大体对应ARMV6 static inline void __raw_spin_lock(raw_spinlock_t *lock) {unsigned long tmp;__asm__ __volatile__( 1: ldrex %0, [%1]\n //1.teq %0, #0\n //2. strexeq %0, %2, [%1]\n //3.teqeq %0, #0\n //4. #ifdef CONFIG_CPU_32v6Kwfene\n //5. #endifbne 1b //6.: r (tmp): r (lock-lock), r (1) : cc);smp_mb(); //7. }通过数据结构可以看出此时的lock还是一个unsigned int类型的数据加锁的时候首先会关闭抢占然后会转到各个体系结构的实现我们关注ARM的实现__raw_spin_lock的分析如下读取lock的状态值给tmp并将lock-lock标记为独占。判断lock的状态是否为0。如果是0说明可以继续往下走跳到第3步如果不为0说明自旋锁处于上锁状态不能访问跳到第5步如果不支持WFE则直接跳到第6步自旋最后回到第1步自旋。teq执行会影响标志寄存器中Z标志位后面带eq或者ne后缀的执行都受该标志位影响。执行strex执行只有从上一次ldrex执行到本次strex这个被标记为独占的地址lock-lock没有改变才会执行成功lock的状态改写为1。通过strex执和ldrex实现原子性操作。继续判断lock的状态是否为0为0说明获得锁不为0说明没有获得锁跳到第5步如果支持WFE的话。执行WFE指令如果支持的话CPU进低功耗状态省点功耗。如果收到SEV指令如果有第五步的话继续判断lock的状态是否为0不为0跳到第1步继续循环如果lock为0继续跳到第7步执行barrier多核情况下为DMB指令保证访存顺序按我们的编程顺序执行即后面的load/store绝不允许越过smp_mb()屏障乱序到前面执行。3.3 ARM体系结构的解锁实现static inline void __raw_spin_unlock(raw_spinlock_t *lock) {smp_mb();__asm__ __volatile__(str %1, [%0]\n #ifdef CONFIG_CPU_32v6Kmcr p15, 0, %1, c7, c10, 4\n /* DSB */sev #endif:: r (lock-lock), r (0): cc); }解锁的操作相对简单str将lock-lock赋值然后使用DSB保序使用sev通知持锁cpu得到锁。4. ticket spinlock4.1 关键数据结构和公共代码typedef struct {union {u32 slock;struct __raw_tickets { //只考虑小端u16 owner;u16 next;} tickets;}; } arch_spinlock_t;typedef struct raw_spinlock {arch_spinlock_t raw_lock; } raw_spinlock_t;typedef struct spinlock {union {struct raw_spinlock rlock;}; } spinlock_t;static inline void spin_lock(spinlock_t *lock) {raw_spin_lock(lock-rlock); } #define raw_spin_lock(lock) _raw_spin_lock(lock) void __lockfunc _raw_spin_lock(raw_spinlock_t *lock) {__raw_spin_lock(lock); } static inline void __raw_spin_lock(raw_spinlock_t *lock) {preempt_disable();do_raw_spin_lock(); } static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock) {arch_spin_lock(lock-raw_lock); }4.2 AArch32的加锁实现static inline void arch_spin_lock(arch_spinlock_t *lock) {unsigned long tmp;u32 newval;arch_spinlock_t lockval;__asm__ __volatile__( 1: ldrex %0, [%3]\n //1. add %1, %0, %4\n //2.strex %2, %1, [%3]\n //3.teq %2, #0\n //4.bne 1b //5.: r (lockval), r (newval), r (tmp): r (lock-slock), I (1 TICKET_SHIFT): cc);while (lockval.tickets.next ! lockval.tickets.owner) { //6.wfe(); //7.lockval.tickets.owner ACCESS_ONCE(lock-tickets.owner);//8.}smp_mb(); //9. }把lock-slock值保存到lock_valnewval lockval (1 TICKET_SHIFT) lockval (1 16)等价于newval lockval.tickets.next相当于从银行取号机取号。strex tmp, newval, [lock-slock]将新的值newval 存在lock中strex将是否成功结果存入在tmp中检查是否写入成功lockval.tickets.next成功则跳到第6步否则返回第1步重试同上文类似也是实现原子操作lockval.tickets.next和owner成员是否相等相等跳到第9步成功获得锁没有则跳到第7步。成功的话相当于银行柜台已经叫我的号了我就可以去办理业务了没有的话我还要继续等。执行WFE指令CPU进低功耗状态省点功耗。如果收到SEV指令从低功耗状态恢复重新获得新的owner值因为一般是别人释放了锁才会发送SEV指令这时owner的值已经发生了变化需要重新从内存中获取ACCESS_ONCE本身的实现就是增加了volatile这个关键字它确保编译器每次访问的变量都是从内存中获取防止编译器优化。执行barrier同上文描述不再赘述。4.3 AArch32的解锁实现static inline void arch_spin_unlock(arch_spinlock_t *lock) {unsigned long tmp;u32 slock;smp_mb();__asm__ __volatile__(mov %1, #1\n //1. 1: ldrex %0, [%2]\n //2. uadd16 %0, %0, %1\n //3.strex %1, %0, [%2]\n //4.teq %1, #0\n //5.bne 1b //6.: r (slock), r (tmp): r (lock-slock): cc);dsb_sev(); //7. }将tmp赋值为1将lock-slock的值赋值给slock。将slock的低16bit也就是owner成员的值加1。将新的值新的ower使用strex写入中lock-slock将是否成功结果存入在tmp中检查是否写入成功成功跳到第7步实现了原子操作不成功跳到第6步tmp不等于0不成功继续返回label 1重试即跳回第2步4.4 AArch64的加锁实现static inline void arch_spin_lock(arch_spinlock_t *lock) {unsigned int tmp;arch_spinlock_t lockval, newval;asm volatile(prfm pstl1strm, %3\n //1. 1: ldaxr %w0, %3\n //2.add %w1, %w0, %w5\n //3.stxr %w2, %w1, %3\n //4.cbnz %w2, 1b\n //5./* Did we get the lock? */eor %w1, %w0, %w0, ror #16\n //6.cbz %w1, 3f\n //7.sevl\n //8. 2: wfe\n //9.ldaxrh %w2, %4\n //10.eor %w1, %w2, %w0, lsr #16\n //11.cbnz %w1, 2b\n //12. 3: //13.: r (lockval), r (newval), r (tmp), Q (*lock): Q (lock-owner), I (1 TICKET_SHIFT): memory); }核心逻辑与AArch32类似汇编实现会有不一样这里不再展开。从lockmemory预取数据到L1cache中加速执行。使用ldaxr指令Load-acquire exclusive register带Exclusive和Acquire-Release 两种语义将lock的值赋值给lockval。newval lockval (1 TICKET_SHIFT) lockval (1 16)等价于newval lockval.tickets.next相当于从银行取号机取号。使用stxr指令将newval赋值给lock并将exclusive是否设置成功结果存放到tmp中。如果tmp ! 0说明exclusive失败需要重新跳到开始处第2步重试因为这时候其他CPU核心可能有是执行流插入抢在我们前面执行。否则继续。用eor 异或运算实现lockval.tickets.next和owner成员是否相等的判断如果相等跳到label3对应13步获得锁进入临界区。否则往下执行自旋。使用SEVL指令发送本地事件Send Event Locally唤醒本CPU核心防止有丢失unlock的情况出现。执行WFE指令CPU进低功耗状态省点功耗。获取当前的Owner值存放在tmp中判断lockval.tickets.next和owner成员的值是否相等如果不相等就回跳到label2对应第9步。相等继续往下。结束退出。4.5 AArch64的解锁实现static inline void arch_spin_unlock(arch_spinlock_t *lock) {asm volatile(stlrh %w1, %0\n: Q (lock-owner): r (lock-owner 1): memory); }解锁的操作相对简单stlrh除了将lock-owner相当于银行柜台叫下一个排队者的号码将会兼有SEV和DSB的功能。5. mcs spinlock5.1 关键数据结构和变量struct mcs_spinlock {    struct mcs_spinlock *next; //1.    int nt locked; //2.};当一个CPU试图获取一个spinlock时它就会将自己的MCS lock加到这个spinlock的等待队列的队尾然后next指向这个新的MCS lock。locked的值为1表示已经获得spinlock为0则表示还没有持有该锁。5.2 加锁的实现static inline void mcs_spin_lock(struct mcs_spinlock **lock, struct mcs_spinlock *node) {struct mcs_spinlock *prev;node-locked 0; //1.node-next NULL;prev xchg(lock, node); //2if (likely(prev NULL)) { //3return;}ACCESS_ONCE(prev-next) node; //4arch_mcs_spin_lock_contended(node-locked); //5. }先看下两个参数第1个参数lock是指向指针的指针二级指针是因为它指向的是末尾节点里的next域而next本身是一个指向struct mcs_spinlock的指针。第2个参数node试图加锁的CPU对应的MCS lock节点。接下来看代码逻辑初始化node节点找队列末尾的那个mcs lock。xchg完成两件事一是给一个指针赋值二是获取了这个指针在赋值前的值相当于下面两句prev *lock; //队尾的lock *lock node; //将lock指向新的node如果队列为空CPU可以立即获得锁直接返回否则继续往下。不需要基于locked的值进行spin所以此时locked的值不需要关心。等价于prev-next node把自己这个node加入等待队列的末尾。调用arch_mcs_spin_lock_contended等待当前锁的持有者将锁释放给我。#define arch_mcs_spin_lock_contended(l) \ do { \while (!(smp_load_acquire(l))) \ //1. arch_mutex_cpu_relax(); \ //2. } while (0)上文中的node-locked0说明没有获得锁需要继续往下执行说明已经获得锁直接退出ARM64中arch_mutex_cpu_relax调用cpu_relax函数的有一个内存屏障指令防止编译器优化。从4.1开始还存一个yield指令。该指令为了提高性能占用cpu的线程可以使用该给其他线程。5.3 解锁的实现static inline void mcs_spin_unlock(struct mcs_spinlock **lock, struct mcs_spinlock *node) {struct mcs_spinlock *next ACCESS_ONCE(node-next); //1.if (likely(!next)) { //2.if (likely(cmpxchg(lock, node, NULL) node)) //3return;while (!(next ACCESS_ONCE(node-next))) //4.arch_mutex_cpu_relax(); //5.}arch_mcs_spin_unlock_contended(next-locked); //6. }找到等待队列中的下一个节点当前没有其他CPU试图获得锁直接释放锁。如果*lock node将*lock NULL然后直接返回反之说明当前队列中有等待获取锁的CPU。继续往下。cmpxchg作用翻译大致如下所示cmpxchg(lock, node, NULL) {ret *lock;if (*lock node)*lock NULL;return ret; }距离函数开头获得next指针的值已经过去一段时间了在这个时间间隔里可能又有CPU把自己添加到队列里来了。需要重新获得next指针的值。于是待新的node添加成功后才可以通过arch_mcs_spin_unlock_contended()将spinlock传给下一个CPU如果next为空调用arch_mutex_cpu_relax作用同上文。arch_mcs_spin_unlock_contended(l)实际上是调用smp_store_release((l), 1)将next-locked设置为1。将spinlock传给下一个CPU6. queued spinlock6.1 概述它首先肯定要解决mcs_spinlock的占用空间问题否则设计再好也无法合入主线。它是这样的大部分情况用的锁大小控制在跟以前ticket spinlock一样的水平设计了两个域分别是locked和pending分别表示锁当前是否被持有已经在持有时是否又来了一个申请者竞争。争锁好比抢皇位皇位永远只有1个对应locked域除此之外还有1个太子位对应pending域防止皇帝出现意外能随时候补上不至于出现群龙无首的状态他们可以住在紫禁城内使用qspinlock。这两个位置被占后其他人还想来竞争皇位只有等皇帝和太子都移交各自的位子以后才可以在等待的时候你需要在紫禁城外待在自己的府邸里使用mcs_spinlock减小紫禁城的拥挤减少系统损耗。即当锁的申请者小于或等于2时只需要1个qspinlock就可以了其所占内存的大小和ticket spinlock一样。当锁的竞争者大于2个时候才需要(N-2)个mcs_spinlock。qspinlock还是全局的为降低锁的竞争使用退化到per-cpu的mcs_spinlock锁所有的mcs_spinlock锁串行构成一个等待队列这样cacheline invalide带来的损耗减轻了很多。这是它的基本设计思想。在大多数情况下per-cpu的mcs lock只有1个除非发生了嵌套task上下文被中断抢占因为中断上下文只有3种类softirq、hardirq和nmi所有每个CPU核心至多有4个mcs_spinlock锁竞争。而且所有mcs_spinlock会串联到一个等待队列里的。上图展示的是qspinlock的locked和pending位都被占需要进入mcs_spinlock等待队列而CPU(2)是第1个进入等待队列的qspinlock的tail.cpu则被赋值成2CPU(2)的mcs_spinlock数组的第3个成员空闲则qspinlock的tail.index被赋值成3。入队的是CPU(3)、CPU(1)和CPU(0)通过mcs_spinlock的next域将大家连成队列。假如等到竞争qspinlock的2个锁的持有者都释放了则CPU(2)的第3个空闲成员则获得锁完成它的临界区访问后通过qspinlock的tail.cpu类似于页表基址和tail.index类似于页表内的偏移快速找到下个mcs_spinlock的node节点。下面要进入细节分析了本文只考虑小端模式、NR_CPUS 16K的情况不考虑虚拟化这块去掉qstats统计力图聚焦在该锁实现的核心逻辑上。6.2 关键数据结构和变量struct qspinlock数据简化如下typedef struct qspinlock {union {atomic_t val;struct {u8 locked;u8 pending;};struct {u16 locked_pending;u16 tail;};}; } arch_spinlock_t;struct qspinlock包含了三个主要部分locked0- 7bit表示是否持有锁只有1和0两个值1表示某个CPU已经持有锁0则表示没有持有锁。pending(8bit)作为竞争锁的第一候补第1个等待自旋锁的CPU只需要简单地设置它为1则表示它成为第一候补后面再有CPU来竞争锁则需要创建mcs lock节点了为0则表示该候补位置是空闲的。tail(16-31bit): 通过这个域可以找到自旋锁队列的最后一个节点。又细分为tail cpu(18-31bit)来记录和快速索引需要访问的mcs_spinlock位于哪个CPU上作用类似于页表基址。tail index(16-17bit)用来标识当前处在哪种context中。Linux中一共有4种context分别是task, softirq, hardirq和nmi1个CPU在1种context下至多试图获取一个spinlock1个CPU至多同时试图获取4个spinlock。当然也表示嵌套的层数。对应per-cpu的mcs_spinlock的数组对应下文的struct mcs_spinlock mcs_nodes[4]的下标作用类似于类似于页表内的偏移。struct mcs_spinlockmcs_spinlock的数据结构如下struct mcs_spinlock {struct mcs_spinlock *next;int locked; /* 1 if lock acquired */int count; /* nesting count, see qspinlock.c */ };struct qnode {struct mcs_spinlock mcs; };static DEFINE_PER_CPU_ALIGNED(struct mcs_spinlock, mcs_nodes[4]);使用per-cpu的struct mcs_spinlock mcs_nodes[4]可以用来减少对cacheline的竞争。数组数量为4前文已经解释过了。struct mcs_spinlock具体含义如下locked用来通知这个CPU你可以持锁了通过该域完成击鼓传花当然这个动作是上一个申请者释放的时候通知的。count嵌套的计数。只有第0个节点这个域才有用用来索引空闲节点的。next指向下一个锁的申请者构成串行的等待队列的链表。6.3 加锁实现核心逻辑概述我们把一个qspinlock对应的**( tail, pending, locked)称为一个三元组(x,y,z)**以此描述锁状态及其迁移有2中特殊状态初始状态无申请者竞争(0, 0, 0)过渡状态类似于皇帝正在传位给太子处于交接期(0, 1, 0)按加锁原有代码只有慢速路径和非慢速路径我们为了行文方便将源代码的慢速路径又分为中速路径和慢速路径这样就有以下三组状态不同路径出现情况核心功能所在的函数和使用锁的种类快速路径锁没有持有者locked位皇位空缺queued_spin_lock使用qspinlock中速路径锁已经被持有但pending位太子位空缺queued_spin_lock_slowpath使用qspinlock慢速路径锁已经被持有locked位和pending位都没空缺queued_spin_lock_slowpath使用mcs_spinlock核心逻辑就变成如果没有人持有锁那我们进入快速路径直接拿到锁走人走之前把locked位皇位标记成已抢占状态。期间只需要使用qspinlock。如果有人持有锁抢到了皇位成为皇帝但pending位太子位空缺那我们先抢这个位置进入的是中速路径等这个人皇帝释放锁传位了我们就可以拿到锁了。期间只需要用到qspinlock。如果这两个位置我们都抢不到则进入慢速路径需要使用per-cpu的mcs_spinlock。总体状态流程图如下梳理一下对应伪码描述static __always_inline void queued_spin_lock(struct qspinlock *lock) {if (初始状态){ //没人持锁//进入快速路径return;}queued_spin_lock_slowpath(lock, val); //进入中速或者慢速路径 }void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) //lock是原始锁val是之前拿到的lock-val的值 {//代码片段1过渡状态判断看是否处于过渡状态尝试等这个状态完成//代码片段2慢速路径判断看是否有其他申请者竞争则直接进入慢速路径的核心片段//代码片段3中速路径queue: //代码判断4慢速路径//1. 调用__this_cpu_inc(mcs_nodes[0].count)将当前cpu竞争锁的数量1//2. mcs node的寻址和初始化//3. 调用queued_spin_trylock(),看看准备期间spinlock是不是已经被释放了//4. 处理等待队列已经有人的情况重点是arch_mcs_spin_lock_contended();//5. 处理我们加入队列就是队首的情况 locked:// 已经等到锁了//6.处理我们是队列最后一个节点的情况//7.处理我们前面还有节点的情况//8.调用arch_mcs_spin_unlock_contended()通知下一个节点 release://9.__this_cpu_dec(mcs_nodes[0].count); }下面我们开始分析queued_spin_lock和queued_spin_lock_slowpath函数的实现细节。实现细节分析static __always_inline void queued_spin_lock(struct qspinlock *lock) {u32 val;val atomic_cmpxchg_acquire(lock-val, 0, _Q_LOCKED_VAL); //1if (likely(val 0)) //2return;queued_spin_lock_slowpath(lock, val); //3 }通过atomic_cmpxchg_acquire与之前的cmpxchg类似用原子操作实现将val赋值为lock-val。lock-val值如果为0没人拿到锁将lock-val的值设为1(即*_Q_LOCKED_VAL*)三元组状态由**( tail, pending, locked)** (0, 0, 0)迁移为(0, 0, 1)。lock-val值如果不为0保持lock-val的值不变。如果当前没有人获得锁直接拿到锁返回。三元组的状态迁移已经在上一步完成了。否则需要继续往下走中速/慢速路径。进入中速/慢速路径调用queued_spin_lock_slowpath函数快速路径lock-val值0没人拿到锁三元组状态由**( tail, pending, locked)** (0, 0, 0)迁移为(0, 0, 1)。快速返回不用等待。下面进入queued_spin_lock_slowpath函数的分析我们先分析过渡状态判断和中速路径两个代码片段void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) {/* 过渡状态判断 */if (val _Q_PENDING_VAL) { //0. int cnt _Q_PENDING_LOOPS;val atomic_cond_read_relaxed(lock-val,(VAL ! _Q_PENDING_VAL) || !cnt--);}/*其他部分代码*/}过渡状态判断如果三元组( tail,pending,locked)状态如果是(0, 1, 0)则尝试等待状态变为 (0, 0, 1)但是只会循环等待1次。简单翻译一下atomic_cond_read_relaxed语句的意思为如果(lock-val ! _Q_PENDING_VAL) || !cnt--)则跳出循环继续往下。中速路径进入中速路径的前提是当前没有其他竞争者在等待队列中排队以及pending位空缺之后我们先将pending位占住。如果已经这段期间已经有人释放了那直接获取锁并将pending位重置为空闲反正则要自旋等持锁者释放锁再做其他动作。void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) {/* ... 过渡状态判断及处理代码省略 *//* ... 判断是否有pending和tail是否有竞争者有竞争者直接进入慢速路径排队代码省略 *//* 中速路径开始 */val atomic_fetch_or_acquire(_Q_PENDING_VAL, lock-val); //1. if (!(val ~_Q_LOCKED_MASK)) { //2.if (val _Q_LOCKED_MASK) { //3.atomic_cond_read_acquire(lock-val, !(VAL _Q_LOCKED_MASK)); //4.}clear_pending_set_locked(lock); //5.return; //6.}if (!(val _Q_PENDING_MASK)) //7.clear_pending(lock);/* 中速路径结束 *//*其他部分代码*/}这部分代码的核心逻辑是在竞争qspinloc的pending即竞争太子位。进入中速路径的前提是三元组( tail,pending,locked)(0, 0, *)离“是否是过渡状态”和“除锁的持有者者之外是否竞争者”的判断已经过了一段时间了。需要用atomic_fetch_or_acquire函数通过原子操作重新看一下锁的状态执行了两个动作执行完三元组状态会从(0, 0, *)改为(0, 1, *)。val lock-val成为“原始的lock-val”lock-val的peding域被置位。如果原始的lock-val的pending和tail域都为0则表明没有pending位没有其他竞争者即太子位空闲可以去竞争否则则表明有其他竞争者不满足中速路径的条件需要进入慢速路径。lock-val的和tail域就不用关心了。所以此时的三元状态可以使(*, 1, *)如果locked域为1表明位置被占着已经有人在持锁了我们需要跳到第4步等锁被释放否则没有人持锁皇位是空着的我们跳到第5步直接去上位就好了。spin等待直到lock-val的locked域变回0也就是等皇位空出来。三元组状态会从(*, 1, 1) 转变为( *, 1, 0)拿到锁了可以登基上位了但还要做些清理工作调用clear_pending_set_locked将lock-val的pending域清零以及将locked置1。即三元组状态由(*, 1, 0) 转变为( *, 0, 1)。结束提前返回。如果没有进入中速路径第1步开始时获得的锁的状态是(n, 0, *) 需要把在第一步设置的现在锁的pending域恢复成0。慢速路径上面流程图有些地方不太准确但没有关系可以先帮我们建立起总体流程印象细节后续我们会展开。在拿到mcs lock的空闲节点之后我们先用queued_spin_trylock函数检查一下在我们准备的这段时间里是不是已经有人释放了该锁了如果有人释放了就提前返回了。获得锁之前要分两种情况分别处理如果队列中只有我们那只用自旋等lock的pending位空出来即可。如果队列中还有其他竞争者在排在我们前面则需要自旋等前序节点释放该锁。获得锁之后也要分两种情况处理如果队列中只有我们将锁的locked位置位表明锁已经被我们持有了不需要再做其他处理。如果队列中还有其他竞争者在排在我们后面则需要将锁传递这位竞争者当然需要要等它正式加入。慢速路径的简化代码如下所示void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) {struct mcs_spinlock *prev, *next, *node;u32 old, tail;int idx;/* ... 过渡状态判断及处理省略 *//* 先判断是否有pending和tail是否有竞争者有竞争者直接进入慢速路径排队 */if (val ~_Q_LOCKED_MASK)goto queue;/* 中速路径代码开始 */val atomic_fetch_or_acquire(_Q_PENDING_VAL, lock-val);if (!(val ~_Q_LOCKED_MASK)) {// ...中速路径处理省略return;}/* 中速路径代码结束 *//* 之前代码的收尾清理工作 */if (!(val _Q_PENDING_MASK))clear_pending(lock); //将pending状态恢复。//慢速路径代码开始位置 queue://进入mcs lock队列node this_cpu_ptr(mcs_nodes[0]); //1.idx node-count;tail encode_tail(smp_processor_id(), idx);node idx; //2.barrier();node-locked 0;node-next NULL;if (queued_spin_trylock(lock)) //3.goto release;smp_wmb();old xchg_tail(lock, tail); //4.next NULL;if (old _Q_TAIL_MASK) { //5.prev decode_tail(old); //6.WRITE_ONCE(prev-next, node); //7.arch_mcs_spin_lock_contended(node-locked); //8.next READ_ONCE(node-next); //9.if (next)prefetchw(next);}val atomic_cond_read_acquire(lock-val, !(VAL _Q_LOCKED_PENDING_MASK)); //10.locked: if (((val _Q_TAIL_MASK) tail) //11.atomic_try_cmpxchg_relaxed(lock-val, val, _Q_LOCKED_VAL))goto release; /* No contention */set_locked(lock); //12.if (!next) //13.next smp_cond_load_relaxed(node-next, (VAL));arch_mcs_spin_unlock_contended(next-locked); //14.release:__this_cpu_dec(mcs_nodes[0].count); //15. }之前是进入queued_spin_lock_slowpath函数有“是否过渡状态的判断”及处理“是否要进入慢速路径的判断”及处理如果进了中速路径的处理。下面正式进入慢速路径的处理。因为系统可能有多个cpu并发即使在同一个cpu上也可能切换了4种context上下文的中一种说不定大家竞争的锁已经都被释放了也说不准所以此时三元组的状态是未知的。获得local CPU上的第0个节点mcs_spinlock类型的node节点并将节点的count值1count值记录了local CPU上空闲节点的起始下标该值也只在第0个有意义。encode_tail函数节点编号编码成tail(2bit的tail index和14bit的tail cpu)同时还完成区分到底是tail域没有指向任何节点还是指向了第0个CPU的第0个节点。检查当前CPU上自旋锁嵌套的层数是否超过4层。根据idx取到取到空闲节点相当于node mcs_nodes[idx]。这里有barrier保序防止编译器优化。然后对空闲节点进行初始化。调用queued_spin_trylock函数检查一下在我们准备的这段时间里是不是已经有人释放了该锁了。如果成功获得锁则直接跳到lable release处否则继续往下。queued_spin_trylock函数的作用具体来讲是这样连续两次使用原子操作检查锁的状态如果val为0对应三元组(0, 0, 0)并且再次读取val的locked域也为0则表示可以获得锁。上文说过此时锁的状态是未知的。在调用xchg_tail函数之前有smp_wmb内存屏障保证tail值获得的正确性。该函数将lock-tail设置为新生成的tail值并将旧的值存在old中。如果旧的tail为0说明队列里只有我们这个新生成的节点直接跳到第10步否则继续往下执行通过旧的tail拿到等待队列的尾结点prev。将当前节点插入等待队列作为新的尾节点。自旋等待本节点的locked域变为1。在local CPU上自旋等待自己的locked成员被置位。arch_mcs_spin_lock_contended在arm64上最终会调用__cmpwait_case_##name宏展开后同arm64的tick spinlock的arch_spin_lock的实现类似。核心功能都是在自旋时执行WFE节省部分能耗。后面3行代码是看下在我们后面是否还有其他人在排队如果有的话使用prfm指令提前将相关值预取到local CPU的cache中加速后续的执行。使用原子操作重新获取lock-val的值并且循环等到直到pending和locked都为0。三元组( tail, pending, locked)的值为(*, 0, 0)也就是说需要等到皇位和太子位都是空闲的状态才是我们真正获得了锁的条件。如果tail还是我们设置的说明我们同时是等待队列的最后一个节点或者说是唯一节点后面没人在排队了。通过atomic_try_cmpxchg_relaxed原子的将locked域设置为1至此才真正完成了锁的合法拥有直接跳到最后1步。三元组的状态迁移是(n, 0, 0) -- (0, 0, 1)。否则如果tail发生了改变说明后面还有节点或者pending位被占则继续往下处理。先将locked设置为1。三元组的状态迁移是(*, *, 0) -- (*, 0, 1)。等待的下一个节点还有正式加入进来则需要等next变成非空非空才真正了完成加入。调用arch_mcs_spin_unlock_contended将下一个节点的locked值设置成1 完成了锁的传递。也就是完成了击鼓传花。释放本节点。6.4 解锁实现static __always_inline void queued_spin_unlock(struct qspinlock *lock) {smp_store_release(lock-locked, 0); }将qspinlock的locked清零即可。【参考资料】1. 【原创】linux spinlock/rwlock/seqlock原理剖析基于ARM642.  宋宝华几个人一起抢spinlock到底谁先抢到3. Linux内核同步机制之四spin lock蜗窝科技4. Linux中的spinlock机制[一] - CAS和ticket spinlock兰新宇知乎推荐阅读专辑|Linux文章汇总专辑|程序人生专辑|C语言我的知识小密圈关注公众号后台回复「1024」获取学习资料网盘链接。欢迎点赞关注转发在看您的每一次鼓励我都将铭记于心~
http://www.huolong8.cn/news/244455/

相关文章:

  • 江西建设监督网新网站建设治安总队网站目的
  • 甘肃省建设厅网站质监局linux 中 wordpress
  • 做网站用什么环境外贸网站模板建设
  • 网站做301重定向我做的网站手机上不了
  • linux搭建个人网站建一个公司网站要多少钱
  • 精品外贸网站 dedecms农场理财网站建设
  • 旅游电子商务网站有哪些网站域名解析教程
  • 做网站需要网页嵌套吗网站建设作业百度云资源
  • 网站建设什么打王思聪游戏秒玩网站
  • 国内论坛网站有哪些网站建设与管理课程设计论文
  • 手机网站开发介绍wordpress 运费设置
  • 杭州建站程序世界重大新闻
  • 网站建设规划书河北互联网平台有哪些
  • 西安建设手机网站免费做网站的方法
  • 低价企业网站搭建高阳网站建设
  • 去哪个网站找题目给孩子做三五互联网站
  • 注册安全工程师建设工程网站苏州做网站便宜的公司
  • 网站开发的人李海涛上海网站排名优化费用
  • 怎么做原创电影视频网站诸暨制作网站的公司有哪些
  • 建筑网图片网站优化建设哈尔滨
  • 石碣东莞网站建设公司网站设计网络公司
  • 烟台建设企业网站网站修改 iis6应用程序池
  • 先做网站先备案怎么获取wordpress的权限
  • 网站备案和服务器备案为什么要做网站推广
  • 重庆网站建设定制专业设计vi公司
  • 苏州做网站的哪个公司比较好建设部网站 注册违规
  • 哪个网站可以学做咸菜设计公司室内设计
  • 图片外链自己怎么优化网站排名
  • 华为建站丹东建设安全监督网站
  • 做电影网站违法企业软文