用dw做音乐网站模板,市场营销策略名词解释,广东省东莞市有几个区,互联网创业项目零成本目录1、基于锁的并发数据结构1、并发计数器2、懒惰计数器3、并发链表4、并发队列5、并发散列表总结2、条件变量使用#xff08;POSIX#xff09;生产者/消费者 #xff08;有界缓冲区问题#xff09;覆盖条件扩展3、信号量使用二值信号量#xff08;锁#xff09;0值信号… 目录1、基于锁的并发数据结构1、并发计数器2、懒惰计数器3、并发链表4、并发队列5、并发散列表总结2、条件变量使用POSIX生产者/消费者 有界缓冲区问题覆盖条件扩展3、信号量使用二值信号量锁0值信号量(条件变量)生产者消费者问题读写锁哲学家就餐问题使用锁条件变量实现信号量扩展4、并发问题总结1、预防循环等待2、使用非抢占锁3、完全避免互斥使用CAS4、调度避免死锁5、检测死锁并恢复5、事件并发时间并发与线程并发的区别状态管理问题缺点 1、基于锁的并发数据结构
一个标准的方法来创建一个并发数据结构添加一把大锁。
1、并发计数器
typedef struct counter_t {int value;pthread_mutex_t lock;
} counter_t;void init(counter_t *c) {c-value 0;Pthread_mutex_init(c-lock, NULL);
}void increment(counter_t *c) {Pthread_mutex_lock(c-lock);c-value;Pthread_mutex_unlock(c-lock);
}void decrement(counter_t *c) {Pthread_mutex_lock(c-lock);c-value--;Pthread_mutex_unlock(c-lock);
}int get(counter_t *c) {Pthread_mutex_lock(c-lock);int rc c-value;Pthread_mutex_unlock(c-lock);return rc;
}它只是加了一把锁在调用函数操作该数据结构时获取锁从调用返回时释放锁。
同步的计数器扩展性不好。单线程完成100万次更新只需要很短的时间大约0.03s而两个线程并发执行每个更新100万次性能下降很多超过5s。线程更多时性能更差。
理想情况下你会看到多处理上运行的多线程就像单线程一样快。达到这种状态称为完美扩展perfect scaling。
2、懒惰计数器
懒惰计数器的基本思想是这样的。如果一个核心上的线程想增加计数器那就增加它的局部计数器访问这个局部计数器是通过对应的局部锁同步的。因为每个CPU有自己的局部计数器不同CPU上的线程不会竞争所以计数器的更新操作可扩展性好。
懒惰计数器通过多个局部计数器和一个全局计数器来实现一个逻辑计数器其中每个CPU核心有一个局部计数器。具体来说在4个CPU的机器上有4个局部计数器和1个全局计数器。除了这些计数器还有锁每个局部计数器有一个锁全局计数器有一个。
但是为了保持全局计数器更新以防某个线程要读取该值局部值会定期转移给全局计数器方法是获取全局锁让全局计数器加上局部计数器的值然后将局部计数器置零。
这种局部转全局的频度取决于一个阈值这里称为S表示sloppiness。S越小懒惰计数器则越趋近于非扩展的计数器。S越大扩展性越强但是全局计数器与实际计数的偏差越大。
在4个CPU上的4个线程分别增加计数器100万次。如果S小性能很差但是全局计数器精确度高。如果S大性能很好但是全局计数器会有延时。懒惰计数器就是在准确性和性能之间折中。 typedef struct counter_t {int global;pthread_mutex_t glock;int local[NUMCPUS];pthread_mutex_t llocal[NUMCPUS];int threshold;
} counter_t;void init(counter_t *c, int threshold) {c-threshold threshold;c-global 0;for (int i 0; i NUMCPUS; i) {c-local[i] 0;Pthread_mutex_init(c-llocal[i], NULL);}
}void update(counter_t *c, int threadID, int amt) {Pthread_mutex_lock(c-llocal[threadID]);c-local[threadID] amt;if (c-local[threadID] c-threshold) {// transfer to globPthread_mutex_lock(c-glock);c-global c-local[threadID];Pthread_mutex_unlock(c-glock);c-local[threadID] 0;}Pthread_mutex_unlock(c-llocal[threadID]);
}int get(counter_t *c) {Pthread_mutex_lock(c-glock);int rc c-global;Pthread_mutex_unlock(c-glock);return rc;
}3、并发链表
typedef struct node_t {int key;struct node_t* next;
} node_t;typedef struct list_t {node_t* head;pthread_mutex_t lock;
} list_t;void init(list_t* list) {list-head NULL;Pthread_mutex_init(list-lock, NULL);
}void insert(list_t *list, int key)
{node_t* p malloc(sizeof(node_t)); if (p NULL) {perror(malloc);return -1;} p-key key;// 假定malloc()是线程安全的每个线程都可以调用它不需要担心竞争条件和其他并发缺陷。// 只有在更新共享列表时需要持有锁Pthread_mutex_lock(list-lock);p-next list-head; // 头插法list-head p;Pthread_mutex_unlock(list-lock);return 0;
}void find(list_t *list, int key)
{int ret -1;Pthread_mutex_lock(list-lock);node_t* cur list-head;while (cur) {if (cur-key key) {ret 0;break;}cur cur-next;}Pthread_mutex_unlock(list-lock);return ret;
}扩展上面的代码demo是一个链表一个锁这样性能不大行。介绍一个过手锁hand-over-hand locking也叫作锁耦合lock coupling
原理也很简单。每个节点都有一个锁替代之前整个链表一个锁。遍历链表的时候首先抢占下一个节点的锁然后释放当前节点的锁。
增加了链表操作的并发程度。但是实际上在遍历的时候每个节点获取锁、释放锁的开销巨大很难比单锁的方法快。即使有大量的线程和很大的链表这种并发的方案也不一定会比单锁的方案快。也许某种杂合的方案一定数量的节点用一个锁值得去研究。
4、并发队列
typedef struct node_t {int value;struct node_t* next;
} node_t;typedef struct queue_t {node_t* head;node_t* tail;pthread_mutex_t headLock;pthread_mutex_t tailLock;
} queue_t;void init(queue_t* q) {node_t* tmp malloc(sizeof(node_t));tmp-next NULL;q-head q-tail tmp;Pthread_mutex_init(q-headLock, NULL);Pthread_mutex_init(q-tailLock, NULL);
}void enqueue(queue_t *q, int value)
{node_t* tmp malloc(sizeof(node_t));assert(tmp ! NULL);tmp-value value;tmp-next NULL;// 队尾入队Pthread_mutex_lock(q-tailLock);q-tail-next tmp;q-tail tmp;Pthread_mutex_unlock(q-tailLock);
}int dequeue(queue_t *q, int* value)
{Pthread_mutex_lock(q-headLock);node_t* tmp q-head;node_t* newHead q-next;if (newHead NULL) {Pthread_mutex_unlock(q-headLock);return -1; // queue was empty}*value newHead-value;q-head newHead;Pthread_mutex_unlock(q-headLock);free(tmp);return 0;
}有两个锁一个负责队列头另一个负责队列尾。这两个锁使得入队列操作和出队列操作可以并发执行因为入队列只访问tail锁而出队列只访问head锁。
使用了一个技巧添加了一个假节点在队列初始化的代码里分配的。该假节点分开了头和尾操作。
5、并发散列表
基于3的并发链表
#define BUCKETS (101)typedef struct hash_t {list_t lists[BUCKETS];
} hash_t;void Hash_Init(hash_t* H) {for (int i 0; i BUCKETS; i) {init(H-lists[i]);}
}void Hash_Insert(hash_t* H, int key) {int bucket key % BUCKETS;return insert(H-lists[bucket], key);
}int Hash_find(hash_t* H, int key) {int bucket key % BUCKETS;return find(H-lists[bucket], key);
}每个散列桶每个桶都是一个链表都有一个锁而不是整个散列表只有一个锁从而支持许多并发操作。
总结
实现并发数据结构时先从最简单的方案开始也就是加一把大锁来同步。这样做你很可能构建了正确的锁。如果发现性能问题那么就改进方法只要优化到满足需要即可。
关于并发队列的更多内容查看
https://www.cnblogs.com/lijingcheng/p/4454848.html
2、条件变量
多线程程序中一个线程等待某些条件是很常见的。简单的方案是自旋直到条件满足这是极其低效的某些情况下甚至是错误的。那么线程应该如何等待一个条件
线程可以使用条件变量condition variable来等待一个条件变成真。条件变量是一个显式队列当某些执行状态即条件condition不满足时线程可以把自己加入队列等待waiting该条件。另外某个线程当它改变了上述状态时就可以唤醒一个或者多个等待线程通过在该条件上发信号让它们继续执行。
使用POSIX
// 声明
pthread_cond_t c;
// op
wait()和signal()。
线程要睡眠的时候调用wait()。
当线程想唤醒等待在某个条件变量上的睡眠线程时调用signal()。
pthread_cond_wait(pthread_cond_t* c, pthread_mutex_t* m);
pthread_cond_signal(pthread_cond_t* c);wait()调用有一个参数它是互斥量。
它假定在wait()调用时这个互斥量是已上锁状态。
wait()的职责是释放锁并让调用线程休眠原子地。
当线程被唤醒时在另外某个线程发信号给它后它必须重新获取锁再返回调用者。
这样复杂的步骤也是为了避免在线程陷入休眠时产生一些竞态条件。
父线程等待子线程使用条件变量
Pthread_cond_wait(pthread_cond_t* c, pthread_mutex_t* m);
Pthread_cond_signal(pthread_cond_t* c);int done 0;
pthread_mutex_t m PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t c PTHREAD_COND_INITIALIZER;void thread_exit() {Pthread_mutex_lock(m);done 1;Pthread_cond_signal(c);Pthread_mutex_unlock(m);
}void thread_join() {Pthread_mutex_lock(m);while (done 0) {Pthread_cond_wait(c, m);}// 你可能看到父线程使用了一个while循环而不是if语句来判断是否需要等待。// 虽然从逻辑上来说没有必要使用循环语句但这样做总是好的Pthread_mutex_unlock(m);
}void* child(void* arg) {printf(child\n);thread_exit();return NULL;
}int main(int argc, char* argv[]) {printf(parent:begin\n);pthread_t p;Pthread_create(p, NULL, child, NULL);thread_join();printf(parent:end\n);return 0;
}尽管并不是所有情况下都严格需要但有效且简单的做法还是在使用条件变量调用signal和wait时要持有锁hold the lock when calling signal or wait
生产者/消费者 有界缓冲区问题
假设有一个或多个生产者线程和一个或多个消费者线程。生产者把生成的数据项放入缓冲区消费者从缓冲区取走数据项以某种方式消费。
int buffer[MAXN];
int fill_ptr 0;
int use_ptr 0;
int count 0;void put(int value) {buffer[fill_ptr] value;fill_ptr (fill_ptr 1) % MAXN;count;
}int get() {int tmp buffer[use_ptr];use_ptr (use_ptr 1) % MAXN;count--;return tmp;
}cond_t empty, fill;
mutex_t mutex;void* producer(void* arg) {for (int i 0; i loops; i) {Pthread_mutex_lock(mutex);while (count MAXN) { // 如果队列满了那就不生产了进行睡眠等待// 等待到队列空了Pthread_cond_wait(empty, mutex);}// 当队列还可以进行放入值的时候put(i);// 唤醒消费者Pthread_cond_signal(fill);Pthread_mutex_unlock(mutex);}
}void* consumer(void* arg) {for (int i 0; i loops; i) {Pthread_mutex_lock(mutex);while (count 0) { // 如果队列空了那就不消费了进行睡眠等待// 等待到队列不为空Pthread_cond_wait(fill, mutex);}// 当队列还可以进行放入值的时候int tmp get();// 唤醒生产者者Pthread_cond_signal(empty);Pthread_mutex_unlock(mutex);printf(%d\n, tmp);}
}要记住一条关于条件变量的简单规则总是使用while循环always use while loop。虽然有时候不需要重新检查条件但这样做总是安全的做了就开心了。
消费者不应该唤醒消费者而应该只唤醒生产者反之亦然。使用两个条件变量而不是一个以便正确地发出信号在系统状态改变时哪类线程应该唤醒。
覆盖条件
一个简单的多线程内存分配库
// free heap nums
int byteLeft MAX_HEAP_SIZE;cond_t c;
mutex_t m;void* allocate(int size) {Pthread_mutex_lock(m);while (byteLeft size) { // 如果剩余的内存小于申请的内存线程进入休眠Pthread_cond_wait(c, m);}// 等待线程有足够内存了void* ptr ...; // get mem from heapbytesLeft - size;Pthread_mutex_unlock(m);return ptr;
}void free(void* ptr, int size) {Pthread_mutex_lock(m);bytesLeft size;// 释放出内存了唤醒休眠的allocate线程应该唤醒哪个等待线程可能有多个线程Pthread_cond_signal(c); Pthread_mutex_unlock(m);
}
应该唤醒哪个等待线程可能有多个线程可能唤醒了一个线程后检查条件发现不满足唤醒从而导致符合条件的没有被唤醒。
解决方法
用pthread_cond_broadcast()代替上述代码中的pthread_cond_signal()唤醒所有的等待线程。这样做确保了所有应该唤醒的线程都被唤醒。当然不利的一面是可能会影响性能因为不必要地唤醒了其他许多等待的线程它们本来还不应该被唤醒。这些线程被唤醒后重新检查条件马上再次睡眠。
这种条件变量叫作覆盖条件covering condition因为它能覆盖所有需要唤醒线程的场景保守策略
扩展
关于c中的条件变量使用之前有写过相关笔记 https://hanhandi.blog.csdn.net/article/details/120914038
3、信号量
使用
#include semaphore.h
sem_t s;
sem_init(s, 0, 1);
// param20:信号量在同进程的多线程中共享
// param3信号量的值
int sem_wait(sem_t* s) {// s--;s0 - 立即返回s0 线程挂起直到之后的post操作
}int sem_post(sem_t* s) {// s;直接增加信号量的值如果有等待的线程唤醒其中一个。
}
// 当信号量 0 时s的val就是等待线程的个数。二值信号量锁
信号量的第一种用法作为锁来使用
直接把临界区用一对sem_wait() / sem_post()包括起来。
sem_t m;
sem_init(m, 0, 1); // 初始化值为1sem_wait(m);
// 临界区代码
sem_post(m);0值信号量(条件变量)
信号量的第一种用法作为条件变量来使用
父线程wait等待子线程
子线程done后调用post进行signal
sem_t* s;
void* child(void* arg) {printf(child\n);sem_post(s); // signal : child donereturn NULL;
}int main() {sem_init(s, 0, 0); // 初始化值为0printf(parent:begin\n);pthread_t p;Pthread_create(p, NULL, child, NULL);sem_wait(s); // wait : wait for child doneprintf(parent:end\n);return 0;
}生产者消费者问题
综合使用条件变量和
nt buffer[MAXN];
int fill_ptr 0;
int use_ptr 0;
int count 0;void put(int value) {buffer[fill_ptr] value;fill_ptr (fill_ptr 1) % MAXN;count;
}int get() {int tmp buffer[use_ptr];use_ptr (use_ptr 1) % MAXN;count--;return tmp;
}sem_t empty;
sem_t full;
sem_t mutex;void* producer(void* arg) {for (int i 0; i loops; i) {sem_wait(empty); // 等到空的时候进行生产sem_wait(mutex); // 为了保证生产者互斥插入需要加锁put(i);sem_post(mutex); // 解锁sem_post(full); // 生产好了提醒消费者进行消费}
}void* consumer(void* arg) {for (int i 0; i loops; i) {sem_wait(full); // 等到生产好了进行消费sem_wait(mutex); // 为了保证消费者互斥插入需要加锁put(i);sem_post(mutex); // 解锁sem_post(empty); // 消费好了,提醒生产者进行生产}
}
注意这里和https://www.wolai.com/dNjJbFaR3fYqd6CXP27HAe中对于条件变量和锁的使用顺序不同
举例
假设有两个线程一个生产者和一个消费者。消费者首先运行获得锁然后对full信号量执行sem_wait() 。因为还没有数据所以消费者阻塞让出CPU。但是重要的是此时消费者仍然持有锁。然后生产者运行。假如生产者能够运行它就能生产数据并唤醒消费者线程。遗憾的是它首先对二值互斥信号量调用sem_wait()。锁已经被持有因此生产者也被卡住。
在这里出现了死锁因为sem_wait在让出CPU时并没有释放锁而在https://www.wolai.com/dNjJbFaR3fYqd6CXP27HAe中我们使用的Pthread_cond_wait(empty, mutex);会对锁进行释放从而避免死锁。这两个API的区别注意一下。
所以我们把获取和释放互斥量的操作调整为紧挨着临界区把full、empty的唤醒和等待操作调整到锁外面。结果得到了简单而有效的有界缓冲区。
读写锁
访问同一个数据结构可能需要不同类型的锁。读写锁就是一种。
下面是不公平的读写锁写锁只能在最后读者释放之后才能被写者获取。
rwlock_acquire_writelock() 获得写锁
rwlock_release_writelock() 释放写锁typedef struct _rwlock_t {sem_t lock; // 二值信号量作为basic locksem_t writelock; // 用于一个写者 or 多个读者int readers; // 在临界区读取的读者数
} rwlock_t;void rwlock_init(rwlock_t* rw) {rw-readers 0;sem_init(rw-lock, 0, 1);sem_init(rw-writelock, 0, 1);
} void rwlock_acquire_readlock(rwlock_t* rw) {sem_wait(rw-lock);rw-readers;if (rw-readers 1) {sem_wait(rw-writelock); // 第一个读者需要持有写锁}sem_post(rw-lock);
}void rwlock_release_readlock(rwlock_t* rw) {sem_wait(rw-lock);rw-readers--;if (rw-readers 0) {sem_post(rw-writelock); // 最后一个读者释放写锁}sem_post(rw-lock);
}void rwlock_acquire_writelock(rwlock_t* rw) {sem_wait(rw-writelock);
}void rwlock_release_writelock(rwlock_t* rw) {sem_post(rw-writelock);
}
一旦一个读者获得了读锁其他的读者也可以获取这个读锁。
但是想要获取写锁的线程就必须等到所有的读者都结束。
最后一个退出的写者在“writelock”信号量上调用sem_post()从而让等待的写者能够获取该锁。
读者很容易饿死写者我们应该思考有写者等待时如何能够避免更多的读者进入并持有锁。
这里TODO一下以后看到有好的代码写过来。
哲学家就餐问题
假定有5位“哲学家”围着一个圆桌。每两位哲学家之间有一把餐叉一共5把。哲学家有时要思考一会不需要餐叉有时又要就餐。而一位哲学家只有同时拿到了左手边和右手边的两把餐叉才能吃到东西。
每个哲学家线程的自我循环
while (1) {think();getforks();eat();putforks();
}关键的挑战就是如何实现getforks()和putforks()函数保证没有死锁没有哲学家饿死并且并发度更高。
辅助函数
int left(int p) { // 使用左边的叉子return p;
}
int right(int p) { // 使用右边的叉子return (p 1) % 5;
}每个叉子作为一个信号量
sem_t forks[5];
void getforks() {sem_wait(forks[left(p)]);sem_wait(forks[right(p)]);
}依次获取每把餐叉的锁——先是左手边的然后是右手边的。结束就餐时释放掉锁。假设每个哲学家都拿到了左手边的餐叉他们每个都会阻塞住并且一直等待另一个餐叉。
解决上述问题最简单的方法就是修改某个或者某些哲学家的取餐叉顺序。
假定哲学家4编写最大的一个取餐叉的顺序不同。
void getforks() {if (p 4) {sem_wait(forks[right(p)]);sem_wait(forks[left(p)]);} else {sem_wait(forks[left(p)]);sem_wait(forks[right(p)]);}
}因为最后一个哲学家会尝试先拿右手边的餐叉然后拿左手边所以不会出现每个哲学家都拿着一个餐叉卡住等待另一个的情况等待循环被打破了。
使用锁条件变量实现信号量
信号量可以作为锁和条件变量来使用。而我们用底层的同步原语锁和条件变量也可以来实现自己的信号量名字叫作Zemaphore。
typedef struct _Zem_t {int value;pthread_cond_t cond;pthread_mutex_t lock;
} Zem_t;// only one thread can call it
void Zem_init(Zem_t* s, int value) {s-value value;
}void Zem_wait(Zem_t* s) {Pthread_mutex_lock(s-lock);while (s-value 0) { // 当小于0 阻塞 并在睡眠前将锁释放这样下面的unlock就不会出错Pthread_cond_wait(s-cond, s-lock);}s-value--;Pthread_mutex_unlock(s-lock);
}void Zem_post(Zem_t* s) {Pthread_mutex_lock(s-lock);s-value;Pthread_cond_signal(s-cond);Pthread_mutex_unlock(s-lock);
}
扩展
关于信号量的之前也有部分笔记
https://hanhandi.blog.csdn.net/article/details/123576424
4、并发问题总结
1、非死锁问题
违反原子性加锁解决错误顺序通过强制顺序方法利用条件变量
这里针对第二点做一下阐述
错误代码
thread1:
void init() {...mThread createThread(mMain, ...);...
}thread2:
void mMain(...) {...mState mThread-State;...
} 如果线程1并没有首先执行线程2就可能因为引用空指针奔溃.
修改后的代码 pthread_mutex_t mtLock PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t mtCond PTHREAD_COND_INITIALIZER;
int mtInit 0;thread1:
void init() {...mThread createThread(mMain, ...);// signal threadpthread_mutex_lock(mtLock);mtInit 1;pthread_cond_signal(mtCond);pthread_mutex_unlock(mtLock);...
}thread2:
void mMain(...) {...// wait for thread to be initializedpthread_mutex_lock(mtLock);while (mtInit 0) {pthread_cond_wait(mtLock);}pthread_mutex_unlock(mtLock);mState mThread-State;...
} 我们增加了一个锁mtLock、一个条件变量mtCond以及状态的变量mtInit
请注意我们可以用mThread本身作为状态变量但为了简洁我们没有这样做。当线程之间的顺序很重要时条件变量或信号量能够解决问题。
2、死锁问题 当线程1占有锁L1上下文切换到线程2。线程2锁住L2试图锁住L1。这时才产生了死锁两个线程互相等待。
死锁产生的条件
● 互斥线程对于需要的资源进行互斥的访问例如一个线程抢到锁。
● 持有并等待线程持有了资源例如已将持有的锁同时又在等待其他资源例如需要获得的锁。
● 非抢占线程获得的资源例如锁不能被抢占。
● 循环等待线程之间存在一个环路环路上每个线程都额外持有一个资源而这个资源又是下一个线程要申请的。
解决方案
1、预防循环等待
通过预防获取锁的时候提供全序。若有两个锁每次都先申请L1再申请L2可以避免死锁。
也可以采用偏序来安排锁的获取人为定义某些不同的加锁顺序。
也可以根据锁的地址作为获取锁的顺序。按照地址从高到低或者从低到高的顺序加锁。
void do_something(mutex_t* m1, mutex_t* m2) {if (m1 m2) {pthread_mutex_lock(m1); pthread_mutex_lock(m2); } else {pthread_mutex_lock(m2); pthread_mutex_lock(m1); }
}
// assume m1 ! m22、使用非抢占锁
unlock之前默认锁被占有释放后会引起多个抢锁操作可以使用trylock来尝试获得锁或者返回-1表示锁已经被占用。这样就不会出现死锁等待了。
thread1
...
label:
lock(L1);
if (trylock(L2) -1) { // 如果创建锁失败 unlock(L1);goto label;
}
...thread2
...
label:
lock(L2);
if (trylock(L1) -1) { // 如果创建锁失败 unlock(L2);goto label;
}
...
这样也会带来活锁问题两个线程可能一直重复这段代码但是这个不是死锁。
解决方法可以在循环结束的时候先随机等待一个时间然后再重复整个动作这样可以降低线程之间的重复互相干扰。
3、完全避免互斥使用CAS
通过硬件指令构造出不需要锁的数据结构。
下面是一个常用的指令CAScompare and swap比较并交换是硬件提供的原子指令。
int CompareAndSwap(int* address, int expected, int new) {if (*address expected) { // 如果旧值和理想值一样*address new; // 更新旧值return 1; // success} else {return 0; // failure}
}如果我们想原子地增加一个数可以这样写
void AtomicIncrement(int* value, int amount) {do {int old *value;} while (CompareAndSwap(value, old, old amount) 0); // 如果更新失败继续尝试
}
// 使用锁
void LockIncrement(mutex_t* lock, int* value, int amount) {Pthread_mutex_lock(lock);int new *value amount;*value new;Pthread_mutex_unlock(lock);
}
这样避免了加锁、解锁而是使用CAS指令反复尝试将值更新到最新的值。这样也就避免了死锁。
关于CAS的详细理解见文章
https://www.wolai.com/9aopYQvrCHJ8i8MWJhjMCa
在之前的笔记中有谈到并发链表当时并发头插法的核心代码如下
void insert(list_t *list, int key)
{node_t* p malloc(sizeof(node_t)); if (p NULL) {perror(malloc);return -1;} p-key key;// 假定malloc()是线程安全的每个线程都可以调用它不需要担心竞争条件和其他并发缺陷。// 只有在更新共享列表时需要持有锁Pthread_mutex_lock(list-lock);p-next list-head; // 头插法list-head p;Pthread_mutex_unlock(list-lock);return 0;
}
这里我们试着使用CAS
void insert(list_t *list, int key)
{node_t* p malloc(sizeof(node_t)); if (p NULL) {perror(malloc);return -1;} p-key key;do {p-next list-head; // old address} while (CompareAndSwap(list-head, p-next, p) 0); // address old newreturn 0;
}4、调度避免死锁
2核4线程有两把锁L1 L2不同线程对于锁的需求程度。yes表示要用到no表示用不到 只要T1和T2不同时运行就不会产生死锁。
5、检测死锁并恢复
很多数据库系统使用了死锁检测和恢复技术。死锁检测器会定期运行通过构建资源图来检查循环。这里可以参考一下之前的笔记的最后一部分关于死锁检测的部分本质上就是有向图判断环。
5、事件并发
在之前的两个文章
https://hanhandi.blog.csdn.net/article/details/120998962
https://hanhandi.blog.csdn.net/article/details/121052560
我们使用了基于事件的服务器他们都是基于事件循环的简单结构
while (1) {events getEvents();for (e in events) {processEvent(e);}
}这也属于并发的一种基本方法就是基于事件的并发。我们等待某事即“事件”发生当它发生时检查事件类型然后做少量的相应工作可能是I/O请求或者调度其他事件准备后续处理。
主循环等待事件发生然后一次处理这些发生的事件。
具体处理事件交由事件处理程序(event handler)处理。
当event handler处理一个事件时它是CPU中执行的唯一进程。
这样就可以进行显式调度了。
不过这也带来了下面的思考
基于事件的服务器如何决定哪个事件发生尤其是对于网络和磁盘的IO。
也就是说服务器如何确定是否有消息到达这里主要牵扯到select 和epoll两个机制。
基于事件的服务器不能被另一个线程中断因为它确实是单线程的。因此线程化程序中常见的并发性错误并没有出现在基本的基于事件的方法中。这就要求我们的服务器不能阻塞调用。
时间并发与线程并发的区别
使用基于线程的服务器时在发出I/O请求的线程挂起等待I/O完成时其他线程可以运行从而使服务器能够取得进展。
使用基于事件的方法时没有其他线程可以运行只是主事件循环。这意味着如果一个事件处理程序发出一个阻塞的调用整个服务器就会这样做阻塞直到调用完成。当事件循环阻塞时系统处于闲置状态。
对于基于事件并发、基于线程并发我有些疑惑记录于此
https://www.wolai.com/3irCkAHj3DzWFdLs6j8kCo
关于线程并发与事件并发还可以参考一下
http://www.imxmx.com/Item/1/117655.html
在事件并发服务器中为了克服阻塞调用限制我们使用非阻塞IO
非阻塞IO我们又称异步IO最起码需要有下面几个接口支持
1、有接口使得应用程序能够发出IO请求并在IO完成前立即将控制权返回给调用者。
2、有接口使得应用程序能够确定IO是否完成
它基于AIO控制块实现。
struct aiocb {int aio_fildes; // 文件描述符off_t aio_offset; //文件内偏移量volatile void *aio_buf; // 复制读取结果的目标内存位置size_t aio_nbytes; // 请求的长度
};要向文件发出异步的read应用程序应该先填充aiocb之后发出异步调用读取文件
int aio_read(struct aiocb *aiocbp) {...};该函数尝试发起IO成功的话立即返回事件服务器也可以继续其工作。
不成功也会立即返回事件服务器得做好相关处理。
那么我们改如何获知IO何时完成呢一般有两种思路1、周期轮询2、系统中断告知
对于第一种需要用到api
int aio_error(const struct aiocb *aiocbp);若已完成接口返回0
否则返回EINPROGRESS。
对于第二种使用UNIX信号异步IO完成时通知消除了重复询问系统的需要。
信号和嵌入式中的中断差不多将信号传递给应用程序。这样做会让应用程序停止当前的任何工作开始运行信号处理程序signal handler。完成后该进程就恢复其先前的行为。
#include stdio.h
#include signal.hvoid handle(int arg) {printf(xxxx\n);
}int main(int argc, char *argv[]) {signal(SIGHUP, handle);while (1) ; // do nothing except catch some signal
}在没有异步I/O的系统中纯基于事件的方法无法实现。
状态管理问题
当事件处理程序发出异步I/O时它必须打包一些程序状态以便下一个事件处理程序在I/O最终完成时使用。这个额外的工作在基于线程的程序中是不需要的因为程序需要的状态在线程栈中。
在线程服务器中当read()最终返回时代码立即知道要写入哪个套接字因为该信息位于线程堆栈中在变量sd中
int rc read(fd, buffer, size);
rc write(sd, buffer, size);在基于事件的系统中为了执行相同的任务我们需要自己存储并管理。
我们首先使用上面描述的AIO调用异步地发出读取。
然后将套接字描述符sd记录在由文件描述符fd作为索引的散列表中当磁盘IO完成时事件处理程序将用fd来查找sd将sd返回给对应的调用者。
缺点
1、当系统从单个CPU转向多个CPU时为了利用多个CPU事件服务器必须并行运行多个事件处理程序。就会出现常见的同步问题例如临界区并且必须采用通常的解决方案例如锁定
2、如果事件处理程序发生页错误它将被阻塞并且因此服务器在页错误完成之前不会有进展。尽管服务器的结构可以避免显式阻塞但由于页错误导致的这种隐式阻塞很难避免因此在频繁发生时可能会导致较大的性能问题。
3、随着时间的推移基于事件的代码可能很难管理。如果函数从非阻塞变为阻塞调用该例程的事件处理程序也必须更改以适应其新性质。