|
@@ -0,0 +1,100 @@
|
|
|
|
+package com.smcic.api.tags.job;
|
|
|
|
+
|
|
|
|
+import com.smcic.api.tags.dto.Recommend;
|
|
|
|
+import com.smcic.api.tags.dto.UserTagsDTO;
|
|
|
|
+import com.smcic.api.tags.entity.UserTags;
|
|
|
|
+import com.smcic.api.tags.mapper.UserTagsMapper;
|
|
|
|
+import com.smcic.api.tags.service.TagsQueue;
|
|
|
|
+import com.smcic.api.tags.service.impl.UserTagsService;
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
+import org.apache.http.util.TextUtils;
|
|
|
|
+import org.springframework.http.ResponseEntity;
|
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
|
+import org.springframework.web.client.RestTemplate;
|
|
|
|
+
|
|
|
|
+import javax.annotation.Resource;
|
|
|
|
+import java.util.ArrayList;
|
|
|
|
+import java.util.List;
|
|
|
|
+import java.util.concurrent.ConcurrentLinkedDeque;
|
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
+
|
|
|
|
+@Service
|
|
|
|
+@Slf4j
|
|
|
|
+public class UserTagsConsumer {
|
|
|
|
+
|
|
|
|
+ @Resource
|
|
|
|
+ private UserTagsService userTagsService;
|
|
|
|
+
|
|
|
|
+ @Resource
|
|
|
|
+ private UserTagsMapper userTagsMapper;
|
|
|
|
+
|
|
|
|
+ @Resource
|
|
|
|
+ private ThreadPoolExecutor threadPoolExecutor;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ public void run(){
|
|
|
|
+
|
|
|
|
+ System.out.println("启动队列消费");
|
|
|
|
+ ConcurrentLinkedDeque<UserTagsDTO> userTagsQueue = TagsQueue.getInstance().getUserTags();
|
|
|
|
+ long time = System.currentTimeMillis();
|
|
|
|
+ int count = 0;
|
|
|
|
+ List<UserTags> userTagsList = new ArrayList<>();
|
|
|
|
+
|
|
|
|
+ Recommend recommend = new Recommend();
|
|
|
|
+ recommend.setAppid("51000");
|
|
|
|
+ recommend.setTableName("user");
|
|
|
|
+
|
|
|
|
+ List<Recommend.TableContentDTO> tableContent = new ArrayList<>();
|
|
|
|
+
|
|
|
|
+ while (true){
|
|
|
|
+ try {
|
|
|
|
+ UserTagsDTO userTags = userTagsQueue.poll();
|
|
|
|
+ if (null != userTags){
|
|
|
|
+ log.info("接收到数据{}", userTags);
|
|
|
|
+
|
|
|
|
+ userTagsList.add(new UserTags(userTags));
|
|
|
|
+
|
|
|
|
+ Recommend.TableContentDTO tableContentDTO = new Recommend.TableContentDTO();
|
|
|
|
+ tableContentDTO.setCmd("update");
|
|
|
|
+
|
|
|
|
+ Recommend.TableContentDTO.FieldsDTO fieldsDTO = new Recommend.TableContentDTO.FieldsDTO();
|
|
|
|
+// fieldsDTO.setUserid(userTags.getUserId());
|
|
|
|
+ //fieldsDTO.setTel();
|
|
|
|
+ if (!TextUtils.isEmpty(userTags.getAgeRange()))
|
|
|
|
+ userTags.getTags().add(userTags.getAgeRange());
|
|
|
|
+ fieldsDTO.setUserTags(String.join(";", userTags.getTags()));
|
|
|
|
+ fieldsDTO.setProvince(userTags.getProvince());
|
|
|
|
+ fieldsDTO.setCity(userTags.getCity());
|
|
|
|
+
|
|
|
|
+ fieldsDTO.setImei(userTags.getUuid());
|
|
|
|
+
|
|
|
|
+ tableContentDTO.setFields(fieldsDTO);
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ tableContent.add(tableContentDTO);
|
|
|
|
+
|
|
|
|
+ count++;
|
|
|
|
+ }
|
|
|
|
+ long curr = System.currentTimeMillis();
|
|
|
|
+ if(count > 300 || (count > 0 && curr - time > 2000)){
|
|
|
|
+ userTagsMapper.replaceUser(userTagsList);
|
|
|
|
+ userTagsList.clear();
|
|
|
|
+ count = 0;
|
|
|
|
+ time = curr;
|
|
|
|
+ this.threadPoolExecutor.execute(() -> {
|
|
|
|
+ RestTemplate restTemplate = new RestTemplate();
|
|
|
|
+ recommend.setTableContent(tableContent);
|
|
|
|
+ ResponseEntity<Object> mapResponseEntity = restTemplate.postForEntity("https://receiver.sxtvs.net/user/qmtssp", recommend, Object.class);
|
|
|
|
+ log.info("发送数据{}, 推荐系统响应{}", recommend, mapResponseEntity.getBody());
|
|
|
|
+ tableContent.clear();
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+ }catch (Exception e){
|
|
|
|
+ log.info("队列处理异常", e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|