datahub.core 源代码

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import

from .utils import type_assert
from .implement import DataHubJson, DataHubPB
from .models import CompressFormat, RecordSchema, FieldType, CursorType, ConnectorType, ConnectorConfig,\
    ConnectorState, ConnectorOffset, SubscriptionState


class DataHub(object):
    """
    Main entry point to DataHub.

    Every public method of this class forwards its arguments to a backend
    object created in the constructor: :class:`DataHubPB` when ``enable_pb``
    is true, :class:`DataHubJson` otherwise. Please refer to
    `DataHub docs <https://datahub.console.aliyun.com/intro/index.html>`_
    for service-level details.

    Basic operations such as ``create``, ``list``, ``delete`` and ``update``
    are provided for each kind of DataHub object (project, topic, connector,
    subscription).

    ``access_id`` and ``access_key`` are required and must be correct,
    otherwise a ``SignatureNotMatch`` error is reported.

    :param access_id: Aliyun Access ID
    :param access_key: Aliyun Access Key
    :param endpoint: Rest service URL
    :param enable_pb: enable protobuf when putting/getting records; defaults to True in this
        version (documented as False for versions <= 2.11 and True for versions >= 2.12)
    :param compress_format: compress format
    :type compress_format: :class:`datahub.models.compress.CompressFormat`
    :param kwargs: extra keyword arguments, forwarded unchanged to the backend constructor

    :Example:

    >>> datahub = DataHub('**your access id**', '**your access key**', '**endpoint**')
    >>> datahub_pb = DataHub('**your access id**', '**your access key**', '**endpoint**', enable_pb=True)
    >>> datahub_lz4 = DataHub('**your access id**', '**your access key**', '**endpoint**', compress_format=CompressFormat.LZ4)
    >>>
    >>> project_result = datahub.get_project('datahub_test')
    >>>
    >>> print(project_result is None)
    >>>
    """

    def __init__(self, access_id, access_key, endpoint=None, enable_pb=True, compress_format=CompressFormat.NONE,
                 **kwargs):
        # Choose the backend class once; both are constructed with the same arguments.
        impl_class = DataHubPB if enable_pb else DataHubJson
        self._datahub_impl = impl_class(access_id, access_key, endpoint, compress_format, **kwargs)
[文档] def list_project(self): """ List all project names :return: projects in datahub server :rtype: :class:`datahub.models.results.ListProjectResult` """ return self._datahub_impl.list_project()
[文档] @type_assert(object, str, str) def create_project(self, project_name, comment): """ Create a new project by given name and comment :param project_name: project name :param comment: description of project :return: none :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name is not valid :raise: :class:`datahub.exceptions.ResourceExistException` if project is already existed """ self._datahub_impl.create_project(project_name, comment)
[文档] @type_assert(object, str) def get_project(self, project_name): """ Get a project by given name :param project_name: project name :return: the right project :rtype: :class:`datahub.models.GetProjectResult` :raise: :class:`datahub.exceptions.ResourceNotFoundException` if project not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name is empty .. see also:: :class:`datahub.models.Project` """ return self._datahub_impl.get_project(project_name)
[文档] @type_assert(object, str, str) def update_project(self, project_name, comment): """ Update project comment :param project_name: project name :param comment: new description of project :return: none :raise: :class:`datahub.exceptions.ResourceNotFoundException` if project not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name is empty or comment is invalid """ return self._datahub_impl.update_project(project_name, comment)
[文档] @type_assert(object, str) def delete_project(self, project_name): """ Delete the project by given name :param project_name: project name :return: none :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name is empty """ self._datahub_impl.delete_project(project_name)
[文档] @type_assert(object, str) def list_topic(self, project_name): """ Get all topics of a project :param project_name: project name :return: all topics of the project :rtype: :class:`datahub.models.ListTopicResult` :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name is empty """ return self._datahub_impl.list_topic(project_name)
[文档] @type_assert(object, str, str, int, int, str, bool) def create_blob_topic(self, project_name, topic_name, shard_count, life_cycle, comment, extend_mode=None): """ Create blob topic :param project_name: project name :param topic_name: topic name :param shard_count: shard count :param life_cycle: life cycle :param comment: comment :param extend_mode: use expansion method to increase shard :return: none :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name is empty; topic_name is not valid; life_cycle is not positive; record_schema is wrong type :raise: :class:`datahub.exceptions.ResourceNotFoundException` if project not existed :raise: :class:`datahub.exceptions.ResourceExistException` if topic is already existed """ self._datahub_impl.create_blob_topic(project_name, topic_name, shard_count, life_cycle, extend_mode, comment)
[文档] @type_assert(object, str, str, int, int, RecordSchema, str, bool) def create_tuple_topic(self, project_name, topic_name, shard_count, life_cycle, record_schema, comment, extend_mode=None): """ Create tuple topic :param project_name: project name :param topic_name: topic name :param shard_count: shard count :param life_cycle: life cycle :param record_schema: record schema for tuple record type :type record_schema: :class:`datahub.models.RecordSchema` :param comment: comment :param extend_mode: use expansion method to increase shard :return: none :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name is empty; topic_name is not valid; life_cycle is not positive; record_schema is wrong type :raise: :class:`datahub.exceptions.ResourceNotFoundException` if project not existed :raise: :class:`datahub.exceptions.ResourceExistException` if topic is already existed """ self._datahub_impl.create_tuple_topic(project_name, topic_name, shard_count, life_cycle, record_schema, extend_mode, comment)
[文档] @type_assert(object, str, str) def get_topic(self, project_name, topic_name): """ Get a topic :param topic_name: topic name :param project_name: project name :return: topic info :rtype: :class:`datahub.models.GetTopicResult` :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project or topic not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if the project name or topic name is empty """ return self._datahub_impl.get_topic(project_name, topic_name)
[文档] @type_assert(object, str, str, int, str) def update_topic(self, project_name, topic_name, life_cycle, comment): """ Update topic info, only life cycle and comment can be modified. :param topic_name: topic name :param project_name: project name :param life_cycle: life cycle of topic :param comment: topic comment :return: none :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project or topic not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if the project name or topic name is empty; life_cycle is not positive """ self._datahub_impl.update_topic(project_name, topic_name, life_cycle, comment)
[文档] @type_assert(object, str, str) def delete_topic(self, project_name, topic_name): """ Delete a topic :param topic_name: topic name :param project_name: project name :return: none :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project or topic not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if the project name or topic name is empty """ self._datahub_impl.delete_topic(project_name, topic_name)
[文档] @type_assert(object, str, str, str, FieldType) def append_field(self, project_name, topic_name, field_name, field_type): """ Append field to a tuple topic :param project_name: project name :param topic_name: topic name :param field_name: field name :param field_type: field type :type field_type: :class:`datahub.models.FieldType` :return: none :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name, topic_name or field_name is empty; field_type is wrong type """ self._datahub_impl.append_field(project_name, topic_name, field_name, field_type)
[文档] @type_assert(object, str, str, int) def wait_shards_ready(self, project_name, topic_name, timeout=30): """ Wait all shard state in ``active`` or ``closed``. It always be invoked when create a topic, and will be blocked and until all shards state in ``active`` or ``closed`` or timeout . :param project_name: project name :param topic_name: topic name :param timeout: -1 means it will be blocked until all shards state in ``active`` or ``closed``, else will be wait timeout seconds :return: none :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project or topic not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if the project name or topic name is empty; timeout < 0 :raise: :class:`datahub.exceptions.DatahubException` if timeout """ self._datahub_impl.wait_shards_ready(project_name, topic_name, timeout)
[文档] @type_assert(object, str, str) def list_shard(self, project_name, topic_name): """ List all shards of a topic :param project_name: project name :param topic_name: topic name :return: shards info :rtype: :class:`datahub.models.ListTopicResult` :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project or topic not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if the project name or topic name is empty """ return self._datahub_impl.list_shard(project_name, topic_name)
[文档] @type_assert(object, str, str, str, str) def merge_shard(self, project_name, topic_name, shard_id, adj_shard_id): """ Merge shards :param project_name: project name :param topic_name: topic name :param shard_id: shard id :param adj_shard_id: adjacent shard id :return: shards info after merged :rtype: :class:`datahub.models.MergeShardResult` :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project, topic or shard not exists :raise: :class:`datahub.exceptions.InvalidOperationException` if the shard is not active :raise: :class:`datahub.exceptions.InvalidParameterException` if the shards not adjacent; project name, topic name, shard id or adjacent shard id is empty :raise: :class:`datahub.exceptions.LimitExceededException` if merge shard operation limit exceeded """ return self._datahub_impl.merge_shard(project_name, topic_name, shard_id, adj_shard_id)
[文档] @type_assert(object, str, str, str, str) def split_shard(self, project_name, topic_name, shard_id, split_key=''): """ Split shard :param project_name: project name :param topic_name: topic name :param shard_id: split shard id :param split_key: split key, if not given, choose the median :return: shards info after split :rtype: :class:`datahub.models.SplitShardResult` :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project, topic or shard not exists :raise: :class:`datahub.exceptions.InvalidOperationException` if the shard is not active :raise: :class:`datahub.exceptions.InvalidParameterException` if the key range is invalid; project name, topic name or shard id is empty :raise: :class:`datahub.exceptions.LimitExceededException` if split shard operation limit exceeded """ return self._datahub_impl.split_shard(project_name, topic_name, shard_id, split_key)
[文档] @type_assert(object, str, str, str, CursorType, int) def get_cursor(self, project_name, topic_name, shard_id, cursor_type, param=-1): """ Get cursor. When you invoke get_blob_records/get_tuple_records, you must be invoke it to get a cursor first :param project_name: project name :param topic_name: topic name :param shard_id: shard id :param cursor_type: cursor type :type cursor_type: :class:`datahub.models.CursorType` :param param: param is system time if cursor_type == CursorType.SYSTEM_TIME while sequence if cursor_type==CursorType.SEQUENCE :return: cursor info :rtype: :class:`datahub.models.GetCursorResult` :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project, topic or shard not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if the param is invalid; project_name, topic_name or shard_id is empty; cursor_type is wrong type; param is missing """ return self._datahub_impl.get_cursor(project_name, topic_name, shard_id, cursor_type, param)
[文档] @type_assert(object, str, str, list) def put_records(self, project_name, topic_name, record_list): """ Put records to a topic :param project_name: project name :param topic_name: topic name :param record_list: record list :type record_list: :class:`list` :return: failed records info :rtype: :class:`datahub.models.PutRecordsResult` :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project or topic not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if the record is not well-formed; project_name or topic_name is empty :raise: :class:`datahub.exceptions.InvalidOperationException` if the shard is not active :raise: :class:`datahub.exceptions.LimitExceededException` if query rate or throughput rate limit exceeded :raise: :class:`datahub.exceptions.DatahubException` if crc is wrong in pb mode .. see also:: :class:`datahub.models.Record` """ return self._datahub_impl.put_records(project_name, topic_name, record_list)
[文档] @type_assert(object, str, str, str, list) def put_records_by_shard(self, project_name, topic_name, shard_id, record_list): """ Put records to specific shard of topic :param project_name: project name :param topic_name: topic name :param shard_id: shard id :param record_list: record list :type record_list: :class:`list` :return: failed records info :rtype: :class:`datahub.models.PutRecordsResult` :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project or topic not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if the record is not well-formed; project_name, topic_name or shard_id is empty :raise: :class:`datahub.exceptions.InvalidOperationException` if the shard is not active :raise: :class:`datahub.exceptions.LimitExceededException` if query rate or throughput rate limit exceeded :raise: :class:`datahub.exceptions.DatahubException` if crc is wrong in pb mode .. see also:: :class:`datahub.models.Record` """ return self._datahub_impl.put_records_by_shard(project_name, topic_name, shard_id, record_list)
[文档] @type_assert(object, str, str, str, str, int) def get_blob_records(self, project_name, topic_name, shard_id, cursor, limit_num, sub_id=None): """ Get records from a topic :param project_name: project name :param topic_name: topic name :param shard_id: shard id :param cursor: the cursor :param limit_num: record number need to read :return: result include record list, start sequence, record num and next cursor :rtype: :class:`datahub.models.GetRecordsResult` :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project or topic or shard not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if the cursor is invalid; project_name, topic_name, shard_id, or cursor is empty :raise: :class:`datahub.exceptions.DatahubException` if crc is wrong in pb mode """ return self._datahub_impl.get_blob_records(project_name, topic_name, sub_id, shard_id, cursor, limit_num)
[文档] @type_assert(object, str, str, str, RecordSchema, str, int) def get_tuple_records(self, project_name, topic_name, shard_id, record_schema, cursor, limit_num, sub_id=None): """ Get records from a topic :param project_name: project name :param topic_name: topic name :param shard_id: shard id :param record_schema: tuple record schema :param filter_: filter :type record_schema: :class:`datahub.models.RecordSchema` :param cursor: the cursor :param limit_num: record number need to read :return: result include record list, start sequence, record num and next cursor :rtype: :class:`datahub.models.GetRecordsResult` :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project or topic or shard not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if the cursor is invalid; project_name, topic_name, shard_id, or cursor is empty :raise: :class:`datahub.exceptions.DatahubException` if crc is wrong in pb mode """ return self._datahub_impl.get_tuple_records(project_name, topic_name, sub_id, shard_id, record_schema, cursor, limit_num)
[文档] @type_assert(object, str, str, str) def get_metering_info(self, project_name, topic_name, shard_id): """ Get a shard metering info :param project_name: project name :param topic_name: topic name :param shard_id: shard id :return: the shard metering info :rtype: :class:`datahub.models.GetMeteringInfoResult` :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project, topic or shard not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if the project_name, topic_name or shard_id is empty """ return self._datahub_impl.get_metering_info(project_name, topic_name, shard_id)
[文档] @type_assert(object, str, str) def list_connector(self, project_name, topic_name): """ Create a data connector :param project_name: project name :param topic_name: topic name :return: data connector names list :rtype: :class:`datahub.models.ListConnectorResult` :raise: :class:`datahub.exceptions.InvalidParameterException` if the project_name or topic_name is empty """ return self._datahub_impl.list_connector(project_name, topic_name)
[文档] @type_assert(object, str, str, ConnectorType, list, ConnectorConfig, start_time=int) def create_connector(self, project_name, topic_name, connector_type, column_fields, config, start_time=-1): """ Create a data connector :param project_name: project name :param topic_name: topic name :param connector_type: connector type :type connector_type: :class:`datahub.models.ConnectorType` :param column_fields: column fields :type column_fields: :class:`list` :param config: connector config :type config: :class:`datahub.models.ConnectorConfig` :param start_time: start timestamp in milliseconds :type start_time: :class:`int` :return: connector id :rtype: :class:`datahub.models.CreateConnectorResult` :raise: :class:`datahub.exceptions.ResourceExistException` if connector is already existed :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project or topic not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if the column field or config is invalid; project_name or topic_name is empty; connector_type or config is wrong type """ return self._datahub_impl.create_connector(project_name, topic_name, connector_type, column_fields, config, start_time)
[文档] @type_assert(object, str, str, (ConnectorType, str), ConnectorConfig) def update_connector(self, project_name, topic_name, connector_id, config): """ :param project_name: project name :param topic_name: topic name :param connector_id: connector id, compatible for connector type :type connector_id: :class:`str` or :class:`datahub.models.ConnectorType` :param config: connector config :type config: :class:`datahub.models.ConnectorConfig` :return: none """ self._datahub_impl.update_connector(project_name, topic_name, connector_id.value if isinstance(connector_id, ConnectorType) else connector_id, config)
[文档] @type_assert(object, str, str, (ConnectorType, str)) def get_connector(self, project_name, topic_name, connector_id): """ Get a data connector :param project_name: project name :param topic_name: topic name :param connector_id: connector id, compatible for connector type :type connector_id: :class:`str` or :class:`datahub.models.ConnectorType` :return: data connector info :rtype: :class:`datahub.models.GetConnectorResult` :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project, topic or connector not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name or topic_name is empty; connector_type or config is wrong type """ return self._datahub_impl.get_connector(project_name, topic_name, connector_id.value if isinstance(connector_id, ConnectorType) else connector_id)
[文档] @type_assert(object, str, str, (ConnectorType, str)) def delete_connector(self, project_name, topic_name, connector_id): """ Delete a data connector :param project_name: project name :param topic_name: topic name :param connector_id: connector id, compatible for connector type :type connector_id: :class:`str` or :class:`datahub.models.ConnectorType` :return: none :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project, topic or connector not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name or topic_name is empty; connector_type is wrong type """ self._datahub_impl.delete_connector(project_name, topic_name, connector_id.value if isinstance(connector_id, ConnectorType) else connector_id)
[文档] @type_assert(object, str, str, (ConnectorType, str), str) def get_connector_shard_status(self, project_name, topic_name, connector_id, shard_id=''): """ Get data connector shard status :param project_name: project name :param topic_name: topic name :param connector_id: connector id, compatible for connector type :type connector_id: :class:`str` or :class:`datahub.models.ConnectorType` :param shard_id: shard id :return: data connector shard status :rtype: :class:`datahub.models.results.GetConnectorShardStatusResult` :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project, topic, shard or connector not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name, topic_name or shard_id is empty; connector_type is wrong type """ return self._datahub_impl.get_connector_shard_status(project_name, topic_name, connector_id.value if isinstance(connector_id, ConnectorType) else connector_id, shard_id)
[文档] @type_assert(object, str, str, (ConnectorType, str), str) def reload_connector(self, project_name, topic_name, connector_id, shard_id=''): """ Reload data connector by given shard id or reload all shards without any shard id given :param project_name: project name :param topic_name: topic name :param connector_id: connector id, compatible for connector type :type connector_id: :class:`str` or :class:`datahub.models.ConnectorType` :param shard_id: shard id :return: none :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project, topic or connector not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name, topic_name or filed_name is empty; connector_type is wrong type """ self._datahub_impl.reload_connector(project_name, topic_name, connector_id.value if isinstance(connector_id, ConnectorType) else connector_id, shard_id)
[文档] @type_assert(object, str, str, (ConnectorType, str), str) def append_connector_field(self, project_name, topic_name, connector_id, field_name): """ Append field to a connector :param project_name: project name :param topic_name: topic name :param connector_id: connector id, compatible for connector type :type connector_id: :class:`str` or :class:`datahub.models.ConnectorType` :param field_name: field name :return: none :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project, topic or connector not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name, topic_name or filed_name is empty; connector_type is wrong type """ self._datahub_impl.append_connector_field(project_name, topic_name, connector_id.value if isinstance(connector_id, ConnectorType) else connector_id, field_name)
[文档] @type_assert(object, str, str, (ConnectorType, str)) def get_connector_done_time(self, project_name, topic_name, connector_id): """ Get connector done time :param project_name: project name :param topic_name: topic name :param connector_id: connector id, compatible for connector type :type connector_id: :class:`str` or :class:`datahub.models.ConnectorType` """ return self._datahub_impl.get_connector_done_time(project_name, topic_name, connector_id.value if isinstance(connector_id, ConnectorType) else connector_id)
[文档] @type_assert(object, str, str, (ConnectorType, str), ConnectorState) def update_connector_state(self, project_name, topic_name, connector_id, connector_state): """ Update data connector state :param project_name: project name :param topic_name: topic name :param connector_id: connector id, compatible for connector type :type connector_id: :class:`str` or :class:`datahub.models.ConnectorType` :param connector_state: connector state :type connector_state: :class:`datahub.models.ConnectorState` :return: none :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project, topic or connector not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name or topic_name is empty; connector_type or connector_state is wrong type """ self._datahub_impl.update_connector_state(project_name, topic_name, connector_id.value if isinstance(connector_id, ConnectorType) else connector_id, connector_state)
[文档] @type_assert(object, str, str, (ConnectorType, str), str, ConnectorOffset) def update_connector_offset(self, project_name, topic_name, connector_id, shard_id, connector_offset): """ Update data connector offset :param project_name: project name :param topic_name: topic name :param connector_id: connector id, compatible for connector type :type connector_id: :class:`str` or :class:`datahub.models.ConnectorType` :param shard_id: shard id :param connector_offset: current sequence :type connector_offset: :class:`datahub.models.ConnectorOffset` :return: none :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project, topic or connector not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name or topic_name is empty; connector_type or connector_state is wrong type """ self._datahub_impl.update_connector_offset(project_name, topic_name, connector_id.value if isinstance(connector_id, ConnectorType) else connector_id, shard_id, connector_offset)
[文档] @type_assert(object, str, str, str) def init_and_get_subscription_offset(self, project_name, topic_name, sub_id, shard_ids): """ Open subscription offset session :param project_name: project name :param topic_name: topic name :param sub_id: subscription id :param shard_ids: shard ids :return: offset info :rtype :class:`datahub.models.InitAndGetSubscriptionOffsetResult` :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project, topic or subscription not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name, topic_name, sub_id or shard_id is empty """ return self._datahub_impl.init_and_get_subscription_offset(project_name, topic_name, sub_id, shard_ids)
[文档] @type_assert(object, str, str, str) def get_subscription_offset(self, project_name, topic_name, sub_id, shard_ids=None): """ Get subscription offset :param project_name: project name :param topic_name: topic name :param sub_id: subscription id :param shard_ids: shard ids :return: offset info :rtype: :class:`datahub.models.results.GetSubscriptionOffsetResult` :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project, topic or subscription not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name, topic_name or sub_id is empty """ return self._datahub_impl.get_subscription_offset(project_name, topic_name, sub_id, shard_ids)
[文档] @type_assert(object, str, str, str, dict) def update_subscription_offset(self, project_name, topic_name, sub_id, offsets): """ Update subscription offset :param project_name: project name :param topic_name: topic name :param sub_id: subscription id :param offsets: offsets :type offsets: :class:`dict` :return: none :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project, topic or subscription not exists :raise: :class:`datahub.exceptions.InvalidOperationException` if the offset session closed or offset version changed :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name, topic_name or sub_id is empty; offsets is wrong type """ self._datahub_impl.update_subscription_offset(project_name, topic_name, sub_id, offsets)
[文档] @type_assert(object, str, str, str) def create_subscription(self, project_name, topic_name, comment): """ Create subscription to a topic :param project_name: project name :param topic_name: topic name :param comment: comment for subscription :return: create result contains subscription id :rtype: :class:`datahub.models.results.CreateSubscriptionResult` :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project or topic not exists :raise: :class:`datahub.exceptions.LimitExceededException` if limit of subscription number is exceeded :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name or topic_name is empty """ return self._datahub_impl.create_subscription(project_name, topic_name, comment)
[文档] @type_assert(object, str, str, str) def delete_subscription(self, project_name, topic_name, sub_id): """ Delete subscription by subscription id :param project_name: project name :param topic_name: topic name :param sub_id: subscription id :return: none :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project, topic or subscription not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name, topic_name or sub_id is empty """ self._datahub_impl.delete_subscription(project_name, topic_name, sub_id)
[文档] @type_assert(object, str, str, str) def get_subscription(self, project_name, topic_name, sub_id): """ Get subscription :param project_name: project name :param topic_name: topic name :param sub_id: subscription id :return: subscription info :rtype: :class:`datahub.models.results.GetSubscriptionResult` :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project, topic or subscription not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name, topic_name or sub_id is empty """ return self._datahub_impl.get_subscription(project_name, topic_name, sub_id)
[文档] @type_assert(object, str, str, str, str) def update_subscription(self, project_name, topic_name, sub_id, comment): """ Update subscription :param project_name: project name :param topic_name: topic name :param sub_id: subscription id :param comment: new comment :return: none :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project, topic or subscription not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name, topic_name or sub_id is empty """ self._datahub_impl.update_subscription(project_name, topic_name, sub_id, comment)
[文档] @type_assert(object, str, str, str, SubscriptionState) def update_subscription_state(self, project_name, topic_name, sub_id, state): """ Update subscription state :param project_name: project name :param topic_name: topic name :param sub_id: subscription id :param state: new state :type state: :class:`datahub.models.SubscriptionState` :return: none :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project, topic or subscription not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name, topic_name or sub_id is empty; state is wrong type """ self._datahub_impl.update_subscription_state(project_name, topic_name, sub_id, state)
[文档] @type_assert(object, str, str, str, int, int) def list_subscription(self, project_name, topic_name, query_key, page_index, page_size): """ Query subscription in range [start, end) start = (page_index - 1) * page_size + 1 end = start + page_size :param project_name: project name :param topic_name: topic name :param query_key: query key for search :param page_index: page index :param page_size: page size :return: subscription info list :rtype: :class:`datahub.models.results.ListSubscriptionResult` :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name or topic_name is empty; page_index <= 0 or page_size < 0 """ return self._datahub_impl.list_subscription(project_name, topic_name, query_key, page_index, page_size)
[文档] @type_assert(object, str, str, str, dict) def reset_subscription_offset(self, project_name, topic_name, sub_id, offsets): """ Update subscription offset :param project_name: project name :param topic_name: topic name :param sub_id: subscription id :param offsets: offsets :type: :class:`dict` :return: none :raise: :class:`datahub.exceptions.ResourceNotFoundException` if the project, topic or subscription not exists :raise: :class:`datahub.exceptions.InvalidParameterException` if project_name, topic_name or sub_id is empty; offsets is wrong type """ self._datahub_impl.reset_subscription_offset(project_name, topic_name, sub_id, offsets)