import gradio as gr
import argparse
import contextlib
import gdown
import cv2
import numpy as np
import os
import sys
from PIL import Image
sys.path.append(sys.path[0]+"/tracker")
sys.path.append(sys.path[0]+"/tracker/model")
from track_anything import TrackingAnything
from track_anything import parse_augment, save_image_to_userfolder, read_image_from_userfolder
import requests
import json
import torchvision
import torch
import torch.nn.functional as F
from tools.painter import mask_painter
from tqdm import tqdm
import psutil
import time
try:
    from mmcv.cnn import ConvModule
except ImportError:
    # Best-effort bootstrap: install mmcv when it is missing.
    os.system("mim install mmcv")


# ---------------------------------------------------------------------------
# private helpers
# ---------------------------------------------------------------------------
def _mask_id(mask_name):
    """Return the 1-based integer id encoded in a ``mask_XXX`` name."""
    return int(mask_name.split("_")[1])


def _compose_template_mask(masks, mask_dropdown):
    """Merge the selected per-object masks into one id-labelled mask.

    Args:
        masks: list of per-object mask arrays, indexed by ``id - 1``.
        mask_dropdown: non-empty list of ``mask_XXX`` names, in the order
            they should be merged.

    Returns:
        An array whose pixels hold the id of the object covering them.
    """
    template_mask = None
    for name in mask_dropdown:
        mask_id = _mask_id(name)
        # show_mask() stores masks whose foreground value is already the
        # object id; binarise first so an id is never multiplied twice.
        labelled = (masks[mask_id - 1] > 0) * mask_id
        if template_mask is None:
            template_mask = labelled
        else:
            template_mask = np.clip(template_mask + labelled, 0, mask_id)
    return template_mask


def _load_image_array(path, mode=None):
    """Read an image file into a numpy array and close the file handle."""
    with Image.open(path) as image:
        converted = image if mode is None else image.convert(mode)
        return np.asarray(converted)


def _paint_objects(frame_path, mask):
    """Return the RGB frame at ``frame_path`` with every object in ``mask`` painted."""
    painted_image = _load_image_array(frame_path, mode="RGB")
    for obj in np.unique(mask):
        # Cast to a Python int so ``obj + 1`` cannot overflow a uint8 scalar.
        obj = int(obj)
        if obj <= 0:
            continue
        painted_image = mask_painter(painted_image, (mask == obj).astype('uint8'), mask_color=obj+1)
    return painted_image


def _ensure_parent_dir(path):
    """Create the parent directory of ``path`` when it has one."""
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)


# download checkpoints
def download_checkpoint(url, folder, filename):
    """Download ``url`` into ``folder/filename`` unless the file already exists.

    The payload is written to a temporary ``.part`` file and renamed on
    success, so an interrupted download is never mistaken for a checkpoint.

    Returns:
        The path of the checkpoint file.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    os.makedirs(folder, exist_ok=True)
    filepath = os.path.join(folder, filename)

    if not os.path.exists(filepath):
        print("download checkpoints ......")
        partial_path = filepath + ".part"
        with requests.get(url, stream=True) as response:
            response.raise_for_status()
            with open(partial_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        os.replace(partial_path, filepath)

        print("download successfully!")

    return filepath


def download_checkpoint_from_google_drive(file_id, folder, filename):
    """Download a Google Drive file into ``folder/filename`` unless it exists.

    Returns:
        The path of the checkpoint file.
    """
    os.makedirs(folder, exist_ok=True)
    filepath = os.path.join(folder, filename)

    if not os.path.exists(filepath):
        print("Downloading checkpoints from Google Drive... tips: If you cannot see the progress bar, please try to download it manuall "
              "and put it in the checkpointes directory. E2FGVI-HQ-CVPR22.pth: https://github.com/MCG-NKU/E2FGVI(E2FGVI-HQ model)")
        url = f"https://drive.google.com/uc?id={file_id}"
        gdown.download(url, filepath, quiet=False)
        print("Downloaded successfully!")

    return filepath


# convert points input to prompt state
def get_prompt(click_state, click_input):
    """Append the clicks in ``click_input`` to ``click_state`` and build a prompt.

    Args:
        click_state: ``[[points], [labels]]``; extended in place.
        click_input: JSON string of ``[x, y, label]`` triples.

    Returns:
        The prompt dict built from the accumulated points and labels.
    """
    clicks = json.loads(click_input)
    points = click_state[0]
    labels = click_state[1]
    for click in clicks:
        points.append(click[:2])
        labels.append(click[2])
    click_state[0] = points
    click_state[1] = labels
    prompt = {
        "prompt_type":["click"],
        "input_point":click_state[0],
        "input_label":click_state[1],
        "multimask_output":"True",
    }
    return prompt


# extract frames from upload video
def get_frames_from_video(video_state, interactive_state, mask_dropdown):
    """Load the DAVIS sequence named by ``args.sequence`` and initialise the UI state.

    Reads the frame list and the first annotation mask from ``args.davisdir``,
    registers one mask per annotated object id, builds the preview video and
    returns the values/updates for the Gradio outputs.

    Args:
        video_state: incoming state dict; replaced by a fresh dict.
        interactive_state: dict whose ``multi_mask`` lists are extended in place.
        mask_dropdown: list of selected mask names; extended in place.

    Returns:
        A 17-tuple: video path, video state, info string, first frame,
        twelve ``gr.update`` objects and the interactive state.
    """
    user_name = time.time()
    mask_dir = os.path.join(args.davisdir, "Annotations/480p", args.sequence)
    masks_path = sorted(os.path.join(mask_dir, name) for name in os.listdir(mask_dir))
    first_masks = _load_image_array(masks_path[0])

    frames_dir = os.path.join(args.davisdir, "JPEGImages/480p", args.sequence)
    frames = [os.path.join(frames_dir, name) for name in sorted(os.listdir(frames_dir))]
    first_image = read_image_from_userfolder(frames[0])
    image_size = (first_image.shape[0], first_image.shape[1])

    gt_mask_dir = os.path.join(args.davisdir, "workspace/gt_mask", args.sequence)
    old_gt_mask = None
    if os.path.exists(gt_mask_dir):
        try:
            old_gt_mask = [_load_image_array(os.path.join(gt_mask_dir, "{:05d}.png".format(i))) for i in range(len(frames))]
        except Exception:
            # Best effort: fall back to empty masks when saved masks are unreadable.
            old_gt_mask = None
    if old_gt_mask is None:
        # One independent array per frame; ``[zeros] * n`` would alias a single array.
        old_gt_mask = [np.zeros((image_size[0], image_size[1]), np.uint8) for _ in frames]

    multi_mask = interactive_state["multi_mask"]
    # Every non-zero id in the first annotation becomes one selectable mask.
    object_ids = [obj_id for obj_id in np.unique(first_masks) if obj_id != 0]
    for obj_id in object_ids:
        multi_mask["masks"].append((first_masks == obj_id) * 1)
        mask_name = "mask_{:03d}".format(len(multi_mask["masks"]))
        multi_mask["mask_names"].append(mask_name)
        mask_dropdown.append(mask_name)

    if mask_dropdown:
        first_template_mask = _compose_template_mask(multi_mask["masks"], mask_dropdown)
    else:
        # No annotated object: start from an empty template instead of failing.
        first_template_mask = np.zeros((image_size[0], image_size[1]), np.uint8)

    video_path = generate_video_from_frames(frames, output_path=os.path.join(args.davisdir, "workspace/frame2video", "{}.mp4".format(args.sequence)))
    # save image path
    video_state["video_name"] = os.path.split(video_path)[-1]
    video_state["user_name"] = user_name
    os.makedirs(os.path.join("/tmp/{}/paintedimages/{}".format(video_state["user_name"], video_state["video_name"])), exist_ok=True)
    operation_log = [("",""),("Upload video already. Try click the image for adding targets to track and inpaint.","Normal")]
    # initialize video_state
    video_state = {
        "user_name": user_name,
        "video_name": os.path.split(video_path)[-1],
        "origin_images": frames,
        "painted_images": frames.copy(),
        "masks": old_gt_mask,
        "sam_mask": np.zeros((image_size[0], image_size[1]), np.uint8),
        "logits": [None]*len(frames),
        "select_frame_number": 0,
        "fps": 30
        }
    video_state['masks'][0] = first_template_mask.astype(np.uint8)
    select_frame, run_status = show_mask(video_state, interactive_state, mask_dropdown)

    video_info = "Video Name: {}, FPS: {}, Total Frames: {}, Image Size:{}".format(video_state["video_name"], video_state["fps"], len(frames), image_size)
    model.samcontroler.sam_controler.reset_image()
    model.samcontroler.sam_controler.set_image(first_image)

    # generate video if masks exists
    video_output_update = gr.update(visible=True)
    if os.path.exists(gt_mask_dir):
        video_output, video_state = get_mask_from_vot(video_state, output_path=os.path.join(args.davisdir, "workspace/result_video", "{}".format(video_state["video_name"])))
        video_output_update = gr.update(visible=True, value=video_output)

    return (
        video_path,
        video_state,
        video_info,
        first_image,
        gr.update(visible=True, maximum=len(frames), value=1),
        gr.update(visible=True, maximum=len(frames), value=len(frames)),
        gr.update(visible=True),
        gr.update(visible=True),
        gr.update(visible=True),
        gr.update(visible=True, value=select_frame),
        gr.update(visible=True),
        video_output_update,
        gr.update(visible=True, choices=interactive_state["multi_mask"]["mask_names"], value=mask_dropdown),
        gr.update(visible=True),
        gr.update(visible=True),
        gr.update(visible=True, value=operation_log),
        interactive_state,
    )


def run_example(example):
    """Return ``example`` unchanged."""
    return example


# get the select frame from gradio slider
def select_template(image_selection_slider, video_state, interactive_state, mask_dropdown):
    """Make the slider's frame the current template frame.

    Args:
        image_selection_slider: 1-based frame number from the slider.

    Returns:
        ``(painted frame, video_state, interactive_state, operation_log)``.
    """
    image_selection_slider -= 1
    video_state["select_frame_number"] = image_selection_slider
    select_frame, operation_log_1 = show_mask(video_state, interactive_state, mask_dropdown)

    # once select a new template frame, set the image in sam
    model.samcontroler.sam_controler.reset_image()
    model.samcontroler.sam_controler.set_image(read_image_from_userfolder(video_state["origin_images"][image_selection_slider]))

    operation_log = [("",""), ("Select frame {}. Try click image and add mask for tracking.".format(image_selection_slider),"Normal")]
    return select_frame, video_state, interactive_state, operation_log


# set the tracking end frame
def get_end_number(track_pause_number_slider, video_state, interactive_state):
    """Record the tracking end frame and return that frame with its mask painted.

    Args:
        track_pause_number_slider: 1-based frame number from the slider.

    Returns:
        ``(painted frame, interactive_state, operation_log)``.
    """
    track_pause_number_slider -= 1
    interactive_state["track_end_number"] = track_pause_number_slider
    painted_image = _paint_objects(
        video_state["origin_images"][track_pause_number_slider],
        video_state["masks"][track_pause_number_slider],
    )
    operation_log = [("",""),("Set the tracking finish at frame {}".format(track_pause_number_slider),"Normal")]
    return painted_image,interactive_state, operation_log


def get_resize_ratio(resize_ratio_slider, interactive_state):
    """Store the resize ratio chosen on the slider in ``interactive_state``."""
    interactive_state["resize_ratio"] = resize_ratio_slider
    return interactive_state


# use sam to get the mask
def sam_refine(video_state, point_prompt, click_state, interactive_state, evt:gr.SelectData):
    """Run SAM on the current frame with the accumulated click prompts.

    Args:
        video_state: state dict; ``sam_mask`` and the frame's ``logits`` are updated.
        point_prompt: ``"Positive"`` for a positive click, anything else is negative.
        click_state: ``[[points], [labels]]``; extended with the new click.
        interactive_state: dict whose click counters are incremented.
        evt: Gradio select event; ``evt.index`` holds the clicked coordinates.

    Returns:
        ``(painted image, video_state, interactive_state, operation_log)``.
    """
    if point_prompt == "Positive":
        label = 1
        interactive_state["positive_click_times"] += 1
    else:
        label = 0
        interactive_state["negative_click_times"] += 1
    coordinate = "[[{},{},{}]]".format(evt.index[0], evt.index[1], label)

    # prompt for sam model
    current_frame = video_state["origin_images"][video_state["select_frame_number"]]
    model.samcontroler.sam_controler.reset_image()
    model.samcontroler.sam_controler.set_image(read_image_from_userfolder(current_frame))
    prompt = get_prompt(click_state=click_state, click_input=coordinate)

    mask, logit, painted_image = model.first_frame_click(
        image=read_image_from_userfolder(current_frame),
        points=np.array(prompt["input_point"]),
        labels=np.array(prompt["input_label"]),
        multimask=prompt["multimask_output"],
    )
    video_state["sam_mask"] = mask
    video_state["logits"][video_state["select_frame_number"]] = logit

    operation_log = [("",""), ("Use SAM for segment. You can try add positive and negative points by clicking. Or press Clear clicks button to refresh the image. Press Add mask button when you are satisfied with the segment","Normal")]
    return painted_image, video_state, interactive_state, operation_log


def add_multi_mask(video_state, interactive_state, mask_dropdown):
    """Register the current SAM mask as a new (or recycled) object mask.

    The mask takes the smallest id that is not currently selected in
    ``mask_dropdown``; the frame's template mask is then rebuilt from the
    selection.

    Returns:
        ``(interactive_state, dropdown update, painted frame, cleared click
        state, operation_log)``. On failure the frame output is an empty
        ``gr.update()`` so the displayed image is left as it is.
    """
    # Defined up front so the return below is valid on the error path too.
    select_frame = gr.update()
    try:
        used_ids = {_mask_id(name) for name in mask_dropdown}
        # Smallest positive id not in use, independent of the list's order.
        missing_mask_id = 1
        while missing_mask_id in used_ids:
            missing_mask_id += 1

        mask = video_state["sam_mask"]
        multi_mask = interactive_state["multi_mask"]
        if missing_mask_id > len(multi_mask["masks"]):
            multi_mask["masks"].append(mask)
            mask_name = "mask_{:03d}".format(len(multi_mask["masks"]))
            multi_mask["mask_names"].append(mask_name)
            mask_dropdown.append(mask_name)
        else:
            multi_mask["masks"][missing_mask_id-1] = mask
            mask_dropdown.append("mask_{:03d}".format(missing_mask_id))
        # Masks are merged in ascending id order.
        mask_dropdown.sort()

        template_mask = _compose_template_mask(multi_mask["masks"], mask_dropdown)
        video_state["masks"][video_state["select_frame_number"]]= template_mask
        select_frame, run_status = show_mask(video_state, interactive_state, mask_dropdown)

        operation_log = [("",""),("Added a mask, use the mask select for target tracking or inpainting.","Normal")]
    except Exception:
        operation_log = [("Please click the left image to generate mask.", "Error"), ("","")]
    return interactive_state, gr.update(choices=interactive_state["multi_mask"]["mask_names"], value=mask_dropdown), select_frame, [[],[]], operation_log


def clear_click(video_state, click_state):
    """Drop the click history and return the unpainted current frame."""
    click_state = [[],[]]
    template_frame = read_image_from_userfolder(video_state["origin_images"][video_state["select_frame_number"]])
    operation_log = [("",""), ("Clear points history and refresh the image.","Normal")]
    return template_frame, click_state, operation_log


def remove_multi_mask(interactive_state, mask_dropdown):
    """Forget every registered mask and empty the dropdown."""
    interactive_state["multi_mask"]["mask_names"]= []
    interactive_state["multi_mask"]["masks"] = []

    operation_log = [("",""), ("Remove all mask, please add new masks","Normal")]
    return interactive_state, gr.update(choices=[],value=[]), operation_log


def show_mask(video_state, interactive_state, mask_dropdown):
    """Paint the selected masks of the current frame.

    For every selected id the per-object mask in ``interactive_state`` is
    refreshed from the current frame's id-labelled mask before painting.
    ``mask_dropdown`` is sorted in place.

    Returns:
        ``(painted frame, operation_log)``.
    """
    mask_dropdown.sort()
    frame_number = video_state["select_frame_number"]
    select_frame = read_image_from_userfolder(video_state["origin_images"][frame_number])
    select_mask = video_state["masks"][frame_number]
    mask_ids = [_mask_id(name) for name in mask_dropdown]
    if not mask_ids:
        return select_frame, [("",""), ("Please add mask first","Error")]

    masks = interactive_state["multi_mask"]["masks"]
    for mask_id in mask_ids:
        masks[mask_id-1] = (select_mask == mask_id) * mask_id
    for mask_id in mask_ids:
        select_frame = mask_painter(select_frame, masks[mask_id-1].astype('uint8'), mask_color=mask_id+1)

    operation_log = [("",""), ("Select {} for tracking or inpainting".format(mask_dropdown),"Normal")]
    return select_frame, operation_log


# tracking vos
def vos_tracking_video(video_state, interactive_state, mask_dropdown):
    """Track the selected masks from the template frame onwards.

    Tracking runs from ``select_frame_number`` to ``track_end_number``
    (inclusive), or to the last frame when no end number is set. The
    resulting masks/logits replace the corresponding entries of
    ``video_state`` and a painted result video is written.

    Returns:
        ``(video_output, video_state, interactive_state, operation_log)``.
    """
    operation_log = [("",""), ("Track the selected masks, and then you can select the masks for inpainting.","Normal")]
    model.xmem.clear_memory()

    start = video_state["select_frame_number"]
    end = interactive_state["track_end_number"]
    # A falsy end number means "track to the last frame".
    frame_span = slice(start, end + 1) if end else slice(start, None)
    following_frames = video_state["origin_images"][frame_span]

    if interactive_state["multi_mask"]["masks"]:
        if len(mask_dropdown) == 0:
            mask_dropdown = ["mask_001"]
        mask_dropdown.sort()
        template_mask = _compose_template_mask(interactive_state["multi_mask"]["masks"], mask_dropdown)
        video_state["masks"][start]= template_mask
    else:
        template_mask = video_state["masks"][start]

    # operation error
    if len(np.unique(template_mask))==1:
        # Work on a copy: the stored mask may be read-only or reused elsewhere.
        template_mask = np.array(template_mask)
        template_mask[0][0]=1
        operation_log = [("Error! Please add at least one mask to track by clicking the left image.","Error"), ("","")]
    masks, logits = model.generator(images=following_frames, template_mask=template_mask, video_state=video_state)
    # clear GPU memory
    model.xmem.clear_memory()

    video_state["masks"][frame_span] = masks
    video_state["logits"][frame_span] = logits

    result_path = os.path.join(args.davisdir, "workspace/result_video", "{}".format(video_state["video_name"]))
    # Drop the previous result without going through a shell.
    with contextlib.suppress(FileNotFoundError):
        os.remove(result_path)
    video_output, video_state = get_mask_from_vot(video_state, output_path=result_path)

    interactive_state["inference_times"] += 1

    print("For generating this tracking result, inference times: {}, click times: {}, positive: {}, negative: {}".format(interactive_state["inference_times"],
                                                                                                                                           interactive_state["positive_click_times"]+interactive_state["negative_click_times"],
                                                                                                                                           interactive_state["positive_click_times"],
                                                                                                                                           interactive_state["negative_click_times"]))

    #### shanggao code for mask save
    if interactive_state["mask_save"]:
        mask_save_dir = os.path.join(args.davisdir,'workspace/gt_mask', video_state["video_name"].split('.')[0])
        os.makedirs(mask_save_dir, exist_ok=True)
        print("save mask")
        for i, mask in enumerate(tqdm(video_state["masks"])):
            Image.fromarray(mask).save(os.path.join(mask_save_dir, '{:05d}.png'.format(i)))
    #### shanggao code for mask save
    return video_output, video_state, interactive_state, operation_log


# inpaint
def inpaint_video(video_state, interactive_state, mask_dropdown):
    """Inpaint the selected objects out of the video.

    Every object id that is not selected in ``mask_dropdown`` is cleared
    from a copy of the masks before inpainting. When inpainting fails the
    original frames are rendered instead and an error log is returned.

    Returns:
        ``(video_output, operation_log)``.
    """
    operation_log = [("",""), ("Removed the selected masks.","Normal")]

    # NOTE(review): elsewhere in this file "origin_images" holds file paths,
    # so this is presumably an array of strings - confirm that the inpainter
    # accepts paths.
    frames = np.asarray(video_state["origin_images"])
    fps = video_state["fps"]
    # np.array copies, so clearing ids below never touches video_state["masks"].
    inpaint_masks = np.array(video_state["masks"])
    if len(mask_dropdown) == 0:
        mask_dropdown = ["mask_001"]
    mask_dropdown.sort()
    # convert mask_dropdown to mask numbers
    inpaint_mask_numbers = [_mask_id(name) for name in mask_dropdown]
    # clear every id that is not selected, whether or not ids are contiguous
    inpaint_masks[~np.isin(inpaint_masks, inpaint_mask_numbers)] = 0
    # inpaint for videos
    try:
        inpainted_frames = model.baseinpainter.inpaint(frames, inpaint_masks, ratio=interactive_state["resize_ratio"])   # numpy array, T, H, W, 3
        video_output = generate_video_from_paintedframes(inpainted_frames, output_path="./result/inpaint/{}".format(video_state["video_name"]), fps=fps)
    except Exception:
        operation_log = [("Error! You are trying to inpaint without masks input. Please track the selected mask first, and then press inpaint. If VRAM exceeded, please use the resize ratio to scaling down the image size.","Error"), ("","")]
        inpainted_frames = video_state["origin_images"]
        video_output = generate_video_from_frames(inpainted_frames, output_path="./result/inpaint/{}".format(video_state["video_name"]), fps=fps)

    return video_output, operation_log


# generate video after vos inference
def generate_video_from_frames(frames_path, output_path, fps=30):
    """
    Generates a video from a list of frame files.

    An existing file at ``output_path`` is reused and nothing is written.

    Args:
        frames_path (list of str): Paths of the frames to include in the video.
        output_path (str): The path to save the generated video.
        fps (int, optional): The frame rate of the output video. Defaults to 30.

    Returns:
        str: ``output_path``.
    """
    if os.path.exists(output_path):
        return output_path
    print("read frames from sequence")
    frames = [ensure_divisible_by_two(read_image_from_userfolder(file)) for file in tqdm(frames_path)]
    frames = torch.from_numpy(np.asarray(frames))
    _ensure_parent_dir(output_path)
    print("generate video from frames for preview")
    torchvision.io.write_video(output_path, frames, fps=fps, video_codec="libx264")
    return output_path


def generate_video_from_paintedframes(frames, output_path, fps=30):
    """
    Generates a video from a list of frames.

    Args:
        frames (list of numpy arrays): The frames to include in the video.
        output_path (str): The path to save the generated video.
        fps (int, optional): The frame rate of the output video. Defaults to 30.

    Returns:
        str: ``output_path``.
    """
    frames = torch.from_numpy(np.asarray(frames))
    _ensure_parent_dir(output_path)
    torchvision.io.write_video(output_path, frames, fps=fps, video_codec="libx264")
    return output_path


def get_mask_from_vot(video_state, output_path, fps=30):
    """Render every frame with its mask painted into a half-resolution video.

    Args:
        video_state: state dict providing ``masks`` and ``origin_images``.
        output_path (str): The path to save the generated video.
        fps (int, optional): The frame rate of the output video. Defaults to 30.

    Returns:
        ``(output_path, video_state)``.
    """
    masks = video_state["masks"]
    frames = video_state["origin_images"]
    height, width = _load_image_array(frames[0]).shape[:2]
    # resize for accelerating video generation
    new_size = (width//2, height//2)
    painted_images = []
    print("painting mask")
    for i in tqdm(range(len(masks))):
        painted_image = _paint_objects(frames[i], masks[i])
        painted_images.append(cv2.resize(painted_image, new_size, interpolation=cv2.INTER_AREA))
    painted_images = [ensure_divisible_by_two(image) for image in painted_images]
    painted_images = torch.from_numpy(np.asarray(painted_images))
    print("saving result videos")
    _ensure_parent_dir(output_path)
    torchvision.io.write_video(output_path, painted_images, fps=fps, video_codec="libx264")
    return output_path, video_state


def ensure_divisible_by_two(image):
    """Resize ``image`` so that both its height and its width are even."""
    height, width = image.shape[:2]
    if width % 2 != 0:
        width -= 1
    if height % 2 != 0:
        height -= 1
    return cv2.resize(image, (width, height))


# args, defined in track_anything.py
args = parse_augment()

# check and download checkpoints if
needed SAM_checkpoint_dict = { 'vit_h': "sam_vit_h_4b8939.pth", 'vit_l': "sam_vit_l_0b3195.pth", "vit_b": "sam_vit_b_01ec64.pth" } SAM_checkpoint_url_dict = { 'vit_h': "https://dl.fbaipublicfiles.com/segment_anything/sam_vit_h_4b8939.pth", 'vit_l': "https://dl.fbaipublicfiles.com/segment_anything/sam_vit_l_0b3195.pth", 'vit_b': "https://dl.fbaipublicfiles.com/segment_anything/sam_vit_b_01ec64.pth" } sam_checkpoint = SAM_checkpoint_dict[args.sam_model_type] sam_checkpoint_url = SAM_checkpoint_url_dict[args.sam_model_type] xmem_checkpoint = "XMem-s012.pth" xmem_checkpoint_url = "https://github.com/hkchengrex/XMem/releases/download/v1.0/XMem-s012.pth" e2fgvi_checkpoint = "E2FGVI-HQ-CVPR22.pth" e2fgvi_checkpoint_id = "10wGdKSUOie0XmCr8SQ2A2FeDe-mfn5w3" folder ="./checkpoints" SAM_checkpoint = download_checkpoint(sam_checkpoint_url, folder, sam_checkpoint) xmem_checkpoint = download_checkpoint(xmem_checkpoint_url, folder, xmem_checkpoint) e2fgvi_checkpoint = download_checkpoint_from_google_drive(e2fgvi_checkpoint_id, folder, e2fgvi_checkpoint) # args.port = 12213 # args.device = "cuda:8" # args.mask_save = True # args.votdir = "/home/dataset/vots2023/" # initialize sam, xmem, e2fgvi models model = TrackingAnything(SAM_checkpoint, xmem_checkpoint, e2fgvi_checkpoint,args) title = """
Gradio demo for Track Anything, a flexible and interactive tool for video object tracking, segmentation, and inpainting. To use it, simply upload your video, or click one of the examples to load them. Code: Track-Anything If you stuck in unknown errors, please feel free to watch the Tutorial video.