Tran Duy Tung
4 questions
0 answers
Avatar
0
Tran Duy Tung Beginner
Tran Duy Tung Beginner
Cấu hình Async và Cluster cho Kafka
Em chào anh ạ. <p> Hiện tại em đang làm 1 project kết nối giữa kafka và mongodb, em đang thực hiện như sau: </p> <ol start="1"> <li>, application.yml</li> </ol> <div class="markdown-block position-relative overflow-auto source-java"> <pre> spring: mongo: enabled: false hosts: - host: 192.168.101.43 port: 27017 username: chatbot password: chatbot@vtcc authSource: chatbot_platform kafka: bootstrap: servers: 192.168.101.43:9092, 192.168.101.69:9092 topic: name: notification orderName: order topic-json: name: notification orderName: order consumer: group: myGroup orderGroup: orderGroup username: kafka password: 123456aA@ </pre> </div>2., AsyncConfig<div class="markdown-block position-relative overflow-auto source-java"> <pre> <span class="pl-s">@Configuration</span> <span class="pl-k">public</span> <span class="pl-k">class</span> AsyncConfig { <span class="pl-s">@Bean</span>(name = <span class="pl-s">"asyncTask"</span>) <span class="pl-k">public</span> ThreadPoolTaskExecutor asyncTask() { ThreadPoolTaskExecutor executor = <span class="pl-k">new</span> ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); // Số lượng luồng cố định executor.setMaxPoolSize(50); // Số lượng luồng tối đa executor.setQueueCapacity(100); // Số lượng tác vụ chờ trong hàng đợi executor.setThreadNamePrefix(<span class="pl-s">"asyncTask-"</span>); executor.setKeepAliveSeconds(60); executor.initialize(); <span class="pl-k">return</span> executor; } <span class="pl-s">@Bean</span> <span class="pl-k">public</span> ExecutorService asyncExecutor() { <span class="pl-k">return</span> Executors.newCachedThreadPool(); } } </pre> </div>3., Kafka config<div class="markdown-block position-relative overflow-auto source-java"> <pre> <span class="pl-s">@EnableKafka</span> <span class="pl-s">@Configuration</span> <span class="pl-k">public</span> <span class="pl-k">class</span> KafkaConsumerConfig { <span class="pl-s">@Value</span>(<span class="pl-s">"${spring.kafka.bootstrap.servers}"</span>) private String bootstrapAddress; <span class="pl-s">@Value</span>(<span class="pl-s">"${spring.kafka.consumer.group}"</span>) private String consumerGroup; <span class="pl-s">@Value</span>(<span class="pl-s">"${spring.kafka.consumer.orderGroup}"</span>) private String orderGroup; <span class="pl-s">@Value</span>(<span class="pl-s">"${spring.kafka.username}"</span>) private String username; <span class="pl-s">@Value</span>(<span class="pl-s">"${spring.kafka.password}"</span>) private String password; <span class="pl-s">@Bean</span> <span class="pl-k">public</span> ConsumerFactory&lt;String, String&gt; smartNotificationConsumerFactory() { Map&lt;String, Object&gt; props = <span class="pl-k">new</span> HashMap&lt;&gt;(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put(ConsumerConfig.GROUP_ID_CONFIG, orderGroup); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, <span class="pl-s">"30000"</span>); <span class="pl-k">if</span> (username != null && password != null) { props.put(<span class="pl-s">"security.protocol"</span>, <span class="pl-s">"SASL_PLAINTEXT"</span>); props.put(<span class="pl-s">"sasl.mechanism"</span>, <span class="pl-s">"PLAIN"</span>); props.put(<span class="pl-s">"sasl.jaas.config"</span>, "org.apache.kafka.common.security.plain.PlainLoginModule required username="" + username + <span class="pl-s">""</span> password="" + password + <span class="pl-s">"";"</span>); } <span class="pl-k">return</span> <span class="pl-k">new</span> DefaultKafkaConsumerFactory&lt;&gt;(props); } <span class="pl-s">@Bean</span> <span class="pl-k">public</span> ConcurrentKafkaListenerContainerFactory&lt;String, String&gt; smartNotificationKafkaListenerContainerFactory( <span class="pl-s">@Qualifier</span>(<span class="pl-s">"asyncTask"</span>) ThreadPoolTaskExecutor executor) { ConcurrentKafkaListenerContainerFactory&lt;String, String&gt; factory = <span class="pl-k">new</span> ConcurrentKafkaListenerContainerFactory&lt;&gt;(); factory.setConsumerFactory(smartNotificationConsumerFactory()); factory.setConcurrency(3); <span class="pl-k">return</span> factory; } } </pre> </div>Vì là lần đầu tiên làm nên em không rõ là với những cấu hình trên thì đã đáp ứng được async và kết nối theo cụm cluster đến kafka chưa ạ. Và làm cách nào để có thể kiểm tra đưọc ạ.<p> Em cảm ơn anh! </p>
Answer
Avatar
0
Tran Duy Tung Beginner
Tran Duy Tung Beginner
Maven - Dependency
Em chào anh ạ. Anh cho em hỏi, em đang có 1 project em chia thành các module nhỏ trong đó có module "common" em dùng mongo-java-driver<div class="markdown-block position-relative overflow-auto source-java"> <pre> &lt;dependency&gt; &lt;groupId&gt;org.mongodb&lt;/groupId&gt; &lt;artifactId&gt;mongo-java-driver&lt;/artifactId&gt; &lt;version&gt;${mongodb.version}&lt;/version&gt; &lt;/dependency&gt; </pre> </div>Bây giờ ở 1 module mới em muốn dùng sang Mongo của Spring mà vẫn muốn dùng các config của common<div class="markdown-block position-relative overflow-auto source-java"> <pre> &lt;dependency&gt; &lt;groupId&gt;com.example.common&lt;/groupId&gt; &lt;artifactId&gt;common&lt;/artifactId&gt; &lt;version&gt;0.0.1-SNAPSHOT&lt;/version&gt; &lt;/dependency&gt; &lt;dependency&gt; &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt; &lt;artifactId&gt;spring-boot-starter-data-mongodb&lt;/artifactId&gt; &lt;/dependency&gt; </pre> </div>Vì tính chất không thể thay đổi ở common, vậy có cách nào có thể dùng Mongo của Spring ở module mới mà không bị conflix thư viện với mongo ở module "common" không ạ.<p> Em cảm ơn ạ </p>
Answer
Avatar
0
Tran Duy Tung Beginner
Tran Duy Tung Beginner
Lưu dữ liệu từ Activemq vào Elasticsearch
Em chào anh.<p> Em đang có 1 task lấy data từ Activemq đã đưọc đọc và lưu vào Elasticsearch. Tuy nhiên em chưa có hướng để làm phần này. Anh có thể giúp em được không ạ. </p> <p> Em cảm ơn ạ </p>
Answer
Avatar
0
Tran Duy Tung Beginner
Tran Duy Tung Beginner
Convert từ String sang objectId MongoDB
Em chào anh.<p> Em đang dùng Lookup trong MongoTemplate để join 2 collection như sau: </p> <p> public List<UserWithDepartment> findUsersWithDepartments() { </p> <p> ProjectionOperation projectionOperation = Aggregation.project() </p> <p> .and(ConvertOperators.valueOf("userId").convertToObjectId()).as("userId"); </p> <p> Aggregation aggregation = Aggregation.newAggregation( </p> <p> projectionOperation, </p> <p> Aggregation.lookup(departmentCollectionName, "_id","userId","departments") </p> <p> ); </p> <p> AggregationResults<UserWithDepartment> results = mongoTemplate.aggregate( </p> <p> aggregation, userCollectionName, UserWithDepartment.class); </p> <p> return results.getMappedResults(); </p> <p> } </p> <p> Vấn đề của em là "_id" kiểu dữ liệu objectId còn "userId" kiểu dữ liệu String vì vậy em cần convert về 1 kiểu dữ liệu nhưng đang gặp lỗi này: Unrecognized expression '$toObjectId' </p> <p> Anh đã gặp lỗi này chưa và cách khắc phục là gì ạ. </p> <p> Em cảm ơn ạ </p>
Answer