-
Notifications
You must be signed in to change notification settings - Fork 50
Conversation
8869805
to
3743521
Compare
} else { | ||
LOG.info("{} executing retry #{}", key.toKey(), state.data().tries()); | ||
// todo: send info event, could not run because resource limit reached |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will this come in this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have a separate ticket for it, so I'll leave the todos.
return emptySet(); | ||
} | ||
|
||
return Sets.newHashSet(workflowOpt.get().schedule().resources()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this preferred over new HashSet<>()
? I can see the use in pre-jdk7 but not since <>
was ok.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mainly habit :)
4b22937
to
adbe922
Compare
final Map<String, Long> currentResourceUsage = | ||
activeStates.parallelStream() | ||
.filter(entry -> !timedOutInstances.contains(entry.workflowInstance())) | ||
.filter(entry -> entry.runState().state() != RunState.State.QUEUED) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After we have filtered out the instances about to time out, shouldn't we then filter only instances whose state is RUNNING
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That was what I did in my first iteration, but it causes a race.
Example: The scheduler will dequeue some instances on tick 1. These go into submitting (not yet running). The scheduler ticks again and sees free slots in the same resources and dequeues even more instances.
By looking at instances not in queued, we avoid this condition.
.filter(entry -> shouldExecute(entry.runState())) | ||
.collect(toCollection(Lists::newArrayList)); | ||
Collections.shuffle(eligibleInstances); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make sense the Resource
related methods to be moved to something like ResourceManager
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense, but I would like to do it at a later point when the patterns have settled a bit.
activeStates.parallelStream() | ||
.map(InstanceState::workflowInstance) | ||
.map(WorkflowInstance::workflowId) | ||
.distinct() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this really needed given you put them in a map in the next step?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the collector complained about duplicate keys otherwise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, these parallel streams… so one could use .collect(toConcurrentMap())
instead
👍 |
By doing the transition and counter updates on the calling thread and then enqueing the persistence and output handler calls on the state manager queue.
ef4399a
to
36c046d
Compare
todo
depends on #23