-
Notifications
You must be signed in to change notification settings - Fork 50
Conversation
abstract static class InstanceState { | ||
abstract WorkflowInstance workflowInstance(); | ||
abstract RunState runState(); | ||
public abstract static class InstanceState { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefer keeping this private.
In KubernetesDockerRunner
, you can get the WorkflowInstance
from the RunState
, no need to spread this class across different component.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah true, I commented on the PR itself it was unnecessary job
activeWorkflowInstances.parallelStream() | ||
.forEach(workflowInstance -> stateManager.receiveIgnoreClosed( | ||
Event.runError(workflowInstance, "No pod associated with this instance"))); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is not that time sensitive and I think the collections that are worked on are pretty small that the paralellStream
use is unnecessary. It might actually be more overhead to dispatch on the fork-join-pool rather than just iterating sequentially.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point
|
||
kdr.pollPods(); | ||
|
||
verify(stateManager, never()).receiveIgnoreClosed( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like we don't depend on something like SyncStateManager
👍
012bfe7
to
bf4d1dc
Compare
@@ -190,9 +194,32 @@ public void init() { | |||
.watch(new PodWatcher(Integer.parseInt(resourceVersion))); | |||
} | |||
|
|||
private void pollPods() { | |||
void examineRunningWFISandAssociatedPods(PodList podList) { | |||
final Set<WorkflowInstance> activeWorkflowInstances = stateManager.activeStates().entrySet() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since RunState includes the WorkflowInstance, we can just use .values()
followed by something like:
.filter(runState -> !runState.equals(RUNNING))
.map(RunState::workflowInstance)
@@ -71,6 +75,13 @@ public void receive(Event event) { | |||
} | |||
|
|||
@Override | |||
public Set<Scheduler.InstanceState> instanceStates() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not needed anymore.
c134c3d
to
10e0423
Compare
@ranic can we add a test that involves an active state that is not in RUNNING state? |
10e0423
to
1c6207e
Compare
|
||
kdr.pollPods(); | ||
|
||
verify(stateManager, times(1)).receiveIgnoreClosed( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also add a verify
for WORKFLOW_INSTANCE
to check that that was never called?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wouldn't like to reverify something that there is already a unit test for - shouldNotSendRunErrorWhenPodForRunningWFIExists
. I don't see value added in improving test coverage.
👍 |
I might remove the isntanceStates() method on StateManager. I liked using instanceState directly, but we can derive it from the activeStates().
@rouz @jocke @fdema