複数枚の画像から動作認識できる 3D-ResNets-PyTorch をカスタムデータで学習させる【Python】

AI
スポンサーリンク
スポンサーリンク

はじめに

前回は、単一の推論ファイルを作成しました。

今回は、カスタムデータで学習させる方法について説明します。

前提条件

前提条件は以下の通りです。

  • Python3.9
  • torch == 1.13.0+cu117, torchvision == 0.14.0+cu117
  • 作業は WSL2 で実施します

学習データの準備

学習データを準備していきます。
各種データは hmdb-51 データセットから一部を使用しています。

train.avi

test.avi

defect.avi

3つの動画を用意しました。

test.avi は推論時に使用します。

動画を画像に変換

まずは動画を 1 フレーム毎に画像に変換していきます。

mkdir custom_data
cd custom_data
mkdir images
mkdir movies
cd movies

今作成した movies フォルダに train.avi, test.avi, defect.avi を保存してください。

移動したら、変換プログラムを作成していきます。

cd ../../3D-ResNets-PyTorch
touch convert_movie2img.py

convert_movie2img.py

import cv2
import glob
import os


folderpath = glob.glob("../custom_data/movies/*.avi")

for f in folderpath:
    basename = os.path.basename(f)
    folder_name = os.path.splitext(basename)[0]
    os.mkdir("../custom_data/images/"+folder_name)
    cap = cv2.VideoCapture(f)
    index = 1

    # 不要な部分は飛ばす
    for i in range(10):
        ret = cap.grab()

    while True:
        # 1フレーム毎を学習対象とする
        for i in range(1):
            ret = cap.grab()
        ret, frame = cap.retrieve()
        if not ret:
            break
        img_name = "../custom_data/images/"+folder_name+"/image_{:05}.jpg".format(index)
        img = cv2.resize(frame, (320,240))
        cv2.imwrite(img_name, img)
        index += 1

    print(basename)

実行します。

python3 convert_movie2img.py

custom_data/images フォルダに train, test, defect フォルダが作成されます。

それを以下のように変更してください。

images
├── Defect
│   ├── DefectTest
│   └── WalkDefect
└── Walk
    ├── Walk
    └── WalkTest

WalkDefect, DefectTest は defect.avi の画像を
Walk は train.avi の画像を
WalkTest は test.avi の画像を保存してください。

学習用プログラムの修正

datasets/videodataset.py の 80,81 行目

if i % (n_videos // 5) == 0:
    print('dataset loading [{}/{}]'.format(i, len(video_ids)))
↓
# if i % (n_videos // 5) == 0:
#     print('dataset loading [{}/{}]'.format(i, len(video_ids)))

main.py の 245-253 行目

val_loader = torch.utils.data.DataLoader(val_data,
                                         batch_size=(opt.batch_size //
                                                     opt.n_val_samples),
                                         shuffle=False,
                                         num_workers=opt.n_threads,
                                         pin_memory=True,
                                         sampler=val_sampler,
                                         worker_init_fn=worker_init_fn,
                                         collate_fn=collate_fn)
↓
val_loader = torch.utils.data.DataLoader(val_data,
                                         batch_size=(opt.batch_size),
                                         shuffle=False,
                                         num_workers=opt.n_threads,
                                         pin_memory=True,
                                         sampler=val_sampler,
                                         worker_init_fn=worker_init_fn,
                                         collate_fn=collate_fn)

これで修正は完了です。

アノテーションデータの作成

アノテーションデータの作成をしていきます。

アノテーションデータとは言えど、クラス名・ファイルパス・動画フレーム数 を設定するくらいです。

touch custom_data.json

custom_data.json

{"labels": ["Walk", "Defect"], 
  "database": {
    "Walk":
      {"subset": "training", "annotations": {"label": "Walk", "segment": [1, 85]}},
    "WalkDefect": 
      {"subset": "validation", "annotations": {"label": "Defect", "segment": [1, 61]}},
    "WalkTest": 
      {"subset": "validation", "annotations": {"label": "Walk", "segment": [1, 222]}},
    "DefectTest": 
      {"subset": "training", "annotations": {"label": "Defect", "segment": [1, 61]}}
  }
}

上から説明していきます。

"labels": ["Walk", "Defect"]

これはクラス名です。labels は固定で Walk, Defect はクラス名です。

"database": {

database タグ以降に動画一つ一つのアノテーションデータを作成していきます。

"Walk":
      {"subset": "training", "annotations": {"label": "Walk", "segment": [1, 85]}},

Walk タグは、動画が保存されているフォルダを指定します。
subset は training, validation の二つから選択します。
annotations タグがクラス名と動画フレームを指定します。
label はクラス名を指定します。
segment は [1, 画像の枚数+1] を指定します。

ここまできたら、あとは学習を実行するだけです。

学習の実行

学習は、以下のコマンドで実行します。

mkdir custom_results
python3 main.py --root_path ./ --video_path ../custom_data/images --annotation_path ./custom_data.json --result_path custom_results --dataset ucf101 --model resnet --model_depth 50 --n_classes 2 --batch_size 2 --n_threads 1 --n_epochs 400 --checkpoint 20 --learning_rate 0.01

n_epochs, checkpoint, learning_rate は適当に設定してください。

こんな感じで、学習が完了します。

学習結果の確認

学習結果を確認するプログラムを作成します。

single_inference.py

from pathlib import Path
import random
from PIL import Image
import numpy as np
import torch
import torch.nn.functional as F
from torch.backends import cudnn
import matplotlib.pyplot  as plt

from opts import parse_opts
from model import generate_model
from mean import get_mean_std
from spatial_transforms import (Compose, Normalize, Resize, CenterCrop,ToTensor, ScaleValue, PickFirstChannels)

def json_serial(obj):
    if isinstance(obj, Path):
        return str(obj)


def get_opt():
    opt = parse_opts()

    opt.mean, opt.std = get_mean_std(opt.value_scale, dataset=opt.mean_dataset)
    opt.n_input_channels = 3

    opt.resume_path = "./custom_results/save_400.pth"
    opt.root_path = "./"

    opt.device = torch.device('cpu' if opt.no_cuda else 'cuda')
    if not opt.no_cuda:
        cudnn.benchmark = True

    opt.output_topk = 1
    opt.n_classes = 2
    opt.model_depth = 50
    opt.arch = '{}-{}'.format(opt.model, opt.model_depth)
    return opt


def resume_model(resume_path, arch, model):
    print('loading checkpoint {} model'.format(resume_path))
    checkpoint = torch.load(resume_path, map_location='cpu')
    assert arch == checkpoint['arch']

    if hasattr(model, 'module'):
        model.module.load_state_dict(checkpoint['state_dict'])
    else:
        model.load_state_dict(checkpoint['state_dict'])
    return model


def get_normalize_method(mean, std, no_mean_norm, no_std_norm):
    if no_mean_norm:
        if no_std_norm:
            return Normalize([0, 0, 0], [1, 1, 1])
        else:
            return Normalize([0, 0, 0], std)
    else:
        if no_std_norm:
            return Normalize(mean, [1, 1, 1])
        else:
            return Normalize(mean, std)


def get_inference_utils(opt):
    assert opt.inference_crop in ['center', 'nocrop']

    normalize = get_normalize_method(opt.mean, opt.std, opt.no_mean_norm,
                                     opt.no_std_norm)

    spatial_transform = [Resize(opt.sample_size)]
    if opt.inference_crop == 'center':
        spatial_transform.append(CenterCrop(opt.sample_size))
    spatial_transform.append(ToTensor())
    if opt.input_type == 'flow':
        spatial_transform.append(PickFirstChannels(n=2))
    spatial_transform.extend([ScaleValue(opt.value_scale), normalize])
    spatial_transform = Compose(spatial_transform)

    return spatial_transform

       
def inference_main(opt):
    random.seed(opt.manual_seed)
    np.random.seed(opt.manual_seed)
    torch.manual_seed(opt.manual_seed)

    model = generate_model(opt)
    model = resume_model(opt.resume_path, opt.arch, model)
    model.eval()

    spatial_transform = get_inference_utils(opt)

    with torch.no_grad():
        video = []
        for i in range(1,17):
            # img = Image.open("../UCF101_images/UCF101/ApplyEyeMakeup/v_ApplyEyeMakeup_g01_c01/image_{:05}.jpg".format(i))
            img = Image.open("../custom_data/images/Walk/WalkTest/image_{:05}.jpg".format(i))
            # img = Image.open("../custom_data/images/Defect/DefectTest/image_{:05}.jpg".format(i))
            video.append(img.convert('RGB'))

        fig = plt.figure()
        for i, im in enumerate(video):
            fig.add_subplot(4,4,i+1).set_title(str(i))
            plt.imshow(im)
        plt.show()

        # single data
        spatial_transform.randomize_parameters()
        clip = [spatial_transform(img) for img in video]
        clips = torch.unsqueeze((torch.stack(clip, 0).permute(1, 0, 2, 3)),0)

        outputs = model(clips)
        print(outputs)
        outputs = F.softmax(outputs, dim=1).cpu()
        print(outputs)
        sorted_scores, locs = torch.topk(torch.mean(outputs, dim=0),
                                    k=min(opt.output_topk, opt.n_classes))
        print("score:",sorted_scores.item(), "class:",locs.item())


if __name__ == '__main__':
    opt = get_opt()
    inference_main(opt)

上記を実行すると、まずは入力データが表示されます。

コンソールに結果が表示されます。

loading checkpoint ./custom_results/save_400.pth model
tensor([[ 211.3719, -216.9188]])
tensor([[1., 0.]])
score: 1.0 class: 0

続いて、Defect クラスが検出できているか確認します。

出力は以下のようになります。

loading checkpoint ./custom_results/save_400.pth model
tensor([[-23.0008,  22.9723]])
tensor([[1.0818e-20, 1.0000e+00]])
score: 1.0 class: 1

問題なく検出できていますね!

おわりに

今回は、3D-ResNets-PyTorch をカスタムデータで学習させる方法について説明しました。

これで動画認識できるようになりました。

次に気になる AI がありましたら、また説明できればと思います。

コメント

タイトルとURLをコピーしました