Просмотр исходного кода

朱老板他们的数据入湖接口

DESKTOP-6LTVLN7\Liumouren 3 недель назад
Родитель
Сommit
ef140c11fe

+ 9 - 0
src/main/java/com/skyversation/poiaddr/controller/CorporateLibraryController.java

@@ -248,6 +248,15 @@ public class CorporateLibraryController {
         return "处理完成!用时" + (endTime - startTime) / 1000 + "秒!";
     }
 
+    @PostMapping(value = "/saveFileDataToPg3File")
+    public Object saveFileDataToPg3File(@RequestParam(name = "fileRootPath") String fileRootPath, @RequestParam(name = "startFileIndex") Integer startFileIndex, @RequestParam(name = "endFileIndex") Integer endFileIndex, @RequestParam(name = "sort") Integer sort, @RequestParam(name = "pageSize") Integer pageSize) {
+        // 记录程序开始时间
+        long startTime = System.currentTimeMillis();
+        yyskAddressStandardizationService.readFileToKafka(fileRootPath, startFileIndex, endFileIndex, sort, pageSize);
+        long endTime = System.currentTimeMillis();
+        return "处理完成!用时" + (endTime - startTime) / 1000 + "秒!";
+    }
+
     /**
      * 先获取文件内容,然后根据address当做key,standardAddress和经纬度作为value
      * 读取结果文件,然后存成Map,接着获取经纬度

+ 39 - 14
src/main/java/com/skyversation/poiaddr/service/impl/YyskAddressStandardizationServiceImpl.java

@@ -568,13 +568,21 @@ public class YyskAddressStandardizationServiceImpl {
                 JSONObject data = new JSONObject();
                 for (String key : item.keySet()) {
                     if (key.equals("updatetime")) {
-                        LocalDate date = LocalDate.parse(item.get(key).toString(), inputFormatter);
-                        // 转换为 LocalDateTime(默认时间为 00:00:00)
-                        LocalDateTime dateTime = date.atStartOfDay();
-                        // 定义输出格式并格式化日期时间
-                        DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss");
-                        String output = dateTime.format(outputFormatter);
-                        data.put(key, output);
+                        String updateTimeStr = item.get(key).toString();
+                        if (updateTimeStr.contains(".")) {
+                            updateTimeStr = updateTimeStr.replaceAll("\\.", "").substring(0, 8);
+                            LocalDate date = LocalDate.parse(updateTimeStr, inputFormatter);
+                            // 转换为 LocalDateTime(默认时间为 00:00:00)
+                            LocalDateTime dateTime = date.atStartOfDay();
+                            // 定义输出格式并格式化日期时间
+                            DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss");
+                            String output = dateTime.format(outputFormatter);
+                            data.put(key, output);
+                        }else{
+                            data.put(key, ScheduledTasks.timestampToFormat(item.get(key).toString()));
+                        }
+                    } else if (key.equals("anchedate")) {
+                        data.put(key, ScheduledTasks.timestampToFormat(item.get(key).toString()));
                     } else {
                         data.put(key, item.get(key));
                     }
@@ -669,8 +677,8 @@ public class YyskAddressStandardizationServiceImpl {
             dataItem.put("community", tYyszAddress.getCommunity());
             dataItem.put("lon", tYyszAddress.getLon());
             dataItem.put("lat", tYyszAddress.getLat());
-            dataItem.put("x",tYyszAddress.getX());
-            dataItem.put("y",tYyszAddress.getY());
+            dataItem.put("x", tYyszAddress.getX());
+            dataItem.put("y", tYyszAddress.getY());
             dataItem.put("data_type", "zl_v3");
 
             dataItem.put("createtime", getCurrentDateTime());
@@ -717,6 +725,21 @@ public class YyskAddressStandardizationServiceImpl {
         return dateTime.format(formatter);
     }
 
+    public void readFileToKafka(String fileRootPath, Integer startFileIndex, Integer endFileIndex, Integer sort, Integer pageSize) {
+        System.out.println("<<<<<<<<------readFileToPg{fileIndex:" + startFileIndex + "}");
+        try {
+            readFileToPg3(fileRootPath + startFileIndex + ".xlsx", pageSize);
+        } catch (Exception e) {
+            System.err.println("readFileToKafka error:" + e);
+        }
+        if (startFileIndex + sort != endFileIndex) {
+            startFileIndex += sort;
+            readFileToPg(fileRootPath, startFileIndex, endFileIndex, sort, pageSize);
+        } else {
+            System.out.println("<<<<<<<<------任务处理完成!");
+        }
+    }
+
     public void readFileToPg(String fileRootPath, Integer startFileIndex, Integer endFileIndex, Integer sort, Integer pageSize) {
         System.out.println("<<<<<<<<------readFileToPg{fileIndex:" + startFileIndex + "}");
         long startTime = System.currentTimeMillis();
@@ -780,12 +803,14 @@ public class YyskAddressStandardizationServiceImpl {
                 }
             }
             dataList.put("data", datas);
-            try {
-                System.out.println(AddressQueryEngine.getInstance().putDataToSJ_Big_Data(dataList));
-            } catch (Exception e) {
-                e.printStackTrace();
+            if (datas.size() > 0) {
+                try {
+                    System.out.println(AddressQueryEngine.getInstance().putDataToSJ_Big_Data(dataList));
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+                datas.clear();
             }
-            datas.clear();
             long endTime = System.currentTimeMillis();
             dataList.clear();
             System.out.println("处理单批次用时" + (endTime - startTime) / 1000 + "秒!");

+ 4 - 4
src/main/java/com/skyversation/poiaddr/util/net/AddressNetTools.java

@@ -50,10 +50,10 @@ public class AddressNetTools {
     @Async
     public ResponseEntity requestGetOrPost(HttpMethod httpMethod, String url, JSONObject params, Map<String, String> headerMap, Integer ifReloadSize) {
         SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
-        requestFactory.setConnectTimeout(10000);
-        requestFactory.setReadTimeout(5000);
-        /*requestFactory.setConnectTimeout(150000);
-        requestFactory.setReadTimeout(120000);*/
+        /*requestFactory.setConnectTimeout(10000);
+        requestFactory.setReadTimeout(5000);*/
+        requestFactory.setConnectTimeout(150000);
+        requestFactory.setReadTimeout(120000);
         RestTemplate client = new RestTemplate(requestFactory);
         client.getMessageConverters().set(1, new StringHttpMessageConverter(StandardCharsets.UTF_8));
         HttpHeaders headers = new HttpHeaders();

+ 29 - 4
src/main/java/com/skyversation/poiaddr/util/tasks/ScheduledTasks.java

@@ -6,7 +6,9 @@ import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
+import java.text.SimpleDateFormat;
 import java.time.LocalDate;
+import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
@@ -21,12 +23,12 @@ public class ScheduledTasks {
 
     public static int szxRequestSize = 0;
     public static int gdRequestSize = 0;
-//  2kw-3kw是xlsx数据;amap是3kw开头
+    //  2kw-3kw是xlsx数据;amap是3kw开头
     public static Long OID_TAG = 30000000L;
-    public static List<Map<String,Object>> LogInfos = new ArrayList<>();
+    public static List<Map<String, Object>> LogInfos = new ArrayList<>();
     public static boolean pauseTag = false;
-//  统计异常数量和类型
-    public static Map<String,Integer> errorCount = new ConcurrentHashMap<>();
+    //  统计异常数量和类型
+    public static Map<String, Integer> errorCount = new ConcurrentHashMap<>();
 
     @Resource
     private YyskAddressStandardizationServiceImpl yyskAddressStandardizationService;
@@ -86,4 +88,27 @@ public class ScheduledTasks {
         return previousDate.format(formatter);
     }
 
+    public static String timestampToFormat(String timestamp) {
+        try {
+
+            // 定义输入格式
+            DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern(
+                    "EEE MMM dd HH:mm:ss zzz yyyy",
+                    Locale.US
+            );
+
+            // 解析为 ZonedDateTime
+            ZonedDateTime zdt = ZonedDateTime.parse(timestamp, inputFormatter);
+
+            // 定义输出格式
+            DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+            // 格式化输出
+            return zdt.format(outputFormatter);
+        } catch (Exception e) {
+            e.printStackTrace();
+            return "";
+        }
+    }
+
 }