MG-KI_Flaechenanalyse/source/prepare_training_data.py

394 lines
12 KiB
Python

import json
import logging
import os
from itertools import product
from math import pi, cos, sin
from random import random
from uuid import uuid4
import geopandas as gpd
import pandas as pd
from PIL import Image, ImageDraw
from detectron2.structures import BoxMode
from shapely.geometry import box, Polygon, MultiPolygon
# Remove Pillow's pixel-count limit (MAX_IMAGE_PIXELS) so that very large
# images can be opened without Pillow refusing them.
Image.MAX_IMAGE_PIXELS = None
# Set up logging: install a root handler whose records carry timestamp, level
# and logger name, then create this module's logger and let it emit everything
# from DEBUG upwards.
logging.basicConfig(format='[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s')
logger = logging.getLogger("Data Preparation")
logger.setLevel(logging.DEBUG)
def generate_circle(
    center_x=0,
    center_y=0,
    r=20,
    n=32,
):
    """
    Sample `n` evenly spaced points on a circle and return them as one flat list.

    The circle has its center at ( `center_x` | `center_y` ) and radius `r`.
    The first point lies at angle 0 (to the right of the center); every
    following point is advanced by ``2 * pi / n``. Each coordinate is rounded
    to 5 decimal places. The result is flat, i.e. ``[x0, y0, x1, y1, ...]``.

    :param center_x: Center's x coordinate
    :param center_y: Center's y coordinate
    :param r: Radius
    :param n: Number of points on the circle
    :return: Flat list of ``2 * n`` coordinates (empty if `n` is 0)
    """
    flat_coordinates = []
    for step in range(n):
        # Computed inside the loop so that n == 0 never divides by zero.
        angle = 2 * pi / n * step
        flat_coordinates.append(round(cos(angle) * r + center_x, 5))
        flat_coordinates.append(round(sin(angle) * r + center_y, 5))
    return flat_coordinates
def crop_image(
    filename,
    clean_image_dir,
    d=1000,
    save_empty=True,
):
    """
    Cut an image into square tiles of `d` x `d` pixels and save each tile.

    Tiles are laid out on a grid starting at the top-left corner. Only complete
    tiles are produced: a remainder of fewer than `d` pixels at the right or
    bottom edge is dropped. Each tile is written to `clean_image_dir` as
    ``<name>_<row offset>_<column offset><ext>``, where name and extension are
    taken from `filename` and the offsets are the pixel coordinates of the
    tile's top-left corner.

    :param filename: Path of the image to cut
    :param clean_image_dir: Directory the tiles are written to (created if missing)
    :param d: Edge length of a tile in pixels
    :param save_empty: If false, tiles consisting of a single colour are skipped
    :return: None
    """
    # basename() also understands "/" separators on Windows, which a split on
    # os.path.sep does not.
    name, ext = os.path.splitext(os.path.basename(filename))
    logger.debug("Opening image")
    # The context manager releases the file handle even if saving a tile fails.
    with Image.open(filename) as img:
        logger.debug("Image opened")
        w, h = img.size
        os.makedirs(clean_image_dir, exist_ok=True)
        grid = product(range(0, h - h % d, d), range(0, w - w % d, d))
        for i, j in grid:
            part = img.crop((j, i, j + d, i + d))
            if not save_empty:
                # getcolors() returns None when the tile has more colours than
                # its internal limit, which means it is certainly not empty.
                colors = part.getcolors()
                if colors is not None and len(colors) <= 1:
                    continue
            part.save(os.path.join(clean_image_dir, f'{name}_{i}_{j}{ext}'))
    logger.debug("done")
def _bit_color(category_id):
    """Map the three lowest bits of `category_id` to an RGB colour (each channel 0 or 255)."""
    return tuple(255 if category_id & (1 << bit) else 0 for bit in (2, 1, 0))


def _surface_annotations(surfaces, i, j, d, categories, draw=None):
    """Clip every surface polygon to the tile at column `j` / row `i` and annotate the pieces."""
    tile = box(j, i, j + d, i + d)
    annotations = []
    for category_name, surface in surfaces:
        intersection = surface.intersection(tile)
        # is_empty works on Shapely 1.x and 2.x; the bounds of an empty
        # geometry are a truthy tuple of NaNs on Shapely 2.
        if intersection.is_empty:
            continue
        # Multi-part results and geometry collections expose their parts via .geoms.
        pieces = intersection.geoms if hasattr(intersection, "geoms") else [intersection]
        for piece in pieces:
            # A surface that merely touches the tile yields points or lines;
            # those have no area and cannot be annotated.
            if not isinstance(piece, Polygon) or piece.is_empty:
                continue
            # Raises ValueError if the translated category is not in `categories`.
            category_id = categories.index(category_name)
            outline = list(piece.exterior.coords)
            if draw is not None:
                draw.polygon(outline, fill=_bit_color(category_id))
            min_x, min_y, max_x, max_y = piece.bounds
            annotations.append(dict(
                category_id=category_id,
                category_name=category_name,
                ignore=0,
                iscrowd=0,
                # Shift from whole-image pixels to tile-local pixels.
                bbox=[min_x - j, min_y - i, max_x - j, max_y - i],
                bbox_mode=BoxMode.XYXY_ABS,
                segmentation=[[
                    value
                    for xy in outline
                    for value in (xy[0] - j, xy[1] - i)
                ]],
            ))
    return annotations


def _gully_annotations(gullys, i, j, d, r, categories, draw=None):
    """Annotate every gully whose pixel position lies in the tile at column `j` / row `i`."""
    xs, ys = gullys["X_"], gullys["Y_"]
    in_tile = gullys[(j <= xs) & (xs <= j + d) & (i <= ys) & (ys <= i + d)]
    annotations = []
    for global_x, global_y in zip(in_tile["X_"], in_tile["Y_"]):
        # Plain floats keep numpy integers out of the result, which must stay
        # JSON serialisable.
        global_x, global_y = float(global_x), float(global_y)
        x, y = global_x - j, global_y - i
        # Clamp the circle to the tile so the outline never leaves the image.
        circle = [
            min(max(0., p), d)
            for p in generate_circle(center_x=x, center_y=y, r=r)
        ]
        annotations.append(dict(
            category_id=categories.index("Gullydeckel"),
            category_name="Gullydeckel",
            ignore=0,
            iscrowd=0,
            bbox=[
                max(x - r, 0),
                max(y - r, 0),
                min(x + r, d),
                min(y + r, d),
            ],
            bbox_mode=BoxMode.XYXY_ABS,
            segmentation=[circle],
        ))
        if draw is not None:
            draw.ellipse(
                (global_x - r, global_y - r, global_x + r, global_y + r),
                # Gullies are always marked with the colour of id 5.
                fill=_bit_color(5),
            )
    return annotations


def westend(
    filename,
    clean_image_dir,
    d=1000,
    r=20,
    save_clean_images=False,
    marked_image_dir=None,
    training_object=None,
    train_split=0.8,
):
    """
    Convert the Westend image and its GeoJSON labels into tile-wise training data.

    The image is cut into complete `d` x `d` pixel tiles (a remainder at the
    right or bottom edge is dropped). Tiles consisting of a single colour are
    skipped. For every other tile the surface polygons from
    ``../data/GeoJson/Flaechenbelaege.json`` and the gullies from
    ``../data/GeoJson/Kanaldeckel_im_Bild.json`` are converted to tile-local
    pixel coordinates and stored as annotations; gullies are approximated by a
    circle of radius `r`. Geographic coordinates are mapped to pixels using the
    min/max of the ``X`` and ``Y`` columns over the first four rows of
    ``../data/Referenzpunkte.csv``. All three input paths are relative to the
    current working directory.

    Each tile with at least one annotation is appended to either
    ``train_images`` or ``test_images`` of `training_object`, chosen at random.
    The recorded ``file_name`` is the tile path inside `clean_image_dir`, even
    if `save_clean_images` is false and the file is therefore not written.

    :param filename: Path of the image to convert
    :param clean_image_dir: Directory of the (optionally saved) unmarked tiles
    :param d: Edge length of a tile in pixels
    :param r: Radius in pixels of the circle used for a gully
    :param save_clean_images: If true, save every non-empty tile to `clean_image_dir`
    :param marked_image_dir: If given, a copy of the whole image with all
        annotations painted in is saved to this directory
    :param training_object: Dict with the keys ``train_images``, ``test_images``
        and ``categories`` that is extended in place. ``categories`` must contain
        every category name that occurs, otherwise a ValueError is raised. If
        omitted, a new dict with empty lists is created.
    :param train_split: Probability that a tile is assigned to the training set
    :return: The (extended) `training_object`
    """
    category_translations = {
        'Asphalt': "Festweg",
        'Bepflanzte_Flaechen': "Baumbestand",
        'Beton': "Festweg",
        'Noppenpflaster': "Pflaster",
        'Pflaster': "Pflaster",
        'Platten': "Pflaster",
        'Rasen': "Wiese",
        'Rasengittersteine': "Pflaster",
        'Rippenpflaster': "Pflaster",
        'Sand': "Festweg",
        'unbefestigt': "Festweg",
    }
    if training_object is None:
        training_object = dict(
            train_images=[],
            test_images=[],
            categories=[],
        )
    train_images = training_object["train_images"]
    test_images = training_object["test_images"]
    categories = training_object["categories"]

    reference_points = pd.read_csv("../data/Referenzpunkte.csv")
    gullys = gpd.read_file("../data/GeoJson/Kanaldeckel_im_Bild.json")
    flaechenbelaege = gpd.read_file("../data/GeoJson/Flaechenbelaege.json")
    # basename() also understands "/" separators on Windows.
    name, ext = os.path.splitext(os.path.basename(filename))

    # The context manager releases the file handle on every exit path.
    with Image.open(filename) as img:
        w, h = img.size
        corners = reference_points.head(4)
        left = min(corners["X"])
        right = max(corners["X"])
        bottom = min(corners["Y"])
        top = max(corners["Y"])
        x_scale = (right - left) / w
        # Negative on purpose: pixel rows grow downwards, Y grows upwards.
        y_scale = (bottom - top) / h

        gullys["X_"] = gullys["X"].apply(lambda x: int((x - left) / x_scale))
        gullys["Y_"] = gullys["Y"].apply(lambda y: int((y - top) / y_scale))

        # The pixel-space polygons do not depend on the tile, so they are
        # built once instead of once per tile.
        surfaces = [
            (
                category_translations.get(row["Art"], ""),
                Polygon([
                    ((xy[0] - left) / x_scale, (xy[1] - top) / y_scale)
                    for xy in row["geometry"].exterior.coords
                ]),
            )
            for _, row in flaechenbelaege.iterrows()
        ]

        # Paint on a copy so that marks reaching into a neighbouring tile
        # never end up in the clean crops taken from `img`.
        canvas = draw = None
        if marked_image_dir is not None:
            canvas = img.copy()
            draw = ImageDraw.Draw(canvas)
        if save_clean_images:
            os.makedirs(clean_image_dir, exist_ok=True)

        grid = product(range(0, h - h % d, d), range(0, w - w % d, d))
        for i, j in grid:
            images = train_images if random() < train_split else test_images
            tile_image = img.crop((j, i, j + d, i + d))
            colors = tile_image.getcolors()
            if colors is not None and len(colors) <= 1:
                # Skip empty and single-colour images
                continue
            image_out_path = os.path.join(clean_image_dir, f'{name}_{i}_{j}{ext}')
            if save_clean_images:
                tile_image.save(image_out_path)

            annotations = _surface_annotations(surfaces, i, j, d, categories, draw)
            annotations.extend(_gully_annotations(gullys, i, j, d, r, categories, draw))
            if annotations:
                images.append(
                    dict(
                        image_id=str(uuid4()),
                        width=d,
                        height=d,
                        file_name=image_out_path,
                        annotations=annotations,
                    )
                )

        if canvas is not None:
            os.makedirs(marked_image_dir, exist_ok=True)
            canvas.save(os.path.join(marked_image_dir, f'{name}{ext}'))
    return training_object
def label_studio(
    json_file,
    image_dir,
    training_object=None,
    train_split=0.8,
    marked_image_dir=None,
):
    """
    Convert a Label Studio JSON export with polygon labels into training data.

    For every entry of the export the referenced image is opened to read its
    size, and every polygon result is stored as an annotation in pixel
    coordinates. Point coordinates in the export are multiplied by
    ``original_width / 100`` and ``original_height / 100``, i.e. converted from
    percent of the image size to pixels. Entries whose image is not a
    ``.jpg``, ``.png`` or ``.tif`` file are skipped. Each image with at least
    one annotation is appended to either ``train_images`` or ``test_images``
    of `training_object`, chosen at random.

    The image path is `image_dir` followed directly by everything after the
    first ``=`` of the entry's ``data.image`` value, so `image_dir` must end
    with a path separator.

    :param json_file: Path of the Label Studio export
    :param image_dir: Directory prefix (including trailing separator) of the images
    :param training_object: Dict with the keys ``train_images``, ``test_images``
        and ``categories`` that is extended in place. ``categories`` must contain
        every label that occurs, otherwise a ValueError is raised. If omitted,
        a new dict with empty lists is created.
    :param train_split: Probability that an image is assigned to the training set
    :param marked_image_dir: If given, a copy of every image with its polygons
        painted in is saved below this directory
    :return: The (extended) `training_object`
    """
    colors = {
        "Festweg": "grey",
        "Gullydeckel": "red",
        "Wiese": "green",
        "Baumbestand": "brown",
        "Pflaster": "orange",
        "Wasser": "blue",
    }
    if training_object is None:
        training_object = dict(
            train_images=[],
            test_images=[],
            categories=[],
        )
    # JSON is UTF-8 by specification; do not depend on the platform default.
    with open(json_file, "r", encoding="utf-8") as file:
        data = json.load(file)
    train_images = training_object["train_images"]
    test_images = training_object["test_images"]
    categories = training_object["categories"]
    for obj in data:
        images = train_images if random() < train_split else test_images
        # Split only once so that a "=" inside the file name is kept.
        filename = obj["data"]["image"].split("=", 1)[1]
        ext = os.path.splitext(filename)[-1]
        if ext.lower() not in (".jpg", ".png", ".tif"):
            logger.debug(f"Found extension '{ext}', skipping...")
            continue
        image_path = image_dir + filename
        annotations = []
        # The context manager closes each image; otherwise one file handle
        # per exported task would stay open.
        with Image.open(image_path) as image:
            width, height = image.size
            # Only create the drawing context (which loads the pixel data)
            # when something will actually be drawn.
            draw = ImageDraw.Draw(image) if marked_image_dir is not None else None
            for task in obj["annotations"]:
                for annotation in task["result"]:
                    x_factor = annotation["original_width"] / 100
                    y_factor = annotation["original_height"] / 100
                    # Build new points instead of rewriting the parsed JSON.
                    points = [
                        (point[0] * x_factor, point[1] * y_factor)
                        for point in annotation["value"]["points"]
                    ]
                    category_name = annotation["value"]["polygonlabels"][0]
                    category_id = categories.index(category_name)
                    polygon = Polygon(points)
                    outline = list(polygon.exterior.coords)
                    seg = [round(x, 4) for xs in outline for x in xs]
                    bounds = [round(x, 4) for x in polygon.bounds]
                    annotations.append(dict(
                        category_name=category_name,
                        ignore=0,
                        iscrowd=0,
                        bbox=bounds,
                        bbox_mode=BoxMode.XYXY_ABS,
                        segmentation=[seg],
                        category_id=category_id,
                    ))
                    if draw is not None:
                        draw.polygon(
                            outline,
                            fill=colors.get(category_name, "#000")
                        )
            if marked_image_dir is not None:
                marked_path = os.path.join(marked_image_dir, filename)
                os.makedirs(os.path.dirname(marked_path), exist_ok=True)
                image.save(marked_path)
        if annotations:
            images.append(
                dict(
                    image_id=str(uuid4()),
                    width=width,
                    height=height,
                    file_name=image_path,
                    annotations=annotations,
                )
            )
    return training_object
if __name__ == '__main__':
    # Script entry point: collect the annotations of both data sources in one
    # training object and write it to disk as JSON.
    marked_images_dir = "../data/images/marked"
    # The position of a name in this list is its category id.
    category_names = [
        "Baumbestand",
        "Festweg",
        "Pflaster",
        "Wiese",
        "Wasser",
        "Gullydeckel",
    ]
    dataset = {
        "train_images": [],
        "test_images": [],
        "categories": category_names,
    }

    logger.info("Converting data from WestendDOP2.tif")
    dataset = westend(
        filename="../data/images/westend/WestendDOP2.tif",
        clean_image_dir="../data/images/westend/cropped/",
        training_object=dataset,
    )

    logger.info("Converting data from LabelStudio")
    dataset = label_studio(
        json_file="../data/json/project-4-at-2022-07-26-15-52-273e43ac.json",
        image_dir="../data/images/label_studio/",
        training_object=dataset,
    )

    logger.info("Saving results to disk")
    output_path = os.path.join("../data", "json", "_train_data.json")
    with open(output_path, "w") as output_file:
        json.dump(dataset, output_file, indent=2)
    logger.info("Done.")