来源:公众号:【鱼鹰谈单片机】
作者:鱼鹰Osprey
ID:emOsprey
这篇长文除了由浅入深的一步步迭代出无锁队列的实现原理,也会借此说说如何在项目中注意避免写出有 BUG 的程序,与此同时也会简单聊聊如何测试一段代码,而这些能力应该是所有软件开发工作者都应该引起注意的。而在介绍的过程中也会让你明白理论和实际的差距到底在哪。
高级程序员和初级程序员之间,鱼鹰认为差距之一就在于写出来的代码 BUG 多还是少,当然解决 BUG 的能力也很重要。
既要有挖坑的能力,也要有填坑的实力!
其实在避免BUG这一块,鱼鹰不管是在前面的《》,还是《》,甚至于鱼鹰最先思考的关于信号量保护《》《》那里都有介绍过,如果你能真正理解文章所介绍的那些情况,那么写一个可靠的程序应该不难。
而关于队列,鱼鹰也在很早之前就写过一篇《》,只是那个时候总觉得那种方式效率太低(后面会说为什么效率低),应用场合太少。
很早之前鱼鹰就听说过无锁队列,但由于以前的项目不是很复杂,很多时候都可以不需要无锁队列,所以就没有更深入的学习,直到这次串口通信想试试无锁队列的效果,才最终用上了这一神奇的代码。
网上有个词形容无锁队列鱼鹰觉得很贴切:巧夺天工!
现在就从队列开始说起吧。
什么是队列,顾名思义,就类似于超市面前排起的一个队伍,当最前面的顾客买完了东西,后面的顾客整体向前移动,而处于新队头的顾客继续消费。如果没有后来的顾客,那么最终这个队伍将消失。而如果有新的顾客到来,那么他将排在队伍最后等待购买。
// 鱼鹰*公***众**号:鱼鹰谈单片机,
uint16_t queue[10000]; // 队列,0~9999,共 10000
uint16_t in;
uint16_t out;
int get(void *parameter)
{
if(in == out)
{
rt_kprintf("当前没有顾客等待服务\n");
return -1; // 没有顾客在排队
}
rt_kprintf("请编号:【%04u】 到柜台办理服务\n",queue[out]);
// 自加 1 ,但因为数组只有 10000 大小,所以需要让 out 在 0~9999 之间进行循环
out = (out + 1) % 10000;
return 0;
}
MSH_CMD_EXPORT(get, RT-Thread first led sample);
int put(void)
{
if((in + 1) % 10000 == out)
{ // 因为 in 和 out 值相等时,可能为空和满的,无法判断哪种情况
// 所以为了避免这种情况,总是空一个位置,这样满的时候 in 和 out 的值就不一样
rt_kprintf("现在排队人数太多,请下次尝试\n");
return -1; // 队列满
}
// 把当前的编号存入数组中进行排队
rt_kprintf("您当前领取的编号为 %04u\n",in);
queue[in] = in;
// 自加 1 ,但因为数组只有 10000 大小,所以需要让 in 在 0~9999 之间进行循环
in = (in + 1) % 10000;
return 0;
}
MSH_CMD_EXPORT(put, RT-Thread first led sample);
(滑动查看全部)现在假设柜员刚开始上班,他不清楚有没有顾客等待服务,所以他首先使用 get函数获取自己的服务顾客编号,但是因为柜员来的有点早,顾客还没来银行办理业务,所以第一次机器告诉柜员没有顾客等待服务。

V 1.0
假设一次排队最多同时有99位顾客需要业务,那么数组设置为 100(为什么多一个?)。但是因为一天之中总共可能有超过 99 位顾客办理业务,那么直接使用in作为顾客编号肯定不合适,因为in的范围是0~99,那么必然需要另一个变量用于记录当天的总人数,这里假设为counter(当你修改代码时,你会发现把之前的 10000 设置为宏是明智的选择)。这里除了修改数组大小和put函数外,其他都一样:
// 测试代码 V1.0
// 鱼鹰*公***众**号:鱼鹰谈单片机,
uint16_t queue[BUFF_SIZE]; // 队列,0~99,共 100
uint16_t in;
uint16_t out;
uint16_t counter; // 记录
int get(void *parameter)
{
if(in == out)
{
rt_kprintf("当前没有顾客等待服务\n");
return -1; // 没有顾客在排队
}
rt_kprintf("请编号:【%04u】 到柜台办理服务\n",queue[out]);
// 自加 1 ,但因为数组只有 100 大小,所以需要让 out 在 0~99 之间进行循环
out = (out + 1) % BUFF_SIZE;
return 0;
}
MSH_CMD_EXPORT(get, RT-Thread first led sample);
int put(void)
{
if((in + 1) % BUFF_SIZE == out)
{ // 因为 in 和 out 值相等时,可能为空和满的,无法判断哪种情况
// 所以为了避免这种情况,总是空一个位置,这样满的时候 in 和 out 的值就不一样
rt_kprintf("现在排队人数太多,请下次尝试\n");
return -1; // 队列满
}
// 把当前的编号存入数组中进行排队
rt_kprintf("您当前领取的编号为 %04u\n",counter);
queue[in] = counter;
counter = (counter + 1) % 10000; // 限制它的大小,不可超过四位数
// 自加 1 ,但因为数组只有 100 大小,所以需要让 in 在 0~99 之间进行循环
in = (in + 1) % BUFF_SIZE;
return 0;
}
MSH_CMD_EXPORT(put, RT-Thread first led sample);
(滑动查看全部)再次测试:
V 1.5

// 测试代码 V1.5
// 鱼鹰*公***众**号:鱼鹰谈单片机
typedef struct
{
uint16_t queue[BUFF_SIZE]; // 队列,0~99,共 100
uint16_t in;
uint16_t out;
uint16_t used;
}fifo_def; // 把队列相关的变量整合在一块,方便使用
uint16_t counter; // 记录
fifo_def fifo;
int get(void *parameter)
{
if(fifo.used == 0)
{
rt_kprintf("当前没有顾客等待服务\n");
return -1; // 没有顾客在排队
}
rt_kprintf("请编号:【%04u】 到柜台办理服务\n",fifo.queue[fifo.out]);
// 自加 1 ,但因为数组只有 100 大小,所以需要让 out 在 0~99 之间进行循环
fifo.out = (fifo.out + 1) % BUFF_SIZE;
fifo.used--;
return 0;
}
MSH_CMD_EXPORT(get, RT-Thread first led sample); // 导出到命令行中使用
int put(void)
{
if(fifo.used >= BUFF_SIZE)
{ // 因为 in 和 out 值相等时,可能为空和满的,无法判断哪种情况
// 所以为了避免这种情况,总是空一个位置,这样满的时候 in 和 out 的值就不一样
rt_kprintf("现在排队人数太多,请下次尝试\n");
return -1; // 队列满
}
// 把当前的编号存入数组中进行排队
rt_kprintf("您当前领取的编号为 %04u\n",counter);
fifo.queue[fifo.in] = counter;
counter = (counter + 1) % 10000; // 限制它的大小,不可超过四位数
// 自加 1 ,但因为数组只有 100 大小,所以需要让 in 在 0~99 之间进行循环
fifo.in = (fifo.in + 1) % BUFF_SIZE;
fifo.used++;
return 0;
}
MSH_CMD_EXPORT(put, RT-Thread first led sample);
在这个版本中,鱼鹰将队列所需的元素整合成一个结构体方便使用(当需要增加一个队列时,显得比较方便),并加入一个used变量,还缩小了队列大小,变成7,方便我们关注队列最本质的东西。测试如下:
V 2.0
// 测试代码 V2.0
// 鱼鹰*公***众**号:鱼鹰谈单片机
typedef struct
{
uint32_t in; // 注意,这里是 32 位
uint32_t out; // 注意,这里是 32 位
uint8_t *queue; // 这里不再指定大小,而是根据实际情况设置缓存大小
}fifo_def; // 把队列相关的变量整合在一块,方便使用
uint8_t counter; // 记录人数,也是发放的编号,因为队列类型变了,所以这里也改成 uint8_t
uint8_t fifo_buff[BUFF_SIZE];
fifo_def fifo = {0,0,fifo_buff}; // 为了简化初始化过程,直接定义时初始化
int get(void *parameter)
{
if(fifo.in - fifo.out == 0)
{ // 也可以写成 fifo.in == fifo.out,效率或许会高一些
rt_kprintf("当前没有顾客等待服务\n");
return -1; // 没有顾客在排队
}
rt_kprintf("请编号:【%04u】 到柜台办理服务\n",fifo.queue[fifo.out % BUFF_SIZE]);
fifo.out++; // 直接自加
return 0;
}
MSH_CMD_EXPORT(get, RT-Thread first led sample); // 导出到命令行中使用
int put(void)
{
if(BUFF_SIZE - fifo.in + fifo.out == 0)
{
rt_kprintf("现在排队人数太多,请下次尝试\n");
return -1; // 队列满
}
// 把当前的编号存入数组中进行排队
rt_kprintf("您当前领取的编号为 %04u\n",counter);
fifo.queue[fifo.in % BUFF_SIZE] = counter; // 这里用 fifo.in % (BUFF_SIZE - 1) 效率会高一些
counter += 1; // 递增计数器
fifo.in++; // 直接自加
return 0;
}
MSH_CMD_EXPORT(put, RT-Thread first led sample);

if(BUFF_SIZE - (fifo.in - fifo.out) == 0)
{
rt_kprintf("现在排队人数太多,请下次尝试\n");
return -1; // 队列满
}
if(fifo.in - fifo.out == 0)
{ // 也可以写成 fifo.in == fifo.out,效率或许会高一些
rt_kprintf("当前没有顾客等待服务\n");
return -1; // 没有顾客在排队
}
还有两个不可以思议的自加操作:fifo.in++; // 直接自加
fifo.out++; // 直接自加
现在先说第一个简单的,为什么in和out相等了,就代表为空?先看一个动图(公**众&&号可查看):BUFF_SIZE - (fifo.in - fifo.out)
内括号里面的其实就是《延时功能进化论》中说的那个神奇的算式。这个算式总是能准确in和out之间的差值(不管是in大于out还是小于),也就是队列中已使用了的空间大小,当然这是有条件的。想象一下这样一个场景,小红不再等待小蓝,而是一口气跑下去,当小红再次接近小蓝时,你会发现小红看到的是小蓝的后背,也就是说,从表象来看,小蓝跑到小红的前面了。但是群众的眼光是雪亮的,他们知道小红其实是跑在小蓝的前面的,所以按小红在小蓝前面这种情况用尺子量一下两者就能准确知道两者的距离。

BUFF_SIZE - (fifo.in - fifo.out)
fifo.in++; // 直接自加
fifo.out++; // 直接自加
BUFF_SIZE 就是这把尺子,而 in 和 out 相减可以得到两者距离,尺子总长减去两者距离,就可得到尺子剩余可测量的距离,也是小红还可再跑的距离。而一直自加其实就是利用变量自动溢出的特性,而即使溢出导致 out 大于 in 也不会影响计算结果。看一个动图结合理解溢出(公**众&&号可查看):
写完这部分,鱼鹰才发现 V2.0 版本是存在 BUG 的,原因就在于存储数据和读取数据那需要注意的代码,之前鱼鹰想当然的认为直接取余 % 即可正确读取和存储数据,实际上因为 7 这个数根本不能被 0xFFFF FFFF + 1 给整除,也就无法正确使用了,所以V2.0版本无法使用,但如果把 7 换成 8,就没有问题了,这里需要特别注意(还好提早发现了,不然就是误人子弟了)。
好了 ,终于把最难理解的部分勉强说完了!但是目前这只是前菜,还有很多。现在继续。因为 in 和 out 一直自加,所以不像 V1.0 一样出现空和满时 in 和 out 都是相等,也就不需要留下一个空间了,完美的利用了所有可用的空间!但是 in 和 out 总是自加,必然会出现 in 和 out 大于空间大小的情况,所以在取队列中的元素时必须使用 % 限制大小,才能准确获取队列中的数据(前提是队列大小为 2 的幂次方),而 get 和 put 开头的判断总能保证in大于或等于out(这个只是从结果来看是这样,但实际有可能 in 小于 out,但这个不影响最终结果),而in – out 小于或等于队列大小,防止了可能的数组越界情况。
V 2.5
// 测试代码 V2.5
// 鱼鹰*公***众**号:鱼鹰谈单片机
typedef struct
{
uint32_t in; // 注意,这里是 32 位
uint32_t out; // 注意,这里是 32 位
uint32_t size; // 设置缓存大小
uint8_t *buffer; // 这里不再指定大小,而是根据实际情况设置缓存大小
}fifo_def; // 把队列相关的变量整合在一块,方便使用
uint8_t counter; // 记录人数,也是发放的编号,因为队列类型变了,所以这里也改成 uint8_t
uint8_t fifo_buff[BUFF_SIZE];
fifo_def fifo = {0,0,BUFF_SIZE,fifo_buff}; // 为了简化初始化过程,直接定义时初始化
void init(fifo_def *pfifo,uint8_t *buff,uint32_t size)
{
if(size == 0 || (size & (size - 1))) // 2 的 幂次方
{
retern;
}
pfifo->in = 0
pfifo->out = 0;
pfifo->size = size;
pfifo->buffer = buff;
}
int get(void *parameter)
{
fifo_def *pfifo = &fifo; // 这里直接使用全局变量,因为命令行不方便传参
uint32_t len = 2; // 这里假设一次获取 2 个字节数据
uint8_t buffer[2]; // 将队列中的数据提取到这个缓存中
uint32_t l;
len = min(len, pfifo->in - pfifo->out);
/* first get the data from fifo->out until the end of the buffer */
l = min(len, pfifo->size - (pfifo->out & (pfifo->size - 1)));
memcpy(buffer, pfifo->buffer + (pfifo->out & (pfifo->size - 1)), l);
/* then get the rest (if any) from the beginning of the buffer */
memcpy(buffer + l, pfifo->buffer, len - l);
pfifo->out += len;
if(len) // 为了判断是否写入数据,加入调试信息
{
for(int i = 0; i < len; i++)
{
rt_kprintf("请编号:【%04u】 到柜台办理服务\n",buffer[i]);
}
}
else
{
rt_kprintf("当前没有顾客等待服务\n");
}
return 0;
}
MSH_CMD_EXPORT(get, RT-Thread first led sample); // 导出到命令行中使用
int put(void)
{
fifo_def *pfifo = &fifo; // 这里直接使用全局变量,因为命令行不方便传参
uint32_t len = 1; // 这里假设一次写入 1 个字节数据
uint32_t l;
uint8_t buffer[1] = {counter}; // 存入编号,即将写入到队列中
len = min(len, pfifo->size - pfifo->in + pfifo->out);
/* first put the data starting from pfifo->in to buffer end */
l = min(len, pfifo->size - (pfifo->in & (pfifo->size - 1)));
memcpy(pfifo->buffer + (pfifo->in & (pfifo->size - 1)), buffer, l);
/* then put the rest (if any) at the beginning of the buffer */
memcpy(pfifo->buffer, buffer + l, len - l);
pfifo->in += len;
if(len) // 为了判断是否写入数据,加入调试信息
{
rt_kprintf("您当前领取的编号为 %04u\n",buffer[0]);
}
else
{
rt_kprintf("当前没有顾客等待服务\n");
}
counter++;// 写完自加,不属于无锁队列内容
return 0;
}
MSH_CMD_EXPORT(put, RT-Thread first led sample);
首先说说和 V2.0 的主要区别:1、将队列大小改为 8,即符合2的幂次方(这是in和out无限自加所要求的,也是一种限制)2、将缓存名更改为buffer,更容易理解(所以取名字也很重要)3、增加了一个变量size,这是方便在多个fifo情况可以使用同一个函数体(函数复用),通常这个值只在初始化时赋值一次,之后不会修改。然后最难理解的就是那两个拷贝函数了。现在着重说明一种特殊拷贝情况,当能理解这种特殊情况,那么其他情况也就不难理解了。首先看两个min,这是用来取两者之间最小的那一个。
len = min(len, pfifo->in - pfifo->out);
/* first get the data from fifo->out until the end of the buffer */
l = min(len, pfifo->size - (pfifo->out & (pfifo->size - 1)));
memcpy(buffer, pfifo->buffer + (pfifo->out & (pfifo->size - 1)), l);
/* then get the rest (if any) from the beginning of the buffer */
memcpy(buffer + l, pfifo->buffer, len - l);
开始的len是用户要求拷贝的长度,但考虑到用户可能拷贝大于队列已有数据的长度,所以有必要对它进行重新设置,即最多只能拷贝目前队列中已有的大小。第二个min其实用来获取队列out开始到数组结束的大小,难理解?看下图就很清楚了:效率讨论
BUG 讨论
看着这个宏好像挺规范的,该有的括号都加上了,应该不会出现问题才是,真的如此吗?鱼鹰在很多笔记中都曾说过,如果只是对一个共享资源进行读取操作的话是不会出现问题的,但这只是针对共享资源本身而言是如此,对于使用者来说,就不是这样了!鱼鹰曾经在《》中说过,当时的鱼鹰不明白一个判断语句为什么要在判断前关中断,虽然当时的理解是因为判断和修改应该顺序进行而不应该被打断,但是其实判断和再次读取在某种情况下也不可以被打断。为什么会出现两次读取呢?当你把 min 宏展开后,你就会发现 in 或者 out 被两次读取了。len = len < in - out ? len : in - out
假设get函数想读入3个字节数据,即len等于3,此时刚好队列中也有3个数据,即in – out 也等于3,3 < 3 ? 判断失败,跳转到最后那条语句执行,跳转时,另一个任务修改了in,增加了1,那么in – out 等于4,即最终 len 的值 为 4。也就是说,本来你的程序应该只获取 3 个数据的,实际上却获取了4 个数据,如果说你的应用程序以最终的len值作为实际get的数据,那么无锁队列还是那个无锁队列,怕就怕你的应用程序会根据传入的len和返回len值做一些判断操作;还有一些可能就是应用程序就只获取3个数据,只能少,不能多,否则程序就会出现问题。总之,因为两次读取中断导致无锁队列不再是你想的那个队列了。那么该如果解决呢?首先想到的就是把min这个宏改成函数,为了提高一点效率,也可以用inline进行声明。另一种高效方法是,在进入get函数时,把in - out的结果保存在局部变量L中(这样可不必再申请一个变量,而且简化了两次读取与运算操作),用它再带入宏中使用,这样即使后面别的任务修改了in的值,也不会影响程序的运行,只不过没有非常及时提取最新数据而已。从这里可以看出来,used变量因为副本(保存值到寄存器中)原因导致需要加互斥锁,而这里却不得不增加一个副本L来保证程序的正确运行,所以,副本的好与坏只能因情况而异,一定要谨慎分析。现在再说说另一种可能的BUG。对嵌入式了解比较多的道友应该清楚,多CPU和单CPU下可能出现代码乱序执行的情况,有可能是编译器修改了执行顺序,也有可能是硬件自动修改了执行顺序,那么上述的无锁队列就可能会出现问题(前面也说过顺序很重要)。而真正的linux内核无锁队列实现其实是加入了内存屏障的(这个可以看参考代码),而因为STM32单片机比较简单,所以鱼鹰去掉了内存屏障代码。但是还有一种情况,那就是数据缓存问题。我们知道,单片机(STM32存在指令与数据预取功能)为了运行效率,都采用流水线工作,而且为了提高效率,会预取多个数据或指令,如果说在切换任务时没有把整个流水线清除,那么也可能出现问题,不过这个问题暂时没有出现,以后再看吧。测试
推荐阅读:
-THE END-
如果对你有帮助,记得转发分享哦!
微信公众号「鱼鹰谈单片机」
每周一更单片机知识
长按后前往图中包含的公众号关注
鱼鹰,一个被嵌入式耽误的畅销书作家
个人微信「EmbeddedOsprey」
长按后打开对方的名片关注