topic操作¶
Topic是 DataHub 订阅和发布的最小单位,用户可以用Topic来表示一类或者一种流数据。目前支持Tuple与Blob两种类型。Tuple类型的Topic支持类似于数据库的记录的数据,每条记录包含多个列。Blob类型的Topic仅支持写入一块二进制数据。
Tuple Topic¶
Tuple类型Topic写入的数据是有格式的,需要指定Record Schema,目前支持以下几种数据类型:
类型 | 含义 | 值域 |
---|---|---|
Bigint | 8字节有符号整型。 | -9223372036854775807 ~ 9223372036854775807 |
String | 字符串,只支持UTF-8编码。 | 单个String列最长允许1MB。 |
Boolean | 布尔类型 | True/False或true/false或0/1 |
Double | 8字节双精度浮点数 | -1.0 * 10^308 ~ 1.0 * 10^308 |
TimeStamp | 时间戳类型 | 表示到微秒的时间戳类型 |
创建示例¶
project_name = 'topic_test_project'
topic_name = 'tuple_topic_test_topic'
shard_count = 3
life_cycle = 7
record_schema = RecordSchema.from_lists(
['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field' ],
[FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP]
)
try:
dh.create_tuple_topic(project_name, topic_name, shard_count, life_cycle, record_schema, 'comment')
print("create topic success!")
print("=======================================\n\n")
except InvalidParameterException as e:
print(e)
print("=======================================\n\n")
except ResourceExistException as e:
print("topic already exist!")
print("=======================================\n\n")
except Exception as e:
print(traceback.format_exc())
sys.exit(-1)
新增field¶
dh.append_field(project_name, topic_name, field_name, field_type)
新增field必须是allow_null为True的,给出field_name和field_type作为参数即可,field_type为FieldType枚举类型。
Blob Topic¶
Blob类型Topic支持写入一块二进制数据作为一个Record,数据将会以BASE64编码传输。
创建示例¶
project_name = 'topic_test_project'
topic_name = 'blob_topic_test_topic'
shard_count = 3
life_cycle = 7
try:
dh.create_blob_topic(project_name, topic_name, shard_count, life_cycle, 'comment')
print("create topic success!")
print("=======================================\n\n")
except InvalidParameterException as e:
print(e)
print("=======================================\n\n")
except ResourceExistException as e:
print("topic already exist!")
print("=======================================\n\n")
except Exception as e:
print(traceback.format_exc())
sys.exit(-1)