物体の三次元姿勢推定 CenterSnap -pose_data_custom.pyの説明- 【Python】

AI
スポンサーリンク
スポンサーリンク

はじめに

前回は、create_nocs_results.py のプログラムについて説明しました。

今回はもう一つのプログラムである pose_data_custom.py について説明していきます。

前提条件

前提条件は以下の通りです。

プログラム再掲

pose_data_custom.py

import os
import sys
import glob
import cv2
import numpy as np
import _pickle as cPickle
from tqdm import tqdm
sys.path.append('../lib')
from lib.align import align_nocs_to_depth
from lib.utils import load_depth
import h5py

def _write_img_list(list_path, img_list):
    """ Write one relative data path per line to ``list_path``. """
    with open(list_path, 'w') as f:
        f.writelines("%s\n" % img_path for img_path in img_list)


def create_img_list(data_dir):
    """ Create train/val/test data list for CAMERA and Real.

    For every colour image found, one line is written to
    ``<data_dir>/<CAMERA|Real>/<subset>_list_all.txt``.  Each line holds the
    folder-level path (``<subset>/<folder>``), not the image file name, so a
    folder containing N images produces N identical lines; callers
    reconstruct the per-image file names from the line index.

    Args:
        data_dir: dataset root containing the ``CAMERA`` and ``Real`` folders.

    Raises:
        FileNotFoundError: if a CAMERA or Real subset directory is missing.
    """
    # CAMERA dataset: only the single folder "000000" is scanned.
    for subset in ['train', 'val']:
        img_dir = os.path.join(data_dir, 'CAMERA', subset)
        # Fail fast on a missing subset instead of silently writing an empty list.
        if not os.path.isdir(img_dir):
            raise FileNotFoundError(img_dir)
        num_imgs = len(glob.glob(img_dir + "/000000/color/*.jpg"))
        folder_path = os.path.join(subset, '{:06d}'.format(0))
        _write_img_list(os.path.join(data_dir, 'CAMERA', subset + '_list_all.txt'),
                        [folder_path] * num_imgs)
    # Real dataset: every sub-folder of the subset is scanned in sorted order.
    for subset in ['train', 'test']:
        img_dir = os.path.join(data_dir, 'Real', subset)
        folder_list = [name for name in sorted(os.listdir(img_dir))
                       if os.path.isdir(os.path.join(img_dir, name))]
        img_list = []
        for folder in folder_list:
            num_imgs = len(glob.glob(os.path.join(img_dir, folder, 'color/*.jpg')))
            img_list.extend([os.path.join(subset, folder)] * num_imgs)
        _write_img_list(os.path.join(data_dir, 'Real', subset + '_list_all.txt'), img_list)
    print('Write all data paths to file done!')


def process_data(img_path, depth, path_dict):
    """ Load instance masks, NOCS coordinate maps and bounding boxes for one image.

    Args:
        img_path: image identifier; only used in the assertion message.
        depth: depth array compared element-wise with the instance mask
            (``depth > 0``); presumably the same height/width as the mask
            image -- confirm against ``load_depth``.
        path_dict: dict providing the file paths ``"mask"`` (image whose
            channel index 2 stores the instance id per pixel, 255 being
            background), ``"nocs"`` (HDF5 file with a ``"nocs"`` dataset)
            and ``"meta"`` (text file, one instance per line).

    Returns:
        Tuple ``(masks, coords, class_ids, instance_ids, model_list, bboxes)``
        where ``masks`` is (h, w, n) uint8, ``coords`` is (h, w, n, 3) float32
        clipped to [0, 1] and ``bboxes`` is (n, 4) int32 with rows
        ``[y1, x1, y2, x2]``.  Six ``None`` values are returned when any
        instance has a box wider than 700 or taller than 500 pixels, or when
        no instance survives the filters.

    Raises:
        FileNotFoundError: if the mask image cannot be read.
    """
    mask_path = path_dict["mask"]
    mask_img = cv2.imread(mask_path)
    # cv2.imread signals failure by returning None instead of raising.
    if mask_img is None:
        raise FileNotFoundError('cannot read mask image: {}'.format(mask_path))
    mask = np.array(mask_img[:, :, 2], dtype=np.int32)
    # Every distinct pixel value except 255 (background) is a candidate instance id.
    all_inst_ids = [int(v) for v in np.unique(mask) if v != 255]
    num_all_inst = len(all_inst_ids)
    h, w = mask.shape

    coord_path = path_dict["nocs"]
    with h5py.File(coord_path, 'r') as f:
        nocs_h5py = np.array(f["nocs"])

    # Reverse the channel order of the first three channels, then flip the
    # third one (z).  Values are used as stored, without dividing by 255.
    coord_map = nocs_h5py[:, :, :3]
    coord_map = coord_map[:, :, (2, 1, 0)]
    coord_map = np.array(coord_map, dtype=np.float32)
    coord_map[:, :, 2] = 1 - coord_map[:, :, 2]

    class_ids = []
    instance_ids = []
    model_list = []
    masks = np.zeros([h, w, num_all_inst], dtype=np.uint8)
    coords = np.zeros((h, w, num_all_inst, 3), dtype=np.float32)
    bboxes = np.zeros((num_all_inst, 4), dtype=np.int32)

    meta_path = path_dict["meta"]
    i = 0
    with open(meta_path, 'r') as f:
        for line in f:
            # Tolerate blank lines (e.g. a trailing newline) in the meta file.
            if not line.strip():
                continue
            line_info = line.strip().split(' ')
            inst_id = int(line_info[0])
            cls_id = int(line_info[1])
            # background objects and non-existing objects
            if cls_id == 0 or (inst_id not in all_inst_ids):
                continue
            if len(line_info) == 3:
                model_id = line_info[2]    # Real scanned objs
            else:
                model_id = line_info[3]    # CAMERA objs
            # remove one mug instance in CAMERA train due to improper model
            if model_id == 'b9be7cfe653740eb7633a2dd89cec754':
                continue
            # process foreground objects
            inst_mask = np.equal(mask, inst_id)
            # bounding box
            horizontal_indicies = np.where(np.any(inst_mask, axis=0))[0]
            vertical_indicies = np.where(np.any(inst_mask, axis=1))[0]
            # inst_id was found in the mask above, so this is an invariant check.
            assert horizontal_indicies.shape[0], img_path
            x1, x2 = horizontal_indicies[[0, -1]]
            y1, y2 = vertical_indicies[[0, -1]]
            # x2 and y2 should not be part of the box. Increment by 1.
            x2 += 1
            y2 += 1
            # object occupies full image, rendering error, happens in CAMERA dataset;
            # the whole image is rejected, not just this instance.
            if (x2 - x1) > 700 or (y2 - y1) > 500:
                print(x2-x1, y2-y1)
                return None, None, None, None, None, None
            # not enough valid depth observation
            final_mask = np.logical_and(inst_mask, depth > 0)
            if np.sum(final_mask) < 64:
                continue
            class_ids.append(cls_id)
            instance_ids.append(inst_id)
            model_list.append(model_id)
            masks[:, :, i] = inst_mask
            coords[:, :, i, :] = np.multiply(coord_map, np.expand_dims(inst_mask, axis=-1))
            bboxes[i] = np.array([y1, x1, y2, x2])
            i += 1
    # no valid foreground objects
    if i == 0:
        return None, None, None, None, None, None

    # Drop the unused slots that were pre-allocated for filtered-out instances.
    masks = masks[:, :, :i]
    coords = np.clip(coords[:, :, :i, :], 0, 1)
    bboxes = bboxes[:i, :]
    return masks, coords, class_ids, instance_ids, model_list, bboxes


def annotate_camera_train(data_dir):
    """ Generate gt labels for CAMERA train data.

    Reads ``CAMERA/train_list_all.txt``; the k-th line (0-based) is combined
    with k to build the per-image file names.  For every image whose files
    exist and that yields valid instances, the pose labels are pickled to
    ``<folder>/pkl/<k:06>_label.pkl`` and the line is kept in
    ``CAMERA/train_list.txt``.

    Args:
        data_dir: dataset root containing ``CAMERA`` and ``camera_full_depths``.
    """
    with open(os.path.join(data_dir, 'CAMERA', 'train_list_all.txt')) as f:
        camera_train = f.read().splitlines()
    # Camera intrinsics handed to align_nocs_to_depth (custom values; the
    # original NOCS CAMERA values were fx=fy=577.5, cx=319.5, cy=239.5).
    intrinsics = np.array([[572.4, 0, 325.3], [0, 573.6, 242.0], [0, 0, 1.0]])

    valid_img_list = []
    for index, img_path in enumerate(tqdm(camera_train)):
        img_full_path = os.path.join(data_dir, 'CAMERA', img_path)
        path_dict = {
            "nocs": img_full_path + '/coord/{}.hdf5'.format(index),
            "meta": img_full_path + '/meta/{:04}_meta.txt'.format(index),
            "mask": img_full_path + '/mask_independent/{:06}.png'.format(index),
            "color": img_full_path + '/color/{:06}.jpg'.format(index),
        }
        depth_full_path = os.path.join(data_dir, 'camera_full_depths', "{:06}.png".format(index))
        required_paths = [
            path_dict["color"],
            path_dict["nocs"],
            img_full_path + '/depth/{:06}.png'.format(index),
            img_full_path + '/mask/{:06}_mask.png'.format(index),
            path_dict["meta"],
            # The two files below are the ones actually read further down;
            # checking them makes a missing file a skip instead of a crash.
            path_dict["mask"],
            depth_full_path,
        ]
        if not all(os.path.exists(p) for p in required_paths):
            print("annotate_camera_train_path")
            continue
        depth = load_depth(depth_full_path)
        masks, coords, class_ids, instance_ids, model_list, bboxes = process_data(img_full_path, depth, path_dict)
        if instance_ids is None:
            print("annotate_camera_train_path instance ids")
            continue
        # Umeyama alignment of GT NOCS map with depth image
        scales, rotations, translations, error_messages, _ = \
            align_nocs_to_depth(masks, coords, depth, intrinsics, instance_ids, img_path)
        if error_messages:
            print("annotate_camera_train_path error msg", error_messages)
            continue
        # NOTE: the mug-category re-labelling of the original pose_data.py is
        # intentionally disabled here (mug_meta.pkl is not loaded).
        # write results
        gts = {}
        gts['class_ids'] = class_ids    # int list, 1 to 6
        gts['bboxes'] = bboxes  # np.array, [[y1, x1, y2, x2], ...]
        gts['scales'] = scales.astype(np.float32)  # np.array, scale factor from NOCS model to depth observation
        gts['rotations'] = rotations.astype(np.float32)    # np.array, R
        gts['translations'] = translations.astype(np.float32)  # np.array, T
        gts['instance_ids'] = instance_ids  # int list, start from 1
        gts['model_list'] = model_list  # str list, model id/name
        os.makedirs(img_full_path + "/pkl", exist_ok=True)
        with open(img_full_path + "/pkl/" + '{:06}_label.pkl'.format(index), 'wb') as f:
            cPickle.dump(gts, f)
        valid_img_list.append(img_path)
    # write valid img list to file
    with open(os.path.join(data_dir, 'CAMERA/train_list.txt'), 'w') as f:
        for img_path in valid_img_list:
            f.write("%s\n" % img_path)


def annotate_real_train(data_dir):
    """ Generate gt labels for Real train data through PnP.

    Reads ``Real/train_list_all.txt``; the k-th line (0-based) is combined
    with k to build the per-image file names.  For each valid instance the
    pose is solved with ``cv2.solvePnP`` from the NOCS coordinates scaled by
    the model's bounding-box diagonal.  Labels are pickled next to the image
    folder and the surviving lines are written to ``Real/train_list.txt``.

    Args:
        data_dir: dataset root containing ``Real``, ``models_obj`` and
            ``camera_full_depths``.
    """
    with open(os.path.join(data_dir, 'Real/train_list_all.txt')) as f:
        real_train = f.read().splitlines()
    # Camera matrix passed to cv2.solvePnP (custom values).
    intrinsics = np.array([[572.4, 0, 325.3], [0, 573.6, 242.0], [0, 0, 1.0]])
    # scale factors for all instances: norm of the values stored in *_norm.txt
    # NOTE(review): only obj_000001_norm.txt is globbed, so any other model id
    # raises KeyError in the lookup below -- widen the pattern if more models
    # are used.
    scale_factors = {}
    path_to_size = glob.glob(os.path.join(data_dir, 'models_obj', 'obj_000001_norm.txt'))
    for inst_path in sorted(path_to_size):
        instance = os.path.basename(inst_path).split('.')[0]
        bbox_dims = np.loadtxt(inst_path)
        scale_factors[instance] = np.linalg.norm(bbox_dims)

    dist_coeffs = np.zeros((4, 1))    # no distortion
    valid_img_list = []
    for index, img_path in enumerate(tqdm(real_train)):
        img_full_path = os.path.join(data_dir, 'Real', img_path)
        path_dict = {
            "nocs": img_full_path + '/coord/{}.hdf5'.format(index),
            "meta": img_full_path + '/meta/{:04}_meta.txt'.format(index),
            "mask": img_full_path + '/mask_independent/{:06}.png'.format(index),
            "color": img_full_path + '/color/{:06}.jpg'.format(index),
        }
        # NOTE(review): depth is read from 'camera_full_depths' even for Real
        # data -- confirm this is intended.
        depth_full_path = os.path.join(data_dir, 'camera_full_depths', "{:06}.png".format(index))
        required_paths = [
            path_dict["color"],
            path_dict["nocs"],
            img_full_path + '/depth/{:06}.png'.format(index),
            img_full_path + '/mask/{:06}_mask.png'.format(index),
            path_dict["meta"],
            # The two files below are the ones actually read further down;
            # checking them makes a missing file a skip instead of a crash.
            path_dict["mask"],
            depth_full_path,
        ]
        if not all(os.path.exists(p) for p in required_paths):
            print("annotate_real_train pass")
            continue
        depth = load_depth(depth_full_path)
        masks, coords, class_ids, instance_ids, model_list, bboxes = process_data(img_full_path, depth, path_dict)
        if instance_ids is None:
            continue
        # compute pose
        num_insts = len(class_ids)
        scales = np.zeros(num_insts)
        rotations = np.zeros((num_insts, 3, 3))
        translations = np.zeros((num_insts, 3))
        for i in range(num_insts):
            s = scale_factors["obj_00"+model_list[i]+"_norm"]
            mask = masks[:, :, i]
            idxs = np.where(mask)
            coord = coords[:, :, i, :]
            # Centre the NOCS coordinates at the origin and scale to model size.
            coord_pts = s * (coord[idxs[0], idxs[1], :] - 0.5)
            coord_pts = coord_pts[:, :, None]
            # Pixel coordinates as (x, y) = (column, row).
            img_pts = np.array([idxs[1], idxs[0]]).transpose()
            img_pts = img_pts[:, :, None].astype(float)
            retval, rvec, tvec = cv2.solvePnP(coord_pts, img_pts, intrinsics, dist_coeffs)
            assert retval, 'solvePnP failed for {} instance {}'.format(img_path, instance_ids[i])
            R, _ = cv2.Rodrigues(rvec)
            T = np.squeeze(tvec)
            # NOTE: mug-category re-labelling of the original pose_data.py is
            # intentionally disabled here (mug_meta.pkl is not loaded).
            scales[i] = s
            rotations[i] = R
            translations[i] = T
        # write results
        gts = {}
        gts['class_ids'] = class_ids    # int list, 1 to 6
        gts['bboxes'] = bboxes  # np.array, [[y1, x1, y2, x2], ...]
        gts['scales'] = scales.astype(np.float32)  # np.array, scale factor from NOCS model to depth observation
        gts['rotations'] = rotations.astype(np.float32)    # np.array, R
        gts['translations'] = translations.astype(np.float32)  # np.array, T
        gts['instance_ids'] = instance_ids  # int list, start from 1
        gts['model_list'] = model_list  # str list, model id/name
        # NOTE(review): img_full_path is a folder-level path, so every image of
        # the same folder writes to the same '<folder>_label.pkl' and only the
        # last one survives.  annotate_camera_train writes per-image files
        # under '<folder>/pkl/' instead -- align once the loader is confirmed.
        with open(img_full_path + '_label.pkl', 'wb') as f:
            cPickle.dump(gts, f)
        valid_img_list.append(img_path)
    # write valid img list to file
    with open(os.path.join(data_dir, 'Real/train_list.txt'), 'w') as f:
        for img_path in valid_img_list:
            f.write("%s\n" % img_path)


def annotate_test_data(data_dir):
    """ Generate gt labels for test data.
        Properly copy handle_visibility provided by NOCS gts.

    Processes ``CAMERA/val_list_all.txt`` and ``Real/test_list_all.txt``.
    Each instance returned by ``process_data`` is matched (same class id,
    bounding boxes differing by at most 5 in summed absolute value) with an
    entry of the corresponding NOCS result pickle, from which the pose and
    handle visibility are copied.  Labels are pickled next to the image
    folder and the surviving lines are written to ``<source>/<subset>_list.txt``.

    Args:
        data_dir: dataset root containing ``CAMERA``, ``Real``, ``models_obj``
            and ``camera_full_depths``; NOCS results are looked up in
            ``results/nocs_results`` under ``os.path.dirname(data_dir)``.

    Raises:
        AssertionError: if an instance has no match, or a duplicate match,
            in the NOCS ground truth.
    """
    # Statistics:
    # test_set    missing file     bad rendering    no (occluded) fg    occlusion (< 64 pts)
    #   val        3792 imgs        132 imgs         1856 (23) imgs      50 insts
    #   test       0 img            0 img            0 img               2 insts

    with open(os.path.join(data_dir, 'CAMERA', 'val_list_all.txt')) as f:
        camera_val = f.read().splitlines()
    with open(os.path.join(data_dir, 'Real', 'test_list_all.txt')) as f:
        real_test = f.read().splitlines()
    camera_intrinsics = np.array([[577.5, 0, 319.5], [0, 577.5, 239.5], [0, 0, 1]])
    real_intrinsics = np.array([[591.0125, 0, 322.525], [0, 590.16775, 244.11084], [0, 0, 1]])
    # compute model size: twice the largest absolute coordinate along each axis
    model_file_path = ['models_obj/camera_val.pkl', 'models_obj/real_test.pkl']
    models = {}
    for path in model_file_path:
        with open(os.path.join(data_dir, path), 'rb') as f:
            models.update(cPickle.load(f))
    model_sizes = {}
    for key in models.keys():
        model_sizes[key] = 2 * np.amax(np.abs(models[key]), axis=0)

    nocs_dir = os.path.join(os.path.dirname(data_dir), 'results/nocs_results')
    subset_meta = [('CAMERA', camera_val, camera_intrinsics, 'val'), ('Real', real_test, real_intrinsics, 'test')]
    # NOTE(review): the file index keeps counting across CAMERA val and Real
    # test (it is not reset per source) -- confirm this matches the file names.
    index = 0
    for source, img_list, intrinsics, subset in subset_meta:
        valid_img_list = []
        for img_path in tqdm(img_list):
            img_full_path = os.path.join(data_dir, source, img_path)
            path_dict = {
                "nocs": img_full_path + '/coord/{}.hdf5'.format(index),
                "meta": img_full_path + '/meta/{:04}_meta.txt'.format(index),
                "mask": img_full_path + '/mask_independent/{:06}.png'.format(index),
                "color": img_full_path + '/color/{:06}.jpg'.format(index),
            }
            depth_full_path = os.path.join(data_dir, 'camera_full_depths', "{:06}.png".format(index))
            required_paths = [
                path_dict["color"],
                path_dict["nocs"],
                img_full_path + '/depth/{:06}.png'.format(index),
                img_full_path + '/mask/{:06}_mask.png'.format(index),
                path_dict["meta"],
                # The two files below are the ones actually read further down;
                # checking them makes a missing file a skip instead of a crash.
                path_dict["mask"],
                depth_full_path,
            ]
            index += 1
            if not all(os.path.exists(p) for p in required_paths):
                continue
            depth = load_depth(depth_full_path)
            masks, coords, class_ids, instance_ids, model_list, bboxes = process_data(img_full_path, depth, path_dict)
            if instance_ids is None:
                continue
            num_insts = len(instance_ids)
            # match each instance with NOCS ground truth to properly assign gt_handle_visibility
            if source == 'CAMERA':
                nocs_path = os.path.join(nocs_dir, 'val', 'results_val_{}_{}.pkl'.format(
                    img_path.split('/')[-2], img_path.split('/')[-1]))
            else:
                nocs_path = os.path.join(nocs_dir, 'real_test', 'results_test_{}_{}.pkl'.format(
                    img_path.split('/')[-2], img_path.split('/')[-1]))
            with open(nocs_path, 'rb') as f:
                nocs = cPickle.load(f)
            gt_class_ids = nocs['gt_class_ids']
            gt_bboxes = nocs['gt_bboxes']
            gt_sRT = nocs['gt_RTs']
            gt_handle_visibility = nocs['gt_handle_visibility']
            map_to_nocs = []
            for i in range(num_insts):
                gt_match = -1
                for j in range(len(gt_class_ids)):
                    if gt_class_ids[j] != class_ids[i]:
                        continue
                    if np.sum(np.abs(bboxes[i] - gt_bboxes[j])) > 5:
                        continue
                    # match found
                    gt_match = j
                    break
                # check match validity
                assert gt_match > -1, \
                    '{} {} no match for instance'.format(img_path, instance_ids[i])
                assert gt_match not in map_to_nocs, \
                    '{} {} duplicate match'.format(img_path, instance_ids[i])
                map_to_nocs.append(gt_match)
            # copy from ground truth
            handle_visibility = gt_handle_visibility[map_to_nocs]
            sizes = np.zeros((num_insts, 3))
            poses = np.zeros((num_insts, 4, 4))
            scales = np.zeros(num_insts)
            rotations = np.zeros((num_insts, 3, 3))
            translations = np.zeros((num_insts, 3))
            for i in range(num_insts):
                gt_idx = map_to_nocs[i]
                sizes[i] = model_sizes[model_list[i]]
                sRT = gt_sRT[gt_idx]
                # Split the scaled rotation into a scalar scale and a rotation.
                s = np.cbrt(np.linalg.det(sRT[:3, :3]))
                R = sRT[:3, :3] / s
                T = sRT[:3, 3]
                # NOTE: the mug-category re-labelling is skipped, as in the
                # train annotators of this file.  mug_meta.pkl is never loaded
                # here, so the former re-label branch raised NameError for any
                # class-6 instance.
                # used for test during training
                scales[i] = s
                rotations[i] = R
                translations[i] = T
                # used for evaluation
                sRT = np.identity(4, dtype=np.float32)
                sRT[:3, :3] = s * R
                sRT[:3, 3] = T
                poses[i] = sRT
            # write results
            gts = {}
            gts['class_ids'] = np.array(class_ids)    # int list, 1 to 6
            gts['bboxes'] = bboxes    # np.array, [[y1, x1, y2, x2], ...]
            gts['instance_ids'] = instance_ids    # int list, start from 1
            gts['model_list'] = model_list    # str list, model id/name
            gts['size'] = sizes   # 3D size of NOCS model
            gts['scales'] = scales.astype(np.float32)    # np.array, scale factor from NOCS model to depth observation
            gts['rotations'] = rotations.astype(np.float32)    # np.array, R
            gts['translations'] = translations.astype(np.float32)    # np.array, T
            gts['poses'] = poses.astype(np.float32)    # np.array
            gts['handle_visibility'] = handle_visibility    # handle visibility of mug
            # NOTE(review): img_full_path is a folder-level path, so all images
            # of one folder overwrite the same '<folder>_label.pkl'.
            with open(img_full_path + '_label.pkl', 'wb') as f:
                cPickle.dump(gts, f)
            valid_img_list.append(img_path)
        # write valid img list to file
        with open(os.path.join(data_dir, source, subset+'_list.txt'), 'w') as f:
            for img_path in valid_img_list:
                f.write("%s\n" % img_path)


if __name__ == '__main__':
    # Dataset root.  The first command-line argument, when given, overrides
    # the placeholder below so the script can run without editing the source.
    data_dir = sys.argv[1] if len(sys.argv) > 1 else '/path/to/makeNOCS/output_data/bop_data/lm/'
    # create list for all data
    create_img_list(data_dir)
    # annotate dataset and re-write valid data to list
    annotate_camera_train(data_dir)
    print("================== annotate camera train complete ===================")
    annotate_real_train(data_dir)
    print("================== annotate real train complete ===================")
    annotate_test_data(data_dir)
    print("================== annotate test data complete ===================")

プログラムの実行方法は以下です。

cd makeNOCS/object-deformnet
python3 -m preprocess.pose_data_custom

プログラム説明

import os
import sys
import glob
import cv2
import numpy as np
import _pickle as cPickle
from tqdm import tqdm
sys.path.append('../lib')
from lib.align import align_nocs_to_depth
from lib.utils import load_depth
import h5py

create_img_list 関数

def create_img_list(data_dir):
    """ Create train/val/test data list for CAMERA and Real. """
    # CAMERA dataset
    for subset in ['train', 'val']:
        img_list = []
        img_dir = os.path.join(data_dir, 'CAMERA', subset)
        folder_list = [name for name in os.listdir(img_dir) if os.path.isdir(os.path.join(img_dir, name))]
        img_list_ = glob.glob(img_dir+"/000000/color/*.jpg")

create_img_list は、単純に画像ファイルの一覧を txt 形式で出力する関数です。

subset は CAMERA フォルダを見てもらえれば分かりますが、train, val が存在するのでループを回していきます。

img_list_ に該当する jpg 形式の画像を 取得します。

for i in range(len(img_list_)):
    folder_id = 0
    img_id = int(i)
    img_path = os.path.join(subset, '{:06d}'.format(folder_id))#, 'color/{:06d}.jpg'.format(img_id))
    img_list.append(img_path)

glob.glob で取得した画像はフルパスなので、学習用に makeNOCS/output_data/bop_data/lm/CAMERA 以降の相対パスを記録しておきます。なお、このカスタム版では画像ファイル名の部分がコメントアウトされているため、画像 1 枚につきフォルダまでのパス(例: train/000000)が 1 行ずつ出力されます。

with open(os.path.join(data_dir, 'CAMERA', subset+'_list_all.txt'), 'w') as f:
    for img_path in img_list:
        f.write("%s\n" % img_path)

画像パスを .txt 形式で保存しておきます。

# Real dataset
for subset in ['train', 'test']:
    img_list = []
    img_dir = os.path.join(data_dir, 'Real', subset)
    folder_list = [name for name in sorted(os.listdir(img_dir)) if os.path.isdir(os.path.join(img_dir, name))]
    for folder in folder_list:
        img_paths = glob.glob(os.path.join(img_dir, folder, 'color/*.jpg'))
        img_paths = sorted(img_paths)
        for img_full_path in img_paths:
            img_name = os.path.basename(img_full_path)
            img_ind = img_name.split('_')[0]
            img_path = os.path.join(subset, folder)#, 'color/{}'.format(img_ind))
            img_list.append(img_path)
    with open(os.path.join(data_dir, 'Real', subset+'_list_all.txt'), 'w') as f:
        for img_path in img_list:
            f.write("%s\n" % img_path)
print('Write all data paths to file done!')

Real データセットも、CAMERAデータセットと同様に作成しておきます。

def process_data(img_path, depth, path_dict):
    """ Load instance masks for the objects in the image. """
    # mask_path = img_path + '_mask.png'
    mask_path = path_dict["mask"]
    mask = cv2.imread(mask_path)[:, :, 2]
    mask = np.array(mask, dtype=np.int32)

process_data 関数

次に、繰り返し使用する関数である process_data 関数について説明していきます。

引数は画像のパス、読み込み済みの深度画像(配列)、その他のパスの辞書変数(nocs, meta, mask, color) となります。

path_dict から mask のパス情報を読み出し、opencv で画像を読込みます。

all_inst_ids = sorted(list(np.unique(mask)))
if all_inst_ids[-1] != 255:
    all_inst_ids.append(255)
assert all_inst_ids[-1] == 255
del all_inst_ids[-1]    # remove background
num_all_inst = len(all_inst_ids)
h, w = mask.shape

マスク画像のデータは、画素値がインスタンス毎に 0, 1, 2 … となっており、255 は背景を表します。そこで画素値の一覧をソートし、末尾の 255(背景)を取り除いてインスタンスIDの一覧とします。

# coord_path = img_path + '_coord.png'
coord_path = path_dict["nocs"]
with h5py.File(coord_path) as f:
    nocs_h5py = np.array(f["nocs"])

次に、nocs データを処理していきます。nocs の h5 データを numpy 形式で変数に格納しておきます。

# coord_map = cv2.imread(coord_path)[:, :, :3]
# coord_map = coord_map[:, :, (2, 1, 0)]
coord_map = nocs_h5py[:, :, :3]
coord_map = coord_map[:, :, (2, 1, 0)]
# flip z axis of coord map
coord_map = np.array(coord_map, dtype=np.float32)#  / 255
coord_map[:, :, 2] = 1 - coord_map[:, :, 2]

コメントアウト部分は、デフォルトの pose_data.py です。
h5 ファイルではすでに 0 – 1 の間で正規化されているので、画素を 255 で割る必要はありません。

また、nocs データはz, y, x の順で並んでいるので、x, y, z に並び替えます。さらに 1-z を処理することで、座標を反転しておきます。

class_ids = []
instance_ids = []
model_list = []
masks = np.zeros([h, w, num_all_inst], dtype=np.uint8)
coords = np.zeros((h, w, num_all_inst, 3), dtype=np.float32)
bboxes = np.zeros((num_all_inst, 4), dtype=np.int32)

使用する空配列を作成しておきます。

meta_path = path_dict["meta"]
# meta_path = img_path + '_meta.txt'
with open(meta_path, 'r') as f:

画像ごとのアノテーション情報が記載されたmetaデータを読込みます。

i = 0
for line in f:
    line_info = line.strip().split(' ')
    inst_id = int(line_info[0])
    cls_id = int(line_info[1])

テキストの行ごとに処理していきます。

# background objects and non-existing objects
if cls_id == 0 or (inst_id not in all_inst_ids):
    continue
if len(line_info) == 3:
    model_id = line_info[2]    # Real scanned objs
else:
    model_id = line_info[3]    # CAMERA objs
# remove one mug instance in CAMERA train due to improper model
if model_id == 'b9be7cfe653740eb7633a2dd89cec754':
    continue

背景(cls_id が 0)や、マスク画像に存在しないインスタンスはスキップします。また、meta の列数に応じてモデルIDを取り出し、モデルに不備がある特定のマグカップのインスタンスも除外します。

# process foreground objects
inst_mask = np.equal(mask, inst_id)

インスタンスIDに対応する画像のマスク部分を抽出します。

# bounding box
horizontal_indicies = np.where(np.any(inst_mask, axis=0))[0]
vertical_indicies = np.where(np.any(inst_mask, axis=1))[0]
assert horizontal_indicies.shape[0], print(img_path)
x1, x2 = horizontal_indicies[[0, -1]]
y1, y2 = vertical_indicies[[0, -1]]
# x2 and y2 should not be part of the box. Increment by 1.
x2 += 1
y2 += 1
if np.any(np.logical_or((x2-x1) > 700, (y2-y1) > 500)):
    print(x2-x1, y2-y1)
    return None, None, None, None, None, None

マスク画像からバウンディングボックスを作成します。

バウンディングボックスの幅が 700、または高さが 500 を超える場合は、物体が画像全体を覆ってしまうレンダリングエラーとみなし、その画像全体を無視します(None を返します)。

# not enough valid depth observation
final_mask = np.logical_and(inst_mask, depth > 0)
if np.sum(final_mask) < 64:
    continue

マスク内で有効な深度値(depth > 0)を持つ画素が 64 未満のインスタンスは、無視します。

class_ids.append(cls_id)
instance_ids.append(inst_id)
model_list.append(model_id)
masks[:, :, i] = inst_mask
coords[:, :, i, :] = np.multiply(coord_map, np.expand_dims(inst_mask, axis=-1))
bboxes[i] = np.array([y1, x1, y2, x2])
i += 1

各種変数にデータを格納します。

# no valid foreground objects
if i == 0:
    return None, None, None, None, None, None

masks = masks[:, :, :i]
coords = np.clip(coords[:, :, :i, :], 0, 1)
bboxes = bboxes[:i, :]
return masks, coords, class_ids, instance_ids, model_list, bboxes

これで process_data 関数は終了です。

annotate_camera_train 関数

ようやく、主要な関数である annotate_camera_train を説明していきます。

def annotate_camera_train(data_dir):
    """ Generate gt labels for CAMERA train data. """
    camera_train = open(os.path.join(data_dir, 'CAMERA', 'train_list_all.txt')).read().splitlines()
    # intrinsics = np.array([[577.5, 0, 319.5], [0, 577.5, 239.5], [0, 0, 1]])
    intrinsics = np.array([[572.4, 0, 325.3], [0, 573.6, 242.0], [0, 0, 1.0]])
    # meta info for re-label mug category
    # with open(os.path.join(data_dir, 'obj_models/mug_meta.pkl'), 'rb') as f:
    #     mug_meta = cPickle.load(f)

camera_train 変数に画像データのパスを読込みます。

intrinsics は、カメラパラメータです。お使いのカメラによって調整してください。

valid_img_list = []
index = 0
for img_path in tqdm(camera_train):
    path_dict = {}
    img_full_path = os.path.join(data_dir, 'CAMERA', img_path)
    depth_composed_path = "{:06}.png".format(index)
    path_dict["nocs"] = img_full_path + '/coord/{}.hdf5'.format(index)
    path_dict["meta"] = img_full_path + '/meta/{:04}_meta.txt'.format(index)
    path_dict["mask"] = img_full_path + '/mask_independent/{:06}.png'.format(index)
    path_dict["color"] = img_full_path + '/color/{:06}.jpg'.format(index)
    depth_full_path = os.path.join(data_dir,'camera_full_depths', depth_composed_path)
    all_exist = os.path.exists(img_full_path + '/color/{:06}.jpg'.format(index)) and \
                os.path.exists(img_full_path + '/coord/{}.hdf5'.format(index)) and \
                os.path.exists(img_full_path + '/depth/{:06}.png'.format(index)) and \
                os.path.exists(img_full_path + '/mask/{:06}_mask.png'.format(index)) and \
                os.path.exists(img_full_path + '/meta/{:04}_meta.txt'.format(index))
    index += 1
    # all_exist = os.path.exists(img_full_path + '_color.png') and \
    #             os.path.exists(img_full_path + '_coord.png') and \
    #             os.path.exists(img_full_path + '_depth.png') and \
    #             os.path.exists(img_full_path + '_mask.png') and \
    #             os.path.exists(img_full_path + '_meta.txt')
    if not all_exist:
        print("annotate_camera_train_path")
        continue
    # depth = load_depth(img_full_path)
    depth = load_depth(depth_full_path)
    masks, coords, class_ids, instance_ids, model_list, bboxes = process_data(img_full_path, depth, path_dict)
    if instance_ids is None:
        print("annotate_camera_train_path instance ids")
        continue

画像データを一枚ずつ処理していきます。

今まで作成してきたデータのパスを作成し、process_data 関数へと投げます。

# Umeyama alignment of GT NOCS map with depth image
scales, rotations, translations, error_messages, _ = \
    align_nocs_to_depth(masks, coords, depth, intrinsics, instance_ids, img_path)
if error_messages:
    print("annotate_camera_train_path error msg", error_messages)
    continue

梅山アライメント法で nocs と 深度画像の位置合わせを行います。

# re-label for mug category
for i in range(len(class_ids)):
    pass
    # if class_ids[i] == 6:
    #     T0 = mug_meta[model_list[i]][0]
    #     s0 = mug_meta[model_list[i]][1]
    #     T = translations[i] - scales[i] * rotations[i] @ T0
    #     s = scales[i] / s0
    #     scales[i] = s
    #     translations[i] = T

ここは飛ばします。

# write results
gts = {}
gts['class_ids'] = class_ids    # int list, 1 to 6
gts['bboxes'] = bboxes  # np.array, [[y1, x1, y2, x2], ...]
gts['scales'] = scales.astype(np.float32)  # np.array, scale factor from NOCS model to depth observation
gts['rotations'] = rotations.astype(np.float32)    # np.array, R
gts['translations'] = translations.astype(np.float32)  # np.array, T
gts['instance_ids'] = instance_ids  # int list, start from 1
gts['model_list'] = model_list  # str list, model id/name
os.makedirs(img_full_path + "/pkl", exist_ok=True)
with open(img_full_path + "/pkl/" + '{:06}_label.pkl'.format(index-1), 'wb') as f:
    cPickle.dump(gts, f)
valid_img_list.append(img_path)

必要な情報を pkl 中間ファイルへ出力します。

# write valid img list to file
with open(os.path.join(data_dir, 'CAMERA/train_list.txt'), 'w') as f:
    for img_path in valid_img_list:
        f.write("%s\n" % img_path)

さらに train_list.txt を作成しておきます。

Real データについて

今回は Real データを用意できないので、CAMERAデータで学習させてから nocs を出力させ、Real データでファインチューニングする予定です。

なので、該当部分である annotate_real_train, annotate_test_data の説明は割愛します。

おわりに

今回は pose_data_custom.py について説明しました。

次回はようやく、ShapeNetCore で作成した中間ファイルを用いて AutoEncoder を学習させます。

コメント

タイトルとURLをコピーしました