I'd like to implement a custom text codec that works like this: to encode a string, first encode it to bytes using UTF-8, then transcode that byte sequence into a new one by piping it through a subprocess. Decoding does the reverse: take a byte sequence, transcode it through the subprocess, and decode the result to a string using UTF-8.

The intended use case is to make it easier to apply tools like pandas and dask when the source data is encrypted at the file level. Many tools in this ecosystem work transparently with any encoding registered in the `codecs` module, but it is harder to plug in an arbitrary external filter. For example, to read a single encrypted file into a pandas DataFrame, it's straightforward enough to open the file in binary mode, pipe it into a subprocess, and pass the subprocess's output file handle to pandas. But one of the great features of dask is that it accepts a glob pattern and stitches all matching files together into one large out-of-core DataFrame, and this doesn't work out of the box when each file is encrypted. It would work if I could pass an `encoding='gpg'` argument.
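The single-file workaround described above can be sketched roughly as follows. This is a minimal sketch, not the author's code: `FILTER_CMD` and `read_filtered_rows` are hypothetical names, the filter is a stand-in Python one-liner that swaps ASCII letter case (mirroring the `tr` filter used later), and the standard `csv` module replaces pandas so the example has no third-party dependency.

```python
import csv
import io
import os
import subprocess
import sys
import tempfile

# Hypothetical stand-in for a decryption filter (for real data this might be
# something like ['gpg', '--decrypt']; not tested here). It reads bytes on
# stdin and writes bytes on stdout; this one just swaps ASCII letter case.
FILTER_CMD = [
    sys.executable, '-c',
    'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read().swapcase())',
]


def read_filtered_rows(path, cmd=FILTER_CMD, encoding='utf-8'):
    """Pipe the binary file at `path` through `cmd` and parse the output as CSV."""
    with open(path, 'rb') as raw:
        # The child process reads the file directly; we read its stdout as text.
        with subprocess.Popen(cmd, stdin=raw, stdout=subprocess.PIPE) as proc:
            text = io.TextIOWrapper(proc.stdout, encoding=encoding, newline='')
            # pandas.read_csv(text) would also accept this file-like object.
            rows = list(csv.reader(text))
    # Popen.__exit__ has waited for the child, so returncode is set here.
    if proc.returncode != 0:
        raise subprocess.CalledProcessError(proc.returncode, cmd)
    return rows


# Demo: write a small case-swapped ("encrypted") CSV file and read it back.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'data.csv.enc')
    with open(path, 'wb') as fh:
        fh.write(b'a,B\nc,D\n')
    rows = read_filtered_rows(path)

print(rows)  # [['A', 'b'], ['C', 'd']]
```

This handles one file at a time, which is exactly why it does not compose with a glob-based reader: the reader opens each file itself and only lets the caller choose an `encoding`.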
I'm trying to address this by writing a custom text codec that essentially pipes the file through GPG and decodes the result to strings, but I'm having a difficult time correctly implementing the three interfaces that the `codecs` module expects (stateless, incremental, and stream). Below is an implementation of the custom codec (using `tr` instead of GPG for simplicity). My questions are:
- Why does `open(..., encoding='subprocess')` use the `IncrementalEncoder` and `IncrementalDecoder` interfaces instead of the `StreamWriter` and `StreamReader` interfaces?
- Why does `open(..., encoding='subprocess')` never call `IncrementalEncoder.encode` with `final` set to `True`?
- What is a reasonable approach for getting this to work?
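The second question can be checked in isolation, without any subprocess. The experiment below assumes CPython's `io` module; it registers a throwaway codec under the hypothetical name `recorddemo` whose incremental encoder is plain UTF-8 but records the `final` flag of every call, then writes through an `io.TextIOWrapper`, which is the object that text-mode `open(..., encoding=...)` returns.

```python
import codecs
import io

final_flags = []  # the `final` argument of every IncrementalEncoder.encode call


class RecordingEncoder(codecs.IncrementalEncoder):
    """Toy encoder: plain UTF-8, but logs the `final` flag it receives."""

    def encode(self, input, final=False):
        final_flags.append(final)
        return input.encode('utf-8')


def recorddemo_search(name):
    # 'recorddemo' is a hypothetical codec name used only for this experiment.
    if name == 'recorddemo':
        return codecs.CodecInfo(
            name='recorddemo',
            encode=codecs.utf_8_encode,
            decode=codecs.utf_8_decode,
            incrementalencoder=RecordingEncoder,
            incrementaldecoder=codecs.getincrementaldecoder('utf-8'),
        )
    return None


codecs.register(recorddemo_search)

buf = io.BytesIO()
f = io.TextIOWrapper(buf, encoding='recorddemo')  # what text-mode open() builds
f.write('Hello, ')
f.write('world!')
f.flush()
written = buf.getvalue()
f.close()  # flushes and closes, but makes no encode(..., final=True) call

print(written)      # b'Hello, world!'
print(final_flags)  # only False values
```

In this experiment `final_flags` contains only `False`, even after `close()`: `TextIOWrapper` passes each written string to `encode()` without a `final` argument and flushes the resulting bytes, but closing it does not appear to issue a last `encode('', final=True)`. An encoder that holds everything back until `final=True`, like the `IncrementalEncoder` below, would therefore never emit its output.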
Although there are other ways to work around the specific use cases that I have, I'm still interested in any comments or suggestions about this problem to learn more about Python.
Here is a simple implementation of a codec that uses a subprocess to transcode bytes. Usage examples follow.
```python
import codecs
import subprocess as sp

TEXT_ENCODING = 'utf-8'
# Toy filter that swaps letter case; the real use case would run GPG instead.
PROCESS = ['tr', 'a-zA-Z', 'A-Za-z']


def encode_bytes(byts):
    """Transcode a complete byte string by piping it through PROCESS."""
    proc = sp.run(
        PROCESS,
        input=byts,
        stdout=sp.PIPE,
        check=True
    )
    return proc.stdout


def decode_bytes(byts):
    # The tr filter is its own inverse, so decoding is the same operation.
    return encode_bytes(byts)


class Codec(codecs.Codec):
    # Stateless interface: whole strings/bytes in a single call.
    @staticmethod
    def encode(chars, errors='strict'):
        byts = chars.encode(TEXT_ENCODING)
        return (encode_bytes(byts), len(chars))

    @staticmethod
    def decode(byts, errors='strict'):
        decoded = decode_bytes(byts)
        chars = decoded.decode(TEXT_ENCODING)
        return (chars, len(byts))


class IncrementalEncoder(codecs.BufferedIncrementalEncoder):
    # Buffer all text and only run the subprocess once final=True.
    def _buffer_encode(self, chars, errors, final):
        if final:
            print("Finalizing the encoding.")
            # Codec.encode already returns (bytes, number of chars consumed).
            return Codec.encode(chars, errors)
        else:
            print("Incremental encoding.")
            return (b'', 0)


class IncrementalDecoder(codecs.BufferedIncrementalDecoder):
    # Buffer all bytes and only run the subprocess once final=True.
    def _buffer_decode(self, byts, errors, final):
        if final:
            print("Finalizing the decoding.")
            # Codec.decode already returns (str, number of bytes consumed).
            return Codec.decode(byts, errors)
        else:
            print("Incremental decoding.")
            return ("", 0)


class StreamWriter(codecs.StreamWriter):
    def __init__(self, stream, errors='strict'):
        super().__init__(stream, errors)
        self._subprocess_start()

    def _subprocess_start(self):
        # The subprocess writes its output directly into the wrapped stream.
        self.process = sp.Popen(
            PROCESS,
            stdin=sp.PIPE,
            stdout=self.stream,
        )

    def _subprocess_stop(self, exc_type, exc_value, traceback):
        # Popen.__exit__ closes stdin (sending EOF) and waits for the process.
        self.process.__exit__(exc_type, exc_value, traceback)
        if self.process.returncode != 0:
            raise sp.CalledProcessError(self.process.returncode, self.process.args)

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self._subprocess_stop(exc_type, exc_value, traceback)
        finally:
            super().__exit__(exc_type, exc_value, traceback)

    def write(self, chars):
        byts = chars.encode(TEXT_ENCODING)
        return self.process.stdin.write(byts)

    def writelines(self, lines):
        # Join into one string; write() takes care of the UTF-8 encoding.
        return self.write(''.join(lines))

    def reset(self):
        self._subprocess_stop(None, None, None)
        self._subprocess_start()


class StreamReader(codecs.StreamReader):
    def __init__(self, stream, errors='strict'):
        super().__init__(stream, errors)
        self._subprocess_start()

    def _subprocess_start(self):
        # The subprocess reads directly from the wrapped stream.
        self.process = sp.Popen(
            PROCESS,
            stdin=self.stream,
            stdout=sp.PIPE,
        )

    def _subprocess_stop(self, exc_type, exc_value, traceback):
        # Popen.__exit__ closes our end of stdout and waits for the process.
        self.process.__exit__(exc_type, exc_value, traceback)
        if self.process.returncode != 0:
            raise sp.CalledProcessError(self.process.returncode, self.process.args)

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self._subprocess_stop(exc_type, exc_value, traceback)
        finally:
            super().__exit__(exc_type, exc_value, traceback)

    def read(self, size=-1, chars=-1, firstline=False):
        byts = self.process.stdout.read(size)
        chars = byts.decode(TEXT_ENCODING)
        return chars

    def readlines(self, sizehint=None, keepends=True):
        chars = self.read()
        lines = chars.splitlines(keepends)
        return lines

    def reset(self):
        self._subprocess_stop(None, None, None)
        self._subprocess_start()


def subprocess_search(encoding):
    # Called by codecs.lookup() with the normalized encoding name.
    if encoding == 'subprocess':
        return codecs.CodecInfo(
            name='subprocess',
            encode=Codec.encode,
            decode=Codec.decode,
            incrementalencoder=IncrementalEncoder,
            incrementaldecoder=IncrementalDecoder,
            streamwriter=StreamWriter,
            streamreader=StreamReader,
        )
    return None


codecs.register(subprocess_search)
```

After running the above, we can try out each of the three interfaces on its own and see that they work as expected. However, when we use `open` in text mode with the `subprocess` encoding, it doesn't use the stream interfaces, and it never finalizes the incremental interfaces.

```
In [1]: Codec.encode("Hello, world!")
Out[1]: (b'hELLO, WORLD!', 13)

In [2]: Codec.decode(b'hELLO, WORLD!')
Out[2]: ('Hello, world!', 13)

In [3]: incenc = IncrementalEncoder()

In [4]: incenc.encode("Hello, ", False)
Incremental encoding.
Out[4]: b''

In [5]: incenc.encode("world!", True)
Finalizing the encoding.
Out[5]: b'hELLO, WORLD!'

In [6]: incdec = IncrementalDecoder()

In [7]: incdec.decode(b'hELLO', False)
Incremental decoding.
Out[7]: ''

In [8]: incdec.decode(b', WORLD!', True)
Finalizing the decoding.
Out[8]: 'Hello, world!'

In [9]: with open('test.txt', 'wb') as f:
   .....:     with StreamWriter(f) as writer:
   .....:         writer.write("Hello, ")
   .....:         writer.write("world!")

In [10]: !cat test.txt
hELLO, WORLD!

In [11]: with open('test.txt', 'rb') as f:
    .....:     with StreamReader(f) as reader:
    .....:         print(reader.read())
    .....:
Hello, world!
```
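The stream classes exercised above are the ones that the `codecs` helpers wrap: `codecs.open` builds a `codecs.StreamReaderWriter` from the codec's `streamreader` and `streamwriter` entries, whereas the built-in `open` builds an `io.TextIOWrapper` around the incremental classes. The sketch below checks this with a throwaway codec under the hypothetical name `streamdemo`, whose UTF-8 `StreamWriter` records every chunk it receives. `codecs.open` may emit a `DeprecationWarning` on newer Python versions, so the warning is silenced here.

```python
import codecs
import os
import tempfile
import warnings

writer_calls = []  # text chunks that reached our StreamWriter.write


class RecordingStreamWriter(codecs.getwriter('utf-8')):
    """UTF-8 StreamWriter that records each chunk it is asked to write."""

    def write(self, obj):
        writer_calls.append(obj)
        return super().write(obj)


def streamdemo_search(name):
    # 'streamdemo' is a hypothetical codec name used only for this experiment.
    if name == 'streamdemo':
        return codecs.CodecInfo(
            name='streamdemo',
            encode=codecs.utf_8_encode,
            decode=codecs.utf_8_decode,
            streamwriter=RecordingStreamWriter,
            streamreader=codecs.getreader('utf-8'),
        )
    return None


codecs.register(streamdemo_search)

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'demo.txt')
    with warnings.catch_warnings():
        warnings.simplefilter('ignore', DeprecationWarning)
        # codecs.open opens the file in binary mode and wraps it in a
        # StreamReaderWriter built from streamreader/streamwriter.
        with codecs.open(path, 'w', encoding='streamdemo') as f:
            f.write('Hello, ')
            f.write('world!')
    with open(path, 'rb') as f:
        raw = f.read()

print(writer_calls)  # ['Hello, ', 'world!']
print(raw)           # b'Hello, world!'
```

So the `streamreader`/`streamwriter` entries appear to matter only to code that goes through these `codecs` helpers, not to `open(..., encoding=...)` or to libraries that call it.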
Now, since this is registered as a text encoding, we should be able to use it with `open` to read and write in text mode. But what happens here surprises me:

```
In [12]: with open('test2.txt', 'w', encoding='subprocess') as f:
    .....:     f.write('Hello, ')
    .....:     f.write("world!")
    .....:     f.close()
    .....:
Incremental encoding.
Incremental encoding.
```
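One possible direction, sketched under stated assumptions rather than offered as a fix: finalize the encoder explicitly instead of relying on `TextIOWrapper`. The hypothetical `FinalizingTextWriter` below is a small write-only text stream that calls `encode('', final=True)` on close, and the hypothetical `UpperUntilFinal` encoder mimics the question's `IncrementalEncoder` by emitting nothing until `final=True`. This does not make `open(..., encoding=...)` or dask use it; a caller would have to construct it explicitly.

```python
import codecs
import io


class FinalizingTextWriter(io.TextIOBase):
    """Hypothetical write-only text stream that, unlike io.TextIOWrapper,
    calls encoder.encode('', final=True) when it is closed."""

    def __init__(self, raw, encoder):
        super().__init__()
        self._raw = raw          # binary stream; the caller still owns it
        self._encoder = encoder  # any codecs.IncrementalEncoder instance

    def writable(self):
        return True

    def write(self, s):
        self._raw.write(self._encoder.encode(s, final=False))
        return len(s)

    def close(self):
        if not self.closed:
            # Signal end of input so a buffering encoder emits what it holds.
            self._raw.write(self._encoder.encode('', final=True))
            self._raw.flush()
        super().close()


class UpperUntilFinal(codecs.BufferedIncrementalEncoder):
    """Demo encoder: buffers everything until final=True, then upper-cases."""

    def _buffer_encode(self, text, errors, final):
        if final:
            return text.upper().encode('utf-8'), len(text)
        return b'', 0


raw = io.BytesIO()
writer = FinalizingTextWriter(raw, UpperUntilFinal())
writer.write('Hello, ')
writer.write('world!')
before_close = raw.getvalue()  # b'' because the encoder is still buffering
writer.close()
after_close = raw.getvalue()   # b'HELLO, WORLD!'
```

The design choice is simply to own the encoder, so the end-of-input signal can be sent at the one point where it is known that no more text will arrive.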