123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- package com.persagy.iottransfer.config;
- import org.apache.commons.lang.StringUtils;
- import org.springframework.beans.factory.InitializingBean;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
- import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
- import org.springframework.boot.context.properties.EnableConfigurationProperties;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
- import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
- import org.springframework.kafka.core.DefaultKafkaProducerFactory;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.kafka.listener.ContainerProperties;
- import java.util.HashSet;
- import java.util.Map;
- import java.util.Set;
- /**
- * kafka配置,实际上,在KafkaAutoConfiguration中已经有默认的根据配置文件信息创建配置,但是自动配置属性没有涵盖所有
- * 我们可以自定义创建相关bean,进行如下配置
- *
- * @author zhoujy
- * @date 2018年12月17日
- **/
- @Configuration
- @EnableConfigurationProperties({KafkaProperties.class})
- @ConditionalOnProperty(prefix = "spring.kafka", name = "enable", havingValue = "true")
- public class KafkaCommonConfig implements InitializingBean {
- private final KafkaProperties properties;
- @Value("${spring.kafka.consumer.topics}")
- private String topics;
- public KafkaCommonConfig(KafkaProperties properties) {
- this.properties = properties;
- }
- //构造消费者属性map,ConsumerConfig中的可配置属性比spring boot自动配置要多
- private Map<String, Object> consumerProperties() {
- // props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- return properties.buildConsumerProperties();
- }
- /**
- * 不使用spring boot默认方式创建的DefaultKafkaConsumerFactory,重新定义创建方式
- *
- * @return
- */
- @Bean("consumerFactory")
- public DefaultKafkaConsumerFactory consumerFactory() {
- return new DefaultKafkaConsumerFactory(consumerProperties());
- }
- @Bean("listenerContainerFactory")
- //个性化定义消费者
- public ConcurrentKafkaListenerContainerFactory listenerContainerFactory(DefaultKafkaConsumerFactory consumerFactory) {
- //指定使用DefaultKafkaConsumerFactory
- ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
- factory.setConsumerFactory(consumerFactory);
- //设置消费者ack模式为手动,看需求设置
- factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
- //设置可批量拉取消息消费,拉取数量一次3,看需求设置
- factory.setConcurrency(1);
- factory.setBatchListener(true);
- return factory;
- }
- /* @Bean
- //代码创建方式topic
- public NewTopic batchTopic() {
- return new NewTopic("topic.quick.batch", 8, (short) 1);
- }*/
- //创建生产者配置map,ProducerConfig中的可配置属性比spring boot自动配置要多
- private Map<String, Object> producerProperties() {
- return properties.buildProducerProperties();
- }
- /**
- * 不使用spring boot的KafkaAutoConfiguration默认方式创建的DefaultKafkaProducerFactory,重新定义
- *
- * @return
- */
- @Bean("produceFactory")
- public DefaultKafkaProducerFactory produceFactory() {
- return new DefaultKafkaProducerFactory(producerProperties());
- }
- /**
- * 不使用spring boot的KafkaAutoConfiguration默认方式创建的KafkaTemplate,重新定义
- *
- * @param produceFactory
- * @return
- */
- @Bean
- public KafkaTemplate kafkaTemplate(DefaultKafkaProducerFactory produceFactory) {
- return new KafkaTemplate(produceFactory);
- }
- @Override
- public void afterPropertiesSet() throws Exception {
- String topicName = wireTopics();
- System.setProperty("topicName", topicName);
- System.out.println("### set system config topic:{}" + topicName);
- }
- private String wireTopics() {
- Set<String> topicSet = new HashSet<>();
- topicSet.add(topics);
- return StringUtils.join(topicSet, ",");
- }
- }
|