Configuring Async and Cluster Connections for Kafka
Hello. <p>
I am working on a project that connects Kafka and MongoDB. My current setup is as follows:
</p>
<ol start="1">
<li>application.yml</li>
</ol>
<div class="markdown-block position-relative overflow-auto source-java">
<pre>
spring:
  mongo:
    enabled: false
    hosts:
      - host: 192.168.101.43
        port: 27017
    username: chatbot
    password: chatbot@vtcc
    authSource: chatbot_platform
  kafka:
    bootstrap:
      servers: 192.168.101.43:9092, 192.168.101.69:9092
    topic:
      name: notification
      orderName: order
    topic-json:
      name: notification
      orderName: order
    consumer:
      group: myGroup
      orderGroup: orderGroup
    username: kafka
    password: 123456aA@
</pre>
</div>2. AsyncConfig<div class="markdown-block position-relative overflow-auto source-java">
<pre>
<span class="pl-k">import</span> java.util.concurrent.ExecutorService;
<span class="pl-k">import</span> java.util.concurrent.Executors;
<span class="pl-k">import</span> org.springframework.context.annotation.Bean;
<span class="pl-k">import</span> org.springframework.context.annotation.Configuration;
<span class="pl-k">import</span> org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

<span class="pl-s">@Configuration</span>
<span class="pl-k">public</span> <span class="pl-k">class</span> AsyncConfig {

    <span class="pl-s">@Bean</span>(name = <span class="pl-s">"asyncTask"</span>)
    <span class="pl-k">public</span> ThreadPoolTaskExecutor asyncTask() {
        ThreadPoolTaskExecutor executor = <span class="pl-k">new</span> ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);    // Core number of threads kept in the pool
        executor.setMaxPoolSize(50);     // Maximum number of threads
        executor.setQueueCapacity(100);  // Number of tasks that can wait in the queue
        executor.setThreadNamePrefix(<span class="pl-s">"asyncTask-"</span>);
        executor.setKeepAliveSeconds(60);
        executor.initialize();
        <span class="pl-k">return</span> executor;
    }

    <span class="pl-s">@Bean</span>
    <span class="pl-k">public</span> ExecutorService asyncExecutor() {
        <span class="pl-k">return</span> Executors.newCachedThreadPool();
    }
}
</pre>
</div>3. Kafka config<div class="markdown-block position-relative overflow-auto source-java">
<pre>
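<span class="pl-k">import</span> java.util.HashMap;
<span class="pl-k">import</span> java.util.Map;
<span class="pl-k">import</span> org.apache.kafka.clients.consumer.ConsumerConfig;
<span class="pl-k">import</span> org.apache.kafka.common.serialization.StringDeserializer;
<span class="pl-k">import</span> org.springframework.beans.factory.annotation.Qualifier;
<span class="pl-k">import</span> org.springframework.beans.factory.annotation.Value;
<span class="pl-k">import</span> org.springframework.context.annotation.Bean;
<span class="pl-k">import</span> org.springframework.context.annotation.Configuration;
<span class="pl-k">import</span> org.springframework.kafka.annotation.EnableKafka;
<span class="pl-k">import</span> org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
<span class="pl-k">import</span> org.springframework.kafka.core.ConsumerFactory;
<span class="pl-k">import</span> org.springframework.kafka.core.DefaultKafkaConsumerFactory;
<span class="pl-k">import</span> org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

<span class="pl-s">@EnableKafka</span>
<span class="pl-s">@Configuration</span>
<span class="pl-k">public</span> <span class="pl-k">class</span> KafkaConsumerConfig {

    <span class="pl-s">@Value</span>(<span class="pl-s">"${spring.kafka.bootstrap.servers}"</span>)
    private String bootstrapAddress;

    <span class="pl-s">@Value</span>(<span class="pl-s">"${spring.kafka.consumer.group}"</span>)
    private String consumerGroup;

    <span class="pl-s">@Value</span>(<span class="pl-s">"${spring.kafka.consumer.orderGroup}"</span>)
    private String orderGroup;

    <span class="pl-s">@Value</span>(<span class="pl-s">"${spring.kafka.username}"</span>)
    private String username;

    <span class="pl-s">@Value</span>(<span class="pl-s">"${spring.kafka.password}"</span>)
    private String password;

    <span class="pl-s">@Bean</span>
    <span class="pl-k">public</span> ConsumerFactory<String, String> smartNotificationConsumerFactory() {
        Map<String, Object> props = <span class="pl-k">new</span> HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, orderGroup);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, <span class="pl-s">"30000"</span>);
        <span class="pl-k">if</span> (username != null && password != null) {
            props.put(<span class="pl-s">"security.protocol"</span>, <span class="pl-s">"SASL_PLAINTEXT"</span>);
            props.put(<span class="pl-s">"sasl.mechanism"</span>, <span class="pl-s">"PLAIN"</span>);
            // The inner quotes around the credentials must be escaped
            props.put(<span class="pl-s">"sasl.jaas.config"</span>,
                    <span class="pl-s">"org.apache.kafka.common.security.plain.PlainLoginModule required username=\""</span>
                            + username + <span class="pl-s">"\" password=\""</span> + password + <span class="pl-s">"\";"</span>);
        }
        <span class="pl-k">return</span> <span class="pl-k">new</span> DefaultKafkaConsumerFactory<>(props);
    }

    <span class="pl-s">@Bean</span>
    <span class="pl-k">public</span> ConcurrentKafkaListenerContainerFactory<String, String> smartNotificationKafkaListenerContainerFactory(
            <span class="pl-s">@Qualifier</span>(<span class="pl-s">"asyncTask"</span>) ThreadPoolTaskExecutor executor) {
        // Note: the injected executor is not applied to the factory anywhere below
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                <span class="pl-k">new</span> ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(smartNotificationConsumerFactory());
        factory.setConcurrency(3); // Number of concurrent consumers per listener container
        <span class="pl-k">return</span> factory;
    }
}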
</pre>
</div>Since this is my first time doing this, I am not sure whether the configuration above actually achieves asynchronous processing and connects to Kafka as a cluster. How can I verify that?<p>
Thank you!
</p>
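A side note on the pool sizes in AsyncConfig, since they bear on the async part of the question. As far as I understand, Spring's ThreadPoolTaskExecutor delegates to the JDK's `java.util.concurrent.ThreadPoolExecutor`, which starts extra threads beyond the core size only after the queue is full. The sketch below is not the Spring bean itself: it uses the plain JDK executor with the same numbers (core 10, max 50, queue 100), and the names `PoolGrowthDemo` and `observe` are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; mirrors the sizes from AsyncConfig using the plain JDK executor.
class PoolGrowthDemo {

    // Submits `tasks` blocking jobs and returns {thread count, queued task count}
    // observed while every job is still blocked.
    static int[] observe(int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // hold the thread so the pool cannot drain
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown(); // let all jobs finish
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int tasks : new int[] {10, 110, 120}) {
            int[] r = observe(tasks);
            System.out.println(tasks + " tasks -> threads=" + r[0] + ", queued=" + r[1]);
        }
    }
}
```

With 10 and 110 tasks the pool stays at 10 threads (0 and 100 tasks queued); only at 120 tasks does it grow to 20 threads. So `setMaxPoolSize(50)` only matters once more than 100 tasks are already waiting. For the async check itself, logging `Thread.currentThread().getName()` inside the listener would show which thread actually handles each message.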