mirror of
https://github.com/asciinema/asciinema.git
synced 2026-05-18 05:04:50 +02:00
Move incremental utf-8 decoding from async_writer to writer
This commit is contained in:
@@ -77,6 +77,8 @@ class writer():
|
||||
self.path = path
|
||||
self.mode = mode
|
||||
self.buffering = buffering
|
||||
self.stdin_decoder = codecs.getincrementaldecoder('UTF-8')('replace')
|
||||
self.stdout_decoder = codecs.getincrementaldecoder('UTF-8')('replace')
|
||||
|
||||
if mode == 'w':
|
||||
self.header = {'version': 2, 'width': width, 'height': height}
|
||||
@@ -97,11 +99,22 @@ class writer():
|
||||
def __exit__(self, exc_type, exc_value, exc_traceback):
|
||||
self.file.close()
|
||||
|
||||
def write_event(self, ts, type=None, data=None):
|
||||
if type is None:
|
||||
self.__write_line(ts)
|
||||
def write_event(self, ts, etype=None, data=None):
|
||||
if etype is None:
|
||||
ts, etype, data = ts
|
||||
|
||||
if etype == 'o':
|
||||
if type(data) == str:
|
||||
data = data.encode(encoding='utf-8', errors='strict')
|
||||
text = self.stdout_decoder.decode(data)
|
||||
self.__write_line([ts, etype, text])
|
||||
elif etype == 'i':
|
||||
if type(data) == str:
|
||||
data = data.encode(encoding='utf-8', errors='strict')
|
||||
text = self.stdin_decoder.decode(data)
|
||||
self.__write_line([ts, etype, text])
|
||||
else:
|
||||
self.__write_line([ts, type, data])
|
||||
self.__write_line([ts, etype, data])
|
||||
|
||||
def write_stdout(self, ts, data):
|
||||
self.write_event(ts, 'o', data)
|
||||
@@ -128,8 +141,6 @@ class async_writer():
|
||||
self.rec_stdin = rec_stdin
|
||||
self.start_time_offset = start_time_offset
|
||||
self.queue = Queue()
|
||||
self.stdin_decoder = codecs.getincrementaldecoder('UTF-8')('replace')
|
||||
self.stdout_decoder = codecs.getincrementaldecoder('UTF-8')('replace')
|
||||
|
||||
def __enter__(self):
|
||||
mode = 'a' if self.start_time_offset > 0 else 'w'
|
||||
@@ -147,18 +158,12 @@ class async_writer():
|
||||
|
||||
def write_stdin(self, data):
|
||||
if self.rec_stdin:
|
||||
text = self.stdin_decoder.decode(data)
|
||||
|
||||
if text:
|
||||
ts = round(time.time() - self.start_time, 6)
|
||||
self.queue.put([ts, 'i', text])
|
||||
ts = round(time.time() - self.start_time, 6)
|
||||
self.queue.put([ts, 'i', data])
|
||||
|
||||
def write_stdout(self, data):
|
||||
text = self.stdout_decoder.decode(data)
|
||||
|
||||
if text:
|
||||
ts = round(time.time() - self.start_time, 6)
|
||||
self.queue.put([ts, 'o', text])
|
||||
ts = round(time.time() - self.start_time, 6)
|
||||
self.queue.put([ts, 'o', data])
|
||||
|
||||
|
||||
class Recorder:
|
||||
|
||||
0
tests/asciicast/__init__.py
Normal file
0
tests/asciicast/__init__.py
Normal file
23
tests/asciicast/v2_test.py
Normal file
23
tests/asciicast/v2_test.py
Normal file
@@ -0,0 +1,23 @@
|
||||
from ..test_helper import Test
|
||||
import asciinema.asciicast.v2 as v2
|
||||
import tempfile
|
||||
|
||||
|
||||
class TestWriter(Test):
|
||||
|
||||
def test_writing(self):
|
||||
_file, path = tempfile.mkstemp()
|
||||
|
||||
with v2.writer(path, width=80, height=24) as w:
|
||||
w.write_stdout(1, 'x') # ensure it supports both str and bytes
|
||||
w.write_stdout(2, bytes.fromhex('78 c5 bc c3 b3 c5'))
|
||||
w.write_stdout(3, bytes.fromhex('82 c4 87'))
|
||||
w.write_stdout(4, bytes.fromhex('78 78'))
|
||||
|
||||
with open(path, 'r') as f:
|
||||
text = f.read()
|
||||
assert text == '{"version": 2, "width": 80, "height": 24}\n' + \
|
||||
'[1, "o", "x"]\n' + \
|
||||
'[2, "o", "xżó"]\n' + \
|
||||
'[3, "o", "łć"]\n' + \
|
||||
'[4, "o", "xx"]\n', 'got:\n\n%s' % text
|
||||
Reference in New Issue
Block a user