Hiện tại em đang làm một project kết nối giữa Kafka và MongoDB, em đang thực hiện như sau:
- File cấu hình application.yml:
# Application configuration. Nesting below is reconstructed: the Kafka keys follow
# the @Value placeholders used by KafkaConsumerConfig (spring.kafka.bootstrap.servers,
# spring.kafka.consumer.group, spring.kafka.consumer.orderGroup, spring.kafka.username,
# spring.kafka.password).
# NOTE(review): credentials are committed in plain text here; consider supplying them
# through environment variables or an external secret store.
spring:
  # NOTE(review): placement of username/password/authSource directly under "mongo"
  # (rather than inside the hosts entry) is assumed - confirm against the class
  # that binds this section.
  mongo:
    enabled: false
    hosts:
      - host: 192.168.101.43
        port: 27017
    username: chatbot
    password: chatbot@vtcc
    authSource: chatbot_platform
  kafka:
    bootstrap:
      servers: 192.168.101.43:9092, 192.168.101.69:9092
    # NOTE(review): "topic" and "topic-json" are assumed to live under spring.kafka;
    # no visible code reads them, so verify against their consumers.
    topic:
      name: notification
      orderName: order
    topic-json:
      name: notification
      orderName: order
    consumer:
      group: myGroup
      orderGroup: orderGroup
    username: kafka
    password: 123456aA@
@Configuration public class AsyncConfig { @Bean(name = "asyncTask") public ThreadPoolTaskExecutor asyncTask() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); // Số lượng luồng cố định executor.setMaxPoolSize(50); // Số lượng luồng tối đa executor.setQueueCapacity(100); // Số lượng tác vụ chờ trong hàng đợi executor.setThreadNamePrefix("asyncTask-"); executor.setKeepAliveSeconds(60); executor.initialize(); return executor; } @Bean public ExecutorService asyncExecutor() { return Executors.newCachedThreadPool(); } }
@EnableKafka @Configuration public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap.servers}") private String bootstrapAddress; @Value("${spring.kafka.consumer.group}") private String consumerGroup; @Value("${spring.kafka.consumer.orderGroup}") private String orderGroup; @Value("${spring.kafka.username}") private String username; @Value("${spring.kafka.password}") private String password; @Bean public ConsumerFactory<String, String> smartNotificationConsumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put(ConsumerConfig.GROUP_ID_CONFIG, orderGroup); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); if (username != null && password != null) { props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="" + username + "" password="" + password + "";"); } return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> smartNotificationKafkaListenerContainerFactory( @Qualifier("asyncTask") ThreadPoolTaskExecutor executor) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(smartNotificationConsumerFactory()); factory.setConcurrency(3); return factory; } }
Em cảm ơn anh!