-
Notifications
You must be signed in to change notification settings - Fork 719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ordered Consumers #789
Ordered Consumers #789
Conversation
Signed-off-by: R.I.Pienaar <[email protected]>
Signed-off-by: R.I.Pienaar <[email protected]>
This introduces ordered consumers. They are a convenience over ephemeral, no ack, no redelivery, only deliver things in strict order setups. We have the swap out when we detect gaps and change out the underlying sub and JetStream consumer, and we process the heartbeats as well, detecting gaps at the end or if a stream or consumer is pulled out from underneath of us. Signed-off-by: Derek Collison <[email protected]>
Signed-off-by: Derek Collison <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am afraid that we are introducing lots of lock inversion with work on JetStream. See also #775 that I could address later with some code cleanup.
Signed-off-by: Derek Collison <[email protected]>
Signed-off-by: Derek Collison <[email protected]>
Signed-off-by: Derek Collison <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still some lock inversion and a comment on how the sub could be recreated when getting "already exists"
@@ -1180,37 +1292,44 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync | |||
return nil, ErrSubjectMismatch | |||
} | |||
|
|||
// Update attached status. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if you add a LABEL before processing of switch{}
where we have case info != nil
, then line 1288 (after calling js.ConsumerInfo() when return is OK, we could here simply make sure that the subscription is destroyed,
set deliver
to info.Config.DeliverSubject (although likely set in the processing of the info in switch statement), set shouldCreate
to false, so we don't attempt again to add the consumer, then a simple goto . That should take care of validation and recreating the sub/jsi from scratch.
note1: recreate also the ch
for sync subscriptions because it has been assigned to another subscription, so you can't reused. This is an issue I found in the past.
note2: you would also have to move the declaration of ccreq *createConsumerRequest
and some other at the top of the function).
That's how I do in the C client and find it a bit more clean. That being said, this is with my pull subscribe implementation that uses a single sync subscribe, so basically I have only a single js.nc.subscribe() call, it's just that the deliver subject is an inbox in case of pull subscribe (and a sync flag).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code is working, so rather not redo completely again.
Signed-off-by: Derek Collison <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
This introduces ordered consumers. They are a convenience over ephemeral, no ack, no redelivery, only deliver things in strict order setups. We have the swap out when we detect gaps and change out the underlying sub and JetStream consumer, and we process the heartbeats as well, detecting gaps at the end or if a stream or consumer is pulled out from underneath of us.
Signed-off-by: Derek Collison [email protected]