|
@@ -0,0 +1,74 @@
|
|
|
|
+package com.cxzx;
|
|
|
|
+
|
|
|
|
+import com.cxzx.db.Tidb;
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
+import org.apache.commons.io.FilenameUtils;
|
|
|
|
+import org.apache.commons.lang3.SystemUtils;
|
|
|
|
+import org.apache.kafka.clients.CommonClientConfigs;
|
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
|
+import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
|
|
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
|
|
|
+import org.apache.kafka.common.TopicPartition;
|
|
|
|
+import org.apache.kafka.common.config.SaslConfigs;
|
|
|
|
+import org.apache.kafka.common.config.SslConfigs;
|
|
|
|
+import com.cxzx.db.KafkaData;
|
|
|
|
+
|
|
|
|
+import java.sql.Timestamp;
|
|
|
|
+import java.time.Duration;
|
|
|
|
+import java.util.ArrayList;
|
|
|
|
+import java.util.List;
|
|
|
|
+import java.util.Map;
|
|
|
|
+import java.util.Properties;
|
|
|
|
+
|
|
|
|
+@Slf4j
|
|
|
|
+public class Main {
|
|
|
|
+ public static void main(String[] args) {
|
|
|
|
+
|
|
|
|
+ var userDir = SystemUtils.getUserDir();
|
|
|
|
+ var jksPath = FilenameUtils.concat(userDir.getAbsolutePath(), "conf/only.4096.client.truststore.jks");
|
|
|
|
+ var jaasPath = FilenameUtils.concat(userDir.getAbsolutePath(), "conf/kafka_client_jaas.conf");
|
|
|
|
+ var props = new Properties();
|
|
|
|
+ //根证书store的密码,保持不变。
|
|
|
|
+ //接入协议,目前支持使用SASL_SSL协议接入。
|
|
|
|
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
|
|
|
|
+ props.put(CommonClientConfigs.GROUP_ID_CONFIG, "cxzx");
|
|
|
|
+ //SASL鉴权方式,保持不变。
|
|
|
|
+ props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
|
|
|
|
+ props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, jksPath);
|
|
|
|
+ //根证书store的密码,保持不变。
|
|
|
|
+ props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
|
|
|
|
+ props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
|
|
|
|
+ System.setProperty("java.security.auth.login.config", jaasPath);
|
|
|
|
+ props.put("bootstrap.servers",
|
|
|
|
+ "alikafka-pre-cn-tl32wxejg003-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-tl32wxejg003-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-tl32wxejg003-3.alikafka.aliyuncs.com:9093");
|
|
|
|
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
|
|
|
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
|
|
|
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
|
|
|
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
|
|
|
|
+
|
|
|
|
+ //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。
|
|
|
|
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 300);
|
|
|
|
+ props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ var consumer = new KafkaConsumer<String, String>(props);
|
|
|
|
+
|
|
|
|
+ try (consumer) {
|
|
|
|
+ consumer.subscribe(List.of("article"));
|
|
|
|
+ while (true) {
|
|
|
|
+ var kafkaDataList = new ArrayList<KafkaData>();
|
|
|
|
+ for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(30 * 1000))) {
|
|
|
|
+ kafkaDataList.add(new KafkaData(record.offset(), record.value(), new Timestamp(record.timestamp())));
|
|
|
|
+ var offsetMap = Map.of(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()));
|
|
|
|
+ consumer.commitSync(offsetMap);
|
|
|
|
+ }
|
|
|
|
+ Tidb.insert(kafkaDataList);
|
|
|
|
+ }
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ log.error(e.getMessage(), e);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+}
|