Hiện tại em đang làm một project kết nối giữa Kafka và MongoDB, em đang thực hiện như sau:
- File cấu hình application.yml:
# Application configuration for the MongoDB and Kafka connections.
#
# NOTE(review): the nesting below was reconstructed from a paste whose line
# breaks and indentation were lost. The spring.kafka.bootstrap.servers,
# spring.kafka.consumer.group, spring.kafka.consumer.orderGroup,
# spring.kafka.username and spring.kafka.password paths match the @Value
# placeholders used by KafkaConsumerConfig. The placement of the mongo
# username/password/authSource keys and of topic/topic-json is an assumption;
# confirm against the classes that read them.
#
# NOTE(review): credentials are stored in plain text here; consider moving
# them to environment variables or a secret store.
spring:
  mongo:
    enabled: false
    hosts:
      - host: 192.168.101.43
        port: 27017
    username: chatbot
    password: chatbot@vtcc
    authSource: chatbot_platform
  kafka:
    bootstrap:
      # Comma-separated list of Kafka brokers.
      servers: 192.168.101.43:9092, 192.168.101.69:9092
    topic:
      name: notification
      orderName: order
    topic-json:
      name: notification
      orderName: order
    consumer:
      group: myGroup
      orderGroup: orderGroup
    username: kafka
    password: 123456aA@
@Configuration public class AsyncConfig { @Bean(name = "asyncTask") public ThreadPoolTaskExecutor asyncTask() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); // Số lượng luồng cố định executor.setMaxPoolSize(50); // Số lượng luồng tối đa executor.setQueueCapacity(100); // Số lượng tác vụ chờ trong hàng đợi executor.setThreadNamePrefix("asyncTask-"); executor.setKeepAliveSeconds(60); executor.initialize(); return executor; } @Bean public ExecutorService asyncExecutor() { return Executors.newCachedThreadPool(); } }
@EnableKafka @Configuration public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap.servers}") private String bootstrapAddress; @Value("${spring.kafka.consumer.group}") private String consumerGroup; @Value("${spring.kafka.consumer.orderGroup}") private String orderGroup; @Value("${spring.kafka.username}") private String username; @Value("${spring.kafka.password}") private String password; @Bean public ConsumerFactory<String, String> smartNotificationConsumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put(ConsumerConfig.GROUP_ID_CONFIG, orderGroup); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); if (username != null && password != null) { props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="" + username + "" password="" + password + "";"); } return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> smartNotificationKafkaListenerContainerFactory( @Qualifier("asyncTask") ThreadPoolTaskExecutor executor) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(smartNotificationConsumerFactory()); factory.setConcurrency(3); return factory; } }
Em cảm ơn anh!