# Source code for datadings.sets.InriaBuildings_write

"""Create InriaBuildings data set files.

This tool will look for the unpacked "AerialImageDataset"
directory in the input directory.

See also:
    https://project.inria.fr/aerialimagelabeling/contest/

Note:
    Registration is required to download this dataset. Please visit the
    website and follow the instructions to download and decompress it.

Important:
    Samples are NOT SHUFFLED! It is recommended to use the datadings-shuffle
    command to create a shuffled copy.
"""
import os.path as pt
import io

import numpy as np
from simplejpeg import encode_jpeg
from PIL import Image

from ..writer import FileWriter
from . import ImageData
from . import ImageSegmentationData
from .InriaBuildings import CROP_SIZE
from ..tools import split_array
from ..tools import tiff_to_nd_array
from ..tools import document_keys


# Extend the module docstring with whatever document_keys() returns for
# ImageSegmentationData (presumably a description of the sample keys --
# confirm against ..tools.document_keys).
__doc__ += document_keys(ImageSegmentationData)


def array2imagedata(array):
    """Encode an array as compressed image bytes.

    Boolean arrays are saved as an optimized PNG created with PIL mode
    ``'1'``. Arrays of any other dtype have their first axis moved to
    the last position, are made contiguous and are JPEG-encoded with
    quality 95.

    Parameters:
        array: numpy array to encode. For the JPEG path it must be
            3-dimensional (presumably channel-first image data --
            confirm against ``tiff_to_nd_array``).

    Returns:
        The encoded image as bytes.
    """
    # np.bool_ is the numpy boolean scalar type on every numpy version.
    # The bare ``np.bool`` alias was removed in numpy 1.24, where it
    # raises AttributeError.
    if array.dtype == np.bool_:
        bio = io.BytesIO()
        img = Image.fromarray(array, '1')
        img.save(bio, 'PNG', optimize=True)
        return bio.getvalue()
    # Move axis 0 to the end before handing the data to the JPEG encoder.
    array = np.ascontiguousarray(array.transpose((1, 2, 0)))
    return encode_jpeg(array, quality=95)
def write(writer, img, labels, filename=""):
    """Encode one sample and hand it to ``writer``.

    Parameters:
        writer: Object with a ``write`` method that receives the sample.
        img: Image array, encoded with ``array2imagedata``.
        labels: Label array encoded the same way, or ``None`` to write
            an ``ImageData`` sample without labels.
        filename: Name stored with the sample.
    """
    encoded_image = array2imagedata(img)
    if labels is None:
        sample = ImageData(filename, encoded_image)
    else:
        sample = ImageSegmentationData(
            filename,
            encoded_image,
            array2imagedata(labels),
        )
    writer.write(sample)
def images_and_labels_iter(img_dir, label_dir, locations, ids):
    """Yield ``(filename, image, labels)`` for every location/id pair.

    File names are built as ``"<location><id>.tif"`` and looked up in
    ``img_dir`` (and in ``label_dir`` when it is given).

    Parameters:
        img_dir: Directory containing the image TIFF files.
        label_dir: Directory containing the label TIFF files, or
            ``None`` to yield ``None`` in place of labels.
        locations: Iterable of location name prefixes.
        ids: Iterable of image ids; iterated once per location.

    Yields:
        Tuples of file name, image array loaded as ``uint8`` and either
        ``None`` or a boolean array that is true where the first plane
        of the label file equals 255.
    """
    for location in locations:
        for image_id in ids:
            filename = "%s%s.tif" % (location, image_id)
            image = tiff_to_nd_array(pt.join(img_dir, filename), type=np.uint8)
            if label_dir is None:
                mask = None
            else:
                raw_labels = tiff_to_nd_array(
                    pt.join(label_dir, filename), type=np.uint8
                )
                # Only the first plane is used; 255 marks positive pixels.
                mask = raw_labels[0] == 255
            yield filename, image, mask
def write_sets(indir, outdir, args, crop_size=(CROP_SIZE, CROP_SIZE)):
    """Write the train, val and test files of the InriaBuildings data set.

    Images are read from the ``AerialImageDataset`` directory below
    ``indir``; ``train.msgpack``, ``val.msgpack`` and ``test.msgpack``
    are written to ``outdir``. A ``FileExistsError`` raised while a
    split is being written is swallowed and the next split is attempted.

    Parameters:
        indir: Directory containing the ``AerialImageDataset`` directory.
        outdir: Directory the ``.msgpack`` files are written to.
        args: Parsed arguments; ``args.no_confirm`` is passed to
            ``FileWriter`` as ``overwrite``.
        crop_size: ``(crop_h, crop_w)`` patch size used for the val and
            test splits.
    """
    root = pt.join(indir, 'AerialImageDataset')
    train_img_dir = pt.join(root, 'train', 'images')
    train_gt_dir = pt.join(root, 'train', 'gt')
    test_img_dir = pt.join(root, 'test', 'images')

    train_locations = ["vienna", "kitsap", "tyrol-w", "chicago", "austin"]
    test_locations = ["bellingham", "bloomington", "innsbruck", "sfo", "tyrol-e"]

    # Training split: ids 6..36 of every training location, written as
    # whole images.
    train_ids = range(6, 37)
    train_path = pt.join(outdir, 'train.msgpack')
    try:
        with FileWriter(
            train_path,
            total=len(train_locations) * len(train_ids),
            overwrite=args.no_confirm,
        ) as writer:
            samples = images_and_labels_iter(
                train_img_dir, train_gt_dir, train_locations, train_ids
            )
            for name, image, mask in samples:
                write(writer, image, mask, name)
    except FileExistsError:
        pass

    # Validation split: the first 5 images of every training location,
    # as in the paper https://hal.inria.fr/hal-01468452/document,
    # written as cropped patches.
    val_ids = range(1, 6)
    val_path = pt.join(outdir, 'val.msgpack')
    crop_h, crop_w = crop_size
    # NOTE(review): 5000 is presumably the side length of the source
    # images in pixels -- confirm against the data set description.
    crops_per_image = (5000 // crop_h) * (5000 // crop_w)
    try:
        with FileWriter(
            val_path,
            total=len(train_locations) * len(val_ids) * crops_per_image,
            overwrite=args.no_confirm,
        ) as writer:
            samples = images_and_labels_iter(
                train_img_dir, train_gt_dir, train_locations, val_ids
            )
            for name, image, mask in samples:
                # Add a leading axis so the mask is split like the image.
                mask = np.expand_dims(mask, axis=0)
                patch_pairs = zip(
                    split_array(image, *crop_size),
                    split_array(mask, *crop_size),
                )
                for idx, (sub_img, sub_mask) in enumerate(patch_pairs):
                    _, w, h = sub_img.shape
                    # Only full-size patches are written; smaller ones
                    # are dropped but still consume an index.
                    if w >= crop_w and h >= crop_h:
                        write(writer, sub_img, sub_mask[0], "%s_%s" % (name, idx))
    except FileExistsError:
        pass

    # Test split: ids 1..36 of every test location, written as cropped
    # patches without labels.
    test_ids = range(1, 37)
    test_path = pt.join(outdir, 'test.msgpack')
    try:
        with FileWriter(
            test_path,
            total=len(test_locations) * len(test_ids) * crops_per_image,
            overwrite=args.no_confirm,
        ) as writer:
            samples = images_and_labels_iter(
                test_img_dir, None, test_locations, test_ids
            )
            for name, image, _unused_labels in samples:
                for idx, sub_img in enumerate(split_array(image, *crop_size)):
                    _, w, h = sub_img.shape
                    if w >= crop_w and h >= crop_h:
                        write(writer, sub_img, None, "%s_%s" % (name, idx))
    except FileExistsError:
        pass
def main():
    """Parse the command line arguments and write all data set files.

    The output directory falls back to the input directory when no
    output directory is given.
    """
    from ..tools.argparse import make_parser

    args = make_parser(
        __doc__, skip_verification=False, shuffle=False
    ).parse_args()
    write_sets(args.indir, args.outdir or args.indir, args)
# Script entry point.
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # Exit without a traceback when the user interrupts the run.
        pass
    finally:
        # Always finish with a newline on stdout.
        print()