|
@@ -0,0 +1,263 @@
|
|
|
|
+package com.skyversation.poiaddr.service.impl;
|
|
|
|
+
|
|
|
|
+import com.skyversation.poiaddr.addquery.AddressQueryEngine;
|
|
|
|
+import com.skyversation.poiaddr.bean.AddressResult;
|
|
|
|
+import com.skyversation.poiaddr.config.DbConnection;
|
|
|
|
+import com.skyversation.poiaddr.entity.YyskAddressStandardization;
|
|
|
|
+import com.skyversation.poiaddr.util.ExcelReaderUtils;
|
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
|
+
|
|
|
|
+import java.sql.PreparedStatement;
|
|
|
|
+import java.sql.SQLException;
|
|
|
|
+import java.sql.Types;
|
|
|
|
+import java.util.*;
|
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
|
+import java.util.concurrent.Executors;
|
|
|
|
+import java.util.concurrent.Future;
|
|
|
|
+
|
|
|
|
+/**
|
|
|
|
+ * 法人库服务层
|
|
|
|
+ * 更新操作
|
|
|
|
+ * 查询操作
|
|
|
|
+ * 插入操作
|
|
|
|
+ * 查询dws库的fusion_nfrk_corp_info_partition_dsjzx_v1表的corp_name\real_address字段
|
|
|
|
+ * 将这两个字段中的非空数据保存到一个Set集合中
|
|
|
|
+ * 先将这个集合中的数据都插入到yysk_*_address_standardization表中,并作为主键
|
|
|
|
+ * <p>
|
|
|
|
+ * 先初始化表,然后每每天凌晨一点跑前一天的更新数据
|
|
|
|
+ */
|
|
|
|
+@Service
|
|
|
|
+public class YyskAddressStandardizationServiceImpl {
|
|
|
|
+
|
|
|
|
+ // 下载所有数据到xlsx
|
|
|
|
+ public void uploadDataBase(String tableName) {
|
|
|
|
+ try {
|
|
|
|
+ List<Map<String, Object>> oldDbData = DbConnection.getInstance().runSqlStr("select * FROM " + tableName);
|
|
|
|
+// 去重
|
|
|
|
+ Map<String, Map<String, Object>> newRequestData = new HashMap<>();
|
|
|
|
+ for (Map<String, Object> item : oldDbData) {
|
|
|
|
+ if (item.get("address") != null) {
|
|
|
|
+ newRequestData.put(item.get("address").toString(), item);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ List<Map<String, Object>> dbData = new ArrayList<>();
|
|
|
|
+ for (String address : newRequestData.keySet()) {
|
|
|
|
+ dbData.add(newRequestData.get(address));
|
|
|
|
+ }
|
|
|
|
+// 直接保存数据到xlsx
|
|
|
|
+ ExcelReaderUtils.writeToExcel(dbData, "output/" + tableName + ".xlsx");
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ System.err.println("下载失败!" + e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 分页查询地址表
|
|
|
|
+ public List<YyskAddressStandardization> getAllDataPage(int size, String tableName) {
|
|
|
|
+ try {
|
|
|
|
+ List<YyskAddressStandardization> requestData = new ArrayList<>();
|
|
|
|
+ String sql = "SELECT * FROM " + tableName + " WHERE update_time is null limit " + size;
|
|
|
|
+ List<Map<String, Object>> dbData = DbConnection.getInstance().runSqlStr(sql);
|
|
|
|
+ for (Map<String, Object> item : dbData) {
|
|
|
|
+ YyskAddressStandardization YyskAddressStandardization = new YyskAddressStandardization();
|
|
|
|
+ if (item.get("address") != null) {
|
|
|
|
+ YyskAddressStandardization.setAddress(item.getOrDefault("address", "").toString().trim());
|
|
|
|
+ }
|
|
|
|
+ requestData.add(YyskAddressStandardization);
|
|
|
|
+ }
|
|
|
|
+ return requestData;
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ System.err.println("sql运行异常:" + e);
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 批量修改地址表
|
|
|
|
+ public void updateDatas(List<YyskAddressStandardization> sjArrDzbzhSjWcbryDzxxList, String tableName) {
|
|
|
|
+ String sqls = "update " + tableName + " set lat = ?,lon = ?,market = ?, distinguish = ?, street_town = ?, residential_committee = ?" +
|
|
|
|
+ ", return_address = ? , standard_address = ? ,match_level = ? , update_time = ? where address = ?";
|
|
|
|
+ try (PreparedStatement preparedStatement = DbConnection.getInstance().connection.prepareStatement(sqls)) {
|
|
|
|
+ for (YyskAddressStandardization entity : sjArrDzbzhSjWcbryDzxxList) {
|
|
|
|
+ if (entity.getAddress() != null) {
|
|
|
|
+ if (entity.getLat() == null) {
|
|
|
|
+ preparedStatement.setNull(1, Types.VARCHAR);
|
|
|
|
+ } else {
|
|
|
|
+ preparedStatement.setString(1, entity.getLat());
|
|
|
|
+ }
|
|
|
|
+ if (entity.getLon() == null) {
|
|
|
|
+ preparedStatement.setNull(2, Types.VARCHAR);
|
|
|
|
+ } else {
|
|
|
|
+ preparedStatement.setString(2, entity.getLon());
|
|
|
|
+ }
|
|
|
|
+ if (entity.getMarket() == null) {
|
|
|
|
+ preparedStatement.setNull(3, Types.VARCHAR);
|
|
|
|
+ } else {
|
|
|
|
+ preparedStatement.setString(3, entity.getMarket());
|
|
|
|
+ }
|
|
|
|
+ if (entity.getDistinguish() == null) {
|
|
|
|
+ preparedStatement.setNull(4, Types.VARCHAR);
|
|
|
|
+ } else {
|
|
|
|
+ preparedStatement.setString(4, entity.getDistinguish());
|
|
|
|
+ }
|
|
|
|
+ if (entity.getStreetTown() == null) {
|
|
|
|
+ preparedStatement.setNull(5, Types.VARCHAR);
|
|
|
|
+ } else {
|
|
|
|
+ preparedStatement.setString(5, entity.getStreetTown());
|
|
|
|
+ }
|
|
|
|
+ if (entity.getResidentialCommittee() == null) {
|
|
|
|
+ preparedStatement.setNull(6, Types.VARCHAR);
|
|
|
|
+ } else {
|
|
|
|
+ preparedStatement.setString(6, entity.getResidentialCommittee());
|
|
|
|
+ }
|
|
|
|
+ if (entity.getReturnAddress() == null) {
|
|
|
|
+ preparedStatement.setNull(7, Types.VARCHAR);
|
|
|
|
+ } else {
|
|
|
|
+ preparedStatement.setString(7, entity.getReturnAddress());
|
|
|
|
+ }
|
|
|
|
+ if (entity.getStandardAddress() == null) {
|
|
|
|
+ preparedStatement.setNull(8, Types.VARCHAR);
|
|
|
|
+ } else {
|
|
|
|
+ preparedStatement.setString(8, entity.getStandardAddress());
|
|
|
|
+ }
|
|
|
|
+ if (entity.getMatchLevel() == null) {
|
|
|
|
+ preparedStatement.setNull(9, Types.VARCHAR);
|
|
|
|
+ } else {
|
|
|
|
+ preparedStatement.setString(9, entity.getMatchLevel());
|
|
|
|
+ }
|
|
|
|
+ if (entity.getUpdateTime() == null) {
|
|
|
|
+ preparedStatement.setNull(10, Types.DATE);
|
|
|
|
+ } else {
|
|
|
|
+ preparedStatement.setDate(10, new java.sql.Date(entity.getUpdateTime().getTime()));
|
|
|
|
+ }
|
|
|
|
+ preparedStatement.setString(11, entity.getAddress());
|
|
|
|
+ // 将当前的 SQL 语句添加到批量操作中
|
|
|
|
+ preparedStatement.addBatch();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ // 执行批量操作
|
|
|
|
+ int[] updateCounts = preparedStatement.executeBatch();
|
|
|
|
+ System.out.println("总条数:" + sjArrDzbzhSjWcbryDzxxList.size() + ";更新的记录数: " + updateCounts.length);
|
|
|
|
+ } catch (SQLException throwables) {
|
|
|
|
+ System.err.println("------updateError--------------------------------------" + sqls + "更新异常!");
|
|
|
|
+ throwables.printStackTrace();
|
|
|
|
+ System.err.println("更新异常" + throwables);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 首先要得到一个Page<T>对象,然后判断是否还有别的数据,有的话接着请求并返回新的Page<T>对象并迭代处理数据
|
|
|
|
+ * init [0:批量处理;1:初始化]
|
|
|
|
+ */
|
|
|
|
+ public void iterativeProcessing(Integer pageSize, Integer init, String tableName) {
|
|
|
|
+ System.out.println("<<<<<<<<------iterativeProcessing{pageSize:" + pageSize + "}");
|
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
|
+ if (init == 0) {
|
|
|
|
+ List<YyskAddressStandardization> listData = getAllDataPage(pageSize, tableName);
|
|
|
|
+ if (listData != null && listData.size() > 0) {
|
|
|
|
+// 批量更新处理后的数据
|
|
|
|
+ updateDatas(runExecutorService(listData), tableName);
|
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
|
+ System.out.println("处理单批次用时" + (endTime - startTime) / 1000 + "秒!");
|
|
|
|
+ if (listData.size() > 0) {
|
|
|
|
+ iterativeProcessing(pageSize, init, tableName);
|
|
|
|
+ } else {
|
|
|
|
+ System.out.println("<<<<<<<<------任务处理完成!");
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ System.out.println("<<<<<<<<------任务处理完成!");
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ String SqlStr = "update " + tableName + " set lat = null,lon = null,market = null, distinguish = null, street_town = null, residential_committee = null , return_address = null , standard_address = null ,match_level = null ,update_time = null where update_time is not null";
|
|
|
|
+ try {
|
|
|
|
+ int updateDataSum = DbConnection.getInstance().updateSql(SqlStr);
|
|
|
|
+ if (updateDataSum < 1) {
|
|
|
|
+ System.err.println("------updateError--------------------------------------" + SqlStr + "更新异常!");
|
|
|
|
+ }
|
|
|
|
+ System.out.println("<<<<<<<<------任务处理完成!");
|
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
|
+ System.out.println("处理单批次用时" + (endTime - startTime) / 1000 + "秒!");
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ System.err.println("更新异常" + e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public List<YyskAddressStandardization> runExecutorService(List<YyskAddressStandardization> listData) {
|
|
|
|
+ // 创建线程池
|
|
|
|
+ ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
|
|
|
|
+ List<Future<?>> futures = new ArrayList<>();
|
|
|
|
+ for (YyskAddressStandardization item : listData) {
|
|
|
|
+ futures.add(executorService.submit(() -> {
|
|
|
|
+ List<String> addrList = new ArrayList<>();
|
|
|
|
+ if (item.getAddress() != null && !item.getAddress().isEmpty()) {
|
|
|
|
+ addrList.add(item.getAddress());
|
|
|
|
+ }
|
|
|
|
+ if (addrList.size() > 0) {
|
|
|
|
+ // TODO 开始查询
|
|
|
|
+ AddressResult addressResult = AddressQueryEngine.getInstance().commonSearchByName(addrList);
|
|
|
|
+ if (addressResult == null || addressResult.getData() == null || addressResult.getData().size() < 1) {
|
|
|
|
+ item.setMatchLevel("异常");
|
|
|
|
+ item.setReturnAddress("未匹配到符合规则的结果");
|
|
|
|
+ } else {
|
|
|
|
+ try {
|
|
|
|
+ for (AddressResult.ContentBean contentBean : addressResult.getData()) {
|
|
|
|
+ String resultAddrKey = contentBean.getAddress();
|
|
|
|
+// && AddressTools.isOtherDistrictThanShangHai(resultAddrKey)
|
|
|
|
+ if (resultAddrKey != null && contentBean.getLon() != null && contentBean.getLat() != null) {
|
|
|
|
+ String lng = contentBean.getLon() + "";
|
|
|
|
+ String lat = contentBean.getLat() + "";
|
|
|
|
+ if (contentBean.getAdname() != null && !contentBean.getAdname().isEmpty()) {
|
|
|
|
+ item.setStreetTown(contentBean.getAdname());
|
|
|
|
+ } else if (contentBean.getTownJson().getString("name") != null) {
|
|
|
|
+ item.setStreetTown(contentBean.getTownJson().getString("name"));
|
|
|
|
+ } else {
|
|
|
|
+ item.setStreetTown("");
|
|
|
|
+ }
|
|
|
|
+ if (contentBean.getCityname() != null && !contentBean.getCityname().isEmpty()) {
|
|
|
|
+ item.setMarket(contentBean.getCityname());
|
|
|
|
+ } else if (contentBean.getAdJson().getString("name") != null) {
|
|
|
|
+ item.setMarket(contentBean.getAdJson().getString("name"));
|
|
|
|
+ } else {
|
|
|
|
+ item.setMarket("");
|
|
|
|
+ }
|
|
|
|
+ item.setLat(lat);
|
|
|
|
+ item.setLon(lng);
|
|
|
|
+ item.setReturnAddress(resultAddrKey);
|
|
|
|
+ item.setMatchLevel(contentBean.getScore());
|
|
|
|
+ String oldAddress = contentBean.getSearchAddress().replaceAll(item.getMarket(), "").replaceAll(item.getDistinguish(), "").replaceAll(item.getStreetTown(), "");
|
|
|
|
+ if (contentBean.getCjJson() != null && contentBean.getCjJson().containsKey("所属居委")) {
|
|
|
|
+ item.setResidentialCommittee(contentBean.getCjJson().getString("所属居委"));
|
|
|
|
+ oldAddress = contentBean.getCjJson().getString("所属居委") + oldAddress;
|
|
|
|
+ }
|
|
|
|
+ item.setStandardAddress(item.getMarket() + item.getDistinguish() + item.getStreetTown() + oldAddress);
|
|
|
|
+ break;
|
|
|
|
+ } else {
|
|
|
|
+ item.setMatchLevel("异常");
|
|
|
|
+ item.setReturnAddress("结果处理异常");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ item.setMatchLevel("异常");
|
|
|
|
+ System.err.println("查询结果处理异常:" + e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ item.setMatchLevel("否");
|
|
|
|
+ }
|
|
|
|
+ item.setUpdateTime(new Date());
|
|
|
|
+ }));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 等待所有任务完成
|
|
|
|
+ for (Future<?> future : futures) {
|
|
|
|
+ try {
|
|
|
|
+ future.get();
|
|
|
|
+ } catch (InterruptedException | ExecutionException e) {
|
|
|
|
+ System.err.println("线程异常:" + e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ // 关闭线程池
|
|
|
|
+ executorService.shutdown();
|
|
|
|
+ return listData;
|
|
|
|
+ }
|
|
|
|
+}
|