# Source code for datahub.models.record

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import

import re
import abc
import base64
from enum import Enum

import six

from . import types as _types
from .schema import RecordSchema, FieldType
from ..exceptions import InvalidParameterException
from ..utils import ErrorMessage, indent, to_str, bool_to_str, to_binary


class RecordType(Enum):
    """
    Enumeration of the record kinds.

    Exactly two kinds are defined: ``TUPLE`` and ``BLOB``.
    """

    # Each member's value is the literal upper-case name of the kind.
    BLOB = 'BLOB'
    TUPLE = 'TUPLE'
@six.add_metaclass(abc.ABCMeta)
class Record(object):
    """
    Base Record class

    Stores the state common to every record: the values, the shard id,
    hash key and partition key, an attribute dict, and the offset
    (sequence and system time). Concrete subclasses must implement
    ``get_type``, ``encode_values``, ``decode_values`` and
    ``encode_pb_record_data``.
    """

    __slots__ = (
        '_values', '_shard_id', '_hash_key', '_partition_key',
        '_attributes', '_sequence', '_system_time',
    )

    encode = 0

    def __init__(self):
        self._values = None
        self._shard_id = ''
        self._hash_key = ''
        self._partition_key = ''
        self._attributes = dict()
        self._sequence = 0
        self._system_time = 0

    @property
    def values(self):
        return self._values

    @property
    def shard_id(self):
        return self._shard_id

    @shard_id.setter
    def shard_id(self, value):
        self._shard_id = value

    @property
    def hash_key(self):
        return self._hash_key

    @hash_key.setter
    def hash_key(self, value):
        self._hash_key = value

    @property
    def partition_key(self):
        # Must read the backing slot; reading the property itself here
        # would recurse without end.
        return self._partition_key

    @partition_key.setter
    def partition_key(self, value):
        self._partition_key = value

    @property
    def attributes(self):
        return self._attributes

    @attributes.setter
    def attributes(self, value):
        self._attributes = value

    @property
    def sequence(self):
        return self._sequence

    @sequence.setter
    def sequence(self, value):
        self._sequence = value

    @property
    def system_time(self):
        return self._system_time

    @system_time.setter
    def system_time(self, value):
        self._system_time = value

    def get_attribute(self, key):
        """
        Return the attribute stored under ``key``, or ``None`` if absent.
        """
        return self._attributes.get(key)

    def put_attribute(self, key, value):
        """
        Store ``value`` under ``key`` in the attributes.

        :raises InvalidParameterException: if ``key`` or ``value`` is ``None``
        """
        if key is None or value is None:
            raise InvalidParameterException("key/value can not be None")
        self._attributes[key] = value

    def get_offset(self):
        """
        Return the offset as a ``(sequence, system_time)`` tuple.
        """
        return self._sequence, self._system_time

    @abc.abstractmethod
    def get_type(self):
        pass

    @abc.abstractmethod
    def encode_values(self):
        pass

    @abc.abstractmethod
    def decode_values(self):
        pass

    @abc.abstractmethod
    def encode_pb_record_data(self):
        pass

    def to_json(self):
        """
        Build the dict representation of this record.

        ``"Data"`` is always present and comes from ``encode_values()``.
        Every other key is added only when its value is truthy.
        """
        data = {
            "Data": self.encode_values()
        }
        if self._partition_key:
            data["PartitionKey"] = self._partition_key
        if self._hash_key:
            data["HashKey"] = self._hash_key
        if self._shard_id:
            data["ShardId"] = self._shard_id
        if self._attributes:
            data["Attributes"] = self._attributes
        if self._sequence:
            data["Sequence"] = self._sequence
        if self._system_time:
            data["SystemTime"] = self._system_time
        return data

    def to_pb_record_entry(self):
        """
        Build the dict used as the protobuf record entry.

        ``'data'`` is always present and comes from
        ``encode_pb_record_data()``. The keys, shard id and attributes are
        added only when truthy; sequence and system time are not included.
        """
        pb_record_entry = {
            'data': self.encode_pb_record_data()
        }
        if self._partition_key:
            pb_record_entry['partition_key'] = self._partition_key
        if self._hash_key:
            pb_record_entry['hash_key'] = self._hash_key
        if self._shard_id:
            pb_record_entry['shard_id'] = self._shard_id
        if self._attributes:
            # Attributes are flattened into a list of key/value pairs.
            pb_record_entry['attributes'] = {
                'attributes': [{
                    'key': k,
                    'value': v
                } for k, v in self._attributes.items()]
            }
        return pb_record_entry

    def __repr__(self):
        return to_str(self.to_json())
class BlobRecord(Record):
    """
    Blob type record class

    Built either from raw ``blob_data`` or from ``values``, the
    base64-encoded form of the same data.

    Members:
        blob_data (:class:`str`): blob data
    """

    def __init__(self, blob_data=None, values=None):
        super(BlobRecord, self).__init__()

        # Truthy raw data takes precedence over the encoded form.
        if blob_data:
            self._blob_data = to_binary(blob_data)
            return

        if values is None:
            raise InvalidParameterException(ErrorMessage.MISSING_BLOB_RECORD_DATA)

        self._values = values
        self._blob_data = base64.b64decode(values)

    @property
    def blob_data(self):
        return self._blob_data

    def get_type(self):
        return RecordType.BLOB

    def encode_values(self):
        """
        Return the base64 text of the blob, computing and caching it when
        no (truthy) encoded value is stored yet.
        """
        if self._values:
            return self._values
        encoded = base64.b64encode(self._blob_data)
        self._values = encoded.decode()
        return self._values

    def decode_values(self):
        return self._blob_data

    def encode_pb_record_data(self):
        # A single-element data list holding the raw blob.
        entry = {'value': self._blob_data}
        return {
            'data': [entry]
        }
class TupleRecord(Record):
    """
    Tuple type record class

    Members:
        field_list (:class:`list`): fields

        name_indices (:class:`dict`): maps each field name to its index in ``field_list``

        values (:class:`tuple`): snapshot of the current values, in field order

    :Example:

    >>> schema = RecordSchema.from_lists(['name', 'id'], [FieldType.STRING, FieldType.STRING])
    >>> record = TupleRecord(schema=schema, values=['test', 'test2'])
    >>> record.values[0:2]
    ('test', 'test2')
    >>> record.set_value('name', 'test1')
    >>> record.values[0]
    'test1'
    >>> record.set_value(0, 'test3')
    >>> record.get_value(0)
    'test3'
    >>> record.get_value('name')
    'test3'
    >>> len(record.values)
    2
    >>> record.has_field('name')
    True
    """

    __slots__ = ('_field_list', '_name_indices')

    def __init__(self, field_list=None, schema=None, values=None):
        """
        :param field_list: fields of the record; ignored when ``schema`` is given
        :param schema: object whose ``field_list`` supplies the fields
        :param values: optional initial values, one per field
        :raises InvalidParameterException: if no fields are available, or
            ``values`` does not have one entry per field
        """
        super(TupleRecord, self).__init__()
        self._field_list = field_list
        if schema is not None:
            self._field_list = schema.field_list
        if self._field_list is None or len(self._field_list) == 0:
            raise InvalidParameterException(ErrorMessage.MISSING_TUPLE_RECORD_SCHEMA)
        self._values = [None, ] * len(self._field_list)
        if values is not None:
            if len(values) != len(self._field_list):
                raise InvalidParameterException('The values set to records are against the schema, '
                                                'expect len %s, got len %s' % (len(self._field_list), len(values)))
            self._set_values(values)
        self._name_indices = dict((field.name, index) for index, field in enumerate(self._field_list))

    @property
    def field_list(self):
        return self._field_list

    @field_list.setter
    def field_list(self, value):
        self._field_list = value

    @property
    def values(self):
        # A tuple copy, so callers cannot modify the internal list.
        return tuple(self._values)

    @values.setter
    def values(self, value):
        if len(value) != len(self._field_list):
            raise InvalidParameterException('The values set to records are against the schema, '
                                            'expect len %s, got len %s' % (len(self._field_list), len(value)))
        self._set_values(value)

    @property
    def name_indices(self):
        return self._name_indices

    @name_indices.setter
    def name_indices(self, value):
        self._name_indices = value

    def set_value(self, index_or_name, value):
        """
        Set one value, addressed by integer index or by field name.
        """
        if isinstance(index_or_name, six.integer_types):
            self._set_value_by_index(index_or_name, value)
        else:
            self._set_value_by_name(to_str(index_or_name), value)

    def get_value(self, index_or_name):
        """
        Return one value, addressed by integer index or by field name.

        :raises InvalidParameterException: if the index is out of range or
            the field name is unknown
        """
        if isinstance(index_or_name, six.integer_types):
            return self._get_value_by_index(index_or_name)
        return self._get_value_by_name(to_str(index_or_name))

    def has_field(self, field_name):
        return field_name in self._name_indices

    def get_type(self):
        return RecordType.TUPLE

    def encode_values(self):
        """
        Return the values as a list of strings, in field order.

        Booleans go through ``bool_to_str``. Doubles are formatted with
        ``%.16e`` and the zeros directly before the exponent are removed.
        Everything else goes through ``to_str``.
        """
        new_values = []
        for index, val in enumerate(self._values):
            field_type = self._field_list[index].type
            if FieldType.BOOLEAN == field_type:
                new_values.append(bool_to_str(val))
            elif FieldType.DOUBLE == field_type:
                double_str = re.sub(r'0+e', 'e', '%.16e' % val) if val is not None else None
                new_values.append(to_str(double_str))
            else:
                new_values.append(to_str(val))
        return new_values

    def decode_values(self):
        pass

    def encode_pb_record_data(self):
        """
        Return ``{'data': [...]}`` with one ``{'value': <binary>}`` entry per
        value; booleans are converted with ``bool_to_str`` first.
        """
        pb_record_data = {
            'data': []
        }
        for index, val in enumerate(self._values):
            if FieldType.BOOLEAN == self._field_list[index].type:
                pb_record_data['data'].append({'value': to_binary(bool_to_str(val))})
            else:
                pb_record_data['data'].append({'value': to_binary(val)})
        return pb_record_data

    def _set_values(self, values):
        for index, value in enumerate(values):
            # Entries beyond the number of fields are ignored.
            if index >= len(self._field_list):
                break
            self._set_value_by_index(index, value)

    def _set_value_by_index(self, index, value):
        field = self._field_list[index]
        if not field.allow_null and value is None:
            raise InvalidParameterException('Filed with index %d can not be none' % index)
        val = _types.validate_value(value, field)
        self._values[index] = val

    def _set_value_by_name(self, name, value):
        self._set_value_by_index(self._name_indices[name], value)

    def _get_value_by_index(self, index):
        if not 0 <= index < len(self._values):
            raise InvalidParameterException('Index %d out of range' % index)
        return self._values[index]

    def _get_value_by_name(self, name):
        if name not in self._name_indices:
            raise InvalidParameterException('Field name %s does not exists' % name)
        # Index the internal list directly; the public ``values`` property
        # would copy every value into a new tuple on each lookup.
        return self._values[self._name_indices[name]]

    def __repr__(self):
        buf = six.StringIO()

        # Column widths: twice the longest entry of each column.
        name_space = 2 * max(len(field.name) for field in self._field_list) if self._field_list else 0
        type_space = 2 * max(len(field.type.value) for field in self._field_list) if self._field_list else 0
        value_space = 2 * max(len(to_str(value if value is not None else "None"))
                              for value in self._values) if self._values else 0

        buf.write('TupleRecord {\n')
        buf.write(' Values {\n')
        field_strs = [' {0}{1}{2}'.format(
            '*name*'.ljust(name_space),
            '*type*'.ljust(type_space),
            '*value*'.ljust(value_space)
        )]
        for index, field in enumerate(self._field_list):
            field_strs.append(' {0}{1}{2}'.format(
                field.name.ljust(name_space),
                field.type.value.ljust(type_space),
                to_str(self._values[index] if self._values[index] is not None else "None").ljust(value_space)
            ))
        buf.write(indent('\n'.join(field_strs), 2))
        buf.write('\n')
        buf.write(' }\n')

        if self._attributes:
            attribute_key_space = 2 * max(len(to_str(key)) for key in self._attributes)
            attribute_value_space = 2 * max(len(to_str(self._attributes[key])) for key in self._attributes)
            buf.write(' Attributes {\n')
            field_strs = [' {0}{1}'.format(
                '*key*'.ljust(attribute_key_space),
                '*value*'.ljust(attribute_value_space)
            )]
            for key in self._attributes:
                field_strs.append(' {0}{1}'.format(
                    to_str(key).ljust(attribute_key_space),
                    to_str(self._attributes[key]).ljust(attribute_value_space)
                ))
            buf.write(indent('\n'.join(field_strs), 2))
            buf.write('\n')
            buf.write(' }\n')

        buf.write('}\n')
        return buf.getvalue()
class FailedRecord(object):
    """
    Failed record info

    Members:
        index: index of the failed record

        error_code: error code reported for the record

        error_message: error message reported for the record
    """

    __slots__ = ('_index', '_error_code', '_error_message')

    def __init__(self, index, error_code, error_message):
        self._index = index
        self._error_code = error_code
        self._error_message = error_message

    @property
    def index(self):
        return self._index

    @index.setter
    def index(self, value):
        self._index = value

    @property
    def error_code(self):
        return self._error_code

    @error_code.setter
    def error_code(self, value):
        self._error_code = value

    @property
    def error_message(self):
        return self._error_message

    @error_message.setter
    def error_message(self, value):
        self._error_message = value

    def to_json(self):
        """
        Return the record as a dict with the keys ``Index``, ``ErrorCode``
        and ``ErrorMessage``.
        """
        json_form = dict()
        json_form['Index'] = self._index
        json_form['ErrorCode'] = self._error_code
        json_form['ErrorMessage'] = self._error_message
        return json_form

    @classmethod
    def from_pb_message(cls, failed_record_pb):
        """
        Build an instance from an object exposing ``index``, ``error_code``
        and ``error_message`` attributes.
        """
        return cls(
            failed_record_pb.index,
            failed_record_pb.error_code,
            failed_record_pb.error_message,
        )

    def __repr__(self):
        return to_str(self.to_json())