Skip to content

Commit 2713c4f

Browse files
committed
There is fixed the issue of reconnection. autoConnectMs was changed by ReconnectionPolicy.
WsProvider couldn't reconnect to the web socket if it was closed - future `whenConnected` wasn't being completed when socket closes that blocked reconnection.
1 parent 020f0e3 commit 2713c4f

File tree

17 files changed

+432
-68
lines changed

17 files changed

+432
-68
lines changed

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ plugins {
77

88
allprojects {
99
group = 'com.strategyobject.substrateclient'
10-
version = '0.1.2-SNAPSHOT'
10+
version = '0.1.3-SNAPSHOT'
1111

1212
repositories {
1313
mavenLocal()

pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageDoubleMapImplTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.strategyobject.substrateclient.scale.ScaleWriter;
99
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
1010
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
11+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
1112
import com.strategyobject.substrateclient.transport.ws.WsProvider;
1213
import lombok.val;
1314
import org.junit.jupiter.api.Test;
@@ -29,7 +30,7 @@ class StorageDoubleMapImplTests {
2930
void societyVotes() throws Exception {
3031
try (val wsProvider = WsProvider.builder()
3132
.setEndpoint(substrate.getWsAddress())
32-
.disableAutoConnect()
33+
.withPolicy(ReconnectionPolicy.MANUAL)
3334
.build()) {
3435
wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
3536

pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageMapImplTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.strategyobject.substrateclient.scale.ScaleWriter;
88
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
99
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
10+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
1011
import com.strategyobject.substrateclient.transport.ws.WsProvider;
1112
import lombok.val;
1213
import org.junit.jupiter.api.Test;
@@ -29,7 +30,7 @@ class StorageMapImplTests {
2930
void systemBlockHash() throws Exception {
3031
try (val wsProvider = WsProvider.builder()
3132
.setEndpoint(substrate.getWsAddress())
32-
.disableAutoConnect()
33+
.withPolicy(ReconnectionPolicy.MANUAL)
3334
.build()) {
3435
wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
3536

pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageNMapImplTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
1414
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
1515
import com.strategyobject.substrateclient.transport.ProviderInterface;
16+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
1617
import com.strategyobject.substrateclient.transport.ws.WsProvider;
1718
import lombok.NonNull;
1819
import lombok.val;
@@ -56,7 +57,7 @@ private static StorageNMapImpl<BlockHash> newSystemBlockHashStorage(State state)
5657
private WsProvider getConnectedProvider() throws InterruptedException, ExecutionException, TimeoutException {
5758
val wsProvider = WsProvider.builder()
5859
.setEndpoint(substrate.getWsAddress())
59-
.disableAutoConnect()
60+
.withPolicy(ReconnectionPolicy.MANUAL)
6061
.build();
6162
wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
6263
return wsProvider;

pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageValueImplTests.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.strategyobject.substrateclient.rpc.api.section.State;
99
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
1010
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
11+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
1112
import com.strategyobject.substrateclient.transport.ws.WsProvider;
1213
import lombok.val;
1314
import org.junit.jupiter.api.Test;
@@ -33,7 +34,7 @@ void sudoKey() throws Exception {
3334

3435
try (val wsProvider = WsProvider.builder()
3536
.setEndpoint(substrate.getWsAddress())
36-
.disableAutoConnect()
37+
.withPolicy(ReconnectionPolicy.MANUAL)
3738
.build()) {
3839
wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
3940

@@ -58,7 +59,7 @@ void sudoKeyAtGenesis() throws Exception {
5859

5960
try (val wsProvider = WsProvider.builder()
6061
.setEndpoint(substrate.getWsAddress())
61-
.disableAutoConnect()
62+
.withPolicy(ReconnectionPolicy.MANUAL)
6263
.build()) {
6364
wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
6465
val state = TestsHelper.createSectionFactory(wsProvider).create(State.class);

rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/AuthorTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.strategyobject.substrateclient.scale.ScaleWriter;
1212
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
1313
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
14+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
1415
import com.strategyobject.substrateclient.transport.ws.WsProvider;
1516
import lombok.val;
1617
import lombok.var;
@@ -121,7 +122,7 @@ void submitAndWatchExtrinsic() throws Exception {
121122
private WsProvider connect() throws ExecutionException, InterruptedException, TimeoutException {
122123
val wsProvider = WsProvider.builder()
123124
.setEndpoint(substrate.getWsAddress())
124-
.disableAutoConnect()
125+
.withPolicy(ReconnectionPolicy.MANUAL)
125126
.build();
126127

127128
wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS);

rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/ChainTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.strategyobject.substrateclient.rpc.api.BlockNumber;
55
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
66
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
7+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
78
import com.strategyobject.substrateclient.transport.ws.WsProvider;
89
import lombok.val;
910
import org.junit.jupiter.api.Assertions;
@@ -134,7 +135,7 @@ void getCurrentBlock() throws ExecutionException, InterruptedException, TimeoutE
134135
private WsProvider connect() throws ExecutionException, InterruptedException, TimeoutException {
135136
val wsProvider = WsProvider.builder()
136137
.setEndpoint(substrate.getWsAddress())
137-
.disableAutoConnect()
138+
.withPolicy(ReconnectionPolicy.MANUAL)
138139
.build();
139140

140141
wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS);

rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/StateTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.strategyobject.substrateclient.rpc.api.StorageKey;
66
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
77
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
8+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
89
import com.strategyobject.substrateclient.transport.ws.WsProvider;
910
import lombok.val;
1011
import org.junit.jupiter.api.Assertions;
@@ -176,7 +177,7 @@ void queryStorageAt() throws Exception {
176177
private WsProvider connect() throws Exception {
177178
val wsProvider = WsProvider.builder()
178179
.setEndpoint(substrate.getWsAddress())
179-
.disableAutoConnect()
180+
.withPolicy(ReconnectionPolicy.MANUAL)
180181
.build();
181182

182183
wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS);

rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/SystemTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.strategyobject.substrateclient.rpc.api.Index;
66
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
77
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
8+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
89
import com.strategyobject.substrateclient.transport.ws.WsProvider;
910
import lombok.val;
1011
import org.junit.jupiter.api.Test;
@@ -28,7 +29,7 @@ class SystemTests {
2829
void accountNextIndex() throws Exception {
2930
try (val wsProvider = WsProvider.builder()
3031
.setEndpoint(substrate.getWsAddress())
31-
.disableAutoConnect()
32+
.withPolicy(ReconnectionPolicy.MANUAL)
3233
.build()) {
3334
wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS);
3435

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.strategyobject.substrateclient.transport.ws;
2+
3+
import lombok.Getter;
4+
import lombok.RequiredArgsConstructor;
5+
6+
import java.util.concurrent.TimeUnit;
7+
8+
/**
9+
* Represents a delay
10+
*/
11+
@RequiredArgsConstructor(staticName = "of")
12+
@Getter
13+
public class Delay {
14+
/**
15+
* The time to delay execution unit
16+
*/
17+
private final long value;
18+
19+
/**
20+
* The time unit of the delay parameter
21+
*/
22+
private final TimeUnit unit;
23+
24+
/**
25+
* A delay that should never be scheduled
26+
*/
27+
public static final Delay NEVER = Delay.of(-1, TimeUnit.SECONDS);
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package com.strategyobject.substrateclient.transport.ws;
2+
3+
import com.google.common.base.Preconditions;
4+
import lombok.*;
5+
import lombok.extern.slf4j.Slf4j;
6+
7+
import java.util.concurrent.TimeUnit;
8+
import java.util.concurrent.atomic.LongAdder;
9+
10+
/**
11+
* Represents an exponential backoff retry policy
12+
*/
13+
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
14+
@Getter
15+
@Slf4j
16+
public class ExponentialBackoffReconnectionPolicy implements ReconnectionPolicy<LongAdder> {
17+
/**
18+
* Max number of attempts
19+
*/
20+
private final long maxAttempts;
21+
/**
22+
* Initial delay, the time to delay execution unit
23+
*/
24+
private final long delay;
25+
/**
26+
* The time unit of the delay parameter
27+
*/
28+
private final TimeUnit unit;
29+
/**
30+
* Max delay
31+
*/
32+
private final long maxDelay;
33+
/**
34+
* A multiplier that's applied to delay after every attempt
35+
*/
36+
private final double factor;
37+
38+
/**
39+
* @param context contains a reason of disconnection and counter of attempts
40+
* @return a unit of time to delay the next reconnection
41+
*/
42+
@Override
43+
public @NonNull Delay getNextDelay(@NonNull ReconnectionContext<LongAdder> context) {
44+
try {
45+
if (context.getPolicyContext().longValue() >= maxAttempts) {
46+
log.info("Provider won't reconnect more.");
47+
48+
return Delay.NEVER;
49+
}
50+
51+
var nextDelay = delay * Math.pow(factor, context.getPolicyContext().longValue());
52+
nextDelay = Math.min(nextDelay, maxDelay);
53+
54+
log.info("Provider will try to reconnect after: {} {}", nextDelay, unit);
55+
return Delay.of((long) nextDelay, unit);
56+
} finally {
57+
context.getPolicyContext().increment();
58+
}
59+
}
60+
61+
/**
62+
* Returns the counter of attempts
63+
*/
64+
@Override
65+
public LongAdder initContext() {
66+
return new LongAdder();
67+
}
68+
69+
public static Builder builder() {
70+
return new Builder();
71+
}
72+
73+
public static class Builder {
74+
private long delay = 15;
75+
private TimeUnit unit = TimeUnit.SECONDS;
76+
private long maxDelay = 150;
77+
private long maxAttempts = 10;
78+
private double factor = 2;
79+
80+
Builder() {
81+
}
82+
83+
public Builder retryAfter(long delay, TimeUnit unit) {
84+
Preconditions.checkArgument(delay >= 0);
85+
86+
this.delay = delay;
87+
this.unit = unit;
88+
89+
return this;
90+
}
91+
92+
public Builder withFactor(double factor) {
93+
Preconditions.checkArgument(factor > 0);
94+
95+
this.factor = factor;
96+
return this;
97+
}
98+
99+
public Builder withMaxDelay(long maxDelay) {
100+
Preconditions.checkArgument(maxDelay >= 0);
101+
102+
this.maxDelay = maxDelay;
103+
return this;
104+
}
105+
106+
public Builder notMoreThan(long maxAttempts) {
107+
Preconditions.checkArgument(maxAttempts >= 0);
108+
109+
this.maxAttempts = maxAttempts;
110+
return this;
111+
}
112+
113+
public ExponentialBackoffReconnectionPolicy build() {
114+
return new ExponentialBackoffReconnectionPolicy(
115+
this.maxAttempts,
116+
this.delay,
117+
this.unit,
118+
this.maxDelay,
119+
this.factor
120+
);
121+
}
122+
}
123+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.strategyobject.substrateclient.transport.ws;
2+
3+
import lombok.Getter;
4+
import lombok.RequiredArgsConstructor;
5+
6+
/**
7+
* Represents a context why connection was closed
8+
*
9+
* @param <T> a type of policy's context required for
10+
* computing the next delay or other policy's purposes
11+
*/
12+
@RequiredArgsConstructor(staticName = "of")
13+
@Getter
14+
public class ReconnectionContext<T> {
15+
/**
16+
* The code of the reason of disconnection
17+
*/
18+
private final int code;
19+
20+
/**
21+
* The text of the reason
22+
*/
23+
private final String reason;
24+
25+
/**
26+
* The policy's context
27+
*/
28+
private final T policyContext;
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.strategyobject.substrateclient.transport.ws;
2+
3+
import lombok.NonNull;
4+
5+
/**
6+
* @param <T> a type of policy's context required for
7+
* computing the next delay or other policy's purposes
8+
* Represents a strategy of reconnection
9+
*/
10+
public interface ReconnectionPolicy<T> {
11+
12+
/**
13+
* The method is called when connection was closed and probably should be reconnected.
14+
* @param context contains a reason of disconnection and policy's context.
15+
* @return a unit of time from now to delay reconnection.
16+
*/
17+
@NonNull Delay getNextDelay(@NonNull ReconnectionContext<T> context);
18+
19+
/**
20+
* The method is called before the first connection or when the one successfully reestablished.
21+
* @return a context required for the policy.
22+
*/
23+
T initContext();
24+
25+
/**
26+
* @return the builder of ExponentialBackoffReconnectionPolicy
27+
*/
28+
static ExponentialBackoffReconnectionPolicy.Builder exponentialBackoff() {
29+
return ExponentialBackoffReconnectionPolicy.builder();
30+
}
31+
32+
/**
33+
* @param <T> the type of context
34+
* @return the policy that's supposed to not reconnect automatically
35+
*/
36+
@SuppressWarnings("unchecked")
37+
static <T> ReconnectionPolicy<T> manual() {
38+
return (ReconnectionPolicy<T>) MANUAL;
39+
}
40+
41+
/**
42+
* The policy that's supposed to not reconnect automatically
43+
*/
44+
ReconnectionPolicy<?> MANUAL = new ReconnectionPolicy<Void>() {
45+
@Override
46+
public @NonNull Delay getNextDelay(@NonNull ReconnectionContext<Void> context) {
47+
return Delay.NEVER;
48+
}
49+
50+
@Override
51+
public Void initContext() {
52+
return null;
53+
}
54+
};
55+
}

0 commit comments

Comments
 (0)