datahub.models.schema 源代码

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import

import json
from enum import Enum

import six

from .. import utils
from ..exceptions import InvalidParameterException
from ..utils import ErrorMessage, check_type, to_str


[文档]class FieldType(Enum): """ Field Types, datahub support 5 types of field, there are: ``TINYINT``, ``SMALLINT``, ``INTEGER``, ``BIGINT``, ``STRING``, ``BOOLEAN``, ``TIMESTAMP``, ``FLOAT``, ``DOUBLE``, ``DECIMAL`` """ TINYINT = 'tinyint' SMALLINT = 'smallint' INTEGER = 'integer' BIGINT = 'bigint' STRING = 'string' BOOLEAN = 'boolean' TIMESTAMP = 'timestamp' FLOAT = 'float' DOUBLE = 'double' DECIMAL = 'decimal'
[文档]class Field(object): """ Field Members: name (:class:`str`): field name type (:class:`str`): field type """ __slots__ = ('_name', '_type', '_allow_null') def __init__(self, name, field_type, allow_null=True): if not check_type(field_type, FieldType): raise InvalidParameterException(ErrorMessage.INVALID_TYPE % ('field_type', FieldType.__name__)) self._name = utils.to_str(name) self._type = field_type self._allow_null = allow_null @property def name(self): return self._name @name.setter def name(self, value): self._name = value @property def type(self): return self._type @type.setter def type(self, value): self._type = value @property def allow_null(self): return self._allow_null @allow_null.setter def allow_null(self, value): self._allow_null = value def to_json(self): data = { "name": self._name, "type": self._type.value } if not self._allow_null: data["notnull"] = not self._allow_null return data @classmethod def from_json(cls, field_json): allow_null = not field_json['notnull'] if 'notnull' in field_json else True return cls(field_json['name'], FieldType(field_json['type'].lower()), allow_null) def __repr__(self): return '<field {0}, type {1}, allow_null {2}>'.format(self._name, self._type.name.lower(), self._allow_null)
[文档]class RecordSchema(object): """ Record schema class, Tuple type Topic will use it. Members: fields (list): fields :Example: >>> schema = RecordSchema.from_lists( \ ['bigint_field' , 'string_field' , 'double_field' , 'bool_field' , 'time_field'], \ [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP] \ ) >>> .. seealso:: :class:`datahub.models.FieldType` """ def __init__(self, field_list=None): self._field_list = field_list if field_list else [] self._field_dict = {} duplicates = set() for field in self._field_list: if field.name in self._field_dict: duplicates.add(field.name) else: self._field_dict[field.name] = field if duplicates: raise InvalidParameterException('Duplicate field names: %s' % ', '.join(duplicates)) @property def field_list(self): return self._field_list def add_field(self, field): if field.name not in self._field_dict: self._field_list.append(field) self._field_dict[field.name] = field else: raise InvalidParameterException('Field name %s already exists' % field.name) def get_field(self, index_or_name): if isinstance(index_or_name, six.integer_types): return self._get_field_by_index(index_or_name) return self._get_field_by_name(utils.to_str(index_or_name)) def _get_field_by_index(self, index): if not 0 <= index < len(self._field_list): raise InvalidParameterException('Index %d out of range' % index) return self._field_list[index] def _get_field_by_name(self, name): if name not in self._field_dict: raise InvalidParameterException('Field name %s does not exists' % name) return self._field_dict[name] def to_json(self): return { 'fields': [field.to_json() for field in self._field_list] } def to_json_string(self): return json.dumps(self.to_json()) @classmethod def from_lists(cls, names, types, allow_nulls=None): if len(names) != len(types) or (allow_nulls and len(allow_nulls) != len(names)): raise InvalidParameterException('Length of lists are not equal') field_list = [] for index in range(0, len(names)): allow_null = allow_nulls[index] if allow_nulls else True field_list.append(Field(names[index], types[index], allow_null)) return cls(field_list=field_list) @classmethod def from_json(cls, fields_json): field_list = [] for field in fields_json['fields']: field_list.append(Field.from_json(field)) return cls(field_list=field_list) @classmethod def from_json_str(cls, fields_jsonstr): return cls.from_json(json.loads(fields_jsonstr)) def __repr__(self): buf = six.StringIO() name_space = 2 * max(len(field.name) for field in self._field_list) if self._field_list else 0 type_space = 2 * max(len(field.type.value) for field in self._field_list) if self._field_list else 0 allow_null_space = 8 buf.write('RecordSchema {\n') field_strs = [] for field in self._field_list: field_strs.append('{0}{1}{2}'.format( field.name.ljust(name_space), field.type.value.ljust(type_space), to_str(field.allow_null).ljust(allow_null_space) )) buf.write(utils.indent('\n'.join(field_strs), 2)) buf.write('\n') buf.write('}\n') return buf.getvalue()