offset操作¶
一个subscription创建后,初始状态是未消费的,要使用subscription服务提供的点位存储功能,需要进行一些offset操作
初始化offset¶
- init_and_get_subscription_offset接口初始化offset,是开始消费的第一步
init_result = dh.init_and_get_subscription_offset(project_name, topic_name, sub_id, shard_id)
init_result = dh.init_and_get_subscription_offset(project_name, topic_name, sub_id, shard_ids)
最后一个参数可以是一个shard_id,也可以是shard_id的list,指定要初始化的shard init_and_get_subscription_offset返回的是InitAndGetSubscriptionOffsetResult对象,包含offsets成员,是一个OffsetWithSession对象, 其中包含成员sequence, timestamp, version, session_id。 sequence和timestamp就是点位信息,第一次初始化的session_id为0,之后每次初始化都会+1
详细定义: Results
获取offset¶
- get_subscription_offset能够获取订阅的offset信息
offset_result = dh.get_subscription_offset(project_name, topic_name, sub_id, shard_id)
offset_result = dh.get_subscription_offset(project_name, topic_name, sub_id, shard_ids)
最后一个参数可以是一个shard_id,也可以是shard_id的list,指定要初始化的shard get_subscription_offset返回的是GetSubscriptionOffsetResult对象,包含OffsetWithVersion对象的list。 OffsetWithVersion类是OffsetWithSession的父类,只包含sequence, timestamp, version
更新offset¶
- update_subscription_offset接口能够更新offset
offsets1 = {
'0': OffsetWithSession(sequence0, timestamp0, version0, session_id0),
'1': OffsetWithSession(sequence1, timestamp1, version1, session_id1)
}
offsets2 = {
'0': {
'Sequence': 0,
'Timestamp': 0,
'Version': 0,
'SessionId': 0
},
'1': {
'Sequence': 1,
'Timestamp': 1,
'Version': 1,
'SessionId': 1
}
}
dh.update_subscription_offset(project_name, topic_name, sub_id, offsets1)
dh.update_subscription_offset(project_name, topic_name, sub_id, offsets2)
参数offsets是一个dict,其中的key是shard_id,value可以是OffsetWithSession对象,也可以是一个dict,如果version和session_id发生变化,就会更新失败。 当错误信息指出version发生变化,可以通过get_subscription_offset接口获取最新的version信息,继续消费。 当错误信息指出session_id发生变化,就只能再次使用init_and_get_subscription_offset初始化offset信息,再继续消费。
详细定义: Offset
重置offset¶
- reset_subscription_offset接口能够重置offset信息并更新version
offsets1 = {
'0': OffsetWithSession(sequence0, timestamp0, version0, session_id0),
'1': OffsetWithSession(sequence1, timestamp1, version1, session_id1)
}
offsets2 = {
'0': {
'Sequence': 0,
'Timestamp': 0,
'Version': 0,
'SessionId': 0
},
'1': {
'Sequence': 1,
'Timestamp': 1,
'Version': 1,
'SessionId': 1
}
}
offsets3 = {
'0': OffsetBase(sequence0, timestamp0),
'1': OffsetBase(sequence1, timestamp1)
}
offsets4 = {
'0': {
'Sequence': 0,
'Timestamp': 0
},
'1': {
'Sequence': 1,
'Timestamp': 1
}
}
dh.reset_subscription_offset(project_name, topic_name, sub_id, offsets1)
dh.reset_subscription_offset(project_name, topic_name, sub_id, offsets2)
dh.reset_subscription_offset(project_name, topic_name, sub_id, offsets3)
dh.reset_subscription_offset(project_name, topic_name, sub_id, offsets4)
参数offsets是一个dict,其中的key是shard_id,value可以是OffsetBase对象以及其子类对象,也可以是一个dict。 OffsetBase是OffsetWithVersion的父类,只包含sequence, timestamp
详细定义: Offset