之前案例三中的ip地址规则是在Driver端的机器磁盘中存储着的,但是现在如果实在hdfs中存储着的又该如何实现呢
首先要分析清楚才能实现,存储在hdfs中并不像想象中的那么容易,首先代码实在Driver端写的,在Driver端写从hdfs中取出ip地址规则的代码会触发action,然后生成Task,分发到Executor端执行,因为日志文件比较大,所以存储在hdfs中的时候是会分区存储的,这样每个Executor只能取到一部分的ip地址规则了,所以这时候应该将所有Executor中的部分ip地址规则收集到Driver端,然后再由Driver端进行广播,Executor使用广播变量的引用来取到完整的ip地址规则,其他的实现则跟案例三中的一样
具体实现代码: package cn.ysjh0014 import java.sql.{Connection, DriverManager, PreparedStatement} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object IpLocation2 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(“IpLoaction1”).setMaster(“local[4]”) val sc = new SparkContext(conf) //取到HDFS中的ip规则 val rulesLines: RDD[String] = sc.textFile(args(0)) //整理ip规则数据 val ipRulesRDD: RDD[(Long, Long, String)] = rulesLines.map(line => { val fields = line.split("[|]") val startNum = fields(2).toLong val endNum = fields(3).toLong val province = fields(6) (startNum, endNum, province) }) //将分散在多个Executor中的部分IP规则收集到Driver端 val rulesInDriver: Array[(Long, Long, String)] = ipRulesRDD.collect() //将Driver端的数据广播到Executor //广播变量的引用(还在Driver端) val broadcastRef: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(rulesInDriver) //创建RDD,读取访问日志 val accessLines: RDD[String] = sc.textFile(args(1)) //整理数据 val proviceAndOne: RDD[(String, Int)] = accessLines.map(log => { //将log日志的每一行进行切分 val fields = log.split("[|]") val ip = fields(1) //将ip转换成十进制 val ipNum = TestIp.ip2Long(ip) //进行二分法查找,通过Driver端的引用或取到Executor中的广播变量 //(该函数中的代码是在Executor中别调用执行的,通过广播变量的引用,就可以拿到当前Executor中的广播的规则了) //Driver端广播变量的引用是怎样跑到Executor中的呢? //Task是在Driver端生成的,广播变量的引用是伴随着Task被发送到Executor中的 val rulesInExecutor: Array[(Long, Long, String)] = broadcastRef.value //查找 var province = “未知” val index = TestIp.binarySearch(rulesInExecutor, ipNum) if (index != -1) { province = rulesInExecutor(index).3 } (province, 1) }) //聚合 //val sum = (x: Int, y: Int) => x + y val reduced: RDD[(String, Int)] = proviceAndOne.reduceByKey( + _) def data2MySQL(it: Iterator[(String, Int)]): Unit = { //一个迭代器代表一个分区,分区中有多条数据 //先获得一个JDBC连接 val conn: Connection = DriverManager.getConnection(“jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8”, “root”, “root”) //将数据通过Connection写入到数据库 val pstm: PreparedStatement = conn.prepareStatement(“INSERT INTO access_log VALUES (?, ?)”) //将分区中的数据一条一条写入到MySQL中 it.foreach(tp => { pstm.setString(1, tp._1) pstm.setInt(2, tp._2) pstm.executeUpdate() }) //将分区中的数据全部写完之后,在关闭连接 if (pstm != null) { pstm.close() } if (conn != null) { conn.close() } } reduced.foreachPartition(it => data2MySQL(it)) sc.stop() } }
运行结果跟案例三中的一样
最后修改于 2018-10-14

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。