21xrx.com
2024-11-05 14:57:34 Tuesday
登录
文章检索 我的文章 写文章
C++操作Kafka:入门指南
2023-07-04 18:41:42 深夜i     --     --
C++ Kafka 入门指南 操作 消息队列

Kafka是一种高性能、分布式的消息队列系统,被广泛用于实时数据的传输与处理。作为一种极具可靠性和可扩展性的开源平台,Kafka已成为许多企业构建实时数据处理通道的首选技术之一。而作为一名程序员,在使用Kafka时,C++语言是一个非常好的选择。

下面是一份C++操作Kafka的入门指南,帮助你快速掌握Kafka的基本用法。

1.准备工作

使用C++操作Kafka之前,需要安装Kafka库和C++客户端库,以及Kafka的依赖库,比如librdkafka。安装librdkafka库可以使用以下命令:


$ git clone https://github.com/edenhill/librdkafka.git

$ cd librdkafka

$ ./configure

$ make

$ sudo make install

2.Kafka生产者

Kafka生产者用于将消息发布到Kafka系统中。在C++中,使用librdkafka库对生产者进行封装,可以方便地发送消息。

以下是一个简单的C++程序,用于创建一个Kafka生产者,然后发送一条消息:


#include <iostream>

#include <string>

#include <librdkafka/rdkafkacpp.h>

int main() {

 std::string brokers = "localhost:9092";

 std::string topic_name = "test";

 

 RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

 RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

 

 std::string err_str;

 conf->set("metadata.broker.list", brokers, err_str);

 

 RdKafka::Producer* producer = RdKafka::Producer::create(conf, err_str);

 if (!producer) {

  std::cerr << "Failed to create producer: " << err_str << std::endl;

  exit(1);

 }

 

 RdKafka::Topic* topic = RdKafka::Topic::create(producer, topic_name, tconf, err_str);

 if (!topic) {

  std::cerr << "Failed to create topic: " << err_str << std::endl;

  exit(1);

 }

 

 std::string payload = "Hello, Kafka!";

 RdKafka::ErrorCode resp = producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, (void*)payload.c_str(), payload.size(), NULL, NULL);

 if (resp != RdKafka::ERR_NO_ERROR) {

  std::cerr << "Failed to produce message: " << RdKafka::err2str(resp) << std::endl;

  exit(1);

 }

 

 std::cout << "Message sent: " << payload << std::endl;

 

 producer->poll(0);

 delete topic;

 delete producer;

 return 0;

}

3.Kafka消费者

Kafka消费者用于从Kafka系统中读取消息。在C++中,使用librdkafka库对消费者进行封装,可以方便地接收消息。

以下是一个简单的C++程序,用于创建一个Kafka消费者,然后从指定topic中读取消息:


#include <iostream>

#include <string>

#include <librdkafka/rdkafkacpp.h>

static void msg_consume(RdKafka::Message* message, void* opaque) {

 switch (message->err()) {

  case RdKafka::ERR_NO_ERROR:

   std::cout << "Message consumed: " << std::string((char*)message->payload(), message->len()) << std::endl;

   break;

  case RdKafka::ERR__PARTITION_EOF:

   std::cerr << "Reached end of partition" << std::endl;

   break;

  case RdKafka::ERR__UNKNOWN_TOPIC:

  case RdKafka::ERR__UNKNOWN_PARTITION:

   std::cerr << "Topic or partition not found" << std::endl;

   break;

  default:

   std::cerr << "Consumed message error: " << message->errstr() << std::endl;

   break;

 }

}

int main() {

 std::string brokers = "localhost:9092";

 std::string topic_name = "test";

 

 RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

 conf->set("metadata.broker.list", brokers, err_str);

 

 RdKafka::Consumer* consumer = RdKafka::Consumer::create(conf, err_str);

 if (!consumer) {

  std::cerr << "Failed to create consumer: " << err_str << std::endl;

  exit(1);

 }

 

 RdKafka::Topic* topic = RdKafka::Topic::create(consumer, topic_name, NULL, err_str);

 if (!topic) {

  std::cerr << "Failed to create topic: " << err_str << std::endl;

  exit(1);

 }

 

 RdKafka::ErrorCode resp = consumer->start(topic, 0, RdKafka::Topic::OFFSET_BEGINNING);

 if (resp != RdKafka::ERR_NO_ERROR) {

  std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << std::endl;

  exit(1);

 }

 

 while (true) {

  RdKafka::Message* msg = consumer->consume(topic, 0, 1000);

  msg_consume(msg, NULL);

  delete msg;

 }

 

 consumer->stop(topic, 0);

 consumer->poll(1000);

 delete topic;

 delete consumer;

 return 0;

}

总结

Kafka是一个强大的实时消息队列系统,C++是一种高效的编程语言,因此,在使用Kafka时,C++是一个非常好的选择。本篇文章介绍了使用C++操作Kafka的入门指南,希望为你在编写Kafka应用程序时提供帮助。

  
  

评论区

{{item['qq_nickname']}}
()
回复
回复