MacPython Logo from __future__ import *

buy music albums Silver Apples buy mp3 albums Tarrus Riley buy tracks mp3 Kravits buy Reaper albums mp3 buy Kravits albums music buy music Evita CD online albums mp3 Silver Apples download Madonna CD music buy tracks music Kravits download music albums Silver Apples

2004-05-18

Monkeypatching readline support into Python’s UTF-16 codecs

Filed under: python — bob @ 9:23 pm

[ utf16reader.py ]

__all__ = []

BUFFER_SIZE = 256

def readline_unsized(self, buff):
    while True:
        lines = buff.splitlines(True)
        if len(lines) > 1:
            return (u''.join(lines[1:]), lines[0])
        chunk = self.read(BUFFER_SIZE)
        if not chunk:
            return (u'', buff)
        else:
            buff += chunk
    
def readline_sized(self, buff, size):
    while True:
        lines = buff.splitlines(True)
        if len(lines) > 1:
            rval = lines.pop(0)
            if len(rval) > size:
                lines.insert(0, rval[size:])
                rval = rval[:size]
            return (u''.join(lines), rval)
        bytesread = len(buff)
        if size > bytesread:
            chunk = self.read(min(BUFFER_SIZE, size - bytesread))
            if not chunk:
                return (u'', buff)
            else:
                buff += chunk
        else:
            return (buff[size:], buff[:size])

def readline(self, size=None):
    buff = self._utf16_readline_buffer
    if size is None:
        buff, rval = readline_unsized(self, buff)
    else:
        buff, rval = readline_sized(self, buff, size)
    self._utf16_readline_buffer = buff
    return rval

def install():
    import encodings.utf_16 as utf_16
    import encodings.utf_16_be as utf_16_be
    import encodings.utf_16_le as utf_16_le
    for mod in (utf_16, utf_16_be, utf_16_le):
        mod.StreamReader.readline = readline
        mod.StreamReader._utf16_readline_buffer = u''

def test():
    from StringIO import StringIO
    import codecs
    from itertools import izip
    STRINGS = [
        u'\u304a\u3084\u3059\u307f\u306a\u3055\u3044n',
        u'Oysasumi nasain',
        u'Goodnight',
    ] * 500
    for codec in ('utf_16', 'utf_16_le', 'utf_16_be'):
        utxt = u''.join(STRINGS)
        txt = u''.join(STRINGS).encode(codec)
        def testreader():
           return codecs.getreader(codec)(StringIO(txt))
        # test readline()
        for new, orig in izip(testreader(), STRINGS):
            assert new == orig, '%r != %r' % (new, orig,)
        # test readlines()
        assert testreader().readlines() == STRINGS
        # test sized readline()
        idx = 0
        rdr = testreader()
        while idx < len(utxt):
            nextline  = rdr.readline(5)
            assert len(nextline) <= 5, 'len(%r) > 5' % (nextline,)
            if nextline.splitlines()[0] != nextline:
                # there was a newline
                nextchunk = utxt[idx:idx+len(nextline)]
                idx += len(nextline)
                assert nextchunk == nextline, '[a] %r != %r' % (nextline, nextchunk)
            else:
                nextchunk = utxt[idx:idx+5]
                idx += 5
                assert nextline == nextchunk, '[b] %r != %r' % (nextline, nextchunk)

if __name__ == '__main__':
    install()
    try:
        test()
    except:
        import sys, pdb, traceback
        tb = sys.exc_info()[2]
        traceback.print_exc()
        pdb.post_mortem(tb)

(revised based on comments from MA Lemburg)

Powered by WordPress