API Reference

DataHub

class datahub.models.compress.CompressFormat[source]

CompressFormat enum class. Values: NONE, LZ4, ZLIB, DEFLATE

class datahub.DataHub(access_id, access_key, endpoint=None, enable_pb=True, compress_format=<CompressFormat.NONE: ''>, **kwargs)[source]

Main entrance to DataHub.

Provides convenient operations on DataHub objects. See the DataHub documentation for details.

Generally, each DataHub object supports basic operations such as create, list, delete and update. The project is used as the example below.

To create a DataHub instance, access_id and access_key are required and must be correct; otherwise a SignatureNotMatch error is raised.

Parameters:
  • access_id – Aliyun Access ID
  • access_key – Aliyun Access Key
  • endpoint – REST service URL
  • enable_pb – enable protobuf when putting or getting records; the default is False in versions <= 2.11 and True in versions >= 2.12
  • compress_format (datahub.models.compress.CompressFormat) – compression format
Example:
>>> from datahub import DataHub
>>> from datahub.models.compress import CompressFormat
>>> datahub = DataHub('**your access id**', '**your access key**', '**endpoint**')
>>> datahub_pb = DataHub('**your access id**', '**your access key**', '**endpoint**', enable_pb=True)
>>> datahub_lz4 = DataHub('**your access id**', '**your access key**', '**endpoint**', compress_format=CompressFormat.LZ4)
>>> project_result = datahub.get_project('datahub_test')
>>> print(project_result is None)
append_connector_field(**kwargs)[source]

Append a field to a connector

Parameters:
  • project_name – project name
  • topic_name – topic name
  • connector_id (str or datahub.models.ConnectorType) – connector id (a ConnectorType is also accepted for compatibility)
  • field_name – field name
Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or field_name is empty; connector_type has the wrong type

append_field(**kwargs)[source]

Append a field to a tuple topic

Parameters:
  • project_name – project name
  • topic_name – topic name
  • field_name – field name
  • field_type (datahub.models.FieldType) – field type
Returns:

none

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or field_name is empty; field_type has the wrong type

create_blob_topic(**kwargs)[source]

Create a blob topic

Parameters:
  • project_name – project name
  • topic_name – topic name
  • shard_count – shard count
  • life_cycle – life cycle
  • comment – comment
  • extend_mode – use the extend (expansion) method to increase shards
Returns:

none

Raise:

datahub.exceptions.InvalidParameterException if project_name is empty; topic_name is not valid; life_cycle is not positive

Raise:

datahub.exceptions.ResourceNotFoundException if the project does not exist

Raise:

datahub.exceptions.ResourceExistException if the topic already exists

create_connector(**kwargs)[source]

Create a data connector

Parameters:
  • project_name – project name
  • topic_name – topic name
  • connector_type (datahub.models.ConnectorType) – connector type
  • column_fields (list) – column fields
  • config (datahub.models.ConnectorConfig) – connector config
  • start_time (int) – start timestamp in milliseconds
Returns:

result containing the connector id

Return type:

datahub.models.CreateConnectorResult

Raise:

datahub.exceptions.ResourceExistException if the connector already exists

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.InvalidParameterException if the column fields or config are invalid; project_name or topic_name is empty; connector_type or config has the wrong type

create_project(**kwargs)[source]

Create a new project with the given name and comment

Parameters:
  • project_name – project name
  • comment – description of the project
Returns:

none

Raise:

datahub.exceptions.InvalidParameterException if project_name is not valid

Raise:

datahub.exceptions.ResourceExistException if the project already exists

create_subscription(**kwargs)[source]

Create a subscription to a topic

Parameters:
  • project_name – project name
  • topic_name – topic name
  • comment – comment for the subscription
Returns:

creation result containing the subscription id

Return type:

datahub.models.results.CreateSubscriptionResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.LimitExceededException if the limit on the number of subscriptions is exceeded

Raise:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty

create_tuple_topic(**kwargs)[source]

Create a tuple topic

Parameters:
  • project_name – project name
  • topic_name – topic name
  • shard_count – shard count
  • life_cycle – life cycle
  • record_schema (datahub.models.RecordSchema) – record schema for the tuple record type
  • comment – comment
  • extend_mode – use the extend (expansion) method to increase shards
Returns:

none

Raise:

datahub.exceptions.InvalidParameterException if project_name is empty; topic_name is not valid; life_cycle is not positive; record_schema has the wrong type

Raise:

datahub.exceptions.ResourceNotFoundException if the project does not exist

Raise:

datahub.exceptions.ResourceExistException if the topic already exists

delete_connector(**kwargs)[source]

Delete a data connector

Parameters:
  • project_name – project name
  • topic_name – topic name
  • connector_id (str or datahub.models.ConnectorType) – connector id (a ConnectorType is also accepted for compatibility)
Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty; connector_type has the wrong type

delete_project(**kwargs)[source]

Delete the project with the given name

Parameters:project_name – project name
Returns:none
Raise:datahub.exceptions.InvalidParameterException if project_name is empty
delete_subscription(**kwargs)[source]

Delete a subscription by subscription id

Parameters:
  • project_name – project name
  • topic_name – topic name
  • sub_id – subscription id
Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty

delete_topic(**kwargs)[source]

Delete a topic

Parameters:
  • topic_name – topic name
  • project_name – project name
Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.InvalidParameterException if the project name or topic name is empty

get_blob_records(**kwargs)[source]

Get records from a topic

Parameters:
  • project_name – project name
  • topic_name – topic name
  • shard_id – shard id
  • cursor – the cursor
  • limit_num – number of records to read
Returns:

result including the record list, start sequence, record count and next cursor

Return type:

datahub.models.GetRecordsResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist

Raise:

datahub.exceptions.InvalidParameterException if the cursor is invalid; project_name, topic_name, shard_id or cursor is empty

Raise:

datahub.exceptions.DatahubException if the CRC check fails in protobuf (pb) mode

get_connector(**kwargs)[source]

Get a data connector

Parameters:
  • project_name – project name
  • topic_name – topic name
  • connector_id (str or datahub.models.ConnectorType) – connector id (a ConnectorType is also accepted for compatibility)
Returns:

data connector info

Return type:

datahub.models.GetConnectorResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty; connector_type or config has the wrong type

get_connector_done_time(**kwargs)[source]

Get the connector done time

Parameters:
  • project_name – project name
  • topic_name – topic name
  • connector_id (str or datahub.models.ConnectorType) – connector id (a ConnectorType is also accepted for compatibility)
get_connector_shard_status(**kwargs)[source]

Get the shard status of a data connector

Parameters:
  • project_name – project name
  • topic_name – topic name
  • connector_id (str or datahub.models.ConnectorType) – connector id (a ConnectorType is also accepted for compatibility)
  • shard_id – shard id
Returns:

data connector shard status

Return type:

datahub.models.results.GetConnectorShardStatusResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic, shard or connector does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or shard_id is empty; connector_type has the wrong type

get_cursor(**kwargs)[source]

Get a cursor. Before calling get_blob_records or get_tuple_records, you must call this method to obtain a cursor.

Parameters:
  • project_name – project name
  • topic_name – topic name
  • shard_id – shard id
  • cursor_type (datahub.models.CursorType) – cursor type
  • param – a system time when cursor_type is CursorType.SYSTEM_TIME, or a sequence when cursor_type is CursorType.SEQUENCE
Returns:

cursor info

Return type:

datahub.models.GetCursorResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist

Raise:

datahub.exceptions.InvalidParameterException if param is invalid; project_name, topic_name or shard_id is empty; cursor_type has the wrong type; param is missing
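The requirement to call get_cursor before reading can be sketched as a read loop over one shard. This is a minimal sketch, not the SDK's own code: it assumes the client methods accept the documented parameter names as keyword arguments and return objects with the documented members (cursor, records, record_count, next_cursor). FakeClient is a hypothetical in-memory stand-in so the loop runs without a server; its integer cursors replace the opaque cursor strings a real server returns.

```python
from types import SimpleNamespace


def read_shard(client, project_name, topic_name, shard_id, record_schema,
               cursor_type, param=None, limit_num=100, max_batches=None):
    """Read records from one shard of a tuple topic, batch by batch.

    Reading stops at the first empty batch or after `max_batches` batches.
    """
    cursor_kwargs = {} if param is None else {"param": param}
    # get_cursor must be called first to obtain a starting position.
    cursor = client.get_cursor(project_name=project_name, topic_name=topic_name,
                               shard_id=shard_id, cursor_type=cursor_type,
                               **cursor_kwargs).cursor
    records = []
    batches = 0
    while max_batches is None or batches < max_batches:
        result = client.get_tuple_records(project_name=project_name, topic_name=topic_name,
                                          shard_id=shard_id, record_schema=record_schema,
                                          cursor=cursor, limit_num=limit_num)
        batches += 1
        if result.record_count == 0:
            break  # caught up; a long-running consumer would sleep and poll again
        records.extend(result.records)
        cursor = result.next_cursor  # resume right after the batch just read
    return records


class FakeClient:
    """Hypothetical in-memory stand-in so the sketch runs without a server."""

    def __init__(self, data):
        self.data = list(data)

    def get_cursor(self, **kwargs):
        return SimpleNamespace(cursor=0)

    def get_tuple_records(self, cursor, limit_num, **kwargs):
        batch = self.data[cursor:cursor + limit_num]
        return SimpleNamespace(records=batch, record_count=len(batch),
                               next_cursor=cursor + len(batch))
```

With a real client you would pass a datahub.DataHub instance, the topic's RecordSchema and CursorType.OLDEST (or CursorType.SYSTEM_TIME together with param) in place of the fake values.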

get_metering_info(**kwargs)[source]

Get the metering info of a shard

Parameters:
  • project_name – project name
  • topic_name – topic name
  • shard_id – shard id
Returns:

the shard metering info

Return type:

datahub.models.GetMeteringInfoResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or shard_id is empty

get_project(**kwargs)[source]

Get a project by name

Parameters:project_name – project name
Returns:the matching project
Return type:datahub.models.GetProjectResult
Raise:datahub.exceptions.ResourceNotFoundException if the project does not exist
Raise:datahub.exceptions.InvalidParameterException if project_name is empty
get_subscription(**kwargs)[source]

Get a subscription

Parameters:
  • project_name – project name
  • topic_name – topic name
  • sub_id – subscription id
Returns:

subscription info

Return type:

datahub.models.results.GetSubscriptionResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty

get_subscription_offset(**kwargs)[source]

Get the subscription offset

Parameters:
  • project_name – project name
  • topic_name – topic name
  • sub_id – subscription id
  • shard_ids – shard ids
Returns:

offset info

Return type:

datahub.models.results.GetSubscriptionOffsetResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty

get_topic(**kwargs)[source]

Get a topic

Parameters:
  • topic_name – topic name
  • project_name – project name
Returns:

topic info

Return type:

datahub.models.GetTopicResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.InvalidParameterException if the project name or topic name is empty

get_tuple_records(**kwargs)[source]

Get records from a topic

Parameters:
  • project_name – project name
  • topic_name – topic name
  • shard_id – shard id
  • record_schema (datahub.models.RecordSchema) – tuple record schema
  • filter – filter
  • cursor – the cursor
  • limit_num – number of records to read
Returns:

result including the record list, start sequence, record count and next cursor

Return type:

datahub.models.GetRecordsResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist

Raise:

datahub.exceptions.InvalidParameterException if the cursor is invalid; project_name, topic_name, shard_id or cursor is empty

Raise:

datahub.exceptions.DatahubException if the CRC check fails in protobuf (pb) mode

init_and_get_subscription_offset(**kwargs)[source]

Open a subscription offset session

Parameters:
  • project_name – project name
  • topic_name – topic name
  • sub_id – subscription id
  • shard_ids – shard ids
Returns:

offset info

Return type:

datahub.models.InitAndGetSubscriptionOffsetResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name, sub_id or shard_id is empty
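A common pattern after opening an offset session is to advance the stored offset as records are processed and then commit it with update_subscription_offset. The sketch below rests on assumptions beyond this page: that offsets is a dict keyed by shard id (the form update_subscription_offset accepts; this page lists the result member as a list, so check what your SDK version returns), and that each record exposes sequence and system_time attributes. RecordingClient is a hypothetical stand-in.

```python
from types import SimpleNamespace


def commit_progress(client, project_name, topic_name, sub_id, offsets, shard_id, record):
    """Move one shard's offset to `record` and commit the whole offsets dict.

    session_id and version are left untouched, so the server can reject a
    commit coming from a closed or outdated session.
    """
    offset = offsets[shard_id]
    offset.sequence = record.sequence
    offset.timestamp = record.system_time
    client.update_subscription_offset(project_name=project_name, topic_name=topic_name,
                                      sub_id=sub_id, offsets=offsets)


class RecordingClient:
    """Hypothetical stand-in that records update_subscription_offset calls."""

    def __init__(self):
        self.calls = []

    def update_subscription_offset(self, **kwargs):
        self.calls.append(kwargs)
```

Committing after each batch, rather than after each record, is a common way to reduce request volume at the cost of re-reading some records after a restart.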

list_connector(**kwargs)[source]

List the data connectors of a topic

Parameters:
  • project_name – project name
  • topic_name – topic name
Returns:

list of data connector names

Return type:

datahub.models.ListConnectorResult

Raise:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty

list_project()[source]

List all project names

Returns:projects in the DataHub server
Return type:datahub.models.results.ListProjectResult
list_shard(**kwargs)[source]

List all shards of a topic

Parameters:
  • project_name – project name
  • topic_name – topic name
Returns:

shards info

Return type:

datahub.models.ListShardResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.InvalidParameterException if the project name or topic name is empty

list_subscription(**kwargs)[source]

Query subscriptions in the range [start, end), where

start = (page_index - 1) * page_size + 1

end = start + page_size

Parameters:
  • project_name – project name
  • topic_name – topic name
  • query_key – query key for search
  • page_index – page index
  • page_size – page size
Returns:

list of subscription info

Return type:

datahub.models.results.ListSubscriptionResult

Raise:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty; page_index <= 0 or page_size < 0
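The paging formulas above translate directly into code. page_range reproduces them, and iter_all_subscriptions shows one way to walk every page using the total_count member of ListSubscriptionResult. list_page is a hypothetical wrapper you would write around list_subscription; it is not part of the SDK.

```python
from types import SimpleNamespace


def page_range(page_index, page_size):
    """Return the [start, end) range covered by a page, per the formulas above."""
    if page_index <= 0 or page_size < 0:
        # list_subscription documents InvalidParameterException for these inputs.
        raise ValueError("page_index must be > 0 and page_size must be >= 0")
    start = (page_index - 1) * page_size + 1
    end = start + page_size
    return start, end


def iter_all_subscriptions(list_page, page_size=50):
    """Yield subscriptions page by page until total_count items have been seen.

    `list_page(page_index, page_size)` must return an object with
    `total_count` and `subscriptions` members.
    """
    if page_size <= 0:
        raise ValueError("page_size must be positive to make progress")
    page_index, seen = 1, 0
    while True:
        result = list_page(page_index, page_size)
        yield from result.subscriptions
        seen += len(result.subscriptions)
        # Stop once everything is seen, or if the server returns an empty page.
        if seen >= result.total_count or not result.subscriptions:
            break
        page_index += 1
```

For example, page 3 with page_size 10 covers the 1-based positions 21 through 30, i.e. the range [21, 31).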

list_topic(**kwargs)[source]

Get all topics of a project

Parameters:project_name – project name
Returns:all topics of the project
Return type:datahub.models.ListTopicResult
Raise:datahub.exceptions.ResourceNotFoundException if the project does not exist
Raise:datahub.exceptions.InvalidParameterException if project_name is empty
merge_shard(**kwargs)[source]

Merge shards

Parameters:
  • project_name – project name
  • topic_name – topic name
  • shard_id – shard id
  • adj_shard_id – adjacent shard id
Returns:

info of the shard produced by the merge

Return type:

datahub.models.MergeShardResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist

Raise:

datahub.exceptions.InvalidOperationException if the shard is not active

Raise:

datahub.exceptions.InvalidParameterException if the shards are not adjacent; project name, topic name, shard id or adjacent shard id is empty

Raise:

datahub.exceptions.LimitExceededException if the merge shard operation limit is exceeded

put_records(**kwargs)[source]

Put records to a topic

Parameters:
  • project_name – project name
  • topic_name – topic name
  • record_list (list) – record list
Returns:

information about the failed records

Return type:

datahub.models.PutRecordsResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.InvalidParameterException if a record is not well-formed; project_name or topic_name is empty

Raise:

datahub.exceptions.InvalidOperationException if the shard is not active

Raise:

datahub.exceptions.LimitExceededException if the query rate or throughput limit is exceeded

Raise:

datahub.exceptions.DatahubException if the CRC check fails in protobuf (pb) mode
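Because put_records reports failures per record in its result, callers typically resend only the records that failed. This sketch assumes FailedRecord.index is the record's position in the list that was just sent and that the client accepts keyword arguments; FlakyClient is a hypothetical stand-in that rejects chosen records once.

```python
from types import SimpleNamespace


def put_with_retry(client, project_name, topic_name, records, max_attempts=3):
    """Send records, then resend only the ones reported as failed.

    Returns the records that still failed after max_attempts calls.
    """
    pending = list(records)
    for _ in range(max_attempts):
        if not pending:
            break
        result = client.put_records(project_name=project_name, topic_name=topic_name,
                                    record_list=pending)
        if result.failed_record_count == 0:
            return []
        # Assumed: FailedRecord.index is the position within `pending` as sent.
        pending = [pending[failed.index] for failed in result.failed_records]
    return pending


class FlakyClient:
    """Hypothetical stand-in that rejects each record in `flaky` the first time."""

    def __init__(self, flaky):
        self.flaky = set(flaky)
        self.stored = []

    def put_records(self, record_list, **kwargs):
        failed = []
        for i, rec in enumerate(record_list):
            if rec in self.flaky:
                self.flaky.discard(rec)
                failed.append(SimpleNamespace(index=i, error_code="ExampleError",
                                              error_message="simulated failure"))
            else:
                self.stored.append(rec)
        return SimpleNamespace(failed_record_count=len(failed), failed_records=failed)
```

Indices are re-read against `pending` on every pass because each retry sends a shorter list, so positions shift between attempts.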

put_records_by_shard(**kwargs)[source]

Put records to a specific shard of a topic

Parameters:
  • project_name – project name
  • topic_name – topic name
  • shard_id – shard id
  • record_list (list) – record list
Returns:

information about the failed records

Return type:

datahub.models.PutRecordsResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.InvalidParameterException if a record is not well-formed; project_name, topic_name or shard_id is empty

Raise:

datahub.exceptions.InvalidOperationException if the shard is not active

Raise:

datahub.exceptions.LimitExceededException if the query rate or throughput limit is exceeded

Raise:

datahub.exceptions.DatahubException if the CRC check fails in protobuf (pb) mode

reload_connector(**kwargs)[source]

Reload a data connector for the given shard id, or for all shards if no shard id is given

Parameters:
  • project_name – project name
  • topic_name – topic name
  • connector_id (str or datahub.models.ConnectorType) – connector id (a ConnectorType is also accepted for compatibility)
  • shard_id – shard id
Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty; connector_type has the wrong type

reset_subscription_offset(**kwargs)[source]

Reset the subscription offset

Parameters:
  • project_name – project name
  • topic_name – topic name
  • sub_id – subscription id
  • offsets (dict) – offsets
Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty; offsets has the wrong type

split_shard(**kwargs)[source]

Split a shard

Parameters:
  • project_name – project name
  • topic_name – topic name
  • shard_id – id of the shard to split
  • split_key – split key; if not given, the median is used
Returns:

info of the shards after the split

Return type:

datahub.models.SplitShardResult

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist

Raise:

datahub.exceptions.InvalidOperationException if the shard is not active

Raise:

datahub.exceptions.InvalidParameterException if the key range is invalid; project name, topic name or shard id is empty

Raise:

datahub.exceptions.LimitExceededException if the split shard operation limit is exceeded
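When split_key is omitted, the median is used. If you prefer to pass an explicit key, you can compute a midpoint yourself. This sketch assumes hash keys are 128-bit values written as 32 hexadecimal digits (the begin_hash_key and end_hash_key strings of a shard); the server's own median choice may differ, so treat it as illustrative.

```python
HASH_KEY_HEX_DIGITS = 32  # assumption: 128-bit hash keys rendered as 32 hex digits


def median_split_key(begin_hash_key, end_hash_key):
    """Return the key halfway between two hash keys as an upper-case hex string."""
    begin = int(begin_hash_key, 16)
    end = int(end_hash_key, 16)
    if end <= begin:
        raise ValueError("end_hash_key must be greater than begin_hash_key")
    # Integer midpoint, zero-padded back to the fixed key width.
    return format((begin + end) // 2, "0{}X".format(HASH_KEY_HEX_DIGITS))
```

A hypothetical call would pass median_split_key(shard.begin_hash_key, shard.end_hash_key) as split_key, taking the shard from list_shard.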

update_connector(**kwargs)[source]

Update the config of a data connector

Parameters:
  • project_name – project name
  • topic_name – topic name
  • connector_id (str or datahub.models.ConnectorType) – connector id (a ConnectorType is also accepted for compatibility)
  • config (datahub.models.ConnectorConfig) – connector config
Returns:

none

update_connector_offset(**kwargs)[source]

Update the data connector offset

Parameters:
  • project_name – project name
  • topic_name – topic name
  • connector_id (str or datahub.models.ConnectorType) – connector id (a ConnectorType is also accepted for compatibility)
  • shard_id – shard id
  • connector_offset (datahub.models.ConnectorOffset) – current sequence
Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty; connector_type or connector_state has the wrong type

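update_connector_state(**kwargs)[source]

Update the data connector state

Parameters:
  • project_name – project name
  • topic_name – topic name
  • connector_id (str or datahub.models.ConnectorType) – connector id (a ConnectorType is also accepted for compatibility)
  • connector_state (datahub.models.ConnectorState) – connector state
Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty; connector_type or connector_state has the wrong type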

update_project(**kwargs)[source]

Update the project comment

Parameters:
  • project_name – project name
  • comment – new description of the project
Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name is empty or comment is invalid

update_subscription(**kwargs)[source]

Update a subscription

Parameters:
  • project_name – project name
  • topic_name – topic name
  • sub_id – subscription id
  • comment – new comment
Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty

update_subscription_offset(**kwargs)[source]

Update the subscription offset

Parameters:
  • project_name – project name
  • topic_name – topic name
  • sub_id – subscription id
  • offsets (dict) – offsets
Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raise:

datahub.exceptions.InvalidOperationException if the offset session has been closed or the offset version has changed

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty; offsets has the wrong type

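update_subscription_state(**kwargs)[source]

Update the subscription state

Parameters:
  • project_name – project name
  • topic_name – topic name
  • sub_id – subscription id
  • state (datahub.models.SubscriptionState) – subscription state
Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty; state has the wrong type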

update_topic(**kwargs)[source]

Update topic info. Only the life cycle and comment can be modified.

Parameters:
  • topic_name – topic name
  • project_name – project name
  • life_cycle – life cycle of the topic
  • comment – topic comment
Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.InvalidParameterException if the project name or topic name is empty; life_cycle is not positive

wait_shards_ready(**kwargs)[source]

Wait until every shard is in the ACTIVE or CLOSED state. It is typically called right after a topic is created, and it blocks until all shards are ACTIVE or CLOSED or the timeout expires.

Parameters:
  • project_name – project name
  • topic_name – topic name
  • timeout – -1 blocks until all shards are ACTIVE or CLOSED; any other value waits at most timeout seconds
Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raise:

datahub.exceptions.InvalidParameterException if the project name or topic name is empty; timeout < 0

Raise:

datahub.exceptions.DatahubException if the wait times out
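The blocking behaviour of wait_shards_ready can be pictured as a polling loop. This is an illustrative sketch, not the SDK's implementation: get_shard_states is a hypothetical callable (for example built on list_shard) that returns state names as strings, and the sketch raises the built-in TimeoutError where the SDK raises DatahubException.

```python
import time

READY_STATES = {"ACTIVE", "CLOSED"}


def wait_until_ready(get_shard_states, timeout=-1, interval=1.0,
                     clock=time.monotonic, sleep=time.sleep):
    """Poll until every shard is ACTIVE or CLOSED.

    timeout=-1 waits indefinitely; a non-negative timeout raises
    TimeoutError once that many seconds have elapsed.
    """
    if timeout < 0 and timeout != -1:
        raise ValueError("timeout must be -1 or >= 0")
    deadline = None if timeout == -1 else clock() + timeout
    while True:
        if all(state in READY_STATES for state in get_shard_states()):
            return
        if deadline is not None and clock() >= deadline:
            raise TimeoutError("shards not ready after %s seconds" % timeout)
        sleep(interval)  # back off before polling the shard states again
```

Injecting clock and sleep keeps the loop testable without real waiting.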

Auth

class datahub.auth.AccountType[source]

Account type.

Only the ‘aliyun’ type is supported now.

class datahub.auth.Account(*args, **kwargs)[source]

Base Account class.

get_type()[source]

Get the account type; must be implemented by subclasses.

Returns:the account type
Return type:datahub.auth.AccountType
sign_request(request)[source]

Generate a signature for the request; must be implemented by subclasses.

Parameters:request – request object
Returns:none
class datahub.auth.AliyunAccount(*args, **kwargs)[source]

Aliyun account implementation based on datahub.auth.Account

get_type()[source]

Get the account type.

Returns:the account type
Return type:datahub.auth.AccountType
sign_request(request)[source]

Generate a signature for the request.

Parameters:request – request object
Returns:none

Schema

class datahub.models.FieldType[source]
Field types. DataHub supports the following field types:
TINYINT, SMALLINT, INTEGER, BIGINT, STRING, BOOLEAN, TIMESTAMP, FLOAT, DOUBLE, DECIMAL
class datahub.models.Field(name, field_type, allow_null=True)[source]
Members:

name (str): field name

type (str): field type

class datahub.models.RecordSchema(field_list=None)[source]

Record schema class, used by tuple topics.

Members:
fields (list): fields
Example:
>>> from datahub.models import RecordSchema, FieldType
>>> schema = RecordSchema.from_lists(
...     ['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'],
...     [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP]
... )

Record

class datahub.models.RecordType[source]

Record type. There are two types: TUPLE and BLOB

class datahub.models.BlobRecord(blob_data=None, values=None)[source]

Blob type record class

Members:
blob_data (str): blob data
class datahub.models.TupleRecord(field_list=None, schema=None, values=None)[source]

Tuple type record class

Members:

field_list (list): fields

name_indices (list): values

Example:
>>> from datahub.models import RecordSchema, FieldType, TupleRecord
>>> schema = RecordSchema.from_lists(['name', 'id'], [FieldType.STRING, FieldType.STRING])
>>> record = TupleRecord(schema=schema, values=['test', 'test2'])
>>> record.values[0:2]
['test', 'test2']
>>> record.set_value('name', 'test1')
>>> record.values[0]
'test1'
>>> record.set_value(0, 'test3')
>>> record.get_value(0)
'test3'
>>> record.get_value('name')
'test3'
>>> len(record.values)
2
>>> record.has_field('name')
True
class datahub.models.FailedRecord(index, error_code, error_message)[source]

Failed record info

Members:

index (int): index of the failed record in the request

error_code (str): error code

error_message (str): error message

Shard

class datahub.models.ShardState[source]

Shard state. Values: OPENING, ACTIVE, CLOSED, CLOSING

class datahub.models.ShardBase(shard_id, begin_hash_key, end_hash_key)[source]

Shard base info

Members:

shard_id (str): shard id

begin_hash_key (str): begin hash key

end_hash_key (str): end hash key

class datahub.models.Shard(shard_id, begin_hash_key, end_hash_key, state, closed_time, parent_shard_ids, left_shard_id, right_shard_id)[source]

Shard info

Members:

shard_id (str): shard id

begin_hash_key (str): begin hash key

end_hash_key (str): end hash key

state (datahub.models.ShardState): shard state

closed_time (str): closed time

parent_shard_ids (list): parent shard ids

left_shard_id (str): left shard id

right_shard_id (str): right shard id

class datahub.models.ShardContext(shard_id, start_sequence, end_sequence, current_sequence)[source]

Shard context

Members:

shard_id (str): shard id

start_sequence (str): start sequence

end_sequence (str): end sequence

current_sequence (str): current sequence

Cursor

class datahub.models.CursorType[source]

Cursor type enum. Values: OLDEST, LATEST, SYSTEM_TIME, SEQUENCE
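For SYSTEM_TIME cursors, get_cursor takes a system time as param. Assuming that value is a Unix timestamp in milliseconds (the unit create_connector documents for start_time), a datetime can be converted with integer arithmetic, which avoids float rounding:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_millis(dt):
    """Convert a timezone-aware datetime to Unix epoch milliseconds."""
    if dt.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime to avoid local-time ambiguity")
    # timedelta // timedelta yields an int, so no float precision is lost.
    return (dt - EPOCH) // timedelta(milliseconds=1)
```

The result would then be passed as param together with cursor_type=CursorType.SYSTEM_TIME.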

Offset

class datahub.models.OffsetBase(sequence, timestamp)[source]

Offset base class

Members:

sequence (int): sequence

timestamp (int): timestamp

class datahub.models.OffsetWithVersion(sequence, timestamp, version)[source]

Offset with version class

Members:

sequence (int): sequence

timestamp (int): timestamp

version (int): version

class datahub.models.OffsetWithSession(sequence, timestamp, version, session_id)[source]

Offset with session class

Members:

sequence (int): sequence

timestamp (int): timestamp

version (int): version

session_id (int): session id

Subscription

class datahub.models.SubscriptionState[source]

Subscription state. Values: INACTIVE, ACTIVE

class datahub.models.Subscription(comment, create_time, is_owner, last_modify_time, state, sub_id, topic_name, sub_type)[source]

Subscription info

Members:

comment (str): subscription description

create_time (int): create time

is_owner (bool): owner or not

last_modify_time (int): last modify time

state (datahub.models.SubscriptionState): subscription state

sub_id (str): subscription id

topic_name (str): topic name

type (int): type

Connector

class datahub.models.ConnectorState[source]

ConnectorState enum class. Values: CONNECTOR_RUNNING, CONNECTOR_STOPPED

class datahub.models.AuthMode[source]

AuthMode enum class. Values: ak, sts

class datahub.models.PartitionMode[source]

PartitionMode enum class. Values: USER_DEFINE, SYSTEM_TIME, EVENT_TIME

class datahub.models.ConnectorType[source]

ConnectorType enum class. Values: SINK_ODPS, SINK_ADS, SINK_ES, SINK_FC, SINK_MYSQL, SINK_OSS, SINK_OTS, SINK_HOLOGRES, SINK_DATAHUB

class datahub.models.OdpsConnectorConfig(project_name, table_name, odps_endpoint, tunnel_endpoint, access_id, access_key, partition_mode, time_range, partition_config=None)[source]

Connector config for odps

Members:

project_name (str): odps project name

table_name (str): odps table name

odps_endpoint (str): odps endpoint

tunnel_endpoint (str): tunnel endpoint

access_id (str): odps access id

access_key (str): odps access key

partition_mode (datahub.models.connector.PartitionMode): partition mode

time_range (int): time range

partition_config (collections.OrderedDict): partition config

class datahub.models.DatabaseConnectorConfig(host, port, database, user, password, table, ignore)[source]

Connector config for database

Members:

host (str): host

port (int): port

database (str): database

user (str): user

password (str): password

table (str): table

ignore (bool): ignore insert error

class datahub.models.EsConnectorConfig(index, endpoint, user, password, id_fields, type_fields, proxy_mode)[source]

Connector config for ElasticSearch

Members:

index (str): index

endpoint (str): endpoint

user (str): user

password (str): password

id_fields (list): id fields

type_fields (list): type fields

proxy_mode (bool): proxy mode

class datahub.models.FcConnectorConfig(endpoint, service, func, auth_mode, access_id='', access_key='')[source]

Connector config for FunctionCompute

Members:

endpoint (str): endpoint

service (str): service

func (str): function

auth_mode (datahub.models.connector.AuthMode): auth mode

access_id (str): access id

access_key (str): access key

class datahub.models.OssConnectorConfig(endpoint, bucket, prefix, time_format, time_range, auth_mode, access_id='', access_key='')[source]

Connector config for ObjectStoreService

Members:

endpoint (str): endpoint

bucket (str): bucket

prefix (str): prefix

time_format (str): time format

time_range (int): time range

auth_mode (datahub.models.connector.AuthMode): auth mode

access_id (str): access id

access_key (str): access key

class datahub.models.OtsConnectorConfig(endpoint, instance, table, auth_mode, access_id='', access_key='', write_mode=<WriteMode.PUT: 'PUT'>)[source]

Connector config for OpenTableStore

Members:

endpoint (str): endpoint

instance (str): instance

table (str): table

auth_mode (datahub.models.connector.AuthMode): auth mode

access_id (str): access id

access_key (str): access key

write_mode (datahub.models.connector.WriteMode): write mode

Results

class datahub.models.results.ListProjectResult(project_names)[source]

Result of list projects api

Members:
project_names (list): list of project names
class datahub.models.results.GetProjectResult(project_name, comment, create_time, last_modify_time)[source]

Result of get project api

Members:

project_name (str): project name

comment (str): project description

create_time (int): create time

last_modify_time (int): last modify time

class datahub.models.results.ListTopicResult(topic_names)[source]

Result of list topics api

Members:
topic_names (list): list of topic names
class datahub.models.results.GetTopicResult(**kwargs)[source]

Result of get topic api

Members:

project_name (str): project name

topic_name (str): topic name

shard_count (int): shard count

life_cycle (int): life cycle

record_type (datahub.models.RecordType): record type

record_schema (datahub.models.RecordSchema): record schema

comment (str): topic description

create_time (int): create time

last_modify_time (int): last modify time

class datahub.models.results.ListShardResult(shards)[source]

Result of list shards api

Members:
shards (list): list of datahub.models.Shard
class datahub.models.results.MergeShardResult(shard_id, begin_hash_key, end_hash_key)[source]

Result of merge shard api

Members:

shard_id (str): shard id

begin_hash_key (str): begin hash key

end_hash_key (str): end hash key

class datahub.models.results.SplitShardResult(new_shards)[source]

Result of split shard api

Members:
new_shards (list): list of datahub.models.ShardBase
class datahub.models.results.GetCursorResult(cursor, record_time, sequence)[source]

Result of get cursor api

Members:

cursor (str): cursor

record_time (int): record time

sequence (int): sequence

class datahub.models.results.PutRecordsResult(failed_record_count, failed_records)[source]

Result of put records api

Members:

failed_record_count (int): failed record count

failed_records (list): list of datahub.models.FailedRecord

class datahub.models.results.GetRecordsResult(next_cursor, record_count, start_seq, records)[source]

Result of get records api

Members:

next_cursor (str): next cursor

record_count (int): record count

start_seq (int): start sequence

records (list): list of datahub.models.BlobRecord/datahub.models.TupleRecord

class datahub.models.results.GetMeteringInfoResult(active_time, storage)[source]

Result of get metering info api

Members:

active_time (int): active time

storage (int): storage

class datahub.models.results.ListConnectorResult(connector_names, connector_ids)[source]

Result of list data connectors

Members:
connector_names (list): list of data connector names
class datahub.models.results.GetConnectorResult(cluster_addr, connector_id, connector_type, state, creator, owner, create_time, column_fields, config, extra_config, shard_contexts, sub_id)[source]

Result of get data connector

Members:

cluster_addr (str): cluster address

connector_id (str): connector id

type (datahub.models.ConnectorType): connector type

state (datahub.models.ConnectorState): connector state

creator (str): creator

owner (str): owner

create_time (int): create time

column_fields (list): list of column fields

config (datahub.models.OdpsConnectorConfig): config

extra_config (dict): extra config

shard_contexts (list): list of datahub.models.ShardContext

sub_id (str): subscription id used by connector

class datahub.models.results.GetConnectorShardStatusResult(shard_status_infos)[source]

Result of get data connector shard status

Members:
shard_status_infos (dict): shard status entry map
class datahub.models.results.InitAndGetSubscriptionOffsetResult(offsets)[source]

Result of init and get subscription offset api

Members:
offsets (list): list of datahub.models.OffsetWithSession
class datahub.models.results.GetSubscriptionOffsetResult(offsets)[source]

Result of get subscription offset api

Members:
offsets (list): list of datahub.models.OffsetWithVersion
class datahub.models.results.CreateSubscriptionResult(sub_id)[source]

Result of create subscription api

Members:
sub_id (str): subscription id
class datahub.models.results.GetSubscriptionResult(comment, create_time, is_owner, last_modify_time, state, sub_id, topic_name, sub_type)[source]

Result of get subscription api

Members:

comment (str): comment

create_time (int): create time

is_owner (bool): owner or not

last_modify_time (int): last modify time

state (str): state

update_time (int): update time

record_time (int): record time

discard_count (int): discard count

class datahub.models.results.ListSubscriptionResult(total_count, subscriptions)[source]

Result of query subscription api

Exceptions

class datahub.exceptions.DatahubException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Base exception class for errors that occur while handling your request to the DataHub server.

class datahub.exceptions.ResourceExistException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Raised when the DataHub object you are creating already exists.

class datahub.exceptions.ResourceNotFoundException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Raised when the DataHub object you are operating on does not exist.

class datahub.exceptions.InvalidParameterException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Raised when a request parameter is invalid.

class datahub.exceptions.InvalidOperationException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Raised when the requested shard operation is not supported.

class datahub.exceptions.LimitExceededException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Too many requests.

class datahub.exceptions.InternalServerException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

An error occurred on the DataHub server.

class datahub.exceptions.AuthorizationFailedException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Authorization failed.

class datahub.exceptions.NoPermissionException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

No permission to perform the operation.
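LimitExceededException signals throttling, so retrying with exponential backoff is a reasonable response. The sketch defines a local stand-in exception so it runs on its own; in real code you would pass retryable=(datahub.exceptions.LimitExceededException,). The delay schedule (base_delay, max_delay, full jitter) is an assumption, not a DataHub recommendation.

```python
import random
import time


class LimitExceededException(Exception):
    """Local stand-in for datahub.exceptions.LimitExceededException."""


def call_with_backoff(fn, retryable=(LimitExceededException,), max_attempts=5,
                      base_delay=0.5, max_delay=8.0, sleep=time.sleep):
    """Call fn(); on a retryable exception, sleep with capped exponential backoff and retry."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except retryable:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, delay))  # full jitter spreads out concurrent retries
```

A typical call would wrap a single request, for example call_with_backoff(lambda: dh.put_records(project_name, topic_name, records)).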