PyDatahub: Datahub Python SDK¶
PyDatahub is the Python SDK for DataHub. It wraps each RESTful API exposed by the DataHub service and provides a simple, convenient Python programming interface. For a detailed introduction to the DataHub service, see the introduction on the Alibaba Cloud website.
- Requirements:
- setuptools (>=39.2.0)
- requests (>=2.4.0)
- simplejson (>=3.3.0)
- six (>=1.1.0)
- enum34 (>=1.1.5 for python_version < '3.4')
- crcmod (>=1.7)
- lz4 (>=2.0.0)
- cprotobuf (==0.1.9)
- funcsigs (>=1.0.2)
Installation¶
Installing PyDatahub¶
Installing from source¶
$ git clone https://github.com/aliyun/aliyun-datahub-sdk-python.git
$ cd aliyun-datahub-sdk-python
$ python setup.py install
Note: without network access, the dependencies can be installed as follows:
$ cd dependency
$ pip install -r first.txt
$ pip install -r second.txt
Common issues¶
If the error 'Python.h: No such file or directory' appears during installation, install the Python development headers for your operating system:
$ sudo apt-get install python-dev # for python2.x installs
$ sudo apt-get install python3-dev # for python3.x installs
$ sudo yum install python-devel # for python2.x installs
$ sudo yum install python34-devel # for python3.4 installs
On Windows, follow the error message to download the matching version of the Visual C++ SDK from here.
Quick Start¶
Basic DataHub concepts¶
For details, see DataHub Basic Concepts.
Prerequisites¶
- Accessing the DataHub service requires an Alibaba Cloud account: you must provide your Alibaba Cloud accessId and accessKey, as well as a reachable DataHub service endpoint.
- Log in to the DataHub WebConsole and create a Project.
Logging¶
You can configure log output and level in your own code. The SDK mainly emits debug and error logs; the following example prints the SDK's DEBUG logs to the console:
import logging

logger = logging.getLogger('datahub')
logger.setLevel(logging.DEBUG)  # the logger itself must also allow DEBUG through
sh = logging.StreamHandler()
sh.setLevel(logging.DEBUG)
logger.addHandler(sh)
Initializing a DataHub object¶
All APIs of the DataHub Python SDK are provided by the datahub.DataHub
class, so the first step is to initialize a DataHub object.
Optional: protobuf transport is supported, mainly for put/get record. If the DataHub server version does not support protobuf, set enable_pb=False explicitly.
from datahub import DataHub
from datahub.models.compress import CompressFormat

access_id = '***your access id***'
access_key = '***your access key***'
endpoint = '***your datahub server endpoint***'
dh = DataHub(access_id, access_key, endpoint, enable_pb=False)  # JSON mode: for DataHub server version <= 2.11
dh = DataHub(access_id, access_key, endpoint)  # use protobuf for put/get record, for DataHub server version > 2.11
dh = DataHub(access_id, access_key, endpoint, compress_format=CompressFormat.LZ4)  # use LZ4 compression for put/get record
More details: DataHub
API examples¶
Examples for the commonly used APIs are given below:
Project operations¶
A Project is the basic organizational unit of DataHub data and contains multiple Topics. Note that DataHub project spaces are independent of MaxCompute project spaces: a project created in MaxCompute cannot be reused by DataHub and must be created separately.
Create a Project¶
- The create_project interface creates a new Project.
dh.create_project(project_name, comment)
Creating a Project requires a name and a comment. The name must be 3 to 32 characters long, start with an English letter, contain only English letters, digits and underscores ('_'), and is case-insensitive.
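The naming rule above can be checked client-side before calling create_project. A minimal sketch; the helper name and the regex are ours for illustration, not part of the SDK:

```python
import re

# Hypothetical client-side check, mirroring the documented rule:
# 3-32 chars, starts with a letter, only letters/digits/underscore.
_PROJECT_NAME_RE = re.compile(r'^[A-Za-z][A-Za-z0-9_]{2,31}$')

def is_valid_project_name(name):
    return bool(_PROJECT_NAME_RE.match(name))
```

Since names are case-insensitive on the server, 'MyProject' and 'myproject' refer to the same Project.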
Delete a Project¶
- The delete_project interface deletes a Project.
dh.delete_project(project_name)
A Project can only be deleted when it no longer contains any Topics.
List Projects¶
- The list_project interface returns the names of all Projects under the DataHub service.
projects_result = dh.list_project()
list_project returns a ListProjectResult object whose project_names member is a list of Project names.
Get a Project¶
- The get_project interface retrieves the details of a Project.
project_result = dh.get_project(project_name)
get_project returns a GetProjectResult object with four members: project_name, comment, create_time and last_modify_time.
Topic operations¶
A Topic is the smallest unit of subscription and publication in DataHub; it represents one class or one stream of data. Two topic types are supported: Tuple and Blob. A Tuple topic holds database-like records, each consisting of multiple columns. A Blob topic only accepts a chunk of binary data per record.
Tuple Topic¶
Data written to a Tuple topic is structured, so a Record Schema must be specified. The following field types are supported:
| Type | Meaning | Range |
|---|---|---|
| Bigint | 8-byte signed integer. | -9223372036854775807 ~ 9223372036854775807 |
| String | String; only UTF-8 encoding is supported. | A single String column may hold at most 1 MB. |
| Boolean | Boolean. | True/False, true/false, or 0/1 |
| Double | 8-byte double-precision floating point. | -1.0 * 10^308 ~ 1.0 * 10^308 |
| TimeStamp | Timestamp. | Timestamp with microsecond precision |
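TimeStamp values in the examples below are integers with microsecond precision (e.g. 1455869335000000). A stdlib-only sketch of the conversion; the helper is ours, not an SDK API:

```python
from datetime import datetime, timezone

def to_micros(dt):
    """Convert an aware datetime to microseconds since the Unix epoch."""
    return int(dt.timestamp() * 1000000)

ts = to_micros(datetime(2016, 2, 19, 8, 48, 55, tzinfo=timezone.utc))
```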
Creation example¶
import sys
import traceback

from datahub.exceptions import InvalidParameterException, ResourceExistException
from datahub.models import RecordSchema, FieldType

project_name = 'topic_test_project'
topic_name = 'tuple_topic_test_topic'
shard_count = 3
life_cycle = 7
record_schema = RecordSchema.from_lists(
['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field' ],
[FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP]
)
try:
dh.create_tuple_topic(project_name, topic_name, shard_count, life_cycle, record_schema, 'comment')
print("create topic success!")
print("=======================================\n\n")
except InvalidParameterException as e:
print(e)
print("=======================================\n\n")
except ResourceExistException as e:
print("topic already exist!")
print("=======================================\n\n")
except Exception as e:
print(traceback.format_exc())
sys.exit(-1)
Append a field¶
dh.append_field(project_name, topic_name, field_name, field_type)
An appended field must have allow_null set to True. Pass field_name and field_type as arguments; field_type is a FieldType enum value.
Blob Topic¶
A Blob topic accepts a chunk of binary data as a single Record; the data is transferred BASE64-encoded.
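The BASE64 step is handled by the SDK; for illustration only, this is the encoding applied to a blob payload on the wire (stdlib only):

```python
import base64

payload = b'\x00\x01 arbitrary binary data'
encoded = base64.b64encode(payload)      # what travels over the wire
assert base64.b64decode(encoded) == payload  # lossless round trip
```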
Creation example¶
import sys
import traceback

from datahub.exceptions import InvalidParameterException, ResourceExistException

project_name = 'topic_test_project'
topic_name = 'blob_topic_test_topic'
shard_count = 3
life_cycle = 7
try:
dh.create_blob_topic(project_name, topic_name, shard_count, life_cycle, 'comment')
print("create topic success!")
print("=======================================\n\n")
except InvalidParameterException as e:
print(e)
print("=======================================\n\n")
except ResourceExistException as e:
print("topic already exist!")
print("=======================================\n\n")
except Exception as e:
print(traceback.format_exc())
sys.exit(-1)
Schema types¶
A schema declares the names and types of stored fields; it is used when creating a tuple topic and when reading or writing records. Data travels over the network as strings, so the schema is needed to convert values back to their proper types.
Get a schema¶
- For an existing topic, the get_topic interface returns its schema.
topic_result = dh.get_topic(project_name, topic_name)
record_schema = topic_result.record_schema
Detailed definition: Schema
Define a schema¶
To create a new tuple topic you must define the schema yourself; it can be initialized in any of the following ways.
Detailed definition: Schema
- Define a schema from lists
from datahub.models import RecordSchema, FieldType, Field
record_schema1 = RecordSchema.from_lists(
['bigint_field' , 'string_field' , 'double_field' , 'bool_field' , 'event_time1' ],
[FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP]
)
record_schema2 = RecordSchema.from_lists(
['bigint_field' , 'string_field' , 'double_field' , 'bool_field' , 'event_time1' ],
[FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP],
[True , False , True , False , True ]
)
Two lists are required: the first holds the field names, the second the corresponding field types. The optional third list marks nullability: True means the field may be None, False means it may not. If the third list is omitted, all fields default to True (nullable).
- Define a schema from a JSON string
record_schema_1 = RecordSchema.from_json_str(json_str)
The JSON string has the following format:
{"fields":[{"type":"BIGINT","name":"a"},{"type":"STRING","name":"b"}]}
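For reference, the same string parsed with the standard json module, showing the expected top-level "fields" list of {"type", "name"} objects:

```python
import json

json_str = '{"fields":[{"type":"BIGINT","name":"a"},{"type":"STRING","name":"b"}]}'
parsed = json.loads(json_str)
names = [f['name'] for f in parsed['fields']]
types = [f['type'] for f in parsed['fields']]
```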
- Build the schema field by field
record_schema = RecordSchema()
record_schema.add_field(Field('bigint_field', FieldType.BIGINT))
record_schema.add_field(Field('string_field', FieldType.STRING, False))
record_schema.add_field(Field('double_field', FieldType.DOUBLE))
record_schema.add_field(Field('bool_field', FieldType.BOOLEAN))
record_schema.add_field(Field('event_time1', FieldType.TIMESTAMP))
The argument is a Field object. The Field constructor takes the field name first and the field type second; the optional third argument marks nullability: True means the value may be None, False means it may not. It defaults to True (nullable).
Data publishing/subscription¶
Publishing data¶
When publishing records to a topic, each record must be assigned to one shard of that topic, so you will generally first call list_shard
to inspect the topic's current shards.
- The list_shard interface returns all shards of a topic.
shards_result = dh.list_shard(project_name, topic_name)
The result is a ListShardResult object; its shards member is a list of Shard objects, each exposing shard_id, state, begin_hash_key, end_hash_key and other information.
- The put_records interface publishes data to a topic.
put_result = dh.put_records(project_name, topic_name, records)
The records parameter is a list in which every element is a record of the same type, i.e. all Tuple or all Blob. The result is a PutDataRecordResult object with two members, failed_record_count and failed_records; failed_records is a list of FailedRecord objects, each carrying index, error_code and error_message.
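A common pattern is to collect the indices reported in failed_records and resend only those records. A hypothetical sketch using plain lists; select_failed is ours, not an SDK API, and failed_indices stands in for [f.index for f in put_result.failed_records]:

```python
def select_failed(records, failed_indices):
    """Return the sub-list of records that should be retried."""
    return [records[i] for i in failed_indices]

retry_batch = select_failed(['r0', 'r1', 'r2', 'r3'], [1, 3])
```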
Example: writing Tuple records¶
import sys
import traceback

from datahub.exceptions import DatahubException
from datahub.models import RecordSchema, FieldType, TupleRecord

try:
    # block until all shards are ready
    dh.wait_shards_ready(project_name, topic_name)
    topic_result = dh.get_topic(project_name, topic_name)
    print(topic_result)
    print("=======================================\n\n")
    shard_result = dh.list_shard(project_name, topic_name)
    for shard in shard_result.shards:
        print(shard)
    print("=======================================\n\n")
    record_schema = RecordSchema.from_lists(
        ['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'],
        [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP]
    )
    # put_records_by_shard is recommended
    records = []
    record0 = TupleRecord(schema=record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
    record0.put_attribute('AK', '47')
    records.append(record0)
    record1 = TupleRecord(schema=record_schema)
    record1.values = [1, 'yc1', 10.01, True, 1455869335000000]
    records.append(record1)
    record2 = TupleRecord(schema=record_schema)
    record2.set_value(0, 3)
    record2.set_value(1, 'yc3')
    record2.set_value('double_field', 10.03)
    record2.set_value('bool_field', False)
    record2.set_value('time_field', 1455869335000013)
    records.append(record2)
    put_result = dh.put_records_by_shard(project_name, topic_name, shard_result.shards[0].shard_id, records)
    # records = []
    # record0 = TupleRecord(schema=record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
    # record0.shard_id = shard_result.shards[0].shard_id
    # record0.put_attribute('AK', '47')
    # records.append(record0)
    # record1 = TupleRecord(schema=record_schema)
    # record1.values = [1, 'yc1', 10.01, True, 1455869335000000]
    # record1.shard_id = shard_result.shards[1].shard_id
    # records.append(record1)
    # record2 = TupleRecord(schema=record_schema)
    # record2.set_value(0, 3)
    # record2.set_value(1, 'yc3')
    # record2.set_value('double_field', 10.03)
    # record2.set_value('bool_field', False)
    # record2.set_value('time_field', 1455869335000013)
    # record2.shard_id = shard_result.shards[2].shard_id
    # records.append(record2)
    # put_result = dh.put_records(project_name, topic_name, records)
    print("put tuple %d records" % len(records))
    print("failed records: \n%s" % put_result)
    # if failed_records is non-empty, the failed records should be retried
    print("=======================================\n\n")
except DatahubException as e:
    print(traceback.format_exc())
    sys.exit(-1)
Subscribing to data¶
To consume a topic's data you likewise specify a shard, together with a read cursor obtained through the get_cursor
interface.
- A cursor can be obtained in four ways: OLDEST, LATEST, SEQUENCE, SYSTEM_TIME
- OLDEST: the cursor points at the oldest record still valid in the shard
- LATEST: the cursor points at the latest record
- SEQUENCE: the cursor points at the record with the given sequence
- SYSTEM_TIME: the cursor points at the first record received after the given time
cursor_result = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
cursor_result = dh.get_cursor(project_name, topic_name, shard_id, CursorType.LATEST)
cursor_result = dh.get_cursor(project_name, topic_name, shard_id, CursorType.SEQUENCE, sequence)
cursor_result = dh.get_cursor(project_name, topic_name, shard_id, CursorType.SYSTEM_TIME, system_time)
get_cursor returns a GetCursorResult object; its cursor member is passed to the read interfaces to fetch data from that position.
Reading from a shard requires the starting cursor and an upper limit on the number of records; if fewer than limit records remain between the cursor and the end of the shard, the actual number available is returned.
dh.get_blob_records(project_name, topic_name, shard_id, cursor, limit_num)
dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit_num)
Example: consuming Tuple records¶
import sys
import time
import traceback

from datahub.exceptions import DatahubException
from datahub.models import CursorType

try:
    topic_result = dh.get_topic(project_name, topic_name)
    print(topic_result)
    cursor_result = dh.get_cursor(project_name, topic_name, '0', CursorType.OLDEST)
    cursor = cursor_result.cursor
    while True:
        get_result = dh.get_tuple_records(project_name, topic_name, '0', topic_result.record_schema, cursor, 10)
        for record in get_result.records:
            print(record)
        if 0 == get_result.record_count:
            time.sleep(1)
        cursor = get_result.next_cursor
except DatahubException as e:
    print(traceback.format_exc())
    sys.exit(-1)
Shard operations¶
A shard is a concurrent channel for transferring data to and from a topic; each shard has an ID and moves through several states: Opening (starting up) and Active (ready to serve). Every enabled shard occupies server resources, so request only as many shards as you need.
List shards¶
- The list_shard interface lists all shard information of a topic.
shards_result = dh.list_shard(project_name, topic_name)
list_shard returns a ListShardResult object containing the shards member, a list of Shard objects; each Shard exposes shard_id, begin_hash_key, end_hash_key, state and more.
Merge shards¶
- The merge_shard interface merges two adjacent shards.
merge_result = dh.merge_shard(project_name, topic_name, shard_id, adj_shard_id)
Pass the ids of two adjacent shards; on success a MergeShardResult object is returned, containing the newly generated shard's shard_id, begin_hash_key and end_hash_key.
Split a shard¶
- The split_shard interface splits the given shard into two adjacent shards at the given split key.
split_result = dh.split_shard(project_name, topic_name, shard_id)
split_result = dh.split_shard(project_name, topic_name, shard_id, split_key)
split_shard returns a SplitShardResult object whose new_shards member is a list of ShardBase objects; a ShardBase contains only shard_id, begin_hash_key and end_hash_key. If split_key is not given, the shard's hash-key range is queried automatically and its midpoint is used as the split key.
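Assuming hash keys are fixed-width hexadecimal strings (as begin_hash_key/end_hash_key are), the automatic midpoint rule can be sketched as follows; the helper is ours, for illustration only:

```python
def hash_key_midpoint(begin_hash_key, end_hash_key):
    """Return the hex key halfway between the two keys, keeping the width."""
    width = len(begin_hash_key)
    mid = (int(begin_hash_key, 16) + int(end_hash_key, 16)) // 2
    return format(mid, '0{}x'.format(width))

mid_key = hash_key_midpoint('00000000', 'ffffffff')
```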
Meter operations¶
Metering info is a statistical summary of a shard's resource usage, updated once per hour.
Connector operations¶
A DataHub Connector synchronizes streaming data from DataHub into other cloud services; currently a Topic's data can be synchronized to MaxCompute (ODPS) in real time or near-real time. You write data into DataHub once, configure the sync in the DataHub service, and the same data becomes usable in the other service.
Create a connector¶
- The create_connector interface creates a new connector.
Creates a connector of the given type for the given topic. A connector is uniquely identified by (project_name, topic_name, connector_id); connector_id is returned when the connector is created and can also be obtained through list_connector. column_fields is a list of str holding field names of the topic.
import time
from collections import OrderedDict

from datahub.models import OdpsConnectorConfig, ConnectorType, PartitionMode
column_fields = ['f1', 'f2', 'f3']
partition_config = OrderedDict([
("ds", "%Y%m%d"),
("hh", "%H"),
("mm", "%M")
])
connector_config = OdpsConnectorConfig(
project_name, table_name, odps_endpoint,
tunnel_endpoint, connector_access_id, connector_access_key,
partition_mode, time_range, partition_config)
start_time = int(time.time() * 1000)
create_result = dh.create_connector(project_name, topic_name, ConnectorType.SINK_ODPS,
column_fields, connector_config, start_time)
print(create_result.connector_id)
To create an ODPS connector, connector_config is an OdpsConnectorConfig object specifying connector_project_name, connector_table_name, odps_endpoint, tunnel_endpoint, connector_access_id, connector_access_key, partition_mode, time_range and partition_config. partition_mode is a PartitionMode enum value: SYSTEM_TIME, EVENT_TIME or USER_DEFINE. partition_config is an OrderedDict whose item order must match the partitions of the table. start_time sets when synchronization starts and may be omitted, in which case synchronization starts from the earliest data in DataHub. Before server version 2.14 creation returned no result; from 2.14 on, the result contains connector_id, which uniquely identifies the connector under the topic.
More connector details: Connector
from datahub.models import DatabaseConnectorConfig
column_fields = ['f1', 'f2', 'f3']
connector_config = DatabaseConnectorConfig(host, port, database, user, password, table, ignore)
create_result = dh.create_connector(project_name, topic_name, ConnectorType.SINK_ADS, column_fields, connector_config)
print(create_result.connector_id)
To create an ADS connector, connector_config is a DatabaseConnectorConfig object. ignore is a bool selecting IgnoreInto mode; the remaining parameters are str database settings. ReplaceInto vs. IgnoreInto: in ReplaceInto mode data is inserted with a replace into statement, whereas IgnoreInto inserts with insert (replace into overwrites records by primary key; ignore into skips conflicting writes). Before server version 2.14 creation returned no result; from 2.14 on, the result contains connector_id, which uniquely identifies the connector under the topic.
More connector details: Connector
from datahub.models import EsConnectorConfig
column_fields = ['f1', 'f2', 'f3']
connector_config = EsConnectorConfig(index, es_endpoint, es_user, es_password,
es_id_fields, es_type_fields, proxy_mode)
create_result = dh.create_connector(project_name, topic_name, ConnectorType.SINK_ES, column_fields, connector_config)
print(create_result.connector_id)
To create an Elasticsearch connector, connector_config is an EsConnectorConfig object. proxy_mode selects proxy mode: if true, the SDK does not scan all ES nodes but writes directly through the proxy; VPC ES must use this option. Before server version 2.14 creation returned no result; from 2.14 on, the result contains connector_id, which uniquely identifies the connector under the topic.
More connector details: Connector
from datahub.models import FcConnectorConfig
column_fields = ['f1', 'f2', 'f3']
connector_config = FcConnectorConfig(fc_endpoint, fc_service, fc_function, auth_mode, fc_access_id, fc_access_key)
create_result = dh.create_connector(project_name, topic_name, ConnectorType.SINK_FC, column_fields, connector_config)
print(create_result.connector_id)
To create an FC connector, connector_config is an FcConnectorConfig object. fc_endpoint, fc_service and fc_function describe the Function Compute service; auth_mode is an AuthMode enum value: in AK mode the AK credentials must be filled in, in STS mode they may be omitted. Before server version 2.14 creation returned no result; from 2.14 on, the result contains connector_id, which uniquely identifies the connector under the topic.
More connector details: Connector
from datahub.models import DatabaseConnectorConfig
column_fields = ['f1', 'f2', 'f3']
connector_config = DatabaseConnectorConfig(host, port, database, user, password, table, ignore)
create_result = dh.create_connector(project_name, topic_name, ConnectorType.SINK_MYSQL, column_fields, connector_config)
print(create_result.connector_id)
To create a MySQL connector, connector_config is a DatabaseConnectorConfig object. ignore is a bool selecting IgnoreInto mode; the remaining parameters are str database settings. ReplaceInto vs. IgnoreInto: in ReplaceInto mode data is inserted with a replace into statement, whereas IgnoreInto inserts with insert (replace into overwrites records by primary key; ignore into skips conflicting writes). Before server version 2.14 creation returned no result; from 2.14 on, the result contains connector_id, which uniquely identifies the connector under the topic.
More connector details: Connector
from datahub.models import OssConnectorConfig
column_fields = ['f1', 'f2', 'f3']
connector_config = OssConnectorConfig(oss_endpoint, oss_bucket, prefix, time_format, time_range,
auth_mode, oss_access_id, oss_access_key)
create_result = dh.create_connector(project_name, topic_name, ConnectorType.SINK_OSS, column_fields, connector_config)
print(create_result.connector_id)
To create an OSS connector, connector_config is an OssConnectorConfig object. oss_endpoint is the OSS service endpoint, oss_bucket the OSS bucket, and prefix the OSS directory prefix. time_format specifies the time format, e.g. '%Y%m%d%H%M'; time_range is the time span of each OSS partition, in minutes. auth_mode is an AuthMode enum value: in AK mode the AK credentials must be filled in, in STS mode they may be omitted. Before server version 2.14 creation returned no result; from 2.14 on, the result contains connector_id, which uniquely identifies the connector under the topic.
More connector details: Connector
from datahub.models import OtsConnectorConfig, WriteMode
column_fields = ['f1', 'f2', 'f3']
write_mode = WriteMode.PUT
connector_config = OtsConnectorConfig(ots_endpoint, ots_instance, ots_table,
auth_mode, ots_access_id, ots_access_key, write_mode)
create_result = dh.create_connector(project_name, topic_name, ConnectorType.SINK_OTS, column_fields, connector_config)
print(create_result.connector_id)
To create an OTS connector, connector_config is an OtsConnectorConfig object. ots_endpoint is the OTS service endpoint, ots_instance the OTS instance, and ots_table the OTS table name. write_mode is a WriteMode enum value and defaults to PUT. auth_mode is an AuthMode enum value: in AK mode the AK credentials must be filled in, in STS mode they may be omitted. Before server version 2.14 creation returned no result; from 2.14 on, the result contains connector_id, which uniquely identifies the connector under the topic.
More connector details: Connector
from datahub.models import DataHubConnectorConfig
column_fields = ['f1', 'f2', 'f3']
connector_config = DataHubConnectorConfig(datahub_endpoint, datahub_project_name, datahub_topic_name,
auth_mode, datahub_access_id, datahub_access_key)
create_result = dh.create_connector(project_name, topic_name, ConnectorType.SINK_DATAHUB, column_fields, connector_config)
print(create_result.connector_id)
To create a DataHub connector, connector_config is a DataHubConnectorConfig object. datahub_endpoint is the target DataHub service endpoint, datahub_project_name the target project name, and datahub_topic_name the topic name under that project. auth_mode is an AuthMode enum value: in AK mode the AK credentials must be filled in, in STS mode they may be omitted. Before server version 2.14 creation returned no result; from 2.14 on, the result contains connector_id, which uniquely identifies the connector under the topic.
More connector details: Connector
List connectors¶
- The list_connector interface lists the connectors under a topic.
connectors_result = dh.list_connector(project_name, topic_name)
connector_ids = connectors_result.connector_ids
print(connector_ids)
list_connector returns a ListConnectorResult object containing the connector_ids member, a list of connector ids.
Update a connector¶
- The update_connector interface updates the configuration of a connector.
dh.update_connector(project_name, topic_name, connector_id, connector_config)
A connector is uniquely identified by (project_name, topic_name, connector_id); connector_id is returned when the connector is created and can also be obtained through list_connector. connector_config is a ConnectorConfig object.
# build a new connector_config from scratch
new_odps_project_name = "1"
new_system_time_table_name = "2"
new_odps_endpoint = "3"
new_tunnel_endpoint = "4"
new_odps_access_id = "5"
new_odps_access_key = "6"
new_partition_config = OrderedDict([("pt", "%Y%m%d"), ("ct", "%H%M")])
new_connector_config = OdpsConnectorConfig(new_odps_project_name, new_system_time_table_name, new_odps_endpoint,
                                           new_tunnel_endpoint, new_odps_access_id, new_odps_access_key,
                                           PartitionMode.USER_DEFINE, 30, new_partition_config)
dh.update_connector(project_name, topic_name, connector_id, new_connector_config)
# or fetch the existing connector_config and modify part of it
new_connector_config = dh.get_connector(project_name, topic_name, connector_id).config
new_connector_config.project_name = "1"
new_connector_config.table_name = "2"
new_connector_config.odps_endpoint = "3"
new_connector_config.tunnel_endpoint = "4"
new_connector_config.access_id = "5"
new_connector_config.access_key = "6"
dh.update_connector(project_name, topic_name, connector_id, new_connector_config)
Delete a connector¶
- The delete_connector interface deletes the given connector.
dh.delete_connector(project_name, topic_name, connector_id)
Pass (project_name, topic_name, connector_id) to delete the corresponding connector; connector_id is returned when the connector is created and can also be obtained through list_connector.
Get a connector¶
- The get_connector interface queries the information of a connector.
connector_result = dh.get_connector(project_name, topic_name, connector_id)
print(connector_result)
Pass (project_name, topic_name, connector_id) to identify the connector; connector_id is returned when the connector is created and can also be obtained through list_connector. get_connector returns a GetConnectorResult object with members connector_id, column_fields, type, state, creator, owner and config, where type is a ConnectorType enum value, state a ConnectorState enum value, and config the config object of the connector's specific type.
Get connector shard status¶
- The get_connector_shard_status interface queries the status of a given shard in a connector.
status_result = dh.get_connector_shard_status(project_name, topic_name, connector_id, shard_id)
Pass (project_name, topic_name, connector_id) to identify the connector; connector_id is returned when the connector is created and can also be obtained through list_connector. If shard_id is not specified, the status of all shards is returned. get_connector_shard_status returns a GetDataShardStatusResult object whose shard_status_infos member is a dict mapping shard_id to a ShardStatusEntry object.
Restart connector shards¶
- The reload_connector interface restarts the given shard of a connector.
dh.reload_connector(project_name, topic_name, connector_id, shard_id)
dh.reload_connector(project_name, topic_name, connector_id)
Pass (project_name, topic_name, connector_id) to identify the connector; connector_id is returned when the connector is created and can also be obtained through list_connector. Specifying shard_id restarts that shard; omitting it restarts all shards of the connector.
Append a new field¶
- The append_connector_field interface adds a new field to a connector; a corresponding column must already exist in the ODPS table.
dh.append_connector_field(project_name, topic_name, connector_id, field_name)
Pass (project_name, topic_name, connector_id) to identify the connector; connector_id is returned when the connector is created and can also be obtained through list_connector. field_name must exist in the topic's schema.
Update connector state¶
- The update_connector_state interface changes the state of a connector.
dh.update_connector_state(project_name, topic_name, connector_id, state)
Pass (project_name, topic_name, connector_id) to identify the connector; connector_id is returned when the connector is created and can also be obtained through list_connector. state is a ConnectorState enum value, CONNECTOR_RUNNING or CONNECTOR_STOPPED; the connector must be set to CONNECTOR_STOPPED before its shard offsets can be updated.
Detailed definition: Connector
Update connector offsets¶
- The update_connector_offset interface updates the offsets of a connector.
from datahub.models import ConnectorOffset

connector_offset = ConnectorOffset(100, 1582801630000)
dh.update_connector_offset(project_name, topic_name, connector_id, shard_id, connector_offset)
Pass (project_name, topic_name, connector_id) to identify the connector; connector_id is returned when the connector is created and can also be obtained through list_connector. connector_offset is a ConnectorOffset object with members sequence and timestamp (in milliseconds). Passing '' as shard_id sets every shard to the same offset.
Detailed definition: Connector
Get connector done time¶
result = dh.get_connector_done_time(project_name, topic_name, connector_id)
print(result.done_time)
Pass (project_name, topic_name, connector_id) to identify the connector; connector_id is returned when the connector is created and can also be obtained through list_connector. get_connector_done_time returns a GetConnectorDoneTimeResult object with members done_time (the completion time), time_zone (the time-zone information) and time_window (the window size, in seconds).
Subscription operations¶
The subscription service stores consumers' offsets on the server side; with a little configuration and handling you get a highly available offset-storage service.
Create a subscription¶
- create_subscription creates a new subscription.
create_result = dh.create_subscription(project_name, topic_name, 'comment')
create_subscription returns a CreateSubscriptionResult object whose sub_id member is the id of the newly created subscription.
Detailed definition: Results
Delete a subscription¶
- The delete_subscription interface deletes a subscription.
dh.delete_subscription(project_name, topic_name, sub_id)
Pass the sub_id of the subscription you want to delete.
Get a subscription¶
- The get_subscription interface queries the details of a subscription.
subscription_result = dh.get_subscription(project_name, topic_name, create_result.sub_id)
get_subscription returns a GetSubscriptionResult object with members comment, create_time, is_owner, last_modify_time, state, sub_id, topic_name and type, where state is a SubscriptionState enum value, either ACTIVE or INACTIVE.
Detailed definition: Subscription, Results
Update a subscription¶
- The update_subscription interface updates a subscription.
dh.update_subscription(project_name, topic_name, sub_id, new_comment)
update_subscription updates the comment of the subscription identified by sub_id.
Update subscription state¶
- The update_subscription_state interface updates the state of a subscription.
dh.update_subscription_state(project_name, topic_name, sub_id, state)
update_subscription_state updates the state of the subscription identified by sub_id; state is a SubscriptionState enum value, either ACTIVE or INACTIVE.
List subscriptions¶
- The list_subscription interface lists all subscriptions under a topic.
subscriptions_result = dh.list_subscription(project_name, topic_name, query_key, page_index, page_size)
query_key is a search filter and may be an empty string. page_index and page_size select the range of subscriptions returned: page_index=1 with page_size=10 returns subscriptions 1-10; page_index=2 with page_size=5 returns subscriptions 6-10. list_subscription returns a ListSubscriptionResult object with two members, total_count and subscriptions. total_count is the total number of subscriptions under the topic; subscriptions is a list of Subscription objects, each with members comment, create_time, is_owner, last_modify_time, state, sub_id, topic_name and type, where state is a SubscriptionState enum value, either ACTIVE or INACTIVE.
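The 1-based paging arithmetic described above can be sketched with a plain slice; page_slice is ours, for illustration only:

```python
def page_slice(items, page_index, page_size):
    """Select the page_index-th page (1-based) of page_size items."""
    start = (page_index - 1) * page_size
    return items[start:start + page_size]

subs = ['sub%02d' % i for i in range(1, 11)]   # 10 subscriptions
first_page = page_slice(subs, 1, 10)           # subscriptions 1-10
second_page = page_slice(subs, 2, 5)           # subscriptions 6-10
```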
Detailed definition: Subscription, Results
Offset operations¶
A newly created subscription starts out unconsumed; to use the offset storage provided by the subscription service, the following offset operations are needed.
Initialize offsets¶
- The init_and_get_subscription_offset interface initializes the offsets; it is the first step of consumption.
init_result = dh.init_and_get_subscription_offset(project_name, topic_name, sub_id, shard_id)
init_result = dh.init_and_get_subscription_offset(project_name, topic_name, sub_id, shard_ids)
The last parameter may be a single shard_id or a list of shard_ids, specifying the shards to initialize. init_and_get_subscription_offset returns an InitAndGetSubscriptionOffsetResult object containing the offsets member, an OffsetWithSession object with members sequence, timestamp, version and session_id. sequence and timestamp are the offset itself; session_id is 0 on the first initialization and increases by 1 on each subsequent initialization.
Detailed definition: Results
Get offsets¶
- get_subscription_offset retrieves a subscription's offset information.
offset_result = dh.get_subscription_offset(project_name, topic_name, sub_id, shard_id)
offset_result = dh.get_subscription_offset(project_name, topic_name, sub_id, shard_ids)
The last parameter may be a single shard_id or a list of shard_ids, specifying the shards to query. get_subscription_offset returns a GetSubscriptionOffsetResult object containing a list of OffsetWithVersion objects. OffsetWithVersion is the parent class of OffsetWithSession and contains only sequence, timestamp and version.
Update offsets¶
- The update_subscription_offset interface updates offsets.
offsets1 = {
'0': OffsetWithSession(sequence0, timestamp0, version0, session_id0),
'1': OffsetWithSession(sequence1, timestamp1, version1, session_id1)
}
offsets2 = {
'0': {
'Sequence': 0,
'Timestamp': 0,
'Version': 0,
'SessionId': 0
},
'1': {
'Sequence': 1,
'Timestamp': 1,
'Version': 1,
'SessionId': 1
}
}
dh.update_subscription_offset(project_name, topic_name, sub_id, offsets1)
dh.update_subscription_offset(project_name, topic_name, sub_id, offsets2)
The offsets parameter is a dict keyed by shard_id; each value may be an OffsetWithSession object or a plain dict. The update fails if version or session_id has changed. If the error indicates the version changed, fetch the latest version with get_subscription_offset and continue consuming; if it indicates the session_id changed, you must re-initialize the offsets with init_and_get_subscription_offset before continuing.
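The version/session check can be pictured with a toy simulation (not SDK code): an update is accepted only while both version and session_id still match the server's view.

```python
def try_update(stored, new):
    """Accept `new` only if version and session_id are unchanged."""
    if new['Version'] != stored['Version'] or new['SessionId'] != stored['SessionId']:
        return False  # caller must re-fetch the version or re-init the session
    stored.update(new)
    return True

stored = {'Sequence': 5, 'Timestamp': 100, 'Version': 1, 'SessionId': 2}
ok = try_update(stored, {'Sequence': 6, 'Timestamp': 101, 'Version': 1, 'SessionId': 2})
stale = try_update(stored, {'Sequence': 7, 'Timestamp': 102, 'Version': 0, 'SessionId': 2})
```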
Detailed definition: Offset
Reset offsets¶
- The reset_subscription_offset interface resets the offsets and updates the version.
offsets1 = {
'0': OffsetWithSession(sequence0, timestamp0, version0, session_id0),
'1': OffsetWithSession(sequence1, timestamp1, version1, session_id1)
}
offsets2 = {
'0': {
'Sequence': 0,
'Timestamp': 0,
'Version': 0,
'SessionId': 0
},
'1': {
'Sequence': 1,
'Timestamp': 1,
'Version': 1,
'SessionId': 1
}
}
offsets3 = {
'0': OffsetBase(sequence0, timestamp0),
'1': OffsetBase(sequence1, timestamp1)
}
offsets4 = {
'0': {
'Sequence': 0,
'Timestamp': 0
},
'1': {
'Sequence': 1,
'Timestamp': 1
}
}
dh.reset_subscription_offset(project_name, topic_name, sub_id, offsets1)
dh.reset_subscription_offset(project_name, topic_name, sub_id, offsets2)
dh.reset_subscription_offset(project_name, topic_name, sub_id, offsets3)
dh.reset_subscription_offset(project_name, topic_name, sub_id, offsets4)
The offsets parameter is a dict keyed by shard_id; each value may be an OffsetBase object (or any of its subclasses) or a plain dict. OffsetBase is the parent class of OffsetWithVersion and contains only sequence and timestamp.
Detailed definition: Offset
API Reference¶
DataHub¶
- class datahub.models.compress.CompressFormat [source]¶
CompressFormat enum class, there are: NONE, LZ4, ZLIB, DEFLATE
- class datahub.DataHub(access_id, access_key, endpoint=None, enable_pb=True, compress_format=<CompressFormat.NONE: ''>, **kwargs) [source]¶
Main entrance to DataHub.
Convenient operations on DataHub objects are provided. Please refer to DataHub docs to see the details.
Generally, basic operations such as create, list, delete, update are provided for each DataHub object. Take the project as an example.
To create a DataHub instance, access_id and access_key are required and must be correct, or a SignatureNotMatch error will be thrown.
Parameters:
- access_id – Aliyun Access ID
- secret_access_key – Aliyun Access Key
- endpoint – Rest service URL
- enable_pb – enable protobuf when put/get records; the default is False in versions <= 2.11 and True in versions >= 2.12
- compress_format (datahub.models.compress.CompressFormat) – compress format
Example:
>>> datahub = DataHub('**your access id**', '**your access key**', '**endpoint**')
>>> datahub_pb = DataHub('**your access id**', '**your access key**', '**endpoint**', enable_pb=True)
>>> datahub_lz4 = DataHub('**your access id**', '**your access key**', '**endpoint**', compress_format=CompressFormat.LZ4)
>>> project_result = datahub.get_project('datahub_test')
>>> print(project_result is None)
- append_connector_field(**kwargs) [source]¶
Append field to a connector.
Parameters:
- project_name – project name
- topic_name – topic name
- connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
- field_name – field name
Returns: none
Raises: datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist
Raises: datahub.exceptions.InvalidParameterException if project_name, topic_name or field_name is empty, or connector_type is of the wrong type
- append_field(**kwargs) [source]¶
Append field to a tuple topic.
Parameters:
- project_name – project name
- topic_name – topic name
- field_name – field name
- field_type (datahub.models.FieldType) – field type
Returns: none
Raises: datahub.exceptions.InvalidParameterException if project_name, topic_name or field_name is empty, or field_type is of the wrong type
- create_blob_topic(**kwargs) [source]¶
Create blob topic.
Parameters:
- project_name – project name
- topic_name – topic name
- shard_count – shard count
- life_cycle – life cycle
- comment – comment
- extend_mode – use expansion method to increase shard
Returns: none
Raises: datahub.exceptions.InvalidParameterException if project_name is empty, topic_name is not valid, life_cycle is not positive, or record_schema is of the wrong type
Raises: datahub.exceptions.ResourceNotFoundException if the project does not exist
Raises: datahub.exceptions.ResourceExistException if the topic already exists
- create_connector(**kwargs) [source]¶
Create a data connector.
Parameters:
- project_name – project name
- topic_name – topic name
- connector_type (datahub.models.ConnectorType) – connector type
- column_fields (list) – column fields
- config (datahub.models.ConnectorConfig) – connector config
- start_time (int) – start timestamp in milliseconds
Returns: connector id
Return type: datahub.models.CreateConnectorResult
Raises: datahub.exceptions.ResourceExistException if the connector already exists
Raises: datahub.exceptions.ResourceNotFoundException if the project or topic does not exist
Raises: datahub.exceptions.InvalidParameterException if the column fields or config is invalid, project_name or topic_name is empty, or connector_type or config is of the wrong type
- create_project(**kwargs) [source]¶
Create a new project by the given name and comment.
Parameters:
- project_name – project name
- comment – description of project
Returns: none
Raises: datahub.exceptions.InvalidParameterException if project_name is not valid
Raises: datahub.exceptions.ResourceExistException if the project already exists
- create_subscription(**kwargs) [source]¶
Create subscription to a topic.
Parameters:
- project_name – project name
- topic_name – topic name
- comment – comment for subscription
Returns: create result containing the subscription id
Return type: datahub.models.CreateSubscriptionResult
Raises: datahub.exceptions.ResourceNotFoundException if the project or topic does not exist
Raises: datahub.exceptions.LimitExceededException if the limit on the number of subscriptions is exceeded
Raises: datahub.exceptions.InvalidParameterException if project_name or topic_name is empty
-
create_tuple_topic(**kwargs)[source]¶ Create a tuple topic
Parameters: - project_name – project name
- topic_name – topic name
- shard_count – shard count
- life_cycle – life cycle
- record_schema (datahub.models.RecordSchema) – record schema for the tuple record type
- comment – comment
- extend_mode – whether to add shards by extension
Returns: none
Raise: datahub.exceptions.InvalidParameterException if project_name is empty, topic_name is not valid, life_cycle is not positive, or record_schema is of the wrong type
Raise: datahub.exceptions.ResourceNotFoundException if the project does not exist
Raise: datahub.exceptions.ResourceExistException if the topic already exists
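A minimal sketch of putting the creation APIs above together. It assumes a live DataHub endpoint; the credentials, endpoint, and project/topic names are placeholders, and the schema helper mirrors the RecordSchema example later in this document.

```python
from datahub import DataHub
from datahub.models import RecordSchema, FieldType

# Placeholder credentials and endpoint -- replace with your own.
dh = DataHub('**access_id**', '**access_key**', '**endpoint**')

dh.create_project('test_project', 'project for the tuple topic demo')

schema = RecordSchema.from_lists(
    ['name', 'id'],
    [FieldType.STRING, FieldType.BIGINT])
# 3 shards, 7-day life cycle.
dh.create_tuple_topic('test_project', 'test_topic', 3, 7, schema, 'demo topic')

# Block until every shard is active or closed before writing records.
dh.wait_shards_ready('test_project', 'test_topic')
```

Creating the topic returns before its shards are usable, which is why wait_shards_ready follows it here.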
-
delete_connector(**kwargs)[source]¶ Delete a data connector
Parameters: - project_name – project name
- topic_name – topic name
- connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
Returns: none
Raise: datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist
Raise: datahub.exceptions.InvalidParameterException if project_name or topic_name is empty, or connector_type is of the wrong type
-
delete_project(**kwargs)[source]¶ Delete the project with the given name
Parameters: project_name – project name
Returns: none
Raise: datahub.exceptions.InvalidParameterException if project_name is empty
-
delete_subscription(**kwargs)[source]¶ Delete a subscription by subscription id
Parameters: - project_name – project name
- topic_name – topic name
- sub_id – subscription id
Returns: none
Raise: datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist
Raise: datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty
-
delete_topic(**kwargs)[source]¶ Delete a topic
Parameters: - topic_name – topic name
- project_name – project name
Returns: none
Raise: datahub.exceptions.ResourceNotFoundException if the project or topic does not exist
Raise: datahub.exceptions.InvalidParameterException if the project name or topic name is empty
-
get_blob_records(**kwargs)[source]¶ Get records from a topic
Parameters: - project_name – project name
- topic_name – topic name
- shard_id – shard id
- cursor – the cursor
- limit_num – number of records to read
Returns: result including the record list, start sequence, record count and next cursor
Return type: datahub.models.GetRecordsResult
Raise: datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist
Raise: datahub.exceptions.InvalidParameterException if the cursor is invalid, or project_name, topic_name, shard_id or cursor is empty
Raise: datahub.exceptions.DatahubException if the crc is wrong in pb mode
-
get_connector(**kwargs)[source]¶ Get a data connector
Parameters: - project_name – project name
- topic_name – topic name
- connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
Returns: data connector info
Return type: datahub.models.GetConnectorResult
Raise: datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist
Raise: datahub.exceptions.InvalidParameterException if project_name or topic_name is empty, or connector_type or config is of the wrong type
-
get_connector_done_time(**kwargs)[source]¶ Get connector done time
Parameters: - project_name – project name
- topic_name – topic name
- connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
-
get_connector_shard_status(**kwargs)[source]¶ Get data connector shard status
Parameters: - project_name – project name
- topic_name – topic name
- connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
- shard_id – shard id
Returns: data connector shard status
Return type: datahub.models.results.GetConnectorShardStatusResult
Raise: datahub.exceptions.ResourceNotFoundException if the project, topic, shard or connector does not exist
Raise: datahub.exceptions.InvalidParameterException if project_name, topic_name or shard_id is empty, or connector_type is of the wrong type
-
get_cursor(**kwargs)[source]¶ Get a cursor. Before invoking get_blob_records/get_tuple_records, you must invoke this to obtain a cursor
Parameters: - project_name – project name
- topic_name – topic name
- shard_id – shard id
- cursor_type (datahub.models.CursorType) – cursor type
- param – a system time if cursor_type == CursorType.SYSTEM_TIME, or a sequence if cursor_type == CursorType.SEQUENCE
Returns: cursor info
Return type: datahub.models.GetCursorResult
Raise: datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist
Raise: datahub.exceptions.InvalidParameterException if the param is invalid; project_name, topic_name or shard_id is empty; cursor_type is of the wrong type; or param is missing
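Putting get_cursor and get_tuple_records together, a hedged read-loop sketch. It assumes an already-initialized dh client, an existing project/topic/schema, and that the SYSTEM_TIME param is a millisecond timestamp (an assumption consistent with the millisecond timestamps used elsewhere in this reference).

```python
import time
from datahub.models import CursorType

# Assumes `dh`, `project_name`, `topic_name` and `record_schema` already exist.
cursor_result = dh.get_cursor(project_name, topic_name, '0',
                              CursorType.SYSTEM_TIME,
                              int(time.time() * 1000) - 60000)  # start one minute back
cursor = cursor_result.cursor

while True:
    result = dh.get_tuple_records(project_name, topic_name, '0',
                                  record_schema, cursor=cursor, limit_num=100)
    for record in result.records:
        print(record.values)
    if result.record_count == 0:
        break  # caught up; next_cursor would return the same position
    cursor = result.next_cursor
```

Each GetRecordsResult carries the next_cursor to resume from, so the loop simply threads it back into the next call.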
-
get_metering_info(**kwargs)[source]¶ Get the metering info of a shard
Parameters: - project_name – project name
- topic_name – topic name
- shard_id – shard id
Returns: the shard metering info
Return type: datahub.models.GetMeteringInfoResult
Raise: datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist
Raise: datahub.exceptions.InvalidParameterException if project_name, topic_name or shard_id is empty
-
get_project(**kwargs)[source]¶ Get a project by the given name
Parameters: project_name – project name
Returns: the requested project
Return type: datahub.models.GetProjectResult
Raise: datahub.exceptions.ResourceNotFoundException if the project does not exist
Raise: datahub.exceptions.InvalidParameterException if project_name is empty
-
get_subscription(**kwargs)[source]¶ Get a subscription
Parameters: - project_name – project name
- topic_name – topic name
- sub_id – subscription id
Returns: subscription info
Return type: datahub.models.results.GetSubscriptionResult
Raise: datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist
Raise: datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty
-
get_subscription_offset(**kwargs)[source]¶ Get subscription offset
Parameters: - project_name – project name
- topic_name – topic name
- sub_id – subscription id
- shard_ids – shard ids
Returns: offset info
Return type: datahub.models.results.GetSubscriptionOffsetResult
Raise: datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist
Raise: datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty
-
get_topic(**kwargs)[source]¶ Get a topic
Parameters: - topic_name – topic name
- project_name – project name
Returns: topic info
Return type: datahub.models.GetTopicResult
Raise: datahub.exceptions.ResourceNotFoundException if the project or topic does not exist
Raise: datahub.exceptions.InvalidParameterException if the project name or topic name is empty
-
get_tuple_records(**kwargs)[source]¶ Get records from a topic
Parameters: - project_name – project name
- topic_name – topic name
- shard_id – shard id
- record_schema (datahub.models.RecordSchema) – tuple record schema
- filter – filter
- cursor – the cursor
- limit_num – number of records to read
Returns: result including the record list, start sequence, record count and next cursor
Return type: datahub.models.GetRecordsResult
Raise: datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist
Raise: datahub.exceptions.InvalidParameterException if the cursor is invalid, or project_name, topic_name, shard_id or cursor is empty
Raise: datahub.exceptions.DatahubException if the crc is wrong in pb mode
-
init_and_get_subscription_offset(**kwargs)[source]¶ Open a subscription offset session
Parameters: - project_name – project name
- topic_name – topic name
- sub_id – subscription id
- shard_ids – shard ids
Returns: offset info
Return type: datahub.models.InitAndGetSubscriptionOffsetResult
Raise: datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist
Raise: datahub.exceptions.InvalidParameterException if project_name, topic_name, sub_id or shard_id is empty
-
list_connector(**kwargs)[source]¶ List data connectors
Parameters: - project_name – project name
- topic_name – topic name
Returns: list of data connector names
Return type: datahub.models.ListConnectorResult
Raise: datahub.exceptions.InvalidParameterException if project_name or topic_name is empty
-
list_project()[source]¶ List all project names
Returns: projects in the datahub server
Return type: datahub.models.results.ListProjectResult
-
list_shard(**kwargs)[source]¶ List all shards of a topic
Parameters: - project_name – project name
- topic_name – topic name
Returns: shards info
Return type: datahub.models.results.ListShardResult
Raise: datahub.exceptions.ResourceNotFoundException if the project or topic does not exist
Raise: datahub.exceptions.InvalidParameterException if the project name or topic name is empty
-
list_subscription(**kwargs)[source]¶ Query subscriptions in the range [start, end), where
start = (page_index - 1) * page_size + 1
end = start + page_size
Parameters: - project_name – project name
- topic_name – topic name
- query_key – query key for search
- page_index – page index
- page_size – page size
Returns: subscription info list
Raise: datahub.exceptions.InvalidParameterException if project_name or topic_name is empty, page_index <= 0, or page_size < 0
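The [start, end) range follows directly from page_index and page_size; a small helper makes the arithmetic concrete (the helper itself is illustrative, not part of the SDK):

```python
def subscription_page_range(page_index, page_size):
    """Return the [start, end) subscription range covered by one page."""
    if page_index <= 0 or page_size < 0:
        # Mirrors the InvalidParameterException conditions above.
        raise ValueError('page_index must be > 0 and page_size must be >= 0')
    start = (page_index - 1) * page_size + 1
    end = start + page_size
    return start, end

print(subscription_page_range(1, 10))  # (1, 11): the first ten subscriptions
print(subscription_page_range(3, 10))  # (21, 31): the third page of ten
```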
-
list_topic(**kwargs)[source]¶ Get all topics of a project
Parameters: project_name – project name
Returns: all topics of the project
Return type: datahub.models.ListTopicResult
Raise: datahub.exceptions.ResourceNotFoundException if the project does not exist
Raise: datahub.exceptions.InvalidParameterException if project_name is empty
-
merge_shard(**kwargs)[source]¶ Merge shards
Parameters: - project_name – project name
- topic_name – topic name
- shard_id – shard id
- adj_shard_id – adjacent shard id
Returns: shard info after the merge
Return type: datahub.models.MergeShardResult
Raise: datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist
Raise: datahub.exceptions.InvalidOperationException if the shard is not active
Raise: datahub.exceptions.InvalidParameterException if the shards are not adjacent, or the project name, topic name, shard id or adjacent shard id is empty
Raise: datahub.exceptions.LimitExceededException if the merge shard operation limit is exceeded
-
put_records(**kwargs)[source]¶ Put records into a topic
Parameters: - project_name – project name
- topic_name – topic name
- record_list (list) – record list
Returns: failed records info
Return type: datahub.models.PutRecordsResult
Raise: datahub.exceptions.ResourceNotFoundException if the project or topic does not exist
Raise: datahub.exceptions.InvalidParameterException if a record is not well-formed, or project_name or topic_name is empty
Raise: datahub.exceptions.InvalidOperationException if the shard is not active
Raise: datahub.exceptions.LimitExceededException if the query rate or throughput rate limit is exceeded
Raise: datahub.exceptions.DatahubException if the crc is wrong in pb mode
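A hedged write sketch for put_records. It assumes an initialized dh client and an existing tuple topic whose schema has a STRING and a BIGINT field; setting shard_id on each record to route it is an assumption about the record API, and the failure fields follow FailedRecord as documented below.

```python
from datahub.models import TupleRecord

# Assumes `dh`, `project_name`, `topic_name` and `record_schema` already exist.
records = []
for i in range(3):
    record = TupleRecord(schema=record_schema, values=['name%d' % i, i])
    record.shard_id = '0'  # assumption: records can be routed via a shard_id attribute
    records.append(record)

result = dh.put_records(project_name, topic_name, records)

# PutRecordsResult reports only the records that were rejected.
print('failed:', result.failed_record_count)
for failed in result.failed_records:
    print(failed.index, failed.error_code, failed.error_message)
```

Because only failures are returned, a count of zero means the whole batch was accepted.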
-
put_records_by_shard(**kwargs)[source]¶ Put records into a specific shard of a topic
Parameters: - project_name – project name
- topic_name – topic name
- shard_id – shard id
- record_list (list) – record list
Returns: failed records info
Return type: datahub.models.PutRecordsResult
Raise: datahub.exceptions.ResourceNotFoundException if the project or topic does not exist
Raise: datahub.exceptions.InvalidParameterException if a record is not well-formed, or project_name, topic_name or shard_id is empty
Raise: datahub.exceptions.InvalidOperationException if the shard is not active
Raise: datahub.exceptions.LimitExceededException if the query rate or throughput rate limit is exceeded
Raise: datahub.exceptions.DatahubException if the crc is wrong in pb mode
-
reload_connector(**kwargs)[source]¶ Reload a data connector for the given shard id, or reload all shards if no shard id is given
Parameters: - project_name – project name
- topic_name – topic name
- connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
- shard_id – shard id
Returns: none
Raise: datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist
Raise: datahub.exceptions.InvalidParameterException if project_name or topic_name is empty, or connector_type is of the wrong type
-
reset_subscription_offset(**kwargs)[source]¶ Reset subscription offset
Parameters: - project_name – project name
- topic_name – topic name
- sub_id – subscription id
- offsets (dict) – offsets
Returns: none
Raise: datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist
Raise: datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty, or offsets is of the wrong type
-
split_shard(**kwargs)[source]¶ Split a shard
Parameters: - project_name – project name
- topic_name – topic name
- shard_id – id of the shard to split
- split_key – split key; if not given, the median is chosen
Returns: shards info after the split
Return type: datahub.models.SplitShardResult
Raise: datahub.exceptions.ResourceNotFoundException if the project, topic or shard does not exist
Raise: datahub.exceptions.InvalidOperationException if the shard is not active
Raise: datahub.exceptions.InvalidParameterException if the key range is invalid, or the project name, topic name or shard id is empty
Raise: datahub.exceptions.LimitExceededException if the split shard operation limit is exceeded
-
update_connector(**kwargs)[source]¶ Update a data connector
Parameters: - project_name – project name
- topic_name – topic name
- connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
- config (datahub.models.ConnectorConfig) – connector config
Returns: none
-
update_connector_offset(**kwargs)[source]¶ Update data connector offset
Parameters: - project_name – project name
- topic_name – topic name
- connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
- shard_id – shard id
- connector_offset (datahub.models.ConnectorOffset) – current sequence
Returns: none
Raise: datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist
Raise: datahub.exceptions.InvalidParameterException if project_name or topic_name is empty, or connector_type or connector_state is of the wrong type
-
update_connector_state(**kwargs)[source]¶ Update data connector state
Parameters: - project_name – project name
- topic_name – topic name
- connector_id (str or datahub.models.ConnectorType) – connector id, compatible with connector type
- connector_state (datahub.models.ConnectorState) – connector state
Returns: none
Raise: datahub.exceptions.ResourceNotFoundException if the project, topic or connector does not exist
Raise: datahub.exceptions.InvalidParameterException if project_name or topic_name is empty, or connector_type or connector_state is of the wrong type
-
update_project(**kwargs)[source]¶ Update the project comment
Parameters: - project_name – project name
- comment – new description of the project
Returns: none
Raise: datahub.exceptions.ResourceNotFoundException if the project does not exist
Raise: datahub.exceptions.InvalidParameterException if project_name is empty or the comment is invalid
-
update_subscription(**kwargs)[source]¶ Update a subscription
Parameters: - project_name – project name
- topic_name – topic name
- sub_id – subscription id
- comment – new comment
Returns: none
Raise: datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist
Raise: datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty
-
update_subscription_offset(**kwargs)[source]¶ Update subscription offset
Parameters: - project_name – project name
- topic_name – topic name
- sub_id – subscription id
- offsets (dict) – offsets
Returns: none
Raise: datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist
Raise: datahub.exceptions.InvalidOperationException if the offset session is closed or the offset version has changed
Raise: datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty, or offsets is of the wrong type
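A hedged sketch of the offset-session flow: open the session with init_and_get_subscription_offset, advance the per-shard offsets as records are consumed, then commit them with update_subscription_offset. Indexing the returned offsets container by shard id, and the sequence/timestamp attribute names (taken from OffsetBase below), are assumptions about the result shape.

```python
import time

# Assumes `dh`, `project_name`, `topic_name` and `sub_id` already exist.
shard_ids = ['0']
init_result = dh.init_and_get_subscription_offset(project_name, topic_name,
                                                  sub_id, shard_ids)
offsets = init_result.offsets  # assumption: indexed by shard id -> OffsetWithSession

# ... consume records from shard '0', then advance its offset ...
offsets['0'].sequence += 1
offsets['0'].timestamp = int(time.time() * 1000)

# Commit; fails with InvalidOperationException if the session was closed
# or another consumer bumped the offset version.
dh.update_subscription_offset(project_name, topic_name, sub_id, offsets)
```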
-
update_subscription_state(**kwargs)[source]¶ Update subscription state
Parameters: - project_name – project name
- topic_name – topic name
- sub_id – subscription id
- state (datahub.models.SubscriptionState) – new state
Returns: none
Raise: datahub.exceptions.ResourceNotFoundException if the project, topic or subscription does not exist
Raise: datahub.exceptions.InvalidParameterException if project_name, topic_name or sub_id is empty, or state is of the wrong type
-
update_topic(**kwargs)[source]¶ Update topic info; only the life cycle and comment can be modified.
Parameters: - topic_name – topic name
- project_name – project name
- life_cycle – life cycle of the topic
- comment – topic comment
Returns: none
Raise: datahub.exceptions.ResourceNotFoundException if the project or topic does not exist
Raise: datahub.exceptions.InvalidParameterException if the project name or topic name is empty, or life_cycle is not positive
-
wait_shards_ready(**kwargs)[source]¶ Wait until all shards are in the active or closed state. It is usually invoked after creating a topic, and blocks until every shard is active or closed, or until the timeout expires.
Parameters: - project_name – project name
- topic_name – topic name
- timeout – -1 means block until all shards are active or closed; otherwise wait at most timeout seconds
Returns: none
Raise: datahub.exceptions.ResourceNotFoundException if the project or topic does not exist
Raise: datahub.exceptions.InvalidParameterException if the project name or topic name is empty, or timeout < 0
Raise: datahub.exceptions.DatahubException if the wait times out
Auth¶
-
class datahub.auth.Account(*args, **kwargs)[source]¶ Base account class.
-
get_type()[source]¶ Get the account type; subclasses must implement this.
Returns: the account type
Return type: datahub.auth.AccountType
-
-
class datahub.auth.AliyunAccount(*args, **kwargs)[source]¶ Aliyun account implementation, derived from datahub.auth.Account
-
get_type()[source]¶ Get the account type.
Returns: the account type
Return type: datahub.auth.AccountType
-
Schema¶
-
class datahub.models.FieldType[source]¶ Field types. Datahub supports the following field types:
TINYINT, SMALLINT, INTEGER, BIGINT, STRING, BOOLEAN, TIMESTAMP, FLOAT, DOUBLE, DECIMAL
-
class datahub.models.Field(name, field_type, allow_null=True)[source]¶
- Members:
name (str): field name
type (str): field type
-
class datahub.models.RecordSchema(field_list=None)[source]¶ Record schema class; Tuple type topics use it.
- Members:
- fields (list): fields
Example:
>>> schema = RecordSchema.from_lists(
...     ['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'],
...     [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP])
Record¶
-
class
datahub.models.
BlobRecord
(blob_data=None, values=None)[源代码]¶ Blob type record class
- Members:
- blob_data (
str
): blob data
-
class datahub.models.TupleRecord(field_list=None, schema=None, values=None)[source]¶ Tuple type record class
- Members:
field_list (list): fields
name_indices (list): values
Example:
>>> schema = RecordSchema.from_lists(['name', 'id'], [FieldType.STRING, FieldType.STRING])
>>> record = TupleRecord(schema=schema, values=['test', 'test2'])
>>> record.values[0:2]
['test', 'test2']
>>> record.set_value('name', 'test1')
>>> record.values[0]
'test1'
>>> record.set_value(0, 'test3')
>>> record.get_value(0)
'test3'
>>> record.get_value('name')
'test3'
>>> len(record.values)
2
>>> record.has_field('name')
True
-
class datahub.models.FailedRecord(index, error_code, error_message)[source]¶ Failed record info
- Members:
index (int): index of the failed record in the request
error_code (str): error code
error_message (str): error message
Shard¶
-
class datahub.models.ShardBase(shard_id, begin_hash_key, end_hash_key)[source]¶ Shard base info
- Members:
shard_id (str): shard id
begin_hash_key (str): begin hash key
end_hash_key (str): end hash key
-
class datahub.models.Shard(shard_id, begin_hash_key, end_hash_key, state, closed_time, parent_shard_ids, left_shard_id, right_shard_id)[source]¶ Shard info
- Members:
shard_id (str): shard id
begin_hash_key (str): begin hash key
end_hash_key (str): end hash key
state (datahub.models.ShardState): shard state
closed_time (str): closed time
parent_shard_ids (list): parent shard ids
left_shard_id (str): left shard id
right_shard_id (str): right shard id
Cursor¶
Offset¶
-
class datahub.models.OffsetBase(sequence, timestamp)[source]¶ Offset base class
- Members:
sequence (int): sequence
timestamp (int): timestamp
Subscription¶
-
class datahub.models.Subscription(comment, create_time, is_owner, last_modify_time, state, sub_id, topic_name, sub_type)[source]¶ Subscription info
- Members:
comment (str): subscription description
create_time (int): create time
is_owner (bool): whether the caller is the owner
last_modify_time (int): last modify time
state (datahub.models.SubscriptionState): subscription state
sub_id (str): subscription id
topic_name (str): topic name
type (int): type
Connector¶
-
class datahub.models.ConnectorState[source]¶ ConnectorState enum class; values:
CONNECTOR_RUNNING, CONNECTOR_STOPPED
-
class datahub.models.PartitionMode[source]¶ PartitionMode enum class; values:
USER_DEFINE, SYSTEM_TIME, EVENT_TIME
-
class datahub.models.ConnectorType[source]¶ ConnectorType enum class; values:
SINK_ODPS, SINK_ADS, SINK_ES, SINK_FC, SINK_MYSQL, SINK_OSS, SINK_OTS, SINK_HOLOGRES, SINK_DATAHUB
-
class datahub.models.OdpsConnectorConfig(project_name, table_name, odps_endpoint, tunnel_endpoint, access_id, access_key, partition_mode, time_range, partition_config=None)[source]¶ Connector config for ODPS
- Members:
project_name (str): odps project name
table_name (str): odps table name
odps_endpoint (str): odps endpoint
tunnel_endpoint (str): tunnel endpoint
access_id (str): odps access id
access_key (str): odps access key
partition_mode (datahub.models.connector.PartitionMode): partition mode
time_range (int): time range
partition_config (collections.OrderedDict): partition config
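Combining OdpsConnectorConfig with the create_connector API above, a sketch of wiring a topic into an ODPS table. All endpoints, credentials and names are placeholders, and the column list must match fields of the topic schema; it assumes an initialized dh client.

```python
from datahub.models import ConnectorType, PartitionMode, OdpsConnectorConfig

# Placeholder ODPS target -- replace every value with your own.
config = OdpsConnectorConfig(
    '**odps_project**', '**odps_table**',
    '**odps_endpoint**', '**tunnel_endpoint**',
    '**access_id**', '**access_key**',
    PartitionMode.SYSTEM_TIME, 15)  # partition by system time, 15-minute range

# Assumes `dh`, `project_name` and `topic_name` already exist; the column
# fields listed here must exist in the topic's record schema.
dh.create_connector(project_name, topic_name, ConnectorType.SINK_ODPS,
                    ['f1', 'f2'], config)
```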
-
class datahub.models.DatabaseConnectorConfig(host, port, database, user, password, table, ignore)[source]¶ Connector config for databases
- Members:
host (str): host
port (int): port
database (str): database
user (str): user
password (str): password
table (str): table
ignore (bool): ignore insert errors
-
class datahub.models.EsConnectorConfig(index, endpoint, user, password, id_fields, type_fields, proxy_mode)[source]¶ Connector config for ElasticSearch
- Members:
index (str): index
endpoint (str): endpoint
user (str): user
password (str): password
id_fields (list): id fields
type_fields (list): type fields
proxy_mode (bool): proxy mode
-
class datahub.models.FcConnectorConfig(endpoint, service, func, auth_mode, access_id='', access_key='')[source]¶ Connector config for FunctionCompute
- Members:
endpoint (str): endpoint
service (str): service
func (str): function
auth_mode (datahub.models.connector.AuthMode): auth mode
access_id (str): access id
access_key (str): access key
-
class datahub.models.OssConnectorConfig(endpoint, bucket, prefix, time_format, time_range, auth_mode, access_id='', access_key='')[source]¶ Connector config for ObjectStoreService
- Members:
endpoint (str): endpoint
bucket (str): bucket
prefix (str): prefix
time_format (str): time format
time_range (int): time range
auth_mode (datahub.models.connector.AuthMode): auth mode
access_id (str): access id
access_key (str): access key
-
class datahub.models.OtsConnectorConfig(endpoint, instance, table, auth_mode, access_id='', access_key='', write_mode=WriteMode.PUT)[source]¶ Connector config for OpenTableStore
- Members:
endpoint (str): endpoint
instance (str): instance
table (str): table
auth_mode (datahub.models.connector.AuthMode): auth mode
access_id (str): access id
access_key (str): access key
write_mode (datahub.models.connector.WriteMode): write mode
Results¶
-
class datahub.models.results.ListProjectResult(project_names)[source]¶ Result of the list projects api
- Members:
- project_names (list): list of project names
-
class datahub.models.results.GetProjectResult(project_name, comment, create_time, last_modify_time)[source]¶ Result of the get project api
- Members:
project_name (str): project name
comment (str): project description
create_time (int): create time
last_modify_time (int): last modify time
-
class datahub.models.results.ListTopicResult(topic_names)[source]¶ Result of the list topics api
- Members:
- topic_names (list): list of topic names
-
class datahub.models.results.GetTopicResult(**kwargs)[source]¶ Result of the get topic api
- Members:
project_name (str): project name
topic_name (str): topic name
shard_count (int): shard count
life_cycle (int): life cycle
record_type (datahub.models.RecordType): record type
record_schema (datahub.models.RecordSchema): record schema
comment (str): topic description
create_time (int): create time
last_modify_time (int): last modify time
-
class datahub.models.results.ListShardResult(shards)[source]¶ Result of the list shards api
- Members:
- shards (list): list of datahub.models.Shard
-
class datahub.models.results.MergeShardResult(shard_id, begin_hash_key, end_hash_key)[source]¶ Result of the merge shard api
- Members:
shard_id (str): shard id
begin_hash_key (str): begin hash key
end_hash_key (str): end hash key
-
class datahub.models.results.SplitShardResult(new_shards)[source]¶ Result of the split shard api
- Members:
- new_shards (list): list of datahub.models.ShardBase
-
class datahub.models.results.GetCursorResult(cursor, record_time, sequence)[source]¶ Result of the get cursor api
- Members:
cursor (str): cursor
record_time (int): record time
sequence (int): sequence
-
class datahub.models.results.PutRecordsResult(failed_record_count, failed_records)[source]¶ Result of the put records api
- Members:
failed_record_count (int): failed record count
failed_records (list): list of datahub.models.FailedRecord
-
class datahub.models.results.GetRecordsResult(next_cursor, record_count, start_seq, records)[source]¶ Result of the get records api
- Members:
next_cursor (str): next cursor
record_count (int): record count
start_seq (int): start sequence
records (list): list of datahub.models.BlobRecord / datahub.models.TupleRecord
-
class datahub.models.results.GetMeteringInfoResult(active_time, storage)[source]¶ Result of the get metering info api
- Members:
active_time (int): active time
storage (int): storage
-
class datahub.models.results.ListConnectorResult(connector_names, connector_ids)[source]¶ Result of the list data connector api
- Members:
connector_names (list): list of data connector names
connector_ids (list): list of data connector ids
-
class datahub.models.results.GetConnectorResult(cluster_addr, connector_id, connector_type, state, creator, owner, create_time, column_fields, config, extra_config, shard_contexts, sub_id)[source]¶ Result of the get data connector api
- Members:
cluster_addr (str): cluster address
connector_id (str): connector id
type (datahub.models.ConnectorType): connector type
state (datahub.models.ConnectorState): connector state
creator (str): creator
owner (str): owner
create_time (int): create time
column_fields (list): list of column fields
config (datahub.models.OdpsConnectorConfig): config
extra_config (dict): extra config
shard_contexts (list): list of datahub.models.ShardContext
sub_id (str): subscription id used by the connector
-
class datahub.models.results.GetConnectorShardStatusResult(shard_status_infos)[source]¶ Result of the get data connector shard status api
- Members:
- shard_status_infos (dict): shard status entry map
-
class datahub.models.results.InitAndGetSubscriptionOffsetResult(offsets)[source]¶ Result of the init and get subscription offset api
- Members:
- offsets (list): list of datahub.models.OffsetWithSession
-
class datahub.models.results.GetSubscriptionOffsetResult(offsets)[source]¶ Result of the get subscription offset api
- Members:
- offsets (list): list of datahub.models.OffsetWithVersion
-
class datahub.models.results.CreateSubscriptionResult(sub_id)[source]¶ Result of the create subscription api
- Members:
- sub_id (str): subscription id
-
class datahub.models.results.GetSubscriptionResult(comment, create_time, is_owner, last_modify_time, state, sub_id, topic_name, sub_type)[source]¶ Result of the get subscription api
- Members:
comment (str): comment
create_time (int): create time
is_owner (bool): whether the caller is the owner
last_modify_time (int): last modify time
state (datahub.models.SubscriptionState): subscription state
sub_id (str): subscription id
topic_name (str): topic name
sub_type (int): subscription type
Exceptions¶
-
class datahub.exceptions.DatahubException(error_msg, status_code=-1, request_id=None, error_code=None)[source]¶ Base exception class for errors that occur while handling a request to the Datahub server.
-
class datahub.exceptions.ResourceExistException(error_msg, status_code=-1, request_id=None, error_code=None)[source]¶ Raised when the Datahub object you are creating already exists.
-
class datahub.exceptions.ResourceNotFoundException(error_msg, status_code=-1, request_id=None, error_code=None)[source]¶ Raised when the Datahub object you are handling does not exist.
-
class datahub.exceptions.InvalidParameterException(error_msg, status_code=-1, request_id=None, error_code=None)[source]¶ Raised when a request parameter is invalid.
-
class datahub.exceptions.InvalidOperationException(error_msg, status_code=-1, request_id=None, error_code=None)[source]¶ Raised when the requested operation on a shard is not supported.
-
class datahub.exceptions.LimitExceededException(error_msg, status_code=-1, request_id=None, error_code=None)[source]¶ Raised when too many requests are made.
-
class datahub.exceptions.InternalServerException(error_msg, status_code=-1, request_id=None, error_code=None)[source]¶ Raised when an error occurs inside the Datahub server.