-
-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[mqtt] Avoid parallel streams with common thread pool to avoid deadlocks #13621
Conversation
To mitigate issue https://github.com/openhab/openhab-core/issues/3125 (common thread pool exhaustion when combining parallel streams with synchronization or locks) Signed-off-by: Sami Salonen <[email protected]>
Signed-off-by: Sami Salonen <[email protected]>
return availabilityStates.values().parallelStream().map(cChannel -> cChannel.start(connection, scheduler, 0)) | ||
.collect(FutureCollector.allOf()); | ||
@NonNull | ||
Collector<@NonNull CompletableFuture<@Nullable Void>, @NonNull Set<@NonNull CompletableFuture<@Nullable Void>>, @NonNull CompletableFuture<@Nullable Void>> allOfCollector = FutureCollector | ||
.allOf(); | ||
return availabilityStates.values().stream().map(cChannel -> { | ||
final @NonNull CompletableFuture<@Nullable Void> fut; | ||
fut = cChannel == null ? CompletableFuture.completedFuture(null) : cChannel.start(connection, scheduler, 0); | ||
return fut; | ||
}).collect(allOfCollector); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cChannel.start()
i.e. ChannelState.start
calls MqttBrokerConnection.subscribe
which is internally synchronizing on all subscribers
of the connection.
Making this non-parallel stream should reduce resource contention anyways.
Other changes are to remove null typing warnings
this.topics.parallelStream().map(t -> connection.subscribe(t, this)).collect(FutureCollector.allOf()) | ||
this.topics.stream().map(t -> connection.subscribe(t, this)).collect(allOfCollector) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MqttBrokerConnection.subscribe
is internally synchronizing on all subscribers
of the connection.
Making this non-parallel stream should reduce resource contention anyways.
this.topics.parallelStream().forEach(t -> connection.unsubscribe(t, this)); | ||
this.topics.stream().forEach(t -> connection.unsubscribe(t, this)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MqttBrokerConnection.unsubscribe
is internally synchronizing on all subscribers
of the connection.
Making this non-parallel stream should reduce resource contention anyways.
this.topics.parallelStream().forEach(t -> connection.unsubscribe(t, this)); | ||
this.topics.stream().forEach(t -> connection.unsubscribe(t, this)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MqttBrokerConnection.unsubscribe
is internally synchronizing on all subscribers
of the connection.
Making this non-parallel stream should reduce resource contention anyways.
return channels.values().parallelStream().map(cChannel -> cChannel.start(connection, scheduler, timeout)) | ||
.collect(FutureCollector.allOf()); | ||
@NonNull | ||
Collector<@NonNull CompletableFuture<@Nullable Void>, @NonNull Set<@NonNull CompletableFuture<@Nullable Void>>, @NonNull CompletableFuture<@Nullable Void>> allOfCollector = FutureCollector | ||
.allOf(); | ||
return channels.values().stream().map(cChannel -> cChannel.start(connection, scheduler, timeout)) | ||
.collect(allOfCollector); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cChannel.start()
i.e. ComponentChannel.start
calls ChannelState.start
which calls MqttBrokerConnection.subscribe
which is internally synchronizing on all subscribers
of the connection.
Making this non-parallel stream should reduce resource contention anyways.
Other changes are to remove null typing warnings
return channels.values().parallelStream().map(ComponentChannel::stop).collect(FutureCollector.allOf()); | ||
@NonNull | ||
Collector<@NonNull CompletableFuture<@Nullable Void>, @NonNull Set<@NonNull CompletableFuture<@Nullable Void>>, @NonNull CompletableFuture<@Nullable Void>> allOfCollector = FutureCollector | ||
.allOf(); | ||
return channels.values().stream().map(ComponentChannel::stop).collect(allOfCollector); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ComponentChannel.stop
i.e. ComponentChannel.start
calls ChannelState.stop
which calls MqttBrokerConnection.unsubscribe
which is internally synchronizing on all subscribers
of the connection.
Making this non-parallel stream should reduce resource contention anyways.
Other changes are to remove null typing warnings
haComponents.values().parallelStream().map(e -> e.start(connection, scheduler, attributeReceiveTimeout)) | ||
haComponents.values().stream().map(e -> e.start(connection, scheduler, attributeReceiveTimeout)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Eventually the streams synchronizes in MqttBrokerConnection.subscribe
(see above comment with AbstractComponent
leading to resource contention.
Making this non-parallel should reduce resource contention.
haComponents.values().parallelStream().map(AbstractComponent::stop) // | ||
haComponents.values().stream().map(AbstractComponent::stop) // |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Eventually the streams synchronizes in MqttBrokerConnection.unsubscribe
(see above comment with AbstractComponent
leading to resource contention.
Making this non-parallel should reduce resource contention.
.../java/org/openhab/binding/mqtt/homeassistant/internal/handler/HomeAssistantThingHandler.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/openhab/binding/mqtt/homeassistant/internal/component/AbstractComponent.java
Outdated
Show resolved
Hide resolved
This looks like an important fix. |
Any chance getting this to official release? I have been running this version (as prebuilt here) for several weeks without adverse effects |
I agree that calling and getting results from asynchronous methods in parallel streams is not a best idea and could be replaced by sequential version. |
Signed-off-by: Sami Salonen <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Let's deal with the annotations another time. 🙂
…cks (openhab#13621) To mitigate issue https://github.com/openhab/openhab-core/issues/3125 (common thread pool exhaustion when combining parallel streams with synchronization or locks) Signed-off-by: Sami Salonen <[email protected]> Signed-off-by: Ben Rosenblum <[email protected]>
…cks (openhab#13621) To mitigate issue https://github.com/openhab/openhab-core/issues/3125 (common thread pool exhaustion when combining parallel streams with synchronization or locks) Signed-off-by: Sami Salonen <[email protected]> Signed-off-by: Andras Uhrin <[email protected]>
…cks (openhab#13621) To mitigate issue https://github.com/openhab/openhab-core/issues/3125 (common thread pool exhaustion when combining parallel streams with synchronization or locks) Signed-off-by: Sami Salonen <[email protected]>
…cks (openhab#13621) To mitigate issue https://github.com/openhab/openhab-core/issues/3125 (common thread pool exhaustion when combining parallel streams with synchronization or locks) Signed-off-by: Sami Salonen <[email protected]>
…cks (openhab#13621) To mitigate issue https://github.com/openhab/openhab-core/issues/3125 (common thread pool exhaustion when combining parallel streams with synchronization or locks) Signed-off-by: Sami Salonen <[email protected]>
…cks (openhab#13621) To mitigate issue https://github.com/openhab/openhab-core/issues/3125 (common thread pool exhaustion when combining parallel streams with synchronization or locks) Signed-off-by: Sami Salonen <[email protected]> Signed-off-by: Andras Uhrin <[email protected]>
To mitigate issue #13657 (common thread pool exhaustion when combining parallel streams with synchronization or locks)
All the parallel streams are anyways hitting resource contention / bottleneck since they all leads to
MqttBrokerConnection
subscribe
orunsubscribe
which is havingsynchronized
block withsubscribers
of that broker connection. [*]On the surface things looks like they are asynchronous (based on
CompletableFuture
an all) but only the lowest level operations seem to be asynchronous, and in mid-levels we have this synchronization and locking behavour currently.I would argue typical users have only single
MqttBrokerConnection
, raising the question whether the parallel streams bring any true benefit really.[*] It looks like we might be able to remove the whole
synchronized
block there, usingConcurrentHashMap.compute
. But even then the synchronization would happen, just hidden from plain sight, inside java runtime. From official javadocs: Some attempted update operations on this map by other threads may be blocked while computation is in progress, so the computation should be short and simple, and must not attempt to update any other mappings of this Map.... Not worth the trouble, the code would be much more unreadable that wayFixes #13657
Signed-off-by: Sami Salonen [email protected]