menu

2018-11-24 Spark

  • date_range info
    sort
    Hadoop
    label
    hadoop
    Hadoop

Spark

RDD

Spark的基本抽象,是一个弹性分布式数据集,代表着不可变的,分区(partition)的集合,能够进行并行计算。

  1. 是一系列的分片,类似于Hadoop的split
  2. 在每个分片上都有一个函数去执行/迭代/计算它
  3. 是一系列的依赖,比如RDD1转换为RDD2,RDD2转换为RDD3,那么RDD2依赖于RDD1,RDD3依赖于RDD2
  4. 对一个Key-Value形式的RDD,可指定一个partitioner,告诉它如何分片,常用的有hash、range
  5. 可选择指定分区最佳计算位置

    DAG: Directed Acyclic Graph,有向无环图

    Spark中使用DAG对RDD的关系进行建模,描述了RDD的依赖关系,这种关系也被称之为lineage,RDD的依赖关系使用Dependency维护,参考Spark RDD之Dependency,DAG在Spark中的对应的实现为DAGScheduler。

    Stage

    代表一个Job的DAG,会在发生shuffle处被切分,切分后每一个部分即为一个Stage,Stage实现分为ShuffleMapStage和ResultStage,一个Job切分的结果是0个或多个ShuffleMapStage加一个ResultStage

    Spark架构

    SparkArchitecture
    Job:包含了由Spark Action催生的多个Tasks,是一个并行计算
    Stage:一个Job分为多个彼此依赖的Stage,每个Stage又包含了一系列的Task,类似于MapReduce的Map阶段和reduce阶段

    Spark的Client和Cluster

    一般来说,如果提交任务的节点(即Master)和Worker集群在同一个网络内,此时Client Mode比较合适
    如果提交任务的节点和Worker集群相隔比较远,就会采用cluster mode来最小化Driver和Executor之间的网络延迟

    Client Mode

    Spark-Client

    • Driver进程运行在Master节点上,不在Worker节点上,所以相对于参与实际计算的Worker集群而言,Driver就相当于是一个第三方的“client”
    • 由于Driver进程不在Worker节点上,所以其是独立的,不会消耗Worker集群的资源
    • Master和Worker节点必须处于同一片局域网内,因为Driver要和Executor通信,例如Drive需要将Jar包通过Netty HTTP分发到Executor,Driver要给Executor分配任务等
    • 没有监督重启机制,Driver进程如果挂了,需要额外的程序重启

      Cluster Mode

      Spark-Cluster

    • Driver程序在worker集群中某个节点,而非Master节点,但是这个节点由Master指定
    • Driver程序占据Worker的资源
    • Master可以使用–supervise对Driver进行监控,如果Driver挂了可以自动重启
    • Master节点和Worker节点一般不在同一局域网,因此就无法将Jar包分发到各个Worker,所以必须提前把Jar包放到各个Worker节点对应的目录下面

      Spark On Yarn中的Yarn-Client和Yarn-Cluster

      在Spark On Yarn模式下,每个Spark Executor作为一个Yarn container在运行,同时支持多个任务在同一个container中运行,极大地节省了任务的启动时间

      Yarn-client模式

      Application Master仅仅从Yarn中申请资源给Executor,之后client会跟container通信进行作业的调度 Yarn-Client 作业执行流程:

  6. 客户端生成作业信息提交给ResourceManager(RM)
  7. RM在本地NodeManager启动container并将Application Master(AM)分配给该NodeManager(NM)
  8. NM接收到RM的分配,启动Application Master并初始化作业,此时这个NM就称为Driver
  9. Application向RM申请资源,分配资源同时通知其他NodeManager启动相应的Executor
  10. Executor向本地启动的Application Master注册汇报并完成相应的任务

    Yarn-cluster模式

    Driver运行在Appliaction Master上,Appliaction Master进程同时负责驱动Application和从Yarn中申请资源,该进程运行在Yarn container内,所以启动Application Master的client可以立即关闭而不必持续到Application的生命周期 Yarn-Cluster
    作业执行流程:

  11. 客户端生成作业信息提交给ResourceManager(RM)
  12. RM在某一个NodeManager(由Yarn决定)启动container并将Application Master(AM)分配给该NodeManager(NM)
  13. NM接收到RM的分配,启动Application Master并初始化作业,此时这个NM就称为Driver
  14. Application向RM申请资源,分配资源同时通知其他NodeManager启动相应的Executor
  15. Executor向NM上的Application Master注册汇报并完成相应的任务

    Spark Streaming

    一个可扩展,高吞吐、具有容错率的流式计算框架,它从数据源(soket、flume 、kafka)得到数据,并将流式数据分成很多RDD,根据时间间隔以批次(batch)为单位进行处理,能实现实时统计,累加,和一段时间内的指标的统计
    当运行Spark Streaming 框架时,Application会执行StreamingContext,并且在底层运行的是SparkContext,然后Driver在每个Executor上一直运行一个Receiver来接受数据 SparkApplication
    Receiver通过Input Stream接收数据并将数据分成块(Blocks),之后存储在Executor的内存中,Blocks会在其他的Executor上进行备份 SparkReceiver
    Executor将存储的Blocks回馈给StreamingContext,当经过一定时间后,StreamingContext将在这一段时间内的Blocks,也称为批次(batch)当作RDD来进行处理,并通过SparkContext运行Spark jobs,Spark jobs通过运行tasks在每个Executor上处理存储在内存中的Blocks SparkExecutor
    这个循环每隔一个批次执行一次 SparkLoop

    Spark提交作业

    spark的作业调度分为3个级别:DAG调度器==>TaskScheduler(任务调度器)==>SchedulerBackend(后台调度器)

    第一级作业调度:DAG调度器DAGScheduler

    当我们写好应用程序,程序里面有sc.count()或sc.collect()时,或者我们在scalashell发出的命令中有sc.count()或sc.collect()时(这俩函数都是action类的函数)于是启动了作业提交流程。流程如下
    action函数调用sc.runjob()==>
    由DAG调度器把作业转换成事件(event),DAG调度器把事件添加到EventProcessLoop(这是一个事件处理队列)==>
    线程EventThread会专门从EventProcessLoop队列中获取事件,交给DAGScheduler的事件处理方法==>
    DAGScheduler事件处理方法把作业计算出stage,传给submitStage()方法==>
    submitStage()方法生成任务集合TaskSet以后,传给任务调度器TaskScheduler==>

    作业调度的第二级:任务调度器TaskScheduler

    从DAG调度接收到任务集以后,将由TaskSchedulerImpl实现类中的submitTasks()方法把任务提交给后台调度器==>

    第三级作业调度:后台调度器

    (按照spark的三种启动模式——本地模式、伪分布式模式、完全分布式,分别有对应的后台调度器:LocalSchedulerBackendlocal、StandaloneSchedulerBackend、StandaloneSchedulerBackend)
    例如,以本地模式启动spark时,本地后台调度器LocalSchedulerBackend会把任务集发送到所有Worker进程所在的节点(spark1.6版之后是通过Netty框架发送到指定的socket,Netty封装了非阻塞技术NIO)==>
    由每个Worker节点执行任务集中的每个任务,实际上每任务分别由Worker节点中的Executo里面的launchTask()方法启动相应的执行线程==>
    执行线程会被添加到线程池(这也是个队列),这个池子仍然在Worker节点上==>
    由TaskRun()轮询队列,从队列中获取对象(每个对象都是runnable)==>
    由每个Worker节点本地的ShuffleMapTask中的runTask()方法调用write()方法==>
    最后,由write()方法执行我们自定义的业务逻辑代码(例如WorldCount)
    (函数名write意味着在对RDD的每个分区使用变换函数的期间,会把结果写到内存中。)