KafkaCommonConfig.java 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. package com.persagy.iottransfer.config;
  2. import org.apache.commons.lang.StringUtils;
  3. import org.springframework.beans.factory.InitializingBean;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  6. import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
  7. import org.springframework.boot.context.properties.EnableConfigurationProperties;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.context.annotation.Configuration;
  10. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  11. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  12. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  13. import org.springframework.kafka.core.KafkaTemplate;
  14. import org.springframework.kafka.listener.ContainerProperties;
  15. import java.util.HashSet;
  16. import java.util.Map;
  17. import java.util.Set;
  18. /**
  19. * kafka配置,实际上,在KafkaAutoConfiguration中已经有默认的根据配置文件信息创建配置,但是自动配置属性没有涵盖所有
  20. * 我们可以自定义创建相关bean,进行如下配置
  21. *
  22. * @author zhoujy
  23. * @date 2018年12月17日
  24. **/
  25. @Configuration
  26. @EnableConfigurationProperties({KafkaProperties.class})
  27. @ConditionalOnProperty(prefix = "spring.kafka", name = "enable", havingValue = "true")
  28. public class KafkaCommonConfig implements InitializingBean {
  29. private final KafkaProperties properties;
  30. @Value("${spring.kafka.consumer.topics}")
  31. private String topics;
  32. public KafkaCommonConfig(KafkaProperties properties) {
  33. this.properties = properties;
  34. }
  35. //构造消费者属性map,ConsumerConfig中的可配置属性比spring boot自动配置要多
  36. private Map<String, Object> consumerProperties() {
  37. // props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  38. return properties.buildConsumerProperties();
  39. }
  40. /**
  41. * 不使用spring boot默认方式创建的DefaultKafkaConsumerFactory,重新定义创建方式
  42. *
  43. * @return
  44. */
  45. @Bean("consumerFactory")
  46. public DefaultKafkaConsumerFactory consumerFactory() {
  47. return new DefaultKafkaConsumerFactory(consumerProperties());
  48. }
  49. @Bean("listenerContainerFactory")
  50. //个性化定义消费者
  51. public ConcurrentKafkaListenerContainerFactory listenerContainerFactory(DefaultKafkaConsumerFactory consumerFactory) {
  52. //指定使用DefaultKafkaConsumerFactory
  53. ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
  54. factory.setConsumerFactory(consumerFactory);
  55. //设置消费者ack模式为手动,看需求设置
  56. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  57. //设置可批量拉取消息消费,拉取数量一次3,看需求设置
  58. factory.setConcurrency(1);
  59. factory.setBatchListener(true);
  60. return factory;
  61. }
  62. /* @Bean
  63. //代码创建方式topic
  64. public NewTopic batchTopic() {
  65. return new NewTopic("topic.quick.batch", 8, (short) 1);
  66. }*/
  67. //创建生产者配置map,ProducerConfig中的可配置属性比spring boot自动配置要多
  68. private Map<String, Object> producerProperties() {
  69. return properties.buildProducerProperties();
  70. }
  71. /**
  72. * 不使用spring boot的KafkaAutoConfiguration默认方式创建的DefaultKafkaProducerFactory,重新定义
  73. *
  74. * @return
  75. */
  76. @Bean("produceFactory")
  77. public DefaultKafkaProducerFactory produceFactory() {
  78. return new DefaultKafkaProducerFactory(producerProperties());
  79. }
  80. /**
  81. * 不使用spring boot的KafkaAutoConfiguration默认方式创建的KafkaTemplate,重新定义
  82. *
  83. * @param produceFactory
  84. * @return
  85. */
  86. @Bean
  87. public KafkaTemplate kafkaTemplate(DefaultKafkaProducerFactory produceFactory) {
  88. return new KafkaTemplate(produceFactory);
  89. }
  90. @Override
  91. public void afterPropertiesSet() throws Exception {
  92. String topicName = wireTopics();
  93. System.setProperty("topicName", topicName);
  94. System.out.println("### set system config topic:{}" + topicName);
  95. }
  96. private String wireTopics() {
  97. Set<String> topicSet = new HashSet<>();
  98. topicSet.add(topics);
  99. return StringUtils.join(topicSet, ",");
  100. }
  101. }