PyDatahub: Datahub Python SDK

PyDatahub is the Python SDK for DataHub. It wraps the RESTful APIs exposed by the DataHub service and provides a simple, convenient Python programming interface. For a detailed introduction to the DataHub service, see the introduction on the Alibaba Cloud website.

Requirements:
  • setuptools (>=39.2.0)
  • requests (>=2.4.0)
  • simplejson (>=3.3.0)
  • six (>=1.1.0)
  • enum34 (>=1.1.5 for python_version < '3.4')
  • crcmod (>=1.7)
  • lz4 (>=2.0.0)
  • cprotobuf (==0.1.9)
  • funcsigs (>=1.0.2)

Installation Guide

Prerequisites

Install pip first; see the pip documentation for reference.

Install PyDatahub

Quick install

$ pip install pydatahub

Note: any missing dependencies of PyDatahub will be installed automatically.

Install from source

$ git clone https://github.com/aliyun/aliyun-datahub-sdk-python.git
$ cd aliyun-datahub-sdk-python
$ python setup.py install

Note: without network access, the dependencies can be installed as follows:

$ cd dependency
$ pip install -r first.txt
$ pip install -r second.txt

Verify the installation

python -c "from datahub import DataHub"

If the command above succeeds, congratulations: the DataHub Python SDK is installed.

Common issues

If the error 'Python.h: No such file or directory' appears during installation, it can usually be fixed by installing the Python development headers for your OS:

$ sudo apt-get install python-dev   # for python2.x installs
$ sudo apt-get install python3-dev  # for python3.x installs

$ sudo yum install python-devel   # for python2.x installs
$ sudo yum install python34-devel   # for python3.4 installs

On Windows, follow the prompt and download the matching version of the Visual C++ SDK from the indicated page.

Quick Start

Basic DataHub concepts

See DataHub basic concepts for details.

Preparation

  • Accessing the DataHub service requires an Alibaba Cloud account; an accessId and accessKey must be provided, along with a reachable DataHub service endpoint.
  • Log in to the DataHub WebConsole and create a Project.

Logging

Log output and log levels can be configured in your own code. The SDK mainly emits debug and error logs. The following example prints the SDK's DEBUG logs to the console:

import logging

logger = logging.getLogger('datahub')
logger.setLevel(logging.DEBUG)  # the logger level must also allow DEBUG records through
sh = logging.StreamHandler()
sh.setLevel(logging.DEBUG)
logger.addHandler(sh)

Initialize a DataHub object

All APIs in the DataHub Python SDK are provided by the datahub.DataHub class, so the first step is to initialize a DataHub object. Optionally, protobuf transport is supported, mainly when putting/getting records. If the DataHub server version does not support protobuf, enable_pb must be set to False explicitly.

from datahub import DataHub
from datahub.models.compress import CompressFormat

access_id = ***your access id***
access_key = ***your access key***
endpoint = ***your datahub server endpoint***
dh = DataHub(access_id, access_key, endpoint, enable_pb=False) # Json mode: for datahub server version <= 2.11
dh = DataHub(access_id, access_key, endpoint) # Use protobuf when put/get record, for datahub server version > 2.11
dh = DataHub(access_id, access_key, endpoint, compress_format=CompressFormat.LZ4) # use lz4 compression when put/get record

More detailed definitions: DataHub

API Examples

Examples are given below for the commonly used interfaces:

Project operations

A project is the basic organizational unit of DataHub data and contains multiple topics. Note that DataHub projects are independent of MaxCompute projects: projects created in MaxCompute cannot be reused by DataHub and must be created separately.

Create a project
  • The create_project interface creates a new project
dh.create_project(project_name, comment)

Creating a project requires a project name and a comment. The name must be 3 to 32 characters long, start with a letter, and contain only letters, digits, and underscores ("_"); it is case-insensitive.
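These naming rules can be checked client-side before calling create_project. A minimal sketch; the helper and regex below are our own illustration and not part of the SDK (the server remains the authority on valid names):

```python
import re

# 3-32 characters, starting with a letter, then letters/digits/underscores,
# mirroring the documented project naming rules.
_PROJECT_NAME_RE = re.compile(r'^[A-Za-z][A-Za-z0-9_]{2,31}$')

def is_valid_project_name(name):
    """Return True if name satisfies the documented project naming rules."""
    return bool(_PROJECT_NAME_RE.match(name))
```

For example, is_valid_project_name('datahub_test') is True, while a name starting with a digit is rejected before any request is sent.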

Delete a project
  • The delete_project interface deletes a project
dh.delete_project(project_name)

A project can only be deleted when it contains no topics.

List projects
  • The list_project interface gets the names of all projects in the DataHub service
projects_result = dh.list_project()

list_project returns a ListProjectResult object with a project_names member, a list of project names.

Get a project
  • The get_project interface gets detailed information about a project
project_result = dh.get_project(project_name)

get_project returns a GetProjectResult object with four members: project_name, comment, create_time, and last_modify_time.

Topic operations

A topic is the smallest unit of subscription and publication in DataHub; a topic represents one class or kind of streaming data. Two types are currently supported: Tuple and Blob. A Tuple topic holds database-like records, each consisting of multiple columns. A Blob topic only supports writing a block of binary data.

Tuple Topic

Data written to a Tuple topic is structured, so a record schema must be specified. The following data types are currently supported:

Type       Meaning                              Value range
Bigint     8-byte signed integer.               -9223372036854775807 ~ 9223372036854775807
String     String; UTF-8 encoding only.         A single String column may hold up to 1MB.
Boolean    Boolean type.                        True/False, true/false, or 0/1
Double     8-byte double-precision float.       -1.0 * 10^308 ~ 1.0 * 10^308
TimeStamp  Timestamp type.                      A timestamp with microsecond precision
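TIMESTAMP values in the examples below are microsecond-precision epoch integers. A small conversion sketch; the helper is our own illustration, not an SDK function:

```python
from datetime import datetime, timezone

def to_datahub_timestamp(dt):
    """Convert an aware datetime to the microsecond epoch integer
    expected by TIMESTAMP fields."""
    return int(dt.timestamp() * 1000000)

# 2016-02-19 08:08:55 UTC -> 1455869335000000 microseconds
ts = to_datahub_timestamp(datetime(2016, 2, 19, 8, 8, 55, tzinfo=timezone.utc))
```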
Creation example
import sys
import traceback

from datahub.exceptions import InvalidParameterException, ResourceExistException
from datahub.models import RecordSchema, FieldType

project_name = 'topic_test_project'
topic_name = 'tuple_topic_test_topic'
shard_count = 3
life_cycle = 7

record_schema = RecordSchema.from_lists(
    ['bigint_field',   'string_field',   'double_field',   'bool_field',      'time_field'       ],
    [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP]
)

try:
    dh.create_tuple_topic(project_name, topic_name, shard_count, life_cycle, record_schema, 'comment')
    print("create topic success!")
    print("=======================================\n\n")
except InvalidParameterException as e:
    print(e)
    print("=======================================\n\n")
except ResourceExistException as e:
    print("topic already exist!")
    print("=======================================\n\n")
except Exception as e:
    print(traceback.format_exc())
    sys.exit(-1)
Append a field
dh.append_field(project_name, topic_name, field_name, field_type)

An appended field must have allow_null set to True. Pass field_name and field_type as arguments; field_type is a FieldType enum value.

Blob Topic

A Blob topic supports writing a block of binary data as a record; the data is transferred BASE64-encoded.

Creation example
import sys
import traceback

from datahub.exceptions import InvalidParameterException, ResourceExistException

project_name = 'topic_test_project'
topic_name = 'blob_topic_test_topic'
shard_count = 3
life_cycle = 7


try:
    dh.create_blob_topic(project_name, topic_name, shard_count, life_cycle, 'comment')
    print("create topic success!")
    print("=======================================\n\n")
except InvalidParameterException as e:
    print(e)
    print("=======================================\n\n")
except ResourceExistException as e:
    print("topic already exist!")
    print("=======================================\n\n")
except Exception as e:
    print(traceback.format_exc())
    sys.exit(-1)

Schema

A schema specifies the names and types of the stored fields; it is used when creating a tuple topic and when reading and writing records. Since data is sent as strings over the network, the schema is needed to convert values to their proper types.

Get a schema
  • For an existing topic, the get_topic interface can be used to obtain the schema
topic_result = dh.get_topic(project_name, topic_name)
record_schema = topic_result.record_schema

Detailed definition: Schema

Define a schema

To create a new tuple topic, a schema must be defined. A schema can be initialized in the following ways.

Detailed definition: Schema

  • Define a schema from lists
from datahub.models import RecordSchema, FieldType, Field

record_schema1 = RecordSchema.from_lists(
    ['bigint_field'  , 'string_field'  , 'double_field'  , 'bool_field'     , 'event_time1'      ],
    [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP]
)

record_schema2 = RecordSchema.from_lists(
    ['bigint_field'  , 'string_field'  , 'double_field'  , 'bool_field'     , 'event_time1'      ],
    [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP],
    [True            , False           , True            , False            , True               ]
)

Two lists are required: the first holds the field names and the second the field types. A third, optional list marks nullability: True means the corresponding field may be None, False means it may not. If the third list is omitted, all fields default to True, i.e. nullable.

  • Define a schema from a JSON string
record_schema_1 = RecordSchema.from_json_str(json_str)

The JSON string has the following format:

{"fields":[{"type":"BIGINT","name":"a"},{"type":"STRING","name":"b"}]}

  • Build a schema by setting fields one by one
record_schema = RecordSchema()
record_schema.add_field(Field('bigint_field', FieldType.BIGINT))
record_schema.add_field(Field('string_field', FieldType.STRING), False)
record_schema.add_field(Field('double_field', FieldType.DOUBLE))
record_schema.add_field(Field('bool_field', FieldType.BOOLEAN))
record_schema.add_field(Field('event_time1', FieldType.TIMESTAMP))

The argument is a Field object. The Field constructor takes the field name first and the field type second; the optional third argument marks nullability: True means the field's value may be None, False means it may not. It defaults to True, i.e. nullable.

Publishing/subscribing data

Publish data

When publishing records to a topic, each record must target one shard of that topic, so the shard list of the topic is usually fetched first via the list_shard interface.

  • The list_shard interface gets all shards of a topic
shards_result = dh.list_shard(project_name, topic_name)

The result is a ListShardResult object; its shards member is a list whose elements are Shard objects exposing shard_id, state, begin_hash_key, end_hash_key, and other information.

Detailed definitions: Shard, Results

  • The put_records interface publishes data to a topic
put_result = dh.put_records(project_name, topic_name, records)

The records argument is a list whose elements are records of the same type, i.e. all Tuple or all Blob. The result is a PutDataRecordResult object with two members, failed_record_count and failed_records; failed_records is a list of FailedRecord objects, each carrying an index, error_code, and error_message.

Detailed definitions: Record, Results
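If failed_record_count is non-zero, only the failed subset should be resent. A plain-Python sketch of selecting the retry batch from the reported indices; the helper is illustrative, not an SDK call:

```python
def select_for_retry(records, failed_indices):
    """Return the records whose positions appear in the put_records
    failure report, preserving their original order."""
    failed = set(failed_indices)
    return [record for i, record in enumerate(records) if i in failed]

# e.g. five records sent, indices 1 and 3 reported as failed:
retry_batch = select_for_retry(['r0', 'r1', 'r2', 'r3', 'r4'], [1, 3])
# retry_batch is ['r1', 'r3'] and would be passed to put_records again
```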

Example: writing Tuple records
import sys
import traceback

from datahub.exceptions import DatahubException
from datahub.models import RecordSchema, FieldType, Field, TupleRecord

try:
    # block until all shards are ready
    dh.wait_shards_ready(project_name, topic_name)

    topic_result = dh.get_topic(project_name, topic_name)
    print(topic_result)
    print("=======================================\n\n")

    shard_result = dh.list_shard(project_name, topic_name)
    for shard in shard_result.shards:
        print(shard)
    print("=======================================\n\n")

    record_schema = RecordSchema.from_lists(
        ['bigint_field',   'string_field',   'double_field',   'bool_field',      'time_field'       ],
        [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP]
    )

    # put_records_by_shard is the recommended interface
    records = []

    record0 = TupleRecord(schema=record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
    record0.put_attribute('AK', '47')
    records.append(record0)

    record1 = TupleRecord(schema=record_schema)
    record1.values = [1, 'yc1', 10.01, True, 1455869335000000]
    records.append(record1)

    record2 = TupleRecord(schema=record_schema)
    record2.set_value(0, 3)
    record2.set_value(1, 'yc3')
    record2.set_value('double_field', 10.03)
    record2.set_value('bool_field', False)
    record2.set_value('time_field', 1455869335000013)
    records.append(record2)

    put_result = dh.put_records_by_shard(project_name, topic_name, shard_result.shards[0].shard_id, records)

    # Alternative: put_records with a shard_id set on each record
    # records = []

    # record0 = TupleRecord(schema=record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
    # record0.shard_id = shard_result.shards[0].shard_id
    # record0.put_attribute('AK', '47')
    # records.append(record0)

    # record1 = TupleRecord(schema=record_schema)
    # record1.values = [1, 'yc1', 10.01, True, 1455869335000000]
    # record1.shard_id = shard_result.shards[1].shard_id
    # records.append(record1)

    # record2 = TupleRecord(schema=record_schema)
    # record2.set_value(0, 3)
    # record2.set_value(1, 'yc3')
    # record2.set_value('double_field', 10.03)
    # record2.set_value('bool_field', False)
    # record2.set_value('time_field', 1455869335000013)
    # record2.shard_id = shard_result.shards[2].shard_id
    # records.append(record2)

    # put_result = dh.put_records(project_name, topic_name, records)

    print("put tuple %d records" % len(records))
    print("failed records: \n%s" % put_result)
    # if any failed records are reported, they should be retried
    print("=======================================\n\n")
except DatahubException as e:
    print(traceback.format_exc())
    sys.exit(-1)
Subscribe to data

Consuming data from a topic likewise requires specifying a shard, plus a read cursor position obtained via the get_cursor interface.

  • A cursor can be obtained in four ways: OLDEST, LATEST, SEQUENCE, SYSTEM_TIME
    • OLDEST: the cursor points to the oldest record among the currently valid data
    • LATEST: the cursor points to the latest record
    • SEQUENCE: the cursor points to the record with the given sequence
    • SYSTEM_TIME: the cursor points to the first record received after the given time
cursor_result = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
cursor_result = dh.get_cursor(project_name, topic_name, shard_id, CursorType.LATEST)
cursor_result = dh.get_cursor(project_name, topic_name, shard_id, CursorType.SEQUENCE, sequence)
cursor_result = dh.get_cursor(project_name, topic_name, shard_id, CursorType.SYSTEM_TIME, system_time)

get_cursor returns a GetCursorResult object; its cursor member is used by the record-reading interfaces to read from the specified position.

Reading from a shard requires the starting cursor and an upper limit on the number of records to read; if fewer than limit_num records remain between the cursor and the end of the shard, the actual number is returned.

dh.get_blob_records(project_name, topic_name, shard_id, cursor, limit_num)
dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit_num)
Example: consuming Tuple records
import sys
import time
import traceback

from datahub.exceptions import DatahubException
from datahub.models import CursorType

try:
    topic_result = dh.get_topic(project_name, topic_name)
    print(topic_result)

    cursor_result = dh.get_cursor(project_name, topic_name, '0', CursorType.OLDEST)
    cursor = cursor_result.cursor
    while True:
        get_result = dh.get_tuple_records(project_name, topic_name, '0', topic_result.record_schema, cursor, 10)
        for record in get_result.records:
            print(record)
        if 0 == get_result.record_count:
            time.sleep(1)
        cursor = get_result.next_cursor

except DatahubException as e:
    print(traceback.format_exc())
    sys.exit(-1)

Shard operations

A shard is a concurrent channel for transferring data within a topic; each shard has an ID and can be in one of several states: Opening (starting up) and Active (started and serving). Each active shard consumes server-side resources, so it is recommended to request only as many shards as needed.

List shards
  • The list_shard interface lists all shard information of a topic
shards_result = dh.list_shard(project_name, topic_name)

list_shard returns a ListShardResult object containing a shards member, a list of Shard objects; each Shard object carries shard_id, begin_hash_key, end_hash_key, state, and other information.

Detailed definitions: Shard, Results

Merge shards
  • The merge_shard interface merges two adjacent shards
merge_result = dh.merge_shard(project_name, topic_name, shard_id, adj_shard_id)

Pass the IDs of two adjacent shards. On success, a MergeShardResult object is returned containing the newly created shard's shard_id, begin_hash_key, and end_hash_key.

Detailed definitions: Shard, Results

Split a shard
  • The split_shard interface splits the specified shard into two adjacent shards at the given split key
split_result = dh.split_shard(project_name, topic_name, shard_id)
split_result = dh.split_shard(project_name, topic_name, shard_id, split_key)

split_shard returns a SplitShardResult object containing a new_shards member, a list of ShardBase objects; a ShardBase object carries only shard_id, begin_hash_key, and end_hash_key. If split_key is omitted, the shard's hash key range is queried automatically and a middle value is chosen as the split_key.

Detailed definitions: Shard, Results
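When split_key is omitted, the service queries the shard's hash key range and picks a middle value itself. The idea can be sketched for the 32-digit hex hash keys exposed on Shard objects; this is illustrative only, and the server's actual choice may differ:

```python
def middle_hash_key(begin_hash_key, end_hash_key):
    """Return the 32-digit hex key midway between two 128-bit hash keys."""
    begin = int(begin_hash_key, 16)
    end = int(end_hash_key, 16)
    return format((begin + end) // 2, '032X')

mid_key = middle_hash_key('00000000000000000000000000000000',
                          'FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF')
```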

Metering operations

Metering info is the statistics of a shard's resource usage, updated once per hour.

Get metering info
  • The get_metering_info interface gets the statistics of a given shard
result = dh.get_metering_info(project_name, topic_name, shard_id)

get_metering_info returns a GetMeteringInfoResult object with two members: active_time and storage.

Detailed definition: Results

Connector operations

A DataHub connector synchronizes streaming data from the DataHub service into other cloud products; currently, topic data can be synchronized to MaxCompute (ODPS) in real time or near-real time. Data only needs to be written to DataHub once; with synchronization configured in the DataHub service, it can then be consumed from the other product.

Create a connector
  • The create_connector interface creates a new connector

It creates a connector of the given type for the given topic. A connector is uniquely identified by (project_name, topic_name, connector_id), where connector_id is returned when the connector is created and can also be obtained via list_connector. column_fields is a list of str whose entries are field names from the topic.

from collections import OrderedDict
from datahub.models import OdpsConnectorConfig, ConnectorType, PartitionMode

column_fields = ['f1', 'f2', 'f3']
partition_config = OrderedDict([
    ("ds", "%Y%m%d"),
    ("hh", "%H"),
    ("mm", "%M")
])

connector_config = OdpsConnectorConfig(
    project_name, table_name, odps_endpoint,
    tunnel_endpoint, connector_access_id, connector_access_key,
    partition_mode, time_range, partition_config)

start_time = int(time.time() * 1000)
create_result = dh.create_connector(project_name, topic_name, ConnectorType.SINK_ODPS,
                                    column_fields, connector_config, start_time)
print(create_result.connector_id)

To create an ODPS connector, connector_config is an OdpsConnectorConfig object specifying connector_project_name, connector_table_name, odps_endpoint, tunnel_endpoint, connector_access_id, connector_access_key, partition_mode, time_range, and partition_config. partition_mode is a PartitionMode enum value, one of SYSTEM_TIME, EVENT_TIME, and USER_DEFINE. partition_config is an OrderedDict whose item order must match the table's. start_time specifies when synchronization begins; it may be omitted, in which case synchronization starts from the earliest data in DataHub. Service versions before 2.14 return no result on creation; starting with 2.14, the result contains a connectorId that uniquely identifies the connector within the topic.

More connector definitions: Connector

from datahub.models import DatabaseConnectorConfig, ConnectorType

column_fields = ['f1', 'f2', 'f3']
connector_config = DatabaseConnectorConfig(host, port, database, user, password, table, ignore)
create_result = dh.create_connector(project_name, topic_name, ConnectorType.SINK_ADS, column_fields, connector_config)
print(create_result.connector_id)

To create an ADS connector, connector_config is a DatabaseConnectorConfig object. ignore is a bool selecting IgnoreInto mode; the remaining members are str-typed database connection details. ReplaceInto vs. IgnoreInto: in ReplaceInto mode, data is inserted with replace into statements; IgnoreInto uses insert instead (replace into overwrites records by primary key, ignore into skips conflicting writes). Service versions before 2.14 return no result on creation; starting with 2.14, the result contains a connectorId that uniquely identifies the connector within the topic.

More connector definitions: Connector

from datahub.models import EsConnectorConfig, ConnectorType

column_fields = ['f1', 'f2', 'f3']
connector_config = EsConnectorConfig(index, es_endpoint, es_user, es_password,
                                     es_id_fields, es_type_fields, proxy_mode)
create_result = dh.create_connector(project_name, topic_name, ConnectorType.SINK_ES, column_fields, connector_config)
print(create_result.connector_id)

To create an Elasticsearch connector, connector_config is an EsConnectorConfig object. proxy_mode selects proxy mode: when true, the connector does not scan all ES nodes but writes directly through the proxy; VPC-hosted ES must use this option. Service versions before 2.14 return no result on creation; starting with 2.14, the result contains a connectorId that uniquely identifies the connector within the topic.

More connector definitions: Connector

from datahub.models import FcConnectorConfig, ConnectorType

column_fields = ['f1', 'f2', 'f3']
connector_config = FcConnectorConfig(fc_endpoint, fc_service, fc_function, auth_mode, fc_access_id, fc_access_key)
create_result = dh.create_connector(project_name, topic_name, ConnectorType.SINK_FC, column_fields, connector_config)
print(create_result.connector_id)

To create a Function Compute connector, connector_config is an FcConnectorConfig object. fc_endpoint, fc_service, and fc_function describe the Function Compute service. auth_mode is an AuthMode enum value: AK mode requires the access key information, STS mode does not. Service versions before 2.14 return no result on creation; starting with 2.14, the result contains a connectorId that uniquely identifies the connector within the topic.

More connector definitions: Connector

from datahub.models import DatabaseConnectorConfig, ConnectorType

column_fields = ['f1', 'f2', 'f3']
connector_config = DatabaseConnectorConfig(host, port, database, user, password, table, ignore)
create_result = dh.create_connector(project_name, topic_name, ConnectorType.SINK_MYSQL, column_fields, connector_config)
print(create_result.connector_id)

To create a MySQL connector, connector_config is a DatabaseConnectorConfig object. ignore is a bool selecting IgnoreInto mode; the remaining members are str-typed database connection details. ReplaceInto vs. IgnoreInto: in ReplaceInto mode, data is inserted with replace into statements; IgnoreInto uses insert instead (replace into overwrites records by primary key, ignore into skips conflicting writes). Service versions before 2.14 return no result on creation; starting with 2.14, the result contains a connectorId that uniquely identifies the connector within the topic.

More connector definitions: Connector

from datahub.models import OssConnectorConfig, ConnectorType

column_fields = ['f1', 'f2', 'f3']
connector_config = OssConnectorConfig(oss_endpoint, oss_bucket, prefix, time_format, time_range,
                                      auth_mode, oss_access_id, oss_access_key)
create_result = dh.create_connector(project_name, topic_name, ConnectorType.SINK_OSS, column_fields, connector_config)
print(create_result.connector_id)

To create an OSS connector, connector_config is an OssConnectorConfig object. oss_endpoint is the OSS service endpoint, oss_bucket the OSS bucket, and prefix the OSS directory prefix. time_format specifies the time format, e.g. '%Y%m%d%H%M'; time_range is the time span of each OSS partition, in minutes. auth_mode is an AuthMode enum value: AK mode requires the access key information, STS mode does not. Service versions before 2.14 return no result on creation; starting with 2.14, the result contains a connectorId that uniquely identifies the connector within the topic.

More connector definitions: Connector

from datahub.models import OtsConnectorConfig, ConnectorType, WriteMode

column_fields = ['f1', 'f2', 'f3']
write_mode = WriteMode.PUT
connector_config = OtsConnectorConfig(ots_endpoint, ots_instance, ots_table,
                                      auth_mode, ots_access_id, ots_access_key, write_mode)
create_result = dh.create_connector(project_name, topic_name, ConnectorType.SINK_OTS, column_fields, connector_config)
print(create_result.connector_id)

To create an OTS connector, connector_config is an OtsConnectorConfig object. ots_endpoint is the OTS service endpoint, ots_instance the OTS instance, and ots_table the OTS table name. write_mode is a WriteMode enum value, PUT by default. auth_mode is an AuthMode enum value: AK mode requires the access key information, STS mode does not. Service versions before 2.14 return no result on creation; starting with 2.14, the result contains a connectorId that uniquely identifies the connector within the topic.

More connector definitions: Connector

from datahub.models import DataHubConnectorConfig, ConnectorType

column_fields = ['f1', 'f2', 'f3']
connector_config = DataHubConnectorConfig(datahub_endpoint, datahub_project_name, datahub_topic_name,
                                          auth_mode, datahub_access_id, datahub_access_key)
create_result = dh.create_connector(project_name, topic_name, ConnectorType.SINK_DATAHUB, column_fields, connector_config)
print(create_result.connector_id)

To create a DataHub connector, connector_config is a DataHubConnectorConfig object. datahub_endpoint is the target DataHub service endpoint, datahub_project_name its project name, and datahub_topic_name the topic under that project. auth_mode is an AuthMode enum value: AK mode requires the access key information, STS mode does not. Service versions before 2.14 return no result on creation; starting with 2.14, the result contains a connectorId that uniquely identifies the connector within the topic.

More connector definitions: Connector

List connectors
  • The list_connector interface lists the connectors of a given topic
connectors_result = dh.list_connector(project_name, topic_name)
connector_ids = connectors_result.connector_ids
print(connector_ids)

list_connector returns a ListConnectorResult object containing a connector_ids member, a list of connectorIds.

Update a connector
  • The update_connector interface updates the configuration of the specified connector
dh.update_connector(project_name, topic_name, connector_id, connector_config)

The connector is uniquely identified by (project_name, topic_name, connector_id), where connector_id is returned when the connector is created and can also be obtained via list_connector. connector_config is a ConnectorConfig object.

# Construct a new connector_config directly
new_odps_project_name = "1"
new_system_time_table_name = "2"
new_odps_endpoint = "3"
new_tunnel_endpoint = "4"
new_odps_access_id = "5"
new_odps_access_key = "6"

new_partition_config = OrderedDict([("pt", "%Y%m%d"), ("ct", "%H%M")])
new_connector_config = OdpsConnectorConfig(new_odps_project_name, new_system_time_table_name, new_odps_endpoint,
                                           new_tunnel_endpoint, new_odps_access_id, new_odps_access_key,
                                           PartitionMode.USER_DEFINE, 30, new_partition_config)

dh.update_connector(project_name, topic_name, connector_id, new_connector_config)

# Or fetch the existing connector_config and modify part of it
new_connector_config = dh.get_connector(project_name, topic_name, connector_id).config

new_connector_config.project_name = "1"
new_connector_config.table_name = "2"
new_connector_config.odps_endpoint = "3"
new_connector_config.tunnel_endpoint = "4"
new_connector_config.access_id = "5"
new_connector_config.access_key = "6"

dh.update_connector(project_name, topic_name, ConnectorType.SINK_ODPS, new_connector_config)
Delete a connector
  • The delete_connector interface deletes the specified connector
dh.delete_connector(project_name, topic_name, connector_id)

The three parameters (project_name, topic_name, connector_id) identify the connector to delete, where connector_id is returned when the connector is created and can also be obtained via list_connector.

Get a connector
  • The get_connector interface queries the specified connector's information
connector_result = dh.get_connector(project_name, topic_name, connector_id)
print(connector_result)

The three parameters (project_name, topic_name, connector_id) identify the connector, where connector_id is returned when the connector is created and can also be obtained via list_connector. get_connector returns a GetConnectorResult object with members connector_id, column_fields, type, state, creator, owner, and config, where type is a ConnectorType enum value, state is a ConnectorState enum value, and config is the config object of the corresponding connector type.

Detailed definitions: Results, Connector

Get connector shard status
  • The get_connector_shard_status interface queries the status of a given shard within a connector
status_result = dh.get_connector_shard_status(project_name, topic_name, connector_id, shard_id)

The three parameters (project_name, topic_name, connector_id) identify the connector, where connector_id is returned when the connector is created and can also be obtained via list_connector. If shard_id is omitted, status information for all shards is returned. get_connector_shard_status returns a GetDataShardStatusResult object containing shard_status_infos, a dict whose keys are shard_ids and whose values are ShardStatusEntry objects.

Detailed definitions: Results, Connector

Restart connector shards
  • The reload_connector interface restarts the specified shard of a connector
dh.reload_connector(project_name, topic_name, connector_id, shard_id)
dh.reload_connector(project_name, topic_name, connector_id)

The three parameters (project_name, topic_name, connector_id) identify the connector, where connector_id is returned when the connector is created and can also be obtained via list_connector. With shard_id given, the corresponding shard is restarted; without it, all shards of the connector are restarted.

Append a field to a connector
  • The append_connector_field interface appends a new field to a connector; a corresponding column must already exist in the ODPS table.
dh.append_connector_field(project_name, topic_name, connector_id, field_name)

The three parameters (project_name, topic_name, connector_id) identify the connector, where connector_id is returned when the connector is created and can also be obtained via list_connector. field_name must exist in the topic's schema.

Update connector state
  • The update_connector_state interface changes the state of the specified connector
dh.update_connector_state(project_name, topic_name, connector_id, state)

The three parameters (project_name, topic_name, connector_id) identify the connector, where connector_id is returned when the connector is created and can also be obtained via list_connector. state is a ConnectorState enum value, either CONNECTOR_RUNNING or CONNECTOR_STOPPED; connector shard offsets can only be updated while the state is CONNECTOR_STOPPED.

Detailed definition: Connector

Update connector offsets
  • The update_connector_offset interface changes the offsets of the specified connector
connector_offset = ConnectorOffset(100, 1582801630000)
dh.update_connector_offset(project_name, topic_name, connector_id, shard_id, connector_offset)

The three parameters (project_name, topic_name, connector_id) identify the connector, where connector_id is returned when the connector is created and can also be obtained via list_connector. connector_offset is a ConnectorOffset object with members sequence and timestamp (in milliseconds). Passing '' for shard_id sets all shards to the same offset.

Detailed definition: Connector

Get connector done time
result = dh.get_connector_done_time(project_name, topic_name, connector_id)
print(result.done_time)

The three parameters (project_name, topic_name, connector_id) identify the connector, where connector_id is returned when the connector is created and can also be obtained via list_connector. get_connector_done_time returns a GetConnectorDoneTimeResult object with members done_time (the done time), time_zone (time zone information), and time_window (the time window size, in seconds).

Subscription operations

The subscription service stores consumers' offsets on the server side; with simple configuration and handling, it provides highly available offset storage.

Create a subscription
  • create_subscription creates a new subscription
create_result = dh.create_subscription(project_name, topic_name, 'comment')

create_subscription returns a CreateSubscriptionResult object containing a sub_id member, the id of the created subscription.

Detailed definition: Results

Delete a subscription
  • The delete_subscription interface deletes a subscription
dh.delete_subscription(project_name, topic_name, sub_id)

Pass the sub_id of the subscription to delete.

Get a subscription
  • The get_subscription interface queries a subscription's details
subscription_result = dh.get_subscription(project_name, topic_name, create_result.sub_id)

get_subscription returns a GetSubscriptionResult object with members comment, create_time, is_owner, last_modify_time, state, sub_id, topic_name, and type, where state is a SubscriptionState enum value, either ACTIVE or INACTIVE.

Detailed definitions: Subscription, Results

Update a subscription
  • The update_subscription interface updates a subscription
dh.update_subscription(project_name, topic_name, sub_id, new_comment)

update_subscription updates the comment of the subscription identified by sub_id.

Update subscription state
  • The update_subscription_state interface updates a subscription's state
dh.update_subscription_state(project_name, topic_name, sub_id, state)

update_subscription_state updates the state of the subscription identified by sub_id; state is a SubscriptionState enum value, either ACTIVE or INACTIVE.

List subscriptions
  • The list_subscription interface lists all subscriptions of a topic
subscriptions_result = dh.list_subscription(project_name, topic_name, query_key, page_index, page_size)

query_key is a search filter and may be an empty string. page_index and page_size select a range of subscriptions: for example, page_index=1 and page_size=10 fetch subscriptions 1-10, while page_index=2 and page_size=5 fetch subscriptions 6-10. list_subscription returns a ListSubscriptionResult object with two members, total_count and subscriptions. total_count is the total number of subscriptions under the topic; subscriptions is a list of Subscription objects, each with members comment, create_time, is_owner, last_modify_time, state, sub_id, topic_name, and type, where state is a SubscriptionState enum value, either ACTIVE or INACTIVE.

Detailed definitions: Subscription, Results
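The page_index/page_size arithmetic described above maps to 1-based item ranges; a quick sketch with our own helper, not an SDK function:

```python
def page_range(page_index, page_size):
    """Return the 1-based (first, last) item numbers a page covers."""
    first = (page_index - 1) * page_size + 1
    last = page_index * page_size
    return first, last

# page_index=1, page_size=10 -> subscriptions 1-10
# page_index=2, page_size=5  -> subscriptions 6-10
```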

Offset operations

A newly created subscription starts in an unconsumed state. To use the offset storage provided by the subscription service, a few offset operations are needed.

Initialize offsets
  • The init_and_get_subscription_offset interface initializes offsets and is the first step of consumption
init_result = dh.init_and_get_subscription_offset(project_name, topic_name, sub_id, shard_id)
init_result = dh.init_and_get_subscription_offset(project_name, topic_name, sub_id, shard_ids)

The last parameter may be a single shard_id or a list of shard_ids, naming the shards to initialize. init_and_get_subscription_offset returns an InitAndGetSubscriptionOffsetResult object containing an offsets member, an OffsetWithSession object with members sequence, timestamp, version, and session_id. sequence and timestamp are the offset; session_id is 0 on the first initialization and increments by 1 on each subsequent one.

Detailed definition: Results

Get offsets
  • get_subscription_offset gets a subscription's offset information
offset_result = dh.get_subscription_offset(project_name, topic_name, sub_id, shard_id)
offset_result = dh.get_subscription_offset(project_name, topic_name, sub_id, shard_ids)

The last parameter may be a single shard_id or a list of shard_ids, naming the shards to query. get_subscription_offset returns a GetSubscriptionOffsetResult object containing a list of OffsetWithVersion objects. OffsetWithVersion is the parent class of OffsetWithSession and carries only sequence, timestamp, and version.

Detailed definitions: Offset, Results

Update offsets
  • The update_subscription_offset interface updates offsets
offsets1 = {
    '0': OffsetWithSession(sequence0, timestamp0, version0, session_id0),
    '1': OffsetWithSession(sequence1, timestamp1, version1, session_id1)
}

offsets2 = {
    '0': {
        'Sequence': 0,
        'Timestamp': 0,
        'Version': 0,
        'SessionId': 0
    },
    '1': {
        'Sequence': 1,
        'Timestamp': 1,
        'Version': 1,
        'SessionId': 1
    }
}

dh.update_subscription_offset(project_name, topic_name, sub_id, offsets1)
dh.update_subscription_offset(project_name, topic_name, sub_id, offsets2)

The offsets argument is a dict whose keys are shard_ids and whose values are either OffsetWithSession objects or dicts. If the version or session_id has changed, the update fails. When the error indicates a version change, fetch the latest version via get_subscription_offset and continue consuming. When the error indicates a session_id change, the offsets must be re-initialized via init_and_get_subscription_offset before consumption can continue.

详细定义: Offset
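The recovery rules above amount to a small decision table; a plain-Python sketch, where the error labels are our own stand-ins for what the SDK reports through its error messages:

```python
def offset_recovery_step(error_kind):
    """Map an update_subscription_offset failure to the documented next step."""
    if error_kind == 'version_changed':
        # fetch the latest version, then continue consuming
        return 'get_subscription_offset'
    if error_kind == 'session_changed':
        # the session was taken over; re-initialize before consuming
        return 'init_and_get_subscription_offset'
    # any other failure is not recoverable by offset bookkeeping alone
    return 'raise'
```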

Reset offsets
  • The reset_subscription_offset interface resets the offset information and updates the version
offsets1 = {
    '0': OffsetWithSession(sequence0, timestamp0, version0, session_id0),
    '1': OffsetWithSession(sequence1, timestamp1, version1, session_id1)
}

offsets2 = {
    '0': {
        'Sequence': 0,
        'Timestamp': 0,
        'Version': 0,
        'SessionId': 0
    },
    '1': {
        'Sequence': 1,
        'Timestamp': 1,
        'Version': 1,
        'SessionId': 1
    }
}

offsets3 = {
    '0': OffsetBase(sequence0, timestamp0),
    '1': OffsetBase(sequence1, timestamp1)
}

offsets4 = {
    '0': {
        'Sequence': 0,
        'Timestamp': 0
    },
    '1': {
        'Sequence': 1,
        'Timestamp': 1
    }
}

dh.reset_subscription_offset(project_name, topic_name, sub_id, offsets1)
dh.reset_subscription_offset(project_name, topic_name, sub_id, offsets2)
dh.reset_subscription_offset(project_name, topic_name, sub_id, offsets3)
dh.reset_subscription_offset(project_name, topic_name, sub_id, offsets4)

The offsets argument is a dict whose keys are shard_ids and whose values are either OffsetBase objects (including subclasses) or dicts. OffsetBase is the parent class of OffsetWithVersion and carries only sequence and timestamp.

详细定义: Offset

API Reference

DataHub

class datahub.models.compress.CompressFormat[source]

CompressFormat enum class, there are: NONE, LZ4, ZLIB, DEFLATE

class datahub.DataHub(access_id, access_key, endpoint=None, enable_pb=True, compress_format=<CompressFormat.NONE: ''>, **kwargs)[source]

Main entrance to DataHub.

Convenient operations on DataHub objects are provided. Please refer to the DataHub docs to see the details.

Generally, basic operations such as create, list, delete, update are provided for each DataHub object. Take the project as an example.

To create a DataHub instance, access_id and access_key are required and must be correct, or a SignatureNotMatch error will be thrown.

Parameters:
  • access_id – Aliyun Access ID
  • secret_access_key – Aliyun Access Key
  • endpoint – Rest service URL
  • enable_pb – enable protobuf when put/get records, default value is False in version <= 2.11, default value will be True in version >= 2.12
  • compress_format (datahub.models.compress.CompressFormat) – compress format
Example:
>>> datahub = DataHub('**your access id**', '**your access key**', '**endpoint**')
>>> datahub_pb = DataHub('**your access id**', '**your access key**', '**endpoint**', enable_pb=True)
>>> datahub_lz4 = DataHub('**your access id**', '**your access key**', '**endpoint**', compress_format=CompressFormat.LZ4)
>>>
>>> project_result = datahub.get_project('datahub_test')
>>>
>>> print(project_result is None)
>>>
append_connector_field(**kwargs)[source]

Append field to a connector

Parameters:
  • project_name – project name
  • topic_name – topic name
  • connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
  • field_name – field name
Returns:

none

Raise:

datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or field_name is empty; connector_type is wrong type

append_field(**kwargs)[source]

Append field to a tuple topic

Parameters:
  • project_name – project name
  • topic_name – topic name
  • field_name – field name
  • field_type (datahub.models.FieldType) – field type
Returns:

none

Raise:

datahub.exceptions.InvalidParameterException if project_name, topic_name or field_name is empty; field_type is wrong type

create_blob_topic(**kwargs)[source]

Create blob topic

Parameters:
  • project_name – project name
  • topic_name – topic name
  • shard_count – shard count
  • life_cycle – life cycle
  • comment – comment
  • extend_mode – use expansion method to increase shard
Returns:

none

Raise:

datahub.exceptions.InvalidParameterException if project_name is empty; topic_name is not valid; life_cycle is not positive; record_schema is wrong type

Raise:

datahub.exceptions.ResourceNotFoundException if the project does not exist

Raise:

datahub.exceptions.ResourceExistException if the topic already exists

create_connector(**kwargs)[source]

Create a data connector

Parameters:
  • project_name – project name
  • topic_name – topic name
  • connector_type (datahub.models.ConnectorType) – connector type
  • column_fields (list) – column fields
  • config (datahub.models.ConnectorConfig) – connector config
  • start_time (int) – start timestamp in milliseconds
Returns:

connector id

Return type:

datahub.models.CreateConnectorResult

Raises:

datahub.exceptions.ResourceExistException if the connector already exists

Raises:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raises:

datahub.exceptions.InvalidParameterException if the column fields or config are invalid; project_name or topic_name is empty; connector_type or config is wrong type

create_project(**kwargs)[source]

Create a new project with the given name and comment

Parameters:
  • project_name – project name
  • comment – description of the project
Returns:

none

Raises:

datahub.exceptions.InvalidParameterException if project_name is not valid

Raises:

datahub.exceptions.ResourceExistException if the project already exists

create_subscription(**kwargs)[source]

Create a subscription to a topic

Parameters:
  • project_name – project name
  • topic_name – topic name
  • comment – comment for the subscription
Returns:

create result containing the subscription id

Return type:

datahub.models.results.CreateSubscriptionResult

Raises:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raises:

datahub.exceptions.LimitExceededException if the limit on the number of subscriptions is exceeded

Raises:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty

create_tuple_topic(**kwargs)[source]

Create a tuple topic

Parameters:
  • project_name – project name
  • topic_name – topic name
  • shard_count – shard count
  • life_cycle – life cycle
  • record_schema (datahub.models.RecordSchema) – record schema for the tuple record type
  • comment – comment
  • extend_mode – use expansion method to increase shards
Returns:

none

Raises:

datahub.exceptions.InvalidParameterException if project_name is empty; topic_name is not valid; life_cycle is not positive; record_schema is wrong type

Raises:

datahub.exceptions.ResourceNotFoundException if the project does not exist

Raises:

datahub.exceptions.ResourceExistException if the topic already exists

delete_connector(**kwargs)[source]

Delete a data connector

Parameters:
  • project_name – project name
  • topic_name – topic name
  • connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
Returns:

none

Raises:

datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist

Raises:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty; connector_type is wrong type

delete_project(**kwargs)[source]

Delete the project with the given name

Parameters:project_name – project name
Returns:none
Raises:datahub.exceptions.InvalidParameterException if project_name is empty
delete_subscription(**kwargs)[source]

Delete a subscription by subscription id

Parameters:
  • project_name – project name
  • topic_name – topic name
  • sub_id – subscription id
Returns:

none

Raises:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raises:

datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty

delete_topic(**kwargs)[source]

Delete a topic

Parameters:
  • topic_name – topic name
  • project_name – project name
Returns:

none

Raises:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raises:

datahub.exceptions.InvalidParameterException if the project name or topic name is empty

get_blob_records(**kwargs)[source]

Get records from a topic

Parameters:
  • project_name – project name
  • topic_name – topic name
  • shard_id – shard id
  • cursor – the cursor
  • limit_num – maximum number of records to read
Returns:

result including the record list, start sequence, record count and next cursor

Return type:

datahub.models.GetRecordsResult

Raises:

datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist

Raises:

datahub.exceptions.InvalidParameterException if the cursor is invalid; project_name, topic_name, shard_id or cursor is empty

Raises:

datahub.exceptions.DatahubException if the crc is wrong in protobuf mode

get_connector(**kwargs)[source]

Get a data connector

Parameters:
  • project_name – project name
  • topic_name – topic name
  • connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
Returns:

data connector info

Return type:

datahub.models.GetConnectorResult

Raises:

datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist

Raises:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty; connector_type or config is wrong type

get_connector_done_time(**kwargs)[source]

Get the connector done time

Parameters:
  • project_name – project name
  • topic_name – topic name
  • connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
get_connector_shard_status(**kwargs)[source]

Get data connector shard status

Parameters:
  • project_name – project name
  • topic_name – topic name
  • connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
  • shard_id – shard id
Returns:

data connector shard status

Return type:

datahub.models.results.GetConnectorShardStatusResult

Raises:

datahub.exceptions.ResourceNotFoundException if the project, topic, shard or connector does not exist

Raises:

datahub.exceptions.InvalidParameterException if project_name, topic_name or shard_id is empty; connector_type is wrong type

get_cursor(**kwargs)[source]

Get a cursor. Before invoking get_blob_records/get_tuple_records, you must invoke this to get a cursor first

Parameters:
  • project_name – project name
  • topic_name – topic name
  • shard_id – shard id
  • cursor_type (datahub.models.CursorType) – cursor type
  • param – a system time if cursor_type == CursorType.SYSTEM_TIME, or a sequence if cursor_type == CursorType.SEQUENCE
Returns:

cursor info

Return type:

datahub.models.GetCursorResult

Raises:

datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist

Raises:

datahub.exceptions.InvalidParameterException if the param is invalid; project_name, topic_name or shard_id is empty; cursor_type is wrong type; param is missing

get_metering_info(**kwargs)[source]

Get the metering info of a shard

Parameters:
  • project_name – project name
  • topic_name – topic name
  • shard_id – shard id
Returns:

the shard metering info

Return type:

datahub.models.GetMeteringInfoResult

Raises:

datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist

Raises:

datahub.exceptions.InvalidParameterException if project_name, topic_name or shard_id is empty

get_project(**kwargs)[source]

Get a project by name

Parameters:project_name – project name
Returns:the requested project
Return type:datahub.models.GetProjectResult
Raises:datahub.exceptions.ResourceNotFoundException if the project does not exist
Raises:datahub.exceptions.InvalidParameterException if project_name is empty
get_subscription(**kwargs)[source]

Get a subscription

Parameters:
  • project_name – project name
  • topic_name – topic name
  • sub_id – subscription id
Returns:

subscription info

Return type:

datahub.models.results.GetSubscriptionResult

Raises:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raises:

datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty

get_subscription_offset(**kwargs)[source]

Get the subscription offset

Parameters:
  • project_name – project name
  • topic_name – topic name
  • sub_id – subscription id
  • shard_ids – shard ids
Returns:

offset info

Return type:

datahub.models.results.GetSubscriptionOffsetResult

Raises:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raises:

datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty

get_topic(**kwargs)[source]

Get a topic

Parameters:
  • topic_name – topic name
  • project_name – project name
Returns:

topic info

Return type:

datahub.models.GetTopicResult

Raises:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raises:

datahub.exceptions.InvalidParameterException if the project name or topic name is empty

get_tuple_records(**kwargs)[source]

Get records from a topic

Parameters:
  • project_name – project name
  • topic_name – topic name
  • shard_id – shard id
  • record_schema (datahub.models.RecordSchema) – tuple record schema
  • filter – filter
  • cursor – the cursor
  • limit_num – maximum number of records to read
Returns:

result including the record list, start sequence, record count and next cursor

Return type:

datahub.models.GetRecordsResult

Raises:

datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist

Raises:

datahub.exceptions.InvalidParameterException if the cursor is invalid; project_name, topic_name, shard_id or cursor is empty

Raises:

datahub.exceptions.DatahubException if the crc is wrong in protobuf mode

init_and_get_subscription_offset(**kwargs)[source]

Open a subscription offset session

Parameters:
  • project_name – project name
  • topic_name – topic name
  • sub_id – subscription id
  • shard_ids – shard ids
Returns:

offset info

Return type:

datahub.models.InitAndGetSubscriptionOffsetResult

Raises:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raises:

datahub.exceptions.InvalidParameterException if project_name, topic_name, sub_id or shard_id is empty

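How an offset session fits together, as a hedged sketch: the offsets container is taken as documented in InitAndGetSubscriptionOffsetResult (a list of OffsetWithSession), and the actual record consumption is elided:

```python
def checkpoint_once(dh, project, topic, sub_id, shard_id):
    # Open the offset session; the returned offsets carry a version and
    # session id that the server validates on later offset updates.
    init_result = dh.init_and_get_subscription_offset(
        project, topic, sub_id, [shard_id])
    offsets = init_result.offsets
    # ... consume records starting from the stored sequence, then commit
    # the per-shard offset back (offsets is a dict keyed by shard id):
    dh.update_subscription_offset(project, topic, sub_id,
                                  {shard_id: offsets[0]})
```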
list_connector(**kwargs)[source]

List data connectors

Parameters:
  • project_name – project name
  • topic_name – topic name
Returns:

list of data connector names

Return type:

datahub.models.ListConnectorResult

Raises:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty

list_project()[source]

List all project names

Returns:projects on the datahub server
Return type:datahub.models.results.ListProjectResult
list_shard(**kwargs)[source]

List all shards of a topic

Parameters:
  • project_name – project name
  • topic_name – topic name
Returns:

shards info

Return type:

datahub.models.results.ListShardResult

Raises:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raises:

datahub.exceptions.InvalidParameterException if the project name or topic name is empty

list_subscription(**kwargs)[source]

Query subscriptions in the range [start, end)

start = (page_index - 1) * page_size + 1

end = start + page_size

Parameters:
  • project_name – project name
  • topic_name – topic name
  • query_key – query key for search
  • page_index – page index
  • page_size – page size
Returns:

subscription info list

Return type:

datahub.models.results.ListSubscriptionResult

Raises:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty; page_index <= 0 or page_size < 0

list_topic(**kwargs)[source]

Get all topics of a project

Parameters:project_name – project name
Returns:all topics of the project
Return type:datahub.models.ListTopicResult
Raises:datahub.exceptions.ResourceNotFoundException if the project does not exist
Raises:datahub.exceptions.InvalidParameterException if project_name is empty
merge_shard(**kwargs)[source]

Merge shards

Parameters:
  • project_name – project name
  • topic_name – topic name
  • shard_id – shard id
  • adj_shard_id – adjacent shard id
Returns:

shards info after the merge

Return type:

datahub.models.MergeShardResult

Raises:

datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist

Raises:

datahub.exceptions.InvalidOperationException if the shard is not active

Raises:

datahub.exceptions.InvalidParameterException if the shards are not adjacent; project name, topic name, shard id or adjacent shard id is empty

Raises:

datahub.exceptions.LimitExceededException if the merge shard operation limit is exceeded

put_records(**kwargs)[source]

Put records to a topic

Parameters:
  • project_name – project name
  • topic_name – topic name
  • record_list (list) – record list
Returns:

failed records info

Return type:

datahub.models.PutRecordsResult

Raises:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raises:

datahub.exceptions.InvalidParameterException if a record is not well-formed; project_name or topic_name is empty

Raises:

datahub.exceptions.InvalidOperationException if the shard is not active

Raises:

datahub.exceptions.LimitExceededException if the query rate or throughput rate limit is exceeded

Raises:

datahub.exceptions.DatahubException if the crc is wrong in protobuf mode

put_records_by_shard(**kwargs)[source]

Put records to a specific shard of a topic

Parameters:
  • project_name – project name
  • topic_name – topic name
  • shard_id – shard id
  • record_list (list) – record list
Returns:

failed records info

Return type:

datahub.models.PutRecordsResult

Raises:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raises:

datahub.exceptions.InvalidParameterException if a record is not well-formed; project_name, topic_name or shard_id is empty

Raises:

datahub.exceptions.InvalidOperationException if the shard is not active

Raises:

datahub.exceptions.LimitExceededException if the query rate or throughput rate limit is exceeded

Raises:

datahub.exceptions.DatahubException if the crc is wrong in protobuf mode

reload_connector(**kwargs)[source]

Reload a data connector for the given shard id, or reload all shards when no shard id is given

Parameters:
  • project_name – project name
  • topic_name – topic name
  • connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
  • shard_id – shard id
Returns:

none

Raises:

datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist

Raises:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty; connector_type is wrong type

reset_subscription_offset(**kwargs)[source]

Reset the subscription offset

Parameters:
  • project_name – project name
  • topic_name – topic name
  • sub_id – subscription id
  • offsets (dict) – offsets
Returns:

none

Raises:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raises:

datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty; offsets is wrong type

split_shard(**kwargs)[source]

Split a shard

Parameters:
  • project_name – project name
  • topic_name – topic name
  • shard_id – id of the shard to split
  • split_key – split key; if not given, the median is chosen
Returns:

shards info after the split

Return type:

datahub.models.SplitShardResult

Raises:

datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist

Raises:

datahub.exceptions.InvalidOperationException if the shard is not active

Raises:

datahub.exceptions.InvalidParameterException if the key range is invalid; project name, topic name or shard id is empty

Raises:

datahub.exceptions.LimitExceededException if the split shard operation limit is exceeded

update_connector(**kwargs)[source]

Update a data connector's config

Parameters:
  • project_name – project name
  • topic_name – topic name
  • connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
  • config (datahub.models.ConnectorConfig) – connector config
Returns:

none

update_connector_offset(**kwargs)[source]

Update the data connector offset

Parameters:
  • project_name – project name
  • topic_name – topic name
  • connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
  • shard_id – shard id
  • connector_offset (datahub.models.ConnectorOffset) – current sequence
Returns:

none

Raises:

datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist

Raises:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty; connector_type or connector_offset is wrong type

update_connector_state(**kwargs)[source]

Update the data connector state

Parameters:
  • project_name – project name
  • topic_name – topic name
  • connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
  • connector_state (datahub.models.ConnectorState) – connector state
Returns:

none

Raises:

datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist

Raises:

datahub.exceptions.InvalidParameterException if project_name or topic_name is empty; connector_type or connector_state is wrong type

update_project(**kwargs)[source]

Update the project comment

Parameters:
  • project_name – project name
  • comment – new description of the project
Returns:

none

Raises:

datahub.exceptions.ResourceNotFoundException if the project does not exist

Raises:

datahub.exceptions.InvalidParameterException if project_name is empty or the comment is invalid

update_subscription(**kwargs)[source]

Update a subscription

Parameters:
  • project_name – project name
  • topic_name – topic name
  • sub_id – subscription id
  • comment – new comment
Returns:

none

Raises:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raises:

datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty

update_subscription_offset(**kwargs)[source]

Update the subscription offset

Parameters:
  • project_name – project name
  • topic_name – topic name
  • sub_id – subscription id
  • offsets (dict) – offsets
Returns:

none

Raises:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raises:

datahub.exceptions.InvalidOperationException if the offset session is closed or the offset version has changed

Raises:

datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty; offsets is wrong type

update_subscription_state(**kwargs)[source]

Update the subscription state

Parameters:
  • project_name – project name
  • topic_name – topic name
  • sub_id – subscription id
  • state (datahub.models.SubscriptionState) – subscription state
Returns:

none

Raises:

datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist

Raises:

datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty; state is wrong type

update_topic(**kwargs)[source]

Update topic info; only the life cycle and comment can be modified.

Parameters:
  • topic_name – topic name
  • project_name – project name
  • life_cycle – life cycle of the topic
  • comment – topic comment
Returns:

none

Raises:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raises:

datahub.exceptions.InvalidParameterException if the project name or topic name is empty; life_cycle is not positive

wait_shards_ready(**kwargs)[source]

Wait until all shards are in the active or closed state. It is usually invoked after creating a topic, and blocks until every shard is active or closed, or until the timeout expires.

Parameters:
  • project_name – project name
  • topic_name – topic name
  • timeout – -1 means block until all shards are active or closed; otherwise wait at most timeout seconds
Returns:

none

Raises:

datahub.exceptions.ResourceNotFoundException if the project or topic does not exist

Raises:

datahub.exceptions.InvalidParameterException if the project name or topic name is empty; timeout < 0

Raises:

datahub.exceptions.DatahubException in case of timeout

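split_shard, merge_shard and wait_shards_ready are typically used together when rescaling a topic. A sketch with hypothetical arguments (the median split key is chosen automatically when split_key is omitted):

```python
def split_first_shard(dh, project, topic):
    # Pick any shard from the topic's current shard list.
    shards = dh.list_shard(project, topic).shards
    # Split it at the median hash key, then wait for the new
    # topology to settle (every shard active or closed).
    dh.split_shard(project, topic, shards[0].shard_id)
    dh.wait_shards_ready(project, topic)
```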
Auth

class datahub.auth.AccountType[源代码]

Account type.

Only the ‘aliyun’ type is supported now.

class datahub.auth.Account(*args, **kwargs)[source]

Base account class.

get_type()[source]

Get the account type; subclasses must implement this.

Returns:the account type
Return type:datahub.auth.AccountType
sign_request(request)[source]

Generate the signature for a request; subclasses must implement this.

Parameters:request – request object
Returns:none
class datahub.auth.AliyunAccount(*args, **kwargs)[source]

Aliyun account implementation, derived from datahub.auth.Account

get_type()[source]

Get the account type.

Returns:the account type
Return type:datahub.auth.AccountType
sign_request(request)[source]

Generate the signature for a request.

Parameters:request – request object
Returns:none

Schema

class datahub.models.FieldType[source]
Field types. Datahub supports the following field types:
TINYINT, SMALLINT, INTEGER, BIGINT, STRING, BOOLEAN, TIMESTAMP, FLOAT, DOUBLE, DECIMAL
class datahub.models.Field(name, field_type, allow_null=True)[source]
Members:

name (str): field name

type (str): field type

class datahub.models.RecordSchema(field_list=None)[source]

Record schema class, used by Tuple type topics.

Members:
fields (list): fields
Example:
>>> schema = RecordSchema.from_lists(
...     ['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'],
...     [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP])
>>>

Record

class datahub.models.RecordType[source]

Record type; there are two types: TUPLE and BLOB

class datahub.models.BlobRecord(blob_data=None, values=None)[source]

Blob type record class

Members:
blob_data (str): blob data
class datahub.models.TupleRecord(field_list=None, schema=None, values=None)[source]

Tuple type record class

Members:

field_list (list): fields

name_indices (list): values

Example:
>>> schema = RecordSchema.from_lists(['name', 'id'], [FieldType.STRING, FieldType.STRING])
>>> record = TupleRecord(schema=schema, values=['test', 'test2'])
>>> record.values[0:2]
['test', 'test2']
>>> record.set_value('name', 'test1')
>>> record.values[0]
'test1'
>>> record.set_value(0, 'test3')
>>> record.get_value(0)
'test3'
>>> record.get_value('name')
'test3'
>>> len(record.values)
2
>>> record.has_field('name')
True
class datahub.models.FailedRecord(index, error_code, error_message)[source]

Failed record info

Members:

index (int): index of the failed record

error_code (str): error code

error_message (str): error message

Shard

class datahub.models.ShardState[source]

Shard state, there are: OPENING, ACTIVE, CLOSED, CLOSING

class datahub.models.ShardBase(shard_id, begin_hash_key, end_hash_key)[source]

Shard base info

Members:

shard_id (str): shard id

begin_hash_key (str): begin hash key

end_hash_key (str): end hash key

class datahub.models.Shard(shard_id, begin_hash_key, end_hash_key, state, closed_time, parent_shard_ids, left_shard_id, right_shard_id)[source]

Shard info

Members:

shard_id (str): shard id

begin_hash_key (str): begin hash key

end_hash_key (str): end hash key

state (datahub.models.ShardState): shard state

closed_time (str): closed time

parent_shard_ids (list): parent shard ids

left_shard_id (str): left shard id

right_shard_id (str): right shard id

class datahub.models.ShardContext(shard_id, start_sequence, end_sequence, current_sequence)[source]

Shard context

Members:

shard_id (str): shard id

start_sequence (str): start sequence

end_sequence (str): end sequence

current_sequence (str): current sequence

Cursor

class datahub.models.CursorType[source]

Cursor type enum, there are: OLDEST, LATEST, SYSTEM_TIME, SEQUENCE

Offset

class datahub.models.OffsetBase(sequence, timestamp)[source]

offset base class

Members:

sequence (int): sequence

timestamp (int): timestamp

class datahub.models.OffsetWithVersion(sequence, timestamp, version)[source]

offset with version class

Members:

sequence (int): sequence

timestamp (int): timestamp

version (int): version

class datahub.models.OffsetWithSession(sequence, timestamp, version, session_id)[source]

offset with session class

Members:

sequence (int): sequence

timestamp (int): timestamp

version (int): version

session_id (int): session id

Subscription

class datahub.models.SubscriptionState[source]

Subscription state, there are: INACTIVE, ACTIVE

class datahub.models.Subscription(comment, create_time, is_owner, last_modify_time, state, sub_id, topic_name, sub_type)[source]

subscription info

Members:

comment (str): subscription description

create_time (int): create time

is_owner (bool): owner or not

last_modify_time (int): last modify time

state (datahub.models.SubscriptionState): subscription state

sub_id (str): subscription id

topic_name (str): topic name

type (int): type

Connector

class datahub.models.ConnectorState[source]

ConnectorState enum class, there are: CONNECTOR_RUNNING, CONNECTOR_STOPPED

class datahub.models.AuthMode[source]

AuthMode enum class, there are: ak, sts

class datahub.models.PartitionMode[source]

PartitionMode enum class, there are: USER_DEFINE, SYSTEM_TIME, EVENT_TIME

class datahub.models.ConnectorType[source]

ConnectorType enum class, there are: SINK_ODPS, SINK_ADS, SINK_ES, SINK_FC, SINK_MYSQL, SINK_OSS, SINK_OTS, SINK_HOLOGRES, SINK_DATAHUB

class datahub.models.OdpsConnectorConfig(project_name, table_name, odps_endpoint, tunnel_endpoint, access_id, access_key, partition_mode, time_range, partition_config=None)[source]

Connector config for odps

Members:

project_name (str): odps project name

table_name (str): odps table name

odps_endpoint (str): odps endpoint

tunnel_endpoint (str): tunnel endpoint

access_id (str): odps access id

access_key (str): odps access key

partition_mode (datahub.models.connector.PartitionMode): partition mode

time_range (int): time range

partition_config(collections.OrderedDict): partition config

class datahub.models.DatabaseConnectorConfig(host, port, database, user, password, table, ignore)[source]

Connector config for database

Members:

host (str): host

port (int): port

database (str): database

user (str): user

password (str): password

table (str): table

ignore (bool): ignore insert error

class datahub.models.EsConnectorConfig(index, endpoint, user, password, id_fields, type_fields, proxy_mode)[source]

Connector config for ElasticSearch

Members:

index (str): index

endpoint (str): endpoint

user (str): user

password (str): password

id_fields (list): id fields

type_fields (list): type fields

proxy_mode (bool): proxy mode

class datahub.models.FcConnectorConfig(endpoint, service, func, auth_mode, access_id='', access_key='')[source]

Connector config for FunctionCompute

Members:

endpoint (str): endpoint

service (str): service

func (str): function

auth_mode (datahub.models.connector.AuthMode): auth mode

access_id (str): access id

access_key (str): access key

class datahub.models.OssConnectorConfig(endpoint, bucket, prefix, time_format, time_range, auth_mode, access_id='', access_key='')[source]

Connector config for ObjectStoreService

Members:

endpoint (str): endpoint

bucket (str): bucket

prefix (str): prefix

time_format (str): time format

time_range (int): time range

auth_mode (datahub.models.connector.AuthMode): auth mode

access_id (str): access id

access_key (str): access key

class datahub.models.OtsConnectorConfig(endpoint, instance, table, auth_mode, access_id='', access_key='', write_mode=<WriteMode.PUT: 'PUT'>)[source]

Connector config for OpenTableStore

Members:

endpoint (str): endpoint

instance (str): instance

table (str): table

auth_mode (datahub.models.connector.AuthMode): auth mode

access_id (str): access id

access_key (str): access key

write_mode (datahub.models.connector.WriteMode): write mode

Results

class datahub.models.results.ListProjectResult(project_names)[source]

Result of the list projects API

Members:
project_names (list): list of project names
class datahub.models.results.GetProjectResult(project_name, comment, create_time, last_modify_time)[source]

Result of get project api

Members:

project_name (str): project name

comment (str): project description

create_time (int): create time

last_modify_time(int): last modify time

class datahub.models.results.ListTopicResult(topic_names)[source]

Result of list topics api

Members:
topic_names (list): list of topic names
class datahub.models.results.GetTopicResult(**kwargs)[source]

Result of get topic api

Members:

project_name (str): project name

topic_name (str): topic name

shard_count (int): shard count

life_cycle (int): life cycle

record_type (datahub.models.RecordType): record type

record_schema (datahub.models.RecordSchema): record schema

comment (str): project description

create_time (int): create time

last_modify_time(int): last modify time

class datahub.models.results.ListShardResult(shards)[source]

Result of list shards api

Members:
shards (list): list of datahub.models.Shard
class datahub.models.results.MergeShardResult(shard_id, begin_hash_key, end_hash_key)[source]

Result of merge shard api

Members:

shard_id (str): shard id

begin_hash_key (str): begin hash key

end_hash_key (str): end hash key

class datahub.models.results.SplitShardResult(new_shards)[source]

Result of split shard api

Members:
new_shards (list): list of datahub.models.ShardBase
class datahub.models.results.GetCursorResult(cursor, record_time, sequence)[source]

Result of the get cursor API

Members:

cursor (str): cursor

record_time (int): record time

sequence (int): sequence

class datahub.models.results.PutRecordsResult(failed_record_count, failed_records)[source]

Result of put records api

Members:

failed_record_count (int): failed record count

failed_records (list): list of datahub.models.FailedRecord

class datahub.models.results.GetRecordsResult(next_cursor, record_count, start_seq, records)[source]

Result of get records api

Members:

next_cursor (str): next cursor

record_count (int): record count

start_seq (int): start sequence

records (list): list of datahub.models.BlobRecord/datahub.models.TupleRecord

class datahub.models.results.GetMeteringInfoResult(active_time, storage)[source]

Result of the get metering info API

Members:

active_time (int): active time

storage (int): storage

class datahub.models.results.ListConnectorResult(connector_names, connector_ids)[source]

Result of the list data connectors API

Members:

connector_names (list): list of data connector names

connector_ids (list): list of data connector ids
class datahub.models.results.GetConnectorResult(cluster_addr, connector_id, connector_type, state, creator, owner, create_time, column_fields, config, extra_config, shard_contexts, sub_id)[source]

Result of get data connector

Members:

cluster_addr (str): cluster address

connector_id (str): connector id

type (datahub.models.ConnectorType): connector type

state (datahub.models.ConnectorState): connector state

creator (str): creator

owner (str): owner

create_time (int): create time

column_fields (list): list of column fields

config (datahub.models.OdpsConnectorConfig): config

extra_config (dict): extra config

shard_contexts (list): list of datahub.models.ShardContext

sub_id (str): subscription id used by connector

class datahub.models.results.GetConnectorShardStatusResult(shard_status_infos)[source]

Result of get data connector shard status

Members:
shard_status_infos (dict): shard status entry map
class datahub.models.results.InitAndGetSubscriptionOffsetResult(offsets)[source]

Result of init and get subscription offset api

Members:
offsets (list): list of datahub.models.OffsetWithSession
class datahub.models.results.GetSubscriptionOffsetResult(offsets)[source]

Result of get subscription offset api

Members:
offsets (list): list of datahub.models.OffsetWithVersion
class datahub.models.results.CreateSubscriptionResult(sub_id)[source]

Result of create subscription api

Members:
sub_id (str): subscription id
class datahub.models.results.GetSubscriptionResult(comment, create_time, is_owner, last_modify_time, state, sub_id, topic_name, sub_type)[source]

Result of the get subscription API

Members:

comment (str): comment

create_time (int): create time

is_owner (bool): owner or not

last_modify_time (int): last modify time

state (str): state

sub_id (str): subscription id

topic_name (str): topic name

type (int): type

class datahub.models.results.ListSubscriptionResult(total_count, subscriptions)[source]

Result of the list subscriptions API

Exceptions

class datahub.exceptions.DatahubException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Base exception class for errors that occur while handling a request to the Datahub server.

class datahub.exceptions.ResourceExistException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Raised when the Datahub object you are creating already exists.

class datahub.exceptions.ResourceNotFoundException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Raised when the Datahub object you are handling does not exist.

class datahub.exceptions.InvalidParameterException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Raised when a request parameter is invalid.

class datahub.exceptions.InvalidOperationException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Raised when the operation on the shard is not supported.

class datahub.exceptions.LimitExceededException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Raised when there are too many requests.

class datahub.exceptions.InternalServerException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Raised when an error occurs on the Datahub server.

class datahub.exceptions.AuthorizationFailedException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Raised when authorization fails.

class datahub.exceptions.NoPermissionException(error_msg, status_code=-1, request_id=None, error_code=None)[source]

Raised when the operation is attempted without permission.