import json import os import piexif from starearth import Filename, Scene, GridFactory from starearth import Nodata, Slicer from starearth.coordinate import EPSG, Coordinate from starearth.output.raw import OutputRAW from starearth.output.png import OutputPNG from starearth.output.jpeg import OutputJPEG from starearth.renderer.greyscale import RendererGreyscale from starearth.renderer.rgb import RendererRGB from starearth.sheet import Sheet from starearth.utils.general_utils import print_log def get_size(path): total_size = 0 for dirpath, dirnames, filenames in os.walk(path): for f in filenames: fp = os.path.join(dirpath, f) # skip if it is symbolic link if not os.path.islink(fp): total_size += os.path.getsize(fp) return total_size class Accessory: def __init__(self): pass def structure_exif_dict(self, properties): """构造一个exif_dict字典,这样不用写多条exif信息""" date = properties.get('date', '') date_time = properties.get('date_time', '') eoc = properties.get('eoc', '') eop = properties.get('eop', '') eoi = properties.get('eoi', '') res = properties.get('res', '') eopd = properties.get('eopd', '') eopc = properties.get('eopc', '') user = properties.get('user', '') disp = properties.get('disp', 0) bk1 = properties.get('bk1', '') bk2 = properties.get('bk2', '') bk3 = properties.get('bk3', '') exif_dict = {'Exif': {}, '0th': {}, 'Interop': {}, '1st': {}, 'thumbnail': None, 'GPS': {}} _time = date or date_time or 'defalut' description = '%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s' % (eoc, eop, eoi, res, eopd, eopc, user, bk1, bk2, bk3, disp) zeroth_ifd = { piexif.ImageIFD.ImageDescription: description.encode('unicode_escape').decode('ascii'), piexif.ImageIFD.DateTime: date_time, } add_exif_dict = {"0th": zeroth_ifd} # 使用0th这个key exif_dict = dict(exif_dict, **add_exif_dict) # 将构造的exif_dict添加到照片原有的exif_dict中 return piexif.dump(exif_dict), _time # 返回最终的exif_dicttile_content def get_coordinate(coordinates, all_coordinates): for coordinate in coordinates: if not isinstance(coordinate[0], list): all_coordinates.append(coordinate) pass else: get_coordinate(coordinate, all_coordinates) pass pass def calc_bbox(coordinates): longitude_max = -180 longitude_min = 180 latitude_max = -90 latitude_min = 90 # 获取所有单独的coordinate all_coordinate = [] get_coordinate(coordinates, all_coordinate) # 获取bbox范围 for coordinate in all_coordinate: longitude_max = max(longitude_max, coordinate[0]) longitude_min = min(longitude_min, coordinate[0]) latitude_max = max(latitude_max, coordinate[1]) latitude_min = min(latitude_min, coordinate[1]) bbox = [longitude_min, latitude_min, longitude_max, latitude_max] return bbox def slice \ ( input_file, tile_grid, tile_size, tmp_tiles, epsg=None, nodata=None, tile_format='png', min_zoom=None, max_zoom=None, enable_msmt=0, channels=None, lut=None, render_type=None ): sliceTiler = GridFactory.create(tile_grid) # 格式化 tile_format = tile_format.strip().lower() if render_type: render_type = render_type.strip().upper() # new_input_file = filesystem.resolve(input_file) filename = Filename(input_file) base_name = os.path.splitext(input_file)[0] geojsonl = base_name + '.geojsonl' scene = Scene() scene.load(filename) coordinate = None if sliceTiler.identifier == 'WGS1984Quad': pass elif sliceTiler.identifier == 'WebMercatorQuad': coordinate = Coordinate.EPSG_3857 elif sliceTiler.identifier == 'GoogleEarthQuad': coordinate = Coordinate.EPSG_4326 # 瓦片成果是否携带时相信息 msmt_accessory = Accessory() if enable_msmt else None # min_zoom, max_zoom = 12, 13 # 测试用,手动设置一个级别范围 print_log('min-max:%s-%s' % (min_zoom, max_zoom)) # star_time = time.time() # 选择不同的渲染类型 if render_type == 'RGB': # 彩色渲染 renderer = RendererRGB(scene.datatype, channels, nodata) elif render_type == 'GREYSCALE': # 灰度图的渲染 renderer = RendererGreyscale(scene.datatype, lut, nodata) else: renderer = None if tile_format == 'png': output = OutputPNG \ ( tmp_tiles, msmt_accessory, renderer=renderer ) pass elif tile_format == 'jpeg': output = OutputJPEG \ ( tmp_tiles, msmt_accessory, renderer=renderer ) else: output = OutputRAW \ ( tmp_tiles, msmt_accessory, nodata=Nodata([nodata for _ in range(scene.bands)]), ) square = tile_size slicer = Slicer(scene, square, output, sliceTiler) sheets = None if filename.geojsonl: sheets = Sheet.parse_geojsonl(filename.geojsonl) if isinstance(epsg, str): epsg_id = int(epsg.split(':')[-1]) else: epsg_id = epsg slicer.slice(min_zoom, max_zoom, epsg_id, sheets, coordinate=coordinate) # slicer._slice_sp(min_zoom, max_zoom, epsg_id, sheets) # 瓦片列表写入文件 with open(geojsonl, 'r') as f: new_feature = json.loads(f.read()) date = new_feature['properties']['date'] tiles = sliceTiler.tiles([new_feature], min_zoom, max_zoom) tiles_list_json = tmp_tiles.path(date).resolve('tiles_list.json') exist_tiles_list_json = [] if os.path.exists(tiles_list_json): with open(tiles_list_json, 'r', encoding='utf-8') as fp: exist_tiles_list_json = json.load(fp) with open(tiles_list_json, 'w') as fp: if exist_tiles_list_json: for tile in exist_tiles_list_json: if tile not in tiles: tiles.append(tile) json.dump(tiles, fp) # tiles_nums = 0 # tiles_nums += len(tiles) # end_time = time.time() # print_log('切片数量为:{} 个'.format(tiles_nums)) # print_log("切片用时: {}'s".format(end_time - star_time))