1、什么是同步?
- 同,指协同、协助、互相配合。主旨在协同步调,按预定的先后次序运行。
- 线程同步,指一个线程发出某一功能调用时,在没有得到结果之前,该调用不返回。同时其它线程为保证数据一致性,不能调用该功能。
- 同步的目的,是为了避免数据混乱,解决与时间有关的错误。实际上,不仅线程间需要同步,进程间、信号间等都需要同步机制。
- 总结:所有“多个控制流,共同操作一个共享资源”的情况,都需要同步。
2、数据混乱的原因
- 资源共享(独享资源不会)
- 调度随机(意味着数据访问会出现竞争)
- 线程间缺乏必要的同步机制。
以上三点中,前两点不能改变,欲提高效率,传递数据,资源必须共享。只要共享资源,就一定会出现竞争。只要存在竞争关系,数据就很容易出现混乱。
所以只能从第三点着手解决。使多个线程在访问共享资源的时候,出现互斥。
3、互斥量(互斥锁)
- Linux中提供一把互斥锁mutex,每个线程在对资源操作前都尝试先加锁,成功加锁才操作,操作结束解锁。
- 资源还是共享的,线程间也还是竞争的,但通过“锁”将资源访问变成互斥操作,而后的时间有关错误也不会再产生了。
- 同一时间,只能有一个线程持有该锁。
- 主要函数:
// restrict关键字:用于限制指针,告诉编译器,所有修改该指针指向的内存中内容的操作,只能通过本指针完成。 int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr); pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; int pthread_mutex_destroy(pthread_mutex_t *mutex); int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_trylock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex);
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>
// 1、定义一个全局变量互斥锁
pthread_mutex_t mutex;
// 线程回调函数
void *callback(void *agr){
while(1){
// 加锁、此时需要使用mutex的线程在该锁被释放前不能使用
pthread_mutex_lock(&mutex);
printf("123");
sleep(1);
printf("456\n”);
// 解锁、释放出资源
pthread_mutex_unlock(&mutex);
sleep(1);
}
return NULL;
}
int main(){
// 2、初始化互斥锁
pthread_mutex_init(&mutex, NULL);
pthread_t tid;
pthread_create(&tid, NULL, callback, NULL);
// 3、使用锁
while(1){
pthread_mutex_lock(&mutex);
printf("abc");
sleep(2);
printf("def\n");
pthread_mutex_unlock(&mutex);
sleep(1);
}
// 4、释放/销毁互斥锁
pthread_mutex_destroy(&mutex);
return 0;
}
执行结果:
// 使用互斥锁前
parallels@ubuntu:~/Linux/pthread_syn$ ./pthread_mutex.out
abc123456
123456
123def
abc456
123def
abc456
123456
^C
parallels@ubuntu:~/Linux/pthread_syn$
// 使用互斥锁后
parallels@ubuntu:~/Linux/pthread_syn$ ./pthread_mutex.out
abcdef
123456
abcdef
123456
abcdef
123456
abcdef
^C
parallels@ubuntu:~/Linux/pthread_syn$
PS:在访问共享资源前加锁,访问结束后立即解锁。锁的“粒度”应越小越好。
4、死锁
- 同一线程重复加锁,会造成自己等自己,死锁;
- 两个线束相互等待对方的锁,死锁
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
// 1、定义两个互斥锁
pthread_mutex_t mutex1, mutex2;
// 子线程回调函数
void *callback(void *arg){
// 对mutex1加锁
pthread_mutex_lock(&mutex1);
printf("sub pthread: I get mutex1....\n");
sleep(1);
// 对mutex2加锁
pthread_mutex_lock(&mutex2);
printf("sub pthread: I get mutex2...\n");
pthread_mutex_unlock(&mutex2);
pthread_mutex_unlock(&mutex1);
return NULL;
}
int main(){
// 2、初始化两把互斥锁
pthread_mutex_init(&mutex1, NULL);
pthread_mutex_init(&mutex2, NULL);
// 3、创建子线程
pthread_t tid;
pthread_create(&tid, NULL, callback, NULL);
// 4、主线程与子线程相互等待对方的锁
pthread_mutex_lock(&mutex2);
printf("main pthread: I get mutex2...\n");
sleep(1);
pthread_mutex_lock(&mutex1);
printf("main pthread: I get mutex1...\n");
pthread_mutex_unlock(&mutex2);
pthread_mutex_unlock(&mutex1);
// 销毁释放
pthread_mutex_destroy(&mutex1);
pthread_mutex_destroy(&mutex2);
}
执行结果:
parallels@ubuntu:~/Linux/pthread_syn$ ./pthread_mutex_deadlock.out
main pthread: I get mutex2...
sub pthread: I get mutex1....
^C
parallels@ubuntu:~/Linux/pthread_syn$
5、读写锁
- 写加锁:不可读写,阻塞读写线程
- 读加锁:可读不可写,阻塞写线程
- 写优先级高,写独占、读共享。
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr); int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock); int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock); int pthread_rwlock_unlock(pthread_rwlock_t *rwlock); int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
示例代码:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
// 1、定义读写锁
pthread_rwlock_t rwlock;
// “写”相关线程主函数
void *callback_read(void *arg){
while(1){
pthread_rwlock_rdlock(&rwlock);
printf("I am a pthread -- read %d --, I get the rwlock now...\n", (int)(long)arg);
sleep(1);
pthread_rwlock_unlock(&rwlock);
usleep(300);
}
return NULL;
}
// “读”相当线程主函数
void *callback_write(void *arg){
while(1){
pthread_rwlock_wrlock(&rwlock);
printf("----------------------------------------------------\n");
printf("I am a pthread -- writ %d --, I get the rwlock now...\n", (int)(long)arg);
printf("----------------------------------------------------\n");
sleep(1);
pthread_rwlock_unlock(&rwlock);
usleep(300);
}
return NULL;
}
int main(){
// 2、初始化读写锁
pthread_rwlock_init(&rwlock, NULL);
// 3、创建10个线程,6个读4个写
pthread_t tid[10];
int i, ret;
for (i = 0; i < 6; i++){
ret = pthread_create(&tid[i], NULL, callback_read, (void*)(long)i);
if (ret != 0){
fprintf(stderr, "pthread_create error: %s\n", strerror(ret));
continue;
}
}
for (i = 6; i < 10; i++){
ret = pthread_create(&tid[i], NULL, callback_write, (void *)(long)i);
if (ret != 0){
fprintf(stderr, "pthread_create error: %s\n", strerror(ret));
continue;
}
}
// 回收子线程
for (i = 0; i < 10; i++){
ret = pthread_join(tid[i], NULL);
if (ret != 0){
fprintf(stderr, "pthread_join error:%s\n", strerror(ret));
continue;
}
}
// 4、回收释放读写锁
pthread_rwlock_destroy(&rwlock);
pthread_exit(NULL);
}
执行结果:
parallels@ubuntu:~/Linux/pthread_syn$ ./pthread_rwlock.out
I am a pthread -- read 0 --, I get the rwlock now...
I am a pthread -- read 1 --, I get the rwlock now...
I am a pthread -- read 2 --, I get the rwlock now...
I am a pthread -- read 3 --, I get the rwlock now...
I am a pthread -- read 4 --, I get the rwlock now...
I am a pthread -- read 5 --, I get the rwlock now...
----------------------------------------------------
I am a pthread -- writ 6 --, I get the rwlock now...
----------------------------------------------------
I am a pthread -- read 1 --, I get the rwlock now...
I am a pthread -- read 2 --, I get the rwlock now...
I am a pthread -- read 0 --, I get the rwlock now...
I am a pthread -- read 3 --, I get the rwlock now...
I am a pthread -- read 5 --, I get the rwlock now...
I am a pthread -- read 4 --, I get the rwlock now...
----------------------------------------------------
I am a pthread -- writ 7 --, I get the rwlock now...
----------------------------------------------------
I am a pthread -- read 1 --, I get the rwlock now...
I am a pthread -- read 2 --, I get the rwlock now...
I am a pthread -- read 0 --, I get the rwlock now...
I am a pthread -- read 3 --, I get the rwlock now...
I am a pthread -- read 5 --, I get the rwlock now...
I am a pthread -- read 4 --, I get the rwlock now...
----------------------------------------------------
I am a pthread -- writ 9 --, I get the rwlock now...
----------------------------------------------------
I am a pthread -- read 1 --, I get the rwlock now...
I am a pthread -- read 2 --, I get the rwlock now...
I am a pthread -- read 4 --, I get the rwlock now...
I am a pthread -- read 0 --, I get the rwlock now...
I am a pthread -- read 3 --, I get the rwlock now...
I am a pthread -- read 5 --, I get the rwlock now...
----------------------------------------------------
I am a pthread -- writ 8 --, I get the rwlock now...
----------------------------------------------------
^C
parallels@ubuntu:~/Linux/pthread_syn$
6、条件变量
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_destroy(pthread_cond_t *cond);
- int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
- 阻塞等待条件变量cond
- 释放已掌握的互斥锁,相当于pthread_mutex_unlock(&mutex)
- 唤醒时解决阻塞,重新申请互斥锁pthread_mutex_lock(&mutex)
- PS:前两步为原子操作
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
// 1、定义互斥锁、条件变量
pthread_mutex_t mutex;
pthread_cond_t cond;
// 子线程主函数
void *callback(void *arg){
pthread_mutex_lock(&mutex);
int i = 10;
while(i--){
printf("I am sub pthread runloop at %d\n", i);
// 中间设计
if (i == 7){
pthread_cond_wait(&cond, &mutex);
/* 使用pthread_cond_timedwait函数
struct timespec t;
t.tv_sec = time(NULL) + 3;
// 第三个参数const struct timespec *restrict abstime
// timespec{time_t tv_sec;time_t tv_usec}
pthread_cond_timedwait(&cond, &mutex, &t);
*/
}
//sleep(1);
}
pthread_mutex_unlock(&mutex);
return NULL;
}
int main(){
// 2、初始化互斥锁、条件变量
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond, NULL);
// 3、开启子线程
pthread_t tid;
pthread_create(&tid, NULL, callback, NULL);
// 4、发送条件变量信号
sleep(3);
pthread_cond_signal(&cond);
// 5、注销条件变量、互斥锁
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);
// 回收子线程,退出当前线程
pthread_join(tid, NULL);
pthread_exit(NULL);
}
7、生产者消费者模型
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
// 1.1、定义并创建互斥锁、条件变量
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
// 1.2、定义数据模型与数据结构
// 测试用的数据模型
struct msg{
struct msg *next;
int num;
};
// 全局链表
struct msg *head;
// 打印输出“未消费”的产品
void print_msg(){
printf(" | ");
struct msg *msg = head;
while(msg){
printf("%d ", msg->num);
msg = msg->next;
}
putchar('\n');
}
// 3.1、生产者线程主函数
void *producer(void *arg){
while(1){
pthread_mutex_lock(&mutex);
// 产生一条新数据
struct msg *msg = malloc(sizeof(struct msg));
msg->num = rand() % 99;
// 在队头插入数组
msg->next = head;
head = msg;
// 打印当前结果
printf("producer msg with num = %d", msg->num);
print_msg();
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&cond);
sleep(rand() % 3);
}
}
// 3.2、消费者线程主函数
void *consumer(void *arg){
while (1){
pthread_mutex_lock(&mutex);
if (head == NULL){
pthread_cond_wait(&cond, &mutex);
}
// 从队头取数据
struct msg *msg = head;
head = msg->next;
// 打印当前结果
printf("consumer msg with num = %d", msg->num);
print_msg();
pthread_mutex_unlock(&mutex);
free(msg);
sleep(rand() % 3);
}
}
int main(){
// 生成一个随机数
srand(time(NULL));
// 2、创建2个线程并开启
pthread_t pid, cid;
pthread_create(&pid, NULL, producer, NULL);
pthread_create(&cid, NULL, consumer, NULL);
// 4、回收线程并注销锁、条件变量
pthread_join(pid, NULL);
pthread_join(cid, NULL);
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);
pthread_exit(NULL);
}
执行结果:
parallels@ubuntu:~/Linux/pthread_syn$ ./pthread_cp.out
producer msg with num = 91 | 91
consumer msg with num = 91 |
producer msg with num = 67 | 67
consumer msg with num = 67 |
producer msg with num = 36 | 36
consumer msg with num = 36 |
producer msg with num = 54 | 54
consumer msg with num = 54 |
producer msg with num = 16 | 16
条件变量的优点:相对mutex而言,条件变量可以减少竞争,如果直接使用mutex,除了生产者,消费者之间也要竞争互斥量,事实上如果汇聚中没有数据,消费者之间的竞争是没有意义的,有了条件变量之后,只有生产者完成生产,消费者之间才会有竞争,提高了程序的效率。