Introduction to YARN Resource Schedulers

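The resource scheduler is one of the core components of YARN. It is a pluggable service component inside the ResourceManager and is responsible for managing and allocating the resources of the entire cluster. The jobs that currently run on YARN fall into three types: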

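  • Batch jobs: these are time-consuming and have no strict deadline for completion, e.g. data mining and machine learning;
  • Interactive jobs: these are generally expected to return results promptly, e.g. Hive SQL queries;
  • Production jobs: these require a guaranteed amount of resources, e.g. statistics computation and junk data analysis.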

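To support resource allocation across multiple users and multiple queues, and to overcome the single-queue limitation of YARN's built-in FIFO (first in, first out) scheduler, YARN also adopted Yahoo's Capacity Scheduler and Facebook's Fair Scheduler.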
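
The following sketch makes the single-queue problem concrete. It is a deliberately simplified model of FIFO allocation, not the actual FifoScheduler code. Applications are served strictly in submission order, and each takes as many containers as it asks for while any remain. The class name FifoSketch, the application names, and the practice of counting resources in whole containers are all assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of FIFO allocation (not Hadoop's FifoScheduler):
// one queue, applications served strictly in submission order.
public class FifoSketch {
    // A hypothetical application: a name and the number of containers it asks for.
    static final class App {
        final String name;
        final int containersWanted;

        App(String name, int containersWanted) {
            this.name = name;
            this.containersWanted = containersWanted;
        }
    }

    // Walk the single queue in submission order and give each application as
    // many containers as it wants until the cluster has none left.
    static Map<String, Integer> allocate(List<App> submissionOrder, int clusterContainers) {
        Map<String, Integer> granted = new LinkedHashMap<>();
        int free = clusterContainers;
        for (App app : submissionOrder) {
            int give = Math.min(app.containersWanted, free);
            granted.put(app.name, give);
            free -= give;
        }
        return granted;
    }

    public static void main(String[] args) {
        List<App> queue = List.of(
            new App("batch-ml-training", 120), // long batch job, submitted first
            new App("hive-query", 10),         // interactive job, submitted second
            new App("daily-stats", 20));       // production job, submitted third
        // prints {batch-ml-training=100, hive-query=0, daily-stats=0}
        System.out.println(allocate(queue, 100));
    }
}
```

The batch job was submitted first, so it absorbs the whole 100-container cluster. The interactive Hive query gets nothing until the batch job finishes. The Capacity Scheduler and the Fair Scheduler address this by dividing cluster resources among multiple queues.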

Basic Architecture of the YARN Resource Scheduler

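The resource scheduler is a pluggable component of YARN. YARN defines it through a set of interfaces, so users can write their own scheduler by implementing them. This article looks at the scheduler from two angles: its pluggability and its role as an event handler.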

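1. ResourceScheduler as a pluggable component

The ResourceManager creates the scheduler in its createScheduler() method, which works as follows:
  • It reads the scheduler's class name from the configuration key YarnConfiguration.RM_SCHEDULER (yarn.resourcemanager.scheduler.class), falling back to YarnConfiguration.DEFAULT_RM_SCHEDULER if the key is not set.
  • It loads the class by reflection and checks that the class implements ResourceScheduler.
  • If the check passes, it instantiates the class. Any other class, or a class that cannot be found, causes a YarnRuntimeException.

Switching schedulers therefore only requires setting this property in yarn-site.xml, for example to org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.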

  protected ResourceScheduler createScheduler() {
    String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER,
        YarnConfiguration.DEFAULT_RM_SCHEDULER);
    LOG.info("Using Scheduler: " + schedulerClassName);
    try {
      Class<?> schedulerClazz = Class.forName(schedulerClassName);
      if (ResourceScheduler.class.isAssignableFrom(schedulerClazz)) {
        return (ResourceScheduler) ReflectionUtils.newInstance(schedulerClazz,
            this.conf);
      } else {
        throw new YarnRuntimeException("Class: " + schedulerClassName
            + " not instance of " + ResourceScheduler.class.getCanonicalName());
      }
    } catch (ClassNotFoundException e) {
      throw new YarnRuntimeException("Could not instantiate Scheduler: "
          + schedulerClassName, e);
    }
  }
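
The same plug-in pattern can be reproduced with the JDK alone. This is a sketch under stated assumptions. MiniScheduler and FifoMiniScheduler are hypothetical stand-ins for ResourceScheduler and a concrete scheduler. The call getDeclaredConstructor().newInstance() replaces Hadoop's ReflectionUtils.newInstance, which additionally passes the Configuration to objects that implement Configurable.

```java
// Hypothetical, JDK-only analogue of createScheduler(); not Hadoop code.
public class PluggableSchedulerSketch {
    // Stand-in for the ResourceScheduler interface.
    public interface MiniScheduler {
        String name();
    }

    // Stand-in for a concrete scheduler; it needs a public no-arg constructor.
    public static class FifoMiniScheduler implements MiniScheduler {
        public FifoMiniScheduler() {}

        @Override
        public String name() {
            return "fifo";
        }
    }

    // Resolve a class by name, verify it implements the scheduler interface,
    // then instantiate it, mirroring the structure of createScheduler().
    static MiniScheduler createScheduler(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            if (!MiniScheduler.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException("Class: " + className
                    + " not instance of " + MiniScheduler.class.getCanonicalName());
            }
            return (MiniScheduler) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // Covers ClassNotFoundException as well as constructor failures.
            throw new IllegalStateException("Could not instantiate Scheduler: " + className, e);
        }
    }

    public static void main(String[] args) {
        // In YARN the class name would come from configuration; here we use the
        // binary name of the nested class (it contains a '$').
        String configured = FifoMiniScheduler.class.getName();
        System.out.println(createScheduler(configured).name()); // prints fifo
    }
}
```

The check with isAssignableFrom runs before instantiation. As a result, a misconfigured class name fails fast with a clear message instead of causing a ClassCastException later.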

Every scheduler implementation must implement the ResourceScheduler interface:

@LimitedPrivate("yarn")
@Evolving
public interface ResourceScheduler extends YarnScheduler, Recoverable {
 
  /**
   * Set RMContext for <code>ResourceScheduler</code>.
   * This method should be called immediately after instantiating
   * a scheduler once.
   * @param rmContext created by ResourceManager
   */
  void setRMContext(RMContext rmContext);
 
  /**
   * Re-initialize the <code>ResourceScheduler</code>.
   * @param conf configuration
   * @throws IOException
   */
  void reinitialize(Configuration conf, RMContext rmContext) throws IOException;
}
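
The two methods define a small lifecycle:
  • setRMContext is called once, immediately after the scheduler is created.
  • reinitialize is called whenever a new configuration has to be applied.

The sketch below mirrors that contract with hypothetical types: Context stands in for RMContext, and a Map<String, String> stands in for Configuration. The ordering checks and the "max-apps" key are assumptions of the sketch, not part of the real interface.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified mirror of the ResourceScheduler lifecycle; not Hadoop types.
public class SchedulerLifecycleSketch {
    // Stand-in for RMContext.
    static final class Context {}

    interface MiniResourceScheduler {
        void setRMContext(Context ctx);                           // once, right after construction
        void reinitialize(Map<String, String> conf, Context ctx); // may be called repeatedly
    }

    static final class SimpleScheduler implements MiniResourceScheduler {
        private Context context;
        private int maxApps;

        @Override
        public void setRMContext(Context ctx) {
            // Assumption of this sketch: enforce the "call once" rule explicitly.
            if (context != null) {
                throw new IllegalStateException("setRMContext must be called only once");
            }
            context = ctx;
        }

        @Override
        public void reinitialize(Map<String, String> conf, Context ctx) {
            if (context == null) {
                throw new IllegalStateException("setRMContext must be called before reinitialize");
            }
            // Re-read settings on every call so that a configuration change takes effect.
            // "max-apps" is a hypothetical key used only in this sketch.
            maxApps = Integer.parseInt(conf.getOrDefault("max-apps", "10000"));
        }

        int maxApps() {
            return maxApps;
        }
    }

    public static void main(String[] args) {
        SimpleScheduler scheduler = new SimpleScheduler();
        Context ctx = new Context();
        scheduler.setRMContext(ctx);
        Map<String, String> conf = new HashMap<>();
        conf.put("max-apps", "50");
        scheduler.reinitialize(conf, ctx);
        conf.put("max-apps", "80"); // e.g. an administrator edits the configuration
        scheduler.reinitialize(conf, ctx);
        System.out.println(scheduler.maxApps()); // prints 80
    }
}
```

Keeping all configuration reads inside reinitialize means the same code path serves both initial setup and later refreshes.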

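ResourceScheduler in turn extends the YarnScheduler interface. YarnScheduler itself extends EventHandler<SchedulerEvent>, which is what makes every scheduler an event handler as well: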

public interface YarnScheduler extends EventHandler<SchedulerEvent> {
 
  /**
   * Get queue information
   * @param queueName queue name
   * @param includeChildQueues include child queues?
   * @param recursive get children queues?
   * @return queue information
   * @throws IOException
   */
  // Get information about a queue
  @Public
  @Stable
  public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
      boolean recursive) throws IOException;
 
  /**
   * Get acls for queues for current user.
   * @return acls for queues for current user
   */
  // Return the queue ACLs of the current user
  @Public
  @Stable
  public List<QueueUserACLInfo> getQueueUserAclInfo();
  
  // ... (remaining methods omitted)
}

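YarnScheduler extends EventHandler<SchedulerEvent>, so much of the scheduler's input arrives as events delivered to its handle() method. These include nodes joining or leaving, applications being added or removed, and node heartbeats. The toy model below shows this dispatch pattern under stated assumptions:
  • EventHandler, EventType and SchedulerEvent are simplified, hypothetical versions of the Hadoop types.
  • The real SchedulerEventType enum has more values.
  • ToyScheduler only records node heartbeats instead of allocating containers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a scheduler that is also an event handler.
public class SchedulerEventSketch {
    interface EventHandler<T> {
        void handle(T event);
    }

    // Loosely modeled on a few SchedulerEventType names; not the full set.
    enum EventType { NODE_ADDED, NODE_REMOVED, NODE_UPDATE, APP_ADDED, APP_REMOVED }

    static final class SchedulerEvent {
        final EventType type;
        final String id; // node id or application id, depending on the type

        SchedulerEvent(EventType type, String id) {
            this.type = type;
            this.id = id;
        }
    }

    static final class ToyScheduler implements EventHandler<SchedulerEvent> {
        final List<String> nodes = new ArrayList<>();
        final List<String> apps = new ArrayList<>();
        final List<String> log = new ArrayList<>();

        // Single entry point: dispatch on the event type.
        @Override
        public void handle(SchedulerEvent event) {
            switch (event.type) {
                case NODE_ADDED:   nodes.add(event.id); break;
                case NODE_REMOVED: nodes.remove(event.id); break;
                case APP_ADDED:    apps.add(event.id); break;
                case APP_REMOVED:  apps.remove(event.id); break;
                case NODE_UPDATE:
                    // A heartbeat from a node: a real scheduler would try to place
                    // containers on that node here; this toy only records it.
                    log.add("heartbeat from " + event.id + ", pending apps=" + apps.size());
                    break;
                default:
                    throw new IllegalArgumentException("Unknown event type: " + event.type);
            }
        }
    }

    public static void main(String[] args) {
        ToyScheduler scheduler = new ToyScheduler();
        scheduler.handle(new SchedulerEvent(EventType.NODE_ADDED, "node-1"));
        scheduler.handle(new SchedulerEvent(EventType.APP_ADDED, "app-1"));
        scheduler.handle(new SchedulerEvent(EventType.NODE_UPDATE, "node-1"));
        System.out.println(scheduler.log); // prints [heartbeat from node-1, pending apps=1]
    }
}
```

In YARN's own schedulers, the NODE_UPDATE events generated by NodeManager heartbeats are typically where container allocation on that node happens. This is why the event-handler view matters as much as the pluggable interface.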