Files
modelscope/tests/utils/test_ast.py

226 lines
9.2 KiB
Python
Raw Normal View History

# Copyright (c) Alibaba, Inc. and its affiliates.
import os
import shutil
import tempfile
import time
import unittest
from pathlib import Path
from modelscope.utils.ast_utils import (FILES_MTIME_KEY, INDEX_KEY, MD5_KEY,
MODELSCOPE_PATH_KEY, REQUIREMENT_KEY,
VERSION_KEY, AstScanning,
FilesAstScanning,
generate_ast_template,
2023-01-11 10:43:56 +08:00
load_from_prebuilt, load_index)
p = Path(__file__)
MODELSCOPE_PATH = p.resolve().parents[2].joinpath('modelscope')
class AstScaningTest(unittest.TestCase):
def setUp(self):
print(('Testing %s.%s' % (type(self).__name__, self._testMethodName)))
self.tmp_dir = tempfile.TemporaryDirectory().name
self.tmp_dir2 = tempfile.TemporaryDirectory().name
self.test_file = os.path.join(self.tmp_dir, 'test.py')
if not os.path.exists(self.tmp_dir):
os.makedirs(self.tmp_dir)
fnames = ['1.py', '2.py', '3.py', '__init__.py']
self.folders = ['.', 'a', 'b', 'c']
dir_path = self.tmp_dir2
folder_dirs = [
os.path.join(dir_path, folder) for folder in self.folders
]
for folder in folder_dirs:
os.makedirs(folder, exist_ok=True)
for fname in fnames:
fpath = os.path.join(folder, fname)
with open(fpath, 'w') as f:
f.write('hello world')
for folder in folder_dirs:
print(f'folder: {os.listdir(folder)}')
def tearDown(self):
super().tearDown()
shutil.rmtree(self.tmp_dir)
shutil.rmtree(self.tmp_dir2)
def test_ast_scaning_class(self):
astScaner = AstScanning()
pipeline_file = os.path.join(MODELSCOPE_PATH, 'pipelines', 'nlp',
'fill_mask_pipeline.py')
output = astScaner.generate_ast(pipeline_file)
self.assertTrue(output['imports'] is not None)
self.assertTrue(output['from_imports'] is not None)
self.assertTrue(output['decorators'] is not None)
imports, from_imports, decorators = output['imports'], output[
'from_imports'], output['decorators']
self.assertIsInstance(imports, dict)
self.assertIsInstance(from_imports, dict)
self.assertIsInstance(decorators, list)
self.assertListEqual(list(set(imports.keys()) - set(['numpy'])), [])
self.assertEqual(len(from_imports.keys()), 8)
self.assertTrue(from_imports['modelscope.metainfo'] is not None)
self.assertEqual(from_imports['modelscope.metainfo'], ['Pipelines'])
self.assertEqual(decorators,
[('PIPELINES', 'fill-mask', 'fill-mask'),
('PIPELINES', 'fill-mask', 'fill-mask-ponet')])
def test_files_scaning_method(self):
fileScaner = FilesAstScanning()
# case of pass in files directly
pipeline_file = os.path.join(MODELSCOPE_PATH, 'pipelines', 'nlp',
'fill_mask_pipeline.py')
file_list = [pipeline_file]
output = fileScaner.get_files_scan_results(file_list)
self.assertTrue(output[INDEX_KEY] is not None)
self.assertTrue(output[REQUIREMENT_KEY] is not None)
index, requirements = output[INDEX_KEY], output[REQUIREMENT_KEY]
self.assertIsInstance(index, dict)
self.assertIsInstance(requirements, dict)
self.assertIsInstance(list(index.keys())[0], tuple)
index_0 = list(index.keys())[0]
self.assertIsInstance(index[index_0], dict)
self.assertTrue(index[index_0]['imports'] is not None)
self.assertIsInstance(index[index_0]['imports'], list)
self.assertTrue(index[index_0]['module'] is not None)
self.assertIsInstance(index[index_0]['module'], str)
index_0 = list(requirements.keys())[0]
self.assertIsInstance(requirements[index_0], list)
fileScaner.traversal_files(self.tmp_dir2, include_init=False)
self.assertTrue(
os.path.join(self.tmp_dir2, '__init__.py') not in
fileScaner.file_dirs)
fileScaner.traversal_files(self.tmp_dir2, include_init=True)
self.assertTrue(
os.path.join(self.tmp_dir2, '__init__.py') in fileScaner.file_dirs)
def test_file_mtime_md5_method(self):
fileScaner = FilesAstScanning()
# create first file
with open(self.test_file, 'w', encoding='utf-8') as f:
f.write('This is the new test!')
md5_1, mtime_1 = fileScaner.files_mtime_md5(self.tmp_dir, [])
md5_2, mtime_2 = fileScaner.files_mtime_md5(self.tmp_dir, [])
self.assertEqual(md5_1, md5_2)
self.assertEqual(mtime_1, mtime_2)
self.assertIsInstance(mtime_1, dict)
self.assertEqual(list(mtime_1.keys()), [self.test_file])
self.assertEqual(mtime_1[self.test_file], mtime_2[self.test_file])
time.sleep(2)
# case of revise
with open(self.test_file, 'w', encoding='utf-8') as f:
f.write('test again')
md5_3, mtime_3 = fileScaner.files_mtime_md5(self.tmp_dir, [])
self.assertNotEqual(md5_1, md5_3)
self.assertNotEqual(mtime_1[self.test_file], mtime_3[self.test_file])
# case of create
self.test_file_new = os.path.join(self.tmp_dir, 'test_1.py')
time.sleep(2)
with open(self.test_file_new, 'w', encoding='utf-8') as f:
f.write('test again')
md5_4, mtime_4 = fileScaner.files_mtime_md5(self.tmp_dir, [])
self.assertNotEqual(md5_1, md5_4)
self.assertNotEqual(md5_3, md5_4)
self.assertEqual(
set(mtime_4.keys()) - set([self.test_file, self.test_file_new]),
set())
def test_load_index_method(self):
# test full indexing case
output = load_index()
self.assertTrue(output[INDEX_KEY] is not None)
self.assertTrue(output[REQUIREMENT_KEY] is not None)
index, requirements = output[INDEX_KEY], output[REQUIREMENT_KEY]
self.assertIsInstance(index, dict)
self.assertIsInstance(requirements, dict)
self.assertIsInstance(list(index.keys())[0], tuple)
index_0 = list(index.keys())[0]
self.assertIsInstance(index[index_0], dict)
self.assertTrue(index[index_0]['imports'] is not None)
self.assertIsInstance(index[index_0]['imports'], list)
self.assertTrue(index[index_0]['module'] is not None)
self.assertIsInstance(index[index_0]['module'], str)
index_0 = list(requirements.keys())[0]
self.assertIsInstance(requirements[index_0], list)
self.assertIsInstance(output[MD5_KEY], str)
self.assertIsInstance(output[MODELSCOPE_PATH_KEY], str)
self.assertIsInstance(output[VERSION_KEY], str)
self.assertIsInstance(output[FILES_MTIME_KEY], dict)
2023-01-11 10:43:56 +08:00
# generate ast_template
file_path = os.path.join(self.tmp_dir, 'index_file.py')
index = generate_ast_template(file_path=file_path, force_rebuild=False)
self.assertTrue(os.path.exists(file_path))
self.assertEqual(output, index)
index_from_prebuilt = load_from_prebuilt(file_path)
self.assertEqual(index, index_from_prebuilt)
def test_update_load_index_method(self):
file_number = 20
file_list = []
for i in range(file_number):
filename = os.path.join(self.tmp_dir, f'test_{i}.py')
with open(filename, 'w', encoding='utf-8') as f:
f.write('import os')
file_list.append(filename)
index_file = 'ast_indexer_1'
index = load_index(
file_list=file_list,
indexer_file_dir=self.tmp_dir,
indexer_file=index_file)
self.assertEqual(len(index[FILES_MTIME_KEY]), file_number)
# no changing case, time should be less than original
index = load_index(
file_list=file_list,
indexer_file_dir=self.tmp_dir,
indexer_file=index_file)
self.assertEqual(len(index[FILES_MTIME_KEY]), file_number)
# modify case
with open(file_list[0], 'a', encoding='utf-8') as f:
f.write('\n\nimport typing')
file_list.append(filename)
index = load_index(
file_list=file_list,
indexer_file_dir=self.tmp_dir,
indexer_file=index_file)
self.assertEqual(len(index[FILES_MTIME_KEY]), file_number)
# adding new file, time should be less than original
test_file_new_2 = os.path.join(self.tmp_dir, 'test_new.py')
with open(test_file_new_2, 'w', encoding='utf-8') as f:
f.write('import os')
file_list.append(test_file_new_2)
index = load_index(
file_list=file_list,
indexer_file_dir=self.tmp_dir,
indexer_file=index_file)
self.assertEqual(len(index[FILES_MTIME_KEY]), file_number + 1)
# deleting one file, time should be less than original
file_list.pop()
index = load_index(
file_list=file_list,
indexer_file_dir=self.tmp_dir,
indexer_file=index_file)
self.assertEqual(len(index[FILES_MTIME_KEY]), file_number)
if __name__ == '__main__':
unittest.main()