@@ -60,7 +60,7 @@ namespace {
60
60
case SM::Fast:
61
61
case SM::FastWithoutState:
62
62
return kagome::network::BlockAttribute::HEADER
63
- | kagome::network::BlockAttribute::JUSTIFICATION;
63
+ | kagome::network::BlockAttribute::JUSTIFICATION;
64
64
}
65
65
return kagome::network::BlocksRequest::kBasicAttributes ;
66
66
}
@@ -203,7 +203,9 @@ namespace kagome::network {
203
203
it != known_blocks_.end ()) {
204
204
auto &block_in_queue = it->second ;
205
205
block_in_queue.peers .emplace (peer_id);
206
- if (handler) handler (block_info);
206
+ if (handler) {
207
+ handler (block_info);
208
+ }
207
209
return false ;
208
210
}
209
211
@@ -229,7 +231,9 @@ namespace kagome::network {
229
231
230
232
// Provided block is equal our best one. Nothing needs to do.
231
233
if (block_info == best_block) {
232
- if (handler) handler (block_info);
234
+ if (handler) {
235
+ handler (block_info);
236
+ }
233
237
return false ;
234
238
}
235
239
@@ -264,7 +268,9 @@ namespace kagome::network {
264
268
265
269
// Finding the best common block was failed
266
270
if (not res.has_value ()) {
267
- if (handler) handler (res.as_failure ());
271
+ if (handler) {
272
+ handler (res.as_failure ());
273
+ }
268
274
return ;
269
275
}
270
276
@@ -274,7 +280,9 @@ namespace kagome::network {
274
280
it != self->known_blocks_ .end ()) {
275
281
auto &block_in_queue = it->second ;
276
282
block_in_queue.peers .emplace (peer_id);
277
- if (handler) handler (std::move (block_info));
283
+ if (handler) {
284
+ handler (std::move (block_info));
285
+ }
278
286
return ;
279
287
}
280
288
@@ -405,14 +413,16 @@ namespace kagome::network {
405
413
406
414
auto request_fingerprint = request.fingerprint ();
407
415
408
- if (not recent_requests_.emplace (peer_id, request_fingerprint).second ) {
416
+ if (auto r = recent_requests_.emplace (
417
+ std::make_tuple (peer_id, request_fingerprint), " find common block" );
418
+ not r.second ) {
409
419
SL_VERBOSE (log_,
410
420
" Can't check if block #{} in #{}..#{} is common with {}: {}" ,
411
421
hint,
412
422
lower,
413
423
upper - 1 ,
414
424
peer_id,
415
- make_error_code (Error::DUPLICATE_REQUEST) );
425
+ r. first -> second );
416
426
handler (Error::DUPLICATE_REQUEST);
417
427
return ;
418
428
}
@@ -566,7 +576,9 @@ namespace kagome::network {
566
576
SyncResultHandler &&handler) {
567
577
// Interrupts process if node is shutting down
568
578
if (node_is_shutting_down_) {
569
- if (handler) handler (Error::SHUTTING_DOWN);
579
+ if (handler) {
580
+ handler (Error::SHUTTING_DOWN);
581
+ }
570
582
return ;
571
583
}
572
584
@@ -577,13 +589,17 @@ namespace kagome::network {
577
589
578
590
auto request_fingerprint = request.fingerprint ();
579
591
580
- if (not recent_requests_.emplace (peer_id, request_fingerprint).second ) {
592
+ if (auto r = recent_requests_.emplace (
593
+ std::make_tuple (peer_id, request_fingerprint), " load blocks" );
594
+ not r.second ) {
581
595
SL_ERROR (log_,
582
596
" Can't load blocks from {} beginning block {}: {}" ,
583
597
peer_id,
584
598
from,
585
- make_error_code (Error::DUPLICATE_REQUEST));
586
- if (handler) handler (Error::DUPLICATE_REQUEST);
599
+ r.first ->second );
600
+ if (handler) {
601
+ handler (Error::DUPLICATE_REQUEST);
602
+ }
587
603
return ;
588
604
}
589
605
@@ -607,7 +623,9 @@ namespace kagome::network {
607
623
peer_id,
608
624
from,
609
625
response_res.error ());
610
- if (handler) handler (response_res.as_failure ());
626
+ if (handler) {
627
+ handler (response_res.as_failure ());
628
+ }
611
629
return ;
612
630
}
613
631
auto &blocks = response_res.value ().blocks ;
@@ -620,7 +638,9 @@ namespace kagome::network {
620
638
" Response does not have any blocks" ,
621
639
peer_id,
622
640
from);
623
- if (handler) handler (Error::EMPTY_RESPONSE);
641
+ if (handler) {
642
+ handler (Error::EMPTY_RESPONSE);
643
+ }
624
644
return ;
625
645
}
626
646
@@ -641,7 +661,9 @@ namespace kagome::network {
641
661
" Received block without header" ,
642
662
peer_id,
643
663
from);
644
- if (handler) handler (Error::RESPONSE_WITHOUT_BLOCK_HEADER);
664
+ if (handler) {
665
+ handler (Error::RESPONSE_WITHOUT_BLOCK_HEADER);
666
+ }
645
667
return ;
646
668
}
647
669
// Check if body is provided
@@ -651,7 +673,9 @@ namespace kagome::network {
651
673
" Received block without body" ,
652
674
peer_id,
653
675
from);
654
- if (handler) handler (Error::RESPONSE_WITHOUT_BLOCK_BODY);
676
+ if (handler) {
677
+ handler (Error::RESPONSE_WITHOUT_BLOCK_BODY);
678
+ }
655
679
return ;
656
680
}
657
681
auto &header = block.header .value ();
@@ -669,7 +693,9 @@ namespace kagome::network {
669
693
peer_id,
670
694
from,
671
695
BlockInfo (header.number , block.hash ));
672
- if (handler) handler (Error::DISCARDED_BLOCK);
696
+ if (handler) {
697
+ handler (Error::DISCARDED_BLOCK);
698
+ }
673
699
return ;
674
700
}
675
701
@@ -700,7 +726,9 @@ namespace kagome::network {
700
726
peer_id,
701
727
from,
702
728
BlockInfo (header.number , header.parent_hash ));
703
- if (handler) handler (Error::DISCARDED_BLOCK);
729
+ if (handler) {
730
+ handler (Error::DISCARDED_BLOCK);
731
+ }
704
732
return ;
705
733
}
706
734
@@ -716,7 +744,9 @@ namespace kagome::network {
716
744
" block {}: Received block is not descendant of previous" ,
717
745
peer_id,
718
746
from);
719
- if (handler) handler (Error::WRONG_ORDER);
747
+ if (handler) {
748
+ handler (Error::WRONG_ORDER);
749
+ }
720
750
return ;
721
751
}
722
752
@@ -730,7 +760,9 @@ namespace kagome::network {
730
760
" Received block whose hash does not match the header" ,
731
761
peer_id,
732
762
from);
733
- if (handler) handler (Error::INVALID_HASH);
763
+ if (handler) {
764
+ handler (Error::INVALID_HASH);
765
+ }
734
766
return ;
735
767
}
736
768
@@ -788,7 +820,9 @@ namespace kagome::network {
788
820
std::optional<uint32_t > limit,
789
821
SyncResultHandler &&handler) {
790
822
if (node_is_shutting_down_) {
791
- if (handler) handler (Error::SHUTTING_DOWN);
823
+ if (handler) {
824
+ handler (Error::SHUTTING_DOWN);
825
+ }
792
826
return ;
793
827
}
794
828
@@ -807,12 +841,15 @@ namespace kagome::network {
807
841
limit};
808
842
809
843
auto request_fingerprint = request.fingerprint ();
810
- if (not recent_requests_.emplace (peer_id, request_fingerprint).second ) {
844
+ if (auto r = recent_requests_.emplace (
845
+ std::make_tuple (peer_id, request_fingerprint),
846
+ " load justifications" );
847
+ not r.second ) {
811
848
SL_ERROR (log_,
812
849
" Can't load justification from {} for block {}: {}" ,
813
850
peer_id,
814
851
target_block,
815
- make_error_code (Error::DUPLICATE_REQUEST) );
852
+ r. first -> second );
816
853
if (handler) {
817
854
handler (Error::DUPLICATE_REQUEST);
818
855
}
@@ -824,6 +861,7 @@ namespace kagome::network {
824
861
auto response_handler = [wp = weak_from_this (),
825
862
peer_id,
826
863
target_block,
864
+ limit,
827
865
handler = std::move (handler)](
828
866
auto &&response_res) mutable {
829
867
auto self = wp.lock ();
@@ -851,26 +889,42 @@ namespace kagome::network {
851
889
" Response does not have any contents" ,
852
890
peer_id,
853
891
target_block);
854
- if (handler) handler (Error::EMPTY_RESPONSE);
892
+ if (handler) {
893
+ handler (Error::EMPTY_RESPONSE);
894
+ }
855
895
return ;
856
896
}
857
897
898
+ // Use decreasing limit,
899
+ // to avoid race between block and justification requests
900
+ if (limit.has_value ()) {
901
+ if (blocks.size () >= limit.value ()) {
902
+ limit = 0 ;
903
+ } else {
904
+ limit.value () -= (blocks.size () - 1 );
905
+ }
906
+ }
907
+
858
908
bool justification_received = false ;
859
909
BlockInfo last_justified_block;
910
+ BlockInfo last_observed_block;
860
911
for (auto &block : blocks) {
861
912
if (not block.header ) {
862
913
SL_ERROR (self->log_ ,
863
914
" No header was provided from {} for block {} while "
864
915
" requesting justifications" ,
865
916
peer_id,
866
917
target_block);
867
- if (handler) handler (Error::RESPONSE_WITHOUT_BLOCK_HEADER);
918
+ if (handler) {
919
+ handler (Error::RESPONSE_WITHOUT_BLOCK_HEADER);
920
+ }
868
921
return ;
869
922
}
923
+ last_observed_block =
924
+ primitives::BlockInfo{block.header ->number , block.hash };
870
925
if (block.justification ) {
871
926
justification_received = true ;
872
- last_justified_block =
873
- primitives::BlockInfo{block.header ->number , block.hash };
927
+ last_justified_block = last_observed_block;
874
928
{
875
929
std::lock_guard lock (self->justifications_mutex_ );
876
930
self->justifications_ .emplace (last_justified_block,
@@ -887,6 +941,25 @@ namespace kagome::network {
887
941
}
888
942
});
889
943
}
944
+
945
+ // Continue justifications requesting till limit is non-zero and last
946
+ // observed block is not target (no block anymore)
947
+ if ((not limit.has_value () or limit.value () > 0 )
948
+ and last_observed_block != target_block) {
949
+ SL_TRACE (self->log_ , " Request next block pack" );
950
+ self->scheduler_ ->schedule ([wp,
951
+ peer_id,
952
+ target_block = last_observed_block,
953
+ limit,
954
+ handler = std::move (handler)]() mutable {
955
+ if (auto self = wp.lock ()) {
956
+ self->loadJustifications (
957
+ peer_id, target_block, limit, std::move (handler));
958
+ }
959
+ });
960
+ return ;
961
+ }
962
+
890
963
if (handler) {
891
964
handler (last_justified_block);
892
965
}
@@ -1043,7 +1116,9 @@ namespace kagome::network {
1043
1116
" Block {} {} not applied as discarded" ,
1044
1117
block_info,
1045
1118
n ? fmt::format (" and {} others have" , n) : fmt::format (" has" ));
1046
- if (handler) handler (Error::DISCARDED_BLOCK);
1119
+ if (handler) {
1120
+ handler (Error::DISCARDED_BLOCK);
1121
+ }
1047
1122
}
1048
1123
1049
1124
} else {
@@ -1068,7 +1143,9 @@ namespace kagome::network {
1068
1143
" state syncing on block in progress" ,
1069
1144
block_info,
1070
1145
n ? fmt::format (" and {} others have" , n) : fmt::format (" has" ));
1071
- if (handler) handler (Error::DISCARDED_BLOCK);
1146
+ if (handler) {
1147
+ handler (Error::DISCARDED_BLOCK);
1148
+ }
1072
1149
return ;
1073
1150
}
1074
1151
}
@@ -1086,15 +1163,21 @@ namespace kagome::network {
1086
1163
block_info,
1087
1164
n ? fmt::format (" and {} others have" , n) : fmt::format (" has" ),
1088
1165
applying_res.error ());
1089
- if (handler) handler (Error::DISCARDED_BLOCK);
1166
+ if (handler) {
1167
+ handler (Error::DISCARDED_BLOCK);
1168
+ }
1090
1169
} else {
1091
1170
SL_DEBUG (log_, " Block {} is skipped as existing" , block_info);
1092
- if (handler) handler (block_info);
1171
+ if (handler) {
1172
+ handler (block_info);
1173
+ }
1093
1174
}
1094
1175
} else {
1095
1176
telemetry_->notifyBlockImported (
1096
1177
block_info, telemetry::BlockOrigin::kNetworkInitialSync );
1097
- if (handler) handler (block_info);
1178
+ if (handler) {
1179
+ handler (block_info);
1180
+ }
1098
1181
1099
1182
// Check if finality lag greater than justification saving interval
1100
1183
static const BlockNumber kJustificationInterval = 512 ;
@@ -1108,7 +1191,7 @@ namespace kagome::network {
1108
1191
syncMissingJustifications (
1109
1192
peer_id,
1110
1193
last_finalized,
1111
- std::nullopt ,
1194
+ kJustificationInterval * 2 ,
1112
1195
[wp = weak_from_this (), last_finalized, block_info](
1113
1196
auto res) {
1114
1197
if (auto self = wp.lock ()) {
0 commit comments