Skip to content

Commit 7916dfb

Browse files
committed
There is fixed the issue of reconnection. autoConnectMs was changed by ReconnectionPolicy.
WsProvider couldn't reconnect to the web socket if it was closed - future `whenConnected` wasn't being completed when socket closes that blocked reconnection.
1 parent 020f0e3 commit 7916dfb

File tree

16 files changed

+412
-55
lines changed

16 files changed

+412
-55
lines changed

pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageDoubleMapImplTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.strategyobject.substrateclient.scale.ScaleWriter;
99
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
1010
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
11+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
1112
import com.strategyobject.substrateclient.transport.ws.WsProvider;
1213
import lombok.val;
1314
import org.junit.jupiter.api.Test;
@@ -29,7 +30,7 @@ class StorageDoubleMapImplTests {
2930
void societyVotes() throws Exception {
3031
try (val wsProvider = WsProvider.builder()
3132
.setEndpoint(substrate.getWsAddress())
32-
.disableAutoConnect()
33+
.withPolicy(ReconnectionPolicy.MANUAL)
3334
.build()) {
3435
wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
3536

pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageMapImplTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.strategyobject.substrateclient.scale.ScaleWriter;
88
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
99
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
10+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
1011
import com.strategyobject.substrateclient.transport.ws.WsProvider;
1112
import lombok.val;
1213
import org.junit.jupiter.api.Test;
@@ -29,7 +30,7 @@ class StorageMapImplTests {
2930
void systemBlockHash() throws Exception {
3031
try (val wsProvider = WsProvider.builder()
3132
.setEndpoint(substrate.getWsAddress())
32-
.disableAutoConnect()
33+
.withPolicy(ReconnectionPolicy.MANUAL)
3334
.build()) {
3435
wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
3536

pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageNMapImplTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
1414
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
1515
import com.strategyobject.substrateclient.transport.ProviderInterface;
16+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
1617
import com.strategyobject.substrateclient.transport.ws.WsProvider;
1718
import lombok.NonNull;
1819
import lombok.val;
@@ -56,7 +57,7 @@ private static StorageNMapImpl<BlockHash> newSystemBlockHashStorage(State state)
5657
private WsProvider getConnectedProvider() throws InterruptedException, ExecutionException, TimeoutException {
5758
val wsProvider = WsProvider.builder()
5859
.setEndpoint(substrate.getWsAddress())
59-
.disableAutoConnect()
60+
.withPolicy(ReconnectionPolicy.MANUAL)
6061
.build();
6162
wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
6263
return wsProvider;

pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageValueImplTests.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.strategyobject.substrateclient.rpc.api.section.State;
99
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
1010
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
11+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
1112
import com.strategyobject.substrateclient.transport.ws.WsProvider;
1213
import lombok.val;
1314
import org.junit.jupiter.api.Test;
@@ -33,7 +34,7 @@ void sudoKey() throws Exception {
3334

3435
try (val wsProvider = WsProvider.builder()
3536
.setEndpoint(substrate.getWsAddress())
36-
.disableAutoConnect()
37+
.withPolicy(ReconnectionPolicy.MANUAL)
3738
.build()) {
3839
wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
3940

@@ -58,7 +59,7 @@ void sudoKeyAtGenesis() throws Exception {
5859

5960
try (val wsProvider = WsProvider.builder()
6061
.setEndpoint(substrate.getWsAddress())
61-
.disableAutoConnect()
62+
.withPolicy(ReconnectionPolicy.MANUAL)
6263
.build()) {
6364
wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
6465
val state = TestsHelper.createSectionFactory(wsProvider).create(State.class);

rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/AuthorTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.strategyobject.substrateclient.scale.ScaleWriter;
1212
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
1313
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
14+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
1415
import com.strategyobject.substrateclient.transport.ws.WsProvider;
1516
import lombok.val;
1617
import lombok.var;
@@ -121,7 +122,7 @@ void submitAndWatchExtrinsic() throws Exception {
121122
private WsProvider connect() throws ExecutionException, InterruptedException, TimeoutException {
122123
val wsProvider = WsProvider.builder()
123124
.setEndpoint(substrate.getWsAddress())
124-
.disableAutoConnect()
125+
.withPolicy(ReconnectionPolicy.MANUAL)
125126
.build();
126127

127128
wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS);

rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/ChainTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.strategyobject.substrateclient.rpc.api.BlockNumber;
55
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
66
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
7+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
78
import com.strategyobject.substrateclient.transport.ws.WsProvider;
89
import lombok.val;
910
import org.junit.jupiter.api.Assertions;
@@ -134,7 +135,7 @@ void getCurrentBlock() throws ExecutionException, InterruptedException, TimeoutE
134135
private WsProvider connect() throws ExecutionException, InterruptedException, TimeoutException {
135136
val wsProvider = WsProvider.builder()
136137
.setEndpoint(substrate.getWsAddress())
137-
.disableAutoConnect()
138+
.withPolicy(ReconnectionPolicy.MANUAL)
138139
.build();
139140

140141
wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS);

rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/StateTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.strategyobject.substrateclient.rpc.api.StorageKey;
66
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
77
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
8+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
89
import com.strategyobject.substrateclient.transport.ws.WsProvider;
910
import lombok.val;
1011
import org.junit.jupiter.api.Assertions;
@@ -176,7 +177,7 @@ void queryStorageAt() throws Exception {
176177
private WsProvider connect() throws Exception {
177178
val wsProvider = WsProvider.builder()
178179
.setEndpoint(substrate.getWsAddress())
179-
.disableAutoConnect()
180+
.withPolicy(ReconnectionPolicy.MANUAL)
180181
.build();
181182

182183
wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS);

rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/SystemTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.strategyobject.substrateclient.rpc.api.Index;
66
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
77
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
8+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
89
import com.strategyobject.substrateclient.transport.ws.WsProvider;
910
import lombok.val;
1011
import org.junit.jupiter.api.Test;
@@ -28,7 +29,7 @@ class SystemTests {
2829
void accountNextIndex() throws Exception {
2930
try (val wsProvider = WsProvider.builder()
3031
.setEndpoint(substrate.getWsAddress())
31-
.disableAutoConnect()
32+
.withPolicy(ReconnectionPolicy.MANUAL)
3233
.build()) {
3334
wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS);
3435

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package com.strategyobject.substrateclient.transport.ws;
2+
3+
import com.google.common.base.Preconditions;
4+
import lombok.*;
5+
import lombok.extern.slf4j.Slf4j;
6+
7+
import java.util.concurrent.TimeUnit;
8+
9+
/**
10+
* Represents a backoff retry policy
11+
*/
12+
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
13+
@Getter
14+
@Slf4j
15+
public class BackoffReconnectionPolicy implements ReconnectionPolicy {
16+
/**
17+
* The number of executed attempts to reconnect
18+
*/
19+
private int counter = 0;
20+
/**
21+
* Max number of attempts
22+
*/
23+
private final int maxAttempts;
24+
/**
25+
* Initial delay
26+
*/
27+
private final long delay;
28+
/**
29+
* Time duration at a given unit
30+
*/
31+
private final TimeUnit unit;
32+
/**
33+
* Max delay
34+
*/
35+
private final long maxDelay;
36+
/**
37+
* A multiplier that's applied to delay after every attempt
38+
*/
39+
private final double factor;
40+
41+
/**
42+
* @param context contains a reason of disconnection
43+
* @return a schedule for the next reconnection
44+
*/
45+
@Override
46+
public @NonNull ReconnectionSchedule apply(@NonNull ReconnectionContext context) {
47+
try {
48+
if (counter >= maxAttempts) {
49+
log.info("Provider won't reconnect more.");
50+
51+
return ReconnectionSchedule.NEVER;
52+
}
53+
54+
var nextDelay = delay * Math.pow(factor, counter);
55+
nextDelay = Math.min(nextDelay, maxDelay);
56+
57+
log.info("Provider will try to reconnect after: {} {}", nextDelay, unit);
58+
return ReconnectionSchedule.of((long) nextDelay, unit);
59+
} finally {
60+
counter++;
61+
}
62+
}
63+
64+
/**
65+
* Reset the counter of attempts
66+
*/
67+
@Override
68+
public void reset() {
69+
counter = 0;
70+
}
71+
72+
public static Builder builder() {
73+
return new Builder();
74+
}
75+
76+
public static class Builder {
77+
private long delay = 15;
78+
private TimeUnit unit = TimeUnit.SECONDS;
79+
private long maxDelay = 150;
80+
private int maxAttempts = 10;
81+
private double factor = 2;
82+
83+
Builder() {
84+
}
85+
86+
public Builder retryAfter(long delay, TimeUnit unit) {
87+
Preconditions.checkArgument(delay >= 0);
88+
89+
this.delay = delay;
90+
this.unit = unit;
91+
92+
return this;
93+
}
94+
95+
public Builder withFactor(double factor) {
96+
Preconditions.checkArgument(factor > 0);
97+
98+
this.factor = factor;
99+
return this;
100+
}
101+
102+
public Builder withMaxDelay(long maxDelay) {
103+
Preconditions.checkArgument(maxDelay >= 0);
104+
105+
this.maxDelay = maxDelay;
106+
return this;
107+
}
108+
109+
public Builder notMoreThan(int maxAttempts) {
110+
Preconditions.checkArgument(maxAttempts >= 0);
111+
112+
this.maxAttempts = maxAttempts;
113+
return this;
114+
}
115+
116+
public BackoffReconnectionPolicy build() {
117+
return new BackoffReconnectionPolicy(
118+
this.maxAttempts,
119+
this.delay,
120+
this.unit,
121+
this.maxDelay,
122+
this.factor
123+
);
124+
}
125+
}
126+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.strategyobject.substrateclient.transport.ws;
2+
3+
import lombok.Getter;
4+
import lombok.RequiredArgsConstructor;
5+
6+
/**
7+
* Represents a context why connection was closed
8+
*/
9+
@RequiredArgsConstructor(staticName = "of")
10+
@Getter
11+
public class ReconnectionContext {
12+
/**
13+
* The code of the reason of disconnection
14+
*/
15+
private final int code;
16+
17+
/**
18+
* The text of the reason
19+
*/
20+
private final String reason;
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.strategyobject.substrateclient.transport.ws;
2+
3+
import lombok.NonNull;
4+
5+
/**
6+
* Represents a strategy of reconnection
7+
*/
8+
public interface ReconnectionPolicy {
9+
10+
/**
11+
* The method is called when connection was closed and probably should be reconnected
12+
* @param context contains a reason of disconnection
13+
* @return a schedule planned for reconnection
14+
*/
15+
@NonNull ReconnectionSchedule apply(@NonNull ReconnectionContext context);
16+
17+
/**
18+
* The method is called when connection successfully open
19+
*/
20+
void reset();
21+
22+
/**
23+
* @return the builder of BackoffReconnectionPolicy
24+
*/
25+
static BackoffReconnectionPolicy.Builder backoff() {
26+
return BackoffReconnectionPolicy.builder();
27+
}
28+
29+
/**
30+
* The policy that's supposed to not reconnect automatically
31+
*/
32+
ReconnectionPolicy MANUAL = new ReconnectionPolicy() {
33+
@Override
34+
public @NonNull ReconnectionSchedule apply(@NonNull ReconnectionContext context) {
35+
return ReconnectionSchedule.NEVER;
36+
}
37+
38+
@Override
39+
public void reset() {
40+
41+
}
42+
};
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.strategyobject.substrateclient.transport.ws;
2+
3+
import lombok.Getter;
4+
import lombok.RequiredArgsConstructor;
5+
6+
import java.util.concurrent.TimeUnit;
7+
8+
/**
9+
* Represents a schedule of reconnection
10+
*/
11+
@RequiredArgsConstructor(staticName = "of")
12+
@Getter
13+
public class ReconnectionSchedule {
14+
private final long delay;
15+
private final TimeUnit unit;
16+
17+
/**
18+
* A schedule that should never be planned
19+
*/
20+
public static final ReconnectionSchedule NEVER = ReconnectionSchedule.of(-1, TimeUnit.SECONDS);
21+
}

0 commit comments

Comments
 (0)