"""Convert annotated aerial imagery into train/test annotation records.

Two sources are supported:

* ``westend``: a large geo-referenced image plus GeoJSON layers, cut into
  square tiles.
* ``label_studio``: a Label Studio JSON export with polygon labels.

Both converters append image records (with ``bbox``/``segmentation``
annotations using detectron2's ``BoxMode.XYXY_ABS``) to a shared
``training_object`` dictionary.
"""
import json
import logging
import os
from itertools import product
from math import pi, cos, sin
from random import random
from uuid import uuid4

import geopandas as gpd
import pandas as pd
from PIL import Image, ImageDraw
from detectron2.structures import BoxMode
from shapely.geometry import box, Polygon, MultiPolygon

# To enable loading big images
Image.MAX_IMAGE_PIXELS = None

# Setup logging
logging.basicConfig(format='[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s')
logger = logging.getLogger("Data Preparation")
logger.setLevel(logging.DEBUG)


def _new_training_object():
    """Return an empty training object with the keys the converters expect."""
    return dict(
        train_images=[],
        test_images=[],
        categories=[],
    )


def _tile_origins(width, height, d):
    """Yield ``(i, j)`` = (top, left) pixel origins of all complete d x d tiles.

    Partial tiles at the right and bottom edge are dropped.

    :raises ValueError: If `d` is not positive
    """
    if d <= 0:
        raise ValueError(f"Tile size d must be positive, got {d}")
    return product(
        range(0, height - height % d, d),
        range(0, width - width % d, d),
    )


def _is_single_colour(image):
    """Return True if `image` contains at most one distinct colour."""
    # getcolors() returns None when the image has more colours than its
    # default limit, which in particular means "more than one".
    colors = image.getcolors()
    return colors is not None and len(colors) <= 1


def _category_color(category_id):
    """Map the three lowest bits of `category_id` to an RGB debug colour.

    Bit 2 selects red, bit 1 green and bit 0 blue; each channel is 0 or 255.
    """
    return tuple(255 * bool(category_id & (1 << bit)) for bit in (2, 1, 0))


def _polygon_parts(geometry):
    """Return the non-empty ``Polygon`` pieces contained in `geometry`.

    Intersections may come back empty, as a single polygon, as a
    multi-polygon, or (when shapes merely touch) as lines, points or mixed
    collections. Only pieces with an area can become a segmentation.
    """
    if geometry.is_empty:
        return []
    if isinstance(geometry, Polygon):
        return [geometry]
    if isinstance(geometry, MultiPolygon):
        return [part for part in geometry.geoms if not part.is_empty]
    return [
        part
        for part in getattr(geometry, "geoms", ())
        if isinstance(part, Polygon) and not part.is_empty
    ]


def generate_circle(
        center_x=0,
        center_y=0,
        r=20,
        n=32,
):
    """
    Create `n` evenly spaced points on a circle.

    The circle is defined by its center point ( `center_x` | `center_y` ) and
    the radius `r`. The result is a flat list ``[x0, y0, x1, y1, ...]`` with
    every coordinate rounded to 5 decimals.

    :param center_x: Center's x coordinate
    :param center_y: Center's y coordinate
    :param r: Radius
    :param n: Number of points on the circle
    :return: Flat list of alternating x and y coordinates on the circle
    """
    coordinates = []
    for k in range(n):
        angle = 2 * pi / n * k
        coordinates.append(round(cos(angle) * r + center_x, 5))
        coordinates.append(round(sin(angle) * r + center_y, 5))
    return coordinates


def crop_image(
        filename,
        clean_image_dir,
        d=1000,
        save_empty=True,
):
    """
    Cut an image into d x d tiles and save them to `clean_image_dir`.

    Tiles are named ``{name}_{i}_{j}{ext}`` where ``i`` / ``j`` are the top /
    left pixel offsets of the tile. Incomplete tiles at the right and bottom
    edge are not written.

    :param filename: Path of the image to cut
    :param clean_image_dir: Output directory (created if missing)
    :param d: Edge length of a tile in pixels
    :param save_empty: If False, tiles with at most one colour are skipped
    """
    # basename() also understands "/" on Windows, unlike split(os.path.sep)
    name, ext = os.path.splitext(os.path.basename(filename))
    logger.debug("Opening image")
    with Image.open(filename) as img:
        logger.debug("Image opened")
        w, h = img.size
        os.makedirs(clean_image_dir, exist_ok=True)
        for i, j in _tile_origins(w, h, d):
            part = img.crop((j, i, j + d, i + d))
            # Only analyse the colours when the result is actually needed
            if not save_empty and _is_single_colour(part):
                continue
            part.save(os.path.join(clean_image_dir, f'{name}_{i}_{j}{ext}'))
    logger.debug("done")


def westend(
        filename,
        clean_image_dir,
        d=1000,
        r=20,
        save_clean_images=False,
        marked_image_dir=None,
        training_object=None,
        train_split=0.8,
):
    """
    Convert the Westend image and its GeoJSON layers into tile annotations.

    The image is cut into d x d tiles. For each tile that has more than one
    colour, the surface polygons and manhole covers that fall into the tile
    are converted to tile-local ``bbox`` / ``segmentation`` annotations.
    Tiles with at least one annotation are appended to the train or test
    list of `training_object`.

    The reference points and GeoJSON layers are read from fixed paths below
    ``../data``. The first four rows of the reference point table are taken
    as the corners of the image.
    NOTE(review): presumably X/Y are map coordinates with Y growing upwards;
    confirm against the data source.

    Surfaces whose kind has no translation, or whose translated category is
    not in ``training_object["categories"]``, are skipped with a warning.
    Only the exterior ring of each surface is used; holes are ignored.

    :param filename: Path of the image
    :param clean_image_dir: Directory of the tile images; used to build the
        ``file_name`` of every record, and written to if `save_clean_images`
    :param d: Edge length of a tile in pixels
    :param r: Radius in pixels of the circle annotated around each manhole
    :param save_clean_images: If True, the kept tiles are saved to disk
    :param marked_image_dir: If given, a copy of the image with all
        annotations painted on it is saved into this directory
    :param training_object: Existing training object to extend, or None to
        start with an empty one
    :param train_split: Probability that a tile goes to the train list
    :return: The (extended) training object
    """
    category_translations = {
        'Asphalt': "Festweg",
        'Bepflanzte_Flaechen': "Baumbestand",
        'Beton': "Festweg",
        'Noppenpflaster': "Pflaster",
        'Pflaster': "Pflaster",
        'Platten': "Pflaster",
        'Rasen': "Wiese",
        'Rasengittersteine': "Pflaster",
        'Rippenpflaster': "Pflaster",
        'Sand': "Festweg",
        'unbefestigt': "Festweg",
    }

    if training_object is None:
        training_object = _new_training_object()
    train_images = training_object["train_images"]
    test_images = training_object["test_images"]
    categories = training_object["categories"]

    reference_points = pd.read_csv("../data/Referenzpunkte.csv")
    name, ext = os.path.splitext(os.path.basename(filename))

    with Image.open(filename) as img:
        w, h = img.size

        # --- Geo reference: map coordinates -> pixel coordinates ---
        corners = reference_points.head(4)
        left = min(corners["X"])
        right = max(corners["X"])
        bottom = min(corners["Y"])
        top = max(corners["Y"])
        x_scale = (right - left) / w
        # Negative on purpose: Y == top maps to pixel row 0
        y_scale = (bottom - top) / h

        # Paint on a copy so the markings never leak into the tiles that are
        # analysed and saved as clean images.
        marked = img.copy() if marked_image_dir is not None else None
        draw = ImageDraw.Draw(marked) if marked is not None else None

        # --- Manhole covers ---
        gullys = gpd.read_file("../data/GeoJson/Kanaldeckel_im_Bild.json")
        gullys["X_"] = gullys["X"].apply(lambda x: int((x - left) / x_scale))
        gullys["Y_"] = gullys["Y"].apply(lambda y: int((y - top) / y_scale))
        if "Gullydeckel" in categories:
            gully_id = categories.index("Gullydeckel")
        else:
            gully_id = None
            logger.warning("Category 'Gullydeckel' is not configured, skipping manhole covers")

        # --- Surfaces: scale every polygon once instead of once per tile ---
        flaechenbelaege = gpd.read_file("../data/GeoJson/Flaechenbelaege.json")
        surfaces = []
        skipped_kinds = set()
        for _, row in flaechenbelaege.iterrows():
            kind = row["Art"]
            category_name = category_translations.get(kind, "")
            if category_name not in categories:
                if kind not in skipped_kinds:
                    skipped_kinds.add(kind)
                    logger.warning("No configured category for surface kind '%s', skipping", kind)
                continue
            outline: Polygon = row["geometry"]
            scaled = Polygon([
                ((xy[0] - left) / x_scale, (xy[1] - top) / y_scale)
                for xy in outline.exterior.coords
            ])
            surfaces.append((category_name, categories.index(category_name), scaled))

        if save_clean_images:
            os.makedirs(clean_image_dir, exist_ok=True)

        for i, j in _tile_origins(w, h, d):
            images = train_images if random() < train_split else test_images

            view_box = (j, i, j + d, i + d)
            tile = img.crop(view_box)
            if _is_single_colour(tile):
                # Skip empty and single-colour images
                continue

            image_out_path = os.path.join(clean_image_dir, f'{name}_{i}_{j}{ext}')
            if save_clean_images:
                tile.save(image_out_path)

            annotations = []

            # --- Surface annotations ---
            tile_area = box(*view_box)
            for category_name, category_id, surface in surfaces:
                for part in _polygon_parts(surface.intersection(tile_area)):
                    exterior = list(part.exterior.coords)
                    min_x, min_y, max_x, max_y = part.bounds
                    if draw is not None:
                        draw.polygon(exterior, fill=_category_color(category_id))

                    # Shift from image coordinates to tile coordinates:
                    # j is the tile's x offset, i its y offset.
                    local_ring = []
                    for x, y in exterior:
                        local_ring.append(x - j)
                        local_ring.append(y - i)

                    annotations.append(dict(
                        category_id=category_id,
                        category_name=category_name,
                        ignore=0,
                        iscrowd=0,
                        bbox=[min_x - j, min_y - i, max_x - j, max_y - i],
                        bbox_mode=BoxMode.XYXY_ABS,
                        segmentation=[local_ring],
                    ))

            # --- Manhole cover annotations ---
            if gully_id is not None:
                in_tile = gullys[
                    (j <= gullys["X_"])
                    & (gullys["X_"] <= j + d)
                    & (i <= gullys["Y_"])
                    & (gullys["Y_"] <= i + d)
                ]
                for global_x, global_y in zip(in_tile["X_"], in_tile["Y_"]):
                    # Plain floats keep the records JSON-serialisable
                    global_x = float(global_x)
                    global_y = float(global_y)
                    x = global_x - j
                    y = global_y - i

                    circle = generate_circle(center_x=x, center_y=y, r=r)
                    # Clamp the circle to the tile
                    circle = [min(max(0., p), d) for p in circle]
                    annotations.append(dict(
                        category_id=gully_id,
                        category_name="Gullydeckel",
                        ignore=0,
                        iscrowd=0,
                        bbox=[
                            max(x - r, 0),
                            max(y - r, 0),
                            min(x + r, d),
                            min(y + r, d),
                        ],
                        bbox_mode=BoxMode.XYXY_ABS,
                        segmentation=[circle],
                    ))
                    if draw is not None:
                        draw.ellipse(
                            (global_x - r, global_y - r, global_x + r, global_y + r),
                            fill=_category_color(gully_id),
                        )

            # --- Register the tile ---
            if annotations:
                images.append(
                    dict(
                        image_id=str(uuid4()),
                        width=d,
                        height=d,
                        file_name=image_out_path,
                        annotations=annotations,
                    )
                )

        if marked is not None:
            os.makedirs(marked_image_dir, exist_ok=True)
            marked.save(os.path.join(marked_image_dir, f'{name}{ext}'))

    return training_object


def label_studio(
        json_file,
        image_dir,
        training_object=None,
        train_split=0.8,
        marked_image_dir=None,
):
    """
    Convert a Label Studio JSON export with polygon labels into annotations.

    For every task the referenced image is opened from `image_dir`, the
    polygon points (given in percent of the image size) are converted to
    pixels, and one annotation per polygon is created. Images with at least
    one annotation are appended to the train or test list.

    Tasks whose image is not a ``.jpg`` / ``.png`` / ``.tif`` file are
    skipped, as are results without polygon data and polygons whose label is
    not in ``training_object["categories"]``.

    :param json_file: Path of the Label Studio export
    :param image_dir: Directory that contains the referenced images
    :param training_object: Existing training object to extend, or None to
        start with an empty one
    :param train_split: Probability that an image goes to the train list
    :param marked_image_dir: If given, every image is saved into this
        directory with its polygons painted on it
    :return: The (extended) training object
    """
    colors = {
        "Festweg": "grey",
        "Gullydeckel": "red",
        "Wiese": "green",
        "Baumbestand": "brown",
        "Pflaster": "orange",
        "Wasser": "blue",
    }

    if training_object is None:
        training_object = _new_training_object()

    with open(json_file, "r", encoding="utf-8") as file:
        data = json.load(file)

    train_images = training_object["train_images"]
    test_images = training_object["test_images"]
    categories = training_object["categories"]

    for obj in data:
        images = train_images if random() < train_split else test_images

        # The image reference looks like "<prefix>=<relative path>"; split
        # only once so that "=" inside the file name survives.
        filename = obj["data"]["image"].split("=", 1)[1]
        ext = os.path.splitext(filename)[-1]
        if ext.lower() not in (".jpg", ".png", ".tif"):
            logger.debug("Found extension '%s', skipping...", ext)
            continue

        # A leading separator would make os.path.join discard the directory
        relative_name = filename.lstrip("/\\")
        image_path = os.path.join(image_dir, relative_name)

        annotations = []
        with Image.open(image_path) as image:
            width, height = image.size
            # Creating a Draw object loads the full image, so only do it
            # when markings were requested.
            draw = ImageDraw.Draw(image) if marked_image_dir is not None else None

            for task in obj["annotations"]:
                for annotation in task["result"]:
                    value = annotation.get("value", {})
                    if "points" not in value or not value.get("polygonlabels"):
                        logger.debug("Result without polygon label, skipping...")
                        continue

                    category_name = value["polygonlabels"][0]
                    if category_name not in categories:
                        logger.warning("Unknown category '%s', skipping...", category_name)
                        continue
                    category_id = categories.index(category_name)

                    # Points are percentages of the original image size
                    x_factor = annotation["original_width"] / 100
                    y_factor = annotation["original_height"] / 100
                    points = [
                        (point[0] * x_factor, point[1] * y_factor)
                        for point in value["points"]
                    ]

                    polygon = Polygon(points)
                    exterior = list(polygon.exterior.coords)
                    seg = [round(x, 4) for xs in exterior for x in xs]
                    bounds = [round(x, 4) for x in polygon.bounds]

                    annotations.append(dict(
                        category_name=category_name,
                        ignore=0,
                        iscrowd=0,
                        bbox=bounds,
                        bbox_mode=BoxMode.XYXY_ABS,
                        segmentation=[seg],
                        category_id=category_id,
                    ))

                    if draw is not None:
                        draw.polygon(exterior, fill=colors.get(category_name, "#000"))

            if draw is not None:
                marked_path = os.path.join(marked_image_dir, relative_name)
                os.makedirs(os.path.dirname(marked_path), exist_ok=True)
                image.save(marked_path)

        if annotations:
            images.append(
                dict(
                    image_id=str(uuid4()),
                    width=width,
                    height=height,
                    file_name=image_path,
                    annotations=annotations,
                )
            )

    return training_object


def main():
    """Run both converters and write the combined training data to disk."""
    # Pass marked_image_dir="../data/images/marked" to a converter to also
    # render its annotations onto the images for visual inspection.
    categories = [
        "Baumbestand",
        "Festweg",
        "Pflaster",
        "Wiese",
        "Wasser",
        "Gullydeckel",
    ]

    training_object = dict(
        train_images=[],
        test_images=[],
        categories=categories,
    )

    logger.info("Converting data from WestendDOP2.tif")
    training_object = westend(
        filename="../data/images/westend/WestendDOP2.tif",
        clean_image_dir="../data/images/westend/cropped/",
        training_object=training_object,
    )

    logger.info("Converting data from LabelStudio")
    training_object = label_studio(
        json_file="../data/json/project-4-at-2022-07-26-15-52-273e43ac.json",
        image_dir="../data/images/label_studio/",
        training_object=training_object,
    )

    logger.info("Saving results to disk")
    out_path = os.path.join("../data", "json", "_train_data.json")
    with open(out_path, "w", encoding="utf-8") as json_file:
        json.dump(training_object, json_file, indent=2)
    logger.info("Done.")


if __name__ == '__main__':
    main()