[enhancement](cloud) support BE http action: list_cache and clear #41037

Merged
merged 8 commits on Nov 7, 2024
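This change extends the BE file_cache HTTP action: op=list_cache returns the local cache file paths for a given segment, and op=clear now accepts an optional value parameter so the cache of a single segment can be dropped instead of the whole cache. For illustration only (host, port, and the segment file name are placeholders), the endpoints exercised by the regression test below are of the form:

GET http://<be_host>:<be_http_port>/api/file_cache?op=list_cache&value=<rowset_id>_0.dat
GET http://<be_host>:<be_http_port>/api/file_cache?op=clear&value=<rowset_id>_0.dat
GET http://<be_host>:<be_http_port>/api/file_cache?op=clear&sync=true

The last form (no value) keeps the previous behavior of clearing all file caches.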
33 changes: 32 additions & 1 deletion be/src/http/action/file_cache_action.cpp
@@ -17,10 +17,15 @@

#include "file_cache_action.h"

#include <glog/logging.h>

#include <algorithm>
#include <memory>
#include <shared_mutex>
#include <sstream>
#include <string>
#include <string_view>
#include <vector>

#include "common/status.h"
#include "http/http_channel.h"
@@ -30,6 +35,7 @@
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/file_cache_common.h"
#include "io/cache/fs_file_cache_storage.h"
#include "olap/olap_define.h"
#include "olap/tablet_meta.h"
#include "util/easy_json.h"
@@ -43,6 +49,7 @@ constexpr static std::string_view PATH = "path";
constexpr static std::string_view CLEAR = "clear";
constexpr static std::string_view RESET = "reset";
constexpr static std::string_view HASH = "hash";
constexpr static std::string_view LIST_CACHE = "list_cache";
constexpr static std::string_view CAPACITY = "capacity";
constexpr static std::string_view RELEASE = "release";
constexpr static std::string_view BASE_PATH = "base_path";
@@ -66,7 +73,14 @@ Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metri
*json_metrics = json.ToString();
} else if (operation == CLEAR) {
const std::string& sync = req->param(SYNC.data());
auto ret = io::FileCacheFactory::instance()->clear_file_caches(to_lower(sync) == "true");
const std::string& segment_path = req->param(VALUE.data());
if (segment_path.empty()) {
io::FileCacheFactory::instance()->clear_file_caches(to_lower(sync) == "true");
} else {
io::UInt128Wrapper hash = io::BlockFileCache::hash(segment_path);
io::BlockFileCache* cache = io::FileCacheFactory::instance()->get_by_path(hash);
cache->remove_if_cached(hash);
}
} else if (operation == RESET) {
std::string capacity = req->param(CAPACITY.data());
int64_t new_capacity = 0;
@@ -96,6 +110,23 @@ Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metri
json[HASH.data()] = ret.to_string();
*json_metrics = json.ToString();
}
} else if (operation == LIST_CACHE) {
const std::string& segment_path = req->param(VALUE.data());
if (segment_path.empty()) {
st = Status::InvalidArgument("missing parameter: {} is required", VALUE.data());
} else {
io::UInt128Wrapper cache_hash = io::BlockFileCache::hash(segment_path);
std::vector<std::string> cache_files =
io::FileCacheFactory::instance()->get_cache_file_by_path(cache_hash);
if (cache_files.empty()) {
*json_metrics = "[]";
} else {
EasyJson json;
std::for_each(cache_files.begin(), cache_files.end(),
[&json](auto& x) { json.PushBack(x); });
*json_metrics = json.ToString();
}
}
} else {
st = Status::InternalError("invalid operation: {}", operation);
}
17 changes: 17 additions & 0 deletions be/src/io/cache/block_file_cache_factory.cpp
@@ -21,6 +21,9 @@
#include "io/cache/block_file_cache_factory.h"

#include <glog/logging.h>

#include <string>
#include <vector>
#if defined(__APPLE__)
#include <sys/mount.h>
#else
@@ -118,6 +121,20 @@ Status FileCacheFactory::create_file_cache(const std::string& cache_base_path,
return Status::OK();
}

std::vector<std::string> FileCacheFactory::get_cache_file_by_path(const UInt128Wrapper& hash) {
io::BlockFileCache* cache = io::FileCacheFactory::instance()->get_by_path(hash);
auto blocks = cache->get_blocks_by_key(hash);
std::vector<std::string> ret;
for (auto& [_, fb] : blocks) {
ret.emplace_back(fb->get_cache_file());
}
return ret;
}

BlockFileCache* FileCacheFactory::get_by_path(const UInt128Wrapper& key) {
// dont need lock mutex because _caches is immutable after create_file_cache
return _caches[KeyHash()(key) % _caches.size()].get();
2 changes: 2 additions & 0 deletions be/src/io/cache/block_file_cache_factory.h
@@ -62,6 +62,8 @@ class FileCacheFactory {

[[nodiscard]] size_t get_cache_instance_size() const { return _caches.size(); }

std::vector<std::string> get_cache_file_by_path(const UInt128Wrapper& hash);

BlockFileCache* get_by_path(const UInt128Wrapper& hash);
BlockFileCache* get_by_path(const std::string& cache_base_path);
std::vector<BlockFileCache::QueryFileCacheContextHolderPtr> get_query_context_holders(
4 changes: 4 additions & 0 deletions be/src/io/cache/file_block.cpp
@@ -272,6 +272,10 @@ std::string FileBlock::state_to_string(FileBlock::State state) {
}
}

std::string FileBlock::get_cache_file() const {
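// Ask the cache's storage backend for the on-disk path that backs this block's key.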
return _mgr->_storage->get_local_file(this->_key);
}

FileBlocksHolder::~FileBlocksHolder() {
for (auto file_block_it = file_blocks.begin(); file_block_it != file_blocks.end();) {
auto current_file_block_it = file_block_it;
2 changes: 2 additions & 0 deletions be/src/io/cache/file_block.h
@@ -123,6 +123,8 @@ class FileBlock {

uint64_t expiration_time() const { return _key.meta.expiration_time; }

std::string get_cache_file() const;

State state_unlock(std::lock_guard<std::mutex>&) const;

FileBlock& operator=(const FileBlock&) = delete;
2 changes: 2 additions & 0 deletions be/src/io/cache/file_cache_storage.h
@@ -65,6 +65,8 @@ class FileCacheStorage {
// force clear all current data in the cache
virtual Status clear(std::string& msg) = 0;
virtual FileCacheStorageType get_type() = 0;
// get local cached file
virtual std::string get_local_file(const FileCacheKey& key) = 0;
};

} // namespace doris::io
5 changes: 5 additions & 0 deletions be/src/io/cache/fs_file_cache_storage.cpp
@@ -659,6 +659,11 @@ Status FSFileCacheStorage::clear(std::string& msg) {
return Status::OK();
}

std::string FSFileCacheStorage::get_local_file(const FileCacheKey& key) {
return get_path_in_local_cache(get_path_in_local_cache(key.hash, key.meta.expiration_time),
key.offset, key.meta.type, false);
}

FSFileCacheStorage::~FSFileCacheStorage() {
if (_cache_background_load_thread.joinable()) {
_cache_background_load_thread.join();
1 change: 1 addition & 0 deletions be/src/io/cache/fs_file_cache_storage.h
@@ -70,6 +70,7 @@ class FSFileCacheStorage : public FileCacheStorage {
void load_blocks_directly_unlocked(BlockFileCache* _mgr, const FileCacheKey& key,
std::lock_guard<std::mutex>& cache_lock) override;
Status clear(std::string& msg) override;
std::string get_local_file(const FileCacheKey& key) override;

[[nodiscard]] static std::string get_path_in_local_cache(const std::string& dir, size_t offset,
FileCacheType type,
4 changes: 4 additions & 0 deletions be/src/io/cache/mem_file_cache_storage.cpp
@@ -128,4 +128,8 @@ Status MemFileCacheStorage::clear(std::string& msg) {
return Status::OK();
}

std::string MemFileCacheStorage::get_local_file(const FileCacheKey& key) {
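// The memory-backed cache storage keeps no file on local disk, so there is no path to return.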
return "";
}

} // namespace doris::io
1 change: 1 addition & 0 deletions be/src/io/cache/mem_file_cache_storage.h
@@ -44,6 +44,7 @@ class MemFileCacheStorage : public FileCacheStorage {
void load_blocks_directly_unlocked(BlockFileCache* _mgr, const FileCacheKey& key,
std::lock_guard<std::mutex>& cache_lock) override;
Status clear(std::string& msg) override;
std::string get_local_file(const FileCacheKey& key) override;

FileCacheStorageType get_type() override { return MEMORY; }

117 changes: 117 additions & 0 deletions regression-test/suites/cloud_p0/cache/http/test_list_cache_file.groovy
@@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods

suite("test_list_cache_file") {
sql """ use @regression_cluster_name1 """
String[][] backends = sql """ show backends """
String backendId;
def backendIdToBackendIP = [:]
def backendIdToBackendHttpPort = [:]
def backendIdToBackendBrpcPort = [:]
for (String[] backend in backends) {
if (backend[9].equals("true") && backend[19].contains("regression_cluster_name1")) {
backendIdToBackendIP.put(backend[0], backend[1])
backendIdToBackendHttpPort.put(backend[0], backend[4])
backendIdToBackendBrpcPort.put(backend[0], backend[5])
}
}
assertEquals(backendIdToBackendIP.size(), 1)

backendId = backendIdToBackendIP.keySet()[0]
def socket = backendIdToBackendIP.get(backendId) + ":" + backendIdToBackendHttpPort.get(backendId)

sql "drop table IF EXISTS `user`"

sql """
CREATE TABLE IF NOT EXISTS `user` (
`id` int NULL,
`name` string NULL
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"file_cache_ttl_seconds" = "2884"
)
"""

sql "insert into user select number, cast(rand() as varchar(32)) from numbers(\"number\"=\"1000000\")"

def get_tablets = { String tbl_name ->
def res = sql "show tablets from ${tbl_name}"
List<Integer> tablets = new ArrayList<>()
for (final def line in res) {
tablets.add(Integer.valueOf(line[0].toString()))
}
return tablets
}

def get_rowsets = { int tablet_id ->
var ret = []
httpTest {
endpoint ""
uri socket + "/api/compaction/show?tablet_id=" + tablet_id
op "get"
check {respCode, body ->
assertEquals(respCode, 200)
var map = parseJson(body)
for (final def line in map.get("rowsets")) {
var tokens = line.toString().split(" ")
ret.add(tokens[4])
}
}
}
return ret
}

var tablets = get_tablets("user")
var rowsets = get_rowsets(tablets.get(0))
var segment_file = rowsets[rowsets.size() - 1] + "_0.dat"

httpTest {
endpoint ""
uri socket + "/api/file_cache?op=list_cache&value=" + segment_file
op "get"
check {respCode, body ->
assertEquals(respCode, 200)
var arr = parseJson(body)
assertTrue(arr.size() > 0, "there should be at least one cached file for this segment; if not, check disk capacity and file_cache_enter_disk_resource_limit_mode_percent in be.conf")
}
}

// clear single segment file cache
httpTest {
endpoint ""
uri socket + "/api/file_cache?op=clear&value=" + segment_file
op "get"
check {respCode, body ->
assertEquals(respCode, 200, "clear local cache fail, maybe you can find something in respond: " + parseJson(body))
}
}

httpTest {
endpoint ""
uri socket + "/api/file_cache?op=list_cache&value=" + segment_file
op "get"
check {respCode, body ->
assertEquals(respCode, 200)
var arr = parseJson(body)
assertTrue(arr.size() == 0, "the cache file list should be empty because the segment cache has already been cleared")
}
}
}