Spark Custom Build, Day 11: A Thorough Look at the Architecture and Implementation of the Driver-Side ReceiverTracker
Published: 2019-06-21


In this installment:

1. ReceiverTracker architecture

2. ReceiverTracker implementation

  Any data that cannot be processed as a real-time stream is effectively worthless. In the age of stream processing, Spark Streaming is enormously attractive and has a bright future; and because it sits inside the Spark ecosystem, Streaming can easily tap into other powerful frameworks such as Spark SQL and MLlib, which puts it in a position to dominate the field.

  At runtime, Spark Streaming is less a stream-processing framework on top of Spark Core than it is the most complex application built on Spark Core. If you can master an application as complex as Spark Streaming, any other complex application is well within reach. That is also why Spark Streaming is the natural entry point for this custom-build series.

  

  As we know, along the data path the receiver writes its data by instantiating a receivedBlockHandler object and calling its storeBlock method; the data is buffered in a queue and ultimately handed to the BlockManager.
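  For context, here is a minimal, hypothetical custom receiver (DummyReceiver is not from the original post): every store() call is handed to the ReceiverSupervisorImpl on the executor, which buffers the records and eventually writes them out as blocks through the handler discussed below.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Minimal illustrative receiver: each store() call goes through the
// ReceiverSupervisorImpl, which turns buffered records into blocks
// via the ReceivedBlockHandler.
class DummyReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  override def onStart(): Unit = {
    new Thread("dummy-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          store("hello")       // handed to the supervisor, later stored as a block
          Thread.sleep(100)
        }
      }
    }.start()
  }

  override def onStop(): Unit = { /* nothing to clean up */ }
}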

  The ReceivedBlockHandler is created when ReceiverSupervisorImpl is instantiated:

private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    if (checkpointDirOption.isEmpty) {
      throw new SparkException(
        "Cannot enable receiver write-ahead log without checkpoint directory set. " +
          "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
          "See documentation for more details.")
    }
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}
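  As the branch above shows, the write-ahead-log path is chosen only when the receiver WAL is enabled and a checkpoint directory has been set. A minimal configuration sketch (the app name and checkpoint path are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("ReceiverWALDemo")
  // makes WriteAheadLogUtils.enableReceiverLog(conf) return true, so the
  // WriteAheadLogBasedBlockHandler is used instead of the BlockManager-based one
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")  // placeholder path; required for the receiver WAL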

  When ReceivedBlockHandler stores data, it is really just calling into the BlockManager. Once a block has been stored, a report is also sent to the driver:

/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}
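  The storeBlock call above is where the data actually reaches the BlockManager. In the non-WAL case, BlockManagerBasedBlockHandler.storeBlock does roughly the following (abridged sketch; record counting and error checks in the real source are omitted here):

def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
  block match {
    case ArrayBufferBlock(arrayBuffer) =>
      // hand the buffered records to the BlockManager with the receiver's storage level
      blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
    case IteratorBlock(iterator) =>
      blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
    case ByteBufferBlock(byteBuffer) =>
      blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
  }
  BlockManagerBasedStoreResult(blockId, numRecords = None)
}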

  As you can see, the report goes through the trackerEndpoint message loop, which sends an AddBlock message wrapping the blockInfo. On the driver side, the ReceiverTracker endpoint handles it as follows:

case AddBlock(receivedBlockInfo) =>
  if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
    walBatchingThreadPool.execute(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        if (active) {
          context.reply(addBlock(receivedBlockInfo))
        } else {
          throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
        }
      }
    })
  } else {
    context.reply(addBlock(receivedBlockInfo))
  }
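  The reply is computed by ReceivedBlockTracker.addBlock: the block metadata is first written to the driver-side WAL (if enabled) and then appended to the queue of unallocated blocks for its stream. An abridged sketch of that method (based on the Spark 1.x source; error handling condensed):

def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    // durably record the event first, so it can be replayed on driver recovery
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        // queue the block as "unallocated" until the next batch picks it up
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}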

  Now let's take a closer look at ReceivedBlockTracker:

private[streaming] class ReceivedBlockTracker(
    conf: SparkConf,
    hadoopConf: Configuration,
    streamIds: Seq[Int],
    clock: Clock,
    recoverFromWriteAheadLog: Boolean,
    checkpointDirOption: Option[String])
  extends Logging
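  Internally, ReceivedBlockTracker keeps a queue of unallocated blocks per stream, plus a map from batch time to the blocks already allocated to that batch. Roughly (abridged from the class body; exact fields vary slightly across versions):

private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]

// blocks reported by receivers but not yet assigned to any batch, keyed by stream id
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
// blocks that have been assigned to a batch, keyed by batch time
private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
// optional driver-side write-ahead log used to replay these events on recovery
private val writeAheadLogOption = createWriteAheadLog()

private var lastAllocatedBatchTime: Time = null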

  Its job is to keep track of the received blocks and to allocate the not-yet-allocated blocks to a given batch. The key method here is allocateBlocksToBatch, which is called from JobGenerator when jobs are generated:

def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  } else {
    // batch time is not newer than the last allocated one; this only happens
    // during WAL recovery, so the batch is simply reprocessed
    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}
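  Once a batch has been allocated, the corresponding ReceiverInputDStream looks the blocks up by batch time when it builds its BlockRDD; that lookup is a simple read of the map populated above (abridged sketch from the same class):

/** Get the blocks allocated to the given batch. */
def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = synchronized {
  timeToAllocatedBlocks.get(batchTime).map { _.streamIdToAllocatedBlocks }.getOrElse(Map.empty)
}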

  This is the call site in JobGenerator.generateJobs, right before the DStream graph generates the jobs for the batch:

Try {
  jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
  graph.generateJobs(time) // generate jobs using allocated block
}
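  For completeness, a simplified sketch of the surrounding generateJobs method (abridged from the Spark 1.x source; details vary slightly across versions) shows how the allocation result feeds into job submission:

private def generateJobs(time: Time) {
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time)  // allocate received blocks to this batch
    graph.generateJobs(time)                                  // generate jobs against the allocated blocks
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}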

  To sum up the flow: receivers store their data as blocks through the ReceivedBlockHandler, each block is reported to the driver via an AddBlock message, the ReceivedBlockTracker queues the blocks as unallocated, and at every batch interval the JobGenerator allocates those blocks to the batch before generating the jobs that consume them.

Reposted from: https://www.cnblogs.com/pzwxySpark/p/Spark11.html
