はじめに
前回は、単一の推論ファイルを作成しました。
今回は、カスタムデータで学習させる方法について説明します。
前提条件
前提条件は以下の通りです。
- Python3.9
- torch == 1.13.0+cu117, torchvision == 0.14.0+cu117
- 作業は WSL2 で実施します
学習データの準備
学習データを準備していきます。
各種データは hmdb-51 データセットから一部を使用しています。
train.avi
test.avi
defect.avi
3つの動画を用意しました。
test.avi は推論時に使用します。
動画を画像に変換
まずは動画を 1 フレーム毎に画像に変換していきます。
mkdir custom_data
cd custom_data
mkdir images
mkdir movies
cd movies
今作成した movies フォルダに train.avi, test.avi, defect.avi を保存してください。
移動したら、変換プログラムを作成していきます。
cd ../../3D-ResNets-PyTorch
touch convert_movie2img.py
convert_movie2img.py
import cv2
import glob
import os
folderpath = glob.glob("../custom_data/movies/*.avi")
for f in folderpath:
basename = os.path.basename(f)
folder_name = os.path.splitext(basename)[0]
os.mkdir("../custom_data/images/"+folder_name)
cap = cv2.VideoCapture(f)
index = 1
# 不要な部分は飛ばす
for i in range(10):
ret = cap.grab()
while True:
# 1フレーム毎を学習対象とする
for i in range(1):
ret = cap.grab()
ret, frame = cap.retrieve()
if not ret:
break
img_name = "../custom_data/images/"+folder_name+"/image_{:05}.jpg".format(index)
img = cv2.resize(frame, (320,240))
cv2.imwrite(img_name, img)
index += 1
print(basename)
実行します。
python3 convert_movie2img.py
custom_data/images フォルダに train, test, defect フォルダが作成されます。
それを以下のように変更してください。
images
├── Defect
│ ├── DefectTest
│ └── WalkDefect
└── Walk
├── Walk
└── WalkTest
WalkDefect, DefectTest は defect.avi の画像を
Walk は train.avi の画像を
WalkTest は test.avi の画像を保存してください。
学習用プログラムの修正
datasets/videodataset.py の 80,81 行目
if i % (n_videos // 5) == 0:
print('dataset loading [{}/{}]'.format(i, len(video_ids)))
↓
# if i % (n_videos // 5) == 0:
# print('dataset loading [{}/{}]'.format(i, len(video_ids)))
main.py の 245-253 行目
val_loader = torch.utils.data.DataLoader(val_data,
batch_size=(opt.batch_size //
opt.n_val_samples),
shuffle=False,
num_workers=opt.n_threads,
pin_memory=True,
sampler=val_sampler,
worker_init_fn=worker_init_fn,
collate_fn=collate_fn)
↓
val_loader = torch.utils.data.DataLoader(val_data,
batch_size=(opt.batch_size),
shuffle=False,
num_workers=opt.n_threads,
pin_memory=True,
sampler=val_sampler,
worker_init_fn=worker_init_fn,
collate_fn=collate_fn)
これで修正は完了です。
アノテーションデータの作成
アノテーションデータの作成をしていきます。
アノテーションデータとは言えど、クラス名・ファイルパス・動画フレーム数 を設定するくらいです。
touch custom_data.json
custom_data.json
{"labels": ["Walk", "Defect"],
"database": {
"Walk":
{"subset": "training", "annotations": {"label": "Walk", "segment": [1, 85]}},
"WalkDefect":
{"subset": "validation", "annotations": {"label": "Defect", "segment": [1, 61]}},
"WalkTest":
{"subset": "validation", "annotations": {"label": "Walk", "segment": [1, 222]}},
"DefectTest":
{"subset": "training", "annotations": {"label": "Defect", "segment": [1, 61]}}
}
}
上から説明していきます。
"labels": ["Walk", "Defect"]
これはクラス名です。labels は固定で Walk, Defect はクラス名です。
"database": {
database タグ以降に動画一つ一つのアノテーションデータを作成していきます。
"Walk":
{"subset": "training", "annotations": {"label": "Walk", "segment": [1, 85]}},
Walk タグは、動画が保存されているフォルダを指定します。
subset は training, validation の二つから選択します。
annotations タグがクラス名と動画フレームを指定します。
label はクラス名を指定します。
segment は [1, 画像の枚数+1] を指定します。
ここまできたら、あとは学習を実行するだけです。
学習の実行
学習は、以下のコマンドで実行します。
mkdir custom_results
python3 main.py --root_path ./ --video_path ../custom_data/images --annotation_path ./custom_data.json --result_path custom_results --dataset ucf101 --model resnet --model_depth 50 --n_classes 2 --batch_size 2 --n_threads 1 --n_epochs 400 --checkpoint 20 --learning_rate 0.01
n_epochs, checkpoint, learning_rate は適当に設定してください。
こんな感じで、学習が完了します。
学習結果の確認
学習結果を確認するプログラムを作成します。
single_inference.py
from pathlib import Path
import random
from PIL import Image
import numpy as np
import torch
import torch.nn.functional as F
from torch.backends import cudnn
import matplotlib.pyplot as plt
from opts import parse_opts
from model import generate_model
from mean import get_mean_std
from spatial_transforms import (Compose, Normalize, Resize, CenterCrop,ToTensor, ScaleValue, PickFirstChannels)
def json_serial(obj):
if isinstance(obj, Path):
return str(obj)
def get_opt():
opt = parse_opts()
opt.mean, opt.std = get_mean_std(opt.value_scale, dataset=opt.mean_dataset)
opt.n_input_channels = 3
opt.resume_path = "./custom_results/save_400.pth"
opt.root_path = "./"
opt.device = torch.device('cpu' if opt.no_cuda else 'cuda')
if not opt.no_cuda:
cudnn.benchmark = True
opt.output_topk = 1
opt.n_classes = 2
opt.model_depth = 50
opt.arch = '{}-{}'.format(opt.model, opt.model_depth)
return opt
def resume_model(resume_path, arch, model):
print('loading checkpoint {} model'.format(resume_path))
checkpoint = torch.load(resume_path, map_location='cpu')
assert arch == checkpoint['arch']
if hasattr(model, 'module'):
model.module.load_state_dict(checkpoint['state_dict'])
else:
model.load_state_dict(checkpoint['state_dict'])
return model
def get_normalize_method(mean, std, no_mean_norm, no_std_norm):
if no_mean_norm:
if no_std_norm:
return Normalize([0, 0, 0], [1, 1, 1])
else:
return Normalize([0, 0, 0], std)
else:
if no_std_norm:
return Normalize(mean, [1, 1, 1])
else:
return Normalize(mean, std)
def get_inference_utils(opt):
assert opt.inference_crop in ['center', 'nocrop']
normalize = get_normalize_method(opt.mean, opt.std, opt.no_mean_norm,
opt.no_std_norm)
spatial_transform = [Resize(opt.sample_size)]
if opt.inference_crop == 'center':
spatial_transform.append(CenterCrop(opt.sample_size))
spatial_transform.append(ToTensor())
if opt.input_type == 'flow':
spatial_transform.append(PickFirstChannels(n=2))
spatial_transform.extend([ScaleValue(opt.value_scale), normalize])
spatial_transform = Compose(spatial_transform)
return spatial_transform
def inference_main(opt):
random.seed(opt.manual_seed)
np.random.seed(opt.manual_seed)
torch.manual_seed(opt.manual_seed)
model = generate_model(opt)
model = resume_model(opt.resume_path, opt.arch, model)
model.eval()
spatial_transform = get_inference_utils(opt)
with torch.no_grad():
video = []
for i in range(1,17):
# img = Image.open("../UCF101_images/UCF101/ApplyEyeMakeup/v_ApplyEyeMakeup_g01_c01/image_{:05}.jpg".format(i))
img = Image.open("../custom_data/images/Walk/WalkTest/image_{:05}.jpg".format(i))
# img = Image.open("../custom_data/images/Defect/DefectTest/image_{:05}.jpg".format(i))
video.append(img.convert('RGB'))
fig = plt.figure()
for i, im in enumerate(video):
fig.add_subplot(4,4,i+1).set_title(str(i))
plt.imshow(im)
plt.show()
# single data
spatial_transform.randomize_parameters()
clip = [spatial_transform(img) for img in video]
clips = torch.unsqueeze((torch.stack(clip, 0).permute(1, 0, 2, 3)),0)
outputs = model(clips)
print(outputs)
outputs = F.softmax(outputs, dim=1).cpu()
print(outputs)
sorted_scores, locs = torch.topk(torch.mean(outputs, dim=0),
k=min(opt.output_topk, opt.n_classes))
print("score:",sorted_scores.item(), "class:",locs.item())
if __name__ == '__main__':
opt = get_opt()
inference_main(opt)
上記を実行すると、まずは入力データが表示されます。
コンソールに結果が表示されます。
loading checkpoint ./custom_results/save_400.pth model
tensor([[ 211.3719, -216.9188]])
tensor([[1., 0.]])
score: 1.0 class: 0
続いて、Defect クラスが検出できているか確認します。
出力は以下のようになります。
loading checkpoint ./custom_results/save_400.pth model
tensor([[-23.0008, 22.9723]])
tensor([[1.0818e-20, 1.0000e+00]])
score: 1.0 class: 1
問題なく検出できていますね!
おわりに
今回は、3D-ResNets-PyTorch をカスタムデータで学習させる方法について説明しました。
これで動画認識できるようになりました。
次に気になる AI がありましたら、また説明できればと思います。
コメント