Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix transaction broadcast #1407

Merged
merged 20 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/api/service/author/impl/author_api_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ namespace kagome::api {
extrinsic));
BOOST_ASSERT(tx_hash == tx.hash);

SL_DEBUG(logger_, "Subscribe for ex hash={}", tx_hash);
SL_DEBUG(logger_, "Submit and watch transaction with hash {}", tx_hash);

return sub_id;
}
Expand Down
86 changes: 50 additions & 36 deletions core/api/service/impl/api_service_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,49 +266,62 @@ namespace kagome::api {
outcome::result<ApiServiceImpl::PubsubSubscriptionId>
ApiServiceImpl::subscribeSessionToKeys(
const std::vector<common::Buffer> &keys) {
return withThisSession([&](kagome::api::Session::SessionId tid) {
return withSession(tid, [&](SessionSubscriptions &session_context) {
auto &session = session_context.storage_sub;
const auto id = session->generateSubscriptionSetId();
const auto &best_block_hash = block_tree_->deepestLeaf().hash;
const auto &header = block_tree_->getBlockHeader(best_block_hash);
BOOST_ASSERT(header.has_value());
auto persistent_batch =
trie_storage_->getPersistentBatchAt(header.value().state_root);
BOOST_ASSERT(persistent_batch.has_value());

auto &pb = persistent_batch.value();
BOOST_ASSERT(pb);

session_context.messages = uploadMessagesListFromCache();

std::vector<std::pair<common::Buffer, std::optional<common::Buffer>>>
pairs;
pairs.reserve(keys.size());

for (auto &key : keys) {
session->subscribe(id, key);

auto value_opt_res = pb->tryGet(key);
if (value_opt_res.has_value()) {
pairs.emplace_back(std::move(key),
std::move(value_opt_res.value()));
}
}

forJsonData(server_,
return withThisSession(
[&](kagome::api::Session::SessionId tid)
-> outcome::result<ApiServiceImpl::PubsubSubscriptionId> {
return withSession(
tid,
[&](SessionSubscriptions &session_context)
-> outcome::result<ApiServiceImpl::PubsubSubscriptionId> {
auto &session = session_context.storage_sub;
const auto id = session->generateSubscriptionSetId();
const auto &best_block_hash = block_tree_->deepestLeaf().hash;
const auto &header =
block_tree_->getBlockHeader(best_block_hash);
BOOST_ASSERT(header.has_value());
auto persistent_batch = trie_storage_->getPersistentBatchAt(
header.value().state_root);
if (!persistent_batch.has_value()) {
SL_ERROR(logger_,
"Failed to get storage state for block {}, required "
"to subscribe an RPC session to some storage keys.",
best_block_hash);
return persistent_batch.as_failure();
}

auto &batch = persistent_batch.value();

session_context.messages = uploadMessagesListFromCache();

std::vector<
std::pair<common::Buffer, std::optional<common::Buffer>>>
pairs;
pairs.reserve(keys.size());

for (auto &key : keys) {
session->subscribe(id, key);

auto value_opt_res = batch->tryGet(key);
if (value_opt_res.has_value()) {
pairs.emplace_back(key,
value_opt_res.value());
}
}

forJsonData(
server_,
logger_,
id,
kRpcEventSubscribeStorage,
createStateStorageEvent(std::move(pairs), best_block_hash),
createStateStorageEvent(pairs, best_block_hash),
[&](const auto &result) {
session_context.messages->emplace_back(
uploadFromCache(result.data()));
});

return static_cast<PubsubSubscriptionId>(id);
});
});
return static_cast<PubsubSubscriptionId>(id);
});
});
}

outcome::result<ApiServiceImpl::PubsubSubscriptionId>
Expand Down Expand Up @@ -508,7 +521,8 @@ namespace kagome::api {

session_context.messages.reset();
});
} catch (jsonrpc::InternalErrorFault &) {
} catch (jsonrpc::InternalErrorFault & e) {
SL_DEBUG(logger_, "Internal jsonrpc error: {}", e.what());
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/application/app_configuration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ namespace kagome::application {
virtual uint32_t inPeersLight() const = 0;

/**
* @return int32_t maximum number or lucky peers (peers being gossiped to)
* @return uint32_t maximum number or lucky peers (peers being gossiped to)
*/
virtual int32_t luckyPeers() const = 0;
virtual uint32_t luckyPeers() const = 0;

/**
* @return multiaddresses of bootstrat nodes
Expand Down
2 changes: 1 addition & 1 deletion core/application/impl/app_configuration_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ namespace kagome::application {
load_u32(val, "out-peers", out_peers_);
load_u32(val, "in-peers", in_peers_);
load_u32(val, "in-peers-light", in_peers_light_);
load_i32(val, "lucky-peers", lucky_peers_);
load_u32(val, "lucky-peers", lucky_peers_);
load_telemetry_uris(val, "telemetry-endpoints", telemetry_endpoints_);
load_u32(val, "random-walk-interval", random_walk_interval_);
}
Expand Down
4 changes: 2 additions & 2 deletions core/application/impl/app_configuration_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ namespace kagome::application {
uint32_t inPeersLight() const override {
return in_peers_light_;
}
int32_t luckyPeers() const override {
uint32_t luckyPeers() const override {
return lucky_peers_;
}
const boost::asio::ip::tcp::endpoint &rpcHttpEndpoint() const override {
Expand Down Expand Up @@ -312,7 +312,7 @@ namespace kagome::application {
uint32_t out_peers_;
uint32_t in_peers_;
uint32_t in_peers_light_;
int32_t lucky_peers_;
uint32_t lucky_peers_;
network::PeeringConfig peering_config_;
bool dev_mode_;
std::string node_name_;
Expand Down
5 changes: 4 additions & 1 deletion core/blockchain/impl/block_tree_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,9 +451,12 @@ namespace kagome::blockchain {
chain_events_engine_->notify(primitives::events::ChainEventType::kNewHeads,
block.header);
trie_changes_tracker_->onBlockAdded(block_hash);
SL_DEBUG(log_, "Adding block {}", block_hash);
for (const auto &ext : block.body) {
auto hash = hasher_->blake2b_256(ext.data);
SL_DEBUG(log_, "Adding extrinsic with hash {}", hash);
if (auto key =
extrinsic_event_key_repo_->get(hasher_->blake2b_256(ext.data))) {
extrinsic_event_key_repo_->get(hash)) {
extrinsic_events_engine_->notify(
key.value(),
primitives::events::ExtrinsicLifecycleEvent::InBlock(key.value(),
Expand Down
Empty file.
4 changes: 3 additions & 1 deletion core/consensus/babe/impl/block_executor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,9 @@ namespace kagome::consensus::babe {

// remove block's extrinsics from tx pool
for (const auto &extrinsic : block.body) {
auto res = tx_pool_->removeOne(hasher_->blake2b_256(extrinsic.data));
auto hash = hasher_->blake2b_256(extrinsic.data);
SL_DEBUG(logger_, "Contains extrinsic with hash: {}", hash);
auto res = tx_pool_->removeOne(hash);
if (res.has_error()
&& res
!= outcome::failure(
Expand Down
2 changes: 1 addition & 1 deletion core/consensus/grandpa/impl/vote_crypto_provider_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace kagome::consensus::grandpa {
return std::nullopt;
}
auto payload = scale::encode(vote, round_number_, voter_set_->id()).value();
auto signature = ed_provider_->sign(*keypair_.get(), payload).value();
auto signature = ed_provider_->sign(*keypair_, payload).value();
return {{.message = std::move(vote),
.signature = signature,
.id = keypair_->public_key}};
Expand Down
47 changes: 22 additions & 25 deletions core/host_api/impl/io_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,25 @@ namespace kagome::host_api {
BOOST_ASSERT_MSG(memory_provider_ != nullptr, "memory provider is nullptr");
}

void IOExtension::ext_logging_log_version_1(runtime::WasmEnum level,
soralog::Level mapLevel(runtime::WasmLogLevel wasm_level) {
using WasmLevel = runtime::WasmLogLevel;
using SlLevel = soralog::Level;
switch (wasm_level) {
case WasmLevel::Error:
return SlLevel::ERROR;
case WasmLevel::Warn:
return SlLevel::WARN;
case WasmLevel::Info:
return SlLevel::INFO;
case WasmLevel::Debug:
return SlLevel::DEBUG;
case WasmLevel::Trace:
return SlLevel::TRACE;
}
return SlLevel::ERROR;
}

void IOExtension::ext_logging_log_version_1(runtime::WasmEnum wasm_level,
runtime::WasmSpan target,
runtime::WasmSpan message) {
using runtime::WasmLogLevel;
Expand All @@ -32,30 +50,9 @@ namespace kagome::host_api {
const auto target_str = read_str_from_position(runtime::PtrSize(target));
const auto message_str = read_str_from_position(runtime::PtrSize(message));

switch (static_cast<WasmLogLevel>(level)) {
case WasmLogLevel::Error:
logger_->error("target: {}, message: {}", target_str, message_str);
break;
case WasmLogLevel::Warn:
logger_->warn("target: {}, message: {}", target_str, message_str);
break;
case WasmLogLevel::Info:
logger_->info("target: {}, message: {}", target_str, message_str);
break;
case WasmLogLevel::Debug:
SL_DEBUG(logger_, "target: {}, message: {}", target_str, message_str);
break;
case WasmLogLevel::Trace:
SL_TRACE(logger_, "target: {}, message: {}", target_str, message_str);
break;
default: {
BOOST_UNREACHABLE_RETURN();
logger_->error(
"Message with incorrect log level. Target: {}, message: {}",
target_str,
message_str);
}
}
auto level = std::max(mapLevel(static_cast<WasmLogLevel>(wasm_level)),
soralog::Level::VERBOSE);
logger_->log(level, "{}: {}", target_str, message_str);
}

runtime::WasmEnum IOExtension::ext_logging_max_level_version_1() {
Expand Down
1 change: 1 addition & 0 deletions core/network/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ add_library(network
impl/extrinsic_observer_impl.cpp
impl/transactions_transmitter_impl.cpp
impl/sync_protocol_observer_impl.cpp
impl/stream_engine.cpp
impl/protocols/protocol_error.cpp
impl/protocols/state_protocol_impl.cpp
impl/protocols/protocol_factory.cpp
Expand Down
17 changes: 9 additions & 8 deletions core/network/helpers/stream_read_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,21 @@ namespace libp2p::connection {
readSome(out,
out.size(),
[weak{weak_from_this()}, out, total, cb{std::move(cb)}](
outcome::result<size_t> _r) mutable {
outcome::result<size_t> bytes_num_res) mutable {
if (auto self{weak.lock()}) {
if (_r.has_error()) {
return cb(_r.error());
if (bytes_num_res.has_error()) {
return cb(bytes_num_res.error());
}
const auto &r = _r.value();
const auto _r{gsl::narrow<ptrdiff_t>(r)};
BOOST_ASSERT(_r <= out.size());
if (_r == out.size()) {
const auto &bytes_num = bytes_num_res.value();
BOOST_ASSERT(bytes_num != 0);
const auto bytes_num_ptrdiff{gsl::narrow<ptrdiff_t>(bytes_num)};
BOOST_ASSERT(bytes_num_ptrdiff <= out.size());
if (bytes_num_ptrdiff == out.size()) {
// successfully read last bytes
return cb(total);
}
// read remaining bytes
self->readFull(out.subspan(r), total, std::move(cb));
self->readFull(out.subspan(bytes_num_ptrdiff), total, std::move(cb));
}
});
}
Expand Down
Empty file.
2 changes: 1 addition & 1 deletion core/network/impl/protocols/block_announce_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ namespace kagome::network {
stream_engine_->broadcast(
shared_from_this(),
shared_msg,
StreamEngine::RandomGossipStrategy{
RandomGossipStrategy{
stream_engine_->outgoingStreamsNumber(shared_from_this()),
app_config_.luckyPeers()});
}
Expand Down
2 changes: 1 addition & 1 deletion core/network/impl/protocols/grandpa_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ namespace kagome::network {
stream_engine_->broadcast<GrandpaMessage>(
shared_from_this(),
shared_msg,
StreamEngine::RandomGossipStrategy{
RandomGossipStrategy{
stream_engine_->outgoingStreamsNumber(shared_from_this()),
app_config_.luckyPeers()});
}
Expand Down
26 changes: 10 additions & 16 deletions core/network/impl/protocols/propagate_transactions_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

#include "network/impl/protocols/propagate_transactions_protocol.hpp"

#include <algorithm>

#include "application/app_configuration.hpp"
#include "network/common.hpp"
#include "network/impl/protocols/protocol_error.hpp"
Expand Down Expand Up @@ -35,7 +37,6 @@ namespace kagome::network {
chain_spec.protocolId())},
log::createLogger("PropagateTransactionsProtocol",
"propagate_transactions_protocol")),
app_config_(app_config),
babe_(std::move(babe)),
extrinsic_observer_(std::move(extrinsic_observer)),
stream_engine_(std::move(stream_engine)),
Expand Down Expand Up @@ -336,23 +337,16 @@ namespace kagome::network {
}
}

PropagatedExtrinsics exts;
exts.extrinsics.resize(txs.size());
std::transform(
txs.begin(), txs.end(), exts.extrinsics.begin(), [](auto &tx) {
return tx.ext;
});

auto shared_msg = KAGOME_EXTRACT_SHARED_CACHE(PropagateTransactionsProtocol,
PropagatedExtrinsics);
(*shared_msg) = std::move(exts);

auto propagated_exts = KAGOME_EXTRACT_SHARED_CACHE(
PropagateTransactionsProtocol, PropagatedExtrinsics);
propagated_exts->extrinsics.resize(txs.size());
std::transform(txs.begin(),
txs.end(),
propagated_exts->extrinsics.begin(),
[](auto &tx) { return tx.ext; });
stream_engine_->broadcast<PropagatedExtrinsics>(
shared_from_this(),
shared_msg,
StreamEngine::RandomGossipStrategy{
stream_engine_->outgoingStreamsNumber(shared_from_this()),
app_config_.luckyPeers()});
propagated_exts);
}

} // namespace kagome::network
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ namespace kagome::network {
const static inline auto kPropogateTransacionsProtocolName =
"PropagateTransactionsProtocol"s;
ProtocolBaseImpl base_;
const application::AppConfiguration &app_config_;
std::shared_ptr<consensus::babe::Babe> babe_;
std::shared_ptr<ExtrinsicObserver> extrinsic_observer_;
std::shared_ptr<StreamEngine> stream_engine_;
Expand Down
Loading