Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

request response timeout and metric #2373

Merged
merged 10 commits into from
Feb 13, 2025
Merged
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmake/Hunter/config.cmake
Original file line number Diff line number Diff line change
@@ -92,6 +92,12 @@ hunter_config(
SHA1 4207446a0e45764b814805821aa6860924b03cb7
)

hunter_config(
libp2p
URL https://github.com/libp2p/cpp-libp2p/archive/de20f4006eb58b12b3ac5ee5f5272376c77d46fc.zip
SHA1 7a3c13f87ed400a3215e4e8c4652ee7071e7292d
)

hunter_config(
libsecp256k1
VERSION 0.5.1
2 changes: 1 addition & 1 deletion core/authority_discovery/query/query_impl.cpp
Original file line number Diff line number Diff line change
@@ -198,8 +198,8 @@ namespace kagome::authority_discovery {
if (has(authorities, it->second)) {
++it;
} else {
it = peer_to_auth_cache_.erase(it);
validation_protocol_.get()->reserve(it->first, false);
it = peer_to_auth_cache_.erase(it);
}
}
std::shuffle(authorities.begin(), authorities.end(), random_);
16 changes: 10 additions & 6 deletions core/network/impl/protocols/beefy_justification_protocol.cpp
Original file line number Diff line number Diff line change
@@ -15,22 +15,26 @@
#include "network/peer_manager.hpp"

namespace kagome::network {
constexpr std::chrono::seconds kRequestTimeout{3};

BeefyJustificationProtocol::BeefyJustificationProtocol(
libp2p::Host &host,
const RequestResponseInject &inject,
const blockchain::GenesisBlockHash &genesis,
common::MainThreadPool &main_thread_pool,
std::shared_ptr<PeerManager> peer_manager,
std::shared_ptr<Beefy> beefy)
: RequestResponseProtocolImpl{kName,
host,
inject,
make_protocols(kBeefyJustificationProtocol,
genesis),
log::createLogger(kName),
main_thread_pool},
main_pool_handler_{main_thread_pool.handlerStarted()},
kRequestTimeout},
main_pool_handler_{inject.main_thread_pool->handlerStarted()},
peer_manager_{std::move(peer_manager)},
beefy_{std::move(beefy)} {}
beefy_{std::move(beefy)} {
BOOST_ASSERT(main_pool_handler_ != nullptr);
BOOST_ASSERT(peer_manager_ != nullptr);
BOOST_ASSERT(beefy_ != nullptr);
}

std::optional<outcome::result<BeefyJustificationProtocol::ResponseType>>
BeefyJustificationProtocol::onRxRequest(RequestType block,
3 changes: 1 addition & 2 deletions core/network/impl/protocols/beefy_justification_protocol.hpp
Original file line number Diff line number Diff line change
@@ -39,9 +39,8 @@ namespace kagome::network {
static constexpr auto kName = "BeefyJustificationProtocol";

public:
BeefyJustificationProtocol(libp2p::Host &host,
BeefyJustificationProtocol(const RequestResponseInject &inject,
const blockchain::GenesisBlockHash &genesis,
common::MainThreadPool &main_thread_pool,
std::shared_ptr<PeerManager> peer_manager,
std::shared_ptr<Beefy> beefy);

11 changes: 6 additions & 5 deletions core/network/impl/protocols/fetch_attested_candidate.hpp
Original file line number Diff line number Diff line change
@@ -29,28 +29,29 @@ namespace kagome::network {
ScaleMessageReadWriter>,
NonCopyable,
NonMovable {
static constexpr std::chrono::milliseconds kRequestTimeout{2500};

public:
FetchAttestedCandidateProtocol(
libp2p::Host &host,
RequestResponseInject inject,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<
parachain::statement_distribution::StatementDistribution>
statement_distribution,
common::MainThreadPool &main_thread_pool)
statement_distribution)
: RequestResponseProtocolImpl<
vstaging::AttestedCandidateRequest,
vstaging::AttestedCandidateResponse,
ScaleMessageReadWriter>{kFetchAttestedCandidateProtocolName,
host,
std::move(inject),
make_protocols(
kFetchAttestedCandidateProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(
kFetchAttestedCandidateProtocolName,
"req_attested_candidate_protocol"),
main_thread_pool},
kRequestTimeout},
statement_distribution_(std::move(statement_distribution)) {
BOOST_ASSERT(statement_distribution_);
}
18 changes: 12 additions & 6 deletions core/network/impl/protocols/light.cpp
Original file line number Diff line number Diff line change
@@ -14,25 +14,31 @@
#include "storage/trie/on_read.hpp"

namespace kagome::network {
constexpr std::chrono::seconds kRequestTimeout{15};

LightProtocol::LightProtocol(
libp2p::Host &host,
RequestResponseInject inject,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis,
std::shared_ptr<blockchain::BlockHeaderRepository> repository,
std::shared_ptr<storage::trie::TrieStorage> storage,
std::shared_ptr<runtime::ModuleRepository> module_repo,
std::shared_ptr<runtime::Executor> executor,
common::MainThreadPool &main_thread_pool)
std::shared_ptr<runtime::Executor> executor)
: RequestResponseProtocolImpl{kName,
host,
std::move(inject),
make_protocols(
kLightProtocol, genesis, chain_spec),
log::createLogger(kName),
main_thread_pool},
kRequestTimeout},
repository_{std::move(repository)},
storage_{std::move(storage)},
module_repo_{std::move(module_repo)},
executor_{std::move(executor)} {}
executor_{std::move(executor)} {
BOOST_ASSERT(repository_ != nullptr);
BOOST_ASSERT(storage_ != nullptr);
BOOST_ASSERT(module_repo_ != nullptr);
BOOST_ASSERT(executor_ != nullptr);
}

std::optional<outcome::result<LightProtocol::ResponseType>>
LightProtocol::onRxRequest(RequestType req, std::shared_ptr<Stream>) {
5 changes: 2 additions & 3 deletions core/network/impl/protocols/light.hpp
Original file line number Diff line number Diff line change
@@ -43,14 +43,13 @@ namespace kagome::network {
static constexpr auto kName = "LightProtocol";

public:
LightProtocol(libp2p::Host &host,
LightProtocol(RequestResponseInject inject,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis,
std::shared_ptr<blockchain::BlockHeaderRepository> repository,
std::shared_ptr<storage::trie::TrieStorage> storage,
std::shared_ptr<runtime::ModuleRepository> module_repo,
std::shared_ptr<runtime::Executor> executor,
common::MainThreadPool &main_thread_pool);
std::shared_ptr<runtime::Executor> executor);

std::optional<outcome::result<ResponseType>> onRxRequest(
RequestType req, std::shared_ptr<Stream>) override;
20 changes: 10 additions & 10 deletions core/network/impl/protocols/protocol_fetch_available_data.hpp
Original file line number Diff line number Diff line change
@@ -28,25 +28,25 @@ namespace kagome::network {
ScaleMessageReadWriter> {
public:
static constexpr const char *kName = "FetchAvailableDataProtocol";
static constexpr std::chrono::seconds kRequestTimeout{2};

FetchAvailableDataProtocolImpl(
libp2p::Host &host,
RequestResponseInject inject,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<parachain::AvailabilityStore> av_store,
common::MainThreadPool &main_thread_pool)
std::shared_ptr<parachain::AvailabilityStore> av_store)
: RequestResponseProtocolImpl<
FetchAvailableDataRequest,
FetchAvailableDataResponse,
ScaleMessageReadWriter>{kName,
host,
std::move(inject),
make_protocols(
kFetchAvailableDataProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(
kName, "req_available_data_protocol"),
main_thread_pool},
kRequestTimeout},
av_store_{std::move(av_store)} {}

private:
@@ -73,24 +73,24 @@ namespace kagome::network {
ScaleMessageReadWriter> {
public:
static constexpr const char *kName = "FetchStatementProtocol";
static constexpr std::chrono::seconds kRequestTimeout{1};

StatementFetchingProtocol(
libp2p::Host &host,
RequestResponseInject inject,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<parachain::BackingStore> backing_store,
common::MainThreadPool &main_thread_pool)
std::shared_ptr<parachain::BackingStore> backing_store)
: RequestResponseProtocolImpl<
FetchStatementRequest,
FetchStatementResponse,
ScaleMessageReadWriter>{kName,
host,
std::move(inject),
make_protocols(kFetchStatementProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(
kName, "req_statement_protocol"),
main_thread_pool},
kRequestTimeout},
backing_store_{std::move(backing_store)} {}

private:
11 changes: 6 additions & 5 deletions core/network/impl/protocols/protocol_fetch_chunk.hpp
Original file line number Diff line number Diff line change
@@ -37,25 +37,26 @@ namespace kagome::network {
ScaleMessageReadWriter>,
NonCopyable,
NonMovable {
static constexpr std::chrono::seconds kRequestTimeout{1};

public:
FetchChunkProtocolImpl(
libp2p::Host &host,
RequestResponseInject inject,
const application::ChainSpec & /*chain_spec*/,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<parachain::ParachainProcessorImpl> pp,
std::shared_ptr<PeerManager> pm,
common::MainThreadPool &main_thread_pool)
std::shared_ptr<PeerManager> pm)
: RequestResponseProtocolImpl<
FetchChunkRequest,
FetchChunkResponse,
ScaleMessageReadWriter>{kFetchChunkProtocolName,
host,
std::move(inject),
make_protocols(kFetchChunkProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(kFetchChunkProtocolName,
"req_chunk_protocol"),
main_thread_pool},
kRequestTimeout},
pp_{std::move(pp)},
pm_{std::move(pm)} {
BOOST_ASSERT(pp_);
9 changes: 6 additions & 3 deletions core/network/impl/protocols/protocol_fetch_chunk_obsolete.hpp
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@
#include "blockchain/genesis_block_hash.hpp"
#include "log/logger.hpp"
#include "network/common.hpp"
#include "network/helpers/scale_message_read_writer.hpp"
#include "network/impl/protocols/request_response_protocol.hpp"
#include "parachain/validator/parachain_processor.hpp"
#include "utils/non_copyable.hpp"
@@ -37,9 +38,11 @@ namespace kagome::network {
ScaleMessageReadWriter>,
NonCopyable,
NonMovable {
static constexpr std::chrono::seconds kRequestTimeout{1};

public:
FetchChunkProtocolObsoleteImpl(
libp2p::Host &host,
const RequestResponseInject &inject,
const application::ChainSpec & /*chain_spec*/,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<parachain::ParachainProcessorImpl> pp,
@@ -48,14 +51,14 @@ namespace kagome::network {
FetchChunkRequest,
FetchChunkResponseObsolete,
ScaleMessageReadWriter>{kFetchChunkProtocolName,
host,
inject,
make_protocols(
kFetchChunkProtocolObsolete,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(kFetchChunkProtocolName,
"req_chunk_protocol"),
main_thread_pool},
kRequestTimeout},
pp_{std::move(pp)} {
BOOST_ASSERT(pp_);
}
28 changes: 11 additions & 17 deletions core/network/impl/protocols/protocol_req_collation.cpp
Original file line number Diff line number Diff line change
@@ -25,18 +25,19 @@ namespace kagome::network {
std::decay_t<ResponseT>,
ScaleMessageReadWriter>;

ReqCollationProtocolImpl(libp2p::Host &host,
static constexpr std::chrono::seconds kRequestTimeout{2};

ReqCollationProtocolImpl(RequestResponseInject inject,
const libp2p::peer::ProtocolName &protoname,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<ReqCollationObserver> observer,
common::MainThreadPool &main_thread_pool)
std::shared_ptr<ReqCollationObserver> observer)
: Base{kReqCollationProtocolName,
host,
std::move(inject),
make_protocols(protoname, genesis_hash, kProtocolPrefixPolkadot),
log::createLogger(kReqCollationProtocolName,
"req_collation_protocol"),
main_thread_pool},
kRequestTimeout},
observer_{std::move(observer)} {}

protected:
@@ -59,29 +60,22 @@ namespace kagome::network {
};

ReqCollationProtocol::ReqCollationProtocol(
libp2p::Host &host,
const RequestResponseInject &inject,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<ReqCollationObserver> observer,
common::MainThreadPool &main_thread_pool)
std::shared_ptr<ReqCollationObserver> observer)
: v1_impl_{std::make_shared<
ReqCollationProtocolImpl<CollationFetchingRequest,
CollationFetchingResponse>>(
host,
kReqCollationProtocol,
chain_spec,
genesis_hash,
observer,
main_thread_pool)},
inject, kReqCollationProtocol, chain_spec, genesis_hash, observer)},
vstaging_impl_{std::make_shared<
ReqCollationProtocolImpl<vstaging::CollationFetchingRequest,
vstaging::CollationFetchingResponse>>(
host,
inject,
kReqCollationVStagingProtocol,
chain_spec,
genesis_hash,
observer,
main_thread_pool)} {
observer)} {
BOOST_ASSERT(v1_impl_);
BOOST_ASSERT(vstaging_impl_);
}
6 changes: 3 additions & 3 deletions core/network/impl/protocols/protocol_req_collation.hpp
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
#include "application/chain_spec.hpp"
#include "common/main_thread_pool.hpp"
#include "log/logger.hpp"
#include "network/impl/protocols/request_response_protocol.hpp"
#include "network/peer_manager.hpp"
#include "network/protocols/req_collation_protocol.hpp"
#include "network/types/collator_messages_vstaging.hpp"
@@ -39,11 +40,10 @@ namespace kagome::network {
ReqCollationProtocol() = delete;
~ReqCollationProtocol() override = default;

ReqCollationProtocol(libp2p::Host &host,
ReqCollationProtocol(const RequestResponseInject &inject,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<ReqCollationObserver> observer,
common::MainThreadPool &main_thread_pool);
std::shared_ptr<ReqCollationObserver> observer);

const Protocol &protocolName() const override;

24 changes: 11 additions & 13 deletions core/network/impl/protocols/protocol_req_pov.cpp
Original file line number Diff line number Diff line change
@@ -20,22 +20,23 @@ namespace kagome::network {
ScaleMessageReadWriter>,
NonCopyable,
NonMovable {
ReqPovProtocolImpl(libp2p::Host &host,
static constexpr std::chrono::seconds kRequestTimeout{2};

ReqPovProtocolImpl(RequestResponseInject &&inject,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<ReqPovObserver> observer,
common::MainThreadPool &main_thread_pool)
std::shared_ptr<ReqPovObserver> observer)
: RequestResponseProtocolImpl<
RequestPov,
ResponsePov,
ScaleMessageReadWriter>{kReqPovProtocolName,
host,
std::move(inject),
make_protocols(kReqPovProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(kReqPovProtocolName,
"req_pov_protocol"),
main_thread_pool},
kRequestTimeout},
observer_{std::move(observer)} {}

protected:
@@ -70,16 +71,13 @@ namespace kagome::network {
};

ReqPovProtocol::ReqPovProtocol(
libp2p::Host &host,
RequestResponseInject inject,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<ReqPovObserver> observer,
common::MainThreadPool &main_thread_pool)
: impl_{std::make_shared<ReqPovProtocolImpl>(host,
chain_spec,
genesis_hash,
std::move(observer),
main_thread_pool)} {}
std::shared_ptr<ReqPovObserver> observer)
: impl_{std::make_shared<ReqPovProtocolImpl>(
std::move(inject), chain_spec, genesis_hash, std::move(observer))} {
}

const Protocol &ReqPovProtocol::protocolName() const {
BOOST_ASSERT(impl_ && !!"ReqPovProtocolImpl must be initialized!");
6 changes: 3 additions & 3 deletions core/network/impl/protocols/protocol_req_pov.hpp
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
#include "application/chain_spec.hpp"
#include "common/main_thread_pool.hpp"
#include "log/logger.hpp"
#include "network/impl/protocols/request_response_protocol.hpp"
#include "network/peer_manager.hpp"
#include "network/protocols/req_pov_protocol.hpp"
#include "utils/non_copyable.hpp"
@@ -34,11 +35,10 @@ namespace kagome::network {
ReqPovProtocol() = delete;
~ReqPovProtocol() override = default;

ReqPovProtocol(libp2p::Host &host,
ReqPovProtocol(RequestResponseInject inject,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<ReqPovObserver> observer,
common::MainThreadPool &main_thread_pool);
std::shared_ptr<ReqPovObserver> observer);

const Protocol &protocolName() const override;

151 changes: 122 additions & 29 deletions core/network/impl/protocols/request_response_protocol.hpp
Original file line number Diff line number Diff line change
@@ -6,11 +6,16 @@

#pragma once

#include "network/impl/protocols/protocol_base_impl.hpp"
#include <libp2p/basic/scheduler.hpp>
#include <libp2p/common/shared_fn.hpp>

#include "common/main_thread_pool.hpp"
#include "protocol_error.hpp"
#include "metrics/metrics.hpp"
#include "network/helpers/new_stream.hpp"
#include "network/impl/protocols/protocol_base_impl.hpp"
#include "network/impl/protocols/protocol_error.hpp"
#include "utils/box.hpp"
#include "utils/weak_macro.hpp"

namespace kagome::network {

@@ -25,6 +30,94 @@ namespace kagome::network {
&&response_handler) = 0;
};

struct RequestResponseInject {
std::shared_ptr<libp2p::Host> host;
std::shared_ptr<libp2p::basic::Scheduler> scheduler;
std::shared_ptr<common::MainThreadPool> main_thread_pool;
};

struct RequestResponseMetrics {
RequestResponseMetrics(const std::string &name)
: timeout_{metric(name, "timeout")},
success_{metric(name, "success")},
failure_{metric(name, "failure")},
lost_{metric(name, "lost")} {}

static metrics::Counter *metric(const std::string &protocol,
const std::string &type) {
auto name = "kagome_request_response_protocol_result";
auto help =
"Number of timeout, success, failure results for request response "
"protocols.";
static auto registry = [&] {
auto registry = metrics::createRegistry();
registry->registerCounterFamily(name, help);
return registry;
}();
return registry->registerCounterMetric(
name, {{"protocol", protocol}, {"type", type}});
}

class Lost {
public:
Lost(const Lost &) = delete;
Lost(Lost &&other) noexcept
: lost_{std::exchange(other.lost_, nullptr)} {}
Lost(RequestResponseMetrics &metrics) : lost_{metrics.lost_} {}
~Lost() {
if (lost_ != nullptr) {
lost_->inc();
}
}
void notLost() {
lost_ = nullptr;
}

// cppcoreguidelines-special-member-functions
Lost &operator=(const Lost &) = delete;
Lost &operator=(Lost &&) = delete;

private:
metrics::Counter *lost_;
};

metrics::Counter *timeout_;
metrics::Counter *success_;
metrics::Counter *failure_;
metrics::Counter *lost_;
};

// TODO(turuslan): #2372, RequestResponseProtocol
struct RequestResponseTimeout {
template <typename T>
static void wrap(const std::shared_ptr<T> &self,
auto &cb,
std::weak_ptr<Stream> weak_stream) {
auto timer = self->scheduler_->scheduleWithHandle(
[weak_self{std::weak_ptr{self}},
weak_stream{std::move(weak_stream)}] {
IF_WEAK_LOCK(stream) {
stream->reset();
IF_WEAK_LOCK(self) {
self->metrics_.timeout_->inc();
}
}
},
self->timeout_);
cb = libp2p::SharedFn{[weak_self{std::weak_ptr{self}},
cb{std::move(cb)},
lost{RequestResponseMetrics::Lost{self->metrics_}},
timer{std::move(timer)}](auto r) mutable {
lost.notLost();
timer.reset();
IF_WEAK_LOCK(self) {
(r ? self->metrics_.success_ : self->metrics_.failure_)->inc();
}
cb(std::move(r));
}};
}
};

template <typename Request, typename Response, typename ReadWriter>
struct RequestResponseProtocolImpl
: virtual protected ProtocolBase,
@@ -35,16 +128,21 @@ namespace kagome::network {
using ResponseType = Response;
using ReadWriterType = ReadWriter;

RequestResponseProtocolImpl(
Protocol name,
libp2p::Host &host,
Protocols protocols,
log::Logger logger,
common::MainThreadPool &main_thread_pool,
std::chrono::milliseconds timeout = std::chrono::seconds(1))
: base_(std::move(name), host, std::move(protocols), std::move(logger)),
friend RequestResponseTimeout;

RequestResponseProtocolImpl(const Protocol &name,
RequestResponseInject inject,
Protocols protocols,
log::Logger logger,
std::chrono::milliseconds timeout)
: base_(name, *inject.host, std::move(protocols), std::move(logger)),
metrics_{name},
scheduler_{std::move(inject.scheduler)},
timeout_(std::move(timeout)),
main_pool_handler_{main_thread_pool.handlerStarted()} {}
main_pool_handler_{inject.main_thread_pool->handlerStarted()} {
BOOST_ASSERT(scheduler_ != nullptr);
BOOST_ASSERT(main_pool_handler_ != nullptr);
}

bool start() override {
return base_.start(this->weak_from_this());
@@ -61,7 +159,7 @@ namespace kagome::network {
onTxRequest(request);
newOutgoingStream(
peer_id,
[wptr{this->weak_from_this()},
[WEAK_SELF,
request{std::move(request)},
response_handler{std::move(response_handler)}](auto &&res) mutable {
if (res.has_error()) {
@@ -71,13 +169,15 @@ namespace kagome::network {
auto &stream = res.value();
BOOST_ASSERT(stream);

auto self = wptr.lock();
auto self = weak_self.lock();
if (!self) {
self->base_.closeStream(std::move(wptr), std::move(stream));
stream->reset();
response_handler(outcome::failure(ProtocolError::GONE));
return;
}

RequestResponseTimeout::wrap(self, response_handler, stream);

SL_DEBUG(self->base_.logger(),
"Established outgoing {} stream with {}",
self->protocolName(),
@@ -123,23 +223,12 @@ namespace kagome::network {
"New outgoing {} stream with {}",
protocolName(),
peer_id);

auto addresses_res =
base_.host().getPeerRepository().getAddressRepository().getAddresses(
peer_id);
if (not addresses_res.has_value()) {
main_pool_handler_->execute(
[cb(std::move(cb)), addresses_res(std::move(addresses_res))] {
cb(addresses_res.as_failure());
});
return;
}

base_.host().newStream(
PeerInfo{peer_id, std::move(addresses_res.value())},
newStream(
base_.host(),
peer_id,
base_.protocolIds(),
[wptr{this->weak_from_this()}, peer_id, cb{std::move(cb)}](
auto &&stream_and_proto) mutable {
libp2p::StreamAndProtocolOrError &&stream_and_proto) mutable {
if (!stream_and_proto.has_value()) {
cb(stream_and_proto.as_failure());
return;
@@ -377,8 +466,12 @@ namespace kagome::network {
});
}

// NOLINTBEGIN(cppcoreguidelines-non-private-member-variables-in-classes)
ProtocolBaseImpl base_;
RequestResponseMetrics metrics_;
std::shared_ptr<libp2p::basic::Scheduler> scheduler_;
std::chrono::milliseconds timeout_;
// NOLINTEND(cppcoreguidelines-non-private-member-variables-in-classes)

private:
std::shared_ptr<PoolHandler> main_pool_handler_;
11 changes: 6 additions & 5 deletions core/network/impl/protocols/send_dispute_protocol.hpp
Original file line number Diff line number Diff line change
@@ -41,24 +41,25 @@ namespace kagome::network {
ScaleMessageReadWriter>,
NonCopyable,
NonMovable {
static constexpr std::chrono::seconds kRequestTimeout{12};

public:
SendDisputeProtocolImpl(libp2p::Host &host,
SendDisputeProtocolImpl(RequestResponseInject inject,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<network::DisputeRequestObserver>
dispute_request_observer,
common::MainThreadPool &main_thread_pool)
dispute_request_observer)
: RequestResponseProtocolImpl<
DisputeRequest,
DisputeResponse,
ScaleMessageReadWriter>{kSendDisputeProtocolName,
host,
std::move(inject),
make_protocols(kSendDisputeProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(
kSendDisputeProtocolName,
"dispute_protocol"),
main_thread_pool},
kRequestTimeout},
dispute_request_observer_{std::move(dispute_request_observer)} {
BOOST_ASSERT(dispute_request_observer_);
}
1 change: 1 addition & 0 deletions core/network/impl/protocols/state_protocol_impl.hpp
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@ namespace kagome::network {
using PeerId = libp2p::peer::PeerId;
using PeerInfo = libp2p::peer::PeerInfo;

// TODO(turuslan): #2372, RequestResponseProtocol
class StateProtocolImpl final
: public StateProtocol,
public std::enable_shared_from_this<StateProtocolImpl>,
29 changes: 14 additions & 15 deletions core/network/impl/protocols/sync_protocol_impl.cpp
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@
#include "network/adapters/protobuf_block_request.hpp"
#include "network/adapters/protobuf_block_response.hpp"
#include "network/common.hpp"
#include "network/helpers/new_stream.hpp"
#include "network/helpers/protobuf_message_read_writer.hpp"
#include "network/impl/protocols/protocol_error.hpp"
#include "network/rpc.hpp"
@@ -126,6 +127,7 @@ namespace kagome::network {

SyncProtocolImpl::SyncProtocolImpl(
libp2p::Host &host,
std::shared_ptr<libp2p::basic::Scheduler> scheduler,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<SyncProtocolObserver> sync_observer,
@@ -134,10 +136,13 @@ namespace kagome::network {
host,
make_protocols(kSyncProtocol, genesis_hash, chain_spec),
log::createLogger(kSyncProtocolName, "sync_protocol")),
scheduler_{std::move(scheduler)},
metrics_{kSyncProtocolName},
sync_observer_(std::move(sync_observer)),
reputation_repository_(std::move(reputation_repository)),
response_cache_(kResponsesCacheCapacity,
kResponsesCacheExpirationTimeout) {
BOOST_ASSERT(scheduler_ != nullptr);
BOOST_ASSERT(sync_observer_ != nullptr);
BOOST_ASSERT(reputation_repository_ != nullptr);
}
@@ -159,17 +164,9 @@ namespace kagome::network {
"New outgoing {} stream with {}",
protocolName(),
peer_id);

auto addresses_res =
base_.host().getPeerRepository().getAddressRepository().getAddresses(
peer_id);
if (not addresses_res.has_value()) {
cb(addresses_res.as_failure());
return;
}

base_.host().newStream(
PeerInfo{peer_id, std::move(addresses_res.value())},
newStream(
base_.host(),
peer_id,
base_.protocolIds(),
[wp{weak_from_this()}, peer_id, cb = std::move(cb)](
auto &&stream_res) mutable {
@@ -469,7 +466,7 @@ namespace kagome::network {

newOutgoingStream(
peer_id,
[wp{weak_from_this()},
[WEAK_SELF,
response_handler = std::move(response_handler),
block_request = std::move(block_request)](auto &&stream_res) mutable {
if (not stream_res.has_value()) {
@@ -478,7 +475,7 @@ namespace kagome::network {
}
auto &stream = stream_res.value();

auto self = wp.lock();
auto self = weak_self.lock();
if (not self) {
stream->reset();
response_handler(ProtocolError::GONE);
@@ -490,13 +487,15 @@ namespace kagome::network {
self->protocolName(),
stream->remotePeerId().value());

RequestResponseTimeout::wrap(self, response_handler, stream);

self->writeRequest(stream,
block_request,
[stream,
wp = std::move(wp),
weak_self,
response_handler = std::move(response_handler)](
auto &&write_res) mutable {
auto self = wp.lock();
auto self = weak_self.lock();
if (not self) {
stream->reset();
response_handler(ProtocolError::GONE);
9 changes: 9 additions & 0 deletions core/network/impl/protocols/sync_protocol_impl.hpp
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
#include "blockchain/genesis_block_hash.hpp"
#include "log/logger.hpp"
#include "network/impl/protocols/protocol_base_impl.hpp"
#include "network/impl/protocols/request_response_protocol.hpp"
#include "network/reputation_repository.hpp"
#include "network/sync_protocol_observer.hpp"
#include "utils/non_copyable.hpp"
@@ -103,14 +104,18 @@ namespace kagome::network {

} // namespace detail

// TODO(turuslan): #2372, RequestResponseProtocol
class SyncProtocolImpl final
: public SyncProtocol,
public std::enable_shared_from_this<SyncProtocolImpl>,
NonCopyable,
NonMovable {
public:
using Response = BlocksResponse;

SyncProtocolImpl(
libp2p::Host &host,
std::shared_ptr<libp2p::basic::Scheduler> scheduler,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<SyncProtocolObserver> sync_observer,
@@ -149,6 +154,10 @@ namespace kagome::network {
private:
inline static const auto kSyncProtocolName = "SyncProtocol"s;
ProtocolBaseImpl base_;
friend RequestResponseTimeout;
std::shared_ptr<libp2p::basic::Scheduler> scheduler_;
RequestResponseMetrics metrics_;
std::chrono::seconds timeout_{20};
std::shared_ptr<SyncProtocolObserver> sync_observer_;
std::shared_ptr<ReputationRepository> reputation_repository_;
detail::BlocksResponseCache response_cache_;
13 changes: 2 additions & 11 deletions core/network/notifications/connect_and_handshake.hpp
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@

#pragma once

#include "network/helpers/new_stream.hpp"
#include "network/helpers/stream_read_buffer.hpp"
#include "network/impl/protocols/protocol_base_impl.hpp"
#include "network/notifications/handshake.hpp"
@@ -64,16 +65,6 @@ namespace kagome::network::notifications {
std::move(stream), std::move(frame_stream), handshake, std::move(cb));
};

auto addresses_res =
base.host().getPeerRepository().getAddressRepository().getAddresses(
peer_id);
if (not addresses_res.has_value()) {
cb(addresses_res.as_failure());
return;
}

base.host().newStream(PeerInfo{peer_id, std::move(addresses_res.value())},
base.protocolIds(),
std::move(cb));
newStream(base.host(), peer_id, base.protocolIds(), std::move(cb));
}
} // namespace kagome::network::notifications
10 changes: 5 additions & 5 deletions core/network/warp/protocol.hpp
Original file line number Diff line number Diff line change
@@ -31,19 +31,19 @@ namespace kagome::network {
WarpResponse,
ScaleMessageReadWriter> {
static constexpr auto kName = "WarpProtocol";
static constexpr std::chrono::seconds kRequestTimeout{10};

public:
WarpProtocolImpl(libp2p::Host &host,
WarpProtocolImpl(RequestResponseInject inject,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis,
std::shared_ptr<WarpSyncCache> cache,
common::MainThreadPool &main_thread_pool)
std::shared_ptr<WarpSyncCache> cache)
: RequestResponseProtocolImpl(
kName,
host,
std::move(inject),
make_protocols(kWarpProtocol, genesis, chain_spec),
log::createLogger(kName, "warp_sync_protocol"),
main_thread_pool),
kRequestTimeout),
cache_{std::move(cache)} {}

std::optional<outcome::result<ResponseType>> onRxRequest(
6 changes: 3 additions & 3 deletions core/utils/weak_macro.hpp
Original file line number Diff line number Diff line change
@@ -6,9 +6,9 @@

#pragma once

#define WEAK_SELF \
weak_self { \
weak_from_this() \
#define WEAK_SELF \
weak_self { \
this->weak_from_this() \
}

#define WEAK_LOCK(name) \