Em chào anh ạ.
Hiện tại em đang làm 1 project kết nối giữa kafka và mongodb, em đang thực hiện như sau:
- , application.yml
spring:
mongo:
enabled: false
hosts:
- host: 192.168.101.43
port: 27017
username: chatbot
password: chatbot@vtcc
authSource: chatbot_platform
kafka:
bootstrap:
servers: 192.168.101.43:9092, 192.168.101.69:9092
topic:
name: notification
orderName: order
topic-json:
name: notification
orderName: order
consumer:
group: myGroup
orderGroup: orderGroup
username: kafka
password: 123456aA@
- , AsyncConfig
@Configuration
public class AsyncConfig {
@Bean(name = "asyncTask")
public ThreadPoolTaskExecutor asyncTask() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10); // Số lượng luồng cố định
executor.setMaxPoolSize(50); // Số lượng luồng tối đa
executor.setQueueCapacity(100); // Số lượng tác vụ chờ trong hàng đợi
executor.setThreadNamePrefix("asyncTask-");
executor.setKeepAliveSeconds(60);
executor.initialize();
return executor;
}
@Bean
public ExecutorService asyncExecutor() {
return Executors.newCachedThreadPool();
}
}
- , Kafka config
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("")
private String bootstrapAddress;
@Value("")
private String consumerGroup;
@Value("")
private String orderGroup;
@Value("")
private String username;
@Value("")
private String password;
@Bean
public ConsumerFactory<String, String> smartNotificationConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, orderGroup);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
if (username != null && password != null) {
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="" + username + "" password="" + password + "";");
}
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> smartNotificationKafkaListenerContainerFactory(
@Qualifier("asyncTask") ThreadPoolTaskExecutor executor) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(smartNotificationConsumerFactory());
factory.setConcurrency(3);
return factory;
}
}
Vì là lần đầu tiên làm nên em không rõ là với những cấu hình trên thì đã đáp ứng được async và kết nối theo cụm cluster đến kafka chưa ạ. Và làm cách nào để có thể kiểm tra đưọc ạ.
Em cảm ơn anh!