Implementing Client-Side Message Sending and Message Consumption

<p class="shortdesc">本文主要介绍如何完成生产者客户端消息发送及消费者客户端消息消费的业务逻辑代码实现。</p> <section class="section" id="kafkaImplementation__section_whl_mnx_1mb"><h2 class="doc-tairway">前提条件</h2> <ul class="ul" id="kafkaImplementation__ul_g4x_b31_tmb"> <li class="li">您已成功创建Topic和Consumer Group,可参考<a class="xref" href="/ssr/help/middleware/Kafka/KafkaOperationManual.kafkaTopicManager.create_topic" target="_blank">创建Topic</a>和<a class="xref" href="/ssr/help/middleware/Kafka/KafkaOperationManual.kafkaTopicManager.create_consumer" target="_blank">创建Consumer Group</a>。</li> <li class="li">您已成功获取接入点和AccessKey/SecretKey,可参考<a class="xref" href="/ssr/help/middleware/Kafka/KafkaOperationManual.kafkaTopicManager.topic_accesspoint" target="_blank">获取接入点信息</a>和<a class="xref" href="/ssr/help/middleware/Kafka/quickStart.aksk" target="_blank">获取AccessKey和SecretKey</a>。</li> <li class="li">您已成功创建云服务器ECS,可参考<a class="xref" href="/ssr/help/compute/ecs/manual.Instance.Create_ECS_Instance" target="_blank">创建ECS实例</a>。</li> </ul> </section> <section class="section" id="kafkaImplementation__section_i4r_4nx_1mb"><h2 class="doc-tairway">环境要求</h2> <p class="p">在使用Kafka收发消息前,需要先在云服务器ECS上完成JDK安装、环境变量配置以及Kafka开源客户端下载等。运行环境具体要求如下:</p> <ol class="ol" id="kafkaImplementation__ol_zvs_jnx_1mb"> <li class="li">JAVA 1.7 U51以上版本,推荐使用JAVA 1.8。</li> <li class="li">消息队列Kafka当前版本为1.1.0。Kafka客户端需使用0.11及以上的版本,并支持1.1.0版本的服务端,推荐使用1.1.0版本的客户端。</li> </ol> </section> <section class="section" id="kafkaImplementation__section_hb5_v4x_1mb"><h2 class="doc-tairway">Demo下载</h2> <p class="p">集成Kafka之前,需下载Demo文件。</p> <p class="p">Java版Kafka消息收发: <a class="xref" href="https://obs-cn-shanghai.yun.pingan.com/pcp-portal/kafka-demo-pcd.zip?response-content-disposition=attachment%3Bfilename%3Dkafka-demo-pcd.zip" target="_blank">Demo包下载</a></p> <p class="p">SpringBoot版Kafka消息收发: <a class="xref" href="https://obs-cn-shanghai.yun.pingan.com/pcp-portal/20200904111509-12d721eb935e.zip" target="_blank">Demo包下载</a></p> <p class="p">KafkaStream版Kafka消息收发: <a class="xref" href="https://obs-cn-shanghai.yun.pingan.com/pcp-portal/20200904091743-1de0842c9799.zip" target="_blank">Demo包下载</a></p> </section> <section class="section" id="kafkaImplementation__section_gpk_snx_1mb"><h2 class="doc-tairway">集成Kafka</h2> <ol class="ol" id="kafkaImplementation__ol_z2f_h4x_1mb"> <li class="li">将Demo文件中的kafka-clients.jar、kafka-clients-security-1.1.1-SNAPSHOT.jar包引入到项目工程,并将Demo中的其他依赖包也一起引入项目工程。Kafka客户端可以使用从官网下载的jar包,版本需要大于等于0.11,并支持Kafka1.1.0服务端。</li> <li class="li">创建kafka_jaas.conf文件。<pre class="pre codeblock"><code> KafkaClient { &nbsp;&nbsp; &nbsp;org.apache.kafka.common.security.oac.OacLoginModule required &nbsp; &nbsp; AccessKey="" &nbsp;&nbsp; &nbsp;SecretKey=""; };</code></pre></li> <li class="li">设置security.protocol和sasl.mechanism属性。<pre class="pre codeblock"><code> security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN</code></pre></li> <li class="li">指定kafka_jaas.conf配置文件路径,有两种方式:<div class="p">第一种,在启动脚本中,添加启动参数来指定kafka_jaas.conf文件的路径,示例如下:<pre class="pre codeblock" id="kafkaImplementation__codeblock_yvd_vpv_xrb"><code>-Djava.security.auth.login.config=/wls/kafka/kafka_server_jaas.conf</code></pre></div><div class="p">第二种,在java代码中添加系统参数来指定kafka_jaas.conf文件的路径,示例如下:<pre class="pre codeblock" id="kafkaImplementation__codeblock_ilr_vpv_xrb"><code>public class KafkaClientDemoApplication { static{ // 需要使用绝对路径,把kafka_jaas.conf文件放在jar包外。路径根据实际情况进行修改。 String filePath = "/etc/kafka_jaas.conf"; log.info("path of kafka_jaas.conf is {}", filePath); 
<section>
<h2>Dependencies</h2>
<p>Add the following dependencies. If you use the spring-kafka framework, you do not need to declare kafka-clients separately.</p>
<pre class="pre codeblock"><code><dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients-security</artifactId>
        <version>1.1.1-SNAPSHOT</version>
    </dependency>
</dependencies></code></pre>
</section>

<section>
<h2>Send messages</h2>
<p>Sample code for sending messages. For bootstrap.servers, fill in the access point address; see <a href="/ssr/help/middleware/Kafka/KafkaOperationManual.kafkaTopicManager.topic_accesspoint" target="_blank">Obtain access point information</a>.</p>
<pre class="pre codeblock"><code>public void testProducer() {
    Properties props = new Properties();
    String topic = "test2";
    props.put("bootstrap.servers", "10.42.8.222:9092"); // access point address
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "PLAIN");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 50; i++) {
        producer.send(new ProducerRecord<String, String>(topic,
                topic + "-K-" + Integer.toString(i),
                topic + "-V-" + Integer.toString(i)));
    }
    producer.close();
}</code></pre>
</section>
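<p>The loop in the example above sends records asynchronously and never checks whether the broker accepted them. The sketch below shows two common ways to confirm delivery, either by blocking on the returned <code>Future</code> or by passing a callback. It is an illustration only: the address, topic, and keys are the same placeholders as in the example, and the JAAS configuration from the integration steps is still required.</p>
<pre class="pre codeblock"><code>import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerAckSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.42.8.222:9092"); // placeholder access point
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        try {
            // Option 1: block on the returned Future to confirm the write.
            Future<RecordMetadata> future =
                    producer.send(new ProducerRecord<>("test2", "key-0", "value-0"));
            RecordMetadata meta = future.get();
            System.out.printf("sent to partition %d, offset %d%n", meta.partition(), meta.offset());

            // Option 2: pass a callback for non-blocking error handling.
            producer.send(new ProducerRecord<>("test2", "key-1", "value-1"), (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.printf("sent to partition %d, offset %d%n",
                            metadata.partition(), metadata.offset());
                }
            });
        } finally {
            producer.close();
        }
    }
}</code></pre>
<p>Blocking on every returned Future serializes the sends and reduces throughput, so the callback form is generally preferable for bulk traffic.</p>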
<section>
<h2>Consume messages</h2>
<p>Sample code for consuming messages. For bootstrap.servers, fill in the access point address; see <a href="/ssr/help/middleware/Kafka/KafkaOperationManual.ConsumerGroupManager.access_point" target="_blank">Obtain access point information</a>.</p>
<pre class="pre codeblock"><code>public static void testConsumer() {
    Properties props = new Properties();
    String topic = "test2";
    props.put("bootstrap.servers", "localhost:9092"); // access point address
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "PLAIN");
    props.put("group.id", "szx_test");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            log.info("offset = " + record.offset() + ", key = " + record.key()
                    + ", value = " + record.value());
        }
    }
}</code></pre>
</section>
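<p>The consumer above relies on the client's automatic offset commits, so records that were polled but not yet processed when the process crashes may be skipped. A common alternative is to disable auto commit and commit offsets only after a batch has been handled. The following is a minimal sketch of that pattern, not part of the demo; the address, topic, and group id are the same placeholders as above, and the JAAS configuration from the integration steps is still required.</p>
<pre class="pre codeblock"><code>import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder access point
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("group.id", "szx_test");                // placeholder consumer group
        props.put("enable.auto.commit", "false");         // commit offsets explicitly
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("test2"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // Process the record before committing its offset.
                    System.out.println("offset = " + record.offset()
                            + ", key = " + record.key() + ", value = " + record.value());
                }
                if (!records.isEmpty()) {
                    // Synchronous commit after the batch has been processed.
                    consumer.commitSync();
                }
            }
        }
    }
}</code></pre>
<p>commitSync() retries until it succeeds or hits an unrecoverable error; commitAsync() trades that guarantee for lower latency.</p>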