Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sdevent adapter #1144

Merged
merged 1 commit into from
Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions adapters/libsdevent.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
#ifndef HIREDIS_LIBSDEVENT_H
#define HIREDIS_LIBSDEVENT_H
#include <systemd/sd-event.h>
#include "../hiredis.h"
#include "../async.h"

#define REDIS_LIBSDEVENT_DELETED 0x01
#define REDIS_LIBSDEVENT_ENTERED 0x02

typedef struct redisLibsdeventEvents {
redisAsyncContext *context;
struct sd_event *event;
struct sd_event_source *fdSource;
struct sd_event_source *timerSource;
int fd;
short flags;
short state;
} redisLibsdeventEvents;

static void redisLibsdeventDestroy(redisLibsdeventEvents *e) {
if (e->fdSource) {
e->fdSource = sd_event_source_disable_unref(e->fdSource);
}
if (e->timerSource) {
e->timerSource = sd_event_source_disable_unref(e->timerSource);
}
sd_event_unref(e->event);
hi_free(e);
}

static int redisLibsdeventTimeoutHandler(sd_event_source *s, uint64_t usec, void *userdata) {
((void)s);
((void)usec);
redisLibsdeventEvents *e = (redisLibsdeventEvents*)userdata;
redisAsyncHandleTimeout(e->context);
return 0;
}

static int redisLibsdeventHandler(sd_event_source *s, int fd, uint32_t event, void *userdata) {
((void)s);
((void)fd);
redisLibsdeventEvents *e = (redisLibsdeventEvents*)userdata;
e->state |= REDIS_LIBSDEVENT_ENTERED;

#define CHECK_DELETED() if (e->state & REDIS_LIBSDEVENT_DELETED) {\
redisLibsdeventDestroy(e);\
return 0; \
}

if ((event & EPOLLIN) && e->context && (e->state & REDIS_LIBSDEVENT_DELETED) == 0) {
redisAsyncHandleRead(e->context);
CHECK_DELETED();
}

if ((event & EPOLLOUT) && e->context && (e->state & REDIS_LIBSDEVENT_DELETED) == 0) {
redisAsyncHandleWrite(e->context);
CHECK_DELETED();
}

e->state &= ~REDIS_LIBSDEVENT_ENTERED;
#undef CHECK_DELETED

return 0;
}

static void redisLibsdeventAddRead(void *userdata) {
redisLibsdeventEvents *e = (redisLibsdeventEvents*)userdata;

if (e->flags & EPOLLIN) {
return;
}

e->flags |= EPOLLIN;

if (e->flags & EPOLLOUT) {
sd_event_source_set_io_events(e->fdSource, e->flags);
} else {
sd_event_add_io(e->event, &e->fdSource, e->fd, e->flags, redisLibsdeventHandler, e);
}
}

static void redisLibsdeventDelRead(void *userdata) {
redisLibsdeventEvents *e = (redisLibsdeventEvents*)userdata;

e->flags &= ~EPOLLIN;

if (e->flags) {
sd_event_source_set_io_events(e->fdSource, e->flags);
} else {
e->fdSource = sd_event_source_disable_unref(e->fdSource);
}
}

static void redisLibsdeventAddWrite(void *userdata) {
redisLibsdeventEvents *e = (redisLibsdeventEvents*)userdata;

if (e->flags & EPOLLOUT) {
return;
}

e->flags |= EPOLLOUT;

if (e->flags & EPOLLIN) {
sd_event_source_set_io_events(e->fdSource, e->flags);
} else {
sd_event_add_io(e->event, &e->fdSource, e->fd, e->flags, redisLibsdeventHandler, e);
}
}

static void redisLibsdeventDelWrite(void *userdata) {
redisLibsdeventEvents *e = (redisLibsdeventEvents*)userdata;

e->flags &= ~EPOLLOUT;

if (e->flags) {
sd_event_source_set_io_events(e->fdSource, e->flags);
} else {
e->fdSource = sd_event_source_disable_unref(e->fdSource);
}
}

static void redisLibsdeventCleanup(void *userdata) {
redisLibsdeventEvents *e = (redisLibsdeventEvents*)userdata;

if (!e) {
return;
}

if (e->state & REDIS_LIBSDEVENT_ENTERED) {
e->state |= REDIS_LIBSDEVENT_DELETED;
} else {
redisLibsdeventDestroy(e);
}
}

static void redisLibsdeventSetTimeout(void *userdata, struct timeval tv) {
redisLibsdeventEvents *e = (redisLibsdeventEvents *)userdata;

uint64_t usec = tv.tv_sec * 1000000 + tv.tv_usec;
if (!e->timerSource) {
sd_event_add_time_relative(e->event, &e->timerSource, CLOCK_MONOTONIC, usec, 1, redisLibsdeventTimeoutHandler, e);
} else {
sd_event_source_set_time_relative(e->timerSource, usec);
}
}

static int redisLibsdeventAttach(redisAsyncContext *ac, struct sd_event *event) {
redisContext *c = &(ac->c);
redisLibsdeventEvents *e;

/* Nothing should be attached when something is already attached */
if (ac->ev.data != NULL)
return REDIS_ERR;

/* Create container for context and r/w events */
e = (redisLibsdeventEvents*)hi_calloc(1, sizeof(*e));
if (e == NULL)
return REDIS_ERR;

/* Initialize and increase event refcount */
e->context = ac;
e->event = event;
e->fd = c->fd;
sd_event_ref(event);

/* Register functions to start/stop listening for events */
ac->ev.addRead = redisLibsdeventAddRead;
ac->ev.delRead = redisLibsdeventDelRead;
ac->ev.addWrite = redisLibsdeventAddWrite;
ac->ev.delWrite = redisLibsdeventDelWrite;
ac->ev.cleanup = redisLibsdeventCleanup;
ac->ev.scheduleTimer = redisLibsdeventSetTimeout;
ac->ev.data = e;

return REDIS_OK;
}
#endif
6 changes: 6 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ IF (LIBUV)
TARGET_LINK_LIBRARIES(example-libuv hiredis uv)
ENDIF()

FIND_PATH(LIBSDEVENT systemd/sd-event.h)
IF (LIBSDEVENT)
ADD_EXECUTABLE(example-libsdevent example-libsdevent.c)
TARGET_LINK_LIBRARIES(example-libsdevent hiredis systemd)
ENDIF()

IF (APPLE)
FIND_LIBRARY(CF CoreFoundation)
ADD_EXECUTABLE(example-macosx example-macosx.c)
Expand Down
86 changes: 86 additions & 0 deletions examples/example-libsdevent.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>

#include <hiredis.h>
#include <async.h>
#include <adapters/libsdevent.h>

void debugCallback(redisAsyncContext *c, void *r, void *privdata) {
(void)privdata;
redisReply *reply = r;
if (reply == NULL) {
/* The DEBUG SLEEP command will almost always fail, because we have set a 1 second timeout */
printf("`DEBUG SLEEP` error: %s\n", c->errstr ? c->errstr : "unknown error");
return;
}
/* Disconnect after receiving the reply of DEBUG SLEEP (which will not)*/
redisAsyncDisconnect(c);
}

void getCallback(redisAsyncContext *c, void *r, void *privdata) {
redisReply *reply = r;
if (reply == NULL) {
printf("`GET key` error: %s\n", c->errstr ? c->errstr : "unknown error");
return;
}
printf("`GET key` result: argv[%s]: %s\n", (char*)privdata, reply->str);

/* start another request that demonstrate timeout */
redisAsyncCommand(c, debugCallback, NULL, "DEBUG SLEEP %f", 1.5);
}

void connectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("connect error: %s\n", c->errstr);
return;
}
printf("Connected...\n");
}

void disconnectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("disconnect because of error: %s\n", c->errstr);
return;
}
printf("Disconnected...\n");
}

int main (int argc, char **argv) {
signal(SIGPIPE, SIG_IGN);

struct sd_event *event;
sd_event_default(&event);

redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
if (c->err) {
printf("Error: %s\n", c->errstr);
redisAsyncFree(c);
return 1;
}

redisLibsdeventAttach(c,event);
redisAsyncSetConnectCallback(c,connectCallback);
redisAsyncSetDisconnectCallback(c,disconnectCallback);
redisAsyncSetTimeout(c, (struct timeval){ .tv_sec = 1, .tv_usec = 0});

/*
In this demo, we first `set key`, then `get key` to demonstrate the basic usage of libsdevent adapter.
Then in `getCallback`, we start a `debug sleep` command to create 1.5 second long request.
Because we have set a 1 second timeout to the connection, the command will always fail with a
timeout error, which is shown in the `debugCallback`.
*/

redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1]));
redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");

/* sd-event does not quit when there are no handlers registered. Manually exit after 1.5 seconds */
sd_event_source *s;
sd_event_add_time_relative(event, &s, CLOCK_MONOTONIC, 1500000, 1, NULL, NULL);

sd_event_loop(event);
sd_event_source_disable_unref(s);
sd_event_unref(event);
return 0;
}