[Fix] Fix preview csv stream loading (#1665)

This commit is contained in:
Xingjun.Wang
2026-04-14 14:17:23 +08:00
committed by GitHub
parent 25de84cfc0
commit fc0e2159a3
3 changed files with 32 additions and 25 deletions

View File

@@ -140,7 +140,10 @@ class CsvDatasetBuilder(csv.Csv):
def _generate_tables(self, files, base_dir):
# Raise csv field size limit to avoid errors with large cells
if self.csv_engine == 'python':
csv_module.field_size_limit(sys.maxsize)
try:
csv_module.field_size_limit(sys.maxsize)
except OverflowError:
csv_module.field_size_limit(2147483647)
schema = pa.schema(self.config.features.type
) if self.config.features is not None else None
@@ -150,11 +153,12 @@ class CsvDatasetBuilder(csv.Csv):
} if schema else None
for file_idx, file in enumerate(files):
pd_kwargs = dict(
iterator=True, dtype=dtype, delimiter=self.csv_delimiter)
iterator=True,
dtype=dtype,
delimiter=self.csv_delimiter,
chunksize=self.csv_chunksize or 10000)
if self.csv_engine is not None:
pd_kwargs['engine'] = self.csv_engine
if self.csv_chunksize is not None:
pd_kwargs['chunksize'] = self.csv_chunksize
csv_file_reader = pd.read_csv(file, **pd_kwargs)
transform_fields = []
for field_name in csv_file_reader._engine.names:

View File

@@ -249,7 +249,10 @@ class MsDataset:
if config_kwargs.get('engine') == 'python':
import csv as csv_module
import sys
csv_module.field_size_limit(sys.maxsize)
try:
csv_module.field_size_limit(sys.maxsize)
except OverflowError:
csv_module.field_size_limit(2147483647)
# Init context config
dataset_context_config = DatasetContextConfig(

View File

@@ -80,7 +80,7 @@ class TestCommitScheduler(unittest.TestCase):
delete_credential()
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_invalid_folder_path_nonexistent(self) -> None:
"""Test that CommitScheduler raises error for non-existent folder."""
nonexistent_path = self.cache_dir / 'nonexistent'
@@ -95,7 +95,7 @@ class TestCommitScheduler(unittest.TestCase):
token=TEST_ACCESS_TOKEN1)
self.assertIn('does not exist', str(cm.exception))
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_invalid_interval(self) -> None:
"""Test that CommitScheduler raises error for invalid interval values."""
# Test zero interval
@@ -120,7 +120,7 @@ class TestCommitScheduler(unittest.TestCase):
token=TEST_ACCESS_TOKEN1)
self.assertIn('positive', str(cm.exception))
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_initialization_with_defaults(self) -> None:
"""Test CommitScheduler initialization with default parameters."""
with patch.object(HubApi, 'create_repo') as mock_create:
@@ -146,7 +146,7 @@ class TestCommitScheduler(unittest.TestCase):
# Verify create_repo was called
mock_create.assert_called_once()
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_initialization_with_custom_parameters(self) -> None:
"""Test CommitScheduler initialization with custom parameters."""
custom_interval = 10
@@ -188,7 +188,7 @@ class TestCommitScheduler(unittest.TestCase):
call_args = mock_create.call_args
self.assertEqual(call_args.kwargs.get('visibility'), 'public')
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
@patch.object(CommitScheduler, 'commit_scheduled_changes')
def test_mocked_scheduler_execution(self, mock_push: MagicMock) -> None:
"""Test scheduler with mocked commit_scheduled_changes method."""
@@ -221,7 +221,7 @@ class TestCommitScheduler(unittest.TestCase):
self.assertEqual(result.oid, 'test123')
self.assertEqual(result.commit_message, 'Test commit')
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_scheduler_stop(self) -> None:
"""Test stopping the scheduler."""
with patch.object(HubApi, 'create_repo'):
@@ -242,7 +242,7 @@ class TestCommitScheduler(unittest.TestCase):
# Scheduler should be stopped
self.assertTrue(self.scheduler._CommitScheduler__stopped)
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_context_manager(self) -> None:
"""Test CommitScheduler as context manager."""
file_path = self.watched_folder / 'test_file.txt'
@@ -275,7 +275,7 @@ class TestCommitScheduler(unittest.TestCase):
# Should have triggered commit_scheduled_changes on exit
mock_push.assert_called()
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_trigger_manual_commit(self) -> None:
"""Test manually triggering a commit."""
with patch.object(HubApi, 'create_repo'), \
@@ -302,7 +302,7 @@ class TestCommitScheduler(unittest.TestCase):
self.assertEqual(result.commit_message, 'Manual commit')
mock_push.assert_called()
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_stopped_scheduler_no_push(self) -> None:
"""Test that stopped scheduler doesn't perform push operations."""
with patch.object(HubApi, 'create_repo'):
@@ -325,7 +325,7 @@ class TestCommitScheduler(unittest.TestCase):
# Result should be None for stopped scheduler
self.assertIsNone(result)
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_sync_local_folder_to_hub(self) -> None:
"""Test sync local folder to remote repo."""
hub_cache = self.cache_dir / 'hub'
@@ -409,7 +409,7 @@ class TestPartialFileIO(unittest.TestCase):
if self.cache_dir.exists():
shutil.rmtree(self.cache_dir, ignore_errors=True)
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_partial_file_read_with_limit(self) -> None:
"""Test reading partial file with size limit."""
partial_file = PartialFileIO(self.test_file, size_limit=5)
@@ -424,7 +424,7 @@ class TestPartialFileIO(unittest.TestCase):
partial_file.close()
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_partial_file_read_by_chunks(self) -> None:
"""Test reading partial file in chunks."""
partial_file = PartialFileIO(self.test_file, size_limit=8)
@@ -444,7 +444,7 @@ class TestPartialFileIO(unittest.TestCase):
partial_file.close()
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_partial_file_read_oversized_chunk(self) -> None:
"""Test reading more than size limit in one go."""
partial_file = PartialFileIO(self.test_file, size_limit=5)
@@ -455,7 +455,7 @@ class TestPartialFileIO(unittest.TestCase):
partial_file.close()
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_partial_file_len(self) -> None:
"""Test __len__ method returns correct size limit."""
partial_file = PartialFileIO(self.test_file, size_limit=7)
@@ -467,7 +467,7 @@ class TestPartialFileIO(unittest.TestCase):
self.assertEqual(len(large_partial), 15) # Actual file size
large_partial.close()
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_partial_file_seek_and_tell(self) -> None:
"""Test seek and tell operations."""
partial_file = PartialFileIO(self.test_file, size_limit=10)
@@ -497,7 +497,7 @@ class TestPartialFileIO(unittest.TestCase):
partial_file.close()
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_partial_file_not_implemented_methods(self) -> None:
"""Test that unsupported methods raise NotImplementedError."""
partial_file = PartialFileIO(self.test_file, size_limit=5)
@@ -511,7 +511,7 @@ class TestPartialFileIO(unittest.TestCase):
partial_file.close()
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_partial_file_repr(self) -> None:
"""Test string representation of PartialFileIO."""
partial_file = PartialFileIO(self.test_file, size_limit=5)
@@ -523,7 +523,7 @@ class TestPartialFileIO(unittest.TestCase):
partial_file.close()
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_file_modification_after_creation(self) -> None:
"""Test that file modifications after PartialFileIO creation don't affect size limit."""
partial_file = PartialFileIO(self.test_file, size_limit=20)
@@ -544,7 +544,7 @@ class TestPartialFileIO(unittest.TestCase):
partial_file.close()
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_high_size_limit(self) -> None:
"""Test size limit larger than file size."""
file = PartialFileIO(self.test_file, size_limit=20)
@@ -555,7 +555,7 @@ class TestPartialFileIO(unittest.TestCase):
self.assertEqual(len(file), 15)
self.assertEqual(file._size_limit, 15)
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_with_commit_operation_add(self) -> None:
"""Test with CommitOperationAdd."""
op_truncated = CommitOperationAdd(