-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Better handling of event stream errors #5
Better handling of event stream errors #5
Conversation
If an individual event stream fails, the error is not properly propagated to the reconciler. The error is muted, and the reconciler continue to pull events from other stream without restarting or notifying the caller.
ff0dd70
to
6d3bb07
Compare
cancelStream
when one of the stream is failingThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGFM 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM would be perfect with a test :)
c.workerWaitGroup.Wait() | ||
c.Info("stopped workers...") | ||
c.Info("stopped controller...") | ||
close(errChan) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT:
Is there a specific motiviation for moving this into defer?
I find defer makes the code harder to read so I prefer to avoid it when possible (an err
var might just make things work)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea was to be able to return directly the error instead of storing and returning the error later (my previous version not commited).
controller.go
Outdated
@@ -72,24 +88,19 @@ func (c *controller) Run(ctx context.Context) error { | |||
err := stream.Subscribe(streamCtx, MeteredEventHandler(c.Observability.Meter, n, EventHandlerFunc(c.enqueue))) | |||
if err != nil { | |||
c.Error("Failed subscribing to stream", "error", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Maybe worth adding the streamId to the log message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Thanks for the review, I improved the logging, and added a test |
If an individual event stream fails, the error is not properly propagated to the reconciler.
The error is muted, and the reconciler continue to pull events from other stream without restarting or notifying the caller.