mirror of
https://github.com/modelscope/modelscope.git
synced 2025-12-18 01:07:44 +01:00
226 lines
9.2 KiB
Python
226 lines
9.2 KiB
Python
# Copyright (c) Alibaba, Inc. and its affiliates.
|
|
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
import time
|
|
import unittest
|
|
from pathlib import Path
|
|
|
|
from modelscope.utils.ast_utils import (FILES_MTIME_KEY, INDEX_KEY, MD5_KEY,
|
|
MODELSCOPE_PATH_KEY, REQUIREMENT_KEY,
|
|
VERSION_KEY, AstScanning,
|
|
FilesAstScanning,
|
|
generate_ast_template,
|
|
load_from_prebuilt, load_index)
|
|
|
|
p = Path(__file__)
|
|
|
|
MODELSCOPE_PATH = p.resolve().parents[2].joinpath('modelscope')
|
|
|
|
|
|
class AstScaningTest(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
print(('Testing %s.%s' % (type(self).__name__, self._testMethodName)))
|
|
self.tmp_dir = tempfile.TemporaryDirectory().name
|
|
self.tmp_dir2 = tempfile.TemporaryDirectory().name
|
|
self.test_file = os.path.join(self.tmp_dir, 'test.py')
|
|
if not os.path.exists(self.tmp_dir):
|
|
os.makedirs(self.tmp_dir)
|
|
|
|
fnames = ['1.py', '2.py', '3.py', '__init__.py']
|
|
self.folders = ['.', 'a', 'b', 'c']
|
|
dir_path = self.tmp_dir2
|
|
folder_dirs = [
|
|
os.path.join(dir_path, folder) for folder in self.folders
|
|
]
|
|
for folder in folder_dirs:
|
|
os.makedirs(folder, exist_ok=True)
|
|
for fname in fnames:
|
|
fpath = os.path.join(folder, fname)
|
|
with open(fpath, 'w') as f:
|
|
f.write('hello world')
|
|
|
|
for folder in folder_dirs:
|
|
print(f'folder: {os.listdir(folder)}')
|
|
|
|
def tearDown(self):
|
|
super().tearDown()
|
|
shutil.rmtree(self.tmp_dir)
|
|
shutil.rmtree(self.tmp_dir2)
|
|
|
|
def test_ast_scaning_class(self):
|
|
astScaner = AstScanning()
|
|
pipeline_file = os.path.join(MODELSCOPE_PATH, 'pipelines', 'nlp',
|
|
'fill_mask_pipeline.py')
|
|
output = astScaner.generate_ast(pipeline_file)
|
|
self.assertTrue(output['imports'] is not None)
|
|
self.assertTrue(output['from_imports'] is not None)
|
|
self.assertTrue(output['decorators'] is not None)
|
|
imports, from_imports, decorators = output['imports'], output[
|
|
'from_imports'], output['decorators']
|
|
self.assertIsInstance(imports, dict)
|
|
self.assertIsInstance(from_imports, dict)
|
|
self.assertIsInstance(decorators, list)
|
|
self.assertListEqual(list(set(imports.keys()) - set(['numpy'])), [])
|
|
self.assertEqual(len(from_imports.keys()), 8)
|
|
self.assertTrue(from_imports['modelscope.metainfo'] is not None)
|
|
self.assertEqual(from_imports['modelscope.metainfo'], ['Pipelines'])
|
|
self.assertEqual(decorators,
|
|
[('PIPELINES', 'fill-mask', 'fill-mask'),
|
|
('PIPELINES', 'fill-mask', 'fill-mask-ponet')])
|
|
|
|
def test_files_scaning_method(self):
|
|
fileScaner = FilesAstScanning()
|
|
# case of pass in files directly
|
|
pipeline_file = os.path.join(MODELSCOPE_PATH, 'pipelines', 'nlp',
|
|
'fill_mask_pipeline.py')
|
|
file_list = [pipeline_file]
|
|
output = fileScaner.get_files_scan_results(file_list)
|
|
self.assertTrue(output[INDEX_KEY] is not None)
|
|
self.assertTrue(output[REQUIREMENT_KEY] is not None)
|
|
index, requirements = output[INDEX_KEY], output[REQUIREMENT_KEY]
|
|
self.assertIsInstance(index, dict)
|
|
self.assertIsInstance(requirements, dict)
|
|
self.assertIsInstance(list(index.keys())[0], tuple)
|
|
index_0 = list(index.keys())[0]
|
|
self.assertIsInstance(index[index_0], dict)
|
|
self.assertTrue(index[index_0]['imports'] is not None)
|
|
self.assertIsInstance(index[index_0]['imports'], list)
|
|
self.assertTrue(index[index_0]['module'] is not None)
|
|
self.assertIsInstance(index[index_0]['module'], str)
|
|
index_0 = list(requirements.keys())[0]
|
|
self.assertIsInstance(requirements[index_0], list)
|
|
|
|
fileScaner.traversal_files(self.tmp_dir2, include_init=False)
|
|
self.assertTrue(
|
|
os.path.join(self.tmp_dir2, '__init__.py') not in
|
|
fileScaner.file_dirs)
|
|
|
|
fileScaner.traversal_files(self.tmp_dir2, include_init=True)
|
|
self.assertTrue(
|
|
os.path.join(self.tmp_dir2, '__init__.py') in fileScaner.file_dirs)
|
|
|
|
def test_file_mtime_md5_method(self):
|
|
fileScaner = FilesAstScanning()
|
|
# create first file
|
|
with open(self.test_file, 'w', encoding='utf-8') as f:
|
|
f.write('This is the new test!')
|
|
|
|
md5_1, mtime_1 = fileScaner.files_mtime_md5(self.tmp_dir, [])
|
|
md5_2, mtime_2 = fileScaner.files_mtime_md5(self.tmp_dir, [])
|
|
self.assertEqual(md5_1, md5_2)
|
|
self.assertEqual(mtime_1, mtime_2)
|
|
self.assertIsInstance(mtime_1, dict)
|
|
self.assertEqual(list(mtime_1.keys()), [self.test_file])
|
|
self.assertEqual(mtime_1[self.test_file], mtime_2[self.test_file])
|
|
|
|
time.sleep(2)
|
|
# case of revise
|
|
with open(self.test_file, 'w', encoding='utf-8') as f:
|
|
f.write('test again')
|
|
md5_3, mtime_3 = fileScaner.files_mtime_md5(self.tmp_dir, [])
|
|
self.assertNotEqual(md5_1, md5_3)
|
|
self.assertNotEqual(mtime_1[self.test_file], mtime_3[self.test_file])
|
|
|
|
# case of create
|
|
self.test_file_new = os.path.join(self.tmp_dir, 'test_1.py')
|
|
time.sleep(2)
|
|
with open(self.test_file_new, 'w', encoding='utf-8') as f:
|
|
f.write('test again')
|
|
md5_4, mtime_4 = fileScaner.files_mtime_md5(self.tmp_dir, [])
|
|
self.assertNotEqual(md5_1, md5_4)
|
|
self.assertNotEqual(md5_3, md5_4)
|
|
self.assertEqual(
|
|
set(mtime_4.keys()) - set([self.test_file, self.test_file_new]),
|
|
set())
|
|
|
|
def test_load_index_method(self):
|
|
# test full indexing case
|
|
output = load_index()
|
|
self.assertTrue(output[INDEX_KEY] is not None)
|
|
self.assertTrue(output[REQUIREMENT_KEY] is not None)
|
|
index, requirements = output[INDEX_KEY], output[REQUIREMENT_KEY]
|
|
self.assertIsInstance(index, dict)
|
|
self.assertIsInstance(requirements, dict)
|
|
self.assertIsInstance(list(index.keys())[0], tuple)
|
|
index_0 = list(index.keys())[0]
|
|
self.assertIsInstance(index[index_0], dict)
|
|
self.assertTrue(index[index_0]['imports'] is not None)
|
|
self.assertIsInstance(index[index_0]['imports'], list)
|
|
self.assertTrue(index[index_0]['module'] is not None)
|
|
self.assertIsInstance(index[index_0]['module'], str)
|
|
index_0 = list(requirements.keys())[0]
|
|
self.assertIsInstance(requirements[index_0], list)
|
|
self.assertIsInstance(output[MD5_KEY], str)
|
|
self.assertIsInstance(output[MODELSCOPE_PATH_KEY], str)
|
|
self.assertIsInstance(output[VERSION_KEY], str)
|
|
self.assertIsInstance(output[FILES_MTIME_KEY], dict)
|
|
|
|
# generate ast_template
|
|
file_path = os.path.join(self.tmp_dir, 'index_file.py')
|
|
index = generate_ast_template(file_path=file_path, force_rebuild=False)
|
|
self.assertTrue(os.path.exists(file_path))
|
|
self.assertEqual(output, index)
|
|
index_from_prebuilt = load_from_prebuilt(file_path)
|
|
self.assertEqual(index, index_from_prebuilt)
|
|
|
|
def test_update_load_index_method(self):
|
|
file_number = 20
|
|
file_list = []
|
|
for i in range(file_number):
|
|
filename = os.path.join(self.tmp_dir, f'test_{i}.py')
|
|
with open(filename, 'w', encoding='utf-8') as f:
|
|
f.write('import os')
|
|
file_list.append(filename)
|
|
|
|
index_file = 'ast_indexer_1'
|
|
|
|
index = load_index(
|
|
file_list=file_list,
|
|
indexer_file_dir=self.tmp_dir,
|
|
indexer_file=index_file)
|
|
self.assertEqual(len(index[FILES_MTIME_KEY]), file_number)
|
|
|
|
# no changing case, time should be less than original
|
|
index = load_index(
|
|
file_list=file_list,
|
|
indexer_file_dir=self.tmp_dir,
|
|
indexer_file=index_file)
|
|
self.assertEqual(len(index[FILES_MTIME_KEY]), file_number)
|
|
|
|
# modify case
|
|
with open(file_list[0], 'a', encoding='utf-8') as f:
|
|
f.write('\n\nimport typing')
|
|
file_list.append(filename)
|
|
index = load_index(
|
|
file_list=file_list,
|
|
indexer_file_dir=self.tmp_dir,
|
|
indexer_file=index_file)
|
|
self.assertEqual(len(index[FILES_MTIME_KEY]), file_number)
|
|
|
|
# adding new file, time should be less than original
|
|
test_file_new_2 = os.path.join(self.tmp_dir, 'test_new.py')
|
|
with open(test_file_new_2, 'w', encoding='utf-8') as f:
|
|
f.write('import os')
|
|
file_list.append(test_file_new_2)
|
|
|
|
index = load_index(
|
|
file_list=file_list,
|
|
indexer_file_dir=self.tmp_dir,
|
|
indexer_file=index_file)
|
|
self.assertEqual(len(index[FILES_MTIME_KEY]), file_number + 1)
|
|
|
|
# deleting one file, time should be less than original
|
|
file_list.pop()
|
|
index = load_index(
|
|
file_list=file_list,
|
|
indexer_file_dir=self.tmp_dir,
|
|
indexer_file=index_file)
|
|
self.assertEqual(len(index[FILES_MTIME_KEY]), file_number)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|