Avatar
0
Tran Duy Tung Beginner
Tran Duy Tung Beginner
Cấu hình Async và Cluster cho Kafka
Em chào anh ạ.

Hiện tại em đang làm 1 project kết nối giữa kafka và mongodb, em đang thực hiện như sau:

  1. , application.yml
spring:
  mongo:
    enabled: false
    hosts:
      -   host: 192.168.101.43
          port: 27017
    username: chatbot
    password: chatbot@vtcc
    authSource: chatbot_platform

  kafka:
    bootstrap:
      servers: 192.168.101.43:9092, 192.168.101.69:9092
    topic:
      name: notification
      orderName: order
    topic-json:
      name: notification
      orderName: order
    consumer:
      group: myGroup
      orderGroup: orderGroup
    username: kafka
    password: 123456aA@
2., AsyncConfig
@Configuration
public class AsyncConfig {

    @Bean(name = "asyncTask")
    public ThreadPoolTaskExecutor asyncTask() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10); // Số lượng luồng cố định
        executor.setMaxPoolSize(50); // Số lượng luồng tối đa
        executor.setQueueCapacity(100); // Số lượng tác vụ chờ trong hàng đợi
        executor.setThreadNamePrefix("asyncTask-");
        executor.setKeepAliveSeconds(60);
        executor.initialize();
        return executor;
    }

    @Bean
    public ExecutorService asyncExecutor() {
        return Executors.newCachedThreadPool();
    }
}
3., Kafka config
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap.servers}")
    private String bootstrapAddress;
    @Value("${spring.kafka.consumer.group}")
    private String consumerGroup;
    @Value("${spring.kafka.consumer.orderGroup}")
    private String orderGroup;
    @Value("${spring.kafka.username}")
    private String username;
    @Value("${spring.kafka.password}")
    private String password;
    @Bean
    public ConsumerFactory<String, String> smartNotificationConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, orderGroup);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");

        if (username != null && password != null) {
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="" + username + "" password="" + password + "";");
        }
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> smartNotificationKafkaListenerContainerFactory(
            @Qualifier("asyncTask") ThreadPoolTaskExecutor executor) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(smartNotificationConsumerFactory());
        factory.setConcurrency(3);
        return factory;
    }

}
Vì là lần đầu tiên làm nên em không rõ là với những cấu hình trên thì đã đáp ứng được async và kết nối theo cụm cluster đến kafka chưa ạ. Và làm cách nào để có thể kiểm tra đưọc ạ.

Em cảm ơn anh!

  • Answer
Remain: 5
1 Answer
Avatar
monkey Beginner
monkey Beginner
Anh nghĩ là ok đó em ạ, nhưng mà anh nghĩ nên gọi là concurrency hoặc multi-threading nó đúng hơn gọi là async trong trường hợp này. Em có thể dùng visualvm để xem danh sách các thread được tạo ra, nếu có nhiều hơn một main thread được khởi tạo và hoạt động thì có thể em đã đạt được mục tiêu của mình.
  • 0
  • Reply
Em cảm ơn anh đã phản hồi ạ.

Cho em hỏi thêm chút, khi em đọc tài liệu thì có 2 keyword em thấy khá khó hiểu là partition và commit offset. Anh có thể giải thích giúp em đưọc không ạ, và với kinh nghiệm của anh thì theo anh nên sử dụng hình thức commit offset nào cho hệ thống liên quan đến chatbot ạ

Em cảm ơn ạ.

 –  Tran Duy Tung 1710744540000
Về phía người sử dụng kafka như em thì anh nghĩ em không cần thiết phải quan tâm đến các thông số này và sử dụng mặc định em ạ. Commit offsets và partitions là các khái niệm bên trong phần core của kafka liên quan đến việc nó tổ chức lưu trữ dữ liệu. Anh chưa hiểu chatbot thì có liên quan gì đến câu chuyện kafka mà mình đang nói ta? Chắc là em cần phải tạo câu hỏi riêng để rõ bối cảnh hơn cho người trả lời em ạ.  –  monkey 1710744956000