採用はこちら!

Shinonome Tech Blog

株式会社Shinonomeの技術ブログ
7 min read

BadApple!! を terminalで動かしてみた

BadApple!!という動画をterminal上で表示する方法について解説していきたいと思います!!

こんにちは。データサイエンスコースのDaichiです。今日はAdvent Calendar24日目です。昨日のクリスマスパーティーはとっても楽しかったですね!!(なお、この記事を書いているのは12/7である。)

今回の記事ではBadApple!!という動画をterminal上で表示する方法について解説していきたいと思います!!

目次

  1. そもそも「BadApple!!」って何?
  2. 実装方針
  3. 使用ライブラリ
  4. コードの解説
  5. 完成品
  6. まとめ
  7. 感想
  8. 参考文献

1. そもそも「BadApple!!」って何?

"One bad apple spoils the bunch"

「BadApple!!」とは日本の同人サークル「上海アリス幻樂団」によって制作されたゲーム、「東方幻想郷 ~Lotus Land Story」の劇中歌です。

今回使用した動画は、2009年10月27日に「あにら」という方によって制作されニコニコ動画に投稿された影絵風3D東方動画作品です。

2. 実装方針

以下の流れで実装していきます。

  1. 動画のダウンロード
  2. 動画ファイルの情報を取得する
  3. videoをnumpy配列に変換
  4. terminalのサイズを取得し、1フレームずつリサイズ
  5. グレースケール変換、輝度ごとに指定の文字に変換
  6. terminalに出力

3. 使用ライブラリ

このコードを構成する上で重要なライブラリを2つ紹介します。

ffmpeg-python

ffmpegmとは動画・音声を記録・変換・再生するためのフリーソフトウェアです。本来はPythonに対応していないんですが、ffmpeg-pythonというパッケージによってpython上でffmpegを使えるようになります。

!pip install ffmpeg-python
import ffmpeg

OpenCV

OpenCVとは「Open Source Computer Vision Library」の略です。画像や動画の処理ができる機能がまとめられたオープンソースライブラリです。元々はCやC++用のライブラリでしたが、Pythonでも使用できます。「顔検出」、「文字認識」、「画像のパターンマッチング」などができます。

!pip install opencv-python
import cv2

その他

import os
import sys
import time
import shutil
from tqdm import tqdm
import subprocess
from concurrent.futures import ProcessPoolExecutor

import numpy as np

4. コードの解説

1. 動画のダウンロード

動画のダウンロードにはyoutube_dlというライブラリを使います。

youtube_dlのインストールは、以下のコマンドとなります。

pip install youtube_dl

またダウンロードは次のコードで行なえます。3行目のurlにyoutubeの動画のリンクをstr型で格納します。ydl_optsではオプションを指定します。

import youtube_dl 
ydl_opts = {}
url = "https://youtu.be/FtutLA63Cp8"
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
    ydl.download([url])

注意

youtubeから動画をダウンロードして個人観賞用として利用する分には違法性はないですが、動画を複製して配布する等は違法です。絶対にやめましょう。

その後に動画から音楽を取り出します。

# 動画データ
stream = ffmpeg.input("【東方】Bad Apple!! PV【影絵】-FtutLA63Cp8.mkv") 
# 音声データのファイル名
stream = ffmpeg.output(stream, "badapple.wav") 
# 実行
ffmpeg.run(stream)

2. 動画ファイルの情報を取得する

今回は「動画の縦横のピクセル」、「動画時間」の2つの情報を取得します。

# 動画ファイル情報の取得
video_info = ffmpeg.probe(video_path)
width = video_info["streams"][0]['width']
height = video_info["streams"][0]['height']
duration = float(video_info["format"]['duration'])

ffmpeg.probeは動画ファイルの情報を辞書型で取得する関数です。

  • 変数
    • width : 元の動画の横のピクセル数
    • height : 元の動画の縦のピクセル数
    • duration : 動画時間

参考 :
ffmpeg-pythonのgithubリポジトリ

3. videoをnumpy配列に変換

こちらはffmpeg-pythonのgithubリポジトリに使用例として乗っていたものをそのまま使いました。

out, _ = (
    ffmpeg
    .input(video_path, ss=0, t=duration)
    .output('pipe:', format='rawvideo', pix_fmt='rgb24')
    .run(capture_stdout=True)
)

arr = (
    np
    .frombuffer(out, np.uint8)
    .reshape([-1, height, width, 3])
)

4. terminalのサイズを取得

terminalのサイズを取得するためにはshutilという標準ライブラリを使います。ここで取得できる数値はピクセルではなく文字数です。コメントアウトしている数値はウィンドウの最大サイズです。

# ターミナルウィンドウサイズの取得
terminal_size = shutil.get_terminal_size()
fix_height = terminal_size.lines  # 64
fix_width = terminal_size.columns  # 204

ウィンドウサイズを変えても動画が正常に動くように、これをforで回して常にウィンドウサイズを取得する...みたいなことを考えたけどよく考えたら無理そうだったので断念した。

5. 各フレームずつリサイズ、グレースケール変換、輝度ごとに指定の文字に変換


リサイズ:
各フレームを一個前の章で取得したターミナルのサイズに変更します。

resize_im = cv2.resize(行列, dsize=(変更後の横の長さ, 変更後の縦の長さ))

グレースケール:
グレースケールとは、画像を色味のない明るさの度合いだけで表現したものです。RGBの3つの数字で表現された各フレームを以下の式でグレースケール化していきます。

V = 0.299*R + 0.587*G + 0.114*B

グレースケールには様々な方法がありこの式はITU-R Rec BT.601と言うらしいです。なんのこっちゃ


frame = arr.shape[0]
text_array = np.full((frame, fix_height, fix_width), None)
for i in tqdm(range(frame)):
    resize_im = cv2.resize(arr[i], dsize=(fix_width, fix_height))
    gray_sca = resize_im[:, :, 0] * 0.299 + \
        resize_im[:, :, 1] * 0.587 + resize_im[:, :, 2] * 0.114
    for j in range(fix_height):
        for k in range(fix_width):
            if 0 <= gray_sca[j, k] < 30:
                text_array[i, j, k] = " "
            elif 30 <= gray_sca[j, k] < 80:
                text_array[i, j, k] = "`"
            elif 80 <= gray_sca[j, k] < 160:
                text_array[i, j, k] = "+"
            else:
                text_array[i, j, k] = "@"

text_arrayは全要素が Noneで構成されたframe×fix_height×fix_widthの行列です。ここに変換後の文字を入れていきます。使う文字は スペース, ' ,+ ,@です。 4つの文字で明るさの差を表していきます。

6. terminalに出力

ここまでで作ってきたデータを映像・音楽としてアウトプットしていきます。


video関数の定義

映像を出力する関数を作ります。

def video(text_array, frame_per_time):
    base_time = time.time()
    next_time = 0
    for im in text_array:
        text = im.reshape(-1)
        sys.stdout.write("\r{}".format("".join(text)))
        sys.stdout.flush()
        next_time = ((base_time - time.time()) % frame_per_time)
        time.sleep(next_time)

解説

  1. 標準出力の上書き
    • 標準出力にはsys.stdout.writeを使いました。普通のprintでは映像が乱れてしまいますが上のコードのようにsys.stdout.write\rを指定すると出力の上書きができます。 複数行の書き換えは不可だった。
  2. 一定時間ごとに処理
    • frame_per_timeで基準となる時間決めて、その時間が経ったら次のforループに行くようにしている。
    • 基準となる時間 = (音楽の時間 / フレームの枚数)

music関数の定義

音声を出力する関数を作ります。

def music(music_path):
    try:
        subprocess.run(["ffplay", "-nodisp", "-autoexit", music_path],
                        timeout=duration, stderr=subprocess.DEVNULL)
    except subprocess.TimeoutExpired:
        print("再生終了")

解説

subprocess:

subprocess.run(args)

argsで指定されたコマンドを実行する関数。argsは一個の文字列か一個のリストである必要がある。リストの場合は指定したコマンドの引数も指定できる。

sunprocess.run関数でffplayというコマンドを叩きます。その際に-nodisp-autoexitをオプションとして指定します。意味は以下の通りです。

  • nodisp
    • 音声再生時にグラフィカルな表示を無効化
    • ↓↓つけないとこんなのが出てくる↓↓
  • autoexit
    • 最後まで再生が終われば終了

sunprocess.run関数の方には次の2つを引数に取ります。

  • timeout=duration
    • ここで指定した秒数でコマンドの実行を強制的に打ち切って終了します。その際にsubprocess.TimeoutExpiredの例外が発生します。
    • durationは「2. 動画ファイルの情報を取得する」で取得した動画の時間
  • stderr=subprocess.DEVNULL
    • エラー文の扱いを決める引数
    • subprocess.DEVNULLとした場合エラーが出力されない

複数のプログラムを同時に動かす方法として並列処理・並行処理という技術があります。この2つは名前は似ていますが全く違うものです。

並列処理・並行処理

  • 並列処理(マルチプロセス)
    • 処理1と処理2を同時に実行する
  • 並行処理(マルチスレッド)
    • 処理1と処理2を切り替えながら実行する

今回は音声と映像を同時に出力したいので並列処理を行います。並列処理にはconcurrent.futuresモジュールのProcessPoolExecutorというものを使います。max_workersでいくつのプログラムを同時に実行するかを指定します。

with ProcessPoolExecutor(max_workers=2) as executor:
    executor.submit(music, music_path)
    executor.submit(video, fix_height, text_array, frame_per_time)

コード全体

import os
import sys
import time
import shutil
from tqdm import tqdm
import subprocess
from concurrent.futures import ProcessPoolExecutor

import cv2
import ffmpeg
import numpy as np

# パスの設定
root_path = "/Users/sugawaradaichi/4nonome/py_prac/pyfolder/bad-apple"
name = "【東方】Bad Apple!! PV【影絵】-FtutLA63Cp8.mkv"
video_path = os.path.join(root_path, f"video/{name}")
music_path = os.path.join(root_path, f"music/badapple.wav")

# 映像情報の取得
video_info = ffmpeg.probe(video_path)
width = video_info["streams"][0]['width']
height = video_info["streams"][0]['height']
duration = float(video_info["format"]['duration'])



out, _ = (
    ffmpeg
    .input(video_path, ss=0, t=duration)
    .output('pipe:', format='rawvideo', pix_fmt='rgb24')
    .run(capture_stdout=True)
)

arr = (
    np
    .frombuffer(out, np.uint8)
    .reshape([-1, height, width, 3])
)

# ターミナルウィンドウサイズの取得
terminal_size = shutil.get_terminal_size()
fix_height = terminal_size.lines  # 64
fix_width = terminal_size.columns  # 204

def video(text_array, frame_per_time):
    base_time = time.time()
    next_time = 0
    for im in text_array:
        text = im.reshape(-1)
        sys.stdout.write("\r{}".format("".join(text)))
        sys.stdout.flush()
        next_time = ((base_time - time.time()) % frame_per_time)
        time.sleep(next_time)

def music(music_path):
    try:
        subprocess.run(["ffplay", "-nodisp", "-autoexit",  music_path],
                        timeout=duration, stderr=subprocess.DEVNULL)
    except subprocess.TimeoutExpired:
        print("再生終了")

if __name__ == "__main__":
    frame = arr.shape[0]
    text_array = np.full((frame, fix_height, fix_width), None)
    for i in tqdm(range(frame)):
        resize_im = cv2.resize(arr[i], dsize=(fix_width, fix_height))
        gray_sca = resize_im[:, :, 0] * 0.299 + \
            resize_im[:, :, 1] * 0.587 + resize_im[:, :, 2] * 0.114
        for j in range(fix_height):
            for k in range(fix_width):
                if 0 <= gray_sca[j, k] < 30:
                    text_array[i, j, k] = " "
                elif 30 <= gray_sca[j, k] < 80:
                    text_array[i, j, k] = "'"
                elif 80 <= gray_sca[j, k] < 160:
                    text_array[i, j, k] = "+"
                else:
                    text_array[i, j, k] = "@"

    frame_per_time = (duration / frame)

    with ProcessPoolExecutor(max_workers=2) as executor:
        executor.submit(music, music_path)
        executor.submit(video, text_array, frame_per_time)

5. 完成品

こちらが完成品になります。

時々画像が乱れるけど直せなかった...

6. まとめ

良かった点

  • いままでpythonを勉強していて並列処理・並行処理を学ぶことがなかったのでいい機会になった。
  • 標準出力にもsys.stdout.write, sys.stdout.writelinesなど様々な方法があり、printしか知らなかったときより視野が広がった。

反省

  • これ書くのに2週間ぐらいかかってる...(流石に時間かけすぎた)
  • 挙動が不安定になってしまった。複数行の出力の書き換えができなかったことが原因でした。

7. 感想

youtubeでbadapple!!をterminal上で再現する動画が上がっていて、それを見たのは確かB1のときです。その時から「あ〜プログラミングできたらこんなの作れるのか〜すげ〜」って思っていました。一年と少し経ってtech blogでその動画を再現する事になったとき、再現するプロセスがスラスラと出てきて大体2時間ぐらいで原型を作れたときは自分の成長を感じました。初めてshell芸(正確にはshell芸ではないが)に挑戦してものづくり感があって結構楽しかったです。

8. 参考文献

【Python】youtubeの動画をコピペ5行でダウンロードする方法(違法性なし)

Pythonで一定時間ごとに処理を実行する

concurrent.futures -- 並列タスク実行

subprocess --- サブプロセス管理

グレースケール画像のうんちく

ffmpeg-pythonで動画編集する

etc...