-
Notifications
You must be signed in to change notification settings - Fork 50
Introduce Queued State #10
Conversation
babac40
to
fdebd1c
Compare
@@ -58,6 +59,11 @@ | |||
void receive(Event event) throws IsClosed; | |||
|
|||
/** | |||
* Get a map of all active {@link WorkflowInstance} states. | |||
*/ | |||
ImmutableMap<WorkflowInstance, RunState> activeStates(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why ImmutableMap and not just Map? Would anyone think they can mutate the return value of this and somehow that would be persisted?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will replace with just Map
return isAtOrAfter(now, deadline); | ||
} | ||
|
||
private boolean isAtOrAfter(Instant now, Instant deadline) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find this function confusing, I'm not sure which value is at or after compared to the other and isn't this the same as !foo.isAfter(bar)
?
LGTM so far but I guess we need to redirect the old events ( |
@@ -202,7 +157,12 @@ public RunState triggerExecution(WorkflowInstance workflowInstance, String trigg | |||
public RunState created(WorkflowInstance workflowInstance, String executionId, String dockerImage) { | |||
switch (state()) { | |||
case PREPARE: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For clarity we should add // for backwards compatibility
comment.
|
||
default: | ||
throw illegalTransition("runError"); | ||
} | ||
} | ||
|
||
@Override | ||
public RunState enqueue(WorkflowInstance workflowInstance) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use retryAfter
instead of introducing a new enqueue
event? We would reduce the need for "supporting" deprecated events/transitions.
We could also modify retryAfter
to drop the delayMillis
, which is not needed anymore with the new Scheduler. The only drawback I see with this approach is that the name retryAfter
would be a bit confusing without an explicit delay argument in the constructor.
Also, without an explicit delay in the event (either enqueue
or retryAfter
), we would lose the possibility to return the expected delay in the CLI events
command. Perhaps we could keep retryAfter
as is for now, and remove the delayMillis
when introducing the info
event.
@@ -286,7 +290,11 @@ public RunState retryAfter(WorkflowInstance workflowInstance, long delayMillis) | |||
switch (state()) { | |||
case TERMINATED: | |||
case FAILED: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we introduce enqueue
, the two lines above should include // for backwards compatibility
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed the enqueue
and dequeue
events from this PR, so it's mainly focused on just introducing the QUEUED
state.
@@ -212,7 +216,7 @@ public StyxScheduler build() { | |||
scheduleSources, | |||
statsFactory, | |||
executorFactory, | |||
publisherFactory); | |||
publisherFactory, retryUtil); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should have its own new line :)
final Duration delay = baseDelay.multipliedBy(multiplier); | ||
|
||
final String instanceKey = state.workflowInstance().toKey(); | ||
LOG.info("{} scheduling retry #{} in {}", instanceKey, state.data().tries(), delay); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it intended to leave out this logging?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes
@@ -298,7 +306,7 @@ public RunState retry(WorkflowInstance workflowInstance) { | |||
switch (state()) { | |||
case TERMINATED: // for backwards compatibility | |||
case FAILED: // for backwards compatibility | |||
case AWAITING_RETRY: | |||
case QUEUED: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// for backwards compatibility
@bergman I've deferred redirecting |
5336c0d
to
e85708e
Compare
👍 |
This is mainly a refactor of existing behaviour into a setup that makes it easier to implement further scheduling behaviour.
The change also renames the
AWAITING_RETRY
state to a more genericQUEUED
state.