Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](scan) unify the local and remote scan bytes stats for all scanners for 2.1 #45167

Merged
merged 6 commits into from
Dec 10, 2024
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -34,6 +34,8 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_
{"TASK_CPU_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
{"SCAN_ROWS", TYPE_BIGINT, sizeof(int64_t), false},
{"SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"LOCAL_SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"REMOTE_SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"BE_PEAK_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"CURRENT_USED_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"SHUFFLE_SEND_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
@@ -93,4 +95,4 @@ Status SchemaBackendActiveTasksScanner::get_next_block_internal(vectorized::Bloc
return Status::OK();
}

} // namespace doris
} // namespace doris
4 changes: 2 additions & 2 deletions be/src/io/cache/block/block_file_segment.cpp
Original file line number Diff line number Diff line change
@@ -179,7 +179,7 @@ std::string FileBlock::get_path_in_local_cache() const {
return _cache->get_path_in_local_cache(key(), offset(), _cache_type);
}

Status FileBlock::read_at(Slice buffer, size_t read_offset) {
Status FileBlock::read_at(Slice buffer, size_t read_offset, const IOContext* io_ctx) {
Status st = Status::OK();
std::shared_ptr<FileReader> reader;
if (!(reader = _cache_reader.lock())) {
@@ -192,7 +192,7 @@ Status FileBlock::read_at(Slice buffer, size_t read_offset) {
}
}
size_t bytes_reads = buffer.size;
RETURN_IF_ERROR(reader->read_at(read_offset, buffer, &bytes_reads));
RETURN_IF_ERROR(reader->read_at(read_offset, buffer, &bytes_reads, io_ctx));
DCHECK(bytes_reads == buffer.size);
return st;
}
2 changes: 1 addition & 1 deletion be/src/io/cache/block/block_file_segment.h
Original file line number Diff line number Diff line change
@@ -110,7 +110,7 @@ class FileBlock {
Status append(Slice data);

// read data from cache file
Status read_at(Slice buffer, size_t read_offset);
Status read_at(Slice buffer, size_t read_offset, const IOContext* io_ctx);

// finish write, release the file writer
Status finalize_write();
9 changes: 3 additions & 6 deletions be/src/io/cache/block/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
@@ -112,7 +112,6 @@ Status CachedRemoteFileReader::_read_from_cache(size_t offset, Slice result, siz
RETURN_IF_ERROR(_remote_file_reader->read_at(offset, result, bytes_read, io_ctx));
DorisMetrics::instance()->s3_bytes_read_total->increment(*bytes_read);
if (io_ctx->file_cache_stats) {
stats.bytes_read += bytes_req;
_update_state(stats, io_ctx->file_cache_stats);
}
return Status::OK();
@@ -142,7 +141,6 @@ Status CachedRemoteFileReader::_read_from_cache(size_t offset, Slice result, siz
break;
}
}
stats.bytes_read += bytes_req;
size_t empty_start = 0;
size_t empty_end = 0;
if (!empty_segments.empty()) {
@@ -224,8 +222,9 @@ Status CachedRemoteFileReader::_read_from_cache(size_t offset, Slice result, siz
size_t file_offset = current_offset - left;
{
SCOPED_RAW_TIMER(&stats.local_read_timer);
RETURN_IF_ERROR(segment->read_at(
Slice(result.data + (current_offset - offset), read_size), file_offset));
RETURN_IF_ERROR(
segment->read_at(Slice(result.data + (current_offset - offset), read_size),
file_offset, io_ctx));
}
*bytes_read += read_size;
current_offset = right + 1;
@@ -280,10 +279,8 @@ void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats,
}
if (read_stats.hit_cache) {
statis->num_local_io_total++;
statis->bytes_read_from_local += read_stats.bytes_read;
} else {
statis->num_remote_io_total++;
statis->bytes_read_from_remote += read_stats.bytes_read;
}
statis->remote_io_timer += read_stats.remote_read_timer;
statis->local_io_timer += read_stats.local_read_timer;
5 changes: 3 additions & 2 deletions be/src/io/cache/block/cached_remote_file_reader.h
Original file line number Diff line number Diff line change
@@ -66,10 +66,11 @@ class CachedRemoteFileReader final : public FileReader {
IFileCache::Key _cache_key;
CloudFileCachePtr _cache;

// Used to record read/write timer and cache related metrics.
// These metrics will finally be saved in FileCacheStatistics.
struct ReadStatistics {
bool hit_cache = true;
bool skip_cache = false;
int64_t bytes_read = 0;
int64_t bytes_write_into_file_cache = 0;
int64_t remote_read_timer = 0;
int64_t local_read_timer = 0;
@@ -82,4 +83,4 @@ class CachedRemoteFileReader final : public FileReader {
};

} // namespace io
} // namespace doris
} // namespace doris
5 changes: 4 additions & 1 deletion be/src/io/fs/broker_file_reader.cpp
Original file line number Diff line number Diff line change
@@ -62,7 +62,7 @@ Status BrokerFileReader::close() {
}

Status BrokerFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* /*io_ctx*/) {
const IOContext* io_ctx) {
DCHECK(!closed());
size_t bytes_req = result.size;
char* to = result.data;
@@ -76,6 +76,9 @@ Status BrokerFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes

*bytes_read = data.size();
memcpy(to, data.data(), *bytes_read);
if (io_ctx && io_ctx->file_cache_stats) {
io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req;
}
return Status::OK();
}

2 changes: 0 additions & 2 deletions be/src/io/fs/broker_file_reader.h
Original file line number Diff line number Diff line change
@@ -34,8 +34,6 @@

namespace doris::io {

struct IOContext;

class BrokerFileReader : public FileReader {
public:
BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size, TBrokerFD fd,
2 changes: 1 addition & 1 deletion be/src/io/fs/file_reader.h
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@

#include "common/status.h"
#include "io/fs/path.h"
#include "io/io_common.h"
#include "util/profile_collector.h"
#include "util/slice.h"

@@ -32,7 +33,6 @@ namespace doris {
namespace io {

class FileSystem;
struct IOContext;

enum class FileCachePolicy : uint8_t {
NO_CACHE,
10 changes: 8 additions & 2 deletions be/src/io/fs/hdfs_file_reader.cpp
Original file line number Diff line number Diff line change
@@ -84,7 +84,7 @@ Status HdfsFileReader::close() {

#ifdef USE_HADOOP_HDFS
Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* /*io_ctx*/) {
const IOContext* io_ctx) {
DCHECK(!closed());
if (offset > _handle->file_size()) {
return Status::IOError("offset exceeds file size(offset: {}, file size: {}, path: {})",
@@ -121,14 +121,17 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r
has_read += loop_read;
}
*bytes_read = has_read;
if (io_ctx && io_ctx->file_cache_stats) {
io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req;
}
return Status::OK();
}

#else
// The hedged read only support hdfsPread().
// TODO: rethink here to see if there are some difference between hdfsPread() and hdfsRead()
Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* /*io_ctx*/) {
const IOContext* io_ctx) {
DCHECK(!closed());
if (offset > _handle->file_size()) {
return Status::IOError("offset exceeds file size(offset: {}, file size: {}, path: {})",
@@ -177,6 +180,9 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r
has_read += loop_read;
}
*bytes_read = has_read;
if (io_ctx && io_ctx->file_cache_stats) {
io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req;
}
return Status::OK();
}
#endif
1 change: 0 additions & 1 deletion be/src/io/fs/hdfs_file_reader.h
Original file line number Diff line number Diff line change
@@ -34,7 +34,6 @@

namespace doris {
namespace io {
struct IOContext;

class HdfsFileReader : public FileReader {
public:
5 changes: 4 additions & 1 deletion be/src/io/fs/local_file_reader.cpp
Original file line number Diff line number Diff line change
@@ -118,7 +118,7 @@ Status LocalFileReader::close() {
}

Status LocalFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* /*io_ctx*/) {
const IOContext* io_ctx) {
DCHECK(!closed());
if (offset > _file_size) {
return Status::InternalError(
@@ -148,6 +148,9 @@ Status LocalFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_
*bytes_read += res;
}
}
if (io_ctx && io_ctx->file_cache_stats) {
io_ctx->file_cache_stats->bytes_read_from_local += *bytes_read;
}
DorisMetrics::instance()->local_bytes_read_total->increment(*bytes_read);
return Status::OK();
}
6 changes: 4 additions & 2 deletions be/src/io/fs/s3_file_reader.cpp
Original file line number Diff line number Diff line change
@@ -42,7 +42,6 @@

namespace doris {
namespace io {
struct IOContext;
bvar::Adder<uint64_t> s3_file_reader_read_counter("s3_file_reader", "read_at");
bvar::Adder<uint64_t> s3_file_reader_total("s3_file_reader", "total_num");
bvar::Adder<uint64_t> s3_bytes_read_total("s3_file_reader", "bytes_read");
@@ -86,7 +85,7 @@ Status S3FileReader::close() {
}

Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* /*io_ctx*/) {
const IOContext* io_ctx) {
DCHECK(!closed());
if (offset > _file_size) {
return Status::InternalError(
@@ -154,6 +153,9 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
LOG(INFO) << fmt::format("read s3 file {} succeed after {} times with {} ms sleeping",
_path.native(), retry_count, total_sleep_time);
}
if (io_ctx && io_ctx->file_cache_stats) {
io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req;
}
return Status::OK();
}
return Status::InternalError("failed to read from s3, exceeded maximum retries");
1 change: 0 additions & 1 deletion be/src/io/fs/s3_file_reader.h
Original file line number Diff line number Diff line change
@@ -35,7 +35,6 @@ namespace doris {
class RuntimeProfile;

namespace io {
struct IOContext;

class S3FileReader final : public FileReader {
public:
42 changes: 42 additions & 0 deletions be/src/io/io_common.h
Original file line number Diff line number Diff line change
@@ -19,6 +19,8 @@

#include <gen_cpp/Types_types.h>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: 'gen_cpp/Types_types.h' file not found [clang-diagnostic-error]

#include <gen_cpp/Types_types.h>
         ^


#include <sstream>

namespace doris {

enum class ReaderType : uint8_t {
@@ -45,6 +47,38 @@ struct FileCacheStatistics {
int64_t write_cache_io_timer = 0;
int64_t bytes_write_into_cache = 0;
int64_t num_skip_cache_io_total = 0;

void update(const FileCacheStatistics& other) {
num_local_io_total += other.num_local_io_total;
num_remote_io_total += other.num_remote_io_total;
local_io_timer += other.local_io_timer;
bytes_read_from_local += other.bytes_read_from_local;
bytes_read_from_remote += other.bytes_read_from_remote;
remote_io_timer += other.remote_io_timer;
write_cache_io_timer += other.write_cache_io_timer;
write_cache_io_timer += other.write_cache_io_timer;
bytes_write_into_cache += other.bytes_write_into_cache;
num_skip_cache_io_total += other.num_skip_cache_io_total;
}

void reset() {
num_local_io_total = 0;
num_remote_io_total = 0;
local_io_timer = 0;
bytes_read_from_local = 0;
bytes_read_from_remote = 0;
remote_io_timer = 0;
write_cache_io_timer = 0;
bytes_write_into_cache = 0;
num_skip_cache_io_total = 0;
}

std::string debug_string() const {
std::stringstream ss;
ss << "bytes_read_from_local: " << bytes_read_from_local
<< ", bytes_read_from_remote: " << bytes_read_from_remote;
return ss.str();
}
};

struct IOContext {
@@ -60,6 +94,14 @@ struct IOContext {
int64_t expiration_time = 0;
const TUniqueId* query_id = nullptr; // Ref
FileCacheStatistics* file_cache_stats = nullptr; // Ref

std::string debug_string() const {
if (file_cache_stats != nullptr) {
return file_cache_stats->debug_string();
} else {
return "no file cache stats";
}
}
};

} // namespace io
8 changes: 8 additions & 0 deletions be/src/runtime/query_statistics.cpp
Original file line number Diff line number Diff line change
@@ -32,6 +32,8 @@ void QueryStatistics::merge(const QueryStatistics& other) {
cpu_nanos += other.cpu_nanos.load(std::memory_order_relaxed);
shuffle_send_bytes += other.shuffle_send_bytes.load(std::memory_order_relaxed);
shuffle_send_rows += other.shuffle_send_rows.load(std::memory_order_relaxed);
_scan_bytes_from_local_storage += other._scan_bytes_from_local_storage;
_scan_bytes_from_remote_storage += other._scan_bytes_from_remote_storage;

int64_t other_peak_mem = other.max_peak_memory_bytes.load(std::memory_order_relaxed);
if (other_peak_mem > this->max_peak_memory_bytes) {
@@ -51,6 +53,8 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
statistics->set_cpu_ms(cpu_nanos / NANOS_PER_MILLIS);
statistics->set_returned_rows(returned_rows);
statistics->set_max_peak_memory_bytes(max_peak_memory_bytes);
statistics->set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
statistics->set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
}

void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
@@ -64,12 +68,16 @@ void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
current_used_memory_bytes.load(std::memory_order_relaxed));
statistics->__set_shuffle_send_bytes(shuffle_send_bytes.load(std::memory_order_relaxed));
statistics->__set_shuffle_send_rows(shuffle_send_rows.load(std::memory_order_relaxed));
statistics->__set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
statistics->__set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
}

void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
scan_rows = statistics.scan_rows();
scan_bytes = statistics.scan_bytes();
cpu_nanos = statistics.cpu_ms() * NANOS_PER_MILLIS;
_scan_bytes_from_local_storage = statistics.scan_bytes_from_local_storage();
_scan_bytes_from_remote_storage = statistics.scan_bytes_from_remote_storage();
}

void QueryStatistics::merge(QueryStatisticsRecvr* recvr) {
13 changes: 13 additions & 0 deletions be/src/runtime/query_statistics.h
Original file line number Diff line number Diff line change
@@ -44,6 +44,8 @@ class QueryStatistics {
: scan_rows(0),
scan_bytes(0),
cpu_nanos(0),
_scan_bytes_from_local_storage(0),
_scan_bytes_from_remote_storage(0),
returned_rows(0),
max_peak_memory_bytes(0),
current_used_memory_bytes(0),
@@ -65,6 +67,13 @@ class QueryStatistics {
this->cpu_nanos.fetch_add(delta_cpu_time, std::memory_order_relaxed);
}

void add_scan_bytes_from_local_storage(int64_t scan_bytes_from_local_storage) {
_scan_bytes_from_local_storage += scan_bytes_from_local_storage;
}
void add_scan_bytes_from_remote_storage(int64_t scan_bytes_from_remote_storage) {
_scan_bytes_from_remote_storage += scan_bytes_from_remote_storage;
}

void add_shuffle_send_bytes(int64_t delta_bytes) {
this->shuffle_send_bytes.fetch_add(delta_bytes, std::memory_order_relaxed);
}
@@ -95,6 +104,8 @@ class QueryStatistics {
cpu_nanos.store(0, std::memory_order_relaxed);
shuffle_send_bytes.store(0, std::memory_order_relaxed);
shuffle_send_rows.store(0, std::memory_order_relaxed);
_scan_bytes_from_local_storage.store(0, std::memory_order_relaxed);
_scan_bytes_from_remote_storage.store(0, std::memory_order_relaxed);

returned_rows = 0;
max_peak_memory_bytes.store(0, std::memory_order_relaxed);
@@ -120,6 +131,8 @@ class QueryStatistics {
std::atomic<int64_t> scan_rows;
std::atomic<int64_t> scan_bytes;
std::atomic<int64_t> cpu_nanos;
std::atomic<int64_t> _scan_bytes_from_local_storage;
std::atomic<int64_t> _scan_bytes_from_remote_storage;
// number rows returned by query.
// only set once by result sink when closing.
int64_t returned_rows;
37 changes: 25 additions & 12 deletions be/src/runtime/runtime_query_statistics_mgr.cpp
Original file line number Diff line number Diff line change
@@ -225,28 +225,41 @@ void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* blo
int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;

// block's schema come from SchemaBackendActiveTasksScanner::_s_tbls_columns
// before 2.1.7, there are 12 columns in "backend_active_tasks" table.
// after 2.1.8, 2 new columns added.
// check this to make it compatible with version before 2.1.7
bool need_local_and_remote_bytes = (block->columns() > 12);
for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
int col_idx = 0;
TQueryStatistics tqs;
qs_ctx_ptr->collect_query_statistics(&tqs);
SchemaScannerHelper::insert_int64_value(0, be_id, block);
SchemaScannerHelper::insert_string_value(1, qs_ctx_ptr->_fe_addr.hostname, block);
SchemaScannerHelper::insert_string_value(2, query_id, block);
SchemaScannerHelper::insert_int64_value(col_idx++, be_id, block);
SchemaScannerHelper::insert_string_value(col_idx++, qs_ctx_ptr->_fe_addr.hostname, block);
SchemaScannerHelper::insert_string_value(col_idx++, query_id, block);

int64_t task_time = qs_ctx_ptr->_is_query_finished
? qs_ctx_ptr->_query_finish_time - qs_ctx_ptr->_query_start_time
: MonotonicMillis() - qs_ctx_ptr->_query_start_time;
SchemaScannerHelper::insert_int64_value(3, task_time, block);
SchemaScannerHelper::insert_int64_value(4, tqs.cpu_ms, block);
SchemaScannerHelper::insert_int64_value(5, tqs.scan_rows, block);
SchemaScannerHelper::insert_int64_value(6, tqs.scan_bytes, block);
SchemaScannerHelper::insert_int64_value(7, tqs.max_peak_memory_bytes, block);
SchemaScannerHelper::insert_int64_value(8, tqs.current_used_memory_bytes, block);
SchemaScannerHelper::insert_int64_value(9, tqs.shuffle_send_bytes, block);
SchemaScannerHelper::insert_int64_value(10, tqs.shuffle_send_rows, block);
SchemaScannerHelper::insert_int64_value(col_idx++, task_time, block);
SchemaScannerHelper::insert_int64_value(col_idx++, tqs.cpu_ms, block);
SchemaScannerHelper::insert_int64_value(col_idx++, tqs.scan_rows, block);
SchemaScannerHelper::insert_int64_value(col_idx++, tqs.scan_bytes, block);

if (need_local_and_remote_bytes) {
SchemaScannerHelper::insert_int64_value(col_idx++, tqs.scan_bytes_from_local_storage,
block);
SchemaScannerHelper::insert_int64_value(col_idx++, tqs.scan_bytes_from_remote_storage,
block);
}

SchemaScannerHelper::insert_int64_value(col_idx++, tqs.max_peak_memory_bytes, block);
SchemaScannerHelper::insert_int64_value(col_idx++, tqs.current_used_memory_bytes, block);
SchemaScannerHelper::insert_int64_value(col_idx++, tqs.shuffle_send_bytes, block);
SchemaScannerHelper::insert_int64_value(col_idx++, tqs.shuffle_send_rows, block);

std::stringstream ss;
ss << qs_ctx_ptr->_query_type;
SchemaScannerHelper::insert_string_value(11, ss.str(), block);
SchemaScannerHelper::insert_string_value(col_idx++, ss.str(), block);
}
}

1 change: 0 additions & 1 deletion be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
@@ -667,7 +667,6 @@ class ORCFileInputStream : public orc::InputStream, public ProfileCollector {
io::FileReaderSPtr& get_inner_reader() { return _inner_reader; }

protected:
void _collect_profile_at_runtime() override {};
void _collect_profile_before_close() override;

private:
16 changes: 15 additions & 1 deletion be/src/vec/exec/scan/new_olap_scanner.cpp
Original file line number Diff line number Diff line change
@@ -656,7 +656,6 @@ void NewOlapScanner::_collect_profile_before_close() {
COUNTER_UPDATE(Parent->_total_segment_counter, stats.total_segment_number);

// Update counters for NewOlapScanner
// Update counters from tablet reader's stats
auto& stats = _tablet_reader->stats();

if (_parent) {
@@ -678,4 +677,19 @@ void NewOlapScanner::_collect_profile_before_close() {
tablet->query_scan_count->increment(1);
}

void NewOlapScanner::_update_bytes_and_rows_read() {
VScanner::_update_bytes_and_rows_read();
if (_query_statistics) {
auto& stats = _tablet_reader->stats();
int64_t delta_local = stats.file_cache_stats.bytes_read_from_local - _bytes_read_from_local;
int64_t delta_remote =
stats.file_cache_stats.bytes_read_from_remote - _bytes_read_from_remote;
_query_statistics->add_scan_bytes_from_local_storage(delta_local);
_query_statistics->add_scan_bytes_from_remote_storage(delta_remote);
_query_statistics->add_scan_bytes(delta_local + delta_remote);
_bytes_read_from_local = stats.file_cache_stats.bytes_read_from_local;
_bytes_read_from_remote = stats.file_cache_stats.bytes_read_from_remote;
}
}

} // namespace doris::vectorized
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/new_olap_scanner.h
Original file line number Diff line number Diff line change
@@ -80,6 +80,7 @@ class NewOlapScanner : public VScanner {
protected:
Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
void _collect_profile_before_close() override;
void _update_bytes_and_rows_read() override;

private:
void _update_realtime_counters();
15 changes: 15 additions & 0 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
@@ -1236,4 +1236,19 @@ void VFileScanner::_collect_profile_before_close() {
}
}

void VFileScanner::_update_bytes_and_rows_read() {
VScanner::_update_bytes_and_rows_read();
if (_query_statistics && _io_ctx.get() && _io_ctx->file_cache_stats) {
int64_t delta_local =
_io_ctx->file_cache_stats->bytes_read_from_local - _bytes_read_from_local;
int64_t delta_remote =
_io_ctx->file_cache_stats->bytes_read_from_remote - _bytes_read_from_remote;
_query_statistics->add_scan_bytes_from_local_storage(delta_local);
_query_statistics->add_scan_bytes_from_remote_storage(delta_remote);
_query_statistics->add_scan_bytes(delta_local + delta_remote);
_bytes_read_from_local = _io_ctx->file_cache_stats->bytes_read_from_local;
_bytes_read_from_remote = _io_ctx->file_cache_stats->bytes_read_from_remote;
}
}

} // namespace doris::vectorized
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/vfile_scanner.h
Original file line number Diff line number Diff line change
@@ -97,6 +97,8 @@ class VFileScanner : public VScanner {

void _collect_profile_before_close() override;

void _update_bytes_and_rows_read() override;

protected:
const TFileScanRangeParams* _params = nullptr;
std::shared_ptr<vectorized::SplitSourceConnector> _split_source;
17 changes: 9 additions & 8 deletions be/src/vec/exec/scan/vscanner.cpp
Original file line number Diff line number Diff line change
@@ -118,8 +118,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
}
}

int64_t old_scan_rows = _num_rows_read;
int64_t old_scan_bytes = _num_byte_read;
_prev_num_rows_read = _num_rows_read;
{
do {
// if step 2 filter all rows of block, and block will be reused to get next rows,
@@ -138,7 +137,6 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
break;
}
_num_rows_read += block->rows();
_num_byte_read += block->allocated_bytes();
}

// 2. Filter the output block finally.
@@ -153,10 +151,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
_num_rows_read < rows_read_threshold);
}

if (_query_statistics) {
_query_statistics->add_scan_rows(_num_rows_read - old_scan_rows);
_query_statistics->add_scan_bytes(_num_byte_read - old_scan_bytes);
}
_update_bytes_and_rows_read();

if (state->is_cancelled()) {
// TODO: Should return the specific ErrorStatus instead of just Cancelled.
@@ -281,7 +276,6 @@ void VScanner::_collect_profile_before_close() {
if (_parent) {
COUNTER_UPDATE(_parent->_scan_cpu_timer, _scan_cpu_timer);
COUNTER_UPDATE(_parent->_rows_read_counter, _num_rows_read);
COUNTER_UPDATE(_parent->_byte_read_counter, _num_byte_read);
} else {
COUNTER_UPDATE(_local_state->_scan_cpu_timer, _scan_cpu_timer);
COUNTER_UPDATE(_local_state->_rows_read_counter, _num_rows_read);
@@ -301,4 +295,11 @@ void VScanner::update_scan_cpu_timer() {
}
}

void VScanner::_update_bytes_and_rows_read() {
if (_query_statistics) {
_query_statistics->add_scan_rows(_num_rows_read - _prev_num_rows_read);
_prev_num_rows_read = _num_rows_read;
}
}

} // namespace doris::vectorized
11 changes: 10 additions & 1 deletion be/src/vec/exec/scan/vscanner.h
Original file line number Diff line number Diff line change
@@ -135,6 +135,10 @@ class VScanner {

void update_scan_cpu_timer();

// update the bytes and rows read at each round in query statistics.
// so that we can get runtime statistics for each query.
virtual void _update_bytes_and_rows_read();

RuntimeState* runtime_state() { return _state; }

bool is_open() { return _is_open; }
@@ -214,7 +218,12 @@ class VScanner {
// num of rows read from scanner
int64_t _num_rows_read = 0;

int64_t _num_byte_read = 0;
// save the current _num_rows_read before next round,
// so that we can get delta rows between each round.
int64_t _prev_num_rows_read = 0;
// bytes read from local and remote fs
int64_t _bytes_read_from_local = 0;
int64_t _bytes_read_from_remote = 0;

// num of rows return from scanner, after filter block
int64_t _num_rows_return = 0;
14 changes: 7 additions & 7 deletions be/test/io/cache/file_block_cache_test.cpp
Original file line number Diff line number Diff line change
@@ -852,7 +852,7 @@ TEST(LRUFileCache, fd_cache_remove) {
assert_range(2, segments[0], io::FileBlock::Range(0, 8), io::FileBlock::State::DOWNLOADING);
download(segments[0]);
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(9);
static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 9), 0));
static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 9), 0, nullptr));
EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 0)));
}
{
@@ -864,7 +864,7 @@ TEST(LRUFileCache, fd_cache_remove) {
assert_range(2, segments[0], io::FileBlock::Range(9, 9), io::FileBlock::State::DOWNLOADING);
download(segments[0]);
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(1);
static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 1), 0));
static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 1), 0, nullptr));
EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 9)));
}
{
@@ -877,7 +877,7 @@ TEST(LRUFileCache, fd_cache_remove) {
io::FileBlock::State::DOWNLOADING);
download(segments[0]);
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(5);
static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 5), 0));
static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 5), 0, nullptr));
EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 10)));
}
{
@@ -890,7 +890,7 @@ TEST(LRUFileCache, fd_cache_remove) {
io::FileBlock::State::DOWNLOADING);
download(segments[0]);
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(10);
static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 10), 0));
static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 10), 0, nullptr));
EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 15)));
}
EXPECT_FALSE(io::IFileCache::contains_file_reader(std::make_pair(key, 0)));
@@ -933,7 +933,7 @@ TEST(LRUFileCache, fd_cache_evict) {
assert_range(2, segments[0], io::FileBlock::Range(0, 8), io::FileBlock::State::DOWNLOADING);
download(segments[0]);
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(9);
static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 9), 0));
static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 9), 0, nullptr));
EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 0)));
}
{
@@ -945,7 +945,7 @@ TEST(LRUFileCache, fd_cache_evict) {
assert_range(2, segments[0], io::FileBlock::Range(9, 9), io::FileBlock::State::DOWNLOADING);
download(segments[0]);
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(1);
static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 1), 0));
static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 1), 0, nullptr));
EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 9)));
}
{
@@ -958,7 +958,7 @@ TEST(LRUFileCache, fd_cache_evict) {
io::FileBlock::State::DOWNLOADING);
download(segments[0]);
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(5);
static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 5), 0));
static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 5), 0, nullptr));
EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 10)));
}
EXPECT_FALSE(io::IFileCache::contains_file_reader(std::make_pair(key, 0)));
Original file line number Diff line number Diff line change
@@ -82,6 +82,12 @@ public class InternalSchema {
AUDIT_SCHEMA.add(new ColumnDef("return_rows", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_SCHEMA.add(new ColumnDef("shuffle_send_rows", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_SCHEMA.add(new ColumnDef("shuffle_send_bytes", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_SCHEMA
.add(new ColumnDef("scan_bytes_from_local_storage", TypeDef.create(PrimitiveType.BIGINT),
true));
AUDIT_SCHEMA
.add(new ColumnDef("scan_bytes_from_remote_storage", TypeDef.create(PrimitiveType.BIGINT),
true));
AUDIT_SCHEMA.add(new ColumnDef("stmt_id", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_SCHEMA.add(new ColumnDef("is_query", TypeDef.create(PrimitiveType.TINYINT), true));
AUDIT_SCHEMA.add(new ColumnDef("is_nereids", TypeDef.create(PrimitiveType.TINYINT), true));
Original file line number Diff line number Diff line change
@@ -455,6 +455,8 @@ public class SchemaTable extends Table {
.column("TASK_CPU_TIME_MS", ScalarType.createType(PrimitiveType.BIGINT))
.column("SCAN_ROWS", ScalarType.createType(PrimitiveType.BIGINT))
.column("SCAN_BYTES", ScalarType.createType(PrimitiveType.BIGINT))
.column("LOCAL_SCAN_BYTES", ScalarType.createType(PrimitiveType.BIGINT))
.column("REMOTE_SCAN_BYTES", ScalarType.createType(PrimitiveType.BIGINT))
.column("BE_PEAK_MEMORY_BYTES", ScalarType.createType(PrimitiveType.BIGINT))
.column("CURRENT_USED_MEMORY_BYTES", ScalarType.createType(PrimitiveType.BIGINT))
.column("SHUFFLE_SEND_BYTES", ScalarType.createType(PrimitiveType.BIGINT))
Original file line number Diff line number Diff line change
@@ -105,6 +105,10 @@ public enum EventType {
// note: newly added fields should be always before fuzzyVariables
@AuditField(value = "FuzzyVariables")
public String fuzzyVariables = "";
@AuditField(value = "scanBytesFromLocalStorage")
public long scanBytesFromLocalStorage = -1;
@AuditField(value = "scanBytesFromRemoteStorage")
public long scanBytesFromRemoteStorage = -1;

public long pushToAuditLogQueueTime;

@@ -249,6 +253,16 @@ public AuditEventBuilder setCommandType(String commandType) {
return this;
}

public AuditEventBuilder setScanBytesFromLocalStorage(long scanBytesFromLocalStorage) {
auditEvent.scanBytesFromLocalStorage = scanBytesFromLocalStorage;
return this;
}

public AuditEventBuilder setScanBytesFromRemoteStorage(long scanBytesFromRemoteStorage) {
auditEvent.scanBytesFromRemoteStorage = scanBytesFromRemoteStorage;
return this;
}

public AuditEvent build() {
return this.auditEvent;
}
Original file line number Diff line number Diff line change
@@ -151,6 +151,10 @@ private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) {
logBuffer.append(event.queryTime).append("\t");
logBuffer.append(event.scanBytes).append("\t");
logBuffer.append(event.scanRows).append("\t");
logBuffer.append(event.shuffleSendBytes).append("\t");
logBuffer.append(event.shuffleSendRows).append("\t");
logBuffer.append(event.scanBytesFromLocalStorage).append("\t");
logBuffer.append(event.scanBytesFromRemoteStorage).append("\t");
logBuffer.append(event.returnRows).append("\t");
logBuffer.append(event.shuffleSendRows).append("\t");
logBuffer.append(event.shuffleSendBytes).append("\t");
Original file line number Diff line number Diff line change
@@ -226,7 +226,11 @@ private static void logAuditLogImpl(ConnectContext ctx, String origStmt, Stateme
auditEventBuilder.setSqlDigest(sqlDigest);
}
}
auditEventBuilder.setIsQuery(true);
auditEventBuilder.setIsQuery(true)
.setScanBytesFromLocalStorage(
statistics == null ? 0 : statistics.getScanBytesFromLocalStorage())
.setScanBytesFromRemoteStorage(
statistics == null ? 0 : statistics.getScanBytesFromRemoteStorage());
} else {
auditEventBuilder.setIsQuery(false);
}
Original file line number Diff line number Diff line change
@@ -84,6 +84,8 @@ protected void runAfterCatalogReady() {
auditEvent.cpuTimeMs = queryStats.cpu_ms;
auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes;
auditEvent.shuffleSendRows = queryStats.shuffle_send_rows;
auditEvent.scanBytesFromLocalStorage = queryStats.scan_bytes_from_local_storage;
auditEvent.scanBytesFromRemoteStorage = queryStats.scan_bytes_from_remote_storage;
}
boolean ret = Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent, true);
if (!ret) {
@@ -222,6 +224,8 @@ private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics src) {
if (dst.max_peak_memory_bytes < src.max_peak_memory_bytes) {
dst.max_peak_memory_bytes = src.max_peak_memory_bytes;
}
dst.scan_bytes_from_local_storage += src.scan_bytes_from_local_storage;
dst.scan_bytes_from_remote_storage += src.scan_bytes_from_remote_storage;
}

private void queryAuditEventLogWriteLock() {
@@ -232,3 +236,4 @@ private void queryAuditEventLogWriteUnlock() {
queryAuditEventLock.writeLock().unlock();
}
}

2 changes: 2 additions & 0 deletions gensrc/proto/data.proto
Original file line number Diff line number Diff line change
@@ -35,6 +35,8 @@ message PQueryStatistics {
optional int64 cpu_ms = 4;
optional int64 max_peak_memory_bytes = 5;
repeated PNodeStatistics nodes_statistics = 6;
optional int64 scan_bytes_from_local_storage = 7;
optional int64 scan_bytes_from_remote_storage = 8;
}

message PRowBatch {
2 changes: 2 additions & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
@@ -412,6 +412,8 @@ struct TQueryStatistics {
7: optional i64 workload_group_id
8: optional i64 shuffle_send_bytes
9: optional i64 shuffle_send_rows
10: optional i64 scan_bytes_from_local_storage
11: optional i64 scan_bytes_from_remote_storage
}

struct TReportWorkloadRuntimeStatusParams {