孙永军 1 gadu atpakaļ
vecāks
revīzija
bc238ea2c1
3 mainītis faili ar 170 papildinājumiem un 0 dzēšanām
  1. 3 0
      build.gradle
  2. 51 0
      deploy/sxtt-sync.yml
  3. 116 0
      src/main/java/com/cxzx/sxtt/SyncMain.java

+ 3 - 0
build.gradle

@@ -34,10 +34,13 @@ dependencies {
     implementation 'com.aliyun.openservices:aliyun-log:0.6.77:jar-with-dependencies'
     implementation 'com.aliyun.openservices:aliyun-log-producer:0.3.10'
 
+    implementation 'cn.hutool:hutool-http:5.8.21'
+
     testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
     testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
     compileOnly 'org.projectlombok:lombok:1.18.24'
     annotationProcessor 'org.projectlombok:lombok:1.18.24'
+
 }
 
 jar {

+ 51 - 0
deploy/sxtt-sync.yml

@@ -0,0 +1,51 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: sxtt-sync
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      k8s-app: sxtt-sync
+  strategy:
+    type: Recreate
+  template:
+    metadata:
+      labels:
+        k8s-app: sxtt-sync
+    spec:
+      hostAliases:
+        - ip: "172.16.101.16"
+          hostnames:
+            - "baidu06"
+        - ip: "172.16.101.17"
+          hostnames:
+            - "baidu07"
+        - ip: "172.16.101.18"
+          hostnames:
+            - "baidu08"
+        - ip: "172.16.101.19"
+          hostnames:
+            - "baidu09"
+      imagePullSecrets:
+        - name: registry-key
+      containers:
+        - command: [ "java","-cp","/app.jar","com.cxzx.sxtt.SyncMain" ]
+          env:
+            - name: PATH
+              value: /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
+            - name: TZ
+              value: Asia/Shanghai
+          image: { { image } }
+          imagePullPolicy: IfNotPresent
+          name: sxtt-sync
+          resources:
+            limits:
+              cpu: 100m
+              memory: 515Mi
+            requests:
+              cpu: 100m
+              memory: 50Mi
+      dnsPolicy: ClusterFirst
+      terminationGracePeriodSeconds: 10
+

+ 116 - 0
src/main/java/com/cxzx/sxtt/SyncMain.java

@@ -0,0 +1,116 @@
+package com.cxzx.sxtt;
+
+import cn.hutool.crypto.SecureUtil;
+import cn.hutool.http.Header;
+import cn.hutool.http.HttpRequest;
+import cn.hutool.http.HttpResponse;
+import cn.hutool.http.HttpUtil;
+import com.cxzx.config.KafkaConfig;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.json.JsonWriteFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationConfig;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+
+@Slf4j
+public class SyncMain {
+
+    public static final String appId = "ceedf32922b3ed10";
+    public static final String appSecret = "3c52c3f01665d0cca9d94f1dd2643840";
+
+
+    public static void main(String[] args) {
+
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaConfig.getKafkaConsumerProperties());
+        ObjectMapper om = new ObjectMapper();
+        om.setConfig(om.getSerializationConfig()
+                .with(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY)
+                .with(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS)
+
+        ).configure(JsonWriteFeature.ESCAPE_NON_ASCII.mappedFeature(), true);
+        try(consumer) {
+
+            consumer.subscribe(List.of(
+                    "article_843ea558d5ef41a1877584c62762632d_app_81",  // 稿件 article_{租户Id}_app
+                    "catalog_843ea558d5ef41a1877584c62762632d_app_81",
+                    "author_843ea558d5ef41a1877584c62762632d_app",
+                    "article_843ea558d5ef41a1877584c62762632d_app"
+                    ));
+            while (true) {
+                int count = 0;
+                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(200))) {
+                    if (null == record.value()){
+                        continue;
+                    }
+                    TreeMap<String, Object> treeMap = om.readValue(record.value(), TreeMap.class);
+
+                    if("author_843ea558d5ef41a1877584c62762632d_app".equals(record.topic()) || "article_843ea558d5ef41a1877584c62762632d_app".equals(record.topic())){
+                        Map<String, Object> data = (Map<String, Object>) treeMap.get("data");
+                        if (!"81".equals(data.get("appId"))){
+                            continue;
+                        }
+                    }
+                    count++;
+
+                    log.info("读取数据 topic:{}, offset: {}, data: {}, 时间: {}", record.topic(),record.offset(),record.value(), record.timestamp());
+
+                    long timestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.ofHours(8));
+
+                    // SecureUtil.md5(om.writeValueAsString(treeMap)) +
+                    String sign = SecureUtil.md5(appId + appSecret + timestamp);
+
+                    String body = om.writeValueAsString(Map.of(
+                            "appId", appId,
+                            "sign", sign,
+                            "timestamp", timestamp,
+                            "data", treeMap
+                    ));
+                    HttpResponse execute = HttpRequest.post("http://admin.console.cnwest.cn/api/open/all_media_plat/receive")
+                            .body(body, "application/json")
+                            .execute();
+                    log.info("send data: {}", body);
+                    log.info("思拓响应 {}", execute.body());
+
+                }
+                if (count > 0){
+                    consumer.commitSync();
+                }
+
+            }
+        }catch (Exception e){
+            log.error(e.getMessage(), e);
+        }
+
+    }
+
+
+
+    public static void test() {
+        ObjectMapper om = new ObjectMapper();
+        om.setConfig(om.getSerializationConfig()
+                .with(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY).with(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS)
+        ).configure(JsonWriteFeature.ESCAPE_NON_ASCII.mappedFeature(), true);;
+        String str = "{\"data\":{\"catalogId\":1936,\"idaasGroupId\":\"843ea558d5ef41a1877584c62762632d\",\"appId\":81,\"name\":\"测试\",\"action\":7,\"parentId\":1933},\"eventType\":\"catalog\"}";
+        TreeMap<String, Object> treeMap = null;
+        try {
+            treeMap = om.readValue(str, TreeMap.class);
+            System.out.println(om.writeValueAsString(treeMap));
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+}