,,,,
(许继电气股份有限公司,河南 许昌 461000)
消息总线服务为进程间(多台计算机之间和一台计算机内部)的消息传输提供支持,具有消息主题的创建 / 删除、消息的订阅 / 发布等功能,以接口函数的形式提供给各类应用;具有组播、广播和点到点传输形式,支持一对多、一对一的信息交换场合[1]。当发送者需要分发一组新的消息时,它向消息总线服务器发出一个请求,消息总线服务器会负责把请求递送给所有的订阅者。经过对当前各种开源软件和其它行业软件的分析对比, 配电系统决定采用 Kafka 作为消息总线的主体服务。
消息总线服务为配电主站系统的各个模块提供基于 topic 的消息订阅和发布,消息总线服务应考虑系统的集群部署、系统冗余功能等[2];消息总线服务应具备足够的吞吐量,处理大量消息传递能力, 所以较高的性能是消息总线的基本要求。
消息总线采用 Qt/C++ 对 Kafka 的客户端进行封装,屏蔽各种网络处理细节,给平台其它应用一个简单易用的接口[3]。整个消息处理组件主要包括两部分内容:消息的发布和消息的处理。为了简化应用,消息体现为函数参数,可以任意定义消息体结 构而不改变程序结构。消息的处理采用回调函数的方式,向系统注册处理某种 topic 的回调函数即可。 下面具体介绍一下消息发布和消息处理的实现机制。
消息发布机制是对kafka客户端提供的信息发送函数进行封装,以达到向服务器上传数据的目的[4]。由于事先不确定用户上送的数据类型,封装采用模板模式,支持任意数据类型。上送方式为单条上送,即一次只能上送一条数据,数据大小不做限制。并且接收服务器返回的上送数据状态,成功返回0,失败返回-1等。
消息处理采用的是向服务器注册机制,对kafka客户端提供的注册函数进行封装,以达到向服务器订阅数据的目的。封装技术与发布机制类似,采用模板形式。注册可以分组注册也可以同一组批量注册。同一用户可以注册到服务器的不同组,但不能在同一组内多次注册。
消息总线的性能指标主要包括:大数据容量、数据处理速率、消费者个数上限、消费者个数上限、稳定性等性能指标。以上几项性能指标中大数据容量、数据处理速率、消费者个数上限和稳定性是最重要的,接下来我们具体分析这两项性能指标。
大数据容量分为单机数据容量和集群数据容量,是指消息总线的数据吞吐量[5]。主要测试方法是采用控制变量的方法将测试用例中的消息数量和单个消息大小分别增大,将数据总量逐级增加,测试容量上限,记录上限值与单机最大容量是否成正比例。在上限范围内,单机或集群各项功能正常工作。
数据处理速率又包括:生产者数据处理速率、消费者数据处理速率、网络风暴中单条生产者生产速率和网络风暴中单条消费者接收速率。
生产者数据处理速率是指生产者生产数据的效率。主要测试方法是启用测试用例,以50%容量上限值发送数据,记录Producer的生产时间,计算生产者的数据处理速率;更改测试用例,逐步提升到80%容量上限值发送量,记录Producer生产时间,计算生产者数据处理速率,直至发送量提升至容量上限值,借此测试生产者数据处理速率上限。在上限范围内,集群各项功能正常工作。
消费者数据处理速率指消费者的接收效率。主要测试方法是启用测试用例,以50%容量上限值发送数据,记录Consumer的消费时间,计算消费者的数据处理速率;更改测试用例,逐步提升到80%容量上限值发送量,记录Consumer消费时间,计算消费者数据处理速率,直至发送量提升至容量上限值,借此测试消费者数据处理速率上限。在上限范围内,集群各项功能正常工作。
网络风暴中单条生产者生产速率指网络风暴情况下生产者生产一条数据的效率[6]。主要测试方法是更改测试用例,逐步提升到80%容量上限值发送量,记录Producer生产时间,计算生产者单条数据处理速率,直至发送量提升至容量上限值,借此测试生产者单条数据处理速率上限。在上限范围内,集群各项功能正常工作。
网络风暴中单条消费者接收速率指网络风暴情况下消费者接收一条数据的效率。主要测试方法是更改测试用例,逐步提升到80%容量上限值发送量,记录Consumer消费时间,计算消费者单条数据处理速率,直至发送量提升至容量上限值,借此测试消费者单条数据处理速率上限。在上限范围内,集群各项功能正常工作。
消费者个数上限又分为单机消费者个数上限和集群消费者个数上限。其中单机消费者上限是指单机消费者的最大上限,主要测试方法是启动测试用例,使Producer以50%容量上限值持续生产数据,在一台服务器上逐步增加Consumer个数同时消费,测试消费者个数上限;将测试用例调整至容量上限值,重复上述工作,测试消费者上限个数。在上限范围内,各项功能正常工作。集群消费者个数上限是指整个集群的消费者上限者上限,主要测试方法是启动测试用例,使Producer以50%容量上限值持续生产数据,在多台服务器上逐步增加Consumer个数同时消费,测试消费者总个数上限;将测试用例调整至容量上限值,重复上述工作,测试消费者上限总个数,在上限范围内,集群各项功能正常工作。
稳定性主要是指消息总线服务长时间稳定的运行和在高负荷下稳定的运行。具体长时间稳定运行是指最低保证50%容量上限值72小时稳定运行,具体测试方法是更改测试用例,50%容量上限值发送数据量,进行72小时拷机测试,拷机期间各项功能正常,程序不应产生异常或死机等现象,拷机测试期间观测记录CPU平均负荷率,拷机测试期间,观测内存占有率,不发生内存泄露,拷机测试期间,各节点网络平均负荷率。并且最低保证80%容量上限值30分钟稳定运行,具体测试方法是更改测试用例,80%容量上限值发送数据量,进行30分钟拷机测试,拷机期间各项功能正常,程序不应产生异常或死机等现象,拷机测试期间观测记录CPU平均负荷率,拷机测试期间,观测记录内存占有率,不发生内存泄露,拷机测试期间,记录各节点网络平均负荷率。高负荷稳定运行是指最低保证100%容量上限值5分钟稳定运行,具体测试方法是更改测试用例,100%容量上限值发送数据量,进行5分钟压力测试,压力测试期间各项功能正常,程序不应产生异常或死机等现象,拷机测试期间观测记录CPU平均负荷率,拷机测试期间,观测记录内存占有率,不发生内存泄露,拷机测试期间,记录各节点网络平均负荷率。
消息总线测试环境采用三台服务器搭载Kafka服务、组成集群,系统采用单网千兆网络进行连接,满足2k+1配置,至少k+1台服务器正常使用时,保证消息总线能正常工作。见图1。
如图2所示,消息总线软件测试框架主要包括大数据生成、数据发送、数据接收和时间统计4个功能模块。
图2 消息总线性能测试软件框架功能图
数据生成模块的主要功能是生成测试数据,为了便于统计数据大小,这里数据类型统一使用字符型。可以单个添加数据,也可以批量生成大量数据。数据发送模块主要功能是与消息总线服务器建立链接并发送数据到服务器。数据接收模块主要功能是消费者注册和接收数据。时间统计模块主要功能是统计并记录消息总线处理数据的时间。由于要处理海量的数据,这里采用多线程技术,来保证测试框架界面的操作流畅。
Linux系统上采用 Qt/C++实现软件框架。主窗口类设计如下:
class PUITest: public QMainWindow
{
Q_OBJECT
public:
explicit PUITest(QWidget *parent = 0);
~PUITest();
int m_DowNum,m_DowNum_reg,outcnt,totalcnt,cb_cnt;
outputDockWidget *m_InfoOutput;//操作信息输出窗口
sendThread *sendmsgthread; //数据发送线程
runthread *runmsgthread; //海量数据发送线程
bool runOverflag;
int tId;//线程ID
void removeDockTitle(QDockWidget *docwidget);//去除头标志
void timerEvent(QTimerEvent *event);//定时器自动调用函数
void InitTabWidget();
int test2(const QString str);
void Output(QString s);
void msgsendThread();
void magrunThread();
public slots:
void sendmsgover();
private slots:
void on_action_about_triggered();
void on_action_add_triggered();
void on_action_delete_triggered();
void n_action_reg_triggered();
void on_action_send_triggered();
void on_checkBox_clicked();
void on_action_unreg_triggered();
void on_action_addNumbers_triggered();
void on_action_save_triggered();
void on_action_open_triggered();
void on_action_run_triggered();
private:
Ui::PUITest *ui;
};
线程类设计如下:
class sendThread: public QThread
{
Q_OBJECT
signals:
void sendover();//发送数据结束信号
protected:
void run();//线程自动调用
};
框架实现主界面如图3所示,主界面中有两个分页面,一个是发送信息配置,主要包括数据生成、发送次数设置、发送延时设置等。一个是接收信息配置,主要是注册信息的设置等。数据可以单个添加,更可以批量生成。另外还有操作提示窗口,时间统计窗口等。
图3 消息总线测试软件框架主界面
采用前几节中提出的测试方法及设计的测试框架,对消息总线性能进行测试。测试中使用三台服务器组成消息总线集群,两台客户端安装测试程序。具体软硬件信息如表1和表2所示。
表1 软硬件配置环境
表2 软件消息
通过测试框架的测试软件自动生成海量的测试数据,并自动统计数据处理时间和读取服务器资源占用信息。以下所有记录均在HP DL560 G8 服务器上进行测试,测试10次求平均值,性能测试结果如表3~6所示。
表3 生产者生产速率
表4 消费者消费速率
表5 网络风暴中单条生产信息生产时间
表6 网络风暴中单条消费消息收时间
表3是生产者生产效率的的测试结果,从该测试结果中可以发现,发送量一定的情况下,随着单条数据字节数的增加,发送效率逐渐降低。表4是消费者消费效率的测试的测试结果,从该测试结果中可以发现,接收量一定的情况下,随着单条数据字节数的增加,接收效率逐渐降低。通过对比发现,消费效率要明显高于生产效率。表5是网络风暴中单条信息的生产效率测试结果,从该测试结果中可以发现,不同程度网络风暴情况下,生产单条数据的效率相对稳定。也就是说网络风暴对生产者的生产效率影响不大。表6是网络风暴中单条消费信息的消费效率的测试结果,从该测试结果中可以发现,不同程度网络风暴情况下,消费单条数据的效率不稳定。也就是说网络风暴对消费者的效率影响较大。
消费者上限测试,当消费者为10 000时,系统各项指标均正常运行。这以远远超出我们实际应用时的上限,新型配电主站系统实际应用时的消费者数量不超过1 000,所以这里不再具体测试。
稳定性测试时接入不少于 10 000台配电终端,每台终端定义100个“ 三遥” 数据信息接入被测系统,系统应不间断运行维持 72 h, 被测系统 cpu及内存使用情况如下:
1)当50%发送速率拷机,单条消息字节长度为1 024 byte时,内存占用率和网络负荷负荷率正常;
2)当80%发送速率拷机,单条消息字节长度为1 024 byte时,内存占用率和网络负荷负荷率正常;
3)当100%发送速率拷机,单条消息字节长度为1 024 byte时,内存占用率和网络负荷负荷率较高。
经性能测试,除了掌握以上性能规律外,还发现了两个有价值的问题,问题1:当向消息总线服务器发送海量数据,导致服务器存储空间不足时,消息总线服务直接崩溃;问题2:向消息总线服务器长时间发送1 024 byte大小的数据流时,服务器内存占用过大。以上性能数据及问题得到了设计人员的高度认可,充分体现了文中设计的测试框架的价值。
随着配电信息数据规模的不断扩大,对消息总线的性能要求越来越高,而数据吞吐量和数据处理效率是消息总线性能的重要指标。本文通过对kafka为服务主体的消息总线性能进行分析后,提出了效果较好的测试方法,设计了实用性很强的测试框架。经实际应用证明,该测试框架简单易用,在很大程度上提高了性能测试的效率。该框架有较好的通用性,其他消息总线的测试也可以使用。