# 394 lines | 12 KiB | Python
import json
|
|
import logging
|
|
import os
|
|
from itertools import product
|
|
from math import pi, cos, sin
|
|
from random import random
|
|
from uuid import uuid4
|
|
|
|
import geopandas as gpd
|
|
import pandas as pd
|
|
from PIL import Image, ImageDraw
|
|
from detectron2.structures import BoxMode
|
|
from shapely.geometry import box, Polygon, MultiPolygon
|
|
|
|
# Logging: every record is prefixed with timestamp, level and logger name.
logging.basicConfig(format='[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s')
logger = logging.getLogger("Data Preparation")
logger.setLevel(logging.DEBUG)

# Remove Pillow's upper bound on the pixel count so that big images can be loaded.
Image.MAX_IMAGE_PIXELS = None
|
|
|
|
|
|
def generate_circle(
    center_x=0,
    center_y=0,
    r=20,
    n=32,
):
    """
    Sample `n` evenly spaced points on a circle and return them as one flat list.

    The circle is centred at ( `center_x` | `center_y` ) and has the radius `r`.
    Point number `k` sits at the angle `2 * pi / n * k`, so the first point is
    ( `center_x + r` | `center_y` ). Every coordinate is rounded to 5 decimals.

    :param center_x: Center's x coordinate
    :param center_y: Center's y coordinate
    :param r: Radius
    :param n: Number of points on the circle
    :return: Flat list `[x0, y0, x1, y1, ...]` holding `2 * n` values
    """
    flat = []
    for k in range(n):
        # The division stays inside the loop on purpose: with n == 0 the loop
        # body never runs and an empty list is returned instead of dividing by 0.
        angle = 2 * pi / n * k
        flat.append(round(cos(angle) * r + center_x, 5))
        flat.append(round(sin(angle) * r + center_y, 5))
    return flat
|
|
|
|
|
|
def crop_image(
    filename,
    clean_image_dir,
    d=1000,
    save_empty=True,
):
    """
    Cut an image into square tiles of `d` x `d` pixels and save them to `clean_image_dir`.

    Only complete tiles are produced; a remainder at the right or bottom edge that is
    smaller than `d` is dropped. A tile whose top-left corner is at column `j` and
    row `i` is stored as `{name}_{i}_{j}{ext}`, where `name` and `ext` come from
    the input file name.

    :param filename: Path of the image to cut
    :param clean_image_dir: Output directory for the tiles (created if it does not exist)
    :param d: Edge length of a tile in pixels
    :param save_empty: If False, tiles that contain at most one colour are not saved
    """
    # basename also copes with "/" separators on platforms where os.path.sep is "\\"
    name, ext = os.path.splitext(os.path.basename(filename))

    logger.debug("Opening image")
    # The context manager releases the file handle even if cropping or saving fails.
    with Image.open(filename) as img:
        logger.debug("Image opened")
        w, h = img.size

        os.makedirs(clean_image_dir, exist_ok=True)
        for i, j in product(range(0, h - h % d, d), range(0, w - w % d, d)):
            part = img.crop((j, i, j + d, i + d))

            # Counting colours is only needed when empty tiles have to be filtered out.
            if not save_empty:
                # getcolors() returns None when the tile has more colours than its
                # default limit, which means the tile is certainly not single-coloured.
                colors = part.getcolors()
                if colors is not None and len(colors) <= 1:
                    continue

            part.save(os.path.join(clean_image_dir, f'{name}_{i}_{j}{ext}'))
    logger.debug("done")
|
|
|
|
|
|
def _westend_colour(category_id):
    """Turn the three lowest bits of `category_id` into an RGB colour (bit 2: red, bit 1: green, bit 0: blue)."""
    return (
        255 * bool(category_id & 0b100),
        255 * bool(category_id & 0b010),
        255 * bool(category_id & 0b001),
    )


def westend(
    filename,
    clean_image_dir,
    d=1000,
    r=20,
    save_clean_images=False,
    marked_image_dir=None,
    training_object=None,
    train_split=0.8,
):
    """
    Tile the Westend image and create detectron2-style annotations for every tile.

    Surface polygons are read from `../data/GeoJson/Flaechenbelaege.json` and gully
    points from `../data/GeoJson/Kanaldeckel_im_Bild.json`. Their coordinates are mapped
    linearly onto pixel coordinates using the X/Y extent of the first four rows of
    `../data/Referenzpunkte.csv` (presumably the image corners - confirm against the data).

    For each complete `d` x `d` tile that has more than one colour, surface polygons are
    clipped to the tile and gullys are approximated by circles of radius `r`. Tiles that
    end up with at least one annotation are appended to the train or test list.

    :param filename: Path of the image to process
    :param clean_image_dir: Directory used to build the tile file names
        (tiles are only written there if `save_clean_images` is set)
    :param d: Edge length of a tile in pixels
    :param r: Radius in pixels of the circle that represents a gully
    :param save_clean_images: If True, write every non-empty tile to `clean_image_dir`
    :param marked_image_dir: If given, a copy of the whole image with all annotations
        painted on it is written to this directory
    :param training_object: Dict with the keys `train_images`, `test_images` and
        `categories` that is extended in place; a fresh one is created if omitted
    :param train_split: Probability with which a tile is assigned to the train list
    :return: The (extended) `training_object`
    """
    category_translations = {
        'Asphalt': "Festweg",
        'Bepflanzte_Flaechen': "Baumbestand",
        'Beton': "Festweg",
        'Noppenpflaster': "Pflaster",
        'Pflaster': "Pflaster",
        'Platten': "Pflaster",
        'Rasen': "Wiese",
        'Rasengittersteine': "Pflaster",
        'Rippenpflaster': "Pflaster",
        'Sand': "Festweg",
        'unbefestigt': "Festweg",
    }

    reference_points = pd.read_csv("../data/Referenzpunkte.csv")
    gullys = gpd.read_file("../data/GeoJson/Kanaldeckel_im_Bild.json")
    flaechenbelaege = gpd.read_file("../data/GeoJson/Flaechenbelaege.json")
    name, ext = os.path.splitext(os.path.basename(filename))

    if training_object is None:
        training_object = dict(
            train_images=[],
            test_images=[],
            categories=[],
        )

    train_images = training_object["train_images"]
    test_images = training_object["test_images"]
    categories = training_object["categories"]

    # The context manager releases the file handle once all tiles are processed.
    with Image.open(filename) as img:
        w, h = img.size

        corners = reference_points.head(4)
        left = min(corners["X"])
        right = max(corners["X"])
        bottom = min(corners["Y"])
        top = max(corners["Y"])
        x_scale = (right - left) / w
        # Negative by construction (bottom < top): pixel rows grow while Y shrinks.
        y_scale = (bottom - top) / h

        # Draw on a copy so that markings never leak into the tiles cropped from `img`.
        # NOTE: this doubles the memory needed for the image while marking is enabled.
        marked = img.copy() if marked_image_dir is not None else None
        draw = ImageDraw.Draw(marked) if marked is not None else None

        # <editor-fold desc="Setup Gully">
        gullys["X_"] = gullys["X"].apply(lambda x: int((x - left) / x_scale))
        gullys["Y_"] = gullys["Y"].apply(lambda y: int((y - top) / y_scale))
        if "Gullydeckel" in categories:
            gully_category_id = categories.index("Gullydeckel")
        else:
            gully_category_id = None
            logger.warning("Category 'Gullydeckel' is not configured, gullys are skipped")
        # </editor-fold>

        # <editor-fold desc="Setup Beläge">
        # Scaling and category lookup do not depend on the tile, so do them only once.
        surfaces = []
        skipped_kinds = set()
        for _, row in flaechenbelaege.iterrows():
            kind = row["Art"]
            category_name = category_translations.get(kind, "")
            if category_name not in categories:
                # Unmapped kinds or categories missing from the list cannot get an id.
                if kind not in skipped_kinds:
                    skipped_kinds.add(kind)
                    logger.warning(f"No category for surface kind '{kind}', skipping...")
                continue

            geometry = row["geometry"]
            if isinstance(geometry, MultiPolygon):
                parts = list(geometry.geoms)
            else:
                parts = [geometry]
            for part in parts:
                if not isinstance(part, Polygon) or part.is_empty:
                    continue
                # Only the outer ring is used; holes of a polygon are ignored.
                scaled = [
                    ((xy[0] - left) / x_scale, (xy[1] - top) / y_scale)
                    for xy in part.exterior.coords
                ]
                surfaces.append((category_name, categories.index(category_name), Polygon(scaled)))
        # </editor-fold>

        if save_clean_images:
            os.makedirs(clean_image_dir, exist_ok=True)

        for i, j in product(range(0, h - h % d, d), range(0, w - w % d, d)):
            # Drawn before the empty-tile check so the random sequence matches
            # the tile grid one to one.
            images = train_images if random() < train_split else test_images

            view_box = (j, i, j + d, i + d)
            tile = img.crop(view_box)
            colors = tile.getcolors()
            if colors is not None and len(colors) <= 1:
                # Skip empty and single-colour images
                continue

            image_out_path = os.path.join(clean_image_dir, f'{name}_{i}_{j}{ext}')
            if save_clean_images:
                tile.save(image_out_path)

            annotations = []

            # <editor-fold desc="Beläge">
            tile_box = box(*view_box)
            for category_name, category_id, belag in surfaces:
                intersection = belag.intersection(tile_box)
                # `is_empty` is reliable across Shapely versions, unlike testing `bounds`.
                if intersection.is_empty:
                    continue

                if isinstance(intersection, Polygon):
                    intersects = [intersection]
                else:
                    # Multi-part results; lines and points (polygon merely touching
                    # the tile border) are filtered out below.
                    intersects = list(getattr(intersection, "geoms", []))

                for intersect in intersects:
                    if not isinstance(intersect, Polygon) or intersect.is_empty:
                        continue

                    global_segmentation = list(intersect.exterior.coords)
                    min_x, min_y, max_x, max_y = intersect.bounds
                    if draw is not None:
                        draw.polygon(global_segmentation, fill=_westend_colour(category_id))

                    # Shift from image coordinates to tile coordinates: x by j, y by i.
                    local_segmentation = [[
                        value
                        for xy in global_segmentation
                        for value in (xy[0] - j, xy[1] - i)
                    ]]
                    local_boundaries = [min_x - j, min_y - i, max_x - j, max_y - i]

                    annotations.append(dict(
                        category_id=category_id,
                        category_name=category_name,
                        ignore=0,
                        iscrowd=0,
                        bbox=local_boundaries,
                        bbox_mode=BoxMode.XYXY_ABS,
                        segmentation=local_segmentation,
                    ))
            # </editor-fold>

            # <editor-fold desc="Gully">
            if gully_category_id is not None:
                gullys_in_box = gullys[
                    (j <= gullys["X_"])
                    & (gullys["X_"] <= j + d)
                    & (i <= gullys["Y_"])
                    & (gullys["Y_"] <= i + d)
                ]
                for global_x, global_y in zip(gullys_in_box["X_"], gullys_in_box["Y_"]):
                    # Plain floats keep the annotation JSON-serialisable.
                    x = float(global_x - j)
                    y = float(global_y - i)

                    circle = generate_circle(
                        center_x=x,
                        center_y=y,
                        r=r,
                    )
                    # Clamp the circle to the tile
                    circle = [min(max(0., p), d) for p in circle]
                    annotations.append(dict(
                        category_id=gully_category_id,
                        category_name="Gullydeckel",
                        ignore=0,
                        iscrowd=0,
                        bbox=[
                            max(x - r, 0),
                            max(y - r, 0),
                            min(x + r, d),
                            min(y + r, d),
                        ],
                        bbox_mode=BoxMode.XYXY_ABS,
                        segmentation=[circle],
                    ))
                    if draw is not None:
                        draw.ellipse(
                            (
                                float(global_x) - r,
                                float(global_y) - r,
                                float(global_x) + r,
                                float(global_y) + r,
                            ),
                            # Fixed colour index 5, as before; this equals the position of
                            # "Gullydeckel" in the category list of the script section.
                            fill=_westend_colour(5),
                        )
            # </editor-fold>

            if annotations:
                images.append(
                    dict(
                        image_id=str(uuid4()),
                        width=d,
                        height=d,
                        file_name=image_out_path,
                        annotations=annotations,
                    )
                )

        if marked is not None:
            os.makedirs(marked_image_dir, exist_ok=True)
            marked.save(os.path.join(marked_image_dir, f'{name}{ext}'))

    return training_object
|
|
|
|
|
|
def label_studio(
    json_file,
    image_dir,
    training_object=None,
    train_split=0.8,
    marked_image_dir=None,
):
    """
    Convert polygon annotations of a Label Studio JSON export into detectron2-style records.

    For every task the image name is taken from the part of `data.image` after the
    first `=`. Polygon points are given in percent of the image size in the export and
    are converted to pixels with `original_width` / `original_height`. Images with at
    least one annotation are appended to the train or test list.

    :param json_file: Path of the Label Studio export
    :param image_dir: Directory that contains the referenced images
    :param training_object: Dict with the keys `train_images`, `test_images` and
        `categories` that is extended in place; a fresh one is created if omitted
    :param train_split: Probability with which an image is assigned to the train list
    :param marked_image_dir: If given, a copy of each image with the polygons painted
        on it is written below this directory
    :return: The (extended) `training_object`
    """
    colors = {
        "Festweg": "grey",
        "Gullydeckel": "red",
        "Wiese": "green",
        "Baumbestand": "brown",
        "Pflaster": "orange",
        "Wasser": "blue",
    }
    supported_extensions = {".jpg", ".png", ".tif"}

    if training_object is None:
        training_object = dict(
            train_images=[],
            test_images=[],
            categories=[],
        )

    with open(json_file, "r", encoding="utf-8") as file:
        data = json.load(file)

    train_images = training_object["train_images"]
    test_images = training_object["test_images"]
    categories = training_object["categories"]
    unknown_labels = set()

    for obj in data:
        images = train_images if random() < train_split else test_images

        # Split only at the first "=" so that file names containing "=" stay intact.
        filename = obj["data"]["image"].split("=", 1)[1]
        ext = os.path.splitext(filename)[-1]
        if ext.lower() not in supported_extensions:
            logger.debug(f"Found extension '{ext}', skipping...")
            continue

        # join() works with and without a trailing separator on `image_dir`; a leading
        # "/" is stripped because it would make join() discard `image_dir`.
        image_path = os.path.join(image_dir, filename.lstrip("/"))
        annotations = []

        # The context manager releases the file handle after each image.
        with Image.open(image_path) as image:
            width, height = image.size
            # Creating the drawing context loads the pixel data, so only do it if needed.
            draw = ImageDraw.Draw(image) if marked_image_dir is not None else None

            for task in obj["annotations"]:
                for annotation in task["result"]:
                    value = annotation.get("value", {})
                    if "points" not in value or not value.get("polygonlabels"):
                        logger.debug("Found result without labelled polygon, skipping...")
                        continue

                    category_name = value["polygonlabels"][0]
                    if category_name not in categories:
                        if category_name not in unknown_labels:
                            unknown_labels.add(category_name)
                            logger.warning(f"Unknown category '{category_name}', skipping...")
                        continue
                    category_id = categories.index(category_name)

                    # Percent -> pixels; build new tuples instead of mutating the loaded JSON.
                    x_factor = annotation["original_width"] / 100
                    y_factor = annotation["original_height"] / 100
                    points = [
                        (point[0] * x_factor, point[1] * y_factor)
                        for point in value["points"]
                    ]

                    polygon = Polygon(points)
                    outline = list(polygon.exterior.coords)
                    seg = [round(x, 4) for xs in outline for x in xs]
                    bounds = [round(x, 4) for x in polygon.bounds]
                    annotations.append(dict(
                        category_name=category_name,
                        ignore=0,
                        iscrowd=0,
                        bbox=bounds,
                        bbox_mode=BoxMode.XYXY_ABS,
                        segmentation=[seg],
                        category_id=category_id,
                    ))

                    if draw is not None:
                        draw.polygon(
                            outline,
                            fill=colors.get(category_name, "#000")
                        )

            if marked_image_dir is not None:
                marked_path = os.path.join(marked_image_dir, filename.lstrip("/"))
                os.makedirs(os.path.dirname(marked_path), exist_ok=True)
                image.save(marked_path)

        if annotations:
            images.append(
                dict(
                    image_id=str(uuid4()),
                    width=width,
                    height=height,
                    file_name=image_path,
                    annotations=annotations,
                )
            )

    return training_object
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: convert both data sources and write the combined result.
    marked_images_dir = "../data/images/marked"

    # The position in this list is the category id used in the annotations.
    categories = ["Baumbestand", "Festweg", "Pflaster", "Wiese", "Wasser", "Gullydeckel"]
    training_object = {
        "train_images": [],
        "test_images": [],
        "categories": categories,
    }

    logger.info("Converting data from WestendDOP2.tif")
    westend_options = {
        "filename": "../data/images/westend/WestendDOP2.tif",
        "clean_image_dir": "../data/images/westend/cropped/",
        "training_object": training_object,
    }
    training_object = westend(**westend_options)

    logger.info("Converting data from LabelStudio")
    label_studio_options = {
        "json_file": "../data/json/project-4-at-2022-07-26-15-52-273e43ac.json",
        "image_dir": "../data/images/label_studio/",
        "training_object": training_object,
    }
    training_object = label_studio(**label_studio_options)

    logger.info("Saving results to disk")
    output_path = os.path.join("../data", "json", "_train_data.json")
    with open(output_path, "w") as json_file:
        json.dump(training_object, json_file, indent=2)
    logger.info("Done.")
|