Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ if(STATIC_BUILD)
COMMAND patch -d <SOURCE_DIR> -p1 -i "${PATCHES_DIR}/librdkafka-tarantool-security-71.patch"
COMMAND patch -d <SOURCE_DIR> -p1 -i "${PATCHES_DIR}/librdkafka-tarantool-security-72.patch"
COMMAND patch -d <SOURCE_DIR> -p1 -i "${PATCHES_DIR}/librdkafka-tarantool-security-94.patch"
COMMAND patch -d <SOURCE_DIR> -p1 -i "${PATCHES_DIR}/librdkafka-fix-ubsan.patch"
)

add_library(librdkafka_static INTERFACE)
Expand Down
236 changes: 84 additions & 152 deletions kafka/callbacks.c
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <pthread.h>
#include "callbacks.h"
#include "common.h"
#include "consumer_msg.h"
#include "queue.h"

#include <lua.h>
#include <lualib.h>
#include <lauxlib.h>

#include <librdkafka/rdkafka.h>

#include <common.h>
#include <consumer_msg.h>
#include <queue.h>
#include <callbacks.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <pthread.h>

////////////////////////////////////////////////////////////////////////////////////////////////////
/**
Expand Down Expand Up @@ -90,7 +90,8 @@ destroy_error_msg(error_msg_t *msg) {
}

void
error_callback(rd_kafka_t *UNUSED(rd_kafka), int err, const char *reason, void *opaque) {
error_callback(rd_kafka_t *rd_kafka, int err, const char *reason, void *opaque) {
(void)rd_kafka;
event_queues_t *event_queues = opaque;
if (event_queues != NULL && event_queues->queues[ERROR_QUEUE] != NULL) {
error_msg_t *msg = new_error_msg(err, reason);
Expand All @@ -100,24 +101,21 @@ error_callback(rd_kafka_t *UNUSED(rd_kafka), int err, const char *reason, void *
}

int
push_log_cb_args(struct lua_State *L, const log_msg_t *msg)
{
push_log_cb_args(struct lua_State *L, const log_msg_t *msg) {
lua_pushstring(L, msg->fac);
lua_pushstring(L, msg->buf);
lua_pushinteger(L, msg->level);
return 3;
}

int
push_stats_cb_args(struct lua_State *L, const char *msg)
{
push_stats_cb_args(struct lua_State *L, const char *msg) {
lua_pushstring(L, msg);
return 1;
}

int
push_errors_cb_args(struct lua_State *L, const error_msg_t *msg)
{
push_errors_cb_args(struct lua_State *L, const error_msg_t *msg) {
lua_pushstring(L, msg->reason);
return 1;
}
Expand All @@ -141,16 +139,18 @@ destroy_dr_msg(dr_msg_t *dr_msg) {
}

void
msg_delivery_callback(rd_kafka_t *UNUSED(producer), const rd_kafka_message_t *msg, void *opaque) {
msg_delivery_callback(rd_kafka_t *producer, const rd_kafka_message_t *msg, void *opaque) {
(void)producer;
event_queues_t *event_queues = opaque;
if (msg->_private != NULL && event_queues != NULL && event_queues->delivery_queue != NULL) {
dr_msg_t *dr_msg = msg->_private;
if (dr_msg != NULL) {
if (msg->err != RD_KAFKA_RESP_ERR_NO_ERROR) {
dr_msg->err = msg->err;
}
queue_push(event_queues->delivery_queue, dr_msg);
if (msg->_private == NULL || event_queues == NULL || event_queues->delivery_queue == NULL)
return;

dr_msg_t *dr_msg = msg->_private;
if (dr_msg != NULL) {
if (msg->err != RD_KAFKA_RESP_ERR_NO_ERROR) {
dr_msg->err = msg->err;
}
queue_push(event_queues->delivery_queue, dr_msg);
}
}

Expand All @@ -159,146 +159,81 @@ msg_delivery_callback(rd_kafka_t *UNUSED(producer), const rd_kafka_message_t *ms
*/

rebalance_msg_t *
new_rebalance_revoke_msg(rd_kafka_topic_partition_list_t *revoked) {
rebalance_msg_t *msg = xmalloc(sizeof(rebalance_msg_t));
pthread_mutex_t lock;
if (pthread_mutex_init(&lock, NULL) != 0) {
free(msg);
return NULL;
}

msg->lock = lock;
new_rebalance_msg(rebalance_event_kind_t kind,
const rd_kafka_topic_partition_list_t *partitions,
rd_kafka_resp_err_t err) {
rebalance_msg_t *msg = xcalloc(1, sizeof(*msg));
msg->kind = kind;
msg->err = err;

pthread_cond_t sync;
if (pthread_cond_init(&sync, NULL) != 0) {
free(msg);
return NULL;
if (partitions != NULL) {
msg->partitions = rd_kafka_topic_partition_list_copy(partitions);
}

msg->sync = sync;
msg->revoked = revoked;
msg->assigned = NULL;
msg->err = RD_KAFKA_RESP_ERR_NO_ERROR;
return msg;
}

rebalance_msg_t *
new_rebalance_assign_msg(rd_kafka_topic_partition_list_t *assigned) {
rebalance_msg_t *msg = xmalloc(sizeof(rebalance_msg_t));
pthread_mutex_t lock;
if (pthread_mutex_init(&lock, NULL) != 0) {
free(msg);
return NULL;
}

msg->lock = lock;

pthread_cond_t sync;
if (pthread_cond_init(&sync, NULL) != 0) {
free(msg);
return NULL;
}

msg->sync = sync;
msg->revoked = NULL;
msg->assigned = assigned;
msg->err = RD_KAFKA_RESP_ERR_NO_ERROR;
return msg;
void
destroy_rebalance_msg(rebalance_msg_t *msg) {
if (msg == NULL)
return;
if (msg->partitions != NULL)
rd_kafka_topic_partition_list_destroy(msg->partitions);
free(msg);
}

rebalance_msg_t *
new_rebalance_error_msg(rd_kafka_resp_err_t err) {
rebalance_msg_t *msg = xmalloc(sizeof(rebalance_msg_t));
pthread_mutex_t lock;
if (pthread_mutex_init(&lock, NULL) != 0) {
free(msg);
return NULL;
}
static void
push_rebalance_event_if_needed(event_queues_t *eq,
rebalance_event_kind_t kind,
const rd_kafka_topic_partition_list_t *partitions,
rd_kafka_resp_err_t err) {
if (eq == NULL)
return;
if (eq->queues[REBALANCE_QUEUE] == NULL)
return;
if (eq->cb_refs[REBALANCE_QUEUE] == LUA_REFNIL)
return;

msg->lock = lock;
rebalance_msg_t *msg = new_rebalance_msg(kind, partitions, err);
if (msg == NULL)
return;

pthread_cond_t sync;
if (pthread_cond_init(&sync, NULL) != 0) {
free(msg);
return NULL;
if (queue_push(eq->queues[REBALANCE_QUEUE], msg) != 0) {
destroy_rebalance_msg(msg);
}

msg->sync = sync;
msg->revoked = NULL;
msg->assigned = NULL;
msg->err = err;
return msg;
}

void
destroy_rebalance_msg(rebalance_msg_t *rebalance_msg) {
pthread_mutex_destroy(&rebalance_msg->lock);
pthread_cond_destroy(&rebalance_msg->sync);
free(rebalance_msg);
}

void
rebalance_callback(rd_kafka_t *consumer, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque) {
event_queues_t *event_queues = opaque;
rebalance_msg_t *msg = NULL;
switch (err)
{
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
msg = new_rebalance_assign_msg(partitions);
if (msg != NULL) {

pthread_mutex_lock(&msg->lock);

if (queue_push(event_queues->queues[REBALANCE_QUEUE], msg) == 0) {
// waiting while main TX thread invokes rebalance callback
pthread_cond_wait(&msg->sync, &msg->lock);
}

pthread_mutex_unlock(&msg->lock);

destroy_rebalance_msg(msg);
}
rebalance_callback(rd_kafka_t *consumer,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *partitions,
void *opaque)
{
event_queues_t *eq = opaque;
const char *proto = rd_kafka_rebalance_protocol(consumer);
int cooperative = (proto != NULL) && strcmp(proto, "COOPERATIVE") == 0;

switch (err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
push_rebalance_event_if_needed(eq, REB_EVENT_ASSIGN, partitions, RD_KAFKA_RESP_ERR_NO_ERROR);
if (cooperative)
rd_kafka_incremental_assign(consumer, partitions);
else
rd_kafka_assign(consumer, partitions);
break;

case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
rd_kafka_commit(consumer, partitions, 0); // sync commit

msg = new_rebalance_revoke_msg(partitions);
if (msg != NULL) {

pthread_mutex_lock(&msg->lock);

if (queue_push(event_queues->queues[REBALANCE_QUEUE], msg) == 0) {
// waiting while main TX thread invokes rebalance callback
pthread_cond_wait(&msg->sync, &msg->lock);
}

pthread_mutex_unlock(&msg->lock);

destroy_rebalance_msg(msg);
}

break;

case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
rd_kafka_commit(consumer, partitions, 0);
push_rebalance_event_if_needed(eq, REB_EVENT_REVOKE, partitions, RD_KAFKA_RESP_ERR_NO_ERROR);
if (cooperative)
rd_kafka_incremental_unassign(consumer, partitions);
else
rd_kafka_assign(consumer, NULL);
break;
break;

default:
msg = new_rebalance_error_msg(err);
if (msg != NULL) {

pthread_mutex_lock(&msg->lock);

if (queue_push(event_queues->queues[REBALANCE_QUEUE], msg) == 0) {
// waiting while main TX thread invokes rebalance callback
pthread_cond_wait(&msg->sync, &msg->lock);
}

pthread_mutex_unlock(&msg->lock);

destroy_rebalance_msg(msg);
}
rd_kafka_assign(consumer, NULL);
break;
default:
push_rebalance_event_if_needed(eq, REB_EVENT_ERROR, NULL, err);
rd_kafka_assign(consumer, NULL);
break;
}
}

Expand Down Expand Up @@ -354,16 +289,13 @@ destroy_event_queues(struct lua_State *L, event_queues_t *event_queues) {
destroy_log_msg(msg);
break;
case STATS_QUEUE:
free(msg);
break;
case ERROR_QUEUE:
destroy_error_msg(msg);
break;
case REBALANCE_QUEUE: {
rebalance_msg_t *rebalance_msg = msg;
pthread_mutex_lock(&rebalance_msg->lock);
// allowing background thread proceed rebalancing
pthread_cond_signal(&rebalance_msg->sync);
pthread_mutex_unlock(&rebalance_msg->lock);
destroy_rebalance_msg(msg);
break;
}
}
Expand Down
Loading
Loading