Storm实时流处理框架PPT演讲李乾DHU).ppt
实时流处理框架Storm,演讲:李乾文,2013年11月5日,目录,Storm介绍Storm环境配置Storm程序流程Storm总结及问题,目录,Storm介绍Storm环境配置Storm程序流程Storm总结及问题,实时流计算背景,参考:1.http:/storm入门教程 第一章 前言2.http:/流处理框架Storm简介,RPC(RemoteProcedureCallProtocol)远程过程调用协议,随着互联网的更进一步发展,信息浏览、搜索、关系交互传递型,以及电子商务、互联网旅游生活产品等将生活中的流通环节在线化。对于实时性的要求进一步提升,而信息的交互和沟通正在从点对点往信息链甚至信息网的方向发展,这样必然带来数据在各个维度的交叉关联,数据爆炸已不可避免。因此流式处理和NoSQL产品应运而生,分别解决实时框架和数据大规模存储计算的问题。流式处理可以用于3种不同场景:事件流、持续计算以及分布式RPC。,数据分析系统组成,参考:1.http:/流式计算系统,数据分析系统整体组成示意图,如HDFS,流处理与批处理,参考:1.实时处理方案架构(作者-落枫).pdf,Storm 关注的是数据多次处理一次写入,而 hadoop 关注的是数据一次写入,多次查询使用。Storm系统运行起来后是持续不断的,而 hadoop往往只是在业务需要时调用数据。,Storm和Hadoop角色对比,参考:1.http:/流式计算系统,Storm组件,参考:1.http:/storm简介,Topology:storm中运行的一个实时应用程序.Nimbus:负责资源分配和任务调度.Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程.Worker:运行具体处理组件逻辑的进程.Task:worker中每一个spout/bolt的线程称为一个task.Spout:在一个topology中产生源数据流的组件.Bolt:在一个topology中接受数据然后执行处理的组件.Tuple:一次消息传递的基本单元.Stream grouping:消息的分组方法,Storm组件,参考:1.百度图片搜索,Storm组件,参考:1.http:/Storm入门教程 第二章 构建Topology,Storm特点,参考:1.百度百科2.http:/storm入门教程 第一章 前言3.http:/Storm安装部署步骤,可扩展 计算任务可在多个线程、进程和服务器之间并行进行,支持灵活的水平扩展高可靠 保证每条消息都能被完全处理高容错性 nimbus、supervisor都是无状态的,可以用kill-9来杀死Nimbus和Supervisor进程,然后再重启它们,任务照常进行.当worker失败后,supervisor会尝试在本机重启它支持多种编程语言 除了用java实现spout和bolt,还可用其他语言支持本地模式 可在本地模拟一个Storm集群功能、进行本地测试高效 用ZeroMQ作为底层消息队列,保证消息能快速被处理,目录,Storm介绍Storm环境配置Storm程序实例Storm总结及问题,依赖软件,Storm的依赖软件有Python、Zeromq、Jzmq、Zookeeper。此外,系统应安装有Java、GCC、G+编译环境。,参考:1.http:/Twitter Storm 安装实战2.http:/centos 怎么安装 g+,安装Python,#wget http:/www.python.org/ftp/python/2.7.2/Python-2.7.2.tgz#tar zxvf Python-2.7.2.tgz#cd Python-2.7.2#./configure#make#make install#vi/etc/ld.so.conf 追加/usr/local/lib/#sudo ldconfig这样的话,Python2.7.2就安装完毕了。,参考:1.http:/Twitter Storm 安装实战2.http:/linux ld.so.conf 和 pkgconf3.http:/linux下python安装,安装Zeromq,jzmq的安装是依赖zeromq的,所以应该先装zeromq,再装jzmq。#wget http:/download.zeromq.org/zeromq-2.1.7.tar.gz#tar zxf zeromq-2.1.7.tar.gz#cd zeromq-2.1.7#./configure#make#make install#sudo ldconfig 2)安装jzmq#yum install git(CentOS)&apt-get install git(Ubuntu)#git clone git:/cd jzmq#./autogen.sh#./configure#make#make install在过程中很可能会遇到依赖库相关问题,详见参考1,参考:1.http:/Twitter Storm 安装实战2.http:/Storm集群安装部署步骤【详细版】,可到https:/,SSH免密码配置,参考:1.http:/Hadoop集群(第5期副刊)_JDK和SSH无密码配置,为使各台计算机间Zookeeper同步数据、应配置SSH免密码登录1 确认本机sshd的配置文件(root)$vi/etc/ssh/sshd_config找到以下内容,并去掉注释符#RSAAuthentication yesPubkeyAuthentication yesAuthorizedKeysFile.ssh/authorized_keys2 如果修改了配置文件需要重启sshd服务(root)$vi/sbin/service sshd restart3 ssh登陆系统 后执行测试命令$ssh localhost回车会提示你输入密码,因为此时我们还没有生成证书。4 生成证书公私钥$ssh-keygen-t dsa-P-f/.ssh/id_dsa$cat/.ssh/id_dsa.pub/.ssh/authorized_keys,SSH免密码配置,参考:1.http:/Hadoop集群(第5期副刊)_JDK和SSH无密码配置,5 拷贝本地生产的key到远程服务器端$cat/.ssh/id_rsa.pub|ssh 远程用户名远程服务器ip cat-/.ssh/authorized_keys6 测试登陆 ssh user远程ip7 如果登陆不成功,需要修改远程服务器上的authorized_keys文件权限$chmod 600/.ssh/authorized_keys,安装Zookeeper及单机配置,#wget http:/ftp.meisei-u.ac.jp/mirror/apache/dist/zookeeper/zookeeper-3.4.5/zookeeper-3.4.5.tar.gz#tar-zxf zookeeper-3.4.5.tar.gz#cp-R zookeeper-3.4.5/usr/local/#ln-s/usr/local/zookeeper-3.4.5/usr/local/zookeeper#vi./bashrc(设置ZOOKEEPER_HOME和ZOOKEEPER_HOME/bin环境变量,vim/etc/profile&source/etc/profile)追加:export ZOOKEEPER_HOME=/path/to/zookeeper export PATH=$PATH:$ZOOKEEPER_HOME/bin#cd/usr/local/zookeeper/conf/#cp zoo_sample.cfg zoo.cfg(用zoo_sample.cfg制作$ZOOKEEPER_HOME/conf/zoo.cfg)zookeeper的单机安装完成。启动服务 bin/zkServer.sh start用bin/zkServer.sh status 查看会显示standalone启动客户端测试bin/zkCli.sh-server 127.0.0.1:2181,参考:1.http:/Twitter Storm 安装实战2.http:/linux下zookeeper安装与测试,安装Zookeeper及单机配置,zoo_sample.cfg内容:,Zookeeper集群配置,参考:1.http:/Twitter Storm 安装实战2.http:/linux下zookeeper安装与测试3.http:/ZooKeeper系列之十:ZooKeeper的一致性保证及Leader选举,在单机配置文件zoo.cfg末尾加上server.1=192.168.61.130:2888:3888server.2=192.168.61.134:2888:3888server.3=192.168.61.135:2888:3888,格式为:server.id=host:port:port id是为每个Zookeeper节点的编号同时需要在 dataDir目录下面新建文件myid,内容为$id数值.echo$id$dataDir/myid第一个port是用于follower连接leader的端口,第二个port是用于leader选举的端口.当集群中的leader宕机后其中一台follower的模式转变成leader.所以Storm集群中的Nimbus主机不一定是leader.,关于server.id=host:port:port,server.x=hostname:nnnnn:nnnnn,etcservers making up the ZooKeeper ensemble.When the server starts up,it determines which server it is by looking for the file myid in the data directory.That file contains the server number,in ASCII,and it should match x in server.x in the left hand side of this setting.,http:/zookeeper.apache.org/doc/r3.4.3/zookeeperAdmin.html#sc_CrossMachineRequirements,官方解释,安装Storm及单机配置,#wget https:/unzip storm-0.8.1.zip#cp-R storm-0.8.1/usr/local/vim/.bashrc 追加export STORM_HOME=/usr/local/storm-0.8.1 export PATH=$PATH:$STORM_HOME/bin到此为止单机版的Storm就安装完毕了。,参考:1.http:/Twitter Storm 安装实战2.http:/Storm集群安装部署步骤【详细版】,Storm下载地址http:/storm-,Storm集群配置,配置文件:storm.yaml#These MUST be filled in for a storm configuration storm.zookeeper.servers:-192.168.61.130-192.168.61.134-192.168.61.135 nimbus.host:192.168.61.130 storm.local.dir:/data/storm/data ui.port:8080,参考:1.http:/Twitter Storm 安装实战2.http:/Storm集群安装部署步骤【详细版】,Storm下载地址http:/storm-,启动Storm,启动Storm(在此之前要启动Zookeeper)./storm nimbus/dev/null 2&1&./storm supervisor/dev/null 2&1&./storm ui/dev/null 2&1&,参考:1.http:/Twitter Storm 安装实战2.http:/Storm集群安装部署步骤【详细版】3.http:/linux nohup命令详解,./command.sh output 2&1&意思是把标准错误(2)重定向到标准输出中(1),而标准输出又导入文件output里面 所以结果是标准错误和标准输出都导入文件output里面了。,关闭Storm,参考:1.http:/Twitter Storm 安装实战2.http:/Storm集群安装部署步骤【详细版】3.http:/最近碰到的一些storm问题总结,结束nimbus进程rootlocalhost bin#kill 2781(nimbus进程号),关闭nimbus相关进程:kill ps aux|egrep(daemon.nimbus)|(storm.ui.core)|fgrep-v egrep|awk print$2 关闭supervisor上的所有storm进程:kill ps aux|fgrep storm|fgrep-v fgrep|awk print$2,Storm环境配置,参考:1.http:/Twitter Storm 安装实战2.http:/Storm集群安装部署步骤【详细版】,启动后可以通过http:/nimbus host:8080 观察集群的worker资源使用情况,Topologies的运行状态等信息.,1)提交Storm Topology:storm jar mycode.jar storm.MyTopology arg1 arg2.mycode.jar是包含Topology实现代码的jar包,storm.MyTopology的main方法是Topology的入口,arg1,arg2等为main方法参数.2)列出Storm Topology:storm list 2)停止Storm Topology:storm kill topologyname在非nimbus主机上不能list和kill,会显示拒绝连接在nimbus主机上还可运行supervisor、在supervisor主机上不能运行nimbus,Topology提交(正确),提交Topology,Topology提交(错误),提交Topology,目录,Storm介绍Storm环境配置Storm程序流程Storm总结及问题,项目结构,注:因篇幅限制,以下程序为主要实现代码架构、异常等具体代码省略!,项目入口(TopologyMain.java),public static void main(String args)TopologyBuilder builder=new TopologyBuilder();builder.setSpout(read,new ReadSpout();builder.setBolt(monitor,new MonitorBolt().shuffleGrouping(read);builder.setBolt(print,new PrintBolt().shuffleGrouping(monitor);builder.setBolt(mysql,new MysqlBolt().shuffleGrouping(monitor);Config config=new Config();,1、实例化TopologyBuilder类2、设置数据喷发节点(Spout类):Spout类首先执行open方法、再执行 nextTuple方法(读取数据,使用SpoutOutputCollector类发射出去)3、设置数据处理节点(Bolt类):declareOutputFields方法声明bolt的输出参数、执行execute方法处理数据后使用SpoutOutputCollector类发射出去4、还可以继续设置数据处理节点.,流分组:Shuffle分组,Fields分组,All分组,自定义分组,Direct分组,Global分组,None分组,参考:1.http:/getting start with storm 翻译 第三章 part-1-Topologies-流分组,项目入口(TopologyMain.java),if(args!=null,喷发节点(ReadSpout.java),public class ReadSpout implements IRichSpout private SpoutOutputCollector collector;private boolean complete=false;private Reader Fread=null;private BufferedReader Fbuff=null;Overridepublic void open(Map conf,TopologyContext context,SpoutOutputCollector collector)this.collector=collector;File file=new File(domain.log);Fread=new FileReader(file);Fbuff=new BufferedReader(Fread);Overridepublic void nextTuple()if(!complete)String str=null;while(str=Fbuff.readLine()!=null)this.collector.emit(new Values(str);complete=true;Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer)declarer.declare(new Fields(line);,喷发节点(ReadSpout.java),Overridepublic void close()Overridepublic void activate()Overridepublic void deactivate()Overridepublic void ack(Object msgId)Overridepublic void fail(Object msgId)Overridepublic Map getComponentConfiguration()return null;,处理节点(MonitorBolt.java),public class MonitorBolt implements IRichBolt private OutputCollector collector;Overridepublic void prepare(Map stormConf,TopologyContext context,OutputCollector collector)this.collector=collector;Overridepublic void execute(Tuple input)String str=input.getString(0);/相当于list容器String regex=.google.;boolean result=Ppile(regex).matcher(str).find();if(result)this.collector.emit(new Values(str);,处理节点(MonitorBolt.java),Overridepublic void cleanup()Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer)declarer.declare(new Fields(str);/声明bolt的输出参数Overridepublic Map getComponentConfiguration()return null;,处理节点(MysqlBolt.java),public class MysqlBolt implements IRichBolt private Connection conn=null;private Statement stmt=null;private OutputCollector collector;Override public void prepare(Map stormConf,TopologyContext context,OutputCollector collector)this.collector=collector;/.连接数据库 Override public void execute(Tuple input)String str=input.getString(0);/.插入数据库 Override public void cleanup()/.关闭数据库连接 Override public void declareOutputFields(OutputFieldsDeclarer declarer)Override public Map getComponentConfiguration()return null;,Storm程序流程,操作演示,目录,Storm介绍Storm环境配置Storm程序流程Storm总结及问题,Storm数据接入层,1、消息队列(MetaQ)2、通过网络Socket传输数据3、前端业务系统专有数据采集 API4、对日志文件定时监控,参考:1.实时处理方案架构(作者-落枫).pdf,Storm数据落地层,1、消息队列(MetaQ)2、Mysql等数据库3、HDFS等分布式文件系统,参考:1.实时处理方案架构(作者-落枫).pdf,业务需求,参考:1.实时处理方案架构(作者-落枫).pdf,(1)条件过滤 这是Storm最基本的处理方式,对符合条件的数据进行实时过滤,将符合条件的数据保存下来,这种实时查询的业务需求在实际应用中是很常见的。(2)中间计算 我们需要改变数据中某一个字段(例如是数值),我们需要利用一个中间值经过计算(值比较、求和、求平均等等)后改变该值,然后将数据重新输出。(3)求TopN 相信大家对TopN类的业务需求也是比较熟悉的,在规定时间窗口内,统计数据出现的 TopN,该类处理在购物及电商业务需求中,比较常见。(4)分布式RPC Storm有对RPC进行专门的设计,分布式RPC用于对Storm上大量的函数调用进行并行计算,最后将结果返回给客户端。,业务需求,参考:1.实时处理方案架构(作者-落枫).pdf,(5)推荐系统 在实时处理时从 mysql及 hadoop中获取数据库中的信息,例如在电影推荐系统中,传入数据为用户当前点播电影信息,从数据库中获取的是该用户之前的一些点播电影信息统计。(6)批处理 所谓批处理就是数据攒积到一定触发条件,就批量输出,所谓的触发条件类似时间窗口到了,统计数量够了及检测到某种数据传入等等。(7)热度统计 热度统计实现依赖于 TimeCacheMap 数据结构,该结构能够在内存中保存近期活跃的对象。我们可以使用它来实现例如论坛中的热帖排行计算等。,问题,1、使用Maven或Leiningen解决依赖包问题2、Socket端口Spout IP地址不定3、日志、数据库等监控4、Nimbus单点故障5、分布式RPC6、布置metaq集群,写metaq与storm的接口7、实现线上更新(例如动态更改过滤规则)8、部署hadoop集群,写hdfs与storm接口9、支持类Top N统计处理,参考:1.http:/Storm项目:流数据监控32.http:/storm源码之一个class解决nimbus单点问题3.实时处理方案架构(作者-落枫).pdf,问题,参考:1.http:/Storm项目:流数据监控32.http:/storm源码之一个class解决nimbus单点问题,THE END,谢谢观赏,Witter Storm,Realtime Computation System,