Training Mask3D on Custom Data - Creating the s3dis Dataset, Part 2 - [Python]


Introduction

Last time, we used a RealSense to create point cloud data and annotated it.

This time, we will convert that annotation data into the s3dis dataset format.

Prerequisites

The prerequisites are as follows.

  • Windows 11
  • RealSense
  • Python 3.10

Installing the Required Libraries

The packages needed for preprocessing and custom dataset creation are installed as follows.

pip install numpy fire loguru natsort pyyaml tqdm joblib 

Data Preparation

mkdir s3dis_workspace
cd s3dis_workspace
touch ply2txt.py
touch base_preprocessing.py
touch s3dis_preprocessing_custom_windows.py
mkdir table Annotations s3dis
mkdir -p s3dis/CustomDataset/Area_1/table_1
mkdir -p s3dis/CustomDataset/Area_2/table_1
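
After running these commands, the workspace should look like this (table and Annotations will be filled in during the following steps):

s3dis_workspace/
├── ply2txt.py
├── base_preprocessing.py
├── s3dis_preprocessing_custom_windows.py
├── table/
├── Annotations/
└── s3dis/
    └── CustomDataset/
        ├── Area_1/
        │   └── table_1/
        └── Area_2/
            └── table_1/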

Copy the following files into the s3dis_workspace/table folder you just created:

  • keyboard_1.ply
  • rip_1.ply
  • other_1.ply
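
The conversion script below assumes that the ASCII PLY header exported in the previous article is exactly 15 lines long (that is what the data[15:] slice relies on). If your export differs, you can check the header length with a small sketch like the following; the helper name and file path are just examples:

def count_ply_header_lines(path):
    # Count lines up to and including "end_header" so the data[15:] slice
    # in ply2txt.py can be adjusted if the header length differs.
    with open(path, "r") as f:
        for i, line in enumerate(f, start=1):
            if line.strip() == "end_header":
                return i
    raise ValueError(f"{path} has no end_header line")

if __name__ == "__main__":
    print(count_ply_header_lines("table/keyboard_1.ply"))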

ply2txt.py

import glob
import os

if __name__ == "__main__":
    # Read every PLY file in the table folder
    folder_path = glob.glob("table/*.ply")
    data_list = []
    for file_path in folder_path:
        basename = os.path.basename(file_path)
        print(basename)
        with open(file_path, 'r') as f:
            data = f.readlines()

        # Drop the 15-line ASCII PLY header
        data = data[15:]
        edit_data = []
        # Process each vertex line
        for d in data:
            d = d.split()
            # Drop the trailing normal columns, keeping x y z r g b
            d = d[:-3]
            # Write the RGB values with six decimal places
            d[3] = d[3] + ".000000"
            d[4] = d[4] + ".000000"
            d[5] = d[5] + ".000000"
            # Add a trailing newline
            d.append("\n")

            # Join the fields into one space-separated line
            edit_data.append(' '.join(d))
        data_list += edit_data
        # Write the per-instance annotation file
        with open('Annotations/{}.txt'.format(basename[:-4]), 'w') as f:
            f.writelines(edit_data)

    # Write the concatenated file for the whole scene
    with open('concat.txt', 'w') as f:
        f.writelines(data_list)

Running the script creates concat.txt in the s3dis_workspace folder and keyboard_1.txt, rip_1.txt, and other_1.txt in the Annotations folder.

Copy the Annotations folder you just created into s3dis/CustomDataset/Area_1/table_1, and copy concat.txt into the same folder, renaming it to table_1.txt.

Do the same for s3dis/CustomDataset/Area_2/table_1.
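
If you prefer to script these copy steps, a minimal sketch like the following does the same thing for both areas (paths assume the folder layout created above and that it is run from s3dis_workspace):

import shutil

# Copy the converted annotation files into the S3DIS-style scene folders.
for area in ("Area_1", "Area_2"):
    scene_dir = f"s3dis/CustomDataset/{area}/table_1"
    shutil.copytree("Annotations", f"{scene_dir}/Annotations", dirs_exist_ok=True)
    shutil.copy("concat.txt", f"{scene_dir}/table_1.txt")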

Reference Programs

The two scripts below are based on Mask3D's preprocessing code: base_preprocessing.py provides the common scaffolding (parallel file processing and YAML database output), and s3dis_preprocessing_custom_windows.py subclasses it to convert the custom classes using Windows-style paths.

base_preprocessing.py

import os
import sys
import re
import yaml
import json
import multiprocessing
from pathlib import Path
from hashlib import md5

import numpy as np
from fire import Fire
from tqdm import tqdm
from joblib import Parallel, delayed
from loguru import logger


class BasePreprocessing:
    def __init__(
        self,
        data_dir: str = "./data/raw/",
        save_dir: str = "./data/processed/",
        modes: tuple = ("train", "validation", "test"),
        n_jobs: int = -1,
    ):
        # self.data_dir = Path(data_dir)
        # self.save_dir = Path(save_dir)
        self.data_dir = data_dir
        self.save_dir = save_dir
        self.n_jobs = n_jobs
        self.modes = modes

        if not Path(self.data_dir).exists():
            logger.error("data folder doesn't exist")
            raise FileNotFoundError
        if not Path(self.save_dir).exists():
            Path(self.save_dir).mkdir(parents=True, exist_ok=True)

        self.files = {}
        for data_type in self.modes:
            self.files.update({data_type: []})

    @logger.catch
    def preprocess(self):
        self.n_jobs = (
            multiprocessing.cpu_count() if self.n_jobs == -1 else self.n_jobs
        )
        for mode in self.modes:
            database = []
            logger.info(f"Tasks for {mode}: {len(self.files[mode])}")
            parallel_results = Parallel(n_jobs=self.n_jobs, verbose=10)(
                delayed(self.process_file)(file, mode)
                for file in self.files[mode]
            )
            for filebase in parallel_results:
                database.append(filebase)
            self.save_database(database, mode)
        self.fix_bugs_in_labels()
        self.joint_database()
        self.compute_color_mean_std(
            train_database_path=(self.save_dir +"/"+ "train_database.yaml")
        )

    def preprocess_sequential(self):
        for mode in self.modes:
            database = []
            for filepath in tqdm(self.files[mode], unit="file"):
                filebase = self.process_file(filepath, mode)
                database.append(filebase)
            self.save_database(database, mode)
        self.fix_bugs_in_labels()
        self.joint_database()
        self.compute_color_mean_std(
            train_database_path=(self.save_dir +"/"+ "train_database.yaml")
        )

    def process_file(self, filepath, mode):
        """process_file.

        Args:
            filepath: path to the main file
            mode: typically train, test or validation

        Returns:
            filebase: info about file
        """
        raise NotImplementedError

    def make_instance_database_sequential(
        self,
        train_database_path: str = "./data/processed/train_database.yaml",
        mode="instance",
    ):
        train_database = self._load_yaml(train_database_path)
        instance_database = []
        for sample in tqdm(train_database):
            instance_database.append(self.extract_instance_from_file(sample))
        self.save_database(instance_database, mode=mode)

    @logger.catch
    def make_instance_database(
        self,
        train_database_path: str = "./data/processed/train_database.yaml",
        mode="instance",
    ):
        self.n_jobs = (
            multiprocessing.cpu_count() if self.n_jobs == -1 else self.n_jobs
        )
        train_database = self._load_yaml(train_database_path)
        instance_database = []
        logger.info(f"Files in database: {len(train_database)}")
        parallel_results = Parallel(n_jobs=self.n_jobs, verbose=10)(
            delayed(self.extract_instance_from_file)(sample)
            for sample in train_database
        )
        for filebase in parallel_results:
            instance_database.append(filebase)
        self.save_database(instance_database, mode=mode)

    def extract_instance_from_file(self, sample_from_database):
        points = np.load(sample_from_database["filepath"])
        labels = points[:, -2:]
        file_instances = []
        for instance_id in np.unique(labels[:, 1]):
            occupied_indices = np.isin(labels[:, 1], instance_id)
            instance_points = points[occupied_indices].copy()
            instance_classes = (
                np.unique(instance_points[:, 9]).astype(int).tolist()
            )

            hash_string = str(sample_from_database["filepath"]) + str(
                instance_id
            )
            hash_string = md5(hash_string.encode("utf-8")).hexdigest()
            instance_filepath = (
                self.save_dir / "instances" / f"{hash_string}.npy"
            )
            instance = {
                "classes": instance_classes,
                "instance_filepath": str(instance_filepath),
                "instance_size": len(instance_points),
                "original_file": str(sample_from_database["filepath"]),
            }
            if not instance_filepath.parent.exists():
                instance_filepath.parent.mkdir(parents=True, exist_ok=True)
            np.save(instance_filepath, instance_points.astype(np.float32))
            file_instances.append(instance)
        return file_instances

    def fix_bugs_in_labels(self):
        pass

    def compute_color_mean_std(
        self,
        train_database_path: str = "./data/processed/train_database.yaml",
    ):
        pass

    def save_database(self, database, mode):
        for element in database:
            self._dict_to_yaml(element)
        self._save_yaml(self.save_dir +"/"+ (mode + "_database.yaml"), database)

    def joint_database(self, train_modes=["train", "validation"]):
        joint_db = []
        for mode in train_modes:
            joint_db.extend(
                self._load_yaml(self.save_dir +"/"+ (mode + "_database.yaml"))
            )
        self._save_yaml(
            self.save_dir / "train_validation_database.yaml", joint_db
        )

    @classmethod
    def _read_json(cls, path):
        with open(path) as f:
            file = json.load(f)
        return file

    @classmethod
    def _save_yaml(cls, path, file):
        with open(path, "w") as f:
            yaml.safe_dump(
                file, f, default_style=None, default_flow_style=False
            )

    @classmethod
    def _dict_to_yaml(cls, dictionary):
        if not isinstance(dictionary, dict):
            return
        for k, v in dictionary.items():
            if isinstance(v, dict):
                cls._dict_to_yaml(v)
            if isinstance(v, np.ndarray):
                dictionary[k] = v.tolist()
            if isinstance(v, Path):
                dictionary[k] = str(v)

    @classmethod
    def _load_yaml(cls, filepath):
        with open(filepath) as f:
            file = yaml.safe_load(f)
        return file


if __name__ == "__main__":
    Fire(BasePreprocessing)

s3dis_preprocessing_custom_windows.py

import os
import re

import numpy as np
from fire import Fire
from loguru import logger
from natsort import natsorted
from pathlib import Path

from base_preprocessing import BasePreprocessing


class S3DISPreprocessing(BasePreprocessing):
    def __init__(
        self,
        data_dir: str = "./s3dis/CustomDataSet",
        save_dir: str = "./data/processed/s3dis",
        modes: tuple = (
            "Area_1",
            "Area_2",
        ),
        n_jobs: int = -1,
    ):
        super().__init__(data_dir, save_dir, modes, n_jobs)

        self.class_map = {
            "keyboard": 0,
            "rip": 1,
            "other": 2,
        }

        self.color_map = [
            [0, 255, 0],  # keyboard
            [0, 0, 255],  # rip
            [0, 255, 255],  # other
        ]

        self.create_label_database()

        for mode in self.modes:
            filepaths = []
            for scene_path in [
                f.path for f in os.scandir(self.data_dir +"/"+ mode) if f.is_dir()
            ]:
                filepaths.append(scene_path)
            self.files[mode] = natsorted(filepaths)

    def create_label_database(self):
        label_database = dict()
        for class_name, class_id in self.class_map.items():
            label_database[class_id] = {
                "color": self.color_map[class_id],
                "name": class_name,
                "validation": True,
            }

        self._save_yaml(self.save_dir + "/label_database.yaml", label_database)
        return label_database

    def _buf_count_newlines_gen(self, fname):
        def _make_gen(reader):
            while True:
                b = reader(2**16)
                if not b:
                    break
                yield b

        with open(fname, "rb") as f:
            count = sum(buf.count(b"\n") for buf in _make_gen(f.raw.read))
        return count

    def process_file(self, filepath, mode):
        """process_file.

        Note that the segmentation labels are taken from the txt files
        in each Annotations folder.

        Args:
            filepath: path to the scene folder (e.g. Area_1/table_1)
            mode: area name (Area_1 or Area_2)

        Returns:
            filebase: info about file
        """
        filebase = {
            "filepath": filepath,
            "scene": filepath.split("\\")[-1],
            "area": mode,
            "raw_filepath": str(filepath).replace('\\', "/"),
            "file_len": -1,
        }

        # scene_name = filepath.split("/")[-1]
        scene_name = filepath.split("\\")[-1]
        instance_counter = 0
        scene_points = []
        for instance in [
            f
            for f in os.scandir(
                self.data_dir +"/"+ mode +"/"+ scene_name +"/"+ "Annotations"
            )
            if f.name.endswith(".txt")
        ]:
            instance_class = self.class_map[instance.name.split("_")[0]]
            instance_points = np.loadtxt(instance.path)

            instance_normals = np.ones((instance_points.shape[0], 3))
            instance_class = np.array(instance_class).repeat(
                instance_points.shape[0]
            )[..., None]
            instance_id = np.array(instance_counter).repeat(
                instance_points.shape[0]
            )[..., None]

            instance_points = np.hstack(
                (
                    instance_points,
                    instance_normals,
                    instance_class,
                    instance_id,
                )
            )

            scene_points.append(instance_points)
            instance_counter += 1

        points = np.vstack(scene_points)
        pcd_size = self._buf_count_newlines_gen(f"{filepath}/{scene_name}.txt")
        if points.shape[0] != pcd_size:
            print(f"FILE SIZE DOES NOT MATCH FOR {filepath}/{scene_name}.txt")
            print(f"({points.shape[0]} vs. {pcd_size})")

        filebase["raw_segmentation_filepath"] = ""

        # add segment id as additional feature (DUMMY)
        points = np.hstack((points, np.ones(points.shape[0])[..., None]))
        # reorder columns to xyz, rgb, normals, segment id, class id, instance id
        points[:, [9, 10, -1]] = points[:, [-1, 9, 10]]

        # encode the instance GT as (class_id + 1) * 1000 + instance_id + 1
        gt_data = (points[:, -2] + 1) * 1000 + points[:, -1] + 1

        file_len = len(points)
        filebase["file_len"] = file_len

        processed_filepath = self.save_dir +"/"+ mode +"/"+ f"{scene_name}.npy"
        if not Path(processed_filepath).parent.exists():
            Path(processed_filepath).parent.mkdir(parents=True, exist_ok=True)
        np.save(processed_filepath, points.astype(np.float32))
        filebase["filepath"] = str(processed_filepath)

        processed_gt_filepath = (
            self.save_dir +"/"+ "instance_gt" +"/"+ mode +"/"+ f"{scene_name}.txt"
        )
        if not Path(processed_gt_filepath).parent.exists():
            Path(processed_gt_filepath).parent.mkdir(parents=True, exist_ok=True)
        np.savetxt(processed_gt_filepath, gt_data.astype(np.int32), fmt="%d")
        filebase["instance_gt_filepath"] = str(processed_gt_filepath)

        filebase["color_mean"] = [
            float((points[:, 3] / 255).mean()),
            float((points[:, 4] / 255).mean()),
            float((points[:, 5] / 255).mean()),
        ]
        filebase["color_std"] = [
            float(((points[:, 3] / 255) ** 2).mean()),
            float(((points[:, 4] / 255) ** 2).mean()),
            float(((points[:, 5] / 255) ** 2).mean()),
        ]
        return filebase

    def compute_color_mean_std(self, train_database_path: str = ""):
        area_database_paths = [
            f
            for f in os.scandir(self.save_dir)
            if f.name.startswith("Area_") and f.name.endswith(".yaml") and "color_mean_std" not in f.name
        ]
        print(area_database_paths)
        # for database_path in area_database_paths:
        #     database = self._load_yaml(database_path.path)
        #     color_mean, color_std = [], []
        #     for sample in database:
        #         color_std.append(sample["color_std"])
        #         color_mean.append(sample["color_mean"])

        #     color_mean = np.array(color_mean).mean(axis=0)
        #     color_std = np.sqrt(
        #         np.array(color_std).mean(axis=0) - color_mean**2
        #     )
        #     feats_mean_std = {
        #         "mean": [float(each) for each in color_mean],
        #         "std": [float(each) for each in color_std],
        #     }
        #     self._save_yaml(
        #         self.save_dir +"/"+ f"{database_path.name}_color_mean_std.yaml",
        #         feats_mean_std,
        #     )

        for database_path in area_database_paths:
            all_mean, all_std = [], []
            for let_out_path in area_database_paths:
                if database_path == let_out_path:
                    continue

                database = self._load_yaml(let_out_path.path)
                for sample in database:
                    all_std.append(sample["color_std"])
                    all_mean.append(sample["color_mean"])

            all_color_mean = np.array(all_mean).mean(axis=0)
            all_color_std = np.sqrt(
                np.array(all_std).mean(axis=0) - all_color_mean**2
            )
            feats_mean_std = {
                "mean": [float(each) for each in all_color_mean],
                "std": [float(each) for each in all_color_std],
            }
            file_path = database_path.name.replace("_database.yaml", "")
            
            self._save_yaml(
                self.save_dir +"/"+ f"{file_path}_color_mean_std.yaml",
                feats_mean_std,
            )

    @logger.catch
    def fix_bugs_in_labels(self):
        pass

    def joint_database(
        self,
        train_modes=(
            "Area_1",
            "Area_2",
            # "Area_3",
            # "Area_4",
            # "Area_5",
            # "Area_6",
        ),
    ):
        for mode in train_modes:
            joint_db = []
            for let_out in train_modes:
                if mode == let_out:
                    continue
                joint_db.extend(
                    self._load_yaml(
                        self.save_dir +"/"+ (let_out + "_database.yaml")
                    )
                )
                print(joint_db)
            self._save_yaml(
                self.save_dir +"/"+ f"train_{mode}_database.yaml", joint_db
            )

    def _parse_scene_subscene(self, name):
        scene_match = re.match(r"scene(\d{4})_(\d{2})", name)
        return int(scene_match.group(1)), int(scene_match.group(2))


if __name__ == "__main__":
    Fire(S3DISPreprocessing)

Run the script as follows.

python3 s3dis_preprocessing_custom_windows.py

The custom data in s3dis format is now saved under data/processed/s3dis.
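
As a quick sanity check, a sketch like the one below (file names assume Area_1/table_1 from this article) loads one processed scene and decodes the instance GT encoding gt = (class_id + 1) * 1000 + instance_id + 1 used above:

import numpy as np

# Processed scene: columns are xyz, rgb, normals, segment id, class id, instance id
points = np.load("data/processed/s3dis/Area_1/table_1.npy")
gt = np.loadtxt("data/processed/s3dis/instance_gt/Area_1/table_1.txt", dtype=int)

print(points.shape)               # (N, 12)
print(np.unique(gt // 1000 - 1))  # semantic class ids: 0=keyboard, 1=rip, 2=other
print(np.unique(gt % 1000 - 1))   # instance ids within the scene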

Data preparation is now complete!

Conclusion

That's all for this time.

Once you can create an s3dis-format dataset, you can handle not only Mask3D but other dataset formats as well.

Next time, I will explain how to actually train Mask3D.
