Skip to content
This repository was archived by the owner on Mar 21, 2025. It is now read-only.

使用 master 分支最新代码的异步接口时,写入大量命令时会 coredump,请帮忙看下啥原因导致的,谢谢! #26

Closed
helloBingGeGe opened this issue Dec 2, 2016 · 19 comments

Comments

@helloBingGeGe
Copy link

helloBingGeGe commented Dec 2, 2016

#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <hircluster.h>
#include <adapters/libevent.h>

const char* get_redis_reply_type(int type)
{
    static const char *REDIS_REPLY_TYPE[] = {
        "ERROR_UNDEFINED_REDIS_REPLY_TYPE",
        "REDIS_REPLY_STRING",
        "REDIS_REPLY_ARRAY",
        "REDIS_REPLY_INTEGER",
        "REDIS_REPLY_NIL",
        "REDIS_REPLY_STATUS",
        "REDIS_REPLY_ERROR"
    };

    if(type >= REDIS_REPLY_STRING && type <= REDIS_REPLY_ERROR)
    {
        return REDIS_REPLY_TYPE[type];
    }
    else
        return REDIS_REPLY_TYPE[0];
}

/*
 * Print the outcome of a Redis command and optionally release the reply.
 *
 * reply: reply object to report, or NULL when no reply was received.
 * cmd:   label printed for the command; NULL is printed as "NULL".
 * free:  non-zero to release `reply` with freeReplyObject() after printing.
 */
void check_redis_reply(redisReply *reply, const char *cmd, int free)
{
    const char *label = (cmd == NULL) ? "NULL" : cmd;

    if(reply == NULL)
    {
        printf("exec : [%s] reply is null\n", label);
        return;
    }

    /* reply->str is not guaranteed to be set for every reply type (see the
     * hiredis redisReply documentation), and handing NULL to "%s" is
     * undefined behaviour, so print a marker instead. */
    printf("exec : [%s], reply->type = [%d:%s], reply->str = [%s]\n",
            label, reply->type, get_redis_reply_type(reply->type),
            reply->str == NULL ? "(null)" : reply->str);

    if(free)
        freeReplyObject(reply);
}

/*
 * Connect callback: reports whether the connection attempt succeeded.
 *
 * c:      context of the connection; its errstr is printed on failure.
 * status: REDIS_OK on success, anything else is treated as failure.
 */
void connectCallback(const redisAsyncContext *c, int status)
{
    if(status == REDIS_OK)
    {
        printf("connect ok !\n");
        return;
    }

    printf("connect error : [%s]\n", c->errstr);
}

/*
 * Disconnect callback: reports whether the disconnect was clean.
 *
 * c:      context of the connection; its errstr is printed on failure.
 * status: REDIS_OK for a clean disconnect, anything else is an error.
 */
void disconnectCallback(const redisAsyncContext *c, int status)
{
    if(status == REDIS_OK)
    {
        printf("disconnect ok !\n");
        return;
    }

    printf("disconnect error : [%s]\n", c->errstr);
}

/*
 * Reply callback: logs the reply, using the per-command private data as
 * the label of the command.
 */
void getCallback(redisClusterAsyncContext *acc, void *reply, void *cmd)
{
    redisReply *received = (redisReply *)reply;
    const char *label = (const char *)cmd;

    (void)acc; /* the context is not needed to log the reply */

    /* The reply is released automatically; do not free it by hand. */
    check_redis_reply(received, label, 0);
}

/*
 * Thread entry point: connects to the cluster, attaches the context to a
 * libevent base and runs the event loop until it stops.
 *
 * c: points to a redisClusterAsyncContext* slot that receives the context
 *    once it has been set up. The slot is left untouched on failure.
 * Returns NULL in every case.
 */
void* event_loop_thread(void* c)
{
    struct event_base *base = event_base_new();
    if (base == NULL) {
        printf("event_base_new failed !\n");
        return NULL;
    }

    redisClusterAsyncContext *acc = redisClusterAsyncConnect("127.0.0.1:6380", HIRCLUSTER_FLAG_NULL);

    /* Guard against a NULL context before reading acc->err. */
    if (acc == NULL) {
        printf("async connect error : [%s]\n", "context allocation failed");
        return NULL;
    }

    if (acc->err) {
        printf("async connect error : [%s]\n", acc->errstr);
        return NULL;
    }

    redisClusterLibeventAttach(acc, base);
    redisClusterAsyncSetConnectCallback(acc, connectCallback);
    redisClusterAsyncSetDisconnectCallback(acc, disconnectCallback);

    // Some event has to be registered before event_base_dispatch, otherwise
    // event_base_dispatch fails.
    if(redisClusterAsyncCommand(acc, getCallback, "first", "set %d %d", 0, 0) != REDIS_OK)
        printf("redisClusterAsyncCommand error : [%d:%s]\n", acc->err, acc->errstr);

    // Publish the context to the creator of this thread.
    *(redisClusterAsyncContext**)c = acc;

    int ret = event_base_dispatch(base);
    printf("event loop error ! [%d]\n", ret);

    // A void* function must return a value; the original fell off the end.
    return NULL;
}

/*
 * Starts the event loop thread, waits for it to publish the cluster
 * context, then issues one SET command per millisecond forever.
 *
 * NOTE(review): the maintainer's reply in this thread states that the
 * context must not be accessed by two threads at once. This loop submits
 * commands from the main thread while event_loop_thread dispatches on the
 * same context, so this layout is unsafe; requests should be handed to the
 * event loop thread instead.
 */
int main()
{
    printf("------test async------\n");

    pthread_t tid;
    redisClusterAsyncContext *acc = NULL;

    // pthread_create reports failure through its return value and does not
    // set errno, so the return value is what strerror must be given.
    int rc = pthread_create(&tid, NULL, event_loop_thread, &acc);
    if(rc != 0)
    {
        printf("pthread_create error : [%s]\n", strerror(rc));
        return -1;
    }

    // Wait until the event loop thread has published the context.
    while(acc == NULL)
    {
        sleep(1);
    }

    // The loop never exits, so nothing after it would be reachable.
    unsigned int i=0;
    while(1)
    {
        if(redisClusterAsyncCommand(acc, getCallback, NULL, "set %d %d", i, i) != REDIS_OK)
            printf("redisClusterAsyncCommand error !\n");
        ++i;
        usleep(1000);
    }
}

运行大概30s后内存就开始暴涨了 --!

@deep011
Copy link
Contributor

deep011 commented Dec 5, 2016

acc不能被两个线程一起访问的

@deep011 deep011 closed this as completed Dec 5, 2016
@helloBingGeGe
Copy link
Author

@deep011 请教下,event loop 会阻塞当前线程,那如果想在多线程中用 acc 的话,异步接口岂不是没法用了?你们生产用的是同步接口还是异步的?如果是异步,是怎么做到的呢?烦请指点下,谢谢!

@deep011
Copy link
Contributor

deep011 commented Dec 5, 2016

用一个单独的线程运行 event loop,并且监控请求队列。其他线程把请求放入该队列中,event loop 线程从队列中取出后执行。

@helloBingGeGe
Copy link
Author

@deep011 这个监控队列是通过 libevent 的信号事件吗?有什么地方可以找到例子吗? 如果是频繁向redis写入很多数据,这种模式是否高效?

@deep011
Copy link
Contributor

deep011 commented Dec 5, 2016

异步模式肯定是高效的。你可以在 event loop 线程中设置一个 readable 事件,这个事件被触发的时候,就可以执行相应的 redis 异步命令了。

@helloBingGeGe
Copy link
Author

helloBingGeGe commented Dec 5, 2016

#include <time.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>
#include <hircluster.h>
#include <event2/thread.h>
#include <adapters/libevent.h>

#include <deque>
#include <string>
#include "ThreadLock.h"
using namespace std;

ThreadLock* g_lock;
deque<string> g_queue;
struct event g_write_reids_event;

const char* get_redis_reply_type(int type)
{
    static const char *REDIS_REPLY_TYPE[] = {
        "ERROR_UNDEFINED_REDIS_REPLY_TYPE",
        "REDIS_REPLY_STRING",
        "REDIS_REPLY_ARRAY",
        "REDIS_REPLY_INTEGER",
        "REDIS_REPLY_NIL",
        "REDIS_REPLY_STATUS",
        "REDIS_REPLY_ERROR"
    };

    if(type >= REDIS_REPLY_STRING && type <= REDIS_REPLY_ERROR)
    {
        return REDIS_REPLY_TYPE[type];
    }
    else
        return REDIS_REPLY_TYPE[0];
}

// Prints the outcome of a Redis command together with the local wall-clock
// time, and optionally releases the reply.
//
// reply: reply object to report, or NULL when no reply was received.
// cmd:   label printed for the command; NULL is printed as "NULL".
// free:  non-zero to release `reply` with freeReplyObject() after printing.
void check_redis_reply(redisReply *reply, const char* cmd, int free)
{
    const char *label = (cmd == NULL) ? "NULL" : cmd;

    if(reply == NULL)
    {
        printf("exec : [%s] reply is null\n", label);
        return;
    }

    // Only whole seconds are printed, so time() is sufficient. localtime_r
    // writes into a caller-owned buffer; if it fails the zero-initialised
    // buffer is printed instead of dereferencing a NULL result.
    time_t now = time(NULL);
    struct tm local_now = {};
    localtime_r(&now, &local_now);

    // reply->str is not guaranteed to be set for every reply type (see the
    // hiredis redisReply documentation), and handing NULL to "%s" is
    // undefined behaviour, so print a marker instead.
    printf("exec : [%s], reply->type = [%d:%s], reply->str = [%s], time=[%d:%d:%d]\n",
            label, reply->type, get_redis_reply_type(reply->type),
            reply->str == NULL ? "(null)" : reply->str,
            local_now.tm_hour, local_now.tm_min, local_now.tm_sec);

    if(free)
        freeReplyObject(reply);
}

// Connect callback: prints the context's error string when the connection
// attempt failed, or a success line otherwise.
void connectCallback(const redisAsyncContext *c, int status)
{
    const bool connected = (status == REDIS_OK);

    if(!connected)
    {
        printf("connect error : [%s]\n", c->errstr);
        return;
    }

    printf("connect ok !\n");
}

// Disconnect callback: prints the context's error string when the disconnect
// was not clean, or a success line otherwise.
void disconnectCallback(const redisAsyncContext *c, int status)
{
    const bool clean = (status == REDIS_OK);

    if(!clean)
    {
        printf("disconnect error : [%s]\n", c->errstr);
        return;
    }

    printf("disconnect ok !\n");
}

// Reply callback for commands submitted by redisClusterAsyncCommand_cb().
//
// cmd is a heap copy of the command text created with strdup() by the
// submitter. This callback owns that copy and releases it after logging.
// The original code passed a pointer into the queue element, which was
// destroyed by pop_front() before the reply arrived.
void getCallback(redisClusterAsyncContext *acc, void *reply, void *cmd)
{
    // The reply is released automatically; do not free it by hand.
    check_redis_reply(static_cast<redisReply*>(reply), static_cast<const char*>(cmd), 0);
    free(cmd);
}

// SIGINT handler registered on the event base: logs the signal number and
// terminates the process.
static void
signal_cb(evutil_socket_t fd, short event, void *arg)
{
    struct event *signal = (struct event*)arg;
    printf("%s: got signal %d\n", __func__, EVENT_SIGNAL(signal));
    exit(0);
}

// Event callback run on the event loop thread: drains the shared command
// queue and submits every queued command to the cluster.
//
// arg: the redisClusterAsyncContext the commands are sent through.
static void
redisClusterAsyncCommand_cb(evutil_socket_t fd, short event, void *arg)
{
    redisClusterAsyncContext *acc = static_cast<redisClusterAsyncContext *>(arg);

    // Take the whole backlog in one short critical section so that producer
    // threads are not blocked while the commands are being submitted.
    deque<string> pending;
    {
        GuardLock gl(g_lock);
        pending.swap(g_queue);
    }

    for(deque<string>::const_iterator it = pending.begin(); it != pending.end(); ++it)
    {
        // The private data must outlive the queue element, because the reply
        // callback runs later; getCallback() frees this copy.
        char *label = strdup(it->c_str());
        if(label == NULL)
        {
            printf("redisClusterAsyncCommand [%s] error !\n", it->c_str());
            continue;
        }

        // Note: the queued text is used as the format string, so it must not
        // contain '%' conversions.
        if(redisClusterAsyncCommand(acc, getCallback, label, it->c_str()) != REDIS_OK)
        {
            printf("redisClusterAsyncCommand [%s] error !\n", it->c_str());
            // NOTE(review): assumes the callback is not invoked for a command
            // that was rejected, so the copy is released here - confirm.
            free(label);
        }
    }
}

// Thread entry point: connects to the cluster, registers the queue-drain and
// SIGINT events on a libevent base and runs the event loop until it stops.
//
// c: points to a redisClusterAsyncContext* slot that receives the context
//    once it has been set up. The slot is left untouched on failure.
// Returns NULL in every case.
void* event_loop_thread(void* c)
{
    if(-1 == evthread_use_pthreads())
        printf("use thread-safe event failed !\n");

    struct event_base *base = event_base_new();
    if(base == NULL)
    {
        printf("event_base_new failed !\n");
        return NULL;
    }

    redisClusterAsyncContext *acc = redisClusterAsyncConnect("127.0.0.1:6380", HIRCLUSTER_FLAG_NULL);

    // Guard against a NULL context before reading acc->err.
    if (acc == NULL) {
        printf("async connect error : [%s]\n", "context allocation failed");
        event_base_free(base);
        return NULL;
    }

    if (acc->err) {
        printf("async connect error : [%s]\n", acc->errstr);
        event_base_free(base);
        return NULL;
    }

    redisClusterLibeventAttach(acc, base);
    redisClusterAsyncSetConnectCallback(acc, connectCallback);
    redisClusterAsyncSetDisconnectCallback(acc, disconnectCallback);

    // Register the event whose callback writes queued commands to redis
    // asynchronously; it is triggered manually via event_active().
    event_assign(&g_write_reids_event, base, -1, EV_PERSIST, redisClusterAsyncCommand_cb, acc);
    event_add(&g_write_reids_event, NULL);

    // Register the exit (SIGINT) event.
    struct event signal_int;
    event_assign(&signal_int, base, SIGINT, EV_SIGNAL|EV_PERSIST, signal_cb,
            &signal_int);
    event_add(&signal_int, NULL);

    // Publish the context to the creator of this thread.
    *(redisClusterAsyncContext**)c = acc;

    int ret = event_base_dispatch(base);
    event_base_free(base);
    printf("event loop error ! [%d]\n", ret);

    // A void* function must return a value; the original fell off the end.
    return NULL;
}

// Producer thread: every 100 microseconds it queues one "set <i> <i>"
// command and wakes the event loop so that the queue gets drained.
//
// v: unused.
// Never returns in practice; the trailing return only satisfies the signature.
void* test_thread(void* v)
{
    unsigned int i=0;
    while(1)
    {
        char cmd[32] = {0};
        snprintf(cmd, sizeof(cmd), "set %u %u", i, i);

        {
            // Hold the lock only for the push itself.
            GuardLock gl(g_lock);
            g_queue.push_back(cmd);
        }

        // event_active() takes a set of event flags and an obsolete call
        // count, not a sequence number. The original passed the ever-growing
        // counter for both, narrowing it into the flag bitmask and a short.
        // The callback ignores the flags, so an empty set is passed.
        event_active(&g_write_reids_event, 0, 0);
        ++i;

        usleep(100);
    }

    return NULL;
}

// Starts the event loop thread, waits for it to publish the cluster context,
// starts the producer thread and then sleeps until a signal arrives.
int main()
{
    signal(SIGPIPE, SIG_IGN);

    g_lock = new ThreadLock();

    pthread_t tid;
    redisClusterAsyncContext *acc = NULL;

    // pthread_create reports failure through its return value and does not
    // set errno, so the return value is what strerror must be given.
    int rc = pthread_create(&tid, NULL, event_loop_thread, &acc);
    if(rc != 0)
    {
        printf("pthread_create error : [%s]\n", strerror(rc));
        return -1;
    }

    // Wait until event_loop_thread has been set up successfully.
    while(acc == NULL)
    {
        sleep(1);
    }

    pthread_t tid1;
    rc = pthread_create(&tid1, NULL, test_thread, NULL);
    if(rc != 0)
    {
        printf("pthread_create error : [%s]\n", strerror(rc));
        return -1;
    }

    pause();
    return 0;
}

@deep011 大神请问下是否像这样多个线程共享一个队列,然后 event_loop 单线程的处理队列中的命令吗? 这样多线程频繁的写入,锁竞争太大了吧, 我在我的2核虚拟机上每秒只能处理2000笔。
测试又发现写到大概150多万条的时候 coredump 了。。。

Core was generated by `./a.out'.
Program terminated with signal 11, Segmentation fault.
#0  0x000000000040b585 in dictSdsKeyCompare (privdata=0x0, key1=0x7f320c1266d8, key2=0x7f3200000000) at hircluster.c:78
78          if (l1 != l2) return 0;
Missing separate debuginfos, use: debuginfo-install glibc-2.12-1.192.el6.x86_64 libgcc-4.4.7-17.el6.x86_64 libstdc++-4.4.7-17.el6.x86_64
(gdb) bt
#0  0x000000000040b585 in dictSdsKeyCompare (privdata=0x0, key1=0x7f320c1266d8, key2=0x7f3200000000) at hircluster.c:78
#1  0x000000000040aa7c in dictFind (ht=0x7f320c16d440, key=0x7f320c1266d8) at dict.c:251
#2  0x000000000040f418 in cluster_nodes_swap_ctx (cc=0x7f3212451010, ip=<value optimized out>, port=<value optimized out>)
    at hircluster.c:626
#3  cluster_update_route_by_addr (cc=0x7f3212451010, ip=<value optimized out>, port=<value optimized out>) at hircluster.c:1406
#4  0x000000000040f6ca in cluster_update_route (cc=0x7f3212451010) at hircluster.c:1910
#5  0x000000000040fc5b in redisClusterAsyncCallback (ac=<value optimized out>, r=0x0, privdata=0x7f320c073cc0) at hircluster.c:4294
#6  0x00000000004081bd in __redisRunCallback (ac=0x7f320c0e8670) at async.c:269
#7  __redisAsyncFree (ac=0x7f320c0e8670) at async.c:283
#8  0x00000000004092bc in __redisAsyncDisconnect (ac=0x7f320c0e8670) at async.c:348
#9  redisProcessCallbacks (ac=0x7f320c0e8670) at async.c:487
#10 0x00000000004020d0 in redisLibeventReadEvent(int, short, void*) ()
#11 0x00007f3212032f0c in event_process_active_single_queue (base=0x7f320c000980, flags=0) at event.c:1368
#12 event_process_active (base=0x7f320c000980, flags=0) at event.c:1438
#13 event_base_loop (base=0x7f320c000980, flags=0) at event.c:1639
#14 0x00000000004027f3 in event_loop_thread(void*) ()
#15 0x00007f32110d5aa1 in start_thread () from /lib64/libpthread.so.0
#16 0x00007f32113d3aad in clone () from /lib64/libc.so.6

@helloBingGeGe
Copy link
Author

helloBingGeGe commented Dec 6, 2016

127.0.0.1:6380> cluster nodes
907d20a754a99ecd4daae47fbc4c1ff25e988a3f 127.0.0.1:6382 slave d5d9401de32f0d19f923289e264b4190c272b0d0 1481010415359 1481010401565 8 connected
5c19bf9e08f9c9c730a57c0bb6fcbddf60ab6480 127.0.0.1:6380 myself,slave 9dbc28ed8b4cf78227876dcac5120eb4d922ed13 0 0 1 connected
d5d9401de32f0d19f923289e264b4190c272b0d0 127.0.0.1:6385 master,fail? - 1481010399557 1481010397686 8 disconnected 10923-16383
9dbc28ed8b4cf78227876dcac5120eb4d922ed13 127.0.0.1:6383 master,fail? - 1481010399557 1481010397686 9 disconnected 0-5460
51a8a647e88155f5b134a982e8aa26064e6c6d28 127.0.0.1:6384 master - 1481010415359 1481010401565 7 connected 5461-10922
d53f74b2e85e1d5e0f456dcdfc4583695fb92b91 127.0.0.1:6381 slave,fail 51a8a647e88155f5b134a982e8aa26064e6c6d28 1481010393671 1481010393073 7 disconnected
(10.34s)

127.0.0.1:6380> cluster info
cluster_state:fail
cluster_slots_assigned:16384
cluster_slots_ok:5462
cluster_slots_pfail:10922
cluster_slots_fail:0
cluster_known_nodes:6
cluster_size:3
cluster_current_epoch:30
cluster_my_epoch:27
cluster_stats_messages_sent:73692
cluster_stats_messages_received:73138

@deep011 发现每次快 coredump 的时候(写入大概150多万条命令), redis-cluster 就极不稳定, 如上面的 cluster nodes 命令所示, 然后部分redis节点就再也无法写入数据, 接着应用程序就会出现上面的崩溃。

@deep011
Copy link
Contributor

deep011 commented Dec 6, 2016

你的集群设置timeout了吗?

@helloBingGeGe
Copy link
Author

用的 create-cluster 脚本:

Settings

PORT=6379
TIMEOUT=2000
NODES=6
REPLICAS=1

@deep011
Copy link
Contributor

deep011 commented Dec 6, 2016

  1. 看看你cluster的cluster-require-full-coverage设置的是什么?如果是yes的话,要改成no;
  2. hiredis-vip的异步api效率很高的,qps跑到几万很easy,你的qps不高,主要是你程序使用的问题;
  3. 控制命令的发送量,并不是发送的越快,效率就越高,要看发送了多少命令,有多少命令还没有收到回复,要控制没有收到回复的命令数量,如果命令积攒的太多没有回复,反而大大降低你的qps。

@helloBingGeGe
Copy link
Author

helloBingGeGe commented Dec 6, 2016

@deep011

  1. 已改为no
  2. 能指出下哪里的问题吗 - -!
  3. 看了下响应很快但是写到150w的时候有响应就很慢了, redis-server 都是 D 了
    image

@deep011
Copy link
Contributor

deep011 commented Dec 6, 2016

@deep011
Copy link
Contributor

deep011 commented Dec 6, 2016

另外,请下载最新的master代码。

@helloBingGeGe
Copy link
Author

@deep011 非常感谢!我立马试试

@helloBingGeGe
Copy link
Author

helloBingGeGe commented Dec 6, 2016

image
image
image
image

@deep011 为什么也是写到150w的时候出现了大量的 null reply,写入的速度也是越来越慢,redis-server 像是被写死了后又自动重启了 - -!

@deep011
Copy link
Contributor

deep011 commented Dec 7, 2016

你集群有问题,你把redis-cluster-tool装上先。

@helloBingGeGe
Copy link
Author

helloBingGeGe commented Dec 7, 2016

@deep011 谢谢你的耐心解答, 找出问题原因了, 是我虚拟机只有 512M 的内存, 导致写到150w的时候,内存被耗光了。。。

@dandyhuang
Copy link

@deep011 谢谢你的耐心解答, 找出问题原因了, 是我虚拟机只有 512M 的内存, 导致写到150w的时候,内存被耗光了。。。
请教一下,你这边是用的大佬给的 https://github.com/vipshop/hiredis-vip/wiki/test_hiredisvip_async_ae.c 例子来封装的吗,不是很想用这个

@dandyhuang
Copy link

异步模式肯定是高效的。你可以在 event loop 线程中设置一个 readable 事件,这个事件被触发的时候,就可以执行相应的 redis 异步命令了。

大佬,这个一定要设置一个 readable 事件吗?不能在调用完 redisClusterAsyncCommand 后,在 aeMain 里头获取回调吗?我是像下面这样处理的,但回调没有被调用:

#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <iostream>
#include <thread>
#include <vector>

#ifdef __cplusplus
extern "C" {
#endif
#include "adapters/ae.h"
#include "async.h"
#include "hircluster.h"
#include "hiredis.h"
#ifdef __cplusplus
}
#endif

/* Put event loop in the global scope, so it can be explicitly stopped */
// Reply callback: prints the tag passed as private data together with the
// reply text. A NULL reply is ignored.
//
// c:        cluster context the reply belongs to (unused).
// r:        the redisReply, passed as void*.
// privdata: C string tag supplied when the command was submitted.
void getCallback(redisClusterAsyncContext* c, void* r, void* privdata) {
  redisReply* reply = static_cast<redisReply*>(r);
  if (reply == NULL) return;

  // reply->str is not guaranteed to be set (for example when the key does not
  // exist), and handing NULL to "%s" is undefined behaviour.
  const char* tag = static_cast<const char*>(privdata);
  const char* text = (reply->str != NULL) ? reply->str : "(null)";
  printf("argv[%s]: %s\n", tag != NULL ? tag : "(null)", text);

  /* Disconnect after receiving the reply to GET */
  // redisAsyncDisconnect(c);
}

// Connect callback: prints the context's error string on failure and a
// confirmation line on success.
void connectCallback(const redisAsyncContext* c, int status) {
  if (status == REDIS_OK) {
    printf("Connected...\n");
  } else {
    printf("Error: %s\n", c->errstr);
    // aeStop(loop);
  }
}

// Disconnect callback: prints the context's error string on failure and a
// confirmation line on a clean disconnect.
void disconnectCallback(const redisAsyncContext* c, int status) {
  if (status == REDIS_OK) {
    printf("Disconnected...\n");
    // aeStop(loop);
  } else {
    printf("Error: %s\n", c->errstr);
    // aeStop(loop);
  }
}

void* thread_fun(void* data) {
  aeEventLoop* loop = aeCreateEventLoop(64);
  int flags = HIRCLUSTER_FLAG_NULL;
  printf("address: %s\n", (char*)data);
  std::string address = (const char*)data;
  redisClusterAsyncContext* c = redisClusterAsyncConnect(address.c_str(), flags);
  if (c->err) {
    /* Let *c leak for now... */
    printf("Error: %s\n", c->errstr);
    return NULL;
  }
  redisClusterAeAttach(loop, c);
  redisClusterAsyncSetConnectCallback(c, connectCallback);
  redisClusterAsyncSetDisconnectCallback(c, disconnectCallback);
  // aeMain(loop);
  // redisAsyncContext* c = (redisAsyncContext*)data;
  loop->stop = 0;
  while (!loop->stop) {
    redisClusterAsyncCommand(c, getCallback, (char*)"1", "GET key");
    redisClusterAsyncCommand(c, getCallback, (char*)"1", "GET key");
    redisClusterAsyncCommand(c, getCallback, (char*)"2", "GET key");
    redisClusterAsyncCommand(c, getCallback, (char*)"3", "GET key");
    redisClusterAsyncCommand(c, getCallback, (char*)"4", "GET key");
    redisClusterAsyncCommand(c, getCallback, (char*)"5", "GET key");
    redisClusterAsyncCommand(c, getCallback, (char*)"6", "GET key");
    redisClusterAsyncCommand(c, getCallback, (char*)"7", "GET key");
    redisClusterAsyncCommand(c, getCallback, (char*)"8", "GET key");
    redisClusterAsyncCommand(c, getCallback, (char*)"9", "GET key");
    // CMessageCollector::GetInstance()->SetResponseMessage(reply);
    if (loop->beforesleep != NULL) {
      loop->beforesleep(loop);
    }
    printf("thread_fun get\n");
    sleep(1);
    aeProcessEvents(loop, AE_ALL_EVENTS | AE_DONT_WAIT);
  }
}

// Entry point: starts five worker threads that each talk to the cluster whose
// address is given as the first command-line argument, then waits for them.
int main(int argc, char** argv) {
  // argv[1] is handed to every worker; without it the workers would read a
  // NULL pointer as the address.
  if (argc < 2) {
    fprintf(stderr, "usage: %s <address>\n", argc > 0 ? argv[0] : "prog");
    return 1;
  }

  signal(SIGPIPE, SIG_IGN);

  const int kWorkerCount = 5;
  std::vector<std::thread> threads;
  threads.reserve(kWorkerCount);
  for (int i = 0; i < kWorkerCount; ++i) {
    threads.emplace_back(thread_fun, argv[1]);
  }

  for (auto& thread : threads) {
    thread.join();
  }
  return 0;
}

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants