Leave a Comment
Serialize and deserialize Avro data with the schema written ahead of the data
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter, BinaryDecoder, BinaryEncoder
import io
class panAvroDataSchem:
    """Read and write Avro container data held entirely in memory.

    The writer side emits the schema together with the records through
    ``DataFileWriter``; the reader side recovers that schema from the
    ``avro.schema`` metadata entry of the payload.
    """

    def dReader(self, msg, schm=None):
        """Decode an in-memory Avro container payload.

        Args:
            msg: Bytes of the Avro container to decode.
            schm: Optional reader schema as JSON text. When falsy, the
                ``DatumReader`` is created without a reader schema.

        Returns:
            A tuple ``(schema_json, records)`` where ``schema_json`` is the
            ``avro.schema`` metadata entry decoded as UTF-8 and ``records``
            is a list of every datum yielded by the reader.
        """
        buffer = io.BytesIO(msg)
        if schm:
            datum_reader = DatumReader(readers_schema=avro.schema.parse(schm))
        else:
            datum_reader = DatumReader()
        file_reader = DataFileReader(buffer, datum_reader)
        try:
            schema_json = file_reader.meta.get('avro.schema').decode('utf-8')
            # Materialize every record before the reader is released.
            records = list(file_reader)
        finally:
            # The original never closed the reader; release it even when
            # decoding fails part-way through.
            file_reader.close()
        return (schema_json, records)

    def dWriter(self, schm, msg):
        """Encode records into an in-memory Avro container payload.

        Args:
            schm: Writer schema as JSON text, parsed with
                ``avro.schema.parse``.
            msg: Iterable of datums to append, in order.

        Returns:
            The complete contents of the in-memory buffer after all records
            have been appended and flushed.
        """
        buffer = io.BytesIO()
        try:
            writer = DataFileWriter(buffer, DatumWriter(), avro.schema.parse(schm))
            for datum in msg:
                writer.append(datum)
            # Push any pending records into the buffer before copying it out;
            # the buffer itself is closed below once its bytes are captured.
            writer.flush()
            return buffer.getvalue()
        finally:
            buffer.close()
Serialize and deserialize Avro data without the schema ahead of the data
import json
import avro.schema
import io
import types
from avro.io import DatumReader, DatumWriter, BinaryDecoder, BinaryEncoder
class AvroDataSernDeser(object):
    """Serialize and deserialize single Avro datums without a container.

    Datums are written with a ``BinaryEncoder`` and read back with a
    ``BinaryDecoder`` directly, so the schema is supplied to the constructor
    rather than being carried in the encoded bytes.
    """

    __readAdaptor = None
    __writeAdaptor = None

    def __init__(self, write_schema, read_schema=None):
        """Build the datum writer and reader for the given schemas.

        Args:
            write_schema: Writer schema, either as JSON text (parsed with
                ``avro.schema.parse``) or as an already-parsed schema object.
            read_schema: Optional reader schema in the same two forms.
                Defaults to the (parsed) writer schema when ``None``.
        """
        # `str` is what `types.StringType` aliased on Python 2, and unlike
        # `types.StringType` it also exists on Python 3.
        if isinstance(write_schema, str):
            write_schema = avro.schema.parse(write_schema)
        self.__writeAdaptor = DatumWriter(writers_schema=write_schema)

        if read_schema is None:
            read_schema = write_schema
        elif isinstance(read_schema, str):
            read_schema = avro.schema.parse(read_schema)
        self.__readAdaptor = DatumReader(
            writers_schema=write_schema,
            readers_schema=read_schema,
        )

    def serialize(self, data):
        """Encode one datum to Avro binary.

        Args:
            data: The datum to encode. A ``str`` is treated as JSON text and
                decoded with ``json.loads`` first, because the writer needs
                the decoded object rather than its textual form.

        Returns:
            The encoded bytes.
        """
        if isinstance(data, str):
            data = json.loads(data)
        buffer = io.BytesIO()
        self.__writeAdaptor.write(data, BinaryEncoder(buffer))
        return buffer.getvalue()

    def deserialize(self, data):
        """Decode one datum from Avro binary.

        Args:
            data: Bytes previously produced for the writer schema.

        Returns:
            The datum read by the configured ``DatumReader``.
        """
        decoder = BinaryDecoder(io.BytesIO(data))
        return self.__readAdaptor.read(decoder)