技术内幕 | StarRocks Pipeline 执行框架(下)

本文发表于: &{ new Date(1666195200000).toLocaleDateString() }

导读:欢迎来到 StarRocks 技术内幕系列文章,我们将为你全方位揭晓 StarRocks 背后的技术原理和实践细节,助你快速上手这款明星开源数据库产品。本期 StarRocks 技术内幕将主要介绍 StarRocks Pipeline 执行框架的基本概念、原理及代码逻辑。

StarRocks Pipeline 执行框架(上)篇中,主要为大家讲解了 Pipeline 执行引擎想解决的问题及一般性原理。关于 Pipeline 执行引擎的实现, BE 端拆分 Pipeline 的逻辑,以及 Pipeline 实例 PipelineDriver 的调度执行逻辑,将在本篇中继续与大家分享。

#01 背景介绍

详见 StarRocks Pipeline 执行框架(上)篇

#02 基本概念

详见 StarRocks Pipeline 执行框架(上)篇
 

#03 源码解析

章节二的基本概念输入完以后,我们开始从以下几个方面解析 StarRocks 的源码:

1. BE 初始化 Pipeline 执行引擎:主要介绍 BE 启动后,如何初始化 Pipeline 执行引擎的全局资源。

2. BE 端 Query 生命周期管理:主要介绍在 BE 上,如何用 QueryContext 管理所属的全体 Fragment Instance,以及 Fragment Instance 的准备、执行和销毁逻辑。

3. 物理算子拆分 Pipeline 算子:算子拆分逻辑在 Pipeline 执行引擎中占比很重,涉及到每个算子的重构,后续添加新算子,需要遵从一定的原则拆分算子,为 Pipeline 算子的接口定义正确的语义。

4. PipelineDriver 的调度逻辑:主要涉及到 PipelineDriver 的 Ready、Blocked 和 Running 三种状态的转换。

1.BE 初始化 Pipeline 执行引擎

  • BE 初始化全局对象的方法主要有两种:
    将全局对象定义在 ExecEnv 对象中,参考 be/src/runtime/exec_env.h,be/src/runtime/exec_env.cpp文件。 
  • 定义全局性的单例(Singleton)对象,例如 be/src/exec/pipeline/query_context.cpp,如果对象本身可以独立完成初始化、不依赖参数设置、不依赖于其他对象的初始化顺序,则可以定义为单例。

Pipeline 执行引擎的全局性对象

PipelineDriver 执行器

定义为 ExecEnv::_driver_executor,类型为 pipeline::GlobalDriverExecutor,主要由执行线程池和轮询线程构成。其中执行线程的数量默认为机器的硬件核数,轮询线程数量为 1。

// 源码文件: be/src/runtime/exec_env.cpp
// 函数: Status ExecEnv::_init(const std::vector<StorePath>& store_paths) 

std::unique_ptr<ThreadPool> driver_executor_thread_pool;
auto max_thread_num = std::thread::hardware_concurrency();
if (config::pipeline_exec_thread_pool_thread_num > 0) {
    max_thread_num = config::pipeline_exec_thread_pool_thread_num;
}
LOG(INFO) << strings::Substitute("[PIPELINE] Exec thread pool: thread_num=$0", max_thread_num);
RETURN_IF_ERROR(ThreadPoolBuilder("pip_executor") // pipeline executor
                        .set_min_threads(0)
                        .set_max_threads(max_thread_num)
                        .set_max_queue_size(1000)
                        .set_idle_timeout(MonoDelta::FromMilliseconds(2000))
                        .build(&driver_executor_thread_pool));
_driver_executor = new pipeline::GlobalDriverExecutor(std::move(driver_executor_thread_pool), false);
_driver_executor->initialize(max_thread_num)

pipeline::GlobalDriverExecutor 的结构如下:
 

Pipeline IO 线程池

ExecEnv._pipeline_scan_io_thread_pool,主要用于执行 ScanOperator 读数据操作的异步化 IO 任务。IO 线程池队列大小和线程数目,取决于参数:

  • config::pipeline_scan_thread_pool_queue_size
  • config::pipeline_scan_thread_pool_thread_num
// 源码文件: be/src/runtime/exec_env.cpp
// 函数: Status ExecEnv::_init(const std::vector<StorePath>& store_paths) 
int num_io_threads = config::pipeline_scan_thread_pool_thread_num <= 0
                             ? std::thread::hardware_concurrency()
                             : config::pipeline_scan_thread_pool_thread_num;

_pipeline_scan_io_thread_pool =
        new PriorityThreadPool("pip_scan_io", // pipeline scan io
                               num_io_threads, config::pipeline_scan_thread_pool_queue_size);

WorkGroup(即资源组 ResourceGroup)执行器

定义为 ExecEnv._wg_driver_executor,WorkGroup 用于 Pipeline 执行引擎的资源隔离,主要的设计动机是为了把不同业务场景的 Workload 划分到相应的 WorkGroup中,每个 WorkGroup 有自己的 CPU、Memory 和并行数量资源 Quota。 每个 WorkGroup 按照资源 Quota 的限制复用计算资源,从而实现隔离性。(WorkGroup 的详细源码分析请参考专门解析文档,此处提及,是为了保证本文的完整性。)  WorkGroup 执行器和 PipelineDriver 执行器功能类似,实现了基于 WorkGroup 的调度逻辑。

WorkGroup Scan 执行器

定义为ExecEnv._scan_executor,也用于 WorkGroup 功能,类似 Pipeline IO 线程池,该执行器可以根据 WorkGroup 的资源 Quota 限制,执行 ScanOperator 提交的异步化 IO 任务。

QueryContextManager

QueryContext 管理一个查询在某台执行节点上的全体 Fragment Instance,QueryContextManager 顾名思义就是对 QueryContext 进行操作,主要用于其生命周期的管理。参考源码文件:be/src/exec/pipeline/query_context.cpp。

WorkGroupManager

WorkGroupManager 用于管理 WorkGroup,详见 WorkGroup 相关文档。

2.BE 端 Query 生命周期管理

QueryContext 和 FragmentContext

计算节点 BE 为查询维护下列对象:

  • QueryContext:在 QueryContextManager 中注册,拥有 FragmentContextManager 对象管理 Fragment Instance。
  • FragmentContext:在QueryContext.fragment_mgr 中注册,每个 Fragment Instance 对应一个 FragmentContext。
  • Pipelines:FragmentContext 包含一组 Pipeline,来源于 Fragment Instance 的执行子树的拆解。
  • Drivers:FragmentContext 包含一组 PipelineDriver,PipelineDriver 通过 Pipeline 创建,来自同一个 Pipeline 的 PipelineDriver 的数量,取决于 Pipeline 并行度。
  • MorselQueues:ScanOperator 和 MorselQueue 的映射表,MorselQueue 包含一组 Morsel,Morsel 是 ScanOperator 读取数据的分片。

QueryContext 的生命周期比 FragmentContext 生命周期久,跨所有 Fragment Instance,属于 Query 层面的对象,可以由 QueryContext 管理。比如控制 Query 内存使用 MemTracker,所有 Fragment Instance 共享的 DescriptorTable。FragmentContext 只管理 Fragment Instance 范围的资源,主要包括 Pipelines、PipelineDrivers 和 MorselQueues。

只有当 FragmentContext 中的所有 PipelineDriver 都完成计算,FragmentContext 的生命周期才结束;只有当所有 FragmentContext 的生命周期结束,QueryContext 的生命周期才结束。QueryContext 生命周期结束后,就可以析构并且释放 QueryContext 占用的资源。

Fragment Instance 执行逻辑的入口
BE 收到来自 FE 的 exec_plan_fragment 后,创建 FragmentExecutor 执行该 Fragment Instance,代码如下:

// 文件:/home/grakra/workspace/sr/be/src/service/internal_service.cpp
template <typename T>
void PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController* cntl_base,
                                                 const PExecPlanFragmentRequest* request,
                                                 PExecPlanFragmentResult* response, google::protobuf::Closure* done) {
    ClosureGuard closure_guard(done);
    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
    auto st = _exec_plan_fragment(cntl);
    if (!st.ok()) {
        LOG(WARNING) << "exec plan fragment failed, errmsg=" << st.get_error_msg();
    }
    st.to_protobuf(response->mutable_status());
}

PInternalServiceImpl::exec_plan_fragment 调用 _exec_plan_fragment:

// 文件:/home/grakra/workspace/sr/be/src/service/internal_service.cpp
template <typename T>
Status PInternalServiceImpl<T>::_exec_plan_fragment(brpc::Controller* cntl) {
    auto ser_request = cntl->request_attachment().to_string();
    TExecPlanFragmentParams t_request;
    {
        const uint8_t* buf = (const uint8_t*)ser_request.data();
        uint32_t len = ser_request.size();
        RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, TProtocolType::BINARY, &t_request));
    }
    bool is_pipeline = t_request.__isset.is_pipeline && t_request.is_pipeline;
    LOG(INFO) << "exec plan fragment, fragment_instance_id=" << print_id(t_request.params.fragment_instance_id)
              << ", coord=" << t_request.coord << ", backend=" << t_request.backend_num
              << ", is_pipeline=" << is_pipeline << ", chunk_size=" << t_request.query_options.batch_size;
    if (is_pipeline) {
        auto fragment_executor = std::make_unique<starrocks::pipeline::FragmentExecutor>();
        auto status = fragment_executor->prepare(_exec_env, t_request);
        if (status.ok()) {
            return fragment_executor->execute(_exec_env);
        } else {
            return status.is_duplicate_rpc_invocation() ? Status::OK() : status;
        }
    } else {
        return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
    }
}

当采用 Pipeline 执行引擎时,创建 FragmentExecutor,完成下列操作:

  • 调用 FragmentExecutor::prepare 函数, 初始化 Fragment Instance 的执行环境,创建和注册 QueryContex、FragmentContext,将Fragment Instance 拆分成 Pipelines,创建 PipelineDrivers。
  • 调用 FragmentExecutor::execute 函数,向 Pipeline 执行线程提交 PipelineDrivers 运行。

 

FragmentExecutor::prepare 函数
 

参考 be/src/exec/pipeline/fragment_executor.cpp,主要的逻辑如下:
 

1. 判断 Fragment Instance 是否为重复投递,如果是,直接返回错误状态 Status::DuplicateRpcInvocation。

2. 注册或获得已有的 QueryContext,处理 Query 的第一个 Fragment Instance 时,注册 QueryContext,后续到达的 Fragment Instance 复用已注册的 QueryContext。设置 QueryContext 需要处理的 Fragment Instance 的数量和 Query 过期时间等参数,Query 过期时间用于自动取消长期得不到执行的 Query;如果 Query 有大量的 Fragment Instance,先到达的部分 Fragment Instance 完成执行而退出,在没有活跃的 Fragment Instance 的情况下,QueryContext 依然需要保留一段时间,等到后续 Fragment Instance 全部达到或者自动过期而取消执行。

3. 创建和初始化 FragmentContext 对象,FragmentContext 需要注册到 QueryContext.fragment_mgr 中,注册的时机为 FragmentContext::prepare 函数的末尾。因为有的异步逻辑(比如 Global Runtime Filter 的投递),需要访问 FragmentContext 的成员变量,在 FragmentContext 未完成所有的初始化之前注册,会对异步逻辑暴露 FragmentContext,导致访问未初始化的成员变量而出错。

4. 调用函数 Exec::create_tree 生成 Non-pipeline 执行树,使用 PipelineBuilder 将执行树拆解成为 Pipeline。

5. 调用 convert_scan_range_to_morsel 函数将 ScanNode 需要访问的 TScanRangeParams 转换为 ScanOperator 可访问的 Morsel。

6. 将 Non-pipeline 执行引擎的 DataSink 转换为 Pipeline 引擎的 SinkOperator,调用 _decompose_data_sink_to_operator

7. 根据 DOP(degree-of-parallelism),为 Pipeline 创建 PipelineDriver,并且将 MorselQueue 和相应的 ScanOperator 关联。

8. 完成其他的必要的初始化,并且注册 FragmentContext。

FragmentExecutor::execute函数

//文件: be/src/exec/pipeline/fragment_executor.cpp
Status FragmentExecutor::execute(ExecEnv* exec_env) {
    for (const auto& driver : _fragment_ctx->drivers()) {
        RETURN_IF_ERROR(driver->prepare(_fragment_ctx->runtime_state()));
    }

    if (_fragment_ctx->enable_resource_group()) {
        for (const auto& driver : _fragment_ctx->drivers()) {
            exec_env->wg_driver_executor()->submit(driver.get());
        }
    } else {
        for (const auto& driver : _fragment_ctx->drivers()) {
            exec_env->driver_executor()->submit(driver.get());
        }
    }

    return Status::OK();
}

 

FragmentExecutor::execute 函数的主要操作如下:
 

1. 变量 FragmentContext 中的所有 PipelineDriver,执行 PipelineDriver::prepare 函数。该函数主要完成 PipelineDriver 范围的 profile 注册、调用每个算子的 prepare 函数、设置 Driver 之间前置等待条件,比如 HashJoin 左侧的 PipelineDriver 需要等待右侧 PipelineDriver 完成,消费 RuntimeFilter 的 PipelineDriver 需要等待生产 RuntimeFilter 的 PipelineDriver 完成。

2. 把 PipelineDriver 提交给 Pipeline 执行线程。PipelineDriver 提交后,FragmentExecutor 的生命周期结束,FragmentExecutor 是临时性的,禁止在 FragmentExecutor 中定义 PipelineDriver 可引用的对象。

 

3.PipelineBuilder 拆分 pipeline

BE 上的 PipelineBuilder 会把 PlanFragment 拆分成多个 Pipeline,拆分过程中,PlanFragment 中物理算子会转化为 Pipeline 算子。

物理算子

物理算子是 ExecNode 的子类,FE 投递给 BE 的 Fragment Instance 中,包含构成所属 PlanFragment 的物理算子, 物理算子如下图所示:

另外,在 Fragment Instance 中,一般用 DataSink 的子类描述该 Fragment Instance 计算结果的去向,比如 DataStreamSink 会把计算结果发给下游 Fragment Instance的ExchangeNode。在 Pipeline 执行引擎中,DataStreamSink 和 ExchangeNode 会分别转化为 ExchangeSinkOperator 和 ExchangeSourceOperator。

Pipeline 算子

Pipeline 算子的数量比物理算子的数量多,这是因为,Pipeline 算子最多只有一路输入和一路输出,多路输入的物理算子和全量物化的物理算子,会拆解成多个 Pipeline 算子。Pipeline 算子接口定义,请参考 /home/grakra/workspace/sr/be/src/exec/pipeline/operator.h,部分接口定义如下:

  • pull_chunk:从算子中拉取 chunk,一般计算时,从一对算子的前置算子拉取 chunk,然后推给后继算子。
  • push_chunk:向算子推 chunk。
  • has_output:表示状态,当前算子可输出,可以执行 pull_chunk。
  • need_input:表示状态,当前算子可输入,可以执行 push_chunk。
  • is_finished:当前算子已经结束,不能执行 pull_chunk/push_chunk。
  • prepare:prepare 和 open 表达式和调用其他内部数据结构的 prepare 函数。
  • close:close 表达式和调用其他内部数据结构的 close 函数。
  • set_finishing:关闭输入,执行 set_finishing 之后,算子的 need_input 始终返回 false,不可调用 push_chunk,但算子内部可能有缓存的计算结果,has_output 可能返回 true,可以调用 pull_chunk。
  • set_finished:关闭输入和输出,调用后,is_finished、has_output、need_input 都返回 false,pull_chunk 和 push_chunk 不可调用,当 Pipeline 中 HashJoinProbeOperator 和 LimitOperator 算子产生短路并且提前结束时,需要调用前置算子 set_finished 函数。如果两个算子之间通过专门的 Context 交换数据,则 set_finished 函数中,需要正确地重置 Context 状态,一个算子需要感知到另外一个算子的 set_finished 函数调用。比如 LocalExchangeSinkOperator 和 LocalExchangeSourceOperator。
  • set_canceled:类似 set_finished,但表示算子异常结束,如果算子需要区分正常或者异常结束,则需要重载 set_canceled 函数,目前只有 ExchangeSinkOperator 用到该函数。
  • pending_finish:表示状态,当算子实现了异步化,算子结束时,异步化任务尚未完成,算子需要等待异步化任务结束后,才能销毁所在的 PipelineDriver。提前销毁 PipelineDriver 可能会导致异步化任务延后执行引用算子中的已销毁对象。

一个算子会经过 prepare -> finishing -> finished -> [cancelled] -> closed 的转换,Pipeline 执行引擎根据算子的状态, 执行相应的接口。

Pipeline 执行引擎中,每个算子有一个 OperatorFactory 类,Pipeline 由 OperatorFactory 组成,PipelineDriver 是 Pipeline 的实例,PipelineDriver 由 Operator 构成,OperatorFactory 创建 Operator 对象,从 Pipeline 创建 PipelineDriver 时,遍历 Pipeline 中的 OperatorFactory,调用 OperatorFactory::create 方法。 

// 文件:be/src/exec/pipeline/pipeline.h
// 函数: Pipeline::create_operators
Operators create_operators(int32_t degree_of_parallelism, int32_t i) {
    Operators operators;
    for (const auto& factory : _op_factories) {
        operators.emplace_back(factory->create(degree_of_parallelism, i));
    }
    return operators;
}

 

OperatorFactory 如下:

对于某些算子,比如 ScanOperator,来自同一个 OperatorFactory 的多个算子,会共享一份表达式,共享的表达式放置在 OperatorFactory 中,通过 OperatorFactory::prepare函数调用表达式的 prepare/open 函数,通过 OperatorFactory::close 函数调用表达式的 close 函数。

如果表达式不可重入,在计算时,算子在多个线程中执行,存在线程不安全问题。因此新添加的表达式,要保证:

1. 表达式是线程安全的。

2. 线程不安全的表达式,不在 OperatorFactory 中共享,每个算子有自己私有的副本。

Pipeline 拆分

当 BE 收到 FE 发来的 Fragment Instance 时,会创建一个 FragmentExecutor 对象初始化 Fragment Instance 的执行环境。FragmentExecutor::prepare 函数使用 PipelineBuilder 将 PlanFragment 拆分成Pipeline。

// file: be/src/exec/pipeline/fragment_executor.cpp
// function: FragmentExecutor::prepare
ExecNode* plan = nullptr;
RETURN_IF_ERROR(ExecNode::create_tree(runtime_state, obj_pool, fragment.plan, *desc_tbl, &plan))
// ...
PipelineBuilderContext context(_fragment_ctx, degree_of_parallelism);
PipelineBuilder builder(context);
_fragment_ctx->set_pipelines(builder.build(*_fragment_ctx, plan))
  • 首先通过 ExecNode::create_tree 函数获得 PlanFragment 的物理算子构成执行树。
  • 初始化 PipelineBuilderContext 对象,传入 degree_of_parallelism 参数。
  • 构建 PipelineBuilder 对象,调用 PipelineBuilder::build 拆分 Pipeline。

PipelineBuilder::build 主要从执行树 root 节点开始,递归调用 decompose_to_pipeline 函数。


Pipelines PipelineBuilder::build(const FragmentContext& fragment, ExecNode* exec_node) {
    pipeline::OpFactories operators = exec_node->decompose_to_pipeline(&_context);
    _context.add_pipeline(operators);
    _context.get_pipelines().back()->set_root();
    return _context.get_pipelines();
}

 

物理算子需要重载 ExecNode 的 decompose_to_pipeline 函数。 
 

decompose_to_pipeline 函数递归地调用,完成算子的拆分。以 ProjectNode 为例,ProjectNode 调用decompose_to_pipeline 函数对 _children[0] 先完成 Pipeline 拆解,并返回 OperatorFactory 数组,然后 ProjectNode 自身转变为 ProjectOperatorFactory,追加 OperatorFactory 数组的末尾,参考下面代码:
 

pipeline::OpFactories ProjectNode::decompose_to_pipeline(pipeline::PipelineBuilderContext* context) {
    using namespace pipeline;
    OpFactories operators = _children[0]->decompose_to_pipeline(context);
    // Create a shared RefCountedRuntimeFilterCollector
    auto&& rc_rf_probe_collector = std::make_shared<RcRfProbeCollector>(1, std::move(this->runtime_filter_collector()));

    operators.emplace_back(std::make_shared<ProjectOperatorFactory>(
            context->next_operator_id(), id(), std::move(_slot_ids), std::move(_expr_ctxs),
            std::move(_type_is_nullable), std::move(_common_sub_slot_ids), std::move(_common_sub_expr_ctxs)));
    // Initialize OperatorFactory's fields involving runtime filters.
    this->init_runtime_filter_for_operator(operators.back().get(), context, rc_rf_probe_collector);
    if (limit() != -1) {
        operators.emplace_back(std::make_shared<LimitOperatorFactory>(context->next_operator_id(), id(), limit()));
    }
    return operators;
}

复杂算子的拆分可能会用到 LocalExchange 算子,目前 LocalExchange 算子支持 Passthrough、broadcast 和 shuffle 模式。更多复杂的算子拆分可以参考针对这些算子的详细源码解析。

DataSink 的拆分和 ExecNode 不同,可以参考函数 FragmentExecutor::_decompose_data_sink_to_operator,此处不再赘述。

 

4.PipelineDriver 的调度逻辑

PipelineDriver 的调度主要涉及下面几个函数:

  • GlobalDriverExecutor::worker_thread:Pipeline 引擎执行线程的入口函数,该函数持续从就绪 Driver 队列获取 PipelineDriver,执行 PipelineDriver::process 函数。在 PipelineDriver 阻塞或者时间片用完时,主动 yield,换其他就绪 PipelineDriver 执行。
  • PipelineDriver::process 函数:调用 Operator::pull_chunk/push_chunk 函数进行计算,判断 PipelineDriver 是否阻塞或者需要 yield。
  • PipelineDriverPoller::run_internal:阻塞 PipelineDriver 的轮询线程的函数,遍历阻塞 PipelineDriver,将已经解除阻塞的 PipelineDriver 放回就绪队列。

GlobalDriverExecutor::worker_thread

该函数的主要功能是:

  • 从就绪队列取 PipelineDriver。
  • 执行 PipelineDriver::process 函数
  • PipelineDriver 执行完一轮之后,判断 PipelineDriver 的当前状态
  • PipelineDriver 正常结束,异常结束或者计算出错,则调用 PipelineDriver::finalize_driver 函数完成 PipelineDriver 的清理;
  • PipelineDriver 仍然处于 RUNNING 状态,则设置其状态为 READY,放回就绪 Driver 队列;
  • PipelineDriver 处于阻塞状态,则调用 PipelineDriverPoller->add_blocked_driver 函数,将 PipelineDriver 加入到阻塞 Driver 队列中。
  • 就绪 Driver 队列采用多级反馈队列(mlfq)实现,小查询优先调度,同时避免大查询饥饿。
  • 请参考 https://github.com/StarRocks/starrocks/blob/main/be/src/exec/pipeline/pipeline_driver_executor.cpp

 

PipelineDriver::process

该函数的功能主要有:

  • 遍历 PipelineDriver 中的相邻算子对,只有当两个算子 is_finished() 返回 false,前置算子 has_output() 返回 true,后置算子 need_input() 返回 true 时,调用前置算子的 pull_chunk 获得 chunk,调用后置算子的 push_chunk,将 chunk 推给它,从而完成 chunk 的转移。请参考: https://github.com/StarRocks/starrocks/blob/main/be/src/exec/pipeline/pipeline_driver.cpp
  • 当 PipelineDriver 中转移的 chunk 数量超过 100 个,本轮累积执行时间超过 100ms,则主动 yield,退出当前 process,返回就绪队列,换其他 PipelineDriver 执行。
  • 当 PipelineDriver 中当前无 chunk 可以移动,则说明 PipelineDriver 处于阻塞状态,退出当前 process,放回阻塞队列。

PipelineDriverPoller::run_internal

该函数遍历阻塞 Driver 队列,唤醒解除阻塞态的 PipelineDriver,放入就绪 Driver 队列,等待 Pipeline 执行线程调度。具体参考见:https://github.com/StarRocks/starrocks/blob/main/be/src/exec/pipeline/pipeline_driver_poller.cpp

 

#04 总结

本文主要讲解了 Pipeline 执行引擎想解决的问题及一般性原理。针对 Pipeline 执行引擎的实现,着重说明了 BE 端拆分 Pipeline 的逻辑,以及 Pipeline 实例 PipelineDriver 的调度执行逻辑。

想要深入学习 StarRocks 的执行引擎,还需要研究 MPP 调度和向量化执行,后续我们会继续撰文与大家分享。