{"id":2898,"date":"2023-07-14T13:43:28","date_gmt":"2023-07-14T05:43:28","guid":{"rendered":"https:\/\/xinchewhd.com.cn\/?p=2898"},"modified":"2023-07-14T13:43:28","modified_gmt":"2023-07-14T05:43:28","slug":"springboot-%e9%9b%86%e6%88%90-kafka-%e9%85%8d%e7%bd%ae","status":"publish","type":"post","link":"https:\/\/xinchewhd.com.cn\/index.php\/spring-boot\/springboot-%e9%9b%86%e6%88%90-kafka-%e9%85%8d%e7%bd%ae\/","title":{"rendered":"SpringBoot \u96c6\u6210 Kafka \u914d\u7f6e"},"content":{"rendered":"<link rel=\"stylesheet\" href=\"https:\/\/csdnimg.cn\/release\/blogv2\/dist\/mdeditor\/css\/editerView\/kdoc_html_views-1a98987dfd.css\">\n \t<link rel=\"stylesheet\" href=\"https:\/\/csdnimg.cn\/release\/blogv2\/dist\/mdeditor\/css\/editerView\/ck_htmledit_views-25cebea3f9.css\">\n<div id=\"content_views\" class=\"htmledit_views\">\n<h2><\/h2>\n<h2>\u539f\u751f\u6a21\u5f0f<\/h2>\n<pre><code class=\"language-XML\">        &lt;dependency&gt;\n            &lt;groupId&gt;org.apache.kafka&lt;\/groupId&gt;\n            &lt;artifactId&gt;kafka-clients&lt;\/artifactId&gt;\n            &lt;version&gt;3.0.0&lt;\/version&gt;\n        &lt;\/dependency&gt;<\/code><\/pre>\n<h3>\u81ea\u5b9a\u4e49\u5206\u533a\u5668<\/h3>\n<pre><code class=\"language-java\">\/**\n * \u81ea\u5b9a\u4e49\u5206\u533a\u5668\n *\n * @Author: chen yang\n * @Date: 2023\/5\/7 11:34\n *\/\npublic class CustomerPartitioner implements Partitioner {\n    @Override\n    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {\n\n        List&lt;PartitionInfo&gt; partitionInfos = cluster.partitionsForTopic(topic);\n        int partition = 2;\n        return partitionInfos.size() % partition;\n    }\n\n    @Override\n    public void close() {\n\n    }\n\n    @Override\n    public void configure(Map&lt;String, ?&gt; configs) {\n\n    }\n}<\/code><\/pre>\n<h3>\u751f\u4ea7\u8005<\/h3>\n<pre><code class=\"language-java\">\/**\n * @Author: chen yang\n * @Date: 2023\/5\/6 21:59\n *\/\npublic class Producer {\n    public static void main(String[] args) throws ExecutionException, InterruptedException {\n\n        Properties properties = new Properties();\n        \/\/ \u8fde\u63a5\u96c6\u7fa4\n        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, \"localhost:9092\");\n\n        \/\/ \u5e8f\u5217\u5316\u7c7b\u578b\n        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());\n        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());\n\n        \/\/ \u6dfb\u52a0\u81ea\u5b9a\u4e49\u5206\u533a\u5668\n        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomerPartitioner.class.getName());\n\n\n        \/\/ batch.size\uff1a\u6279\u6b21\u5927\u5c0f\uff0c\u9ed8\u8ba4 16K\n        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);\n        \/\/ linger.ms\uff1a\u7b49\u5f85\u65f6\u95f4\uff0c\u9ed8\u8ba4 0\n        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);\n        \/\/ RecordAccumulator\uff1a\u7f13\u51b2\u533a\u5927\u5c0f\uff0c\u9ed8\u8ba4 32M\uff1abuffer.memory\n        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);\n        \/\/ compression.type\uff1a\u538b\u7f29\uff0c\u9ed8\u8ba4 none\uff0c\u53ef\u914d\u7f6e\u503c gzip\u3001snappy\u3001lz4 \u548c zstd\n        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, \"snappy\");\n\n\n        \/\/ \u8bbe\u7f6e ack\n        \/\/ 0:\u4e0d\u9700\u8981\u7b49\u5f85\u6570\u636e\u843d\u76d8\u5e94\u7b54\uff0c1:\u9700\u8981\u7b49 leader \u843d\u76d8\u5e94\u7b54\uff0c-1(all):\u9700\u8981\u7b49 leader \u548c\u6240\u6709\u7684 follower(isr\u961f\u5217) \u843d\u76d8\u5e94\u7b54\n        \/\/ type: String, valid values [all, -1, 0, 1], default: all\n        properties.put(ProducerConfig.ACKS_CONFIG, \"all\");\n\n        \/\/ \u8bbe\u7f6e\u91cd\u8bd5\u6b21\u6570\uff0c\u9ed8\u8ba4\u4e3a int \u6700\u5927\u503c\n        properties.put(ProducerConfig.RETRIES_CONFIG, 3);\n\n        \/\/ \u8bbe\u7f6e\u4e8b\u52a1id\n        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, \"default_transactional_id_23\");\n\n        KafkaProducer&lt;String, String&gt; producer = new KafkaProducer&lt;&gt;(properties);\n\n        \/\/ \u5f00\u542f\u4e8b\u52a1\n        producer.initTransactions();\n        producer.beginTransaction();\n\n        try {\n            \/\/ \u5f02\u6b65\u53d1\u9001\n            producer.send(new ProducerRecord&lt;&gt;(\"test\", \"this is async message\"));\n\n            producer.send(new ProducerRecord&lt;&gt;(\"test\", \"this is a async rollback message!\"), new Callback() {\n                @Override\n                public void onCompletion(RecordMetadata recordMetadata, Exception e) {\n                    \/\/ \u53d1\u9001\u6d88\u606f\u5931\u8d25\u4f1a\u81ea\u52a8\u91cd\u8bd5\uff0c\u4e0d\u9700\u8981\u5728\u56de\u8c03\u51fd\u6570\u4e2d\u624b\u52a8\u91cd\u8bd5\n                    if (Objects.isNull(e)){\n                        System.out.println(\"result: \" + recordMetadata.topic() + \", partitions: \" + recordMetadata.partition());\n                    }\n                }\n            });\n\n            \/\/ \u540c\u6b65\u53d1\u9001\uff0c\u53ea\u9700\u8981\u5728\u5f02\u6b65\u53d1\u9001\u7684\u57fa\u7840\u4e0a\u518d\u8c03\u7528 get() \u72af\u6cd5\u5373\u53ef\n            producer.send(new ProducerRecord&lt;&gt;(\"test\", 1,\"\",\"this is sync message\")).get();\n            producer.commitTransaction();\n\n        }catch (Exception e){\n            producer.abortTransaction();\n        }finally {\n            producer.close();\n        }\n    }\n}<\/code><\/pre>\n<h3>\u6d88\u8d39\u8005<\/h3>\n<pre><code class=\"language-java\">\/**\n * @Author: chen yang\n * @Date: 2023\/7\/9 10:37\n *\/\npublic class Consumer {\n\n    public static void main(String[] args) {\n\n        Properties properties = new Properties();\n        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, \"localhost:9092\");\n        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);\n        properties.put(ConsumerConfig.GROUP_ID_CONFIG, \"consumer_01\");\n        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());\n        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());\n\n        try (KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(properties)) {\n              \/\/ \u8ba2\u9605 topic\n\/\/            ArrayList&lt;String&gt; topics = new ArrayList&lt;&gt;();\n\/\/            consumer.subscribe(topics);\n\n            \/\/ \u8ba2\u9605 topic \u4e0b\u7684 partition\n            ArrayList&lt;TopicPartition&gt; topicPartitions = new ArrayList&lt;&gt;();\n            topicPartitions.add(new TopicPartition(\"night_topic\", 1));\n            consumer.assign(topicPartitions);\n\n            \/\/ \u4ece\u6307\u5b9a\u7684 offset \u5f00\u59cb\u6d88\u8d39\n\/\/            consumer.seek(new TopicPartition(\"night_topic\", 1), 3);\n\n            ConsumerRecords&lt;String, String&gt; consumerRecords = consumer.poll(Duration.ofSeconds(2));\n            consumerRecords.forEach(System.out::println);\n\n            \/\/ \u624b\u52a8\u63d0\u4ea4 offset\n            consumer.commitAsync();\n        } catch (Exception e) {\n            throw new RuntimeException(e);\n        }\n    }\n}<\/code><\/pre>\n<h2>KafkaTemplate<\/h2>\n<pre><code class=\"language-XML\">        &lt;dependency&gt;\n            &lt;groupId&gt;org.springframework.kafka&lt;\/groupId&gt;\n            &lt;artifactId&gt;spring-kafka&lt;\/artifactId&gt;\n        &lt;\/dependency&gt;<\/code><\/pre>\n<h3>\u914d\u7f6e\u6587\u4ef6<\/h3>\n<pre><code class=\"language-bash\">spring:\n  kafka:\n    bootstrap-servers: localhost:9092\n    producer:\n      batch-size: 16384\n      acks: -1\n      retries: 10\n      transaction-id-prefix: transaction_05\n      buffer-memory: 33554432\n      key-serializer: org.apache.kafka.common.serialization.StringSerializer\n      value-serializer: org.apache.kafka.common.serialization.StringSerializer\n      properties:\n        linger:\n          ms: 2000\n        partitioner:\n          class: com.night.config.CustomerPartitionHandler\n    consumer:\n      group-id: g_01\n      enable-auto-commit: false\n      auto-offset-reset: latest\n      max-poll-records: 500\n#      auto-commit-interval: 2000  autoCommit = false\n      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer\n      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer\n      properties:\n        session:\n          timeout:\n            ms: 120000 # \u6d88\u8d39\u4f1a\u8bdd\u8d85\u65f6\u65f6\u95f4\uff08\u8d85\u8fc7\u8fd9\u4e2a\u65f6\u95f4 consumer \u6ca1\u6709\u53d1\u9001\u5fc3\u8df3\uff0c\u5c31\u4f1a\u89e6\u53d1 rebalance \u64cd\u4f5c\uff09\n        request:\n          timeout:\n            ms: 18000\n\n    listener:\n      missing-topics-fatal: false # consumer listener topics \u4e0d\u5b58\u5728\u65f6\uff0c\u542f\u52a8\u9879\u76ee\u5c31\u4f1a\u62a5\u9519\n      type: batch<\/code><\/pre>\n<h3>\u81ea\u5b9a\u4e49\u5206\u533a\u5668<\/h3>\n<pre><code class=\"language-java\">\/**\n * @Author: chen yang\n * @Date: 2023\/7\/8 11:02\n *\/\n@Component\npublic class CustomerPartitionHandler implements Partitioner {\n\n    @Override\n    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {\n        if (value.toString().contains(\"\u4e8c\")){\n            return 2;\n        }else if (value.toString().contains(\"\u4e00\")){\n            return 1;\n        }else {\n            return 0;\n        }\n    }\n\n    @Override\n    public void close() {\n\n    }\n\n    @Override\n    public void configure(Map&lt;String, ?&gt; configs) {\n\n    }\n}<\/code><\/pre>\n<h3>\u751f\u4ea7\u8005<\/h3>\n<pre><code class=\"language-java\">@RestController\n@RequiredArgsConstructor\npublic class HelloController {\n\n    private final KafkaTemplate&lt;String, String&gt; kafkaTemplate;\n\n    @GetMapping(\"\/send\")\n    @Transactional \/\/ \u914d\u7f6e\u6587\u4ef6\u4e2d\u8bbe\u7f6e\u4e86\u4e8b\u52a1id\uff0c\u90a3\u4e48\u542f\u7528\u65f6\u8981\u52a0\u4e0a \u8be5\u6ce8\u89e3\u6216\u8005\u4f7f\u7528 kafka \u4e8b\u52a1\u5904\u7406\n    public Boolean send(String msg){\n        for (int i = 0; i &lt; 10; i++) {\n            kafkaTemplate.send(\"night.topic\", null, \"night key - \" + i, msg + \" - \" + i).addCallback(success -&gt; {\n                \/\/ \u6210\u529f\u56de\u8c03\n                if (success == null || success.getRecordMetadata() == null){\n                    return;\n                }\n                String topic = success.getRecordMetadata().topic();\n                int partition = success.getRecordMetadata().partition();\n                long offset = success.getRecordMetadata().offset();\n                String key = success.getProducerRecord().key();\n                System.out.println(\"send topic:\" + topic +\", partition: \" + partition + \", key:\" + key + \", offset: \" + offset);\n            }, failure -&gt; {\n                \/\/ \u5931\u8d25\u56de\u8c03\n                System.out.println(\"\u53d1\u9001\u6d88\u606f\u5931\u8d25\uff1a\" + failure.getMessage());\n            });\n        }\n        return true;\n    }\n}<\/code><\/pre>\n<h3>\u6d88\u8d39\u8005\u914d\u7f6e<\/h3>\n<h4>\u6d88\u8d39\u6570\u636e\u8fc7\u6ee4<\/h4>\n<pre><code class=\"language-java\">\/**\n * \u6d88\u8d39\u6570\u636e\u8fc7\u6ee4\n *\n * @Author: chen yang\n * @Date: 2023\/7\/8 12:20\n *\/\n@Component\npublic class ConsumerFilterStrategy implements RecordFilterStrategy&lt;String, String&gt; {\n    @Override\n    public boolean filter(ConsumerRecord&lt;String, String&gt; consumerRecord) {\n        \/\/ return true: \u4e22\u5f03\u6d88\u606f\n        return consumerRecord.value().contains(\"\u65e0\u6548\u6570\u636e\");\n    }\n}<\/code><\/pre>\n<h4>\u6d88\u8d39\u5f02\u5e38\u5904\u7406\u7c7b<\/h4>\n<pre><code class=\"language-java\">\/**\n * \u6d88\u8d39\u5f02\u5e38\u5904\u7406\u7c7b\n *\n * @Author: chen yang\n * @Date: 2023\/7\/8 11:55\n *\/\n@Component\npublic class ConsumerExceptionHandler implements ConsumerAwareListenerErrorHandler {\n\n    @Override\n    public Object handleError(Message&lt;?&gt; message, ListenerExecutionFailedException e, Consumer&lt;?, ?&gt; consumer) {\n        System.out.println(\"\u6d88\u8d39\u5f02\u5e38\uff1a\" + message.getPayload() + \", ex: \" + e.getMessage());\n        return null;\n    }\n}<\/code><\/pre>\n<h4>\u6d88\u8d39\u8005\u914d\u7f6e&nbsp;<\/h4>\n<pre><code class=\"language-java\">\/**\n * @Author: chen yang\n * @Date: 2023\/7\/8 12:22\n *\/\n@Configuration\n@RequiredArgsConstructor\npublic class KafkaConsumerConfig {\n\n    private final KafkaTemplate&lt;String, String&gt; kafkaTemplate;\n\n    private final ConsumerFilterStrategy consumerFilterStrategy;\n\n\n    @Bean\n    public ConcurrentKafkaListenerContainerFactory&lt;String, String&gt; filterContainerFactory(){\n        ConcurrentKafkaListenerContainerFactory&lt;String, String&gt; factory = new ConcurrentKafkaListenerContainerFactory&lt;&gt;();\n        \/\/ @SendTo \u4f7f\u7528\uff0cconsole exception:  a KafkaTemplate is required to support replies\n        factory.setReplyTemplate(kafkaTemplate);\n\n        factory.setConsumerFactory(initConsumerFactory());\n        \/\/ \u8bbe\u7f6e\u5e76\u53d1\u91cf\uff0c\u5c0f\u4e8e\u6216\u7b49\u4e8eTopic\u7684\u5206\u533a\u6570,\u5e76\u4e14\u8981\u5728consumerFactory\u8bbe\u7f6e\u4e00\u6b21\u62c9\u53d6\u7684\u6570\u91cf\n        factory.setConcurrency(1);\n\n        \/\/ \u8bbe\u7f6e\u4e3a\u6279\u91cf\u76d1\u542c\n        factory.setBatchListener(true);\n\n        \/\/ \u914d\u5408RecordFilterStrategy\u4f7f\u7528\uff0c\u88ab\u8fc7\u6ee4\u7684\u4fe1\u606f\u5c06\u88ab\u4e22\u5f03\n        factory.setAckDiscarded(true);\n        factory.setRecordFilterStrategy(consumerFilterStrategy);\n        return factory;\n    }\n\n\n\n    @Bean\n    public ConsumerFactory&lt;String, String&gt; initConsumerFactory(){\n        HashMap&lt;String, Object&gt; configs = new HashMap&lt;&gt;();\n        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, \"localhost:9092\");\n        configs.put(ConsumerConfig.GROUP_ID_CONFIG, \"consumer_group_id_02\");\n        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);\n        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);\n        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, \"latest\");\n        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);\n        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, \"org.apache.kafka.common.serialization.StringDeserializer\");\n        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, \"org.apache.kafka.common.serialization.StringDeserializer\");\n        return new DefaultKafkaConsumerFactory&lt;&gt;(configs);\n    }\n}\n<\/code><\/pre>\n<h3>@KafkaListener<\/h3>\n<pre><code class=\"language-java\">@Component\npublic class HelloListener {\n\n    @KafkaListener(topicPartitions = {\n            @TopicPartition(topic = \"night.topic\", partitions = {\"0\"})\n    }, errorHandler = \"consumerExceptionHandler\", containerFactory = \"filterContainerFactory\")\n    public void consumer0(List&lt;ConsumerRecord&lt;String, String&gt;&gt; records, Consumer&lt;String, String&gt; consumer){\n        System.out.println(\"first consumer receive list size: \" + records.size());\n        records.forEach(System.out::println);\n        consumer.commitSync();\n    }\n\n\n    @KafkaListener(topicPartitions = {\n            @TopicPartition(topic = \"night.topic\", partitions = {\"1\"})\n    }, errorHandler = \"consumerExceptionHandler\")\n    @SendTo(\"singleTopic\")\n    public String consumer1(List&lt;ConsumerRecord&lt;String, String&gt;&gt; records, Consumer&lt;String, String&gt; consumer){\n        System.out.println(\"second consumer receive list size: \" + records.size());\n        records.forEach(System.out::println);\n        consumer.commitSync();\n        return \"@SendTo annotation msg\";\n    }\n\n\n    @KafkaListener(topicPartitions = {\n            @TopicPartition(topic = \"night.topic\", partitions = {\"2\"})\n    }, errorHandler = \"consumerExceptionHandler\")\n    public void consumer2(List&lt;ConsumerRecord&lt;String, String&gt;&gt; records, Consumer&lt;String, String&gt; consumer){\n        System.out.println(\"third consumer receive list size: \" + records.size());\n        records.forEach(System.out::println);\n        consumer.commitSync();\n    }\n}<\/code><\/pre>\n<\/div>\n<p>\u672c\u7bc7\u6587\u7ae0\u6765\u6e90\u4e8eCSDN\uff1ahttps:\/\/blog.csdn.net\/weixin_46058921\/article\/details\/131620421<\/p>\n","raw":"","protected":false},"excerpt":{"rendered":"<p>\u539f\u751f\u6a21\u5f0f &lt;dependency&gt; &lt;groupId&gt;org.apache.kafka&lt;\/group &#8230;<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"emotion":"","emotion_color":"","title_style":"","license":"","footnotes":""},"categories":[16,7],"tags":[],"class_list":["post-2898","post","type-post","status-publish","format-standard","hentry","category-kafka","category-spring-boot"],"post_thumbnail_image":"","content_first_image":null,"post_medium_image_300":"","post_thumbnail_image_624":"","post_frist_image":null,"post_medium_image":"","post_large_image":"","post_full_image":"","post_all_images":[],"videoAdId":"","listAd":"0","listAdId":"","listAdEvery":6,"total_comments":0,"category_name":"Kafka","post_date":"2023-07-14","like_count":"0","praiseWord":"\u9f13\u52b1","copyright_state":"","excitationAd":"0","rewardedVideoAdId":"","detailAdId":"","detailAd":"0","enterpriseMinapp":"0","audios":[],"postImageUrl":"https:\/\/wp-moto-1258805347.cos.ap-shanghai.myqcloud.com\/2023\/05\/20230519082947553.jpg","avatarurls":[],"related_posts":null,"pageviews":779,"next_post_id":2904,"next_post_title":"java \u63a5\u53e3 \u8be6\u89e3","previous_post_id":2894,"previous_post_title":"Feign\u7b2c\u4e00\u6b21\u8c03\u7528\u4e3a\u4ec0\u4e48\u4f1a\u5f88\u6162\uff1f","_links":{"self":[{"href":"https:\/\/xinchewhd.com.cn\/index.php\/wp-json\/wp\/v2\/posts\/2898","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/xinchewhd.com.cn\/index.php\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/xinchewhd.com.cn\/index.php\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/xinchewhd.com.cn\/index.php\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/xinchewhd.com.cn\/index.php\/wp-json\/wp\/v2\/comments?post=2898"}],"version-history":[{"count":0,"href":"https:\/\/xinchewhd.com.cn\/index.php\/wp-json\/wp\/v2\/posts\/2898\/revisions"}],"wp:attachment":[{"href":"https:\/\/xinchewhd.com.cn\/index.php\/wp-json\/wp\/v2\/media?parent=2898"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/xinchewhd.com.cn\/index.php\/wp-json\/wp\/v2\/categories?post=2898"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/xinchewhd.com.cn\/index.php\/wp-json\/wp\/v2\/tags?post=2898"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}