Skip to content

Commit e76a0de

Browse files
stdpainHappenLee
authored andcommittedJul 1, 2021
add profile in agg node (apache#33)
1 parent c841b63 commit e76a0de

File tree

4 files changed

+34
-4
lines changed

4 files changed

+34
-4
lines changed
 

‎be/src/vec/exec/vaggregation_node.cpp

+14-2
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
4141
_output_tuple_desc(NULL),
4242
_needs_finalize(tnode.agg_node.need_finalize),
4343
_is_merge(false),
44-
_agg_data() {}
44+
_agg_data(),
45+
_build_timer(nullptr),
46+
_exec_timer(nullptr),
47+
_merge_timer(nullptr) {}
4548

4649
AggregationNode::~AggregationNode() = default;
4750

@@ -68,7 +71,9 @@ Status AggregationNode::init(const TPlanNode& tnode, RuntimeState* state) {
6871

6972
Status AggregationNode::prepare(RuntimeState* state) {
7073
RETURN_IF_ERROR(ExecNode::prepare(state));
71-
74+
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
75+
_exec_timer = ADD_TIMER(runtime_profile(), "ExecTime");
76+
_merge_timer = ADD_TIMER(runtime_profile(), "MergeTime");
7277
SCOPED_TIMER(_runtime_profile->total_time_counter());
7378
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
7479
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
@@ -87,6 +92,11 @@ Status AggregationNode::prepare(RuntimeState* state) {
8792
output_slot_desc, mem_tracker()));
8893
}
8994

95+
// set profile timer to evaluators
96+
for (auto& evaluator : _aggregate_evaluators) {
97+
evaluator->set_timer(_exec_timer, _merge_timer);
98+
}
99+
90100
_offsets_of_aggregate_states.resize(_aggregate_evaluators.size());
91101

92102
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
@@ -300,6 +310,8 @@ Status AggregationNode::_serialize_without_key(RuntimeState* state, Block* block
300310

301311
Status AggregationNode::_execute_without_key(Block* block) {
302312
DCHECK(_agg_data.without_key != nullptr);
313+
LOG(WARNING) << "block rows:" << block->rows();
314+
SCOPED_TIMER(_build_timer);
303315
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
304316
_aggregate_evaluators[i]->execute_single_add(
305317
block, _agg_data.without_key + _offsets_of_aggregate_states[i]);

‎be/src/vec/exec/vaggregation_node.h

+4
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,10 @@ class AggregationNode : public ::doris::ExecNode {
147147
// add tracker here
148148
Arena _agg_arena_pool;
149149

150+
RuntimeProfile::Counter* _build_timer;
151+
RuntimeProfile::Counter* _exec_timer;
152+
RuntimeProfile::Counter* _merge_timer;
153+
150154
private:
151155
Status _create_agg_status(AggregateDataPtr data);
152156
Status _destory_agg_status(AggregateDataPtr data);

‎be/src/vec/exprs/vectorized_agg_fn.cpp

+7-2
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@ AggFnEvaluator::AggFnEvaluator(const TExprNode& desc)
2929
: _fn(desc.fn),
3030
_return_type(TypeDescriptor::from_thrift(desc.fn.ret_type)),
3131
_intermediate_type(TypeDescriptor::from_thrift(desc.fn.aggregate_fn.intermediate_type)),
32-
_intermediate_slot_desc(NULL),
33-
_output_slot_desc(NULL) {}
32+
_intermediate_slot_desc(nullptr),
33+
_output_slot_desc(nullptr),
34+
_exec_timer(nullptr),
35+
_merge_timer(nullptr) {}
3436

3537
Status AggFnEvaluator::create(ObjectPool* pool, const TExpr& desc, AggFnEvaluator** result) {
3638
*result = pool->add(new AggFnEvaluator(desc.nodes[0]));
@@ -111,6 +113,7 @@ void AggFnEvaluator::execute_single_add(Block* block, AggregateDataPtr place, Ar
111113
std::vector<const IColumn*> column_arguments(columns.size());
112114
std::transform(columns.cbegin(), columns.cend(), column_arguments.begin(),
113115
[](const auto& ptr) { return ptr.get(); });
116+
SCOPED_TIMER(_exec_timer);
114117
_function->addBatchSinglePlace(block->rows(), place, column_arguments.data(), nullptr);
115118
}
116119

@@ -124,11 +127,13 @@ void AggFnEvaluator::execute_batch_add(Block* block, size_t offset, AggregateDat
124127
column_arguments[i] =
125128
block->getByPosition(column_id).column->convertToFullColumnIfConst().get();
126129
}
130+
SCOPED_TIMER(_exec_timer);
127131
_function->addBatch(block->rows(), places, offset, column_arguments.data(), arena);
128132
}
129133

130134
void AggFnEvaluator::execute_single_merge(AggregateDataPtr place, ConstAggregateDataPtr rhs,
131135
Arena* arena) {
136+
SCOPED_TIMER(_merge_timer);
132137
_function->merge(place, rhs, arena);
133138
}
134139

‎be/src/vec/exprs/vectorized_agg_fn.h

+9
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#pragma once
1919
#include "runtime/types.h"
20+
#include "util/runtime_profile.h"
2021
#include "vec/aggregate_functions/aggregate_function.h"
2122
#include "vec/core/block.h"
2223
#include "vec/data_types/data_type.h"
@@ -35,6 +36,11 @@ class AggFnEvaluator {
3536
const SlotDescriptor* output_slot_desc,
3637
const std::shared_ptr<MemTracker>& mem_tracker);
3738

39+
void set_timer(RuntimeProfile::Counter* exec_timer, RuntimeProfile::Counter* merge_timer) {
40+
_exec_timer = exec_timer;
41+
_merge_timer = merge_timer;
42+
}
43+
3844
Status open(RuntimeState* state);
3945

4046
void close(RuntimeState* state);
@@ -71,6 +77,9 @@ class AggFnEvaluator {
7177
const SlotDescriptor* _intermediate_slot_desc;
7278
const SlotDescriptor* _output_slot_desc;
7379

80+
RuntimeProfile::Counter* _exec_timer;
81+
RuntimeProfile::Counter* _merge_timer;
82+
7483
// input context
7584
std::vector<VExprContext*> _input_exprs_ctxs;
7685

0 commit comments

Comments
 (0)