Avro schema in Python: serialize and deserialize

Serialize and deserialize Avro data with the schema written ahead of the data (Avro object container format)

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter, BinaryDecoder, BinaryEncoder
import io

class panAvroDataSchem:
    """Round-trip Avro records through the object container format.

    ``dWriter`` writes the schema into the container header, and ``dReader``
    reads it back from there. Data produced by ``dWriter`` can therefore be
    decoded by ``dReader`` without passing the schema separately.
    """

    def dReader(self, msg, schm=None):
        """Decode an Avro object-container byte string.

        Args:
            msg: bytes holding a complete Avro container (header + blocks).
            schm: optional reader schema as a JSON string. When given, records
                are resolved against it. When falsy (None or ""), the writer's
                schema embedded in ``msg`` is used.

        Returns:
            A tuple ``(schema_json, records)``. ``schema_json`` is the
            ``avro.schema`` entry of the container header decoded as UTF-8.
            ``records`` is a list of every decoded datum, in file order.
        """
        if not schm:
            datum_reader = DatumReader()
        else:
            datum_reader = DatumReader(readers_schema=avro.schema.parse(schm))
        _br = DataFileReader(io.BytesIO(msg), datum_reader)
        try:
            _sch = _br.meta.get('avro.schema').decode('utf-8')
            _val = list(_br)
        finally:
            # Release the reader (and the in-memory buffer it wraps) even if
            # decoding fails part-way through.
            _br.close()
        return (_sch, _val)

    def dWriter(self, schm, msg):
        """Encode records into an Avro object-container byte string.

        Args:
            schm: writer schema as a JSON string; it is stored in the
                container header.
            msg: iterable of records to append, each matching ``schm``.

        Returns:
            bytes containing the complete container (header plus data blocks).
        """
        _bw = io.BytesIO()
        _dw = DataFileWriter(_bw, DatumWriter(), avro.schema.parse(schm))
        try:
            for _x in msg:
                _dw.append(_x)
            # Write any buffered records into _bw before snapshotting it.
            # close() below also closes _bw, so the value must be taken first.
            _dw.flush()
            _val = _bw.getvalue()
        finally:
            _dw.close()
        return _val

Serialize and deserialize Avro data without the schema ahead of the data (raw binary encoding; both sides must already know the schema)

import json
import avro.schema
import io
import types

from avro.io import DatumReader, DatumWriter, BinaryDecoder, BinaryEncoder

class AvroDataSernDeser(object):
    """Serialize and deserialize single Avro datums using raw binary encoding.

    No schema or header is written to the output. The decoder must therefore
    be built with the writer's schema. An optional, different reader schema
    can be supplied for Avro schema resolution.
    """

    # Per-instance DatumReader / DatumWriter; both are assigned in __init__.
    __readAdaptor = None
    __writeAdaptor = None

    def __init__(self, write_schema, read_schema=None):
        """Build the binary encoder/decoder adaptors.

        Args:
            write_schema: writer schema, either as a JSON string (parsed here)
                or as an already-parsed schema object.
            read_schema: optional reader schema, as a JSON string or a parsed
                schema object. Defaults to ``write_schema``.
        """
        # `str` matches exactly what types.StringType matched on Python 2
        # (byte strings). Unlike types.StringType, it also exists on Python 3,
        # where the old check raised AttributeError.
        if isinstance(write_schema, str):
            write_schema = avro.schema.parse(write_schema)
        self.__writeAdaptor = DatumWriter(writers_schema=write_schema)

        if read_schema is None:
            read_schema = write_schema
        elif isinstance(read_schema, str):
            read_schema = avro.schema.parse(read_schema)

        self.__readAdaptor = DatumReader(writers_schema=write_schema, readers_schema=read_schema)

    def serialize(self, data):
        """Encode one datum to Avro binary.

        Args:
            data: the datum (e.g. a dict for a record schema). A ``str`` is
                treated as JSON and decoded with ``json.loads`` first.

        Returns:
            bytes of the binary-encoded datum (no header, no schema).
        """
        if isinstance(data, str):
            # A JSON string must be turned into the Python object
            # (e.g. dict) that the DatumWriter expects.
            data = json.loads(data)

        _bw = io.BytesIO()
        self.__writeAdaptor.write(data, BinaryEncoder(_bw))
        return _bw.getvalue()

    def deserialize(self, data):
        """Decode one datum from Avro binary produced with the writer schema.

        Args:
            data: bytes holding a single binary-encoded datum.

        Returns:
            The decoded datum, resolved against the reader schema.
        """
        _dec = BinaryDecoder(io.BytesIO(data))
        return self.__readAdaptor.read(_dec)

Leave a Reply

Your email address will not be published. Required fields are marked *