计算所有业务空间, 在竖直方向上的面积重叠关系

## 前置条件
只有有轮廓的业务空间才能参与计算
```
```

## 处理方式
获取所有建筑, for循环获取每个建筑下所有的业务空间, 按楼层分类
每个楼层的每个业务空间分别和别的楼层的每个业务空间判断is_vertically_overlap
将结果是true的两个业务空间保存起来
删除旧业务空间的垂直交通关系(自动计算的), 添加新关系

## 实现方式

# 函数
```
create or replace function public.is_vertically_overlap(project_id character varying) returns boolean
as
$$
# Recompute the automatically calculated vertical-overlap relations between
# the business spaces of one project.
#
# Input:   project_id - id of the project whose spaces are processed.
# Returns: True on success, False when any step raised (the error is reported
#          through plpy.info and swallowed).
from shapely.geometry import Polygon
import json


def get_polygon(single_poly):
    """Build a shapely Polygon from a ring given as a list of {"X": ..., "Y": ...} points."""
    return Polygon([(point["X"], point["Y"]) for point in single_poly])


def is_include(polygon1, poly2):
    """Decide whether poly2 still overlaps polygon1 once polygon1's holes are considered.

    Called when the outer ring of polygon1 (index 0) contains poly2. The
    remaining rings (index 1 and up) are treated as holes of polygon1.

    Returns True if poly2 partially overlaps a hole or lies in no hole at all,
    False if a hole equals or fully contains poly2.
    """
    for hole_ring in polygon1[1:]:
        hole = get_polygon(hole_ring)
        if hole.overlaps(poly2):
            # Partly inside the hole, so partly on the solid area as well.
            return True
        if hole.equals(poly2) or hole.contains(poly2):
            # Entirely inside the hole: no shared area with polygon1.
            return False
    return True


def is_sub_outline_overlap(polygon1, polygon2):
    """Return True if two sub-outlines (each a list of rings) share area."""
    poly1 = get_polygon(polygon1[0])
    poly2 = get_polygon(polygon2[0])
    if poly1.overlaps(poly2) or poly1.equals(poly2):
        return True
    # One outer ring inside the other: the holes of the containing outline decide.
    if poly1.contains(poly2):
        return is_include(polygon1, poly2)
    if poly2.contains(poly1):
        return is_include(polygon2, poly1)
    return False


def is_vertically_overlap(polygon1, polygon2):
    """Return True if any sub-outline of polygon1 overlaps any sub-outline of polygon2.

    Either argument being empty yields False.
    """
    return any(
        is_sub_outline_overlap(sub_outline, other_sub_outline)
        for sub_outline in polygon1
        for other_sub_outline in polygon2
    )


def compose_dict(zone_data):
    """Group rows as building_id -> floor_id -> object_type -> [row, ...]."""
    building_map = {}
    for row in zone_data:
        floor_map = building_map.setdefault(row['building_id'], {})
        type_map = floor_map.setdefault(row['floor_id'], {})
        type_map.setdefault(row['object_type'], []).append(row)
    return building_map


def find_overlapping_pairs(zone_data):
    """Collect (space_id, other_space_id, object_type) for overlapping spaces.

    Every space of a floor is compared with every space of the same
    object_type on each other floor of the same building. Each pair is
    visited from both floors, so both orientations are reported, exactly as
    the nested loops encounter them.
    """
    outline_cache = {}   # space_id -> parsed outline JSON
    overlap_cache = {}   # frozenset({space_id, other_space_id}) -> bool
    pairs = []

    def outline_of(row):
        space_id = row['space_id']
        if space_id not in outline_cache:
            outline_cache[space_id] = json.loads(row['outline'])
        return outline_cache[space_id]

    for floor_map in compose_dict(zone_data).values():
        for floor_id, type_map in floor_map.items():
            for object_type, row_arr in type_map.items():
                # Floors to compare against.
                for other_floor_id, other_type_map in floor_map.items():
                    if other_floor_id == floor_id:
                        continue
                    if object_type not in other_type_map:
                        continue
                    other_row_arr = other_type_map[object_type]
                    for row in row_arr:
                        for other_row in other_row_arr:
                            space_id = row['space_id']
                            other_space_id = other_row['space_id']
                            if space_id == other_space_id:
                                continue
                            # The geometric test does not depend on argument
                            # order, so compute it once per unordered pair.
                            pair_key = frozenset((space_id, other_space_id))
                            if pair_key not in overlap_cache:
                                overlap_cache[pair_key] = is_vertically_overlap(
                                    outline_of(row), outline_of(other_row))
                            if overlap_cache[pair_key]:
                                pairs.append((space_id, other_space_id, object_type))
    return pairs


try:
    # Load every space of the project that has an outline, together with its
    # floor and building.
    zone_plan = plpy.prepare("SELECT rel.space_id, fl.building_id, rel.floor_id, rel.object_type, sp.outline FROM r_sp_in_fl rel LEFT JOIN public.floor fl on fl.id = rel.floor_id left join zone_space_base sp on rel.space_id = sp.id where rel.project_id = $1 and sp.outline is not null", ["text"])
    zone_data = zone_plan.execute([project_id])

    # Fewer than two spaces cannot form a pair.
    result_arr = find_overlapping_pairs(zone_data) if len(zone_data) >= 2 else []

    # Replace the old automatically calculated relations (sign = 2) with the
    # new ones. This runs even when no pair was found; otherwise relations
    # from an earlier run would stay behind after the overlaps disappeared.
    # The statements run in one subtransaction and roll back together on error.
    with plpy.subtransaction():
        del_plan = plpy.prepare("delete from r_sp_vertical_sp where project_id = $1 and sign = 2", ["text"])
        del_plan.execute([project_id])

        # Prepared once; only the parameters change per pair.
        del_manual_plan = plpy.prepare("delete from r_sp_vertical_sp where (space_id = $1 and space_other_id = $2) or (space_other_id = $1 and space_id = $2)", ["text", "text"])
        insert_plan = plpy.prepare("insert into r_sp_vertical_sp(space_id, space_other_id, project_id, sign, object_type) values($1, $2, $3, 2, $4)", ["text", "text", "text", "text"])
        for space_id, other_space_id, object_type in result_arr:
            # Drop any existing relation between the two spaces, in either
            # direction, before inserting the calculated one.
            del_manual_plan.execute([space_id, other_space_id])
            insert_plan.execute([space_id, other_space_id, project_id, object_type])
    return True
except Exception as e:
    plpy.info(e)
    return False
$$
LANGUAGE 'plpython3u' VOLATILE;
```

## 输入
1. 项目id

## 返回结果
true 成功
false 失败