Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

broadcast grandpa votes/commit #1752

Merged
merged 4 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/consensus/grandpa/impl/grandpa_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1021,7 +1021,7 @@ namespace kagome::consensus::grandpa {
if (msg.round_number > current_round_->roundNumber() + 1) {
reputation_repository_->change(peer_id,
network::reputation::cost::FUTURE_MESSAGE);
} else if (msg.round_number < current_round_->roundNumber() - 1) {
} else if (msg.round_number + 1 < current_round_->roundNumber()) {
reputation_repository_->change(peer_id,
network::reputation::cost::PAST_REJECTION);
}
Expand Down
104 changes: 68 additions & 36 deletions core/network/impl/protocols/grandpa_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace kagome::network {

GrandpaProtocol::GrandpaProtocol(
libp2p::Host &host,
std::shared_ptr<crypto::Hasher> hasher,
std::shared_ptr<boost::asio::io_context> io_context,
const application::AppConfiguration &app_config,
std::shared_ptr<consensus::grandpa::GrandpaObserver> grandpa_observer,
Expand All @@ -35,6 +36,7 @@ namespace kagome::network {
host,
make_protocols(kGrandpaProtocol, genesis_hash, "paritytech"),
log::createLogger(kGrandpaProtocolName, "grandpa_protocol")),
hasher_{std::move(hasher)},
io_context_(std::move(io_context)),
app_config_(app_config),
grandpa_observer_(std::move(grandpa_observer)),
Expand Down Expand Up @@ -303,21 +305,25 @@ namespace kagome::network {
}

auto peer_id = stream->remotePeerId().value();
auto &message = grandpa_message_res.value();
auto hash = self->getHash(message);
visit_in_place(
std::move(grandpa_message_res.value()),
std::move(message),
[&](network::GrandpaVote &&vote_message) {
SL_VERBOSE(self->base_.logger(),
"VoteMessage has received from {}",
peer_id);
self->grandpa_observer_->onVoteMessage(
std::nullopt, peer_id, vote_message);
self->addKnown(peer_id, hash);
},
[&](FullCommitMessage &&commit_message) {
SL_VERBOSE(self->base_.logger(),
"CommitMessage has received from {}",
peer_id);
self->grandpa_observer_->onCommitMessage(
std::nullopt, peer_id, commit_message);
self->addKnown(peer_id, hash);
},
[&](GrandpaNeighborMessage &&neighbor_message) {
if (peer_id != self->own_info_.id) {
Expand Down Expand Up @@ -353,19 +359,11 @@ namespace kagome::network {
"Send vote message: grandpa round number {}",
vote_message.round_number);

auto filter = [&, &msg = vote_message](const PeerId &peer_id) {
auto info_opt = peer_manager_->getPeerState(peer_id);
if (not info_opt.has_value()) {
SL_DEBUG(base_.logger(),
"Vote signed by {} with set_id={} in round={} "
"has not been sent to {}: peer is not connected",
msg.id(),
msg.counter,
msg.round_number,
peer_id);
auto filter = [&, &msg = vote_message](const PeerId &peer_id,
const PeerState &info) {
if (info.roles.flags.light != 0) {
return false;
}
const auto &info = info_opt.value().get();

if (not info.set_id.has_value() or not info.round_number.has_value()) {
SL_DEBUG(base_.logger(),
Expand Down Expand Up @@ -393,9 +391,8 @@ namespace kagome::network {
return false;
}

// If a peer is at round r, is impolite to send messages about r-2 or
// earlier
if (msg.round_number + 2 < info.round_number.value()) {
// only r-1 ... r+1
if (msg.round_number + 1 < info.round_number.value()) {
SL_DEBUG(
base_.logger(),
"Vote signed by {} with set_id={} in round={} "
Expand All @@ -408,9 +405,7 @@ namespace kagome::network {
return false;
}

// If a peer is at round r, is extremely impolite to send messages about
// r+1 or later
if (msg.round_number > info.round_number.value()) {
if (msg.round_number > info.round_number.value() + 1) {
SL_DEBUG(base_.logger(),
"Vote signed by {} with set_id={} in round={} "
"has not been sent to {} as impolite: their round is old: {}",
Expand All @@ -430,9 +425,9 @@ namespace kagome::network {
(*shared_msg) = GrandpaMessage(std::move(vote_message));

if (not peer_id.has_value()) {
stream_engine_->broadcast<GrandpaMessage>(
shared_from_this(), std::move(shared_msg), filter);
broadcast(std::move(shared_msg), filter);
} else {
addKnown(*peer_id, getHash(*shared_msg));
stream_engine_->send(
peer_id.value(), shared_from_this(), std::move(shared_msg));
};
Expand Down Expand Up @@ -492,20 +487,8 @@ namespace kagome::network {
auto filter = [this,
set_id = msg.set_id,
round_number = msg.round,
finalizing =
msg.message.target_number](const PeerId &peer_id) {
auto info_opt = peer_manager_->getPeerState(peer_id);
if (not info_opt.has_value()) {
SL_DEBUG(base_.logger(),
"Commit with set_id={} in round={} "
"has not been sent to {}: peer is not connected",
set_id,
round_number,
peer_id);
return false;
}
const auto &info = info_opt.value().get();

finalizing = msg.message.target_number](
const PeerId &peer_id, const PeerState &info) {
if (not info.set_id.has_value() or not info.round_number.has_value()) {
SL_DEBUG(base_.logger(),
"Commit with set_id={} in round={} "
Expand Down Expand Up @@ -564,9 +547,9 @@ namespace kagome::network {
(*shared_msg) = GrandpaMessage(std::move(msg));

if (not peer_id.has_value()) {
stream_engine_->broadcast<GrandpaMessage>(
shared_from_this(), std::move(shared_msg), filter);
broadcast(std::move(shared_msg), filter);
} else {
addKnown(*peer_id, getHash(*shared_msg));
stream_engine_->send(
peer_id.value(), shared_from_this(), std::move(shared_msg));
}
Expand Down Expand Up @@ -730,4 +713,53 @@ namespace kagome::network {
stream_engine_->send(peer_id, shared_from_this(), std::move(shared_msg));
}

common::Hash256 GrandpaProtocol::getHash(
const GrandpaMessage &message) const {
return hasher_->twox_256(scale::encode(message).value());
}

bool GrandpaProtocol::addKnown(const PeerId &peer,
const common::Hash256 &hash) {
auto info = peer_manager_->getPeerState(peer);
return info and info->get().known_grandpa_messages.add(hash);
}

template <typename F>
void GrandpaProtocol::broadcast(std::shared_ptr<GrandpaMessage> message,
const F &predicate) {
constexpr size_t kAuthorities = 4;
constexpr size_t kAny = 4;
std::vector<PeerId> authorities, any;
peer_manager_->forEachPeer([&](const PeerId &peer) {
if (auto info_ref = peer_manager_->getPeerState(peer)) {
auto &info = info_ref->get();
if (not predicate(peer, info)) {
return;
}
(info.roles.flags.authority != 0 ? authorities : any)
.emplace_back(peer);
}
});
auto hash = getHash(*message);
size_t need = 0;
auto loop = [&](std::vector<PeerId> &peers) {
while (not peers.empty() and need != 0) {
auto &peer = peers.back();
if (addKnown(peer, hash)) {
stream_engine_->send(peer, shared_from_this(), message);
--need;
}
peers.pop_back();
}
};
std::shuffle(authorities.begin(), authorities.end(), random_);
need += kAuthorities;
loop(authorities);
any.insert(any.end(),
std::make_move_iterator(authorities.begin()),
std::make_move_iterator(authorities.end()));
std::shuffle(any.begin(), any.end(), random_);
need += kAny;
loop(any);
}
} // namespace kagome::network
11 changes: 11 additions & 0 deletions core/network/impl/protocols/grandpa_protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "network/protocol_base.hpp"

#include <memory>
#include <random>

#include <libp2p/basic/scheduler.hpp>
#include <libp2p/connection/stream.hpp>
Expand Down Expand Up @@ -44,6 +45,7 @@ namespace kagome::network {

GrandpaProtocol(
libp2p::Host &host,
std::shared_ptr<crypto::Hasher> hasher,
std::shared_ptr<boost::asio::io_context> io_context,
const application::AppConfiguration &app_config,
std::shared_ptr<consensus::grandpa::GrandpaObserver> grandpa_observer,
Expand Down Expand Up @@ -92,13 +94,20 @@ namespace kagome::network {
const int &msg,
std::function<void(outcome::result<std::shared_ptr<Stream>>)> &&cb);

common::Hash256 getHash(const GrandpaMessage &message) const;
bool addKnown(const PeerId &peer, const common::Hash256 &hash);

template <typename F>
void broadcast(std::shared_ptr<GrandpaMessage> message, const F &predicate);

/// Node should send catch-up requests rarely to be polite, because
/// processing of them consume more enough resources.
/// How long replying outgoing catch-up requests must be suppressed
static constexpr std::chrono::milliseconds kRecentnessDuration =
std::chrono::seconds(300);

ProtocolBaseImpl base_;
std::shared_ptr<crypto::Hasher> hasher_;
std::shared_ptr<boost::asio::io_context> io_context_;
const application::AppConfiguration &app_config_;
std::shared_ptr<consensus::grandpa::GrandpaObserver> grandpa_observer_;
Expand All @@ -114,6 +123,8 @@ namespace kagome::network {
GrandpaNeighborMessage last_neighbor_{};

std::set<libp2p::peer::PeerId> recent_catchup_requests_by_peer_;

std::default_random_engine random_;
};

} // namespace kagome::network
Expand Down
4 changes: 4 additions & 0 deletions core/network/peer_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

namespace kagome::network {
constexpr size_t kPeerStateMaxKnownBlocks = 1024;
constexpr size_t kPeerStateMaxKnownGrandpaMessages = 8192;

struct CollatorState {
network::ParachainId parachain_id;
Expand Down Expand Up @@ -53,6 +54,9 @@ namespace kagome::network {
std::optional<CollatorState> collator_state = std::nullopt;
std::optional<View> view;
LruSet<primitives::BlockHash> known_blocks{kPeerStateMaxKnownBlocks};
LruSet<common::Hash256> known_grandpa_messages{
kPeerStateMaxKnownGrandpaMessages,
};
};

struct StreamEngine;
Expand Down