柴昱含 李道全
摘要:针对诈骗短信近年来出现的主叫号码多变、被叫号码具有随机性、短信内容难以识别等新特性,在综合分析数据时,需要实时的处理海量的数据,而现有数据并发量大、实时性要求高等特点,所以采用什么样的技术去处理这样的数据十分关键。针对以上问题,对实时处理技术进行调研。由于Storm在海量数据实时处理方面具有很好的特性,对Storm进行相关调研。
关键词:Storm;诈骗短信;大数据;Topology;实时流
中图分类号:TP311 文献标识码:A 文章编号:1009-3044(2014)16-3768-06
Abstract: In recent years, in the area of SMS scam, the calling numbers are highly changeable, the called numbers are of great randomicity, and the contents are unidentifiable. With these new features, an enormously great amount of data should be processed simultaneously when analyzing relevant data. Due to the great amount and the simultaneous nature, it is critical that what technique should be adopted to process these data. For the above issues, simultaneously processing technique is studied. As Storm boasts of brilliant capabilities in simultaneous mass data processing, in this paper Storm is investigated.
Key words: Storm ;SMS scam;mass dada;Topology;real-time streaming
实时数据流应用的一个普通模式是对输入数据进行滚动计数,也被称为滑动窗口分析.对于滚动计数的一个典型应用是在一个用户社区内分析热门话题-例如在Twitter-当一个话题已经在一个给定的时间窗口内排名前N位时,则其已经是一个热门话题.本篇文章将介绍如何基于分布式和可扩展的实时流数据处理平台Storm实现这个算法.相同的代码也能使用在其它领域例如安全监控。
1 Storm简介
1.1 Storm基本概念
Storm是一个分布式的实时处理系统,由主节点和从节点构成。其中,主节点只有一个,并运行名为“Nimbus”的守护进程;从节点有多个,每个工作节点都运行一个名为“Supervisor”的守护进程。“Nimbus”进程用于分配代码、布置任务及故障检测,“Supervisor”进程用于监听工作,开始并终止工作进程。Nimbus和Supervisor都能快速失败,而且是无状态的,这样一来它们就变得十分健壮。主节点和从节点通过ZooKeeper来进行交互,主节点通过ZooKeeper来发布指令,从节点从ZooKeeper读取指令并执行。ZooKeeper用于管理集群中的不同组件,ZeroMQ是内部消息系统。
Storm的基本元素包括Topology、Stream、Spout、Bolt等。
Topology[2]:一个计算任务被称为一个Topology,由多个Spout和Bolt组成。Topology任务一旦提交将会一直运行,除非主动停止任务,如图2。
Stream:即数据流,是Storm中对数据的抽象,它是时间上无界的tuple元组序列。在Topology中,Spout是Stream的发射器,从特定数据源获取数据发射Stream;一个Bolt可以接收多个Stream作为输入,然后对数据进行加工处理, Bolt还可以发射新的Stream给下级Bolt进行处理。
Spout:消息源,可从外部获取数据并将获取的数据作为消息源提交给Topology。Spout包括可靠消息源和不可靠消息源两类,可靠消息源将会对没有被成功处理的tuple进行重发,不可靠消息源不会重发。
Bolt:消息处理单元,可以执行过滤, 聚合, 查询等操作。
Topology[3]定义代码示例:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(“spout”, new KafkaSpout(),2);
builder.setBolt(“smsBolt”, new SMSParerBolt (), 8).shuffleGrouping(“spout”);
builder.setBolt(“filterBolt”, new FilterCallingNumberBolt (), 2)
. shuffleGrouping (“smsBolt”). setNumTasks(4);
通过TopologyBuilder的setSpout()方法设置Spout,示例中”spout”为Spout的别名,KafkaSpout为Spout的一个实现类,后面的数字2为并发线程数。通过setBolt()方法设置Bolt,实例中分别设置了别名为”smsBolt”、线程并发度为8的SMSParerBolt和别名为”filterBolt”、线程并发度为2的FilterCallingNumberBolt,setBolt后的shuffleGrouping为流分组策略,setNumTasks为该Bolt对应Task数。关于并发度、流分组策略、Task数的概念将在之后小节介绍。endprint
1.2 Storm流分组策略
流分组策略(StreamGrouping)[4],用于设置Bolt的Task间数据的分配策略,包括以下几类:
Shuffle Grouping:随机分组,随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。
Fields Grouping:按字段分组,比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts里的一个task, 而不同的userid则会被分配到不同的bolts里的task。
All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。
Global Grouping:全局分组,这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
Non Grouping:不分组,这个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果, 有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。
Direct Grouping:直接分组,这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。
Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发生给这些tasks。否则,和普通的Shuffle Grouping行为一致。
2 基于Storm的滑动窗口实现
2.1 热门话题和滑动窗口
首先,解释一下什么是热门话题,以便于我们有一个共识。
2.1.1 热门话题
一个单词、短语或话题相比其它的标签有更多地被标记的概率则被称为热门话题。成为热门话题一方面来自于用户的一致关注,另一方面因为某些特殊事件而引起人们的关注。这些话题有助于用户了解当前世界正在发生什么。
换一句话,它说明了在一个用户社区内“热点是什么”。通常,你会对一个给定时间范围内的热门话题感兴趣,例如,在过去五分钟内或一天内最流行的话题。所以,对于“热点是什么”更准确的描述应该是“今天的热点是什么”或“这周的热点是什么”。
在这篇文章中,我们假设我们有一个系统,这个系统使用TwitterAPI去获取最新的数据。更进一步假设,我们有一种机制可以从Twitter的消息中以单词的形式标识主题。例如,我们可以选择用一个简单的模式匹配算法处理主题标签#。
我们设计我们的系统,如果一个主题A比主题B更多次数被提到,我们则认为主题A比主题B更流行。这意味着我们只需要去统计Twitter中主题被引用的次数。
对于本文我们不关心这些主题是怎样从用户内容或用户活动中衍生出来的,只要知道这些衍生出来的主题用文本的方式表示就行了。然后,Storm的拓扑逻辑会通过滚动计算和排名计算分析出输入数据的实时热门话题。前期关注去对一定时间范围内用户输入数据的过滤,后期关注与对热门话题的排名。
我们期望Storm拓扑逻辑定期的产生TopN热门话题,就像下面的输出,t0-t2是不同的时间点:
Rank @ t0 ——> t1 ——> t2 —————————————————————— 1. java (33) ruby (41) scala (32) 2. php (30) scala (28) python (29) 3. scala (21) java (27) ruby (24) 4. ruby (16) python (21) java (21) 5. python (15) php (14) erlang (18)
在这个例子中我们可以看出“scala”已经成为最热门话题。
2.1.2 滑动窗口
在之前的背景介绍中,我想要说明的是滑动窗口即滚动计算。一张图片胜过千言万语:
在上述例子中,对滑动窗口内的数据求和。
一个公式可能是很好的解释:
从大小到时间:如果我们假设窗口每5分钟前进一次,则输入数据中每个块内的数字代表过去相同时间间隔内收集的数据量。在本例中,窗口大小是N*m分钟。简单的说,如果N=1和m=5,则我们的滑动窗口算法没一分钟提交过去五分钟的数据。
现在我们已经介绍过了热门话题和时间窗口,我们最后来谈谈如何通过代码实现。
2.2 滑动窗口的实现
2.2.1 实现数据结构
接下来我们介绍一下核心数据结构。正如你所看到的,一个有趣的特性是这些数据结构与Storm的内部特性是完全独立的。我们的Storm bolts将要使用它们,当然,数据结构对于Storm没有依赖。
计数所使用的类:SlotBasedCounter, SlidingWindowCounter
排名所使用的类:Rankings, Rankable, RankableObjectWithFields
另一个显著的改善是代码删除了不必的代码并且使用线程相关代码,例如同步或手动启动后台线程。并且数据结构也不与系统时间交互。消除直接调用系统时间并且手动开启后台线程是得新代码比老代码更简单和容易测试。endprint
// such code from the old RollingCountObjects bolt is not needed anymore
long delta = millisPerBucket(_numBuckets) - (System.currentTimeMillis() % millisPerBucket(_numBuckets));
Utils.sleep(delta);
SlotBasedCounter
SlotBasedCounter类提供了对于对象计数的功能。用于计数的Slots的数量是固定的。当前类提供了四个公共方法:
public void incrementCount(T obj, int slot);
public void wipeSlot(int slot):
public long getCount(T obj, int slot)
public Map
例子:
SlotBasedCounter counter = new SlotBasedCounter
Object trackMe = ...;
int currentSlot = 0;
counter.incrementCount(trackMe, currentSlot);
long counts = counter.getCount(trackMe, currentSlot);
Map
SlotBasedCounter内部使用Map
在上面的例子中SlotBasedCounter有5个slot用于计数。
SlotBasedCounter是我们可以使用的一个比较原始的类,它是滑动计数窗口的一个组成部分,接下来我们继续进行介绍。
2.2.2 SlidingWindowCounter
SlidingWindowCounter类提供了滚动计数的功能。它的计数功能基于SlotBasedCounter类。滑动窗口的大小与SlidingWindowCounter实例的slot的数量是相等的。RollingCountBolt使用它对输入的tuple进行计数。
这个类提供了2个方法:
public void incrementCount(T obj);
Map
读者可能惊奇的发现我们的滑动窗口与时间没有什么关联,因为通常滑动窗口是基于时间的。在我们的例子中,窗口不随着时间前进,除了调用getCountsThenAdvanceWindow方法。这意味着SlidingWindowCounter的行为就像一个环形缓冲区,从一个窗口前进到下一个窗口。
请注意,例子中是一个8个slot的滑动窗口,其中每个slot只显示了一个计数器。实际上有多个计数器对对象进行跟踪。
下面是一个图解,展示了SlidingWindowCounter多次迭代的效果:
2.2.3 Rankings and Rankable
Rankings类用于一定数量的排名,例如前十名。它依据对象的自然顺序从大到小进行排序。这个类由AbstractRankerBolt使用,它的bolt用于跟踪过去一段时间内对象的当前排名情况。
这个类有5个方法:
public void updateWith(Rankable r);
public void updateWith(Rankings other);
public List
public int maxSize();
public int size();
无论什么时候你更新排名的时候,它都会丢弃低于topN的数据,N是排名的最大数量。
正常情况下我们的排序是依据对象的自然次序。在我们的特殊例子中,我创建了一个Rankable接口,这个接口实现了Comparable。实际情况中,你可以传递一个Rankable对象到Rankings类中,这样排序条件将会更新。
Rankings topTen = new Rankings(10);
Rankable C = ...;
topTen.updateWith(r);
List
实现了Rankable的具体类是RankableObjectWithFields。IntermediateRankingsBolt类通过工厂方法将输入数据创建为Rankable对象。
@Override
void updateRankingsWithTuple(Tuple tuple) {
客服热线:400-656-5456 客服专线:010-56265043 电子邮箱:longyuankf@126.com
电信与信息服务业务经营许可证:京icp证060024号
Dragonsource.com Inc. All Rights Reserved
Rankable rankable = RankableObjectWithFields.from(tuple); super.getRankings().updateWith(rankable);
}
仔细看一下Rankings, Rankable 和RankableObjectWithFields 。如果你自己不得不去实现这些类并且你是一个有经验的工程师,那么你一定会实现equals() 和 hashCode()方法。
2.2.4 实现Rolling Top Words 拓扑逻辑
实现Rolling Top Words 拓扑逻辑
在阅读这一部分的时候,“words”代表了我们所设想的系统中用户提到的主题。
Rolling Top Words拓扑逻辑由TestWordSpout, RollingCountBolt, IntermediateRankingsBolt 和TotalRankingsBolt组成
滑动窗口的大小和提交频率只是个例子,在我们例子中有一个5分钟的滑动窗口并且每分钟提交一次。
主要工作如下:
1)拓扑逻辑的第一层TestWordSpout模拟输入数据-用户提到的主题。
2)第二层RollingCountBolt对输入数据进行计数
3)第三层IntermediateRankingsBolt对数据进行排名
4)最后,TotalRankingsBolt汇聚数据,输出总排名。
拓扑逻辑代码如下:
builder.setSpout(spoutId, new TestWordSpout(), 2);
builder.setBolt(counterId, new RollingCountBolt(9, 3), 3) .fieldsGrouping(spoutId, new Fields("word"));
builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 2) .fieldsGrouping(counterId, new Fields("obj"));
builder.setBolt(totalRankerId,new TotalRankingsBolt(TOP_N)) .globalGrouping(intermediateRankerId);
参考文献:
[1] 张春麟.手机垃圾短信过滤平台的分析与应用[D].北京:北京邮电大学,2010.
[2] 互联网文档资源.storm-0.8.2源码分析之topology启动[DB/OL].http://blog.csdn.net/chlaws/article/details/10562035,2013.
[3] 互联网文档资源.Storm-源码分析-Component,Executor,Task之间关系[DB/OL].http://www.cnblogs.com/fxjwind/,2013.
[4] 互联网文档资源.Tutorial[DB/OL].https://github.com/nathanmarz/storm/wiki/Tutorial,2012.
[5] 互联网文档资源.Tutorial[DB/OL].http://storm.incubator.apache.org/documentation/Tutorial.html,2012.
[6] 互联网文档资源.使用Storm实现实时大数据分析[DB/OL].http://www.csdn.net/article/2012-12-24/2813117-storm-realtime-big-data-analysis,2012.
[7] 互联网文档资源.Storm快速理解[DB/OL].http://blog.csdn.net/colorant/article/details/8256039,2012.
[8] 互联网文档资源.徐明明.Twitter Storm: Transactional Topolgoy简介[DB/OL].http://xumingming.sinaapp.com/736/twitter-storm-transactional-topolgoy/,2012.
[9] 互联网文档资源.Storm之trident聚合操作介绍[DB/OL].http://blog.sina.com.cn/s/blog_6ff05a2c0101k6xj.html,2013.
[10] 互联网文档资源.[翻译][Trident] Storm Trident教程[DB/OL].http://blog.csdn.net/derekjiang/article/details/9126185,2013.endprint
Rankable rankable = RankableObjectWithFields.from(tuple); super.getRankings().updateWith(rankable);
}
仔细看一下Rankings, Rankable 和RankableObjectWithFields 。如果你自己不得不去实现这些类并且你是一个有经验的工程师,那么你一定会实现equals() 和 hashCode()方法。
2.2.4 实现Rolling Top Words 拓扑逻辑
实现Rolling Top Words 拓扑逻辑
在阅读这一部分的时候,“words”代表了我们所设想的系统中用户提到的主题。
Rolling Top Words拓扑逻辑由TestWordSpout, RollingCountBolt, IntermediateRankingsBolt 和TotalRankingsBolt组成
滑动窗口的大小和提交频率只是个例子,在我们例子中有一个5分钟的滑动窗口并且每分钟提交一次。
主要工作如下:
1)拓扑逻辑的第一层TestWordSpout模拟输入数据-用户提到的主题。
2)第二层RollingCountBolt对输入数据进行计数
3)第三层IntermediateRankingsBolt对数据进行排名
4)最后,TotalRankingsBolt汇聚数据,输出总排名。
拓扑逻辑代码如下:
builder.setSpout(spoutId, new TestWordSpout(), 2);
builder.setBolt(counterId, new RollingCountBolt(9, 3), 3) .fieldsGrouping(spoutId, new Fields("word"));
builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 2) .fieldsGrouping(counterId, new Fields("obj"));
builder.setBolt(totalRankerId,new TotalRankingsBolt(TOP_N)) .globalGrouping(intermediateRankerId);
参考文献:
[1] 张春麟.手机垃圾短信过滤平台的分析与应用[D].北京:北京邮电大学,2010.
[2] 互联网文档资源.storm-0.8.2源码分析之topology启动[DB/OL].http://blog.csdn.net/chlaws/article/details/10562035,2013.
[3] 互联网文档资源.Storm-源码分析-Component,Executor,Task之间关系[DB/OL].http://www.cnblogs.com/fxjwind/,2013.
[4] 互联网文档资源.Tutorial[DB/OL].https://github.com/nathanmarz/storm/wiki/Tutorial,2012.
[5] 互联网文档资源.Tutorial[DB/OL].http://storm.incubator.apache.org/documentation/Tutorial.html,2012.
[6] 互联网文档资源.使用Storm实现实时大数据分析[DB/OL].http://www.csdn.net/article/2012-12-24/2813117-storm-realtime-big-data-analysis,2012.
[7] 互联网文档资源.Storm快速理解[DB/OL].http://blog.csdn.net/colorant/article/details/8256039,2012.
[8] 互联网文档资源.徐明明.Twitter Storm: Transactional Topolgoy简介[DB/OL].http://xumingming.sinaapp.com/736/twitter-storm-transactional-topolgoy/,2012.
[9] 互联网文档资源.Storm之trident聚合操作介绍[DB/OL].http://blog.sina.com.cn/s/blog_6ff05a2c0101k6xj.html,2013.
[10] 互联网文档资源.[翻译][Trident] Storm Trident教程[DB/OL].http://blog.csdn.net/derekjiang/article/details/9126185,2013.endprint
Rankable rankable = RankableObjectWithFields.from(tuple); super.getRankings().updateWith(rankable);
}
仔细看一下Rankings, Rankable 和RankableObjectWithFields 。如果你自己不得不去实现这些类并且你是一个有经验的工程师,那么你一定会实现equals() 和 hashCode()方法。
2.2.4 实现Rolling Top Words 拓扑逻辑
实现Rolling Top Words 拓扑逻辑
在阅读这一部分的时候,“words”代表了我们所设想的系统中用户提到的主题。
Rolling Top Words拓扑逻辑由TestWordSpout, RollingCountBolt, IntermediateRankingsBolt 和TotalRankingsBolt组成
滑动窗口的大小和提交频率只是个例子,在我们例子中有一个5分钟的滑动窗口并且每分钟提交一次。
主要工作如下:
1)拓扑逻辑的第一层TestWordSpout模拟输入数据-用户提到的主题。
2)第二层RollingCountBolt对输入数据进行计数
3)第三层IntermediateRankingsBolt对数据进行排名
4)最后,TotalRankingsBolt汇聚数据,输出总排名。
拓扑逻辑代码如下:
builder.setSpout(spoutId, new TestWordSpout(), 2);
builder.setBolt(counterId, new RollingCountBolt(9, 3), 3) .fieldsGrouping(spoutId, new Fields("word"));
builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 2) .fieldsGrouping(counterId, new Fields("obj"));
builder.setBolt(totalRankerId,new TotalRankingsBolt(TOP_N)) .globalGrouping(intermediateRankerId);
参考文献:
[1] 张春麟.手机垃圾短信过滤平台的分析与应用[D].北京:北京邮电大学,2010.
[2] 互联网文档资源.storm-0.8.2源码分析之topology启动[DB/OL].http://blog.csdn.net/chlaws/article/details/10562035,2013.
[3] 互联网文档资源.Storm-源码分析-Component,Executor,Task之间关系[DB/OL].http://www.cnblogs.com/fxjwind/,2013.
[4] 互联网文档资源.Tutorial[DB/OL].https://github.com/nathanmarz/storm/wiki/Tutorial,2012.
[5] 互联网文档资源.Tutorial[DB/OL].http://storm.incubator.apache.org/documentation/Tutorial.html,2012.
[6] 互联网文档资源.使用Storm实现实时大数据分析[DB/OL].http://www.csdn.net/article/2012-12-24/2813117-storm-realtime-big-data-analysis,2012.
[7] 互联网文档资源.Storm快速理解[DB/OL].http://blog.csdn.net/colorant/article/details/8256039,2012.
[8] 互联网文档资源.徐明明.Twitter Storm: Transactional Topolgoy简介[DB/OL].http://xumingming.sinaapp.com/736/twitter-storm-transactional-topolgoy/,2012.
[9] 互联网文档资源.Storm之trident聚合操作介绍[DB/OL].http://blog.sina.com.cn/s/blog_6ff05a2c0101k6xj.html,2013.
[10] 互联网文档资源.[翻译][Trident] Storm Trident教程[DB/OL].http://blog.csdn.net/derekjiang/article/details/9126185,2013.endprint