Cấu hình Async và Cluster cho Kafka
Em chào anh ạ. <p>
Hiện tại em đang làm 1 project kết nối giữa kafka và mongodb, em đang thực hiện như sau:
</p>
<ol start="1">
<li>, application.yml</li>
</ol>
<div class="markdown-block position-relative overflow-auto source-java">
<pre>
spring:
mongo:
enabled: false
hosts:
- host: 192.168.101.43
port: 27017
username: chatbot
password: chatbot@vtcc
authSource: chatbot_platform
kafka:
bootstrap:
servers: 192.168.101.43:9092, 192.168.101.69:9092
topic:
name: notification
orderName: order
topic-json:
name: notification
orderName: order
consumer:
group: myGroup
orderGroup: orderGroup
username: kafka
password: 123456aA@
</pre>
</div>2., AsyncConfig<div class="markdown-block position-relative overflow-auto source-java">
<pre>
<span class="pl-s">@Configuration</span>
<span class="pl-k">public</span> <span class="pl-k">class</span> AsyncConfig {
<span class="pl-s">@Bean</span>(name = <span class="pl-s">"asyncTask"</span>)
<span class="pl-k">public</span> ThreadPoolTaskExecutor asyncTask() {
ThreadPoolTaskExecutor executor = <span class="pl-k">new</span> ThreadPoolTaskExecutor();
executor.setCorePoolSize(10); // Số lượng luồng cố định
executor.setMaxPoolSize(50); // Số lượng luồng tối đa
executor.setQueueCapacity(100); // Số lượng tác vụ chờ trong hàng đợi
executor.setThreadNamePrefix(<span class="pl-s">"asyncTask-"</span>);
executor.setKeepAliveSeconds(60);
executor.initialize();
<span class="pl-k">return</span> executor;
}
<span class="pl-s">@Bean</span>
<span class="pl-k">public</span> ExecutorService asyncExecutor() {
<span class="pl-k">return</span> Executors.newCachedThreadPool();
}
}
</pre>
</div>3., Kafka config<div class="markdown-block position-relative overflow-auto source-java">
<pre>
<span class="pl-s">@EnableKafka</span>
<span class="pl-s">@Configuration</span>
<span class="pl-k">public</span> <span class="pl-k">class</span> KafkaConsumerConfig {
<span class="pl-s">@Value</span>(<span class="pl-s">"${spring.kafka.bootstrap.servers}"</span>)
private String bootstrapAddress;
<span class="pl-s">@Value</span>(<span class="pl-s">"${spring.kafka.consumer.group}"</span>)
private String consumerGroup;
<span class="pl-s">@Value</span>(<span class="pl-s">"${spring.kafka.consumer.orderGroup}"</span>)
private String orderGroup;
<span class="pl-s">@Value</span>(<span class="pl-s">"${spring.kafka.username}"</span>)
private String username;
<span class="pl-s">@Value</span>(<span class="pl-s">"${spring.kafka.password}"</span>)
private String password;
<span class="pl-s">@Bean</span>
<span class="pl-k">public</span> ConsumerFactory<String, String> smartNotificationConsumerFactory() {
Map<String, Object> props = <span class="pl-k">new</span> HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, orderGroup);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, <span class="pl-s">"30000"</span>);
<span class="pl-k">if</span> (username != null && password != null) {
props.put(<span class="pl-s">"security.protocol"</span>, <span class="pl-s">"SASL_PLAINTEXT"</span>);
props.put(<span class="pl-s">"sasl.mechanism"</span>, <span class="pl-s">"PLAIN"</span>);
props.put(<span class="pl-s">"sasl.jaas.config"</span>, "org.apache.kafka.common.security.plain.PlainLoginModule required username="" + username + <span class="pl-s">""</span> password="" + password + <span class="pl-s">"";"</span>);
}
<span class="pl-k">return</span> <span class="pl-k">new</span> DefaultKafkaConsumerFactory<>(props);
}
<span class="pl-s">@Bean</span>
<span class="pl-k">public</span> ConcurrentKafkaListenerContainerFactory<String, String> smartNotificationKafkaListenerContainerFactory(
<span class="pl-s">@Qualifier</span>(<span class="pl-s">"asyncTask"</span>) ThreadPoolTaskExecutor executor) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
<span class="pl-k">new</span> ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(smartNotificationConsumerFactory());
factory.setConcurrency(3);
<span class="pl-k">return</span> factory;
}
}
</pre>
</div>Vì là lần đầu tiên làm nên em không rõ là với những cấu hình trên thì đã đáp ứng được async và kết nối theo cụm cluster đến kafka chưa ạ. Và làm cách nào để có thể kiểm tra đưọc ạ.<p>
Em cảm ơn anh!
</p>
Maven - Dependency
Em chào anh ạ. Anh cho em hỏi, em đang có 1 project em chia thành các module nhỏ trong đó có module "common" em dùng mongo-java-driver<div class="markdown-block position-relative overflow-auto source-java">
<pre>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>${mongodb.version}</version>
</dependency>
</pre>
</div>Bây giờ ở 1 module mới em muốn dùng sang Mongo của Spring mà vẫn muốn dùng các config của common<div class="markdown-block position-relative overflow-auto source-java">
<pre>
<dependency>
<groupId>com.example.common</groupId>
<artifactId>common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
</pre>
</div>Vì tính chất không thể thay đổi ở common, vậy có cách nào có thể dùng Mongo của Spring ở module mới mà không bị conflix thư viện với mongo ở module "common" không ạ.<p>
Em cảm ơn ạ
</p>
Lưu dữ liệu từ Activemq vào Elasticsearch
Em chào anh.<p>
Em đang có 1 task lấy data từ Activemq đã đưọc đọc và lưu vào Elasticsearch. Tuy nhiên em chưa có hướng để làm phần này. Anh có thể giúp em được không ạ.
</p>
<p>
Em cảm ơn ạ
</p>
Convert từ String sang objectId MongoDB
Em chào anh.<p>
Em đang dùng Lookup trong MongoTemplate để join 2 collection như sau:
</p>
<p>
public List<UserWithDepartment> findUsersWithDepartments() {
</p>
<p>
ProjectionOperation projectionOperation = Aggregation.project()
</p>
<p>
.and(ConvertOperators.valueOf("userId").convertToObjectId()).as("userId");
</p>
<p>
Aggregation aggregation = Aggregation.newAggregation(
</p>
<p>
projectionOperation,
</p>
<p>
Aggregation.lookup(departmentCollectionName, "_id","userId","departments")
</p>
<p>
);
</p>
<p>
AggregationResults<UserWithDepartment> results = mongoTemplate.aggregate(
</p>
<p>
aggregation, userCollectionName, UserWithDepartment.class);
</p>
<p>
return results.getMappedResults();
</p>
<p>
}
</p>
<p>
Vấn đề của em là "_id" kiểu dữ liệu objectId còn "userId" kiểu dữ liệu String vì vậy em cần convert về 1 kiểu dữ liệu nhưng đang gặp lỗi này: Unrecognized expression '$toObjectId'
</p>
<p>
Anh đã gặp lỗi này chưa và cách khắc phục là gì ạ.
</p>
<p>
Em cảm ơn ạ
</p>