[Enhancement] FileCache warm up optimization #48959

Open
2 of 3 tasks
freemandealer opened this issue Mar 12, 2025 · 1 comment

freemandealer (Contributor) commented Mar 12, 2025

Search before asking

  • I had searched in the issues and found no similar issues.

Description:

Background Knowledge

Introduction to Warm-Up Feature

In the compute-storage decoupled architecture, we introduced the file cache as a local data cache to reduce how often queries read data directly from S3. During a cold start, users can pre-warm the corresponding data into the local cache with the following warm-up statements:

warm up cluster cluster_name1 with cluster cluster_name0

warm up cluster cluster_name1 with table customer

warm up cluster cluster_name1 with table customer partition p1

SHOW WARM UP JOB; // Get Job information
SHOW WARM UP JOB WHERE ID = 13418; // Specify JobID

cancel warm up job where id = 13418;

For more details, see: https://doris.apache.org/en-US/docs/3.0/sql-manual/sql-statements/cluster-management/storage-management/WARM-UP/

Creation of Warm-Up Tasks

stmt -> CloudWarmUpJob

CloudWarmUpJob.JobType jobType = stmt.isWarmUpWithTable() ? JobType.TABLE : JobType.CLUSTER;
CloudWarmUpJob warmUpJob = new CloudWarmUpJob(jobId, stmt.getDstClusterName(), beToTabletIdBatches, jobType);
addCloudWarmUpJob(warmUpJob);

Here, beToTabletIdBatches is constructed as follows:

The FE obtains the table, partition, and tablet metadata, then looks up the BE id for each tablet from the snapshot via Env.getCurrentEnv().getCloudTabletRebalancer().getSnapshotTabletsByBeId, building a BE -> tablets mapping.

The data to be warmed up on each BE is split into batches of about 10G each; batching makes checkpoints possible and avoids retransmitting everything on failure. The splitting algorithm is as follows:

// Simplified from: doris/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
Map<Long, List<List<Long>>> splitBatch(Map<Long /*BE id*/, List<Tablet>> beToWarmUpTablets) {
    final long MAX_BATCH_SIZE = 10L << 30; // 10G per batch
    // BE id -> batch list; each batch is a list of tablet ids
    Map<Long, List<List<Long>>> beToTabletIdBatches = new HashMap<>();
    for (Map.Entry<Long, List<Tablet>> entry : beToWarmUpTablets.entrySet()) {
        List<List<Long>> batches = new ArrayList<>();
        List<Long> batch = new ArrayList<>();
        long curBatchSize = 0L;
        for (Tablet tablet : entry.getValue()) {
            if (curBatchSize + tablet.getDataSize(true) > MAX_BATCH_SIZE) {
                // finish this batch and start a new one
                batches.add(batch);
                batch = new ArrayList<>();
                curBatchSize = 0L;
            }
            // otherwise, add this tablet to the current batch
            batch.add(tablet.getId());
            curBatchSize += tablet.getDataSize(true);
        }
        if (!batch.isEmpty()) {
            batches.add(batch); // keep the trailing partial batch
        }
        beToTabletIdBatches.put(entry.getKey(), batches);
    }
    return beToTabletIdBatches;
}

What if the destination cluster does not have sufficient capacity? With force set (currently buggy), the job either fails or caches only the portion that the capacity allows.

At most 10 warm-up jobs can run across all FEs at the same time; additional jobs can still be submitted but are queued.

Execution Scheduling of Warm-Up

The execution process of CloudWarmUpJob:

[Image: CloudWarmUpJob execution flow]

  1. FE -> all BEs: SET_JOB
  2. FE -> all BEs: GET_CURRENT_JOB_STATE_AND_LEASE
  3. BE -> FE: status & pending_job_size
  4. FE keeps polling GET_CURRENT_JOB_STATE_AND_LEASE until every BE has completed the previous round, then proceeds to the next step
  5. FE increments the batch index, persists it to the edit log, and dispatches the next round's batch -> all BEs: SET_BATCH; repeat steps 2-4
  6. Once every BE has drained all of its batches, FE -> all BEs: CLEAR_JOB
  7. FE records the completion status in the edit log; the cluster can now accept new warm-up jobs
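
Below is a minimal sketch of this round-synchronized loop on the FE side. The helpers sendSetBatch, pollJobStateAndLease, persistBatchIdToEditLog, and sendClearJob are hypothetical wrappers around the RPCs above, not the real Doris API; the actual logic lives in CloudWarmUpJob / CacheHotspotManager:

// Hedged sketch of steps 2-6; all helper names are illustrative stand-ins.
void runJobLockStep(long jobId, List<Long> backends,
                    Map<Long, List<List<Long>>> beToTabletIdBatches) throws InterruptedException {
    long maxBatches = beToTabletIdBatches.values().stream()
            .mapToLong(List::size).max().orElse(0L);
    for (int batchId = 0; batchId < maxBatches; batchId++) {
        for (Long be : backends) {
            List<List<Long>> batches = beToTabletIdBatches.get(be);
            if (batchId < batches.size()) {
                sendSetBatch(be, jobId, batchId, batches.get(batchId)); // SET_BATCH
            }
        }
        persistBatchIdToEditLog(jobId, batchId); // edit-log checkpoint (step 5)
        // Steps 2-4: poll until every BE reports the current round finished.
        boolean allDone;
        do {
            Thread.sleep(10_000L); // illustrative polling interval
            allDone = true;
            for (Long be : backends) {
                allDone &= pollJobStateAndLease(be, jobId).isBatchDone(); // GET_CURRENT_JOB_STATE_AND_LEASE
            }
        } while (!allDone);
    }
    for (Long be : backends) {
        sendClearJob(be, jobId); // step 6: CLEAR_JOB
    }
}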

The process by which a BE accepts and processes jobs (the RPC entry point is CloudBackendService::warm_up_tablets, mostly implemented via CloudWarmupManager):

  • SET_JOB: not much content; it just sets the cur_job_id of CloudWarmupManager

  • SET_BATCH: sets the current batch of the current job in CloudWarmupManager. The possible scenarios (sketched in code after this list) are:

    • Job id is 0: the earlier SET_JOB may have failed, so set it again
    • Wrong job id: ignore the new batch
    • Correct job id but the previous batch has not finished: ignore the new batch
    • Duplicate batch: ignore
    • Correct job id + previous batch finished + new batch id (the expected case): the batch is added to CloudWarmupManager, and a dedicated thread downloads all segment data from remote storage (S3) to local disk based on the tablet information in the batch, completing the warm-up of those tablets.
  • GET_CURRENT_JOB_STATE_AND_LEASE

  • CLEAR_JOB
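
A hedged sketch of the SET_BATCH branching above (the real handler is C++ inside CloudWarmupManager; this Java rendering just mirrors the listed cases, and all field/method names are illustrative):

// Mirrors the SET_BATCH cases above; names are hypothetical.
synchronized void handleSetBatch(long jobId, long batchId, List<Long> tabletIds) {
    if (curJobId == 0) {
        curJobId = jobId; // earlier SET_JOB may have failed: set it again
    }
    if (jobId != curJobId) {
        return; // wrong job id: ignore the new batch
    }
    if (!prevBatchDone || batchId == curBatchId) {
        return; // previous batch still running, or duplicate batch: ignore
    }
    // Expected case: accept the batch; a dedicated thread downloads all
    // segment data for these tablets from S3 into the local file cache.
    curBatchId = batchId;
    prevBatchDone = false;
    downloadExecutor.submit(() -> warmUpTablets(tabletIds));
}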

Multiple BEs therefore execute batches in lock step: the FE sends the next round's batch to all BEs only after the current batch has completed on every BE. This design and implementation are straightforward, but BEs that finish early sit idle until the others complete the current batch, and only then does everyone move on to the next round.

Model of interaction between FE and multiple BEs:

[Image: interaction model between the FE and multiple BEs]

Maintenance of Task Progress Information

Task progress information is mainly maintained in CacheHotspotManager, specifically within the CloudWarmUpJob class, and displayed in the handleShowCloudWarmUpJob logic. The core logic of CloudWarmUpJob::getJobInfo is as follows:

info.add(Long.toString(lastBatchId + 1));  // number of finished batches (last finished batch id + 1)
long maxBatchSize = 0;
for (List<List<Long>> list : beToTabletIdBatches.values()) {
    long size = list.size();
    if (size > maxBatchSize) {
        maxBatchSize = size;  // Obtain the largest batch count among all BEs as the total batch
    }
}
info.add(Long.toString(maxBatchSize));

The largest batch count among all BEs is used as the total (the batch count as the FE perceives it) because the batch count can differ from each BE's perspective. Reasons for the discrepancy:

  • Tablets may be unevenly distributed among BEs
  • Even with balanced tablet counts, data sizes may differ
  • Even if both of the above match exactly, differences in individual tablet sizes make the splitBatch algorithm described earlier produce different batch counts. For example:
    • BE1 holds three tablets of 4G, 6G, and 9G; with the 10G cap per batch they split into (4G 6G) (9G), i.e. 2 batches
    • BE2 holds three tablets of 4G, 8G, and 6G; they split into (4G) (8G) (6G), i.e. 3 batches

Problem Introduction

From the discussion above, two pitfalls exist:

  1. The batch count produced by splitBatch is unstable, as noted in "Maintenance of Task Progress Information"
  2. BEs that finish early must wait for the others to complete the current batch, as noted in "Execution Scheduling of Warm-Up"

These issues matter little when individual tablets are small (far below 10G), since the split is then roughly even. But with large data volumes and uneven load, scheduling can degenerate as shown in the figure below: BEs spend much of their time waiting, compute, storage, and network resources go underused, and warm-up efficiency suffers.

[Image: lock-step scheduling with long BE wait times]

Modification Proposal

By contrast, we want a more compact execution mode: even when large tablets make the load uneven, no BE sits idle during the warm-up process (the job may still have a long tail at the end due to data-volume differences).

[Image: proposed compact scheduling without idle waiting]

As soon as a BE completes its current batch, we write to the edit log to persist that checkpoint and immediately dispatch the next batch to that BE, without waiting, until all batches assigned to it are done. Once every BE has completed all of its batches, the FE declares the warm-up job successfully finished.
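
A minimal sketch of the proposed per-BE pipelining, reusing the same hypothetical helpers as the earlier lock-step sketch:

// Hedged sketch: each BE advances through its own batch list independently.
void runJobPipelined(long jobId, List<Long> backends,
                     Map<Long, List<List<Long>>> beToTabletIdBatches,
                     ExecutorService executor) throws Exception {
    List<Future<?>> perBeWorkers = new ArrayList<>();
    for (Long be : backends) {
        perBeWorkers.add(executor.submit(() -> {
            List<List<Long>> batches = beToTabletIdBatches.get(be);
            for (int i = 0; i < batches.size(); i++) {
                sendSetBatch(be, jobId, i, batches.get(i));
                waitUntilBatchDone(be, jobId, i);     // poll only this BE
                persistPerBeCheckpoint(jobId, be, i); // per-BE edit-log checkpoint
            }
            return null;
        }));
    }
    for (Future<?> worker : perBeWorkers) {
        worker.get(); // the job succeeds once every BE has drained its batch list
    }
    for (Long be : backends) {
        sendClearJob(be, jobId);
    }
}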

This compact scheduling model removes the FE's global lastBatch counter, so the way task progress information is maintained must be adjusted as well. The change is worthwhile: exposing the internal batch concept to users adds cognitive burden, and users cannot infer the expected batch count from data volume anyway, since it is shaped by balancing and the splitBatch algorithm described above (it cannot simply be computed as batch count x BE count x 10G). Displaying progress as the amount of data already warmed up versus the total data volume is therefore more practical.
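
For example, getJobInfo could report a data-volume-based figure instead of batch indices (a sketch; beToWarmedBytes and totalBytes are hypothetical fields):

// Progress as warmed bytes vs. total bytes instead of batch counts.
long warmedBytes = beToWarmedBytes.values().stream().mapToLong(Long::longValue).sum();
info.add(String.format("%d/%d bytes (%.1f%%)", warmedBytes, totalBytes,
        totalBytes == 0 ? 100.0 : 100.0 * warmedBytes / totalBytes));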

Exception Handling

If the FE restarts, it can recover the remaining warm-up tasks for each BE from persisted checkpoints in the editlog.
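
On restart, the FE could rebuild each BE's batch cursor from those checkpoints along these lines (a sketch; lastCheckpointedBatch is a hypothetical map replayed from the edit log):

// Resume each BE at the batch after the last one known to have completed.
Map<Long, Integer> resumeFrom = new HashMap<>();
for (Long be : backends) {
    resumeFrom.put(be, lastCheckpointedBatch.getOrDefault(be, -1) + 1);
}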

If a user cancels the warm-up job, the existing logic should still handle it correctly: each BE finishes its current batch and then cleans up its resources.

If a warm-up error occurs on a BE, it automatically triggers the CANCEL logic described above.

References

1. fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCloudWarmUpStmt.java
2. fe/fe-core/src/main/java/org/apache/doris/analysis/CancelCloudWarmUpStmt.java
3. fe/fe-core/src/main/java/org/apache/doris/analysis/WarmUpClusterStmt.java

Contact me

Don't hesitate to reach out if you need any help. We can schedule a meeting on Microsoft Teams ([email protected]) or chat on WeChat: 15811301868.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@Nitin-Kashyap (Contributor) commented:

Please assign to me
