diff --git a/modelscope/metrics/builder.py b/modelscope/metrics/builder.py
index 7115d471..2bd64613 100644
--- a/modelscope/metrics/builder.py
+++ b/modelscope/metrics/builder.py
@@ -41,6 +41,7 @@ class MetricKeys(object):
     MRR = 'mrr'
     NDCG = 'ndcg'
     AR = 'AR'
+    Colorfulness = 'colorfulness'
 
 
 task_default_metrics = {
diff --git a/modelscope/metrics/image_colorization_metric.py b/modelscope/metrics/image_colorization_metric.py
index bbaf4127..fe777928 100644
--- a/modelscope/metrics/image_colorization_metric.py
+++ b/modelscope/metrics/image_colorization_metric.py
@@ -1,10 +1,13 @@
 # Copyright (c) Alibaba, Inc. and its affiliates.
 from typing import Dict
 
+import cv2
 import numpy as np
 import torch
+import torch.nn as nn
 import torch.nn.functional as F
 from scipy import linalg
+from torchvision import models
 
 from modelscope.metainfo import Metrics
 from modelscope.models.cv.image_inpainting.modules.inception import InceptionV3
@@ -26,9 +29,7 @@ class ImageColorizationMetric(Metric):
     def __init__(self):
         self.preds = []
         self.targets = []
-
-        device = 'cuda' if torch.cuda.is_available() else 'cpu'
-        self.FID = FIDScore().to(device)
+        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
 
     def add(self, outputs: Dict, inputs: Dict):
         ground_truths = outputs['preds']
@@ -38,11 +39,19 @@ class ImageColorizationMetric(Metric):
 
     def evaluate(self):
         psnr_list = []
+        cf_list = []
+
+        fid = calculate_fid(self.preds, self.targets, device=self.device)
         for (pred, target) in zip(self.preds, self.targets):
-            self.FID(pred, target)
+            # shape of pred: [8, 3, 256, 256]
+            cf_list.append(calculate_colorfulness(pred))
             psnr_list.append(calculate_psnr(target[0], pred[0], crop_border=0))
-        fid = self.FID.get_value()
-        return {MetricKeys.PSNR: np.mean(psnr_list), MetricKeys.FID: fid}
+
+        return {
+            MetricKeys.PSNR: np.mean(psnr_list),
+            MetricKeys.FID: fid,
+            MetricKeys.Colorfulness: np.mean(cf_list)
+        }
 
     def merge(self, other: 'ImageColorizationMetric'):
         self.preds.extend(other.preds)
@@ -54,3 +63,370 @@ class ImageColorizationMetric(Metric):
     def __setstate__(self, state):
         self.__init__()
         self.preds, self.targets = state
+
+
+def image_colorfulness(image):
+    """Compute a colorfulness score for one RGB image.
+
+    Args:
+        image: Array indexed as ``image[channel]`` with the R, G and B
+            planes at indices 0, 1 and 2. Values are scaled by 255 before
+            the statistics are taken.
+
+    Returns:
+        ``sqrt(std_rg**2 + std_yb**2) + 0.3 * sqrt(mean_rg**2 + mean_yb**2)``
+        where ``rg = |R - G|`` and ``yb = |0.5 * (R + G) - B|``.
+    """
+    image = image * 255.0
+    red, green, blue = image[0], image[1], image[2]
+    rg = np.absolute(red - green)
+    yb = np.absolute(0.5 * (red + green) - blue)
+    std_root = np.sqrt(np.std(rg)**2 + np.std(yb)**2)
+    mean_root = np.sqrt(np.mean(rg)**2 + np.mean(yb)**2)
+    return std_root + 0.3 * mean_root
+
+
+def calculate_colorfulness(pred):
+    """Average :func:`image_colorfulness` over a batch of images.
+
+    Args:
+        pred: Batch of image tensors; iterating it must yield one image
+            per step (the caller passes shape [8, 3, 256, 256]).
+
+    Returns:
+        Mean colorfulness over the images of ``pred``.
+    """
+    # detach() keeps this usable on tensors that still track gradients.
+    scores = [
+        image_colorfulness(img_tensor.detach().cpu().numpy())
+        for img_tensor in pred
+    ]
+    return sum(scores) / len(pred)
+
+
+class INCEPTION_V3_FID(nn.Module):
+    """Pretrained InceptionV3 network returning feature maps."""
+
+    # Index of default block of inception to return,
+    # corresponds to output of final average pooling
+    DEFAULT_BLOCK_INDEX = 3
+
+    # Maps feature dimensionality to their output blocks indices
+    BLOCK_INDEX_BY_DIM = {
+        64: 0,  # First max pooling features
+        192: 1,  # Second max pooling features
+        768: 2,  # Pre-aux classifier features
+        2048: 3  # Final average pooling features
+    }
+
+    def __init__(self,
+                 incep_state_dict,
+                 output_blocks=None,
+                 resize_input=True):
+        """Build pretrained InceptionV3.
+
+        Args:
+            incep_state_dict (dict):
+                State dict loaded into ``models.inception_v3()``.
+            output_blocks (list of int, optional):
+                Indices of blocks to return features of. Defaults to
+                ``[DEFAULT_BLOCK_INDEX]``. Possible values are:
+                    - 0: corresponds to output of first max pooling
+                    - 1: corresponds to output of second max pooling
+                    - 2: corresponds to output which is fed to aux classifier
+                    - 3: corresponds to output of final average pooling
+            resize_input (bool):
+                If true, bilinearly resizes input to width and height 299
+                before feeding input to model. As the network without fully
+                connected layers is fully convolutional, it should be able
+                to handle inputs of arbitrary size, so resizing might not be
+                strictly needed.
+        """
+        super(INCEPTION_V3_FID, self).__init__()
+
+        # A ``None`` sentinel avoids sharing one mutable default list
+        # between all instances.
+        if output_blocks is None:
+            output_blocks = [self.DEFAULT_BLOCK_INDEX]
+
+        self.resize_input = resize_input
+        self.output_blocks = sorted(output_blocks)
+        self.last_needed_block = max(output_blocks)
+
+        assert self.last_needed_block <= 3, \
+            'Last possible output block index is 3'
+
+        self.blocks = nn.ModuleList()
+
+        inception = models.inception_v3()
+        inception.load_state_dict(incep_state_dict)
+        for param in inception.parameters():
+            param.requires_grad = False
+
+        # Block 0: input to maxpool1
+        block0 = [
+            inception.Conv2d_1a_3x3, inception.Conv2d_2a_3x3,
+            inception.Conv2d_2b_3x3,
+            nn.MaxPool2d(kernel_size=3, stride=2)
+        ]
+        self.blocks.append(nn.Sequential(*block0))
+
+        # Block 1: maxpool1 to maxpool2
+        if self.last_needed_block >= 1:
+            block1 = [
+                inception.Conv2d_3b_1x1, inception.Conv2d_4a_3x3,
+                nn.MaxPool2d(kernel_size=3, stride=2)
+            ]
+            self.blocks.append(nn.Sequential(*block1))
+
+        # Block 2: maxpool2 to aux classifier
+        if self.last_needed_block >= 2:
+            block2 = [
+                inception.Mixed_5b,
+                inception.Mixed_5c,
+                inception.Mixed_5d,
+                inception.Mixed_6a,
+                inception.Mixed_6b,
+                inception.Mixed_6c,
+                inception.Mixed_6d,
+                inception.Mixed_6e,
+            ]
+            self.blocks.append(nn.Sequential(*block2))
+
+        # Block 3: aux classifier to final avgpool
+        if self.last_needed_block >= 3:
+            block3 = [
+                inception.Mixed_7a, inception.Mixed_7b, inception.Mixed_7c,
+                nn.AdaptiveAvgPool2d(output_size=(1, 1))
+            ]
+            self.blocks.append(nn.Sequential(*block3))
+
+    def forward(self, inp):
+        """Get Inception feature maps.
+
+        Args:
+            inp (torch.tensor):
+                Input tensor of shape Bx3xHxW. It is left untouched; a
+                rescaled copy is fed to the network.
+                NOTE(review): the rescaling below maps [-1, 1] to [0, 1]
+                first -- confirm that callers pass that value range.
+
+        Returns:
+            List of torch.tensor corresponding to the selected output
+            block, sorted ascending by index.
+        """
+        outp = []
+        x = inp
+
+        if self.resize_input:
+            # align_corners=False is what F.interpolate uses by default;
+            # it is only spelled out here.
+            x = F.interpolate(
+                x, size=(299, 299), mode='bilinear', align_corners=False)
+
+        x = x.clone()
+        # [-1.0, 1.0] --> [0, 1.0]
+        x = x * 0.5 + 0.5
+        x[:, 0] = x[:, 0] * (0.229 / 0.5) + (0.485 - 0.5) / 0.5
+        x[:, 1] = x[:, 1] * (0.224 / 0.5) + (0.456 - 0.5) / 0.5
+        x[:, 2] = x[:, 2] * (0.225 / 0.5) + (0.406 - 0.5) / 0.5
+
+        for idx, block in enumerate(self.blocks):
+            x = block(x)
+            if idx in self.output_blocks:
+                outp.append(x)
+
+            if idx == self.last_needed_block:
+                break
+
+        return outp
+
+
+def get_activations(images, model, batch_size, verbose=False):
+    """Calculates the activations of the pool_3 layer for all images.
+
+    Args:
+        images: Tensor of dimension (n_images, 3, hi, wi) that can be fed
+            to ``model`` directly.
+        model: Instance of inception model. It is put into eval mode.
+        batch_size: the images are split into batches with batch size
+            batch_size. A reasonable batch size depends on the hardware.
+        verbose: If set to True, the number of calculated batches is
+            reported.
+
+    Returns:
+        A float64 numpy array of dimension (num images, dims) that contains
+        the activations of the first block returned by ``model`` for every
+        image, including those of a trailing partial batch.
+    """
+    model.eval()
+
+    d0 = int(images.size(0))
+    if d0 == 0:
+        # Nothing to propagate (and batch_size would be clamped to 0).
+        return np.empty((0, 2048))
+    if batch_size > d0:
+        print(('Warning: batch size is bigger than the data size. '
+               'Setting batch size to data size'))
+        batch_size = d0
+
+    # Ceil division, so that a trailing partial batch is not dropped.
+    n_batches = (d0 + batch_size - 1) // batch_size
+
+    activations = []
+    with torch.no_grad():
+        for i in range(n_batches):
+            if verbose:
+                print(
+                    '\rPropagating batch %d/%d' % (i + 1, n_batches),
+                    end='',
+                    flush=True)
+            start = i * batch_size
+            batch = images[start:start + batch_size]
+
+            pred = model(batch)[0]
+
+            # If model output is not scalar, apply global spatial average
+            # pooling. This happens if you choose a dimensionality not
+            # equal 2048.
+            if pred.shape[2] != 1 or pred.shape[3] != 1:
+                pred = F.adaptive_avg_pool2d(pred, output_size=(1, 1))
+
+            # Collected per batch so the feature width is taken from the
+            # model output instead of being fixed to 2048.
+            activations.append(
+                pred.detach().cpu().numpy().reshape(pred.shape[0], -1))
+
+    if verbose:
+        print(' done')
+
+    return np.concatenate(activations, 0).astype(np.float64)
+
+
+def calculate_activation_statistics(act):
+    """Calculation of the statistics used by the FID.
+
+    Args:
+        act: Numpy array of dimension (n_images, dim (e.g. 2048)).
+
+    Returns:
+        mu: The mean over samples of the activations of the pool_3 layer of
+            the inception model.
+        sigma: The covariance matrix of the activations of the pool_3 layer
+            of the inception model.
+    """
+    mu = np.mean(act, axis=0)
+    sigma = np.cov(act, rowvar=False)
+    return mu, sigma
+
+
+def calculate_frechet_distance(mu1, sigma1, mu2, sigma2, eps=1e-6):
+    """Numpy implementation of the Frechet Distance.
+
+    The Frechet distance between two multivariate Gaussians
+    X_1 ~ N(mu_1, C_1) and X_2 ~ N(mu_2, C_2) is
+        d^2 = ||mu_1 - mu_2||^2 + Tr(C_1 + C_2 - 2*sqrt(C_1*C_2)).
+    Stable version by Dougal J. Sutherland.
+
+    Args:
+        mu1: The sample mean over activations of the first image set.
+        sigma1: The covariance matrix over activations of the first image
+            set.
+        mu2: The sample mean over activations of the second image set.
+        sigma2: The covariance matrix over activations of the second image
+            set.
+        eps: Value added to the diagonal of both covariances when their
+            product has no finite matrix square root.
+
+    Returns:
+        The Frechet Distance.
+
+    Raises:
+        ValueError: If the matrix square root has a non-negligible
+            imaginary component.
+    """
+    mu1 = np.atleast_1d(mu1)
+    mu2 = np.atleast_1d(mu2)
+
+    sigma1 = np.atleast_2d(sigma1)
+    sigma2 = np.atleast_2d(sigma2)
+
+    assert mu1.shape == mu2.shape, \
+        'Training and test mean vectors have different lengths'
+    assert sigma1.shape == sigma2.shape, \
+        'Training and test covariances have different dimensions'
+
+    diff = mu1 - mu2
+
+    # Product might be almost singular
+    covmean, _ = linalg.sqrtm(sigma1.dot(sigma2), disp=False)
+    if not np.isfinite(covmean).all():
+        msg = ('fid calculation produces singular product; '
+               'adding %s to diagonal of cov estimates') % eps
+        print(msg)
+        offset = np.eye(sigma1.shape[0]) * eps
+        covmean = linalg.sqrtm((sigma1 + offset).dot(sigma2 + offset))
+
+    # Numerical error might give slight imaginary component
+    if np.iscomplexobj(covmean):
+        if not np.allclose(np.diagonal(covmean).imag, 0, atol=1e-3):
+            m = np.max(np.abs(covmean.imag))
+            raise ValueError('Imaginary component {}'.format(m))
+        covmean = covmean.real
+
+    tr_covmean = np.trace(covmean)
+
+    return (diff.dot(diff) + np.trace(sigma1) + np.trace(sigma2)
+            - 2 * tr_covmean)
+
+
+def calculate_fid(preds, targets, device):
+    """Compute the FID between predicted and ground-truth image batches.
+
+    The InceptionV3 weights are downloaded from the pytorch.org URL below
+    through ``torch.hub.load_state_dict_from_url``.
+
+    Args:
+        preds: Sequence of predicted image batches (tensors).
+        targets: Sequence of ground-truth image batches, paired with
+            ``preds`` by position.
+        device: Device the Inception network and the batches are moved to.
+
+    Returns:
+        The Frechet distance between the Inception activation statistics of
+        ``targets`` and ``preds``.
+
+    Raises:
+        ValueError: If ``preds`` or ``targets`` is empty.
+    """
+    if len(preds) == 0 or len(targets) == 0:
+        raise ValueError('calculate_fid needs at least one batch of images')
+
+    incep_url = 'https://download.pytorch.org/models/inception_v3_google-1a9a5a14.pth'
+    # torch is already imported by this module, so the torch.hub loader is
+    # used directly rather than probing version-dependent import paths.
+    incep_state_dict = torch.hub.load_state_dict_from_url(
+        incep_url, progress=True)
+
+    block_idx = INCEPTION_V3_FID.BLOCK_INDEX_BY_DIM[2048]
+    inception_model_fid = INCEPTION_V3_FID(incep_state_dict, [block_idx])
+    inception_model_fid.to(device)
+    inception_model_fid.eval()
+
+    fake_acts_set, acts_set = [], []
+    with torch.no_grad():
+        for (pred, gt) in zip(preds, targets):
+            pred, gt = pred.to(device), gt.to(device)
+            fake_act = get_activations(pred, inception_model_fid,
+                                       pred.shape[0])
+            real_act = get_activations(gt, inception_model_fid, gt.shape[0])
+            fake_acts_set.append(fake_act)
+            acts_set.append(real_act)
+    acts_set = np.concatenate(acts_set, 0)
+    fake_acts_set = np.concatenate(fake_acts_set, 0)
+
+    real_mu, real_sigma = calculate_activation_statistics(acts_set)
+    fake_mu, fake_sigma = calculate_activation_statistics(fake_acts_set)
+    fid_score = calculate_frechet_distance(real_mu, real_sigma, fake_mu,
+                                           fake_sigma)
+    return fid_score
diff --git a/tests/trainers/test_image_colorization_trainer.py b/tests/trainers/test_image_colorization_trainer.py
index 0c736c4b..de2883e0 100644
--- a/tests/trainers/test_image_colorization_trainer.py
+++ b/tests/trainers/test_image_colorization_trainer.py
@@ -89,6 +89,17 @@ class ImageColorizationTrainerTest(unittest.TestCase):
             task=Tasks.image_colorization, model=f'{self.tmp_dir}/output')
         pipeline_colorization('data/test/images/marilyn_monroe_4.jpg')
 
+    @unittest.skipUnless(test_level() >= 1, 'skip test in current test level')
+    def test_evaluation(self):
+        kwargs = dict(
+            model=self.model_id,
+            train_dataset=self.dataset_train,
+            eval_dataset=self.dataset_val,
+            work_dir=self.tmp_dir)
+        trainer = build_trainer(default_args=kwargs)
+        results = trainer.evaluate()
+        print(results)
+
 
 if __name__ == '__main__':
     unittest.main()