Files
modelscope/tests/pipelines/test_table_question_answering.py

202 lines
8.3 KiB
Python
Raw Permalink Normal View History

# Copyright (c) Alibaba, Inc. and its affiliates.
import os
import unittest
from threading import Thread
from typing import List
import json
from transformers import BertTokenizer
from modelscope.hub.snapshot_download import snapshot_download
from modelscope.models import Model
from modelscope.outputs import OutputKeys
from modelscope.pipelines import pipeline
from modelscope.pipelines.nlp import TableQuestionAnsweringPipeline
from modelscope.preprocessors import TableQuestionAnsweringPreprocessor
[to #42322933] NLP 1030 Refactor Features: 1. Refactor the directory structure of nlp models. All model files are placed into either the model folder or the task_model folder 2. Refactor all the comments to google style 3. Add detail comments to important tasks and nlp models, to list the description of the model, and its preprocessor&trainer 4. Model Exporting now supports a direct all to TorchModelExporter(no need to derive from it) 5. Refactor model save_pretrained method to support direct running(independent from trainer) 6. Remove the judgement of Model in the pipeline base class, to support outer register models running in our pipelines 7. Nlp trainer now has a NLPTrainingArguments class , user can pass arguments into the dataclass, and use it as a normal cfg_modify_fn, to simplify the operation of modify cfg. 8. Merge the BACKBONES and the MODELS, so user can get a backbone with the Model.from_pretrained call 9. Model.from_pretrained now support a task argument, so user can use a backbone and load it with a specific task class. 10. Support Preprocessor.from_pretrained method 11. Add standard return classes to important nlp tasks, so some of the pipelines and the models are independent now, the return values of the models will always be tensors, and the pipelines will take care of the conversion to numpy and the following stuffs. 12. Split the file of the nlp preprocessors, to make the dir structure more clear. Bugs Fixing: 1. Fix a bug that lr_scheduler can be called earlier than the optimizer's step 2. Fix a bug that the direct call of Pipelines (not from pipeline(xxx)) throws error 3. Fix a bug that the trainer will not call the correct TaskDataset class 4. Fix a bug that the internal loading of dataset will throws error in the trainer class Link: https://code.alibaba-inc.com/Ali-MaaS/MaaS-lib/codereview/10490585
2022-10-25 12:26:25 +08:00
from modelscope.preprocessors.nlp.space_T_cn.fields.database import Database
from modelscope.utils.constant import ModelFile, Tasks
from modelscope.utils.test_utils import test_level
def tableqa_tracking_and_print_results_with_history(
pipelines: List[TableQuestionAnsweringPipeline]):
test_case = {
'utterance': [
'有哪些风险类型?',
'风险类型有多少种?',
'珠江流域的小型水库的库容总量是多少?',
'那平均值是多少?',
'那水库的名称呢?',
'换成中型的呢?',
]
}
for p in pipelines:
historical_queries = None
for question in test_case['utterance']:
output_dict = p({
'question': question,
'history_sql': historical_queries
})[OutputKeys.OUTPUT]
print('question', question)
print('sql text:', output_dict[OutputKeys.SQL_STRING])
print('sql query:', output_dict[OutputKeys.SQL_QUERY])
2023-05-22 10:53:18 +08:00
print('query result:', output_dict[OutputKeys.QUERY_RESULT])
print('json dumps', json.dumps(output_dict, ensure_ascii=False))
print()
historical_queries = output_dict[OutputKeys.HISTORY]
def tableqa_tracking_and_print_results_without_history(
pipelines: List[TableQuestionAnsweringPipeline]):
test_case = {
'utterance': [['列出油耗大于8但是功率低于200的名称和价格', 'car'],
['油耗低于5的suv有哪些', 'car'], ['上个月收益超过3的有几个基金', 'fund'],
['净值不等于1的基金平均月收益率和年收益率是多少', 'fund'],
['计算机或者成绩优秀的同学有哪些?学号是多少?', 'student'],
['本部博士生中平均身高是多少?', 'student'],
['长江流域和珠江流域的水库库容总量是多少?', 'reservoir'],
['今天星期几?', 'reservoir']]
}
for p in pipelines:
for question, table_id in test_case['utterance']:
output_dict = p({
'question': question,
'table_id': table_id
})[OutputKeys.OUTPUT]
print('question', question)
print('sql text:', output_dict[OutputKeys.SQL_STRING])
print('sql query:', output_dict[OutputKeys.SQL_QUERY])
2023-05-22 10:53:18 +08:00
print('query result:', output_dict[OutputKeys.QUERY_RESULT])
print('json dumps', json.dumps(output_dict, ensure_ascii=False))
print()
def tableqa_tracking_and_print_results_with_tableid(
pipelines: List[TableQuestionAnsweringPipeline]):
test_case = {
'utterance': [
['有哪些风险类型?', 'fund', False],
['风险类型有多少种?', 'fund', True],
['珠江流域的小型水库的库容总量是多少?', 'reservoir', False],
['那平均值是多少?', 'reservoir', True],
['那水库的名称呢?', 'reservoir', True],
['换成中型的呢?', 'reservoir', True],
['近7年来车辆的销量趋势', 'car_sales', False],
['近7年来车辆的销量月环比是多少呢', 'car_sales', True],
],
}
for p in pipelines:
historical_queries = None
for question, table_id, use_history in test_case['utterance']:
output_dict = p({
'question':
question,
'table_id':
table_id,
'history_sql':
historical_queries if use_history else None
})[OutputKeys.OUTPUT]
print('question', question)
print('sql text:', output_dict[OutputKeys.SQL_STRING])
print('sql query:', output_dict[OutputKeys.SQL_QUERY])
2023-05-22 10:53:18 +08:00
print('query result:', output_dict[OutputKeys.QUERY_RESULT])
print('json dumps', json.dumps(output_dict, ensure_ascii=False))
print()
historical_queries = output_dict[OutputKeys.HISTORY]
class TableQuestionAnswering(unittest.TestCase):
def setUp(self) -> None:
self.task = Tasks.table_question_answering
self.model_id = 'damo/nlp_convai_text2sql_pretrain_cn'
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_run_by_direct_model_download(self):
cache_path = snapshot_download(self.model_id)
preprocessor = TableQuestionAnsweringPreprocessor(model_dir=cache_path)
pipelines = [
pipeline(
Tasks.table_question_answering,
model=cache_path,
preprocessor=preprocessor)
]
tableqa_tracking_and_print_results_with_history(pipelines)
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_run_by_direct_model_download_with_multithreads(self):
cache_path = snapshot_download(self.model_id)
pl = pipeline(Tasks.table_question_answering, model=cache_path)
def print_func(pl, i):
result = pl({
'question': '上个月收益从低到高排前七的基金的名称和风险等级是什么',
'table_id': 'fund',
'history_sql': None
})
print(i, result[OutputKeys.OUTPUT][OutputKeys.SQL_QUERY],
2023-05-22 10:53:18 +08:00
result[OutputKeys.OUTPUT][OutputKeys.QUERY_RESULT],
json.dumps(result))
procs = []
for i in range(5):
proc = Thread(target=print_func, args=(pl, i))
procs.append(proc)
proc.start()
for proc in procs:
proc.join()
@unittest.skipUnless(test_level() >= 1, 'skip test in current test level')
def test_run_with_model_from_modelhub(self):
model = Model.from_pretrained(self.model_id)
self.tokenizer = BertTokenizer(
os.path.join(model.model_dir, ModelFile.VOCAB_FILE))
db = Database(
tokenizer=self.tokenizer,
table_file_path=[
os.path.join(model.model_dir, 'databases', fname)
for fname in os.listdir(
os.path.join(model.model_dir, 'databases'))
],
syn_dict_file_path=os.path.join(model.model_dir, 'synonym.txt'),
is_use_sqlite=True)
preprocessor = TableQuestionAnsweringPreprocessor(
model_dir=model.model_dir, db=db)
pipelines = [
pipeline(
Tasks.table_question_answering,
model=model,
preprocessor=preprocessor,
db=db)
]
tableqa_tracking_and_print_results_with_tableid(pipelines)
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_run_with_model_from_modelhub_with_other_classes(self):
model = Model.from_pretrained(self.model_id)
self.tokenizer = BertTokenizer(
os.path.join(model.model_dir, ModelFile.VOCAB_FILE))
db = Database(
tokenizer=self.tokenizer,
table_file_path=[
os.path.join(model.model_dir, 'databases', fname)
for fname in os.listdir(
os.path.join(model.model_dir, 'databases'))
],
syn_dict_file_path=os.path.join(model.model_dir, 'synonym.txt'),
is_use_sqlite=True)
preprocessor = TableQuestionAnsweringPreprocessor(
model_dir=model.model_dir, db=db)
pipelines = [
pipeline(
Tasks.table_question_answering,
model=model,
preprocessor=preprocessor,
db=db)
]
tableqa_tracking_and_print_results_without_history(pipelines)
if __name__ == '__main__':
unittest.main()