Skip to main content

moregeek program

【c】高并发线程池设计_qq611a21400112c的博客-多极客编程

高并发线程池设计

并发基本概念

  • 所谓并发编程指的是在同一台计算机上"同时"处理多个任务。
  • 并发是在同一实体上的多个事件。

处理事件过程出现阻塞

  • 漫长的CPU密集型处理。
  • 读取文件,但文件尚未缓存,从硬盘中读取较为缓慢。
  • 不得不等待获取某个资源:
  • 硬件驱动
  • 互斥锁
  • 等待同步方式调用的数据库响应
  • 网络上的请求和响应

多线程的缺陷

  • 单个进程或线程同时只能处理一个任务,如果有很多请求需要同时处理怎么办?
  • 解决方案——运用多进程或多线程技术解决。
  • 缺陷:
  • 创建和销毁线程上花费的时间和消耗的系统资源,甚至可能要比花在处理实际用户请求的时间和资源要多的多。
  • 活动的线程需要消耗系统资源,如果启动太多线程,会导致系统由于过度消耗内存或"切换过度"而导致系统资源不足。
  • 线程切换时,该线程执行的相关信息会被保存在对应的上下文中,线程数越多,所用于切换的时间就越多。
  • 解决:——使用线程池技术。

线程池

  • 线程池
  • 由一个任务队列和一组处理任务队列的线程组成。一旦工作进程需要处理某个可能"阻塞"的操作,不用自己操作,将其作为一个任务放到线程池的队列,接着会被某个空闲线程提取处理。
  • 注意:
  • 线程中的线程都要从任务队列中拿任务(同一个任务只允许一个线程拿到),会修改任务队列的链表,进程往里面加入新的任务也会修改任务队列的链表,二者无法同时修改,所以任务队列为临界资源,所以这里要实现同步与互斥。

【C】高并发线程池设计_线程池


线程池的核心组件

  • ​任务​​——待处理的工作,通常由标识、上下文和处理函数组成。
  • ​任务队列​​——按顺序保存待处理的任务序列,等待线程中的线程组处理。
  • ​线程池​​——由多个已启动的一组线程组成。
  • ​条件变量​​——一种同步机制,允许线程挂起,知道共享数据上的某些条件得到满足。
  • ​互斥锁​​——保证在任意时刻,只能有一个线程访问该对象。

Nginx线程池解析

  • 注: 如下代码为本人看的某个视频中的资料,从Nginx中c抽下来的,貌似与最新的Nginx源码并不是很相同,因为经过删减,而且不是删减的最新版,但是大致意思我想应该是差不多的。:)
执行流程
  1. 创建线程池并初始化。
  • 初始化开辟空间并进行相关默认设置及属性。
  • 创建互斥锁、条件变量。
  • 初始化任务队列。
  • 创建线程池中的线程。并启动线程。这里面涉及到互斥锁与条件变量,等待任务并进行取出,详情请看代码中的注释。​​这里为核心​​。
  1. 分配任务内存
  • 任务结构体和其任务执行函数的参数内存一起分配。
  1. 指定任务的执行函数。
  2. 将任务放入线程池。
  3. 使用结束后销毁线程池。
  • 弄几个自杀任务放到任务队列中,等着线程们来取,然后依次自杀。
  • 之后销毁互斥锁、条件变量。
  • 最后free掉自己。

主要数据结构
任务结构体
  • ​thread_task_s​
struct thread_task_s {
thread_task_t *next;//下一个任务
uint_t id;//任务ID
void *ctx;//上下文,任务要带的参数
void (*handler)(void *data);//函数指针,具体执行的任务。
};
//起别名
typedef struct thread_task_s thread_task_t;

分配任务内存
  • ​thread_task_alloc​
thread_task_t *
thread_task_alloc(size_t size)
{
thread_task_t *task;

//任务内存+函数参数内存
task = calloc(1,sizeof(thread_task_t) + size);
if (task == NULL) {
return NULL;
}
//task为thread_task_t指针
//指针与常数相加
task->ctx = task + 1;//task+1,此时指向的是任务函数的参数所在内存。
return task;
}

任务队列结构体
  • ​thread_pool_queue_t​
typedef struct {
thread_task_t *first;//指向第一个元素
thread_task_t **last;//指向最后一个结点
} thread_pool_queue_t;//任务队列,单链表结构。
  • 补充:此单链表不同于我们在学数据结构时的那种定义,这里使用了二级指针,我感觉还是挺有意思的。相关的插入、取出操作在下面的相关线程池代码中有,这里我们提前拿出来先看一看。
  • 任务队列定义:如上所示,这里我们重复写一下,这样更方便顺序看。

typedef struct {
thread_task_t *first;//指向第一个元素
thread_task_t **last;//指向最后一个结点,通过last来链接。
} thread_pool_queue_t;//任务队列,单链表结构。

  • 插入操作:

thread_task_t *task;
//task为任务,thread_task_t类型,将先将next置空。
task->next = NULL;

//*last其实就是first,即first=task
*tp->queue.last = task;
//注意last=&task->next,即目前task保存的是first后一结点的地址(注意这里是二级指针,这里我指一级指针为结点,二级指针就是结点的地址,即,next指针的地址)。
tp->queue.last = &task->next;

/*我们接着模拟第二次插入
接着将上一个task的next = 本次要链接的task
接着拿到本次要链接的task的下一个结点的地址,next指针的地址。
*/

  • 取出操作:

task = tp->queue.first;//取出第一个
tp->queue.first = task->next;//首结点指针后移

if (tp->queue.first == NULL) {//任务队列空了,回到最初的状态,重新准备链接。
tp->queue.last = &tp->queue.first;
}


线程池结构体
  • ​thread_pool_s​
struct thread_pool_s {
pthread_mutex_t mtx; //互斥锁
thread_pool_queue_t queue; //任务队列
int_t waiting; //线程池中没有处理的任务还有多少
pthread_cond_t cond; //线程条件变量

char *name; //线程池的名字
uint_t threads; //线程池中线程的数量
int_t max_queue; //任务队列最多能够容纳多少个任务
};
//别名
typedef struct thread_pool_s thread_pool_t;

线程池的初始化
  • ​thread_pool_init()​
thread_pool_t* thread_pool_init()
{
int err;
pthread_t tid;
uint_t n;
pthread_attr_t attr;//线程属性设置结构体
thread_pool_t *tp=NULL;

tp = calloc(1,sizeof(thread_pool_t));

if(tp == NULL){
fprintf(stderr, "thread_pool_init: calloc failed!\n");
}

thread_pool_init_default(tp, NULL);//线程池部分属性默认设置
thread_pool_queue_init(&tp->queue);//线程池任务队列初始化
if (thread_mutex_create(&tp->mtx) != OK) {//创建互斥锁
free(tp);
return NULL;
}

if (thread_cond_create(&tp->cond) != OK) {//创建条件变量
(void) thread_mutex_destroy(&tp->mtx);
free(tp);
return NULL;
}
//线程属性初始化
err = pthread_attr_init(&attr);
if (err) {
fprintf(stderr, "pthread_attr_init() failed, reason: %s\n",strerror(errno));
free(tp);
return NULL;
}
//在线程创建时,将其属性设置为分离状态。
//主线程使用pthread_join无法等待该子线程。
//即无法再捕捉该子线程的状态
err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
if (err) {
fprintf(stderr, "pthread_attr_setdetachstate() failed, reason: %s\n",strerror(errno));
free(tp);
return NULL;
}
for (n = 0; n < tp->threads; n++) {
//线程的创建
err = pthread_create(&tid, &attr, thread_pool_cycle, tp);
if (err) {
fprintf(stderr, "pthread_create() failed, reason: %s\n",strerror(errno));
free(tp);
return NULL;
}
}
(void) pthread_attr_destroy(&attr);

return tp;
}

线程池任务队列初始化
  • ​thread_pool_queue_init​
#define thread_pool_queue_init(q)                                         \
(q)->first = NULL; \
(q)->last = &(q)->first

线程池中线程的启动
  • ​thread_pool_cycle​
static void *
thread_pool_cycle(void *data)
{
thread_pool_t *tp = data;//所在线程池,在创建线程的时候传递过来
int err;
thread_task_t *task;
if(debug)fprintf(stderr,"thread in pool \"%s\" started\n", tp->name);
for ( ;; ) {
//上锁
if (thread_mutex_lock(&tp->mtx) != OK) {
return NULL;
}
tp->waiting--;//等待的线程--
while (tp->queue.first == NULL) {//没有任务
//等待信号,先挂起,然后开锁。——等任务队列中有任务。
//被唤醒时,先上锁,然后正式被唤醒。
if (thread_cond_wait(&tp->cond, &tp->mtx)
!= OK)
{
(void) thread_mutex_unlock(&tp->mtx);//防御型编程,开锁。
return NULL;
}
}
//从任务队列中拿任务
task = tp->queue.first;
tp->queue.first = task->next;

//如果取出一个任务后,任务队列又空了,重新设置last指向。
if (tp->queue.first == NULL) {
tp->queue.last = &tp->queue.first;
}
//开锁
if (thread_mutex_unlock(&tp->mtx) != OK) {
return NULL;
}
if(debug) fprintf(stderr,"run task #%lu in thread pool \"%s\"\n",
task->id, tp->name);
task->handler(task->ctx);//当前执行任务函数,task->ctx为函数参数
if(debug) fprintf(stderr,"complete task #%lu in thread pool \"%s\"\n",task->id, tp->name);
task->next = NULL;
free(task);
}
}

往线程池中投递任务
  • ​thread_task_post​
int_t
thread_task_post(thread_pool_t *tp, thread_task_t *task)
{
if (thread_mutex_lock(&tp->mtx) != OK) {//上锁
return ERROR;
}

//任务队列尾邻接资源,进行互斥访问。
if (tp->waiting >= tp->max_queue) {//线程池等待任务队列是否达到极限
(void) thread_mutex_unlock(&tp->mtx);

fprintf(stderr,"thread pool \"%s\" queue overflow: %ld tasks waiting\n",
tp->name, tp->waiting);
return ERROR;
}

//task->event.active = 1;

task->id = thread_pool_task_id++;//任务id++
task->next = NULL;

//发送一个信号,唤醒一个线程,之后该线程就能从任务队列中获取任务,进行执行。
if (thread_cond_signal(&tp->cond) != OK) {
(void) thread_mutex_unlock(&tp->mtx);
return ERROR;
}

//一开始的时候last,默认指向的值first的地址。
//所以此时给*tp->queue.last赋值后,first = tast
//返回last为空,还是尾插法。
*tp->queue.last = task;
tp->queue.last = &task->next;

tp->waiting++;

(void) thread_mutex_unlock(&tp->mtx);

if(debug)fprintf(stderr,"task #%lu added to thread pool \"%s\"\n",
task->id, tp->name);

return OK;
}

销毁线程池
  • ​thread_pool_destroy​
void thread_pool_destroy(thread_pool_t *tp)
{
uint_t n;
thread_task_t task;
volatile uint_t lock;

memset(&task,'\0', sizeof(thread_task_t));

task.handler = thread_pool_exit_handler;//给一个自杀任务
task.ctx = (void *) &lock;//参数

for (n = 0; n < tp->threads; n++) {
lock = 1;

if (thread_task_post(tp, &task) != OK) {//投递任务
return;
}

while (lock) {//自杀任务中,会将lock置为0,终止循环。
sched_yield();//当前线程放弃CPU的优先权,让出CPU的执行权,让别的线程得到更多的执行机会。
}

}

(void) thread_cond_destroy(&tp->cond);//清理条件变量
(void) thread_mutex_destroy(&tp->mtx);//清理互斥锁
free(tp);//释放线程池
}

线程自杀任务
  • ​thread_pool_exit_handler​
static void
thread_pool_exit_handler(void *data)
{
uint_t *lock = data;

*lock = 0;

pthread_exit(0);
}

示例
#include "thread_pool.h"

struct test{

int arg1;

int arg2;

};



void task_handler1(void* data){

static int index = 0;

printf("Hello, this is 1th test.index=%d\r\n", index++);

}



void task_handler2(void* data){

static int index = 0;

printf("Hello, this is 2th test.index=%d\r\n", index++);

}



void task_handler3(void* data){

static int index = 0;

struct test *t = (struct test *) data;



printf("Hello, this is 3th test.index=%d\r\n", index++);

printf("arg1: %d, arg2: %d\n", t->arg1, t->arg2);

}

int

main(int argc, char **argv)

{

thread_pool_t* tp = NULL;//定义一个线程池指针

int i = 0;



tp = thread_pool_init(); //线程池初始化



//分配任务内存

thread_task_t * test1 = thread_task_alloc(0);

thread_task_t * test2 = thread_task_alloc(0);

thread_task_t * test3 = thread_task_alloc(sizeof(struct test));

//指定任务

test1->handler = task_handler1;

test2->handler = task_handler2;

test3->handler = task_handler3;



//通过结构体指定参数

((struct test*)test3->ctx)->arg1 = 666;

((struct test*)test3->ctx)->arg2 = 888;



//任务放入线程池

thread_task_post(tp, test1);

thread_task_post(tp, test2);

thread_task_post(tp, test3);



sleep(10);

thread_pool_destroy(tp);

}

【C】高并发线程池设计_线程池_02


补充
  • ​volatile​​关键字:
  • 有些变量是用 volatile 关键字声明的。当两个线程都要用到某一个变量且该变量的值会被改变时,应该用 volatile 声明,该关键字的作用是防止优化编译器把变量从内存装入 CPU 寄存器中。​​如果变量被装入寄存器,那么两个线程有可能一个使用内存中的变量,一个使用寄存器中的变量​​,这会造成程序的错误执行。volatile 的意思是让编译器每次操作该变量时一定要从内存中真正取出,而不是使用已经存在寄存器中的值。来源-​​菜鸟教程-C/C++ 中 volatile 关键字详解-多线程下的volatile​​。

©著作权归作者所有:来自51CTO博客作者半生瓜的blog的原创作品,请联系作者获取转载授权,否则将追究法律责任

[ c++ ] stl _ vector使用及其模拟实现_小白又菜的博客-多极客编程

1.Vector的介绍1.1 Vector的介绍​​vector官方文档介绍​​1.vector是表示可变大小数组的序列容器。2. 就像数组一样,vector也采用的连续存储空间来存储元素。也就是意味着可以采用下标对vector的元素进行访问,和数组一样高效。但是又不像数组,它的大小是可以动态改变的,而且它的大小会被容器自动处理。3. 本质讲,vector使用动态分配数组来存储它的元素。当新元素插

c++ io流_数据的旅行之路_一枚大果壳的博客-多极客编程

1. 前言 程序中的数据总是在流动着,既然是流动就会有方向。数据从程序的外部流到程序内部,称为输入;数据从程序内部流到外部称为输出。 C++提供有相应的API实现程序和外部数据之间的交互,统称这类API为 IO 流API。 流是一个形象概念,数据从一端传递到另一端时,类似于水一样在流动,只是流动的不是水,而是数据。 概括而言,流对象可连接 2 端,并在两者之间搭建起一个通道 ,让数据通过此通道

c语言指针,拿下!_ni_cue~的博客-多极客编程

@[toc] 一. 引言 要说学习C语言的过程中什么最头疼,那必然是指针!咱能放弃吗,咱不能!看完这篇,助你拿下指针~\首先来快速回顾一下指针的基础知识: 指针也是一个变量,只不过这个变量是用来存放地址的。要知道的是,地址唯一指向一块内存空间 在这个例子中,指针变量p中存放的是a的地址 示意图: 指针是有类型的,指针的类型决定指针 + - 运算时候的步长及解引用操作时的权限大小 指针

宝藏例题(欧几里得算法+素数的三种境界………)_hanwang的博客-多极客编程

1,写代码将三个数从大到小输出底层逻辑:首先,定一个不变的顺序,A>B>C-------------三个坑再想着把三个数顺序的放在三个坑中假设三者的大小关系进行排序,需要进行三次临时变量tmp不需要重新设置2,计算最大公约数---------辗转相除法/欧几里得算法%作为运算符是用来取余的,/是除法运算。例如:除法运算式65 ÷8  = 8余1那么65 / 8 = 865 %8 = 1

每日算法刷题day1-隐式转换与精度丢失_wx62e40d60030b6的博客-多极客编程

⭐每日算法系列文章旨在精选重点与易错的算法题,总结常见的算法思路与可能出现的错误,与笔者另一系列文章有所区别,并不是以知识点的形式提升算法能力,而是以实战习题的形式理解算法,使用算法。1.球的体积给定你一个球体的半径 RR,请你计算球体的体积。计算球体的公式为 。π取 3.14159。输入格式输入一个整数 R。输出格式输出格式为 ​​VOLUME = X​​,其中 XX 为球体的体积,结果保留三位

【云原生|docker】docker镜像操作_柒号华仔的博客-多极客编程

作者:柒号华仔 个人主页:欢迎访问我的主页 个人信条:星光不问赶路人,岁月不负有心人。 个人方向:主要方向为5G,同时兼顾其他网络协议,编解码协议,C/C++,linux,云原生等,感兴趣的小伙伴可以关注我,一起交流。1. 镜像简介Docker 镜像(Image)是用于创建 Docker 容器的模板。Docker 镜像相当于一个 root 文件系统,比如官方镜像 ubuntu:16.04 就包含了

apache druid 数据摄取---本地数据和kafka流式数据_博学谷狂野架构师的博客-多极客编程

Durid概述 Apache Druid是一个集时间序列数据库、数据仓库和全文检索系统特点于一体的分析性数据平台。本文将带你简单了解Druid的特性,使用场景,技术特点和架构。这将有助于你选型数据存储方案,深入了解Druid存储,深入了解时间序列存储等。 Apache Druid是一个高性能的实时分析型数据库。 上篇文章,我们了解了Druid的加载方式, 咱么主要说两种,一种是加载本地数据,

python中的if语句有哪些?_12178579的博客-多极客编程

if 条件判断语句python 语句是按固定顺序执行的,先执行前面的语句,再执行后面的语句。如果你像要程序按照你自己定制的流程执行,就需要用到流程控制的语句,最主要用到的是条件语句和循环语句。条件语句用 if 表示,它表示当满足某个条件时,执行下面的分支代码。当条件不满足时,则跳过下面的分支代码。image-20210907141048465在互联网产品中,你经常能看到条件判断的场景。比如在一个

#yyds干货盘点# 面试必刷top101:判断是不是平衡二叉树_风的博客-多极客编程

1.简述:描述输入一棵节点数为 n 二叉树,判断该二叉树是否是平衡二叉树。在这里,我们只需要考虑其平衡性,不需要考虑其是不是排序二叉树平衡二叉树(Balanced Binary Tree),具有以下性质:它是一棵空树或它的左右两个子树的高度差的绝对值不超过1,并且左右两个子树都是一棵平衡二叉树。样例解释:样例二叉树如图,为一颗平衡二叉树注:我们约定空树是平衡二叉树。数据范围:,树上节点的val值满

flask+layui+echarts实现前端动态图展示数据_wx6237f50e82bc0的博客-多极客编程

效果图:该效果主要实现一个table展示数据,并在下方生成一个折线图。实现方式:1、首先需要对表格进行一个数据加载,这里用到了layui的table.render,具体用法可以参考​​https://www.layui.com/doc/modules/table.html​​html部分:1 <table class="layui-hide" id="reportTableId" lay-f

go使用gin框架之中间件_zzxiaoma的博客-多极客编程

中间件主要是指可以拦截http请求,对到达请求之前进行一些处理,如数据处理,权限控制,认证等。 r := gin.Default(),默认自带Logger(), Recovery()全局作用了两个中间件。如果不想使用这2个中间件,可以使用r := gin.New()。 中间件写法,返回的是gin.HandlerFunc func RequestInfos() gin.HandlerFunc {

c++赋值函数|移动赋值函数|移动构造函数_wx6237f50e82bc0的博客-多极客编程

左值引用和右值引用 左值与右值:左值:在内存中占有确定位置的对象,即左值占有内存。换句话来说,就是有地址,有值。右值:不占内存(科学地讲是临时寄存器),仅有值,为临时变量。左右值的切换: 右值->左值:用*符号。int a=10;int* b=&a;// b为右值。*b=20;// b为右值,*b为左值。左值->右值:用&符号。int a = 10;&a = 4