Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add nats #212

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -165,6 +165,7 @@ The session stores currently included are shown in the table below. Please click
| [mongodbstore](https://github.com/alexedwards/scs/tree/master/mongodbstore) | MongoDB based session store |
| [mssqlstore](https://github.com/alexedwards/scs/tree/master/mssqlstore) | MSSQL based session store |
| [mysqlstore](https://github.com/alexedwards/scs/tree/master/mysqlstore) | MySQL based session store |
| [natsstore](https://github.com/alexedwards/scs/tree/master/natsstore) | Nats Jetsream KV based session store |
| [pgxstore](https://github.com/alexedwards/scs/tree/master/pgxstore) | PostgreSQL based session store (using the [pgx](https://github.com/jackc/pgx) driver) |
| [postgresstore](https://github.com/alexedwards/scs/tree/master/postgresstore) | PostgreSQL based session store (using the [pq](https://github.com/lib/pq) driver) |
| [redisstore](https://github.com/alexedwards/scs/tree/master/redisstore) | Redis based session store |
38 changes: 38 additions & 0 deletions natsstore/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# boltstore

A [NATS JetStream KVStore](https://docs.nats.io/nats-concepts/jetstream/key-value-store) based session store for [SCS](https://github.com/alexedwards/scs).

## Setup

You should follow the instructions to [open a NATS KV store](https://natsbyexample.com/examples/kv/intro/go), and pass the database to `natsstore.New()` to establish the session store.

## Expiring contexts

You should probably be using the `CtxStore` methods for best context control. If, however, you decide to use the `Store` methods, you **must** set a global context timeout value.

```go
// set the global context timeout to 100ms
natsstore.New(db, WithTimeout(time.Millisecond * 100))
```

## Expired Session Cleanup

This package provides a background 'cleanup' goroutine to delete expired session data. This stops the database table from holding on to invalid sessions indefinitely and growing unnecessarily large. By default the cleanup runs every 1 minute. You can change this by using the `WithCleanupInterval` function to initialize your session store. For example:

```go
// Run a cleanup every 5 minutes.
natsstore.New(db, WithCleanupInterval(5*time.Minute))

// Disable the cleanup goroutine by setting the cleanup interval to zero.
natsstore.New(db, WithCleanupInterval(0))
```

### Terminating the Cleanup Goroutine

It's rare that the cleanup goroutine needs to be terminated --- it is generally intended to be long-lived and run for the lifetime of your application.

However, there may be occasions when your use of a session store instance is transient. A common example would be using it in a short-lived test function. In this scenario, the cleanup goroutine (which will run forever) will prevent the session store instance from being garbage collected even after the test function has finished. You can prevent this by either disabling the cleanup goroutine altogether (as described above) or by stopping it using the `StopCleanup()` method.

## Notes

Currently Nats doesn't allow per-key expiry. In order to support per-key expiry, we take a rather hacky approach to including the expiry in the stored data that is checked on retrieval. Per-key expiry is in the works for release 2.11. Once this is available in the go client we will simplify the code and release as a /v2.
45 changes: 45 additions & 0 deletions natsstore/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package natsstore

import (
"context"
"fmt"
"time"

"github.com/alexedwards/scs/v2"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)

func ExampleRun() {
nc, _ := nats.Connect(natsURL)
defer nc.Drain()

js, _ := jetstream.New(nc)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

db, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "scs_example",
Storage: jetstream.MemoryStorage,
})

store := New(db, WithTimeout(time.Second), WithCleanupInterval(30*time.Minute))

sessionManager := scs.New()
sessionManager.Store = store

// see the store in action

putCtx, _ := sessionManager.Load(context.Background(), "")

sessionManager.Put(putCtx, "foo", "bar")
token, _, _ := sessionManager.Commit(putCtx)

getCtx, _ := sessionManager.Load(context.Background(), token)

foo := sessionManager.GetString(getCtx, "foo")

fmt.Println(foo)
// Output: bar
}
20 changes: 20 additions & 0 deletions natsstore/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
module github.com/alexedwards/scs/natsstore

go 1.22.2

require (
github.com/nats-io/nats-server/v2 v2.10.14
github.com/nats-io/nats.go v1.34.1
)

require (
github.com/klauspost/compress v1.17.7 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.5 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
)
25 changes: 25 additions & 0 deletions natsstore/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
github.com/alexedwards/scs/v2 v2.8.0 h1:h31yUYoycPuL0zt14c0gd+oqxfRwIj6SOjHdKRZxhEw=
github.com/alexedwards/scs/v2 v2.8.0/go.mod h1:ToaROZxyKukJKT/xLcVQAChi5k6+Pn1Gvmdl7h3RRj8=
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg=
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.5 h1:ROfXb50elFq5c9+1ztaUbdlrArNFl2+fQWP6B8HGEq4=
github.com/nats-io/jwt/v2 v2.5.5/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats-server/v2 v2.10.14 h1:98gPJFOAO2vLdM0gogh8GAiHghwErrSLhugIqzRC+tk=
github.com/nats-io/nats-server/v2 v2.10.14/go.mod h1:a0TwOVBJZz6Hwv7JH2E4ONdpyFk9do0C18TEwxnHdRk=
github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4=
github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
195 changes: 195 additions & 0 deletions natsstore/natsstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package natsstore

import (
"context"
"log"
"time"

"github.com/nats-io/nats.go/encoders/builtin"
"github.com/nats-io/nats.go/jetstream"
)

var encoder = &builtin.GobEncoder{}

type expirableValue struct {
Value []byte
Expires time.Time
}

type NatsStore struct {
db jetstream.KeyValue

timeout time.Duration
cleanup time.Duration
stopCleanup chan bool
}

func (ns *NatsStore) get(ctx context.Context, key string, now time.Time) ([]byte, bool, error) {
val, err := ns.db.Get(ctx, key)
if err == jetstream.ErrKeyNotFound {
return nil, false, nil
}
if err != nil {
return nil, false, err
}

var decoded expirableValue
err = encoder.Decode(key, val.Value(), &decoded)
if err != nil {
return nil, false, err
}

if decoded.Expires.Before(now) {
err = ns.delete(ctx, key)
return nil, false, err
}

return decoded.Value, true, nil
}

func (ns *NatsStore) put(ctx context.Context, key string, val []byte, expiry time.Time) error {
toEncode := expirableValue{
Value: val,
Expires: expiry,
}
encoded, err := encoder.Encode(key, toEncode)
if err != nil {
return err
}
_, err = ns.db.Put(ctx, key, encoded)
return err
}

func (ns *NatsStore) delete(ctx context.Context, key string) error {
return ns.db.Purge(ctx, key)
}

// AllCtx implements scs.IterableCtxStore.
func (ns *NatsStore) AllCtx(ctx context.Context) (map[string][]byte, error) {
// cleanup := false
now := time.Now()

keys, err := ns.db.ListKeys(ctx, jetstream.IgnoreDeletes())
defer keys.Stop()
if err != nil {
return nil, err
}

out := make(map[string][]byte)
for key := range keys.Keys() {
val, available, err := ns.get(ctx, key, now)
if !available || err != nil {
// cleanup = true
continue
}
out[key] = val
}
// ns.db.PurgeDeletes(ctx)
return out, nil
}

func (ns *NatsStore) StartCleanup() {
ns.stopCleanup = make(chan bool)
ticker := time.NewTicker(ns.cleanup)
for {
select {
case <-ticker.C:
ctx, cancel := context.WithTimeout(context.Background(), ns.cleanup.Truncate(time.Second))

err := ns.db.PurgeDeletes(ctx)
if err != nil {
log.Println(err)
}
cancel()
case <-ns.stopCleanup:
ns.stopCleanup = nil
ticker.Stop()
return
}
}
}

// StopCleanup terminates the background cleanup goroutine for the NetsKVStore
// instance. It's rare to terminate this; generally NetsKVStore instances and
// their cleanup goroutines are intended to be long-lived and run for the lifetime
// of your application.
//
// There may be occasions though when your use of the NetsKVStore is transient.
// An example is creating a new NetsKVStore instance in a test function. In this
// scenario, the cleanup goroutine (which will run forever) will prevent the
// NetsKVStore object from being garbage collected even after the test function
// has finished. You can prevent this by manually calling StopCleanup.
func (bs *NatsStore) StopCleanup() {
if bs.stopCleanup != nil {
bs.stopCleanup <- true
}
}

// All implements scs.IterableStore.
func (ns *NatsStore) All() (map[string][]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), ns.timeout)
defer cancel()
return ns.AllCtx(ctx)
}

// FindCtx implements scs.CtxStore.
func (ns *NatsStore) FindCtx(ctx context.Context, token string) ([]byte, bool, error) {
return ns.get(ctx, token, time.Now())
}

// Find implements scs.Store.
func (ns *NatsStore) Find(token string) ([]byte, bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), ns.timeout)
defer cancel()
return ns.FindCtx(ctx, token)
}

// CommitCtx implements scs.CtxStore.
func (ns *NatsStore) CommitCtx(ctx context.Context, token string, b []byte, expiry time.Time) error {
return ns.put(ctx, token, b, expiry)
}

// Commit implements scs.Store.
func (ns *NatsStore) Commit(token string, b []byte, expiry time.Time) error {
ctx, cancel := context.WithTimeout(context.Background(), ns.timeout)
defer cancel()
return ns.CommitCtx(ctx, token, b, expiry)
}

// DeleteCtx implements scs.CtxStore.
func (ns *NatsStore) DeleteCtx(ctx context.Context, token string) error {
return ns.delete(ctx, token)
}

func (ns *NatsStore) Delete(token string) error {
ctx, cancel := context.WithTimeout(context.Background(), ns.timeout)
defer cancel()
return ns.DeleteCtx(ctx, token)
}

// New creates a NatsStore instance. db should be a pointer to a jetstreamKeyValue store.
func New(db jetstream.KeyValue, opts ...Opt) *NatsStore {
ns := &NatsStore{db: db, cleanup: time.Minute}
for _, opt := range opts {
opt(ns)
}
if ns.cleanup > 0 {
go ns.StartCleanup()
}
return ns
}

type Opt func(ns *NatsStore)

func WithTimeout(t time.Duration) Opt {
return func(ns *NatsStore) {
ns.timeout = t
}
}

// CleanupFrequency sets how frequently stale session data gets cleaned up. It's 1 min by default
func WithCleanupInterval(t time.Duration) Opt {
return func(ns *NatsStore) {
ns.cleanup = t
}
}
253 changes: 253 additions & 0 deletions natsstore/natsstore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
package natsstore

import (
"bytes"
"context"
"crypto/rand"
"encoding/base64"
"os"
"testing"
"time"

"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)

var db jetstream.KeyValue

var natsURL string

func localNats() *server.Server {
ns, err := server.NewServer(&server.Options{JetStream: true})
if err != nil {
panic(err)
}

go ns.Start()
time.Sleep(time.Second)
return ns
}

func TestMain(m *testing.M) {
ns := localNats()
defer ns.Shutdown()

natsURL = ns.ClientURL()

nc, err := nats.Connect(natsURL)
if err != nil {
panic(err)
}
defer nc.Drain()

js, err := jetstream.New(nc)
if err != nil {
panic(err)
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

db, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "scs_tests",
Storage: jetstream.MemoryStorage,
})
if err != nil {
panic(err)
}

results := m.Run()

os.Exit(results)
}

type testsData struct {
key string
value []byte
expiry time.Time
}

func generateData(count int, expiry time.Duration) []testsData {
val := func() []byte {
out := make([]byte, 8)
rand.Read(out)
return out
}
// mimics what scs uses to generate tokens
key := func() string {
return base64.RawURLEncoding.EncodeToString(val())
}
out := make([]testsData, count)
for i := 0; i < count; i++ {
out[i] = testsData{key(), val(), time.Now().Add(expiry)}
}
return out
}

func TestCrud(t *testing.T) {
h := New(db, WithTimeout(time.Minute))

src := generateData(1, time.Hour)[0]

t.Run("Commit", func(t *testing.T) {
err := h.Commit(src.key, src.value, src.expiry)
if err != nil {
t.Error(err)
}
})

t.Run("Find", func(t *testing.T) {
val, found, err := h.Find(src.key)
if err != nil {
t.Error(err)
}

if found != true {
t.Error("record not found")
}

if !bytes.Equal(val, src.value) {
t.Error("values don't match")
}
})

t.Run("Delete", func(t *testing.T) {
err := h.Delete(src.key)
if err != nil {
t.Error(err)
}

_, found, err := h.Find(src.key)
if err != nil {
t.Error(err)
}

if found != false {
t.Error("record not deleted")
}
})
}

func TestCrudCtx(t *testing.T) {
h := New(db)

src := generateData(1, time.Hour)[0]

t.Run("Commit", func(t *testing.T) {
err := h.CommitCtx(context.TODO(), src.key, src.value, src.expiry)
if err != nil {
t.Error(err)
}
})

t.Run("Find", func(t *testing.T) {
val, found, err := h.FindCtx(context.TODO(), src.key)
if err != nil {
t.Error(err)
}

if found != true {
t.Error("record not found")
}

if !bytes.Equal(val, src.value) {
t.Error("values don't match")
}
})

t.Run("Delete", func(t *testing.T) {
ctx := context.TODO()
err := h.DeleteCtx(ctx, src.key)
if err != nil {
t.Error(err)
}

_, found, err := h.FindCtx(ctx, src.key)
if err != nil {
t.Error(err)
}

if found != false {
t.Error("record not deleted")
}
})
}

func TestAll(t *testing.T) {
h := New(db, WithTimeout(time.Second))
src := generateData(2, time.Hour)

for _, row := range src {
h.Commit(row.key, row.value, row.expiry)
}

t.Run("basic", func(t *testing.T) {
all, err := h.All()
if err != nil {
t.Error(err)
}

if len(all) != len(src) {
t.Error("count of All is incorrect")
}
})
t.Run("with deletes", func(t *testing.T) {
h.Delete(src[0].key)

all, err := h.All()
if err != nil {
t.Error(err)
}

if len(all) != len(src)-1 {
t.Error("count of All is incorrect")
}
})
t.Run("with expiry", func(t *testing.T) {
before, _ := h.All()

shorties := generateData(2, time.Second*2)
for _, row := range shorties {
h.Commit(row.key, row.value, row.expiry)
}

after, _ := h.All()
if len(after) != len(before)+len(shorties) {
t.Error("unexpected lengths from All")
}

time.Sleep(time.Second * 3)

after, _ = h.All()
if len(after) != len(before) {
t.Error("unexpected lengths from All after expiry")
}
})

// cleanup
all, _ := h.All()
for key := range all {
h.Delete(key)
}
}

func TestCleanup(t *testing.T) {
h := New(db, WithTimeout(time.Second), WithCleanupInterval(time.Second))
src := generateData(2, time.Hour)

// uneccessary
defer h.StopCleanup()

for _, row := range src {
h.Commit(row.key, row.value, row.expiry)
}

time.Sleep(2 * time.Second)

for _, row := range src {
h.Delete(row.key)
}

time.Sleep(2 * time.Second)
}