生产者消费者模型
- 生产者生成数据之后,放入缓冲区
- 消费者从缓冲区取出数据
- 任何时刻,只能有一个生产者或消费者可以访问缓冲区
分析得出:
- 任何时刻只能有一个线程操作缓冲区,说明需要互斥
- 缓冲区为空时,消费者需要等待生产者生成数据;缓冲区满时,生产者需要等到消费者取出数据,说明生产者和消费者需要同步
条件变量
如果说互斥锁是用于同步线程对共享数据的访问的话,那么条件变量则是用于在线程之间同步共享数据的值。条件变量提供了一种线程间的通信机制:当某个共享数据达到某个值的时候,唤醒等待这个共享数据的线程
条件变量的类型:pthread_cond_t
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
// 创建条件变量
int pthread_cond_destroy(pthread_cond_t *cond);
// 销毁条件变量
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
// 线程阻塞
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
// 等待多长时间,线程阻塞
int pthread_cond_signal(pthread_cond_t *cond);
// 唤醒一个等待目标条件变量的线程,至于哪个线程将被唤醒,则取决于线程的优先级和调度策略
int pthread_cond_broadcast(pthread_cond_t *cond);
// 以广播的方式唤醒所有等待目标条件变量的线程
pthread_cond_wait
用于等待目标条件变量。mutex参数是用于保护条件变量的互斥锁,以确保pthread_cond_wait
操作的原子性。在调用pthread_cond_wait
前,必须确保互斥锁mutex
已经加锁,否则将导致不可预期的后果。pthread_cond_wait
函数执行时,首先把调用线程放入条件变量的等待队列中,然后将互斥锁mutex解锁。可见,从pthread_cond_wait
开始执行到其调用线程被放入条件变量的等待队列之间的这段时间内,pthread_cond_signal
和pthread_cond_broadcast
等函数不会修改条件变量。换言之,pthread_cond_wait
函数不会错过目标条件变量的任何变化。当pthread_cond_wait
函数成功返回时,互斥锁将再次被锁上
条件变量一定要配合互斥锁来使用,目的就是弥补互斥锁功能的单一性。在条件不满足的时候阻塞进程/线程,在临界区资源满足访问要求时,激活一个或者全部进程/线程去访问资源。相较于互斥锁,不需要在临界区内使用if
代码去判断,在满足访问要求后,直接使用signal
函数激活一个进程/线程即可
使用条件变量解决生产者消费者问题
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
struct Node {
int num;
struct Node *next;
};
struct Node *head = NULL;
// 创建互斥量
pthread_mutex_t mutex;
// 创建条件变量
pthread_cond_t cond;
void* producer(void *arg) {
while (1) {
pthread_mutex_lock(&mutex); // 加锁
// 创建新的节点,添加到链表中
struct Node *newNode = (struct Node*)malloc(sizeof(struct Node));
newNode->next = head;
head = newNode;
newNode->num = rand() % 100;
printf("add Node, num: %d, tid: %ld\n", newNode->num, pthread_self());
// 只要生产一个,就通知消费者
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex); // 解锁
usleep(100);
}
return NULL;
}
void* customer(void *arg) {
while (1) {
pthread_mutex_lock(&mutex); // 加锁
struct Node *tmp = head;
if (head != NULL) {
// 有数据
head = head->next;
printf("delete node, num: %d, tid: %ld\n", tmp->num, pthread_self());
free(tmp);
pthread_mutex_unlock(&mutex); // 解锁
}
else {
// 没有数据,等待
// 阻塞时,会对互斥锁解锁,被唤醒后,会重新加锁
pthread_cond_wait(&cond, &mutex);
pthread_mutex_unlock(&mutex); // 解锁
}
usleep(100);
}
return NULL;
}
int main() {
pthread_mutex_init(&mutex, NULL); // 初始化互斥量
pthread_cond_init(&cond, NULL); // 初始化条件变量
// 创建5个生产者,5个消费者
pthread_t ptids[5], ctids[5];
for (int i = 0; i < 5; i++) {
pthread_create(&ptids[i], NULL, producer, NULL);
pthread_create(&ctids[i], NULL, customer, NULL);
}
// 线程分离
for (int i = 0; i < 5; i++) {
pthread_detach(ptids[i]);
pthread_detach(ctids[i]);
}
while (1) {
sleep(10);
}
pthread_mutex_destroy(&mutex); // 销毁互斥量
pthread_cond_destroy(&cond); // 销毁条件变量
pthread_exit(NULL);
return 0;
}
add Node, num: 9, tid: 140067846080256
add Node, num: 54, tid: 140067896436480
add Node, num: 90, tid: 140067879651072
delete node, num: 90, tid: 140067820902144
delete node, num: 54, tid: 140067837687552
add Node, num: 44, tid: 140067862865664
delete node, num: 44, tid: 140067871258368
信号量
- 信号量其实是一个整型的计数器,主要用于实现进程间的互斥与同步
信号量表示资源的数量,操作信号量有2种原子操作:
- P操作:把信号量-1,如果减完后信号量<0,表示没有资源可用,进程阻塞,否则正常执行
- V操作:把信号量+1,如果加完后信号量<=0,表示有进程阻塞,则唤醒阻塞的进程
P操作用在进入临界区之前,V操作用在离开临界区之后,必须成对出现
- 信号量初始化为1,代表互斥信号量,可以保证任何时刻只有一个进程访问临界区资源
- 信号量初始化为0,代表同步信号量,保证进程A在进程B之前执行
这里使用的是POSIX的实现版本
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
// 初始化信号量
- sem: 信号量变量的地址
- pshared: 0表示用在线程,非0表示用在进程
- value: 信号量中的值
int sem_destroy(sem_t *sem);
// 释放资源
int sem_wait(sem_t *sem);
// P操作,调用一次对信号量值-1,如果为0,就阻塞
int sem_trywait(sem_t *sem);
// sem_wait的非阻塞版本,信号量非0时,执行减1操作;信号量为0时,返回-1并设置errno为EAGAIN
int sem_post(sem_t *sem);
// V操作,调用一次对信号量+1
信号量其实就是一个计数器,也是一个整数。每一次调用wait
操作将会使semaphore
值减一,而如果semaphore
值已经为0,则wait
操作将会阻塞。每一次调用post
操作将会使semaphore
值加一
用信号量解决生产者消费者问题
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <semaphore.h>
struct Node {
int num;
struct Node *next;
};
struct Node *head = NULL;
// 创建互斥量
pthread_mutex_t mutex;
// 创建2个信号量
sem_t psem, csem;
void* producer(void *arg) {
while (1) {
sem_wait(&psem); // P操作,生产者信号量-1,等于0就阻塞
pthread_mutex_lock(&mutex); // 加锁
// 创建新的节点,添加到链表中
struct Node *newNode = (struct Node*)malloc(sizeof(struct Node));
newNode->next = head;
head = newNode;
newNode->num = rand() % 100;
printf("add Node, num: %d, tid: %ld\n", newNode->num, pthread_self());
pthread_mutex_unlock(&mutex); // 解锁
sem_post(&csem); // V操作,消费者信号量+1,通知消费者
}
return NULL;
}
void* customer(void *arg) {
while (1) {
sem_wait(&csem); // 消费者信号量-1,等于0就阻塞
pthread_mutex_lock(&mutex); // 加锁
struct Node *tmp = head;
head = head->next;
printf("del node, num: %d, tid: %ld\n", tmp->num, pthread_self());
free(tmp);
pthread_mutex_unlock(&mutex); // 解锁
sem_post(&psem); // 生产者信号量+1,通知生产者
}
return NULL;
}
int main() {
pthread_mutex_init(&mutex, NULL);
sem_init(&psem, 0, 8);
sem_init(&csem, 0, 0);
// 创建5个生产者,5个消费者
pthread_t ptids[5], ctids[5];
for (int i = 0; i < 5; i++) {
pthread_create(&ptids[i], NULL, producer, NULL);
pthread_create(&ctids[i], NULL, customer, NULL);
}
// 线程分离
for (int i = 0; i < 5; i++) {
pthread_detach(ptids[i]);
pthread_detach(ctids[i]);
}
while (1) {
sleep(10);
}
pthread_mutex_destroy(&mutex);
pthread_exit(NULL);
return 0;
}
add Node, num: 79, tid: 139625284757248
add Node, num: 82, tid: 139625293149952
del node, num: 82, tid: 139625418946304
del node, num: 79, tid: 139625376982784