Read the Topic metadata saved by ZooKeeper

Read topic metadata saved by zookeeper

Table of Contents

  • 1. There are the following problems
  • 2. Solutions
  • 3. Code
    • 3.1. KafkaHelper class
    • 3.2. The complete code of main.cc

1 There are the following problems

  • You need to use the producer to get the metadata
  • When the producer and the consumer share some objects, there will be a problem that the data cannot be read

2 Solution

Use a separate class to encapsulate the code for obtaining metadata, avoiding shared variables

3 Code

3.1 KafkaHelper class

#ifndef KAFKA_HELPER_H_#define KAFKA_HELPER_H_#include < string>using std::string;#include "librdkafka/rdkafkacpp.h"#include "librdkafka/rdkafka.h"#include #include #include #define BROKER_PATH "/brokers/ids"static rd_kafka_t *rk;class KafkaHelper {public: static string Brokers(string const& zookeeper) {zhandle_t * zh = initialize_zookeeper(zookeeper); char brokers[1024]; set_brokerlist_from_zookeeper(zh, brokers); r eturn brokers;} static void PrintTopicMeta(string const& topic_name) {/* * Create producer using accumulated global configuration. */ RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); string zookeeper("localhost:2181"); string brokers = KafkaHelper::Brokers(zookeeper); string errstr; global_conf->set("metadata.broker.list", brokers, errstr); RdKafka::Conf * topic_conf = RdKafka: :Conf::create(RdKafka::Conf::CONF_TOPIC); RdKafka::Producer *producer = RdKafka::Producer::create(global_conf, errstr); if (!producer) {std::cerr << "Failed to create producer: "<< errstr << std::endl; exit(1);} std::cout << "% Created producer" << producer->name() << std::endl; /* * Create topic handle. */ RdKafka::Topic * topic = RdKafka::Topic::create(producer, topic_name, topic_conf, errstr); if (!topic) {std::cerr << "Failed to create topic: "<< errstr << std::endl; exit(1);} bool ru n = true; while (run) {class RdKafka::Metadata *metadata; // Fetch metadata RdKafka::ErrorCode err = producer->metadata(topic!=NULL, topic, &metadata, 5000); if (err != RdKafka ::ERR_NO_ERROR) {std::cerr << "%% Failed to acquire metadata: "<< RdKafka::err2str(err) << std::endl; run = 0; break;} KafkaHelper::PrintMeta(topic_name, metadata); delete metadata; run = 0;}} static void PrintMeta(string const & topic, const RdKafka::Metadata *metadata) {std::cout << "Metadata for "<< (topic.empty()?" 
": "all topics") << "(orig broker id from broker" << metadata->orig_broker_id() << ":" << metadata->orig_broker_name() << std::endl; /* Iterate brokers * / std::cout << "" << metadata->brokers()->size() << "brokers:" << std::endl; RdKafka::Metadata::BrokerMetadataIterator ib; for (ib = metadata- >brokers()->begin(); ib != metadata->brokers()->end(); ++ib) { std::cout << "broker" << (*ib)->id() << "at" << *(*ib)->host() << ":" << (*ib)-> port() << std::endl;} /* Iterate topics */ std::cout << metadata->topics()->size() << "topics:" << std::endl; RdKafka:: Metadata::TopicMetadataIterator it; for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) {std::cout << "topic "<< *(*it)->topic() << "with" << (*it)->partitions()->size() << "partitions" << std::endl; if ((* it)->err() != RdKafka::ERR_NO_ERROR) {std::cout << "" << err2str((*it)->err()); if ((*it)->err() = = RdKafka::ERR_LEADER_NOT_AVAILABLE) {std::cout << "(try again)";}} std::cout << std::endl; /* Iterate topic's partitions */ RdKafka::TopicMetadata::PartitionMetadataIterator ip ; for (ip = (*it)->partitions()->begin(); ip != (*it)->partitions()->end(); ++ip) {std::cout << " partition "<< (*ip)->id() << "leader" << (*ip)->leader() << ", replicas: "; /* Iterate partition's replicas */ RdKafka::PartitionMetadata::ReplicasIterator ir; for (ir = (*ip) ->replicas()->begin(); ir != (*ip)->replicas()->end(); ++ir) {std::cout << (ir == (*ip)-> replicas()->begin()? ",":"") << *ir;} /* Iterate partition's ISRs */ std::cout << ", isrs: "; RdKafka::PartitionMetadata::ISRSIterator iis; for (iis = (*ip)->isrs()->begin(); iis != (*ip)->isrs()->end(); ++iis) std::cout << ( iis == (*ip)->isrs()->begin()? 
",":"") << *iis; if ((*ip)->err() != RdKafka::ERR_NO_ERROR) std: :cout << ", "<< RdKafka::err2str((*ip)->err()) << std::endl; else std::cout << std::endl;}}} private: static void watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {char brokers[1024]; if (type == ZOO_CHILD_EVENT && strncmp(pa th, BROKER_PATH, sizeof(BROKER_PATH)-1) == 0) {brokers[0] ='\0'; set_brokerlist_from_zookeeper(zh, brokers); if (brokers[0] !='\0' && rk != NULL ) {rd_kafka_brokers_add(rk, brokers); rd_kafka_poll(rk, 10);}}} static zhandle_t* initialize_zookeeper(string const& zookeeper) {zhandle_t * zh = zookeeper_init(zookeeper.c_str(), watcher, 10000, 0, 0, 0, 0 ); if (zh == NULL) {fprintf(stderr, "Zookeeper connection not established."); exit(1);} return zh;} static void set_brokerlist_from_zookeeper(zhandle_t *zzh, char *brokers) {if (zzh) {struct String_vector brokerlist; if (zoo_get_children(zzh, BROKER_PATH, 1, &brokerlist) != ZOK) {fprintf(stderr, "No brokers found on path %s\n", BROKER_PATH); return;} int i; char *brokerptr = brokers; for (i = 0; i  0) {cfg[len] = '\0'; json_error_t jerror; json_t *jobj = json_loads(cfg, 0, &jerror); if (jobj) {json_t *jhost = json_object_get(jobj, "host"); json_t *jport = json_object_get(jobj, "port" ); if (jhost && jport) {const char *host = json_string_value(jhost); const int port = json_integer_value(jport); sprintf(brokerptr, "%s:%d", host, port); brokerptr += strlen( brokerptr); if (i 

3.2 Main.cc complete code

Here contains the code to read the data

#include #include #include #include #include #include #include #include #include "helper/kafka_helper.h" using std::string; using std::list;using std::cout;using std::endl;static bool run = true;static bool exit_eof = true;class MyEventCb: public RdKafka::EventCb {public: void event_cb (RdKafka::Event &event) {switch (event.type()) {case RdKafka::Event::EVENT_ERROR: std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): "<< event.str() << std::endl; if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) run = false; break; case RdKafka::Event::EVENT_STATS: std::cerr << "\" STATS\": "<< event.str() << std::endl; break; c ase RdKafka::Event::EVENT_LOG: fprintf(stderr, "LOG-%i-%s: %s\n", event.severity(), event.fac().c_str(), event.str(). c_str()); break; default: std::cerr << "EVENT "<< event.type() <<" (" << RdKafka::err2str(event.err()) << "): "< err()) {case RdKafka::ERR__TIMED_OUT: break; case RdKafka::ERR_NO_ERROR: /* Real message */ std::cout << "Read msg at offset "<< message->offset() << std::endl; if (message->key()) {std::cout << "Key: "<< *message->key() << std::endl;} cout << static_cast(message->payload()) << endl; break ; case RdKafka::ERR__PARTITION_EOF: cout << "reach last message" << endl; /* Last message */ if (exit_eof) {run = false;} break; case RdKafka::ERR__UNKNOWN_TOPIC: case RdKafka::ERR__UNKNOWN_PARTITION: std ::cerr << "Consume failed: "<< message->errstr() << std::endl; run = false; break; default: /* Errors */ std::cerr << "Consume failed:" << message->errstr () << std::endl; run = false; })class MyConsumeCb: public RdKafka::ConsumeCb {public: void consume_cb (RdKafka::Message &msg, void *opaque) {msg_consume(&msg, opaque); }); static void sigterm (int sig) {run = false;}int main (int argc, char **argv) {/* * Process kill signal, quit from the loop */ signal(SIGINT, sigterm); signal(SIGTERM, sigterm ); /* * Get broker list from zookeeper */ string zookeeper("localhost:2181"); string brokers = KafkaHelper::Brokers(zookeeper); cout << 
"brokers from zookeeper is: "<< brokers << endl; string topic_name = "test2"; /* * Print topic meta */ KafkaHelper::PrintTopicMeta(topic_name); /* * Global conf objects */ RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf ::CONF_GLOBAL); string errstr; global_conf->set("metadata.broker.list", broke rs, errstr); MyEventCb ex_event_cb; global_conf->set("event_cb", &ex_event_cb, errstr); /* * Topic conf objects */ RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf: :CONF_TOPIC); /* * Create consumer using accumulated global configuration. */ RdKafka::Consumer *consumer = RdKafka::Consumer::create(global_conf, errstr); if (!consumer) {std::cerr << "Failed to create consumer: "<< errstr << std::endl; exit(1);} std::cout << "% Created consumer" << consumer->name() << std::endl; /* * Start consumer for topic+partition at start offset */ int32_t partition = 0; int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING; RdKafka::Topic *topic2 = RdKafka::Topic::create(consumer, topic_name, topic_conf, errstr) ; RdKafka::ErrorCode resp = consumer->start(topic2, 0, start_offset); if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "Failed to start consumer: "<< RdKafka::err2str( resp) << std::endl; exit(1);} /* * Consume messages */ MyConsumeCb ex_consume_cb; int use_ccb = 0; while (run) {if (use_ccb) {consumer->consume_callback(topic2, partition, 1000, &ex_consume_cb, &use_ccb);} else {RdKafka::Message *msg = consumer->consume (topic2, partition, 1000); msg_consume(msg, NULL); delete msg;} consumer->poll(0);} /* * Stop consumer */ consumer->stop(topic2, partition); consumer->poll( 1000); delete topic2; delete consumer; /* * Wait for RdKafka to decommission. * This is not strictly needed (when check outq_len() above), but * allows RdKafka to clean up all its resources before the application * exits so that memory profilers such as valgrind wont complain about * memory leaks. */ RdKafka::wait_destroyed(5000); return 0;}

Created: 2016-05-02 Mon 13:07

Validate

Let’s share my teacher’s artificial intelligence tutorial. Zero-based! Easy to understand! Funny and humorous! Also bring yellow jokes! Hope you join our artificial intelligence team too! https://blog.csdn.net/jiangjunshow

Read topic metadata saved by zookeeper

Table of Contents

< div id="text-table-of-contents">

  • 1. There are the following problems
  • 2. Solutions
  • 3. Code
    • 3.1. KafkaHelper class
    • 3.2. The complete code of main.cc

1 There are the following problems

  • You need to use the producer to get the metadata
  • When the producer and the consumer share some objects, the data cannot be read

2 Solution

Use a separate class to encapsulate the metadata-fetching code, avoiding shared variables

3 Code

3.1 KafkaHelper class

#ifndef KAFKA_HELPER_H_#define KAFKA_HELPER_H_#include using std::string;#include "librdkafka/rdkafkacpp.h"#include "librdkafka /rdkafka.h"#include #include #include #define BROKER_PATH "/brokers/ids"static rd_kafka_t *rk;class KafkaHelper {public : static string Brokers(string const& zookeeper) {zhandle_t * zh = initialize_zookeeper(zookeeper); char brokers[1024]; set_brokerlist_from_zookeeper(zh, brokers); return brokers;} static void PrintTopicMeta(s tring const& topic_name) {/* * Create producer using accumulated global configuration. */ RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); string zookeeper("localhost:2181") ; string brokers = KafkaHelper::Brokers(zookeeper); string errstr; global_conf->set("metadata.broker.list", brokers, errstr); RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka: :Conf::CONF_TOPIC); RdKafka::Producer *producer = RdKafka::Producer::create(global_conf, errstr); if (!producer) {std::cerr << "Failed to create producer: "<< errstr < name() << std::endl; /* * Create topic handle. */ RdKafka:: Topic * topic = RdKafka::Topic::create(producer, topic_name, topic_conf, errstr); if (!topic) {std::cerr << "Failed to create topic: "<< errstr << std::endl; exit(1);} bool run = true; while (run) {class RdKafka:: Metadata *metadata; // Fetch metadata RdKafka::ErrorCode err = producer->metadata(topic!=NULL, topic, &metadata, 5000); if (err != RdKafka::ERR_NO_ERROR) {std::cerr << "% % Failed to acquire metadata: "<< RdKafka::err2str(err) << std::endl; run = 0; break;} KafkaHelper::PrintMeta(topic_name, metadata); delete metadata; run = 0;}} static void PrintMeta(string const & topic, const RdKafka::Metadata *metadata) {std::cout << "Metadata for "<< (topic.empty()? 
"": "all topics") << "(orig broker id from broker "<< metadata->orig_broker_id() << ":" << metadata->orig_broker_name() << std::endl; /* Iterate brokers */ std::cout <<" "<< metadata- >brokers()->size() << "brokers:" << std::endl; RdKafka::Metadata::BrokerMetadataIterator ib; for (ib = metadata->brokers()->begin(); ib != metadata->brokers()->end(); ++ib) {std::cout << "broker" << (*ib)->id() << " at "<< *(*ib)->host() << ":" << (*ib)->port() << std::endl;} /* Iterate topics */ std::cout << metadata->topics()->size() << "topics:" << std::endl; RdKafka::Metadata::TopicMetadataIterator it; for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) {std::cout << "topic "<< *(*it)->topic() <<" with "<< (*it )->partitions()->size() << "partitions" << std::endl; if ((*it)->err() != RdKafka::ERR_NO_ERROR) {std::cout <<" " << err2str((*it)->err()); if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) {std::cout << "(try again)";}} std ::cout << std::endl; /* Iterate topic's partitions */ RdKafka::TopicMetadata::PartitionMetadataIterator ip; for (ip = (*it)->partitions()->begin(); ip != (*it)->partitions()->end(); ++ip) {std::cout << "partition" << (*ip)->id() << "leader" << (*ip )->leader() << ", replicas: "; /* Iterate partition's replicas */ RdKafka::PartitionMetadata::ReplicasIterator ir; for (ir = (*ip)->replicas()->begin(); ir != (* ip)->replicas()->end(); ++ir) {std::cout << (ir == (*ip)->replicas()->begin()? ",":"") << *ir;} /* Iterate partition's ISRs */ std::cout << ", isrs: "; RdKafka::PartitionMetadata::ISRSIterator iis; for (iis = (*ip)->isrs()- >begin(); iis != (*ip)->isrs()->end(); ++iis) std::cout << (iis == (*ip)->isrs()->begin( )? 
",":"") << *iis; if ((*ip)->err() != RdKafka::ERR_NO_ERROR) std::cout << ", "<< RdKafka::err2str((* ip)->err()) << std::endl; else std::cout << std::endl;}}} private: static void watcher(zhandle_t *zh, int type, int state, const char *path , void *watcherCtx) {char brokers[1024]; if (type == ZOO_CHILD_EVENT && strncmp(path, BROKER_PATH, sizeof(BROKER_PATH)-1) == 0) {brokers[0] ='\0'; set_brokerlist_from_zookeeper(zh, brokers); if (brokers[0] !='\0' && rk != NULL) {rd_kafka_brokers_add(rk, brokers); rd_kafka_poll(rk, 10) ;}}} static zhandle_t* initialize_zookeeper(string const& zookeeper) {zhandle_t * zh = zookeeper_init(zookeeper.c_str(), watcher, 10000, 0, 0, 0); if (zh == NULL) {fprintf(stderr, " Zookeeper connection not established."); exit(1);} return zh;} static void set_brokerlist_from_zookeeper(zhandle_t *zzh, char *brokers) {if (zzh) {struct String_vector brokerlist; if (zoo_get_children(zzh, BROKER_PATH, 1, &brokerlist) != ZOK) {fprintf(stderr, "No brokers found on path %s\n", BROKER_PATH); return;} int i; char *brokerptr = brokers; for (i = 0; i  0) {cfg[len] ='\0'; json_error_t jerror; json_t *jobj = json_loads(cfg, 0, &jerror); if (jobj) {json_t *jhost = json_object_get(jobj, "host"); json_t *jport = json_object_get(jobj, "port"); if (jhost && jport) {const char *host = json_string_value(jhost) ; const int port = json_integer_value(jport); sprintf(brokerptr, "%s:%d", host, port); brokerptr += strlen(brokerptr); if (i 

3.2 The complete code of main.cc

Here contains the code to read the data

#include #include #include #include #include # include #include #include #include "helper/kafka_helper.h"using std::string;using std::list;using std::cout;using std::endl;static bool run = true;static bool exit_eof = true;class MyEventCb: public RdKafka::EventCb {public: void event_cb (RdKafka::Event &event) {switch (event.type()) {case RdKafka::Event::EVENT_ERROR: std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): "<< event.str() << std::endl; if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) run = false; break; case RdKafka::Event::EVENT_STATS: std::cerr << "\"STATS\": "<< event.str() << std::endl; break; case RdKafka::Event::EVENT_LOG: fprintf(stderr, "L OG-%i-%s: %s\n", event.severity(), event.fac().c_str(), event.str().c_str()); break; default: std::cerr < <"EVENT "<< event.type() <<" (" << RdKafka::err2str(event.err()) << "): "<< event.str() << std::endl; break ;} });void msg_consume(RdKafka::Message* message, void* opaque) {switch (message->err()) {case RdKafka::ERR__TIMED_OUT: break; case RdKafka::ERR_NO_ERROR: /* Real message */ std::cout << "Read msg at offset "<< message->offset() << std::endl; if (message->key()) {std::cout << "Key:" << * message->key() << std::endl;} cout << static_cast(message->payload()) << endl; break; case RdKafka::ERR__PARTITION_EOF: cout << "reach last message "<< endl; /* Last message */ if (exit_eof) {run = false;} break; case RdKafka::ERR__UNKNOWN_TOPIC: case RdKafka::ERR__UNKNOWN_PARTITION: std::cerr << "Consume failed:" << message- >errstr() << std::endl; run = false; break; default: /* Errors */ std::cerr << "Consume failed: "<< message->errstr() << std::endl; run = false; })class MyConsumeCb: public RdKafka ::ConsumeCb {public: void consume_cb (RdKafka::Message &msg, void *opaque) {msg_consume(&msg, opaque); }); static void sigterm (int sig) {run = false;}int main (int argc, char **argv) {/* * Process kill signal, quit from the loop */ signal(SIGINT, sigterm); signal(SIGTERM, sigterm); /* * Get broker list 
from zookeeper */ string zookeeper("localhost:2181" ); string brokers = KafkaHelper::Brokers(zookeeper); cout << "brokers from zookeeper is: "<< brokers << endl; string topic_name = "test2"; /* * Print topic meta */ KafkaHelper::PrintTopicMeta( topic_name); /* * Global conf objects */ RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); string errstr; global_conf->set("metadata.broker.list" , brokers, errstr); MyEventCb ex_event_cb; global_conf->set(" event_cb", &ex_event_cb, errstr); /* * Topic conf objects */ RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); /* * Create consumer using accumulated global configuration. */ RdKafka::Consumer *consumer = RdKafka::Consumer::create(global_conf, errstr); if (!consumer) {std::cerr << "Failed to create consumer: "<< errstr << std::endl ; exit(1);} std::cout << "% Created consumer "<< consumer->name() << std::endl; /* * Start consumer for topic+partition at start offset */ int32_t partition = 0; int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING; RdKafka::Topic *topic2 = RdKafka::Topic::create(consumer, topic_name, topic_conf, errstr); RdKafka::ErrorCode resp = consumer->start(topic2, 0, start_offset); if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "Failed to start consumer: "<< RdKafka::err2str(resp) << std::endl; exit(1); } /* * Consume messages */ MyConsumeCb ex_consume_cb; int use_ccb = 0; while (run) {if (use_ccb) {consumer->consume_callback(topic2, partition, 1000, &ex_consume_cb, &use_ccb);} else {RdKafka::Message *msg = consumer->consume(topic2, partition, 1000); msg_consume(msg , NULL); delete msg;} consumer->poll(0);} /* * Stop consumer */ consumer->stop(topic2, partition); consumer->poll(1000); delete topic2; delete consumer; /* * Wait for RdKafka to decommission. 
* This is not strictly needed (when check outq_len() above), but * allows RdKafka to clean up all its resources before the application * exits so that memory profilers such as valgrind wont complain about * memory leaks . */ RdKafka::wait_destroyed(5000); return 0;}

Created: 2016-05-02 Mon 13:07

Validate

Read topic metadata saved by zookeeper

Table of Contents

  • 1. There are the following problems
  • 2. Solutions
  • 3. Code
    • 3.1. KafkaHelper class
    • 3.2. Main.cc complete code

1 There are the following problems

  • You need to use producer to get metadata
  • When producer and consumer share some objects There will be a problem of not being able to read data

2 Solution

Use a separate class Encapsulate the code to obtain metadata to avoid sharing variables

3 Code

3.1 KafkaHelper class

#ifndef KAFKA_HELPER_H_#define KAFKA_HELPER_H_#include using std::string;#include "librdkafka/rdkafkacpp.h"#include "librdkafka/rdkafka.h" #include #include #include #define BROKER_PATH "/brokers/ids"static rd_kafka_t *rk;class KafkaHelper {public: static string Brokers( string const& zookeeper) {zhandle_t * zh = initialize_zookeeper(zookeeper); char brokers[1024]; set_brokerlist_from_zookeeper(zh, brokers); return brokers;} static void PrintTopicMeta(string const& topic_name) {/* * producer Create using accumulated global configuration. */ RdKafka::Conf * global_conf = RdKafka::Conf::create (RdKafka::Conf::CONF_GLOBAL); string zookeeper("localhost:2181"); string brokers = KafkaHelper::Brokers(zookeeper); string errstr; global_conf->set("metadata.broker.list", brokers, errstr ); RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); RdKafka::Producer *producer = RdKafka::Producer::create(global_conf, errstr); if (!producer ) {std::cerr << "Failed to create producer: "<< errstr << std::endl; exit(1);} std::cout << "% Created producer" << producer->name() << std::endl; /* * Create topic handle. */ RdKafka::Topic * topic = RdKafka::Topic::create(producer, topic_name, topic_conf, errstr); if (!topic) {std::cerr << "Failed to create topic: "<< errstr << std::endl; exit(1);} bool run = true; while (run) {class RdKafka::Metadata *metadata; // Fetch metadata RdKafka:: ErrorCode err = producer->metadata(topic!=NULL, topic, &metadata, 5000); if (err != RdKafka:: ERR_NO_ERROR) {std::cerr << "%% Failed to acquire metadata: "<< RdKafka::err2str(err) << std::endl; run = 0; break;} KafkaHelper::PrintMeta(topic_name, metadata) ; delete metadata; run = 0;}} static void PrintMeta(string const & topic, const RdKafka::Metadata *metadata) {std::cout << "Metadata for "<< (topic.empty()? 
"": "all topics") << "(orig broker id from broker "<< metadata->orig_broker_id() << ":" << metadata->orig_broker_name() << std::endl; /* Iterate brokers */ std ::cout << "" << metadata->brokers()->size() << "brokers:" << std::endl; RdKafka::Metadata::BrokerMetadataIterator ib; for (ib = metadata->brokers ()->begin(); ib != metadata->brokers()->end(); ++ib) {std::cout << "broker" << (*ib)->id() << "at" << *(*ib)->host() << ":" << (*ib)->port() << std::endl;} /* Iterate topics */ std::cout < topics()->size( ) << " topics:" << std::endl;    RdKafka::Metadata::TopicMetadataIterator it;    for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) {      std::cout << "  topic "<< *(*it)->topic() << " with "                 << (*it)->partitions()->size() << " partitions" << std::endl;      if ((*it)->err() != RdKafka::ERR_NO_ERROR) {        std::cout << " " << err2str((*it)->err());        if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) {          std::cout << " (try again)";        }             }      std::cout << std::endl;      /* Iterate topic‘s partitions */      RdKafka::TopicMetadata::PartitionMetadataIterator ip;      for (ip = (*it)->partitions()->begin(); ip != (*it)->partitions()->end(); ++ip) {        std::cout << "    partition " << (*ip)->id()                  << " leader " << (*ip)->leader()                  << ", replicas: ";        /* Iterate partition‘s replicas */        RdKafka::PartitionMetadata::ReplicasIterator ir;        for (ir = (*ip)->repl icas()->begin();              ir != (*ip)->replicas()->end() ;              ++ir) {          std::cout << (ir == (*ip)->replicas()->begin() ? ",":"") << *ir;        }        /* Iterate partition‘s ISRs */        std::cout << ", isrs: ";        RdKafka::PartitionMetadata::ISRSIterator iis;        for (iis = (*ip)->isrs()->begin(); iis != (*ip)->isrs()->end() ; ++iis)          std::cout << (iis == (*ip)->isrs()->begin() ? 
",":"") << *iis;        if ((*ip)->err() != RdKafka::ERR_NO_ERROR)          std::cout << ", " << RdKafka::err2str((*ip)->err()) << std::endl;        else          std::cout << std::endl;      }    }  } private:  static void watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {    char brokers[1024];    if (type == ZOO_CHILD_EVENT && strncmp(path, BROKER_PATH, sizeof(BROKER_PATH) - 1) == 0)      {        brokers[0] = ‘\0‘;        set_brokerlist_from_zookeeper(zh, brokers);        if (brokers[0] != ‘\0‘ && rk != NULL)          {            rd_k afka_brokers_add(rk, brokers);            rd_kafka_poll(rk, 10);          }      }  }  static zhandle_t* initialize_zookeeper(string const& zookeeper) {    zhandle_t * zh = zookeeper_init(zookeeper.c_str(), watcher, 10000, 0, 0, 0);    if (zh == NULL) {      fprintf(stderr, "Zookeeper connection not established.");      exit(1);    }    return zh;  }  static void set_brokerlist_from_zookeeper(zhandle_t *zzh, char *brokers) {    if (zzh) {      struct String_vector brokerlist;      if (zoo_get_children(zzh, BROKER_PATH, 1, &brokerlist) != ZOK) {        fprintf(stderr, "No brokers found on path %s\n", BROKER_PATH);        return;      }      int i;      char *brokerptr = brokers;      for (i = 0; i < brokerlist.count; i++) {        char path[255], cfg[1024];        sprintf(path, "/brokers/ids/%s", brokerlist.data[i]);        int len = sizeof(cfg);        zoo_get(zzh, path, 0, cfg, &len, NULL);        if (len > 0) {          cfg[len] = ‘\0‘;          json_error_t jerror;          json_t * jobj = json_loads(cfg, 0, &jerror);          if (jobj) {            json_t *jhost = json_object_get(jobj, "host");            json_t *jport = json_object_get(jobj, "port");            if (jhost && jport) {              const char *host = json_string_value(jhost);              const int   port = json_integer_value(jport);              sprintf(brokerptr, "%s:%d", host, port);              brokerptr += strlen(brokerptr); 
             if (i < brokerlist.count - 1) {                *brokerptr++ = ‘,‘;              }            }            json_decref(jobj);          }        }      }      deallocate_String_vector(&brokerlist);      printf("Found brokers %s\n", brokers);    }  }};#endif

3.2 main.cc完整代码

这里包含了读取数据的代码

 #include #include #include #include #include #include #include #include #include "helper/kafka_helper.h"using std::string;using std::list;using std::cout;using std::endl;static bool run = true;static bool exit_eof = true;class MyEventCb : public RdKafka::EventCb {public:  void event_cb (RdKafka::Event &event) {    switch (event.type())      {      case RdKafka::Event::EVENT_ERROR:        std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<          event.str() << std::endl;        if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)          run = false;        break;      case RdKafka::Event::EVENT_STATS:        std::cerr << "\"STATS\": " << event.str() << std::endl;        break;      case RdKafka::Event::EVENT_LOG:        fprintf(stderr, "LOG-%i-%s: %s\n",                event.severity(), event.fac().c_str(), event.str().c_str());        break;      default:        std::cerr << "EVENT " << event.type() <<          " (" << RdKafka::err2str(event.err()) << "): " <<          event.str() << std::endl;        break;      }  }};void msg_consume(RdKafka::Message* message, void* opaque) {  switch (message->err()) {  case RdKafka::ERR__TIMED_OUT:    break;  case RdKafka::ERR_NO_ERROR:    /* Real message */    std::cout << "Read msg at offset " << message->offset() << std::endl;    if (message->key()) {      std::cout << "Key: " << *message->key() << std::endl;    }    cout << static_cast(message->payload()) << endl;    break;  case RdKafka::ERR__PARTITION_EOF:    cout << "reach last message" << endl;    /* Last message */    if (exit_eof) {      run = false;    }    break;  case RdKafka::ERR__UNKNOWN_TOPIC:  case RdKafka::ERR__UNKNOWN_PARTITION:    std::cerr << "Consume failed: " << message->errstr() << std::endl;    run = false;    break;  default:    /* Errors */    std::cerr << "Consume failed: " << message->errstr() << std::endl;    run = false;  }}class MyConsumeCb : publi c RdKafka::ConsumeCb {public:  void consume_cb (RdKafka::Message &msg, void 
*opaque) {    msg_consume(&msg, opaque);  }};static void sigterm (int sig) {  run = false;}int main (int argc, char **argv) {  /*   * Process kill signal, quit from the loop   */  signal(SIGINT, sigterm);  signal(SIGTERM, sigterm);  /*   * Get broker list from zookeeper   */  string zookeeper("localhost:2181");  string brokers = KafkaHelper::Brokers(zookeeper);  cout << "brokers from zookeeper is: " << brokers << endl;  string topic_name = "test2";  /*   * Print topic meta   */  KafkaHelper::PrintTopicMeta(topic_name);  /*   * Global conf objects   */  RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);  string errstr;  global_conf->set("metadata.broker.list", brokers, errstr);  MyEventCb ex_event_cb;  global_conf->set("event_cb", &ex_event_cb, errstr);  /*   * Topic conf objects   */  RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);  /*   * Create cons umer using accumulated global configuration.   */  RdKafka::Consumer *consumer = RdKafka::Consumer::create(global_conf, errstr);  if (!consumer) {    std::cerr << "Failed to create consumer: " << errstr << std::endl;    exit(1);  }  std::cout << "% Created consumer " << consumer->name() << std::endl;  /*   * Start consumer for topic+partition at start offset   */  int32_t partition = 0;  int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;  RdKafka::Topic *topic2 = RdKafka::Topic::create(consumer, topic_name, topic_conf, errstr);  RdKafka::ErrorCode resp = consumer->start(topic2, 0, start_offset);  if (resp != RdKafka::ERR_NO_ERROR) {    std::cerr << "Failed to start consumer: " <<      RdKafka::err2str(resp) << std::endl;    exit(1);  }  /*   * Consume messages   */  MyConsumeCb ex_consume_cb;  int use_ccb = 0;  while (run) {    if (use_ccb) {      consumer->consume_callback(topic2, partition, 1000, &ex_consume_cb, &use_ccb);    } else {      RdKafka::Message *msg = consumer->cons ume(topic2, partition, 1000);      msg_consume(msg, NULL);      delete 
msg;    }    consumer->poll(0);  }    /*   * Stop consumer   */  consumer->stop(topic2, partition);  consumer->poll(1000);  delete topic2;  delete consumer;  /*   * Wait for RdKafka to decommission.   * This is not strictly needed (when check outq_len() above), but   * allows RdKafka to clean up all its resources before the application   * exits so that memory profilers such as valgrind wont complain about   * memory leaks.   */  RdKafka::wait_destroyed(5000);  return 0;}

Table of Contents

  • 1. 有以下问题
  • 2. 解决方法
  • 3. 代码
    • 3.1. KafkaHelper类
    • 3.2. main.cc完整代码

  • 1. 有以下问题
  • 2. 解决方法
  • 3. 代码
    • 3.1. KafkaHelper类
    • 3.2. main.cc完整代码

1 有以下问题

  • 需要使用producer才能获得元数据
  • 当producer和consumer共用一些对象时会出现无法读取数据的问题

  • 需要使用producer才能获得元数据
  • 当producer和consumer共用一些对象时会出现无法读取数据的问题

2 解决方法

用独立的类封装获取元数据的代码,避免共用变量

用独立的类封装获取元数据的代码,避免共用变量

3 代码

 

3.1 KafkaHelper类

#ifndef KAFKA_HELPER_H_#define KAFKA_HELPER_H_#include using std::string;#include "librdkafka/rdkafkacpp.h"#include "librdk afka/rdkafka.h"#include #include #include #define BROKER_PATH "/brokers/ids"static rd_kafka_t *rk;class KafkaHelper { public:  static string Brokers(string const& zookeeper) {    zhandle_t * zh = initialize_zookeeper(zookeeper);    char brokers[1024];    set_brokerlist_from_zookeeper(zh, brokers);    return brokers;  }  static void PrintTopicMeta(string const& topic_name) {    /*     * Create producer using accumulated global configuration.     */    RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);    string zookeeper("localhost:2181");    string brokers = KafkaHelper::Brokers(zookeeper);    string errstr;    global_conf->set("metadata.broker.list", brokers, errstr);    RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);    RdKafka::Producer *producer = RdKafka::Producer::create(global_conf, errstr);    if (!producer) {      std::cerr << "Failed to create producer: " << errstr << std::endl;      exit(1);    }    std::cout << "% Created producer " << producer->name() << std::endl;    /*     * Create topic handle.     
*/    RdKafka::Topic * topic = RdKafka::Topic::create(producer, topic_name, topic_conf, errstr);    if (!topic) {      std::cerr << "Failed to create topic: " << errstr << std::endl;      exit(1);    }    bool run = true;    while (run) {      class RdKafka::Metadata *metadata;      // Fetch metadata       RdKafka::ErrorCode err = producer->metadata(topic!=NULL, topic, &metadata, 5000);      if (err != RdKafka::ERR_NO_ERROR) {        std::cerr << "%% Failed to acquire metadata: "                   << RdKafka::err2str(err) << std::endl;        run = 0;        break;      }      KafkaHelper::PrintMeta(topic_name, metadata);      delete metadata;      run = 0;    }  }  static void PrintMeta(string const & topic, const RdKafka::Metadata *metadata) {    std::cout << "Metadata for " << (topic.empty() ? "" : "all topics")              << "(or ig broker id from broker "  << metadata->orig_broker_id()              << ":" << metadata->orig_broker_name() << std::endl;    /* Iterate brokers */    std::cout << " " << metadata->brokers()->size() << " brokers:" << std::endl;    RdKafka::Metadata::BrokerMetadataIterator ib;    for (ib = metadata->brokers()->begin(); ib != metadata->brokers()->end(); ++ib) {      std::cout << "  broker " << (*ib)->id() << " at "                 << *(*ib)->host() << ":" << (*ib)->port() << std::endl;    }    /* Iterate topics */            std::cout << metadata->topics()->size() << " topics:" << std::endl;    RdKafka::Metadata::TopicMetadataIterator it;    for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) {      std::cout << "  topic "<< *(*it)->topic() << " with "                 << (*it)->partitions()->size() << " partitions" << std::endl;      if ((*it)->err() != RdKafka::ERR_NO_ERROR) {        std::cout << " " << err2str((*it)->err());        if ((*it)->err() == RdKafka ::ERR_LEADER_NOT_AVAILABLE) {          std::cout << " (try again)";        }             }      std::cout << std::endl;      /* Iterate topic‘s 
partitions */      RdKafka::TopicMetadata::PartitionMetadataIterator ip;      for (ip = (*it)->partitions()->begin(); ip != (*it)->partitions()->end(); ++ip) {        std::cout << "    partition " << (*ip)->id()                  << " leader " << (*ip)->leader()                  << ", replicas: ";        /* Iterate partition‘s replicas */        RdKafka::PartitionMetadata::ReplicasIterator ir;        for (ir = (*ip)->replicas()->begin();              ir != (*ip)->replicas()->end() ;              ++ir) {          std::cout << (ir == (*ip)->replicas()->begin() ? ",":"") << *ir;        }        /* Iterate partition‘s ISRs */        std::cout << ", isrs: ";        RdKafka::PartitionMetadata::ISRSIterator iis;        for (iis = (*ip)->isrs()->begin(); iis != (*ip)->isrs()->end() ; ++iis)          std::cout << (iis == (*ip)->isrs()->begin() ? ",":"") << *iis;        if ((*ip)->err() != RdKafka::ERR_NO_ERROR)          std::cout << ", " << RdKafka::err2str((*ip)->err()) << std::endl;        else          std::cout << std::endl;      }    }  } private:  static void watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {    char brokers[1024];    if (type == ZOO_CHILD_EVENT && strncmp(path, BROKER_PATH, sizeof(BROKER_PATH) - 1) == 0)      {        brokers[0] = ‘\0‘;        set_brokerlist_from_zookeeper(zh, brokers);        if (brokers[0] != ‘\0‘ && rk != NULL)          {            rd_kafka_brokers_add(rk, brokers);            rd_kafka_poll(rk, 10);          }      }  }  static zhandle_t* initialize_zookeeper(string const& zookeeper) {    zhandle_t * zh = zookeeper_init(zookeeper.c_str(), watcher, 10000, 0, 0, 0);    if (zh == NULL) {      fprintf(stderr, "Zookeeper connection not established.");      exit(1);    }    return zh;  }  static void set_brokerlist_from_zookeeper(zhandle_t *zzh, char *brokers) {    if (zzh) {      struct String_vector brokerlist;      if (zoo_get_children(zzh, BROKER_PATH, 1, &brokerlist) != ZOK) {        fprintf(stderr, 
"No brokers found on path %s\n", BROKER_PATH);        return;      }      int i;      char *brokerptr = brokers;      for (i = 0; i < brokerlist.count; i++) {        char path[255], cfg[1024];        sprintf(path, "/brokers/ids/%s", brokerlist.data[i]);        int len = sizeof(cfg);        zoo_get(zzh, path, 0, cfg, &len, NULL);        if (len > 0) {          cfg[len] = ‘\0‘;          json_error_t jerror;          json_t *jobj = json_loads(cfg, 0, &jerror);          if (jobj) {            json_t *jhost = json_object_get(jobj, "host");            json_t *jport = json_object_get(jobj, "port");            if (jhost && jport) {              const char *host = json_string_value(jhost);              const int   port = json_integer_value(jport);              sprintf(brokerptr, "%s:%d", host, port);              brokerptr += strlen(brokerptr);              if (i < brokerlist.count - 1) {                *brokerptr++ = ‘,‘;              }            }            json_decref(jobj);          }        }      }      deallocate_String_vector(&brokerlist);      printf("Found brokers %s\n", brokers);    }  }};#endif

3.2 main.cc完整代码

这里包含了读取数据的代码

#include #include #include #include #include #include #include #include #include "helper/kafka_helper.h"using std::string;using std::list;using std::cout;using std::endl;static bool run = true;static bool exit_eof = true;class MyEventCb : public RdKafka::EventCb {public:  void event_cb (RdKafka::Event &event) {    switch (event.type())      {      case RdKafka::Event::E VENT_ERROR:        std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<          event.str() << std::endl;        if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)          run = false;        break;      case RdKafka::Event::EVENT_STATS:        std::cerr << "\"STATS\": " << event.str() << std::endl;        break;      case RdKafka::Event::EVENT_LOG:        fprintf(stderr, "LOG-%i-%s: %s\n",                event.severity(), event.fac().c_str(), event.str().c_str());        break;      default:        std::cerr << "EVENT " << event.type() <<          " (" << RdKafka::err2str(event.err()) << "): " <<          event.str() << std::endl;        break;      }  }};void msg_consume(RdKafka::Message* message, void* opaque) {  switch (message->err()) {  case RdKafka::ERR__TIMED_OUT:    break;  case RdKafka::ERR_NO_ERROR:    /* Real message */    std::cout << "Read msg at offset " << message->offset() << std::endl;    if (message->key()) {      std::cout << "Key: " << *message->ke y() << std::endl;    }    cout << static_cast(message->payload()) << endl;    break;  case RdKafka::ERR__PARTITION_EOF:    cout << "reach last message" << endl;    /* Last message */    if (exit_eof) {      run = false;    }    break;  case RdKafka::ERR__UNKNOWN_TOPIC:  case RdKafka::ERR__UNKNOWN_PARTITION:    std::cerr << "Consume failed: " << message->errstr() << std::endl;    run = false;    break;  default:    /* Errors */    std::cerr << "Consume failed: " << message->errstr() << std::endl;    run = false;  }}class MyConsumeCb : public RdKafka::ConsumeCb {public:  void consume_cb (RdKafka::Message &msg, void 
*opaque) {    msg_consume(&msg, opaque);  }};static void sigterm (int sig) {  run = false;}int main (int argc, char **argv) {  /*   * Process kill signal, quit from the loop   */  signal(SIGINT, sigterm);  signal(SIGTERM, sigterm);  /*   * Get broker list from zookeeper   */  string zookeeper("localhost:2181");  string brokers = KafkaHelper::Brokers(zookeeper);  cout << "brokers from zookeeper is: " << brokers << endl;  string topic_name = "test2";  /*   * Print topic meta   */  KafkaHelper::PrintTopicMeta(topic_name);  /*   * Global conf objects   */  RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);  string errstr;  global_conf->set("metadata.broker.list", brokers, errstr);  MyEventCb ex_event_cb;  global_conf->set("event_cb", &ex_event_cb, errstr);  /*   * Topic conf objects   */  RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);  /*   * Create consumer using accumulated global configuration.   */  RdKafka::Consumer *consumer = RdKafka::Consumer::create(global_conf, errstr);  if (!consumer) {    std::cerr << "Failed to create consumer: " << errstr << std::endl;    exit(1);  }  std::cout << "% Created consumer " << consumer->name() << std::endl;  /*   * Start consumer for topic+partition at start offset   */  int32_t partition = 0;  int64_t start_offset = RdKafka::Topic::OFFSET_BEGI NNING;  RdKafka::Topic *topic2 = RdKafka::Topic::create(consumer, topic_name, topic_conf, errstr);  RdKafka::ErrorCode resp = consumer->start(topic2, 0, start_offset);  if (resp != RdKafka::ERR_NO_ERROR) {    std::cerr << "Failed to start consumer: " <<      RdKafka::err2str(resp) << std::endl;    exit(1);  }  /*   * Consume messages   */  MyConsumeCb ex_consume_cb;  int use_ccb = 0;  while (run) {    if (use_ccb) {      consumer->consume_callback(topic2, partition, 1000, &ex_consume_cb, &use_ccb);    } else {      RdKafka::Message *msg = consumer->consume(topic2, partition, 1000);      msg_consume(msg, NULL);      delete 
msg;    }    consumer->poll(0);  }    /*   * Stop consumer   */  consumer->stop(topic2, partition);  consumer->poll(1000);  delete topic2;  delete consumer;  /*   * Wait for RdKafka to decommission.   * This is not strictly needed (when check outq_len() above), but   * allows RdKafka to clean up all its resources before the application   * exits so that memory profile rs such as valgrind wont complain about   * memory leaks.   */  RdKafka::wait_destroyed(5000);  return 0;}

 

3.1 KafkaHelper类

#ifndef KAFKA_HELPER_H_#define KAFKA_HELPER_H_#include using std::string;#include "librdkafka/rdkafkacpp.h"#include "librdkafka/rdkafka.h"#include #include #include #define BROKER_PATH "/brokers/ids"static rd_kafka_t *rk;class KafkaHelper { public:  static string Brokers(string const& zookeeper) {    zhandle_t * zh = initialize_zookeeper(zookeeper);    char brokers[1024];    set_brokerlist_from_zookeeper(zh, brokers);    return brokers;  }  static void PrintTopicMeta(string const& topic_name) {    /*     * Create producer using accumulated global configuration.     */    RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);    string zookeeper("localhost:2181");    string brokers = KafkaHelper::Brokers(zookeeper);    string errstr;    global_conf->set("metadata.broker.list", brokers, errstr);    RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);    RdKafka::Producer *producer = RdKafka::Producer::create(global_conf, errstr);    if (!producer) {      std::cerr << "Failed to create producer: " << errstr << std::endl;      exit(1);    }    std::cout << "% Created producer " << producer->name() << std::endl;    /*     * Create topic handle.     
*/    RdKafka::Topic * topic = RdKafka::Topic::create(producer, topic_name, topic_conf, errstr);    if (!topic) {      std::cerr << "Failed to create topic: " << errstr << std::endl;      exit(1);    }    bool run = true;    while (run) {      class RdKafka::Metadata *metadata;      // Fetch metadata       RdKafka::ErrorCode err = producer->metadata(topic!=NULL, topic, &metadat a, 5000);      if (err != RdKafka::ERR_NO_ERROR) {        std::cerr << "%% Failed to acquire metadata: "                   << RdKafka::err2str(err) << std::endl;        run = 0;        break;      }      KafkaHelper::PrintMeta(topic_name, metadata);      delete metadata;      run = 0;    }  }  static void PrintMeta(string const & topic, const RdKafka::Metadata *metadata) {    std::cout << "Metadata for " << (topic.empty() ? "" : "all topics")              << "(orig broker id from broker "  << metadata->orig_broker_id()              << ":" << metadata->orig_broker_name() << std::endl;    /* Iterate brokers */    std::cout << " " << metadata->brokers()->size() << " brokers:" << std::endl;    RdKafka::Metadata::BrokerMetadataIterator ib;    for (ib = metadata->brokers()->begin(); ib != metadata->brokers()->end(); ++ib) {      std::cout << "  broker " << (*ib)->id() << " at "                 << *(*ib)->host() << ":" << (*ib)->port() << std::endl;    }    /* Iterate topics */            std ::cout << metadata->topics()->size() << " topics:" << std::endl;    RdKafka::Metadata::TopicMetadataIterator it;    for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) {      std::cout << "  topic "<< *(*it)->topic() << " with "                 << (*it)->partitions()->size() << " partitions" << std::endl;      if ((*it)->err() != RdKafka::ERR_NO_ERROR) {        std::cout << " " << err2str((*it)->err());        if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) {          std::cout << " (try again)";        }             }      std::cout << std::endl;      /* Iterate topic‘s 
partitions */      RdKafka::TopicMetadata::PartitionMetadataIterator ip;      for (ip = (*it)->partitions()->begin(); ip != (*it)->partitions()->end(); ++ip) {        std::cout << "    partition " << (*ip)->id()                  << " leader " << (*ip)->leader()                  << ", replicas: ";        /* Iterate partition‘s replicas */        RdKafka::PartitionMetadata::ReplicasIterat or ir;        for (ir = (*ip)->replicas()->begin();              ir != (*ip)->replicas()->end() ;              ++ir) {          std::cout << (ir == (*ip)->replicas()->begin() ? ",":"") << *ir;        }        /* Iterate partition‘s ISRs */        std::cout << ", isrs: ";        RdKafka::PartitionMetadata::ISRSIterator iis;        for (iis = (*ip)->isrs()->begin(); iis != (*ip)->isrs()->end() ; ++iis)          std::cout << (iis == (*ip)->isrs()->begin() ? ",":"") << *iis;        if ((*ip)->err() != RdKafka::ERR_NO_ERROR)          std::cout << ", " << RdKafka::err2str((*ip)->err()) << std::endl;        else          std::cout << std::endl;      }    }  } private:  static void watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {    char brokers[1024];    if (type == ZOO_CHILD_EVENT && strncmp(path, BROKER_PATH, sizeof(BROKER_PATH) - 1) == 0)      {        brokers[0] = ‘\0‘;        set_brokerlist_from_zookeeper(zh, brokers);        if (brokers[0] != ‘\0‘ && rk != NULL)          {            rd_kafka_brokers_add(rk, brokers);            rd_kafka_poll(rk, 10);          }      }  }  static zhandle_t* initialize_zookeeper(string const& zookeeper) {    zhandle_t * zh = zookeeper_init(zookeeper.c_str(), watcher, 10000, 0, 0, 0);    if (zh == NULL) {      fprintf(stderr, "Zookeeper connection not established.");      exit(1);    }    return zh;  }  static void set_brokerlist_from_zookeeper(zhandle_t *zzh, char *brokers) {    if (zzh) {      struct String_vector brokerlist;      if (zoo_get_children(zzh, BROKER_PATH, 1, &brokerlist) != ZOK) {        fprintf(stderr, 
"No brokers found on path %s\n", BROKER_PATH);        return;      }      int i;      char *brokerptr = brokers;      for (i = 0; i < brokerlist.count; i++) {        char path[255], cfg[1024];        sprintf(path, "/brokers/ids/%s", brokerlist.data[i]);        int len = sizeof(cfg);        zoo_get(zzh, path, 0, cfg, &len, NULL);        if (len > 0) {          cfg[len] = ‘\0‘;          jso n_error_t jerror;          json_t *jobj = json_loads(cfg, 0, &jerror);          if (jobj) {            json_t *jhost = json_object_get(jobj, "host");            json_t *jport = json_object_get(jobj, "port");            if (jhost && jport) {              const char *host = json_string_value(jhost);              const int   port = json_integer_value(jport);              sprintf(brokerptr, "%s:%d", host, port);              brokerptr += strlen(brokerptr);              if (i < brokerlist.count - 1) {                *brokerptr++ = ‘,‘;              }            }            json_decref(jobj);          }        }      }      deallocate_String_vector(&brokerlist);      printf("Found brokers %s\n", brokers);    }  }};#endif

#ifndef KAFKA_HELPER_H_#define KAFKA_HELPER_H_#include using std::string;#include "librdkafka/rdkafkacpp.h"#include "librdkafka/rdkafka.h"#include #include #include #define BROKER_PATH "/brokers/ids"static rd_kafka_t *rk;class KafkaHelper { public:  static string Brokers(string const& zookeeper) {    zhandle_t * zh = initialize_zookeeper(zookeeper);    char brokers[1024];    set_brokerlist_from_zookeeper(zh, brokers);    return brokers;  }  static void PrintTopicMeta(string const& topic_name) {    /*     * Create producer using accumulated global configuration.     */    RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);    string zookeeper("localhost:2181");    string brokers = KafkaHelper::Brokers(zookeeper);    string errstr;    global_conf->set("metadata.broker.list", brokers, errstr);    RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);    RdKafka::Producer *producer = RdKafka::Producer::create(global_conf, errstr);    if (!producer) {      std::cerr << "Failed to create producer: " << errstr << std::endl;      exit(1);    }    std::cout < < "% Created producer " << producer->name() << std::endl;    /*     * Create topic handle.     
*/    RdKafka::Topic * topic = RdKafka::Topic::create(producer, topic_name, topic_conf, errstr);    if (!topic) {      std::cerr << "Failed to create topic: " << errstr << std::endl;      exit(1);    }    bool run = true;    while (run) {      class RdKafka::Metadata *metadata;      // Fetch metadata       RdKafka::ErrorCode err = producer->metadata(topic!=NULL, topic, &metadata, 5000);      if (err != RdKafka::ERR_NO_ERROR) {        std::cerr << "%% Failed to acquire metadata: "                   << RdKafka::err2str(err) << std::endl;        run = 0;        break;      }      KafkaHelper::PrintMeta(topic_name, metadata);      delete metadata;      run = 0;    }  }  static void PrintMeta(string const & topic, const RdKafka::Metadata *metadata) {    std::cout << "Metadata for " << (topic.empty() ? "" : "all topics")              << "(orig broker id from broker "  << metadata->orig_broker_id()              << ":" << metadata->orig_broker_name() << std::endl;    /* Iterate brokers */    std::cout << " " << metadata->brokers()->size() << " brokers:" << std::endl;    RdKafka::Metadata::BrokerMetadataIterator ib;    for (ib = metadata->brokers()->begin(); ib != metadata->brokers()->end(); ++ib) {      std::cout << "  broker " << (*ib)->id() << " at "                 << *(*ib)->host() << ":" << (*ib)->port() << std::endl;    }    /* Iterate topics */            std::cout << metadata->topics()->size() << " topics:" << std::endl;    RdKafka::Metadata::TopicMetadataIterator it;    for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) {      std::cout << "  topic "<< *(*it)->topic() << " with "                 << (*it)->partitions()->size() << " partitions" << std::endl;      if ((*it)->err() != RdKafka::ERR_NO_ERROR) {        std::cout << " " << err2str((*it)->err());        if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) {          std::cout << " (try again)";        }             }      std::cout << std::endl;      /* Iterate topic‘s 
partitions */      RdKafka::TopicMetadata::PartitionMetadataIterator ip;      for (ip = (*it)->partitions()->begin(); ip != (*it)->partitions()->end(); ++ip) {        std::cout << "    partition " << (*ip)->id()                  << " leader " << (*ip)->leader()                  << ", replicas: ";        /* Iterate partition‘s replicas */        RdKafka::PartitionMetadata::ReplicasIterator ir;        for (ir = (*ip)->replicas()->begin();              ir != (*ip)->replicas()->end() ;              ++ir) {          std::cout << (ir == (*ip)->replicas()->begin() ? ",":"") << *ir;        }        /* Iterate partition‘s ISRs */        std::cout << ", isrs: ";        RdKafka::PartitionMetadata::ISRSIterator iis;        for (iis = (*ip)->isrs()->begin(); iis != (*ip)->isrs()->end() ; ++iis)          std::cout << (iis == (*ip)->isrs()->begin() ? ",":"") << *iis;        if ((*ip)->err() != RdKafka::ERR_NO_ERROR)          std::cout << ", " << RdKafka::err2str((*ip)->err()) << std::endl;        else          std::cout << std::endl;      }    }  } private:  static void watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {    char brokers[1024];    if (type == ZOO_CHILD_EVENT && strncmp(path, BROKER_PATH, sizeof(BROKER_PATH) - 1) == 0)      {        brokers[0] = ‘\0‘;        set_brokerlist_from_zookeeper(zh, brokers);        if (brokers[0] != ‘\0‘ && rk != NULL)          {            rd_kafka_brokers_add(rk, brokers);            rd_kafka_poll(rk, 10);          }      }  }  static zhandle_t* initialize_zookeeper(string const& zookeeper) {    zhandle_t * zh = zookeeper_init(zookeeper.c_str(), watcher, 10000, 0, 0, 0);    if (zh == NULL) {      fprintf(stderr, "Zookeeper connection not established.");      exit(1);    }    return zh;  }  static void set_brokerlist_from_zookeeper(zhandle_t *zzh, char *brokers) {    if (zzh) {      struct String_vector brokerlist;      if ( zoo_get_children(zzh, BROKER_PATH, 1, &brokerlist) != ZOK) {        fprintf(stderr, 
"No brokers found on path %s\n", BROKER_PATH);        return;      }      int i;      char *brokerptr = brokers;      for (i = 0; i < brokerlist.count; i++) {        char path[255], cfg[1024];        sprintf(path, "/brokers/ids/%s", brokerlist.data[i]);        int len = sizeof(cfg);        zoo_get(zzh, path, 0, cfg, &len, NULL);        if (len > 0) {          cfg[len] = ‘\0‘;          json_error_t jerror;          json_t *jobj = json_loads(cfg, 0, &jerror);          if (jobj) {            json_t *jhost = json_object_get(jobj, "host");            json_t *jport = json_object_get(jobj, "port");            if (jhost && jport) {              const char *host = json_string_value(jhost);              const int   port = json_integer_value(jport);              sprintf(brokerptr, "%s:%d", host, port);              brokerptr += strlen(brokerptr);              if (i < brokerlist.count - 1) {                *brokerp tr++ = ‘,‘;              }            }            json_decref(jobj);          }        }      }      deallocate_String_vector(&brokerlist);      printf("Found brokers %s\n", brokers);    }  }};#endif

#ifndef KAFKA_HELPER_H_#define KAFKA_HELPER_H_#include using std::string;#include "librdkafka/rdkafkacpp.h"#include "librdkafka/rdkafka.h"#include #include #include #define BROKER_PATH "/brokers/ids"static rd_kafka_t *rk;class KafkaHelper { public:  static string Brokers(string const& zookeeper) {    zhandle_t * zh = initialize_zookeeper(zookeeper);    char brokers[1024];    set_brokerlist_from_zookeeper(zh, brokers);    return brokers;  }  static void PrintTopicMeta(string const& topic_name) {    /*     * Create producer using accumulated global configuration.     */    RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);    string zookeeper("local host:2181");    string brokers = KafkaHelper::Brokers(zookeeper);    string errstr;    global_conf->set("metadata.broker.list", brokers, errstr);    RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);    RdKafka::Producer *producer = RdKafka::Producer::create(global_conf, errstr);    if (!producer) {      std::cerr << "Failed to create producer: " << errstr << std::endl;      exit(1);    }    std::cout << "% Created producer " << producer->name() << std::endl;    /*     * Create topic handle.     
*/    RdKafka::Topic * topic = RdKafka::Topic::create(producer, topic_name, topic_conf, errstr);    if (!topic) {      std::cerr << "Failed to create topic: " << errstr << std::endl;      exit(1);    }    bool run = true;    while (run) {      class RdKafka::Metadata *metadata;      // Fetch metadata       RdKafka::ErrorCode err = producer->metadata(topic!=NULL, topic, &metadata, 5000);      if (err != RdKafka::ERR_NO_ERROR) {        std::cerr << "%% Failed to acquir e metadata: "                   << RdKafka::err2str(err) << std::endl;        run = 0;        break;      }      KafkaHelper::PrintMeta(topic_name, metadata);      delete metadata;      run = 0;    }  }  static void PrintMeta(string const & topic, const RdKafka::Metadata *metadata) {    std::cout << "Metadata for " << (topic.empty() ? "" : "all topics")              << "(orig broker id from broker "  << metadata->orig_broker_id()              << ":" << metadata->orig_broker_name() << std::endl;    /* Iterate brokers */    std::cout << " " << metadata->brokers()->size() << " brokers:" << std::endl;    RdKafka::Metadata::BrokerMetadataIterator ib;    for (ib = metadata->brokers()->begin(); ib != metadata->brokers()->end(); ++ib) {      std::cout << "  broker " << (*ib)->id() << " at "                 << *(*ib)->host() << ":" << (*ib)->port() << std::endl;    }    /* Iterate topics */            std::cout << metadata->topics()->size() << " topics:" << std::endl;    RdKafka::Metadata::Topi cMetadataIterator it;    for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) {      std::cout << "  topic "<< *(*it)->topic() << " with "                 << (*it)->partitions()->size() << " partitions" << std::endl;      if ((*it)->err() != RdKafka::ERR_NO_ERROR) {        std::cout << " " << err2str((*it)->err());        if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) {          std::cout << " (try again)";        }             }      std::cout << std::endl;      /* Iterate topic‘s 
partitions */      RdKafka::TopicMetadata::PartitionMetadataIterator ip;      for (ip = (*it)->partitions()->begin(); ip != (*it)->partitions()->end(); ++ip) {        std::cout << "    partition " << (*ip)->id()                  << " leader " << (*ip)->leader()                  << ", replicas: ";        /* Iterate partition‘s replicas */        RdKafka::PartitionMetadata::ReplicasIterator ir;        for (ir = (*ip)->replicas()->begin();              ir != (*ip)->replicas()->e nd() ;              ++ir) {          std::cout << (ir == (*ip)->replicas()->begin() ? ",":"") << *ir;        }        /* Iterate partition‘s ISRs */        std::cout << ", isrs: ";        RdKafka::PartitionMetadata::ISRSIterator iis;        for (iis = (*ip)->isrs()->begin(); iis != (*ip)->isrs()->end() ; ++iis)          std::cout << (iis == (*ip)->isrs()->begin() ? ",":"") << *iis;        if ((*ip)->err() != RdKafka::ERR_NO_ERROR)          std::cout << ", " << RdKafka::err2str((*ip)->err()) << std::endl;        else          std::cout << std::endl;      }    }  } private:  static void watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {    char brokers[1024];    if (type == ZOO_CHILD_EVENT && strncmp(path, BROKER_PATH, sizeof(BROKER_PATH) - 1) == 0)      {        brokers[0] = ‘\0‘;        set_brokerlist_from_zookeeper(zh, brokers);        if (brokers[0] != ‘\0‘ && rk != NULL)          {            rd_kafka_brokers_add(rk, brokers);            rd_kafka_poll( rk, 10);          }      }  }  static zhandle_t* initialize_zookeeper(string const& zookeeper) {    zhandle_t * zh = zookeeper_init(zookeeper.c_str(), watcher, 10000, 0, 0, 0);    if (zh == NULL) {      fprintf(stderr, "Zookeeper connection not established.");      exit(1);    }    return zh;  }  static void set_brokerlist_from_zookeeper(zhandle_t *zzh, char *brokers) {    if (zzh) {      struct String_vector brokerlist;      if (zoo_get_children(zzh, BROKER_PATH, 1, &brokerlist) != ZOK) {        fprintf(stderr, 
"No brokers found on path %s\n", BROKER_PATH);        return;      }      int i;      char *brokerptr = brokers;      for (i = 0; i < brokerlist.count; i++) {        char path[255], cfg[1024];        sprintf(path, "/brokers/ids/%s", brokerlist.data[i]);        int len = sizeof(cfg);        zoo_get(zzh, path, 0, cfg, &len, NULL);        if (len > 0) {          cfg[len] = ‘\0‘;          json_error_t jerror;          json_t *jobj = json_loads(cfg, 0, &jerror);          if (jobj) {            json_t *jhost = json_object_get(jobj, "host");            json_t *jport = json_object_get(jobj, "port");            if (jhost && jport) {              const char *host = json_string_value(jhost);              const int   port = json_integer_value(jport);              sprintf(brokerptr, "%s:%d", host, port);              brokerptr += strlen(brokerptr);              if (i < brokerlist.count - 1) {                *brokerptr++ = ‘,‘;              }            }            json_decref(jobj);          }        }      }      deallocate_String_vector(&brokerlist);      printf("Found brokers %s\n", brokers);    }  }};#endif

3.2 main.cc完整代码

这里包含了读取数据的代码

#include #include #include #include #include #include #include #include #includ e "helper/kafka_helper.h"using std::string;using std::list;using std::cout;using std::endl;static bool run = true;static bool exit_eof = true;class MyEventCb : public RdKafka::EventCb {public:  void event_cb (RdKafka::Event &event) {    switch (event.type())      {      case RdKafka::Event::EVENT_ERROR:        std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<          event.str() << std::endl;        if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)          run = false;        break;      case RdKafka::Event::EVENT_STATS:        std::cerr << "\"STATS\": " << event.str() << std::endl;        break;      case RdKafka::Event::EVENT_LOG:        fprintf(stderr, "LOG-%i-%s: %s\n",                event.severity(), event.fac().c_str(), event.str().c_str());        break;      default:        std::cerr << "EVENT " << event.type() <<          " (" << RdKafka::err2str(event.err()) << "): " <<          event.str() << std::endl;        break;      }  }};void msg_consume(RdKafka: :Message* message, void* opaque) {  switch (message->err()) {  case RdKafka::ERR__TIMED_OUT:    break;  case RdKafka::ERR_NO_ERROR:    /* Real message */    std::cout << "Read msg at offset " << message->offset() << std::endl;    if (message->key()) {      std::cout << "Key: " << *message->key() << std::endl;    }    cout << static_cast(message->payload()) << endl;    break;  case RdKafka::ERR__PARTITION_EOF:    cout << "reach last message" << endl;    /* Last message */    if (exit_eof) {      run = false;    }    break;  case RdKafka::ERR__UNKNOWN_TOPIC:  case RdKafka::ERR__UNKNOWN_PARTITION:    std::cerr << "Consume failed: " << message->errstr() << std::endl;    run = false;    break;  default:    /* Errors */    std::cerr << "Consume failed: " << message->errstr() << std::endl;    run = false;  }}class MyConsumeCb : public RdKafka::ConsumeCb {public:  void consume_cb (RdKafka::Message &msg, void 
*opaque) {    msg_consume(&msg, opaque);  }};static void sigterm (int si g) {  run = false;}int main (int argc, char **argv) {  /*   * Process kill signal, quit from the loop   */  signal(SIGINT, sigterm);  signal(SIGTERM, sigterm);  /*   * Get broker list from zookeeper   */  string zookeeper("localhost:2181");  string brokers = KafkaHelper::Brokers(zookeeper);  cout << "brokers from zookeeper is: " << brokers << endl;  string topic_name = "test2";  /*   * Print topic meta   */  KafkaHelper::PrintTopicMeta(topic_name);  /*   * Global conf objects   */  RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);  string errstr;  global_conf->set("metadata.broker.list", brokers, errstr);  MyEventCb ex_event_cb;  global_conf->set("event_cb", &ex_event_cb, errstr);  /*   * Topic conf objects   */  RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);  /*   * Create consumer using accumulated global configuration.   */  RdKafka::Consumer *consumer = RdKafka::Consumer::create(global_conf, errstr);  if (!consumer) {    std::cerr << "Failed to create consumer: " << errstr << std::endl;    exit(1);  }  std::cout << "% Created consumer " << consumer->name() << std::endl;  /*   * Start consumer for topic+partition at start offset   */  int32_t partition = 0;  int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;  RdKafka::Topic *topic2 = RdKafka::Topic::create(consumer, topic_name, topic_conf, errstr);  RdKafka::ErrorCode resp = consumer->start(topic2, 0, start_offset);  if (resp != RdKafka::ERR_NO_ERROR) {    std::cerr << "Failed to start consumer: " <<      RdKafka::err2str(resp) << std::endl;    exit(1);  }  /*   * Consume messages   */  MyConsumeCb ex_consume_cb;  int use_ccb = 0;  while (run) {    if (use_ccb) {      consumer->consume_callback(topic2, partition, 1000, &ex_consume_cb, &use_ccb);    } else {      RdKafka::Message *msg = consumer->consume(topic2, partition, 1000);      msg_consume(msg, NULL);      delete 
msg;    }    consumer->poll(0);  }    /*   * Stop consumer   */  consumer->st op(topic2, partition);  consumer->poll(1000);  delete topic2;  delete consumer;  /*   * Wait for RdKafka to decommission.   * This is not strictly needed (when check outq_len() above), but   * allows RdKafka to clean up all its resources before the application   * exits so that memory profilers such as valgrind wont complain about   * memory leaks.   */  RdKafka::wait_destroyed(5000);  return 0;}

This section contains the code that reads the data.

#include #include #include #include #include #include #include #include #include "helper/kafka_helper.h"using std::string;using std::list;using std::cout;using std::endl;static bool run = true;static bool exit_eof = true;class MyEventCb : public RdKafka::EventCb {public:  void event_cb (RdKafka::Event &event) {    switch (event.type())      {      case RdKafka::Event::EVENT_ERROR:        std::cerr << "ERROR (" << RdKafka:: err2str(event.err()) << "): " <<          event.str() << std::endl;        if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)          run = false;        break;      case RdKafka::Event::EVENT_STATS:        std::cerr << "\"STATS\": " << event.str() << std::endl;        break;      case RdKafka::Event::EVENT_LOG:        fprintf(stderr, "LOG-%i-%s: %s\n",                event.severity(), event.fac().c_str(), event.str().c_str());        break;      default:        std::cerr << "EVENT " << event.type() <<          " (" << RdKafka::err2str(event.err()) << "): " <<          event.str() << std::endl;        break;      }  }};void msg_consume(RdKafka::Message* message, void* opaque) {  switch (message->err()) {  case RdKafka::ERR__TIMED_OUT:    break;  case RdKafka::ERR_NO_ERROR:    /* Real message */    std::cout << "Read msg at offset " << message->offset() << std::endl;    if (message->key()) {      std::cout << "Key: " << *message->key() << std::endl;    }    cout << static_cast(message->payload()) << endl;    break;  case RdKafka::ERR__PARTITION_EOF:    cout << "reach last message" << endl;    /* Last message */    if (exit_eof) {      run = false;    }    break;  case RdKafka::ERR__UNKNOWN_TOPIC:  case RdKafka::ERR__UNKNOWN_PARTITION:    std::cerr << "Consume failed: " << message->errstr() << std::endl;    run = false;    break;  default:    /* Errors */    std::cerr << "Consume failed: " << message->errstr() << std::endl;    run = false;  }}class MyConsumeCb : public RdKafka::ConsumeCb {public:  void consume_cb (RdKafka::Message &msg, void 
*opaque) {    msg_consume(&msg, opaque);  }};static void sigterm (int sig) {  run = false;}int main (int argc, char **argv) {  /*   * Process kill signal, quit from the loop   */  signal(SIGINT, sigterm);  signal(SIGTERM, sigterm);  /*   * Get broker list from zookeeper   */  string zookeeper("localhost:2181");  string brokers = KafkaHelper::Brokers(zookeeper);  cout << "brokers from zookeeper is: " << brokers << en dl;  string topic_name = "test2";  /*   * Print topic meta   */  KafkaHelper::PrintTopicMeta(topic_name);  /*   * Global conf objects   */  RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);  string errstr;  global_conf->set("metadata.broker.list", brokers, errstr);  MyEventCb ex_event_cb;  global_conf->set("event_cb", &ex_event_cb, errstr);  /*   * Topic conf objects   */  RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);  /*   * Create consumer using accumulated global configuration.   */  RdKafka::Consumer *consumer = RdKafka::Consumer::create(global_conf, errstr);  if (!consumer) {    std::cerr << "Failed to create consumer: " << errstr << std::endl;    exit(1);  }  std::cout << "% Created consumer " << consumer->name() << std::endl;  /*   * Start consumer for topic+partition at start offset   */  int32_t partition = 0;  int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;  RdKafka::Topic *topic2 = RdKafka::Topic::creat e(consumer, topic_name, topic_conf, errstr);  RdKafka::ErrorCode resp = consumer->start(topic2, 0, start_offset);  if (resp != RdKafka::ERR_NO_ERROR) {    std::cerr << "Failed to start consumer: " <<      RdKafka::err2str(resp) << std::endl;    exit(1);  }  /*   * Consume messages   */  MyConsumeCb ex_consume_cb;  int use_ccb = 0;  while (run) {    if (use_ccb) {      consumer->consume_callback(topic2, partition, 1000, &ex_consume_cb, &use_ccb);    } else {      RdKafka::Message *msg = consumer->consume(topic2, partition, 1000);      msg_consume(msg, NULL);      delete 
msg;    }    consumer->poll(0);  }    /*   * Stop consumer   */  consumer->stop(topic2, partition);  consumer->poll(1000);  delete topic2;  delete consumer;  /*   * Wait for RdKafka to decommission.   * This is not strictly needed (when check outq_len() above), but   * allows RdKafka to clean up all its resources before the application   * exits so that memory profilers such as valgrind wont complain about   * memory lea ks.   */  RdKafka::wait_destroyed(5000);  return 0;}

#include #include #include #include #include #include #include #include #include "helper/kafka_helper.h"using std::string;using std::list;using std::cout;using std::endl;static bool run = true;static bool exit_eof = true;class MyEventCb : public RdKafka::EventCb {public:  void event_cb (RdKafka::Event &event) {    switch (event.type())      {      case RdKafka::Event::EVENT_ERROR:        std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<          event.str() << std::endl;        if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)          run = false;        break;      case RdKafka::Event::EVENT_STATS:        std::cerr << "\"STATS\": " << event.str() << std::endl;        break;      case RdKafka::Event::EVENT_LOG:        fprintf(stderr, "LOG-%i-%s: %s\n",                event.severity(), event.fac().c_str( ), event.str().c_str());        break;      default:        std::cerr << "EVENT " << event.type() <<          " (" << RdKafka::err2str(event.err()) << "): " <<          event.str() << std::endl;        break;      }  }};void msg_consume(RdKafka::Message* message, void* opaque) {  switch (message->err()) {  case RdKafka::ERR__TIMED_OUT:    break;  case RdKafka::ERR_NO_ERROR:    /* Real message */    std::cout << "Read msg at offset " << message->offset() << std::endl;    if (message->key()) {      std::cout << "Key: " << *message->key() << std::endl;    }    cout << static_cast(message->payload()) << endl;    break;  case RdKafka::ERR__PARTITION_EOF:    cout << "reach last message" << endl;    /* Last message */    if (exit_eof) {      run = false;    }    break;  case RdKafka::ERR__UNKNOWN_TOPIC:  case RdKafka::ERR__UNKNOWN_PARTITION:    std::cerr << "Consume failed: " << message->errstr() << std::endl;    run = false;    break;  default:    /* Errors */    std::cerr << " Consume failed: " << message->errstr() << std::endl;    run = false;  }}class MyConsumeCb : public RdKafka::ConsumeCb {public:  void consume_cb (RdKafka::Message &msg, void 
*opaque) {    msg_consume(&msg, opaque);  }};static void sigterm (int sig) {  run = false;}int main (int argc, char **argv) {  /*   * Process kill signal, quit from the loop   */  signal(SIGINT, sigterm);  signal(SIGTERM, sigterm);  /*   * Get broker list from zookeeper   */  string zookeeper("localhost:2181");  string brokers = KafkaHelper::Brokers(zookeeper);  cout << "brokers from zookeeper is: " << brokers << endl;  string topic_name = "test2";  /*   * Print topic meta   */  KafkaHelper::PrintTopicMeta(topic_name);  /*   * Global conf objects   */  RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);  string errstr;  global_conf->set("metadata.broker.list", brokers, errstr);  MyEventCb ex_event_cb;  global_conf->set("event_cb", &ex_event_cb, errstr);  /*   * Topic conf objects   */  R dKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);  /*   * Create consumer using accumulated global configuration.   */  RdKafka::Consumer *consumer = RdKafka::Consumer::create(global_conf, errstr);  if (!consumer) {    std::cerr << "Failed to create consumer: " << errstr << std::endl;    exit(1);  }  std::cout << "% Created consumer " << consumer->name() << std::endl;  /*   * Start consumer for topic+partition at start offset   */  int32_t partition = 0;  int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;  RdKafka::Topic *topic2 = RdKafka::Topic::create(consumer, topic_name, topic_conf, errstr);  RdKafka::ErrorCode resp = consumer->start(topic2, 0, start_offset);  if (resp != RdKafka::ERR_NO_ERROR) {    std::cerr << "Failed to start consumer: " <<      RdKafka::err2str(resp) << std::endl;    exit(1);  }  /*   * Consume messages   */  MyConsumeCb ex_consume_cb;  int use_ccb = 0;  while (run) {    if (use_ccb) {      consumer->consume_callback(topic2, pa rtition, 1000, &ex_consume_cb, &use_ccb);    } else {      RdKafka::Message *msg = consumer->consume(topic2, partition, 1000);      msg_consume(msg, NULL);      delete 
msg;    }    consumer->poll(0);  }    /*   * Stop consumer   */  consumer->stop(topic2, partition);  consumer->poll(1000);  delete topic2;  delete consumer;  /*   * Wait for RdKafka to decommission.   * This is not strictly needed (when check outq_len() above), but   * allows RdKafka to clean up all its resources before the application   * exits so that memory profilers such as valgrind wont complain about   * memory leaks.   */  RdKafka::wait_destroyed(5000);  return 0;}

Created: 2016-05-02 Mon 13:07

Validate

Leave a Comment

Your email address will not be published.