|
@@ -3,9 +3,7 @@ package com.cxzx.sxtt;
|
|
|
import cn.hutool.crypto.SecureUtil;
|
|
|
import cn.hutool.http.HttpRequest;
|
|
|
import cn.hutool.http.HttpResponse;
|
|
|
-import com.aliyun.openservices.log.common.LogItem;
|
|
|
import com.cxzx.config.KafkaConfig;
|
|
|
-import com.cxzx.sls.AliyunLogger;
|
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
import com.fasterxml.jackson.core.json.JsonWriteFeature;
|
|
|
import com.fasterxml.jackson.databind.MapperFeature;
|
|
@@ -31,8 +29,7 @@ public class SyncMain {
|
|
|
|
|
|
|
|
|
public static void main(String[] args) {
|
|
|
- AliyunLogger aliyunLogger = new AliyunLogger("sxtt-sync");
|
|
|
- LogItem logItem = new LogItem();
|
|
|
+
|
|
|
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaConfig.getSXTTKafkaConsumerProperties());
|
|
|
ObjectMapper om = new ObjectMapper();
|
|
|
om.setConfig(om.getSerializationConfig()
|
|
@@ -64,11 +61,6 @@ public class SyncMain {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- logItem.PushBack("topic", record.topic());
|
|
|
- logItem.PushBack("offset", String.valueOf(record.offset()));
|
|
|
- logItem.PushBack("data", record.value());
|
|
|
- logItem.PushBack("time", String.valueOf(record.timestamp()));
|
|
|
-
|
|
|
log.info("读取数据 topic:{}, offset: {}, data: {}, 时间: {}", record.topic(),record.offset(),record.value(), record.timestamp());
|
|
|
|
|
|
long timestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.ofHours(8));
|
|
@@ -86,10 +78,7 @@ public class SyncMain {
|
|
|
.body(body, "application/json")
|
|
|
.execute();
|
|
|
log.info("send data: {}", body);
|
|
|
- logItem.PushBack("sendData", body);
|
|
|
log.info("思拓响应 {}", execute.body());
|
|
|
- logItem.PushBack("response", execute.body());
|
|
|
- aliyunLogger.info(logItem);
|
|
|
|
|
|
}
|
|
|
if (count > 0){
|
|
@@ -99,8 +88,6 @@ public class SyncMain {
|
|
|
}
|
|
|
}catch (Exception e){
|
|
|
log.error(e.getMessage(), e);
|
|
|
- logItem.PushBack( "error", e.getMessage());
|
|
|
- aliyunLogger.error(logItem);
|
|
|
}
|
|
|
|
|
|
}
|