Atguigu Big Data Project: Real-Time Project 4
Chapter 1 Requirements Analysis

1.1 Introduction

Real-time alerting is a common type of workload in real-time computing. Alert logs are generated by detecting system errors and exceptions, or abnormal user behavior, in log data. Shown in a graphical interface, the alert logs prompt the monitoring team to investigate the problem promptly and take countermeasures.

1.2 Requirement

Requirement: on the same device, three or more different accounts log in and claim coupons within 5 minutes, and no product is browsed between login and coupon claim. When these conditions are met, one alert log is generated.

1.3 Alert Log Format

For the same device, at most one alert is recorded per minute.

    Field     Description
    mid       Device id
    uids      uids that logged in and claimed coupons
    itemIds   Item ids the coupons relate to
    events    Behaviors that occurred
    ts        Timestamp at which the alert occurred

Chapter 2 Overall Process Design

2.1 Framework Flow

[Figure: architecture diagram. Components shown: nginx, three springboot instances, Kafka cluster, Spark Streaming, ElasticSearch.]

2.2 Development Approach

1) Consume data from Kafka, filter it according to the conditions, and generate alert logs;
2) Save the alert logs to ElasticSearch;
3) Use Kibana to quickly build a visual interface.

Chapter 3 Real-Time Computation Module

3.1 Filter Condition Analysis

Same device (grouping)
Within 5 minutes (window)
Three logins with different accounts (user)
Coupon claimed (behavior)
No product browsed (behavior)
At most one alert per minute for the same device (deduplication)

3.2 Data Processing Flow Diagram

[Figure: data processing flow diagram.]

3.3 Code Development

3.3.1 Event Log Case Class: EventInfo

    case class EventInfo(mid: String,
                         uid: String,
                         appid: String,
                         area: String,
                         os: String,
                         ch: String,
                         `type`: String,
                         evid: String,
                         pgid: String,
                         npgid: String,
                         itemid: String,
                         var logDate: String,
                         var logHour: String,
                         var ts: Long)

3.3.2 Alert Log Case Class: CouponAlertInfo

    case class CouponAlertInfo(mid: String,
                               uids: java.util.HashSet[String],
                               itemIds: java.util.HashSet[String],
                               events: java.util.List[String],
                               ts: Long)

3.3.3 Alert Business Class: AlertApp

    import java.util

    import com.alibaba.fastjson.JSON
    import com.atguigu.gmall.constant.GmallConstants
    import com.atguigu.gmall2019.realtime.bean.{CouponAlertInfo, EventInfo}
    import com.atguigu.gmall2019.realtime.util.{MyEsUtil, MyKafkaUtil}
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    import scala.util.control.Breaks._

    object AlertApp {

      def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("event_app")
        val ssc = new StreamingContext(sparkConf, Seconds(5))

        val inputDstream: InputDStream[ConsumerRecord[String, String]] =
          MyKafkaUtil.getKafkaStream(GmallConstants.KAFKA_TOPIC_EVENT, ssc)

        // 1. Convert each Kafka record into the EventInfo case class
        val eventInfoDstream: DStream[EventInfo] = inputDstream.map { record =>
          val jsonstr: String = record.value()
          val eventInfo: EventInfo = JSON.parseObject(jsonstr, classOf[EventInfo])
          eventInfo
        }

        // 2. Open a 5-minute window (the requirement in 1.2), sliding every 5 seconds
        val eventInfoWindowDstream: DStream[EventInfo] =
          eventInfoDstream.window(Seconds(300), Seconds(5))

        // 3. Group by device
        val groupbyMidDstream: DStream[(String, Iterable[EventInfo])] =
          eventInfoWindowDstream.map(eventInfo => (eventInfo.mid, eventInfo)).groupByKey()

        // 4. Decide whether to alert. Within one device:
        //    1) three or more coupon claims (evid == "coupon") with distinct uids
        //    2) no product browsed (evid == "clickItem")
        val checkCouponAlertDStream: DStream[(Boolean, CouponAlertInfo)] = groupbyMidDstream.map {
          case (mid, eventInfoItr) =>
            val couponUidsSet = new util.HashSet[String]()
            val itemIdsSet = new util.HashSet[String]()
            val eventIds = new util.ArrayList[String]()
            var notClickItem: Boolean = true
            breakable(
              for (eventInfo: EventInfo <- eventInfoItr) {
                eventIds.add(eventInfo.evid) // user behavior
                if (eventInfo.evid == "coupon") {
                  couponUidsSet.add(eventInfo.uid)   // uid of the user claiming the coupon
                  itemIdsSet.add(eventInfo.itemid)   // item id the coupon relates to
                } else if (eventInfo.evid == "clickItem") {
                  notClickItem = false
                  break()
                }
              }
            )
            // Build a tuple: (whether the alert condition is met, alert info object)
            (couponUidsSet.size() >= 3 && notClickItem,
              CouponAlertInfo(mid, couponUidsSet, itemIdsSet, eventIds, System.currentTimeMillis()))
        }

        // Keep only the records that meet the alert condition
        val filteredDstream: DStream[(Boolean, CouponAlertInfo)] = checkCouponAlertDStream.filter(_._1)

        // Add an id that is used for deduplication when saving to ES
        val alertInfoWithIdDstream: DStream[(String, CouponAlertInfo)] = filteredDstream.map {
          case (flag, alertInfo) =>
            val period: Long = alertInfo.ts / 1000L / 60L
            val id: String = alertInfo.mid + "_" + period.toString
            (id, alertInfo)
        }

        alertInfoWithIdDstream.foreachRDD { rdd =>
          rdd.foreachPartition { alertInfoWithIdIter =>
            MyEsUtil.insertBulk(GmallConstants.ES_INDEX_COUPON_ALERT, alertInfoWithIdIter.toList)
          }
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }

Chapter 4 Saving to ElasticSearch

4.1 ES Cluster Setup

See the ElasticSearch cluster installation manual.

4.2 Create the Index in ES

Even if the index is not created in advance, ES can still store the data. In that case ES infers the mapping from the first document inserted, but this inference is often not accurate enough. For example, it has to decide whether a field should be indexed, whether it should be analyzed (tokenized), and, if so, which analyzer to use.

Index creation statement (including the mapping):

    PUT gmall_coupon_alert
    {
      "mappings": {
        "_doc": {
          "properties": {
            "mid":     { "type": "keyword" },
            "uids":    { "type": "keyword" },
            "itemIds": { "type": "keyword" },
            "events":  { "type": "keyword" },
            "ts":      { "type": "date" }
          }
        }
      }
    }

4.3 Saving to ES

4.3.1 pom.xml

    <dependency>
        <groupId>io.searchbox</groupId>
        <artifactId>jest</artifactId>
        <version>5.3.3</version>
    </dependency>

    <dependency>
        <groupId>net.java.dev.jna</groupId>
        <artifactId>jna</artifactId>
        <version>4.5.2</version>
    </dependency>

    <dependency>
        <groupId>org.codehaus.janino</groupId>
        <artifactId>commons-compiler</artifactId>
        <version>2.7.8</version>
    </dependency>

4.3.2 Utility Class for Saving to ES

    import java.util
    import java.util.Objects

    import io.searchbox.client.{JestClient, JestClientFactory}
    import io.searchbox.client.config.HttpClientConfig
    import io.searchbox.core.{Bulk, BulkResult, Index}

    import collection.JavaConversions._

    object MyEsUtil {
      private val ES_HOST = "http://hadoop102"
      private val ES_HTTP_PORT = 9200
      private var factory: JestClientFactory = null

      /**
        * Get a client
        *
        * @return jestclient
        */
      def getClient: JestClient = {
        if (factory == null) build()
        factory.getObject
      }

      /**
        * Close the client
        */
      def close(client: JestClient): Unit = {
        if (!Objects.isNull(client)) try
          client.shutdownClient()
        catch {
          case e: Exception => e.printStackTrace()
        }
      }

      /**
        * Build the connection factory
        */
      private def build(): Unit = {
        factory = new JestClientFactory
        factory.setHttpClientConfig(new HttpClientConfig.Builder(ES_HOST + ":" + ES_HTTP_PORT)
          .multiThreaded(true)
          .maxTotalConnection(20) // total number of connections
          .connTimeout(10000)
          .readTimeout(10000)
          .build)
      }

      // Bulk-insert documents into ES
      def insertBulk(indexName: String, docList: List[(String, Any)]): Unit = {
        if (docList.size > 0) {
          val jest: JestClient = getClient
          val bulkBuilder = new Bulk.Builder().defaultIndex(indexName).defaultType("_doc")
          for ((id, doc) <- docList) {
            val indexBuilder = new Index.Builder(doc)
            if (id != null) {
              indexBuilder.id(id)
            }
            val index: Index = indexBuilder.build()
            bulkBuilder.addAction(index)
          }
          val bulk: Bulk = bulkBuilder.build()
          var items: util.List[BulkResult#BulkResultItem] = null
          try {
            items = jest.execute(bulk).getItems
          } catch {
            case ex: Exception => println(ex.toString)
          } finally {
            close(jest)
            // items stays null when execute() throws, so guard before reporting
            if (items != null) {
              println("Saved " + items.size() + " documents")
              for (item <- items) {
                if (item.error != null && item.error.nonEmpty) {
                  println(item.error)
                  println(item.errorReason)
                }
              }
            }
          }
        }
      }

      // Quick manual test: index a single document
      def main(args: Array[String]): Unit = {
        val jest: JestClient = getClient
        val index: Index = new Index.Builder(Stud("zhang3", "zhang33"))
          .index("gmall2019_stud").`type`("_doc").id("stu123").build()
        jest.execute(index)
        close(jest)
      }

      case class Stud(name: String, nickname: String)
    }

Chapter 5 Publishing the Visualization with Kibana

5.1 Create an Index Pattern

Create a data source expression.

[Screenshot: Kibana Management menu, Index Patterns.]

With the wildcard (*), one pattern can cover multiple indices. For example, order_index_2019_05* covers order_index_2019_05_01, order_index_2019_05_02, and so on.

[Screenshot: Create index pattern, Step 1 of 2: Define index pattern.]

5.2 Create a Visualization

5.2.1 Add a New Visualization

5.2.2 Select a Chart Type

This example uses a vertical bar chart (Vertical Bar).

[Screenshot: New Visualization, "Select a visualization type" dialog.]

5.2.3 Build the Chart

5.2.3.1 Y-Axis

[Screenshot: Metrics panel, Y-Axis, with Aggregation set to Count and a Custom Label.]

    Setting        Meaning
    Aggregation    Aggregation method
    Custom Label   Label describing the Y-axis

5.2.3.2 X-Axis

[Screenshot: X-Axis panel, with Aggregation set to Terms, Field set to itemIds, Order By set to the metric, Order set to Descending, plus Size and Custom Label.]

    Setting        Meaning
    Aggregation    Grouping method
    Field          Field to group by
    Order By       What to sort by
    Order          Ascending or descending
    Size           Show the top n entries
    Custom Label   Label describing the X-axis

[Screenshot: Time Range picker, Absolute tab, from 2019-06-29 13:30:49.054 to 2019-06-29 13:45:49.054.]

Run it to produce the resulting chart.

[Screenshot: resulting bar chart.]

5.3 Create a Dashboard

A Dashboard is a large panel that can hold many visualizations. You can place several previously designed visualizations in one dashboard and display them together.

5.3.1 Add a New Dashboard

5.3.2 Add Multiple Visualizations

5.3.3 Form a Dashboard Containing Multiple Charts

5.3.4 Finally, Save

5.4 Share to a Web Page

Click the share button at the top.

[Screenshot: Share menu, Embed code.]

You can paste the iframe code from the clipboard into a web page's source, so that the page displays the Kibana dashboard.
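Looking back at the filter conditions in 3.1 and the deduplication id built in 3.3.3, the per-device decision can be tried out without Spark, Kafka, or ES. The following is only a minimal sketch under stated assumptions: `Event` is a hypothetical, trimmed stand-in for `EventInfo`, and `AlertRule`, `shouldAlert`, and `alertId` are illustrative names that do not appear in the project. Unlike the loop in AlertApp, it scans the whole window instead of breaking at the first `clickItem`, which should give the same yes/no decision.

```scala
// Hypothetical, trimmed stand-in for EventInfo: only the fields the rule needs.
final case class Event(mid: String, uid: String, evid: String, itemid: String)

object AlertRule {

  /** True when one device's window holds coupon claims from at least three
    * distinct uids and contains no clickItem event. */
  def shouldAlert(events: Iterable[Event]): Boolean = {
    val couponUids = events.filter(_.evid == "coupon").map(_.uid).toSet
    val clickedItem = events.exists(_.evid == "clickItem")
    couponUids.size >= 3 && !clickedItem
  }

  /** Document id: device id plus the minute bucket of the alert timestamp
    * (milliseconds since the epoch), mirroring the id built in AlertApp. */
  def alertId(mid: String, tsMillis: Long): String =
    mid + "_" + (tsMillis / 1000L / 60L)
}
```

Because indexing a document under an id that already exists overwrites the earlier document, alerts for the same device that fall into the same minute bucket end up as a single document in the index, which is how the "at most one alert per minute" condition is met.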