Skip to content

Commit 01002ce

Browse files
authored
Merge pull request #25 from strategyobject/fix/reconnection
There is fixed the issue of reconnection. `autoConnectMs` was changed…
2 parents 020f0e3 + 2713c4f commit 01002ce

File tree

17 files changed

+432
-68
lines changed

17 files changed

+432
-68
lines changed

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ plugins {
77

88
allprojects {
99
group = 'com.strategyobject.substrateclient'
10-
version = '0.1.2-SNAPSHOT'
10+
version = '0.1.3-SNAPSHOT'
1111

1212
repositories {
1313
mavenLocal()

pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageDoubleMapImplTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.strategyobject.substrateclient.scale.ScaleWriter;
99
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
1010
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
11+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
1112
import com.strategyobject.substrateclient.transport.ws.WsProvider;
1213
import lombok.val;
1314
import org.junit.jupiter.api.Test;
@@ -29,7 +30,7 @@ class StorageDoubleMapImplTests {
2930
void societyVotes() throws Exception {
3031
try (val wsProvider = WsProvider.builder()
3132
.setEndpoint(substrate.getWsAddress())
32-
.disableAutoConnect()
33+
.withPolicy(ReconnectionPolicy.MANUAL)
3334
.build()) {
3435
wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
3536

pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageMapImplTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.strategyobject.substrateclient.scale.ScaleWriter;
88
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
99
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
10+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
1011
import com.strategyobject.substrateclient.transport.ws.WsProvider;
1112
import lombok.val;
1213
import org.junit.jupiter.api.Test;
@@ -29,7 +30,7 @@ class StorageMapImplTests {
2930
void systemBlockHash() throws Exception {
3031
try (val wsProvider = WsProvider.builder()
3132
.setEndpoint(substrate.getWsAddress())
32-
.disableAutoConnect()
33+
.withPolicy(ReconnectionPolicy.MANUAL)
3334
.build()) {
3435
wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
3536

pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageNMapImplTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
1414
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
1515
import com.strategyobject.substrateclient.transport.ProviderInterface;
16+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
1617
import com.strategyobject.substrateclient.transport.ws.WsProvider;
1718
import lombok.NonNull;
1819
import lombok.val;
@@ -56,7 +57,7 @@ private static StorageNMapImpl<BlockHash> newSystemBlockHashStorage(State state)
5657
private WsProvider getConnectedProvider() throws InterruptedException, ExecutionException, TimeoutException {
5758
val wsProvider = WsProvider.builder()
5859
.setEndpoint(substrate.getWsAddress())
59-
.disableAutoConnect()
60+
.withPolicy(ReconnectionPolicy.MANUAL)
6061
.build();
6162
wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
6263
return wsProvider;

pallet/src/test/java/com/strategyobject/substrateclient/pallet/storage/StorageValueImplTests.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.strategyobject.substrateclient.rpc.api.section.State;
99
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
1010
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
11+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
1112
import com.strategyobject.substrateclient.transport.ws.WsProvider;
1213
import lombok.val;
1314
import org.junit.jupiter.api.Test;
@@ -33,7 +34,7 @@ void sudoKey() throws Exception {
3334

3435
try (val wsProvider = WsProvider.builder()
3536
.setEndpoint(substrate.getWsAddress())
36-
.disableAutoConnect()
37+
.withPolicy(ReconnectionPolicy.MANUAL)
3738
.build()) {
3839
wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
3940

@@ -58,7 +59,7 @@ void sudoKeyAtGenesis() throws Exception {
5859

5960
try (val wsProvider = WsProvider.builder()
6061
.setEndpoint(substrate.getWsAddress())
61-
.disableAutoConnect()
62+
.withPolicy(ReconnectionPolicy.MANUAL)
6263
.build()) {
6364
wsProvider.connect().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
6465
val state = TestsHelper.createSectionFactory(wsProvider).create(State.class);

rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/AuthorTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.strategyobject.substrateclient.scale.ScaleWriter;
1212
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
1313
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
14+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
1415
import com.strategyobject.substrateclient.transport.ws.WsProvider;
1516
import lombok.val;
1617
import lombok.var;
@@ -121,7 +122,7 @@ void submitAndWatchExtrinsic() throws Exception {
121122
private WsProvider connect() throws ExecutionException, InterruptedException, TimeoutException {
122123
val wsProvider = WsProvider.builder()
123124
.setEndpoint(substrate.getWsAddress())
124-
.disableAutoConnect()
125+
.withPolicy(ReconnectionPolicy.MANUAL)
125126
.build();
126127

127128
wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS);

rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/ChainTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.strategyobject.substrateclient.rpc.api.BlockNumber;
55
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
66
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
7+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
78
import com.strategyobject.substrateclient.transport.ws.WsProvider;
89
import lombok.val;
910
import org.junit.jupiter.api.Assertions;
@@ -134,7 +135,7 @@ void getCurrentBlock() throws ExecutionException, InterruptedException, TimeoutE
134135
private WsProvider connect() throws ExecutionException, InterruptedException, TimeoutException {
135136
val wsProvider = WsProvider.builder()
136137
.setEndpoint(substrate.getWsAddress())
137-
.disableAutoConnect()
138+
.withPolicy(ReconnectionPolicy.MANUAL)
138139
.build();
139140

140141
wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS);

rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/StateTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.strategyobject.substrateclient.rpc.api.StorageKey;
66
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
77
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
8+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
89
import com.strategyobject.substrateclient.transport.ws.WsProvider;
910
import lombok.val;
1011
import org.junit.jupiter.api.Assertions;
@@ -176,7 +177,7 @@ void queryStorageAt() throws Exception {
176177
private WsProvider connect() throws Exception {
177178
val wsProvider = WsProvider.builder()
178179
.setEndpoint(substrate.getWsAddress())
179-
.disableAutoConnect()
180+
.withPolicy(ReconnectionPolicy.MANUAL)
180181
.build();
181182

182183
wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS);

rpc/rpc-api/src/test/java/com/strategyobject/substrateclient/rpc/api/section/SystemTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.strategyobject.substrateclient.rpc.api.Index;
66
import com.strategyobject.substrateclient.tests.containers.SubstrateVersion;
77
import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer;
8+
import com.strategyobject.substrateclient.transport.ws.ReconnectionPolicy;
89
import com.strategyobject.substrateclient.transport.ws.WsProvider;
910
import lombok.val;
1011
import org.junit.jupiter.api.Test;
@@ -28,7 +29,7 @@ class SystemTests {
2829
void accountNextIndex() throws Exception {
2930
try (val wsProvider = WsProvider.builder()
3031
.setEndpoint(substrate.getWsAddress())
31-
.disableAutoConnect()
32+
.withPolicy(ReconnectionPolicy.MANUAL)
3233
.build()) {
3334
wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS);
3435

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.strategyobject.substrateclient.transport.ws;
2+
3+
import lombok.Getter;
4+
import lombok.RequiredArgsConstructor;
5+
6+
import java.util.concurrent.TimeUnit;
7+
8+
/**
9+
* Represents a delay
10+
*/
11+
@RequiredArgsConstructor(staticName = "of")
12+
@Getter
13+
public class Delay {
14+
/**
15+
* The time to delay execution unit
16+
*/
17+
private final long value;
18+
19+
/**
20+
* The time unit of the delay parameter
21+
*/
22+
private final TimeUnit unit;
23+
24+
/**
25+
* A delay that should never be scheduled
26+
*/
27+
public static final Delay NEVER = Delay.of(-1, TimeUnit.SECONDS);
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package com.strategyobject.substrateclient.transport.ws;
2+
3+
import com.google.common.base.Preconditions;
4+
import lombok.*;
5+
import lombok.extern.slf4j.Slf4j;
6+
7+
import java.util.concurrent.TimeUnit;
8+
import java.util.concurrent.atomic.LongAdder;
9+
10+
/**
11+
* Represents an exponential backoff retry policy
12+
*/
13+
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
14+
@Getter
15+
@Slf4j
16+
public class ExponentialBackoffReconnectionPolicy implements ReconnectionPolicy<LongAdder> {
17+
/**
18+
* Max number of attempts
19+
*/
20+
private final long maxAttempts;
21+
/**
22+
* Initial delay, the time to delay execution unit
23+
*/
24+
private final long delay;
25+
/**
26+
* The time unit of the delay parameter
27+
*/
28+
private final TimeUnit unit;
29+
/**
30+
* Max delay
31+
*/
32+
private final long maxDelay;
33+
/**
34+
* A multiplier that's applied to delay after every attempt
35+
*/
36+
private final double factor;
37+
38+
/**
39+
* @param context contains a reason of disconnection and counter of attempts
40+
* @return a unit of time to delay the next reconnection
41+
*/
42+
@Override
43+
public @NonNull Delay getNextDelay(@NonNull ReconnectionContext<LongAdder> context) {
44+
try {
45+
if (context.getPolicyContext().longValue() >= maxAttempts) {
46+
log.info("Provider won't reconnect more.");
47+
48+
return Delay.NEVER;
49+
}
50+
51+
var nextDelay = delay * Math.pow(factor, context.getPolicyContext().longValue());
52+
nextDelay = Math.min(nextDelay, maxDelay);
53+
54+
log.info("Provider will try to reconnect after: {} {}", nextDelay, unit);
55+
return Delay.of((long) nextDelay, unit);
56+
} finally {
57+
context.getPolicyContext().increment();
58+
}
59+
}
60+
61+
/**
62+
* Returns the counter of attempts
63+
*/
64+
@Override
65+
public LongAdder initContext() {
66+
return new LongAdder();
67+
}
68+
69+
public static Builder builder() {
70+
return new Builder();
71+
}
72+
73+
public static class Builder {
74+
private long delay = 15;
75+
private TimeUnit unit = TimeUnit.SECONDS;
76+
private long maxDelay = 150;
77+
private long maxAttempts = 10;
78+
private double factor = 2;
79+
80+
Builder() {
81+
}
82+
83+
public Builder retryAfter(long delay, TimeUnit unit) {
84+
Preconditions.checkArgument(delay >= 0);
85+
86+
this.delay = delay;
87+
this.unit = unit;
88+
89+
return this;
90+
}
91+
92+
public Builder withFactor(double factor) {
93+
Preconditions.checkArgument(factor > 0);
94+
95+
this.factor = factor;
96+
return this;
97+
}
98+
99+
public Builder withMaxDelay(long maxDelay) {
100+
Preconditions.checkArgument(maxDelay >= 0);
101+
102+
this.maxDelay = maxDelay;
103+
return this;
104+
}
105+
106+
public Builder notMoreThan(long maxAttempts) {
107+
Preconditions.checkArgument(maxAttempts >= 0);
108+
109+
this.maxAttempts = maxAttempts;
110+
return this;
111+
}
112+
113+
public ExponentialBackoffReconnectionPolicy build() {
114+
return new ExponentialBackoffReconnectionPolicy(
115+
this.maxAttempts,
116+
this.delay,
117+
this.unit,
118+
this.maxDelay,
119+
this.factor
120+
);
121+
}
122+
}
123+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.strategyobject.substrateclient.transport.ws;
2+
3+
import lombok.Getter;
4+
import lombok.RequiredArgsConstructor;
5+
6+
/**
7+
* Represents a context why connection was closed
8+
*
9+
* @param <T> a type of policy's context required for
10+
* computing the next delay or other policy's purposes
11+
*/
12+
@RequiredArgsConstructor(staticName = "of")
13+
@Getter
14+
public class ReconnectionContext<T> {
15+
/**
16+
* The code of the reason of disconnection
17+
*/
18+
private final int code;
19+
20+
/**
21+
* The text of the reason
22+
*/
23+
private final String reason;
24+
25+
/**
26+
* The policy's context
27+
*/
28+
private final T policyContext;
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.strategyobject.substrateclient.transport.ws;
2+
3+
import lombok.NonNull;
4+
5+
/**
6+
* @param <T> a type of policy's context required for
7+
* computing the next delay or other policy's purposes
8+
* Represents a strategy of reconnection
9+
*/
10+
public interface ReconnectionPolicy<T> {
11+
12+
/**
13+
* The method is called when connection was closed and probably should be reconnected.
14+
* @param context contains a reason of disconnection and policy's context.
15+
* @return a unit of time from now to delay reconnection.
16+
*/
17+
@NonNull Delay getNextDelay(@NonNull ReconnectionContext<T> context);
18+
19+
/**
20+
* The method is called before the first connection or when the one successfully reestablished.
21+
* @return a context required for the policy.
22+
*/
23+
T initContext();
24+
25+
/**
26+
* @return the builder of ExponentialBackoffReconnectionPolicy
27+
*/
28+
static ExponentialBackoffReconnectionPolicy.Builder exponentialBackoff() {
29+
return ExponentialBackoffReconnectionPolicy.builder();
30+
}
31+
32+
/**
33+
* @param <T> the type of context
34+
* @return the policy that's supposed to not reconnect automatically
35+
*/
36+
@SuppressWarnings("unchecked")
37+
static <T> ReconnectionPolicy<T> manual() {
38+
return (ReconnectionPolicy<T>) MANUAL;
39+
}
40+
41+
/**
42+
* The policy that's supposed to not reconnect automatically
43+
*/
44+
ReconnectionPolicy<?> MANUAL = new ReconnectionPolicy<Void>() {
45+
@Override
46+
public @NonNull Delay getNextDelay(@NonNull ReconnectionContext<Void> context) {
47+
return Delay.NEVER;
48+
}
49+
50+
@Override
51+
public Void initContext() {
52+
return null;
53+
}
54+
};
55+
}

0 commit comments

Comments
 (0)