Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump observability stack #6

Merged
merged 2 commits into from
May 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Unfortunately as its name points out it is specific to Kubernetes.
This library brings a simple way to build `reconcilers` which is the core of an operator.
It runs a loop that for each event coming in will trigger the control-loop.

Its core gaols are:
Its core goals are:

1. Remain simple, caching, resync are not meant to be builtins because they are hard to be generic.
2. Observability is important so it's instrumented with [opentelemetry](https://opentelemetry.io/).
Expand Down
59 changes: 36 additions & 23 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package kreconciler

import (
"context"
"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/unit"
"hash/fnv"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/unit"
)

// Config is used to configure a controller.
Expand Down Expand Up @@ -125,21 +127,27 @@ func (f EventHandlerFunc) Call(ctx context.Context, jobId string) error {
}

// MeteredEventHandler adds metrics to any event reconciler
func MeteredEventHandler(meter metric.Meter, name string, child EventHandler) EventHandler {
counter := metric.Must(meter).NewInt64Counter("kreconciler_stream_event_count")
errors := counter.Bind(label.Bool("error", true), label.String("stream", name))
ok := counter.Bind(label.Bool("error", false), label.String("stream", name))
func MeteredEventHandler(meter metric.Meter, name string, child EventHandler) (EventHandler, error) {
counter, err := meter.SyncInt64().Counter("kreconciler_stream_event_count")
if err != nil {
return nil, err
}

return EventHandlerFunc(func(ctx context.Context, jobId string) (err error) {
defer func() {
attributes := []attribute.KeyValue{attribute.String("stream", name)}

if err != nil {
errors.Add(ctx, 1)
attributes = append(attributes, attribute.Bool("error", true))
} else {
ok.Add(ctx, 1)
attributes = append(attributes, attribute.Bool("error", false))
}

counter.Add(ctx, 1, attributes...)
}()
err = child.Call(ctx, jobId)
return
})
}), nil
}

// EventStream calls `reconciler` whenever a new event is triggered.
Expand All @@ -166,18 +174,23 @@ var NoopStream = EventStreamFunc(func(ctx context.Context, handler EventHandler)
// ResyncLoopEventStream is an EventStream that calls `listFn` every `duration` interval.
// This is used for rerunning the control-loop for all entities periodically.
// Having one of these is recommended for any controller.
func ResyncLoopEventStream(obs Observability, duration time.Duration, listFn func(ctx context.Context) ([]string, error)) EventStream {
m := metric.Must(obs.Meter)
count := m.NewInt64Counter("kreconciler_stream_resync_item_count",
metric.WithUnit(unit.Dimensionless),
metric.WithDescription("Increased by the number of items returned by the listFn"),
func ResyncLoopEventStream(obs Observability, duration time.Duration, listFn func(ctx context.Context) ([]string, error)) (EventStream, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: we could panic on err instead, so that we don't have to change this prototype.

count, err := obs.Meter.SyncInt64().Counter("kreconciler_stream_resync_item_count",
instrument.WithUnit(unit.Dimensionless),
instrument.WithDescription("Increased by the number of items returned by the listFn"),
)
recorder := m.NewInt64ValueRecorder("kreconciler_stream_resync_millis",
metric.WithUnit(unit.Milliseconds),
metric.WithDescription("time spent calling the listFn"),
if err != nil {
return nil, err
}

recorder, err := obs.Meter.SyncInt64().Histogram("kreconciler_stream_resync_millis",
instrument.WithUnit(unit.Milliseconds),
instrument.WithDescription("time spent calling the listFn"),
)
errorRecorder := recorder.Bind(label.String("status", "error"))
successRecorder := recorder.Bind(label.String("status", "success"))
if err != nil {
return nil, err
}

return EventStreamFunc(func(ctx context.Context, handler EventHandler) error {
ticker := time.NewTicker(duration)
for {
Expand All @@ -186,14 +199,14 @@ func ResyncLoopEventStream(obs Observability, duration time.Duration, listFn fun
// Queue the objects to be handled.
elts, err := listFn(ctx)
if err != nil {
errorRecorder.Record(ctx, time.Since(start).Milliseconds())
recorder.Record(ctx, time.Since(start).Milliseconds(), attribute.String("status", "error"))
obs.Error("Failed resync loop call", "error", err)
time.Sleep(time.Millisecond * 250)
continue
}
obs.Info("Adding events", "count", len(elts))
count.Add(ctx, int64(len(elts)))
successRecorder.Record(ctx, time.Since(start).Milliseconds())
recorder.Record(ctx, time.Since(start).Milliseconds(), attribute.String("status", "success"))
for _, id := range elts {
// Listed objects enqueue as present.
err = handler.Call(ctx, id)
Expand All @@ -209,5 +222,5 @@ func ResyncLoopEventStream(obs Observability, duration time.Duration, listFn fun
case <-ticker.C:
}
}
})
}), nil
}
18 changes: 16 additions & 2 deletions controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,20 @@ func (c *controller) Run(ctx context.Context) error {
// Run workers.
workersCtx, cancelWorkers := context.WithCancel(ctx)
for i := 0; i < c.cfg.WorkerCount; i++ {
worker := newWorker(c.Observability, i, c.cfg.WorkerQueueSize, c.cfg.MaxItemRetries, c.cfg.DelayQueueSize, c.cfg.DelayResolution, c.cfg.MaxReconcileTime, c.reconciler)
worker, err := newWorker(c.Observability, i, c.cfg.WorkerQueueSize, c.cfg.MaxItemRetries, c.cfg.DelayQueueSize, c.cfg.DelayResolution, c.cfg.MaxReconcileTime, c.reconciler)
if err != nil {
cancelWorkers()
return err
}

c.workers = append(c.workers, worker)
go func() {
c.workerWaitGroup.Add(1)
defer c.workerWaitGroup.Done()
worker.Run(workersCtx)
}()
}

errChan := make(chan error, 1)
streamCtx, cancelStream := context.WithCancel(ctx)
// Run streams subscribers
Expand Down Expand Up @@ -85,7 +91,15 @@ func (c *controller) Run(ctx context.Context) error {
go func() {
c.streamWaitGroup.Add(1)
defer c.streamWaitGroup.Done()
err := stream.Subscribe(streamCtx, MeteredEventHandler(c.Observability.Meter, n, EventHandlerFunc(c.enqueue)))

eventHandler, err := MeteredEventHandler(c.Observability.Meter, n, EventHandlerFunc(c.enqueue))
if err != nil {
c.Error("Failed creating MeteredEventHandlersubscribing to stream", "error", err, "stream", n)
errChan <- err
return
}

err = stream.Subscribe(streamCtx, eventHandler)
if err != nil {
c.Error("Failed subscribing to stream", "error", err, "stream", n)
errChan <- err
Expand Down
9 changes: 6 additions & 3 deletions controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"context"
"errors"
"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type countingHandler struct {
Expand Down Expand Up @@ -168,9 +169,11 @@ func TestReconcilerWithLock(t *testing.T) {

func TestResyncLoopEventStream(t *testing.T) {
obs := obsForTest(t)
stream := ResyncLoopEventStream(obs.Observability(), time.Millisecond*50, func(ctx context.Context) ([]string, error) {
stream, err := ResyncLoopEventStream(obs.Observability(), time.Millisecond*50, func(ctx context.Context) ([]string, error) {
return []string{"a", "b", "c"}, nil
})
assert.NoError(t, err)

idChannel := make(chan string, 10)
ctx, cancel := context.WithCancel(context.Background())

Expand Down
17 changes: 12 additions & 5 deletions examples/dummy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package main
import (
"context"
"fmt"
"github.com/koyeb/kreconciler"
"math/rand"
"strings"
"time"

"github.com/koyeb/kreconciler"
)

func main() {
Expand All @@ -22,13 +23,19 @@ func main() {
// This is just a set of fixed items to showcase what can be done
allItems := []string{"a", "b", "c", "d"}

err := kreconciler.New(cfg, kreconciler.ReconcilerFunc(func(ctx context.Context, id string) kreconciler.Result {
resync, err := kreconciler.ResyncLoopEventStream(cfg.Observability, time.Second*5, func(ctx context.Context) ([]string, error) {
return allItems, nil
})
if err != nil {
fmt.Printf("Could not initialize resync loop err='%v'", err)
return
}

err = kreconciler.New(cfg, kreconciler.ReconcilerFunc(func(ctx context.Context, id string) kreconciler.Result {
cfg.Observability.Info("Got reconcile call", "id", id)
return kreconciler.Result{}
}), map[string]kreconciler.EventStream{
"resync": kreconciler.ResyncLoopEventStream(cfg.Observability, time.Second*5, func(ctx context.Context) ([]string, error) {
return allItems, nil
}),
"resync": resync,
"random5s": tickEventStream(5*time.Second, allItems),
"random1s": tickEventStream(time.Second, allItems),
}).Run(ctx)
Expand Down
22 changes: 17 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
module github.com/koyeb/kreconciler

go 1.16
go 1.18

require (
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/otel v0.16.0
go.opentelemetry.io/otel/exporters/stdout v0.16.0
go.opentelemetry.io/otel/sdk v0.16.0
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/otel v1.7.0
go.opentelemetry.io/otel/metric v0.30.0
go.opentelemetry.io/otel/sdk v1.7.0
go.opentelemetry.io/otel/sdk/metric v0.30.0
go.opentelemetry.io/otel/trace v1.7.0
)

require (
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.1.0 // indirect
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
37 changes: 24 additions & 13 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,22 +1,33 @@
github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.opentelemetry.io/otel v0.16.0 h1:uIWEbdeb4vpKPGITLsRVUS44L5oDbDUCZxn8lkxhmgw=
go.opentelemetry.io/otel v0.16.0/go.mod h1:e4GKElweB8W2gWUqbghw0B8t5MCTccc9212eNHnOHwA=
go.opentelemetry.io/otel/exporters/stdout v0.16.0 h1:lQG6ZZYLh3NxnmrHltRmqZolT/jPJ8Qfl74lWT8g69Y=
go.opentelemetry.io/otel/exporters/stdout v0.16.0/go.mod h1:bq7m22M7WIxz30KnxH9lI4RLKPajk0lnLsd5P2MsSv8=
go.opentelemetry.io/otel/sdk v0.16.0 h1:5o+fkNsOfH5Mix1bHUApNBqeDcAYczHDa7Ix+R73K2U=
go.opentelemetry.io/otel/sdk v0.16.0/go.mod h1:Jb0B4wrxerxtBeapvstmAZvJGQmvah4dHgKSngDpiCo=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.opentelemetry.io/otel v1.7.0 h1:Z2lA3Tdch0iDcrhJXDIlC94XE+bxok1F9B+4Lz/lGsM=
go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk=
go.opentelemetry.io/otel/metric v0.30.0 h1:Hs8eQZ8aQgs0U49diZoaS6Uaxw3+bBE3lcMUKBFIk3c=
go.opentelemetry.io/otel/metric v0.30.0/go.mod h1:/ShZ7+TS4dHzDFmfi1kSXMhMVubNoP0oIaBp70J6UXU=
go.opentelemetry.io/otel/sdk v1.7.0 h1:4OmStpcKVOfvDOgCt7UriAPtKolwIhxpnSNI/yK+1B0=
go.opentelemetry.io/otel/sdk v1.7.0/go.mod h1:uTEOTwaqIVuTGiJN7ii13Ibp75wJmYUDe374q6cZwUU=
go.opentelemetry.io/otel/sdk/metric v0.30.0 h1:XTqQ4y3erR2Oj8xSAOL5ovO5011ch2ELg51z4fVkpME=
go.opentelemetry.io/otel/sdk/metric v0.30.0/go.mod h1:8AKFRi5HyvTR0RRty3paN1aMC9HMT+NzcEhw/BLkLX8=
go.opentelemetry.io/otel/trace v1.7.0 h1:O37Iogk1lEkMRXewVtZ1BBTVn5JEp8GrJvP92bJqC6o=
go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48yyE4TNvoHqU=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 h1:id054HUawV2/6IGm2IV8KZQjqtwAOo2CYlOToYqa0d0=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
6 changes: 4 additions & 2 deletions observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package kreconciler

import (
"context"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/nonrecording"
"go.opentelemetry.io/otel/trace"
)

Expand All @@ -16,13 +18,13 @@ type Observability struct {

// DefaultObservability uses NoopLogger, a no-op (non-recording) meter provider and otel.GetTracerProvider
func DefaultObservability() Observability {
return NewObservability(NoopLogger{}, otel.GetMeterProvider(), otel.GetTracerProvider())
return NewObservability(NoopLogger{}, nonrecording.NewNoopMeterProvider(), otel.GetTracerProvider())
}

// LoggerWithCtx adds the tracing context to the logger
func (o Observability) LoggerWithCtx(ctx context.Context) Logger {
spanCtx := trace.SpanContextFromContext(ctx)
return o.Logger.With("spanId", spanCtx.SpanID.String(), "traceId", spanCtx.TraceID.String())
return o.Logger.With("spanId", spanCtx.SpanID().String(), "traceId", spanCtx.TraceID().String())
}

// NewObservability creates a new observability wrapper (usually easier to use DefaultObservability)
Expand Down
42 changes: 13 additions & 29 deletions observability_test.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,29 @@
package kreconciler

import (
"bytes"
"context"
"fmt"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/exporters/stdout"
"go.opentelemetry.io/otel/oteltest"
"go.opentelemetry.io/otel/sdk/metric/controller/basic"
otelcontroller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
"testing"
"time"

"go.opentelemetry.io/otel/sdk/metric/metrictest"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
)

type obsTest struct {
log Logger
sr *oteltest.StandardSpanRecorder
contr *otelcontroller.Controller
log Logger
sr *tracetest.SpanRecorder
}

func (o obsTest) SpanRecorder() *oteltest.StandardSpanRecorder {
func (o obsTest) SpanRecorder() *tracetest.SpanRecorder {
return o.sr
}

func (o obsTest) Observability() Observability {
meterProvider, _ := metrictest.NewTestMeterProvider()
return Observability{
Logger: o.log,
Meter: o.contr.MeterProvider().Meter("test"),
Tracer: oteltest.NewTracerProvider(oteltest.WithSpanRecorder(o.sr)).Tracer("test"),
Meter: meterProvider.Meter("test"),
Tracer: trace.NewTracerProvider(trace.WithSpanProcessor(o.sr)).Tracer("test"),
}
}

Expand Down Expand Up @@ -65,21 +61,9 @@ func (l testLog) Warn(msg string, kv ...interface{}) {
}

func obsForTest(t *testing.T) obsTest {
sr := new(oteltest.StandardSpanRecorder)
buf1 := bytes.Buffer{}
contr, err := stdout.InstallNewPipeline([]stdout.Option{
stdout.WithPrettyPrint(),
stdout.WithWriter(&buf1),
}, []basic.Option{
basic.WithCollectPeriod(time.Second * 5),
})
require.NoError(t, err)
t.Cleanup(func() {
contr.Stop(context.Background())
})
sr := new(tracetest.SpanRecorder)
return obsTest{
log: testLog{t: t},
sr: sr,
contr: contr,
log: testLog{t: t},
sr: sr,
}
}
Loading