@@ -82,7 +82,7 @@ struct spawn_dispatch_t :
82
82
spawn_handle->on_terminate (ec, msg);
83
83
}
84
84
85
- virtual void discard (const std::error_code& ec ) const {
85
+ virtual void discard (const std::error_code&) const {
86
86
// As a personal Boris request we do not shutdown worker on external daemon disconnection
87
87
// spawn_handle->on_terminate(ec, "external isolation session was discarded");
88
88
}
@@ -169,6 +169,7 @@ struct external_t::inner_t :
169
169
std::atomic<id_t > cur_id;
170
170
std::shared_ptr<dispatch<io::context_tag>> signal_dispatch;
171
171
bool prepared;
172
+ bool connecting;
172
173
173
174
174
175
std::vector<std::shared_ptr<spool_load_t >> spool_queue;
@@ -183,7 +184,8 @@ struct external_t::inner_t :
183
184
args (_args),
184
185
log (context.log(" universal_isolate/" +_name)),
185
186
signal_dispatch (std::make_shared<dispatch<io::context_tag>>(" universal_isolate_signal" )),
186
- prepared (false )
187
+ prepared (false ),
188
+ connecting (false )
187
189
{
188
190
signal_dispatch->on <io::context::prepared>([&](){
189
191
prepared = true ;
@@ -197,25 +199,27 @@ struct external_t::inner_t :
197
199
198
200
void connect () {
199
201
socket.reset (new asio::ip::tcp::socket (io_context));
202
+ connecting = true ;
200
203
auto ep = args.as_object ().at (" external_isolation_endpoint" , dynamic_t ::empty_object).as_object ();
201
204
tcp::endpoint endpoint (
202
205
asio::ip::address::from_string (ep.at (" host" , " 127.0.0.1" ).as_string ()),
203
206
static_cast <unsigned short >(ep.at (" port" , 29042u ).as_uint ())
204
207
);
205
208
206
- auto connect_timeout = args.as_object ().at (" connect_timeout " , 5u ).as_uint ();
209
+ auto connect_timeout_ms = args.as_object ().at (" connect_timeout_ms " , 5000u ).as_uint ();
207
210
208
211
COCAINE_LOG_INFO (log , " connecting to external isolation daemon to {}" , boost::lexical_cast<std::string>(endpoint));
209
212
auto self_shared = shared_from_this ();
210
213
211
- connect_timer.expires_from_now (boost::posix_time::seconds (connect_timeout ));
214
+ connect_timer.expires_from_now (boost::posix_time::milliseconds (connect_timeout_ms ));
212
215
connect_timer.async_wait ([=](const std::error_code& ec){
213
216
if (!ec) {
214
217
self_shared->socket ->cancel ();
215
218
}
216
219
});
217
220
218
221
socket->async_connect (endpoint, [=](const std::error_code& ec) {
222
+ connecting = false ;
219
223
if (connect_timer.cancel () && !ec) {
220
224
COCAINE_LOG_INFO (log , " connected to isolation daemon" );
221
225
session = context.engine ().attach (std::move (socket), nullptr );
@@ -229,11 +233,25 @@ struct external_t::inner_t :
229
233
}
230
234
231
235
void reconnect (const std::error_code& e) {
232
- if (session) {
233
- COCAINE_LOG_INFO (log , " reconnecting to external isolation daemon" );
234
- session->detach (e);
235
- session.reset ();
236
- connect ();
236
+ if (!connecting) {
237
+ connecting = true ;
238
+ if (session) {
239
+ session->detach (e);
240
+ session.reset ();
241
+ }
242
+
243
+ auto reconnect_timeout_ms = args.as_object ().at (" reconnect_timeout_ms" , 3000u ).as_uint ();
244
+ COCAINE_LOG_INFO (log , " queueing reconnect to external isolation daemon after {} ms" , reconnect_timeout_ms);
245
+
246
+ connect_timer.expires_from_now (boost::posix_time::milliseconds (reconnect_timeout_ms));
247
+ std::weak_ptr<inner_t > weak_self (shared_from_this ());
248
+ connect_timer.async_wait ([=](const std::error_code&) {
249
+ // We don't perform error code check as nobody should cancel this timer
250
+ auto self = weak_self.lock ();
251
+ if (self) {
252
+ self->connect ();
253
+ } // else application was already stopped and isolation was destroyed
254
+ });
237
255
}
238
256
}
239
257
@@ -379,8 +397,8 @@ external_t::spool(std::shared_ptr<api::spool_handle_base_t> handler) {
379
397
} else {
380
398
COCAINE_LOG_DEBUG (inner->log , " queuing spool request" );
381
399
_inner->spool_queue .push_back (load);
382
- if (!_inner-> session && ! _inner->socket ) {
383
- inner->connect ( );
400
+ if (!inner-> connecting && _inner->prepared ) {
401
+ inner->reconnect ( std::error_code () );
384
402
}
385
403
}
386
404
});
@@ -402,10 +420,10 @@ external_t::spawn(const std::string& path,
402
420
load->apply ();
403
421
} else {
404
422
COCAINE_LOG_DEBUG (_inner->log , " queuing spawn request" );
405
- if (!_inner->session && !_inner->socket ) {
406
- inner->connect ();
407
- }
408
423
_inner->spawn_queue .push_back (load);
424
+ if (!inner->connecting && _inner->prepared ) {
425
+ inner->reconnect (std::error_code ());
426
+ }
409
427
}
410
428
});
411
429
return std::unique_ptr<api::cancellation_t >(new api::cancellation_wrapper (load));
0 commit comments