mosquitto源码分析

更新时间:2023-03-08 06:07:45 阅读量: 综合文库 文档下载

说明:文章内容仅供预览,部分内容可能不全。下载后的文档,内容与下面显示的完全一致。下载之前请确认下面内容是否您想要的,是否完整无缺。

mosquitto源码分析

本文由逍遥子撰写,转发请标注原址:

http://blog.csdn.net/houjixin/article/details/21461225

一、 Mosquitto简介

mosquitto是一款实现了消息推送协议MQTT v3.1 的开源消息代理软件,提供轻量级的,支持可发布/可订阅的的消息推送模式,使设备对设备之间的短消息通信变得简单,例如现在应用广泛的低功耗传感器,手机、嵌入式计算机、微型控制器等移动设备。

Mosquitto采用出版/订阅的模式实现MQTT协议,这种设计模式将通信终端之间的关系统一到服务程序中进行管理,可极大减轻客户端的开发和维护工作。

1.1、 mqtt协议简介

MQTT(MessageQueuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。在某些应用场合中,可通过该协议维持与客户端的长连接。关于mqtt协议更详细的介绍,请参考其官方网站:http://mqtt.org/

其个版本源码下载位置:http://mosquitto.org/files/source/

1.2、 出版/订阅模式简介

出版/订阅模式定义了如何向一个节点发布和订阅消息,这些节点被称作主题(topic)。主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscriber) 从主题订阅消息。这种模式使得消息订阅者和消息发布者保持互相独立,不需要接触即可保证消息的传送。

Tcp协议中,tcp连接只提供一对一的可靠传输,例如:主机A与B进行通信,则发起tcp连接的一端只需要知道对方的ip地址和端口号即可,如下图1-1所示:

图1-1 一对一通信

每一个tcp连接都是由下面的五个元素确定:

<源ip地址,源端口号,目的ip地址,目的端口号,通信协议>

在实际程序的开发过程中 一条连接建立之后,它可能需要在一段时间内一直被通信双方所保持,以备下次数据传输使用。另外,通信的终端数目可能是多个,这就需要每个终端都要维持它所有的通信关系,如下图1-2所示

图1-2 多对多通信

此时,每个参与通信的客户端所需维持的连接数量将非常庞大,这非常不利于程序的开发和实现。出版/订阅模式即是一种解决这种问题的方法,它通过增加一个中间层的方式,让中间层来维护这种多对多的关系,这个中间层通常称之为服务器,如下图1-3所示:

图1-3 增加中间层的多对多通信

通过增加中间层服务器,每个客户端都只需要维护自己同服务器之间的连接即可,而客户端之间的关系则交由中间服务器来维护,这种设计模式将复杂的通信关系维护工作从客户端剥离出来,非常方便客户端的开发和维护。

Mosquito程序即是通过这种方式进行工作,在mosquitto程序内部,将客户端之间的关系通过一棵订阅树来维持。

1.3、 Mosquito

Mosquito源码目录结构介绍。

|---- mosquitto-1.2

|---- client

|---- examples

|----mysql_log

|----temperature_conversion

|---- installer

|---- lib

|---- cpp

|---- jsws

|----python

|---- logo

|---- man

|---- po

|----libmosquitto

|----mosquitto

|----mosquitto.conf

|----mosquitto_pub

|----mosquitto_sub

|----mosquitto-tls

|----mqtt

|---- misc

|----currentcost

|----gnome-panel

|---- security

|---- service

|---- monit

|----svscan

|----upstart

|---- src

|----db_dump

|---- test

|----broker

|---- c

|---- lib

|---- c

|----cpp

|----python

|----python3

|---- ssl

|----demoCA

|----rootCA

|----signingCA

所需关注的目录有/ mosquitto-1.2/src、/ mosquitto-1.2/lib、/ mosquitto-1.2/client三个目录,其中src和lib目录下主要放置mosquitto的实现代码以及部分底层与网络相关的操作,client目录主要为两个客户端程序的实现源码。

Mosquito的源码及其相关文档可从其官方网站获取,其官方网站为:http://mosquitto.org/

mosquitto客户端和服务器运行命令 [1] 发布者客户端运行命令示例: ./mosquitto_pub -h 192.168.6.243 -p 1883 -t \ [2] 订阅者客户端运行命令示例: ./mosquitto_sub -h 192.168.6.243 -i 111 -p 1883 -t 111 -k 60 -d -c -u hjx -P hjx [3] mosquitto服务器端运行命令示例: ./mosquitto 二、 Mosquito的数据结构

1) struct mosquito

结构体struct mosquito主要用于保存一个客户端连接的所有信息,例如用户名、密码、用户ID、向该客户端发送的消息等,其定义为:

struct mosquitto {

int sock;

char*address;

char *id;

char*username;

char*password;

uint16_tkeepalive;

time_tlast_msg_in;

time_tlast_msg_out;

struct mosquitto_client_msg *msgs;

}

上面列举了该结构体部分重要成员,其中sock表示mosquitto服务器程序与该客户端连接通信所用的socket描述符;address表示该客户端的IP地址;id是该客户端登陆mosquitto程序时所提供的ID值,该值与其他的客户端不能重复;成员username和password用于记录客户端登陆时所提供的用户名和密码;keepalive是该客户端需在此时间内向mosquitto服务器程序发送一条ping/pong消息。参数last_msg_in和last_msg_out用于记录上次收发消息的时间;参数struct mosquitto_client_msg*msgs用于暂时存储发往该context的消息。

2) struct mosquitto_db

结构体struct mosquitto_db是mosquitto对所有内部数据的统一管理结构,可以认为是其内部的一个内存数据库。它保存了所有的客户端,所有客户端的订阅关系等等,其定义形式为:

struct mosquitto_db{

dbid_tlast_db_id;

struct_mosquitto_subhier subs;

struct mosquitto**contexts;

struct_clientid_index_hash *clientid_index_hash;

intcontext_count;

structmosquitto_msg_store *msg_store;

intmsg_store_count;

structmqtt3_config *config;

intsubscription_count;

……

};

上述结构体声明中,结构体成员struct _mosquitto_subhier subs保存了订阅树的总树根,mosquitto中对所有的topic都在该订阅树中维护,客户端的订阅关系也在该订阅树中维护;结构体成员struct mosquitto **contexts可理解为一个用于存放所有客户端变量(类型为struct mosquitto)地址的数组,mosquitto程序中,所有的客户端都在此数组中保存;成员int context_count用于保存数组contexts的大小,该值也是当前mosquitto程序中维持的所有客户端的数目;成员结构体struct

_clientid_index_hash*clientid_index_hash用于保存一个hash表,该hash表可通过客户端的ID快速找到该客户端在数组contexts中的索引;结构体成员struct mqtt3_config*config用于保存mosquitto的所有配置信息;

3)struct_mosquitto_subhier

数据结构struct _mosquitto_subhier是用于保存订阅树的节点(包括叶子节点和中间节点),mosquitto中对订阅树采用孩子-兄弟链表法的方式进行存储,该存储方式主要借助与数据结构struct _mosquitto_subhier来完成,该数据结构的定义为:

struct _mosquitto_subhier {

struct_mosquitto_subhier *children;

struct_mosquitto_subhier *next;

struct_mosquitto_subleaf *subs;

char*topic;

structmosquitto_msg_store *retained;

};

成员说明:

children :该成员指针指向同结构的第一个孩子节点;

next:该成员指针指向该节点的下一个兄弟节点;

subs:该成员指向订阅列表;

topic:该节点对应的topic片段;

4) struct _mosquitto_subleaf

在mosquitto程序中,对某一topic的所有订阅者被组织成一个订阅列表,该订阅列表是一个双向链表,链表的每个节点都保存有一个订阅者,该链表的节点即是:struct _mosquitto_subleaf,定义形式为:

struct _mosquitto_subleaf {

struct_mosquitto_subleaf *prev;

struct_mosquitto_subleaf *next;

structmosquitto *context;

int qos;

};

其中成员struct mosquitto *context表示一个订阅客户端,prev和next表示其前一个节点和后一个节点。

5)structmqtt3_config

结构体struct mqtt3_config用于保存mosquitto的所有配置信息,mosquitto程序在启动时将初始化该结构体并从配置文件中读取配置信息保存于该结构体变量内。

三、 Mosquito的核心功能分析

3.1、订阅树

Mosquitto通过订阅树的方式来管理所有的topic以及客户端的订阅关系,它首先将所有的topic按照/分割并组织成一棵树结构,从根节点到树中的每个节点即组成该节点所对应的一个topic,每个topic都保存一个订阅列表,该订阅列表中保存了所有订阅当前topic的客户端信息。例如有如下订阅关系:

客户端a1,a2,a3订阅了topic:A1/B1/C1

客户端b1,b2订阅了topic:A2/B2/C2

客户端c1,c2订阅了topic:A1/B1/C3

客户端d1订阅了topic:A2/B3

则在mosquitto程序中需要先将topic按照/进行分割,然后将分割后的topic片段组织成订阅树,上述订阅树的示意图为图3-1:

图3-1 订阅树示意图

Mosquitto程序在实现中根据topic消息的性质将订阅树分为两颗子树:业务子树和系统子树;mosquitto程序中将topic分为两种类型来处理:系统topic和业务topic,前者主要用于发布和维护mosquitto内部的系统消息,后者的topic是用户订阅的业务topic,做这种区分的原因是因为这两种的类型的topic性质和实现方式上有许多差别,这种差别主要体现在以下4点:

1)生存周期不同,系统topic无论是否有用户订阅都会存在与订阅树中,而业务topic必须有客户端订阅才能存在(除非其消息字段retain设置为1);

2)创建方式不同,系统topic在消息发布时进行创建,业务topic即可以在订阅时创建也可以在消息发布时创建(此时需要该消息retain字段设置为1);

3)消息保存方式不同,凡是发布到系统topic的消息都会被保存下来,业务消息将直接挂到订阅列表的各context的消息队列中,如果没有连接订阅或未设置其retain字段,消息将不会被保存下来;

消息的retain字段是否被设置在函数mqtt3_handle_publish进行检查,在该函数中有如下代码:

retain = (header & 0x01);

该代码可获取消息头部的第一个bit位,在mqtt3.1协议中,该为用于表示消息的类型是否为retain。

订阅树在程序中的采用孩子—兄弟链表法来表示。其主要涉及的数据结构是:

struct _mosquitto_subhier

struct _mosquitto_subleaf

3.1.1、订阅树的搭建

1、创建订阅树

mosquitto程序启动时将创建订阅树,该过程将创建三个节点:订阅树总根节点、业务子树根节点和系统子树根节点,这两个子树根节点作为订阅树总根节点的两个子节点,其中订阅树总根节点和业务子树的根节点中topic成员的值为空字符串,而系统子树根节点中保存的值为“$SYS”,如图3-2所示。

图3-2 订阅树的创建

订阅树的创建主要在文件database.c中mqtt3_db_open函数里实现。订阅树中节点的数据结构为struct _mosquitto_subhier,订阅树采用“孩子—兄弟”链表法保存。

2、搭建订阅树

在订阅树中,系统子树与业务子树的搭建过程不一样,系统子树是在系统消息发布时创建,而业务子树创建过程即可以在消息发布时创建也可以在客户端订阅时才创建。

1)系统子树搭建过程

Mosquitto中,系统子树在发布系统消息时,自动检测topic片段是否存在,如果不存在则在系统topic上创建节点以搭建订阅树。例如,mosquitto程序启动时,将首先向系统topic:$SYS/broker/version发送一条版本消息“mosquittoversion 1.2”,此时订阅树的系统子树只有一个根节点,如图3.2所示,其搭建过程如下:

(1)将topic按照”/”分成topic片段,系统Topic:$SYS/broker/version将被分割为$SYS、broker、version三部分。

(2)根据第一个topic片段“$SYS”遍历订阅树的子节点找到系统子树的根节点。

(3)根据topic下一个片段“broker”查找系统子树;此时系统子树中不存在topic片段“broker”的节点,则为订阅树产生一个节点,其数据结构为:struct_mosquitto_subhier。此时订阅树由图3-2变为图3-3所示:

图3-3添加broker节点之后的订阅树

上述过程在函数mqtt3_db_messages_queue中调用函数_sub_add来完成。

(4)依此处理topic剩下的片段,在系统子树中添加topic片段“version”,该过程通过递归调用函数_sub_add来完成。添加完“version”片段之后的订阅树如图3-4所示。

其中,int fd是文件描述符,在mosquitto的中主要指socket描述符(在linux一切I/O皆文件的工作模式下,socket描述符与普通文件描述被同等对待);events成员用于告诉内核,我们对描述符fd关心什么;revents用以说明该描述符fd发生了什么事情。

另外,使用poll是需要注意,每次调用poll函数之前都要重新设置一下各描述符的状态。

3.2.2、mosquitto的消息机制

Mosquito在工作过程中用poll机制检测监听所有socket以判断其是否有数据发送或接收,mosquitto所监听的socket按照其功能分为监听socket和业务socket,监听socket主要负责监听各客户端的连接;业务socket主要负责mosquitto服务器与各客户端之间的数据收发;一旦mosquitto服务器从监听socket中接收到一个客户端连接进来,将立即为该客户端创建一个业务socket,负责后续mosquitto服务器与该客户端的所有数据收发。

Mosquitto中对poll操作主要在/ mosquitto-1.2/src loop.c文件的mosquitto_main_loop函数中,在该函数中将按照以下步骤循环处理所有的socket:

1) 创建pollfd结构体数组pollfds

2) 将要监听的socket放入pollfds

3) 使用poll函数查询pollfds数组中各描述符的状态;

4) Poll函数返回之后,遍历pollfds数组对其中就绪的socket描述符进行处理。

mosquitto_main_loop函数不仅涉及对poll函数的相关处理,它也是整个消息处理架构的实现之处,该函数对消息处理的流程如下图3-11

图3-11 mosquitto函数的消息业务处理流程

在上述消息处理过程中,poll函数返回之后,pollfds数组中socket描述的pollfd结构的revents成员就会置为相应的就绪状态,因此只要循环扫描该数组,即可根据对应socket完成相应的读写操作。

在mosquitto程序中,poll函数所监听的socket有两种类型:监听socket和业务socket,其对应的结果处理函数就分两个过程来完成。

监听socket处理过程主要在mosquitto_main_loop函数中直接完成,它主要完成的工作是处理每个监听socket,如果有客户端连接进入时,为之创建一个业务socket和一个对应的context结构体,context结构体描述了该连接的所有信息,因此在mosquitto程序中,一个context就表示一个客户端连接。

业务socket的处理主要通过loop_handle_reads_writes函数完成,在该函数中将循环检查所有的context,如果该context对应的socket有数据写入则调用函数_mosquitto_packet_write进行写操作;如果对应的socket有数据要读取,则调用函数

_mosquitto_packet_read完成socket的读取和处理,其中读取消息的处理在函数mqtt3_packet_handle中完成,在该函数中将根据不同的消息类型,调用不同的处理函数,业务socket的消息读取及其处理的函数调用关系如图3-12所示。

图3-12 消息读取和处理函数调用关系

3.2.3、mosquitto的消息收发

本文中,消息的收发是相对mosquitto服务器而言,Mosquito的消息的收发主要包括以下两个过程:

1、 消息接收过程是mosquitto服务器接收到客户端向某个主体发布一条消息。

2、 消息发送过程是指Mosquito服务器将消息发送给订阅客户端。

在mosquitto的程序实现中,上述两个过程是分开实现和操作的:

消息接收过程,Mosquitto服务器端在收到客户端向某个主体发送的消息之后,会遍历订阅树,找到该主体的订阅列表,然后将消息挂到订阅列表中每个订阅者的消息队列中。需注意的是,此时消息并实际发送给订阅客户端,只是被挂在了mosquitto服务程序中该客户端对应的context结构中所定义的消息队列中;上述工作过程涉及到的主要函数及其调用过程可参照图3-9所示。

Mosquitto的消息的组装发送过程集中在函数mqtt3_db_message_write中完成,其函数调用关系如下图3-13所示。

图3-13 消息组装和发送函数调用关系

3.3、mosquitto的ping/pong功能

1、为什么mosquitto要引人ping/pong的操作

根据tcp/ip协议的描述,tcp连接建立之后,如果双方没有通信,连接可以一直保存下去,假如中间路由器崩溃或者中间的某条线路断开,只要两端的主机没有被重启,连接就一直被保持着。尽管tcp协议规范中未做次要求,但是在很多tcp协议的实际实现中,却提供了保活定时器的功能,保活定时器一般配置的时间是2小时。在实际的服务器程序开发过程中,2个小时的连接断开的时间太长。因此,很多服务器程序都在上层自己提供保活功能,也就是服务器程序开发过程中经常提到的:心跳连接或ping/pong消息等功能。

2、mosquitto的ping/pong功能描述

在mosquitto中,提供了ping/pong功能来判断连接异常断开的情况,并通过keepalive的参数来控制检查时间,一般客户端需要在keepalive时间内向服务器发送一条消息,表明自己还存在,服务器会周期检查与客户端建立起的每一个连接,如果某个连接在keepalive*1.5的时间内没有收到过消息,则认为这个连接就失效了,于是服务器将主动断开这个超时的连接。

Mosquitto中,每个客户端所对应的context中有两个变量last_msg_out和last_msg_in,分别用于记录该context上次发送和接收消息的时间,然后在mosquitto_main_loop(位于文件loop.c)中每次循环都对每个context的所记录的消息收发时间进行检查,如果超过设定的keepalive时间的1.5倍则断开此客户端的连接,因此,如果mosquitto客户端在keepalive时间内与mosquitto服务器之间存在任何通信(无论是普通消息还是ping/pong消息,都是如此),mosquitto就认为该客户端是连接状态良好的。

3、ping/pong功能对mosquitto性能产生的潜在影响

Mosquitto以keepalive*1.5时间作为判断客户端连接是否异常断开的时间界限,这里keepalive的值对mosquitto的性能会产生较大影响,此值过大,可能无法及时判断处异常的发生;此值过小,不仅浪费网络带宽,还可能造成误判,例如客户端与服务器之间tcp连接上的某个服务器异常重启,可能会被服务器误判为tcp连接断开了。此值需根据实际情况分析后确定。

四、 Mosquito的辅助功能介绍

Mosquitto代码的辅助功能主要包括:log输出功能、配置参数管理功能和内存封装的功能,这三个功能虽不是mosquitto的核心模块,但是却在其源码实现中经常遇到,它们的实现给mosquitto的代码开发带来了很大的方便。

4.1、log输出功能

Mosquitto日志输出功能的实现代码主要在文件/ mosquitto-1.2/src/logging.c中。Mosquitto的log模块具备以下功能:日志分等级输出、日志的多平台输出、日志对各种参数的格式化输出;

mosquitto的日志模块初始化由函数mqtt3_log_init完成,该函数主要完成对日志输出位置以及日志输出等级两个参数的设置。函数mqtt3_log_close完成日志模块的关闭功能。日志模块的输出由函数_mosquitto_log_printf完成,该函数的声明形式为:

int _mosquitto_log_printf(struct mosquitto *mosq, int priority,const char *fmt, ...)

其中第一个参数struct mosquitto *mosq表示某客户端连接所对应的上下文信息,该参数可以为空;第二个参数int priority表示日志的优先级,在mosquitto的配置文件mosquitto.conf中将会配置日志的输出等级,如果传给函数的日志等级低于配置文件中的等级配置,该条日志将不被输出。另外,该函为一可变参数的函数,在使用过程中可以根据需要将参数进行格式化输出。例如下一条日志输出语句:

_mosquitto_log_printf(NULL,MOSQ_LOG_NOTICE, \

在该条日志输出语句中,将保存在client_id中的客户端ID格式化到字符串\中作为本条日志输出的内容,本条日志输出的等级为MOSQ_LOG_NOTICE。

4.2、配置参数管理

Mosquitto参数配置功能的实现代码主要在文件mosquitto-1.2/src/conf.c中,配置文件保存路径为:mosquitto-1.2/ mosquitto.conf,在mosquitto程序中主要由结构体structmqtt3_config保存从配置文件中读取的各参数值。Mosquitto的配置参数管理功能主要包括对配置文件的读取和解析、对用户输入参数的解析以及对配置文件结构体的管理三部分

Mosquitto对外提供的操作接口均以“mqtt3_config_”开头,共有以下四个:

(1) mqtt3_config_init

该函数的声明为:

void mqtt3_config_init(struct mqtt3_config *config)

它主要完成对参数config的各结构体成员的初始化工作。

(2) mqtt3_config_cleanup

该函数的声明为:

void mqtt3_config_cleanup(struct mqtt3_config *config)

它主要完成对参数config的各成员的清除、释放等操作。

(3) mqtt3_config_read

该函数的声明为:

int mqtt3_config_read(struct mqtt3_config *config, bool reload)

它主要完成从配置文件中读取各配置参数到结构体config中,具体的配置文件读取工作由函数_config_read_file完成。

(4) mqtt3_config_parse_args

该函数的声明为:

int mqtt3_config_parse_args(struct mqtt3_config *config, intargc, char *argv[])

它主要完成对用户输出参数的解析工作。

Mosquitto的配置文件内部共提供对整形、布尔类型,字符串三种类型的解析函数,这三个解析函数只共内部调用,分别是:_conf_parse_bool、_conf_parse_int、_conf_parse_string。

4.3、内存操作的封装

Mosquitto对内存操作的封装代码主要在文件mosquitto-1.2/src/memory_mosq.c中实现,共实现了对常用的内存申请与释放相关的系统调用函数,共有:_mosquitto_calloc函数是对系统函数calloc的封装,_mosquitto_malloc函数对系统函数malloc的封装,_mosquitto_realloc函数对系统函数realloc的封装,_mosquitto_free函数对系统函数free的封装,_mosquitto_strdup函数对系统函数strdup的封装。在上述封装函数中如果文件中定义了宏REAL_WITH_MEMORY_TRACKING,则这些封装函数只是对系统函数进行封装,不做任何额外操作。如果定义了REAL_WITH_MEMORY_TRACKING宏,则会在内存申请和释放时分别记录所申请或释放内存的大小。

它主要完成对参数config的各结构体成员的初始化工作。

(2) mqtt3_config_cleanup

该函数的声明为:

void mqtt3_config_cleanup(struct mqtt3_config *config)

它主要完成对参数config的各成员的清除、释放等操作。

(3) mqtt3_config_read

该函数的声明为:

int mqtt3_config_read(struct mqtt3_config *config, bool reload)

它主要完成从配置文件中读取各配置参数到结构体config中,具体的配置文件读取工作由函数_config_read_file完成。

(4) mqtt3_config_parse_args

该函数的声明为:

int mqtt3_config_parse_args(struct mqtt3_config *config, intargc, char *argv[])

它主要完成对用户输出参数的解析工作。

Mosquitto的配置文件内部共提供对整形、布尔类型,字符串三种类型的解析函数,这三个解析函数只共内部调用,分别是:_conf_parse_bool、_conf_parse_int、_conf_parse_string。

4.3、内存操作的封装

Mosquitto对内存操作的封装代码主要在文件mosquitto-1.2/src/memory_mosq.c中实现,共实现了对常用的内存申请与释放相关的系统调用函数,共有:_mosquitto_calloc函数是对系统函数calloc的封装,_mosquitto_malloc函数对系统函数malloc的封装,_mosquitto_realloc函数对系统函数realloc的封装,_mosquitto_free函数对系统函数free的封装,_mosquitto_strdup函数对系统函数strdup的封装。在上述封装函数中如果文件中定义了宏REAL_WITH_MEMORY_TRACKING,则这些封装函数只是对系统函数进行封装,不做任何额外操作。如果定义了REAL_WITH_MEMORY_TRACKING宏,则会在内存申请和释放时分别记录所申请或释放内存的大小。

本文来源:https://www.bwwdw.com/article/oki.html

Top