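"""Split videos into clips lasting between MIN_SECONDS and MAX_SECONDS using FFmpeg.

Scene boundaries are read from the ``timecode_list`` column of a JSON Lines metadata file.
"""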
import argparse
import os
import subprocess
from datetime import datetime, timedelta
from multiprocessing import Pool
from pathlib import Path

import pandas as pd
from tqdm import tqdm

from utils.logger import logger
# Bounds (in seconds) on the length of every produced clip. Both can be
# overridden through environment variables of the same name.
MIN_SECONDS = int(os.getenv("MIN_SECONDS", 3))
MAX_SECONDS = int(os.getenv("MAX_SECONDS", 10))
def get_command(start_time, video_path, video_duration, output_path):
    """Build the FFmpeg argument list that cuts one clip out of a video.

    Args:
        start_time: A ``datetime`` whose time-of-day part is the clip start
            offset; it is passed to FFmpeg's ``-ss`` as ``str(start_time.time())``.
        video_path: Path of the input video.
        video_duration: Clip length, passed to FFmpeg's ``-t`` via ``str()``.
        output_path: Path of the clip to write.

    Returns:
        The command as a list suitable for ``subprocess.run``.
    """
    # Use FFmpeg to split the video. Re-encoding is needed to ensure the accuracy of the clip
    # at the cost of consuming computational resources.
    return [
        'ffmpeg',
        '-hide_banner',
        '-loglevel', 'panic',
        '-ss', str(start_time.time()),
        '-i', video_path,
        '-t', str(video_duration),
        '-c:v', 'libx264',
        '-preset', 'veryfast',
        '-crf', '22',
        '-c:a', 'aac',
        '-sn',
        output_path,
    ]
def clip_video_star(args):
    """Unpack a tuple of arguments and forward it to ``clip_video``.

    This lets ``clip_video`` be driven by ``Pool.imap``, which passes a single
    argument per task.
    """
    return clip_video(*args)
# Seconds dropped from the tail of a scene that is clipped in one piece.
_SCENE_TAIL_TRIM_SECONDS = 0.5


def _run_ffmpeg(command):
    """Run one FFmpeg command; log a warning and return False if it fails."""
    try:
        subprocess.run(command, check=True)
    except Exception as e:
        logger.warning(f"Run {command} error: {e}.")
        return False
    return True


def _clip_in_chunks(video_path, start_time, end_time, output_folder, name_prefix):
    """Cut [start_time, end_time) into consecutive clips of at most MAX_SECONDS.

    Clips are written to ``{output_folder}/{name_prefix}_{index}.mp4``. Clips
    whose output file already exists are skipped, and a failing FFmpeg call is
    logged without stopping the remaining chunks.
    """
    cur_start = start_time
    chunk_index = 0
    while cur_start < end_time:
        cur_end = min(cur_start + timedelta(seconds=MAX_SECONDS), end_time)
        chunk_seconds = (cur_end - cur_start).total_seconds()
        # A chunk shorter than MIN_SECONDS is either the final remainder or, when
        # MAX_SECONDS < MIN_SECONDS, every chunk; in both cases nothing further can
        # be clipped. The zero-length guard prevents an endless loop.
        if chunk_seconds < MIN_SECONDS or chunk_seconds <= 0:
            break
        output_path = os.path.join(output_folder, f"{name_prefix}_{chunk_index}.mp4")
        if os.path.exists(output_path):
            logger.info(f"The clipped video {output_path} exists.")
        else:
            _run_ffmpeg(get_command(cur_start, video_path, chunk_seconds, output_path))
        cur_start = cur_end
        chunk_index += 1


def clip_video(video_path, timecode_list, output_folder, video_duration):
    """Clip the video into pieces within the range of [MIN_SECONDS, MAX_SECONDS],
    according to the timecode obtained from cogvideox/video_caption/cutscene_detect.py.

    Args:
        video_path: Path of the input video; its stem prefixes every output name.
        timecode_list: Sequence of ``[start, end]`` timecodes formatted as
            ``%H:%M:%S.%f``, one per scene. An empty sequence means the whole
            video is treated as a single scene.
        output_folder: Folder that receives the clips.
        video_duration: Total duration formatted as ``%H:%M:%S.%f``; only used
            when ``timecode_list`` is empty.

    Output naming:
        * single-scene video: ``{stem}_{chunk}.mp4``
        * scene no longer than MAX_SECONDS: ``{stem}_{scene}.mp4``
        * scene longer than MAX_SECONDS: ``{stem}_{scene}_{chunk}.mp4``

    Any error is logged as a warning instead of being raised, so that one bad
    video does not stop the worker.
    """
    try:
        video_name = Path(video_path).stem

        if len(timecode_list) == 0:  # The video of a single scene.
            start_time = datetime.strptime("00:00:00.000", "%H:%M:%S.%f")
            end_time = datetime.strptime(video_duration, "%H:%M:%S.%f")
            _clip_in_chunks(video_path, start_time, end_time, output_folder, video_name)
            return

        for i, timecode in enumerate(timecode_list):  # The video of multiple scenes.
            start_time = datetime.strptime(timecode[0], "%H:%M:%S.%f")
            end_time = datetime.strptime(timecode[1], "%H:%M:%S.%f")
            scene_seconds = (end_time - start_time).total_seconds()
            output_path = os.path.join(output_folder, f"{video_name}_{i}.mp4")
            if os.path.exists(output_path):
                logger.info(f"The clipped video {output_path} exists.")
                continue
            if scene_seconds < MIN_SECONDS:
                continue
            if scene_seconds > MAX_SECONDS:
                _clip_in_chunks(video_path, start_time, end_time, output_folder, f"{video_name}_{i}")
                continue

            # We found that the current scene detected by PySceneDetect includes a few frames from
            # the next scene occasionally. Directly discard the last few frames of the current scene.
            trimmed_seconds = scene_seconds - _SCENE_TAIL_TRIM_SECONDS
            if trimmed_seconds <= 0:
                continue
            # A failure here is logged and the next scene is still processed,
            # matching the behaviour of the chunked path.
            _run_ffmpeg(get_command(start_time, video_path, trimmed_seconds, output_path))
    except Exception as e:
        logger.warning(f"Clip video with {video_path}. Error is: {e}.")
if __name__ == "__main__":
    # Command-line entry point: read the metadata, filter by resolution and
    # clip every remaining video in a pool of worker processes.
    parser = argparse.ArgumentParser(description="Video Splitting")
    parser.add_argument(
        "--video_metadata_path",
        type=str,
        required=True,
        help="The path to the video dataset metadata (jsonl).",
    )
    parser.add_argument(
        "--video_path_column",
        type=str,
        default="video_path",
        help="The column contains the video path (an absolute path or a relative path w.r.t the video_folder).",
    )
    parser.add_argument("--video_folder", type=str, default="", help="The video folder.")
    parser.add_argument("--output_folder", type=str, default="outputs")
    parser.add_argument("--n_jobs", type=int, default=16)
    parser.add_argument("--resolution_threshold", type=float, default=0, help="The resolution threshold.")
    args = parser.parse_args()

    # The metadata is read as JSON Lines; the columns used below are
    # "frame_size", "timecode_list", "duration" and the video path column.
    video_metadata_df = pd.read_json(args.video_metadata_path, lines=True)
    num_videos = len(video_metadata_df)

    # Resolution is the product of the first two entries of frame_size.
    video_metadata_df["resolution"] = video_metadata_df["frame_size"].apply(lambda x: x[0] * x[1])
    video_metadata_df = video_metadata_df[video_metadata_df["resolution"] >= args.resolution_threshold]
    logger.info(f"Filter {num_videos - len(video_metadata_df)} videos with resolution smaller than {args.resolution_threshold}.")

    video_path_list = video_metadata_df[args.video_path_column].to_list()
    # Output names are derived from the file stem, so equal stems collide.
    video_id_list = [Path(video_path).stem for video_path in video_path_list]
    if len(video_id_list) != len(set(video_id_list)):
        logger.warning("Duplicate file names exist in the input video path list.")
    video_path_list = [os.path.join(args.video_folder, video_path) for video_path in video_path_list]
    video_timecode_list = video_metadata_df["timecode_list"].to_list()
    video_duration_list = video_metadata_df["duration"].to_list()
    assert len(video_path_list) == len(video_timecode_list)

    os.makedirs(args.output_folder, exist_ok=True)
    args_list = [
        (video_path, timecode_list, args.output_folder, video_duration)
        for video_path, timecode_list, video_duration in zip(
            video_path_list, video_timecode_list, video_duration_list
        )
    ]
    with Pool(args.n_jobs) as pool:
        results = pool.imap(clip_video_star, args_list)
        # Drain the iterator so every task completes while tqdm shows progress.
        for _ in tqdm(results, total=len(args_list)):
            pass