|
@@ -3,7 +3,9 @@ package com.cxzx.sxtt;
|
|
|
import cn.hutool.crypto.SecureUtil;
|
|
|
import cn.hutool.http.HttpRequest;
|
|
|
import cn.hutool.http.HttpResponse;
|
|
|
+import com.aliyun.openservices.log.common.LogItem;
|
|
|
import com.cxzx.config.KafkaConfig;
|
|
|
+import com.cxzx.sls.AliyunLogger;
|
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
import com.fasterxml.jackson.core.json.JsonWriteFeature;
|
|
|
import com.fasterxml.jackson.databind.MapperFeature;
|
|
@@ -29,7 +31,8 @@ public class SyncMain {
|
|
|
|
|
|
|
|
|
public static void main(String[] args) {
|
|
|
-
|
|
|
+ AliyunLogger aliyunLogger = new AliyunLogger("sxtt-sync");
|
|
|
+ LogItem logItem = new LogItem();
|
|
|
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaConfig.getSXTTKafkaConsumerProperties());
|
|
|
ObjectMapper om = new ObjectMapper();
|
|
|
om.setConfig(om.getSerializationConfig()
|
|
@@ -61,6 +64,11 @@ public class SyncMain {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ logItem.PushBack("topic", record.topic());
|
|
|
+ logItem.PushBack("offset", String.valueOf(record.offset()));
|
|
|
+ logItem.PushBack("data", record.value());
|
|
|
+ logItem.PushBack("time", String.valueOf(record.timestamp()));
|
|
|
+
|
|
|
log.info("读取数据 topic:{}, offset: {}, data: {}, 时间: {}", record.topic(),record.offset(),record.value(), record.timestamp());
|
|
|
|
|
|
long timestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.ofHours(8));
|
|
@@ -78,7 +86,10 @@ public class SyncMain {
|
|
|
.body(body, "application/json")
|
|
|
.execute();
|
|
|
log.info("send data: {}", body);
|
|
|
+ logItem.PushBack("sendData", body);
|
|
|
log.info("思拓响应 {}", execute.body());
|
|
|
+ logItem.PushBack("response", execute.body());
|
|
|
+ aliyunLogger.info(logItem);
|
|
|
|
|
|
}
|
|
|
if (count > 0){
|
|
@@ -88,6 +99,8 @@ public class SyncMain {
|
|
|
}
|
|
|
}catch (Exception e){
|
|
|
log.error(e.getMessage(), e);
|
|
|
+ logItem.PushBack( "error", e.getMessage());
|
|
|
+ aliyunLogger.error(logItem);
|
|
|
}
|
|
|
|
|
|
}
|