#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import abc
import json
from collections import OrderedDict
from enum import Enum
import six
from datahub.exceptions import DatahubException
from ..utils import to_str, bool_to_str, to_text
# Python 3 has no builtin named ``long``; bind the name to ``int`` there so
# code referring to ``long`` resolves on both major versions.
if six.PY3:
    long = int
class ConnectorType(Enum):
    """
    ConnectorType enum class, there are: ``SINK_ODPS``, ``SINK_ADS``, ``SINK_ES``, ``SINK_FC``,
    ``SINK_MYSQL``, ``SINK_OSS``, ``SINK_OTS``, ``SINK_HOLOGRES``, ``SINK_DATAHUB``

    Each member's value is the lowercase form of its name.
    """

    SINK_ODPS = 'sink_odps'
    SINK_ADS = 'sink_ads'
    SINK_ES = 'sink_es'
    SINK_FC = 'sink_fc'
    SINK_MYSQL = 'sink_mysql'
    SINK_OSS = 'sink_oss'
    SINK_OTS = 'sink_ots'
    SINK_HOLOGRES = 'sink_hologres'
    SINK_DATAHUB = 'sink_datahub'
class ConnectorState(Enum):
    """
    ConnectorState enum class, there are: ``CONNECTOR_RUNNING``, ``CONNECTOR_STOPPED``

    ``CONNECTOR_CREATED`` and ``CONNECTOR_PAUSED`` are still defined but are deprecated.
    Each member's value is identical to its name.
    """

    CONNECTOR_CREATED = 'CONNECTOR_CREATED'  # deprecated
    CONNECTOR_PAUSED = 'CONNECTOR_PAUSED'  # deprecated
    CONNECTOR_RUNNING = 'CONNECTOR_RUNNING'
    CONNECTOR_STOPPED = 'CONNECTOR_STOPPED'
class PartitionMode(Enum):
    """
    PartitionMode enum class, there are: ``USER_DEFINE``, ``SYSTEM_TIME``, ``EVENT_TIME``

    Each member's value is identical to its name.
    """

    USER_DEFINE = 'USER_DEFINE'
    SYSTEM_TIME = 'SYSTEM_TIME'
    EVENT_TIME = 'EVENT_TIME'
class AuthMode(Enum):
    """
    AuthMode enum class, there are: ``AK``, ``STS``

    The member values are the lowercase strings ``'ak'`` and ``'sts'``.
    """

    AK = 'ak'
    STS = 'sts'
class WriteMode(Enum):
    """
    WriteMode enum class, there are: ``PUT``, ``UPDATE``

    Each member's value is identical to its (uppercase) name.
    """

    PUT = 'PUT'
    UPDATE = 'UPDATE'
class ConnectorShardStatus(Enum):
    """
    ConnectorShardStatus enum class, there are: ``CONTEXT_PLANNED``, ``CONTEXT_EXECUTING``,
    ``CONTEXT_HANG``, ``CONTEXT_STOPPED``, ``CONTEXT_PAUSED``, ``CONTEXT_FINISHED``,
    ``CONTEXT_DELETED``

    Each member's value is identical to its name.
    """

    CONTEXT_PLANNED = 'CONTEXT_PLANNED'
    CONTEXT_EXECUTING = 'CONTEXT_EXECUTING'
    CONTEXT_HANG = 'CONTEXT_HANG'
    CONTEXT_STOPPED = 'CONTEXT_STOPPED'
    CONTEXT_PAUSED = 'CONTEXT_PAUSED'
    CONTEXT_FINISHED = 'CONTEXT_FINISHED'
    CONTEXT_DELETED = 'CONTEXT_DELETED'
class ShardStatusEntry(object):
    """
    Status record of a single connector shard

    Members:
        current_sequence (:class:`int`): current sequence
        current_timestamp (:class:`int`): current timestamp
        done_time (:class:`int`): done timestamp
        last_error_message (:class:`str`): last error message
        state (:class:`datahub.models.connector.ConnectorShardStatus`): state
        update_time (:class:`int`): update time
        discard_count (:class:`int`): discard count
        worker_addr (:class:`str`): worker address
    """

    __slots__ = (
        '_current_sequence',
        '_current_timestamp',
        '_done_time',
        '_last_error_message',
        '_state',
        '_update_time',
        '_discard_count',
        '_worker_addr',
    )

    def __init__(self, current_sequence, current_timestamp, done_time, last_error_message, state,
                 update_time, discard_count, worker_addr):
        self._current_sequence = current_sequence
        self._current_timestamp = current_timestamp
        self._done_time = done_time
        self._last_error_message = last_error_message
        self._state = state
        self._update_time = update_time
        self._discard_count = discard_count
        self._worker_addr = worker_addr

    @property
    def current_sequence(self):
        return self._current_sequence

    @current_sequence.setter
    def current_sequence(self, new_sequence):
        self._current_sequence = new_sequence

    @property
    def current_timestamp(self):
        return self._current_timestamp

    @current_timestamp.setter
    def current_timestamp(self, new_timestamp):
        self._current_timestamp = new_timestamp

    @property
    def done_time(self):
        return self._done_time

    @done_time.setter
    def done_time(self, new_done_time):
        self._done_time = new_done_time

    @property
    def last_error_message(self):
        return self._last_error_message

    @last_error_message.setter
    def last_error_message(self, new_message):
        self._last_error_message = new_message

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, new_state):
        self._state = new_state

    @property
    def update_time(self):
        return self._update_time

    @update_time.setter
    def update_time(self, new_update_time):
        self._update_time = new_update_time

    @property
    def discard_count(self):
        return self._discard_count

    @discard_count.setter
    def discard_count(self, new_count):
        self._discard_count = new_count

    @property
    def worker_addr(self):
        return self._worker_addr

    @worker_addr.setter
    def worker_addr(self, new_addr):
        self._worker_addr = new_addr

    @classmethod
    def from_dict(cls, dict_):
        """
        Build an entry from a response dict.

        ``State`` is mandatory (a missing key raises ``KeyError``); numeric fields fall back
        to ``-1`` (``DiscardCount`` to ``0``) and text fields to ``''`` when absent.
        """
        fetch = dict_.get
        return cls(
            fetch('CurrentSequence', -1),
            fetch('CurrentTimestamp', -1),
            fetch('DoneTime', -1),
            fetch('LastErrorMessage', ''),
            ConnectorShardStatus(dict_['State']),
            fetch('UpdateTime', -1),
            fetch('DiscardCount', 0),
            fetch('WorkerAddress', ''),
        )

    def to_json(self):
        """
        Return the entry as a dict keyed by the wire field names; ``State`` holds the enum value.
        """
        pairs = (
            ('CurrentSequence', self._current_sequence),
            ('CurrentTimestamp', self._current_timestamp),
            ('DoneTime', self._done_time),
            ('LastErrorMessage', self._last_error_message),
            ('State', self._state.value),
            ('UpdateTime', self._update_time),
            ('DiscardCount', self._discard_count),
            ('WorkerAddress', self._worker_addr),
        )
        return dict(pairs)
class ConnectorOffset(object):
    """
    Offset of a connector, expressed as a sequence / timestamp pair

    Members:
        sequence (:class:`int`): sequence, ``-1`` when not given
        timestamp (:class:`int`): timestamp, ``-1`` when not given
    """

    __slots__ = ('_sequence', '_timestamp')

    def __init__(self, sequence=-1, timestamp=-1):
        self._sequence, self._timestamp = sequence, timestamp

    @property
    def sequence(self):
        return self._sequence

    @sequence.setter
    def sequence(self, new_sequence):
        self._sequence = new_sequence

    @property
    def timestamp(self):
        return self._timestamp

    @timestamp.setter
    def timestamp(self, new_timestamp):
        self._timestamp = new_timestamp
@six.add_metaclass(abc.ABCMeta)
class ConnectorConfig(object):
    """
    Abstract base class of the connector config classes

    Subclasses must implement :meth:`to_json` and the :meth:`from_dict` classmethod.
    """

    @abc.abstractmethod
    def to_json(self):
        """Serialize the config; implemented by each concrete config class."""

    @classmethod
    @abc.abstractmethod
    def from_dict(cls, dict_):
        """Build a config instance from ``dict_``; implemented by each concrete config class."""

    def __repr__(self):
        # The textual form is whatever to_json() yields, passed through to_str().
        serialized = self.to_json()
        return to_str(serialized)
class OdpsConnectorConfig(ConnectorConfig):
    """
    Connector config for odps

    Members:
        project_name (:class:`str`): odps project name
        table_name (:class:`str`): odps table name
        odps_endpoint (:class:`str`): odps endpoint
        tunnel_endpoint (:class:`str`): tunnel endpoint
        access_id (:class:`str`): odps access id
        access_key (:class:`str`): odps access key
        partition_mode (:class:`datahub.models.connector.PartitionMode`): partition mode
        time_range (:class:`int`): time range
        partition_config (:class:`collections.OrderedDict`): partition config
    """

    __slots__ = ('_project_name', '_table_name', '_odps_endpoint', '_tunnel_endpoint', '_access_id',
                 '_access_key', '_partition_mode', '_time_range', '_partition_config')

    def __init__(self, project_name, table_name, odps_endpoint, tunnel_endpoint,
                 access_id, access_key, partition_mode, time_range, partition_config=None):
        if partition_config is None:
            # Fresh ordered mapping per instance, matching the documented type.
            partition_config = OrderedDict()
        self._project_name = project_name
        self._table_name = table_name
        self._odps_endpoint = odps_endpoint
        self._tunnel_endpoint = tunnel_endpoint
        self._access_id = access_id
        self._access_key = access_key
        self._partition_mode = partition_mode
        self._time_range = time_range
        self._partition_config = partition_config

    @property
    def project_name(self):
        return self._project_name

    @project_name.setter
    def project_name(self, value):
        self._project_name = value

    @property
    def table_name(self):
        return self._table_name

    @table_name.setter
    def table_name(self, value):
        self._table_name = value

    @property
    def odps_endpoint(self):
        return self._odps_endpoint

    @odps_endpoint.setter
    def odps_endpoint(self, value):
        self._odps_endpoint = value

    @property
    def tunnel_endpoint(self):
        return self._tunnel_endpoint

    @tunnel_endpoint.setter
    def tunnel_endpoint(self, value):
        self._tunnel_endpoint = value

    @property
    def access_id(self):
        return self._access_id

    @access_id.setter
    def access_id(self, value):
        self._access_id = value

    @property
    def access_key(self):
        return self._access_key

    @access_key.setter
    def access_key(self, value):
        self._access_key = value

    @property
    def partition_mode(self):
        return self._partition_mode

    @partition_mode.setter
    def partition_mode(self, value):
        self._partition_mode = value

    @property
    def time_range(self):
        return self._time_range

    @time_range.setter
    def time_range(self, value):
        self._time_range = value

    @property
    def partition_config(self):
        return self._partition_config

    @partition_config.setter
    def partition_config(self, value):
        self._partition_config = value

    def to_json(self):
        """
        Return the config as a dict.

        The four location fields are always present; credentials, partition mode,
        a positive time range and a non-empty partition config are added only when set.
        """
        data = {
            "Project": self._project_name,
            "Table": self._table_name,
            "OdpsEndpoint": self._odps_endpoint,
            "TunnelEndpoint": self._tunnel_endpoint
        }
        if self._access_id:
            data["AccessId"] = self._access_id
        if self._access_key:
            data["AccessKey"] = self._access_key
        if self._partition_mode:
            data['PartitionMode'] = self._partition_mode.value
        # The truthiness check also keeps a None time range away from the comparison.
        if self._time_range and self._time_range > 0:
            data['TimeRange'] = self._time_range
        if self._partition_config:
            data['PartitionConfig'] = self._partition_config
        return data

    @classmethod
    def from_dict(cls, dict_):
        """
        Build a config from a response dict.

        ``PartitionConfig`` is read as JSON text holding a sequence of
        ``{"key": ..., "value": ...}`` entries, kept in order. ``PartitionMode`` is
        mandatory (a missing key raises ``KeyError``); the other fields have defaults.
        """
        # Default to an empty JSON array: the field is iterated as a sequence of entries.
        partition_entries = json.loads(to_text(dict_.get('PartitionConfig', '[]')))
        partition_config = OrderedDict()
        for entry in partition_entries:
            partition_config[entry['key']] = entry['value']
        return cls(dict_.get('Project', ''), dict_.get('Table', ''), dict_.get('OdpsEndpoint', ''),
                   dict_.get('TunnelEndpoint', ''), dict_.get('AccessId', ''), dict_.get('AccessKey', ''),
                   PartitionMode(dict_['PartitionMode']), int(dict_.get('TimeRange', -1)), partition_config)
class DatabaseConnectorConfig(ConnectorConfig):
    """
    Connector config for database

    Members:
        host (:class:`str`): host
        port (:class:`int`): port
        database (:class:`str`): database
        user (:class:`str`): user
        password (:class:`str`): password
        table (:class:`str`): table
        ignore (:class:`bool`): ignore insert error
    """

    __slots__ = ('_host', '_port', '_database', '_user', '_password', '_table', '_ignore')

    def __init__(self, host, port, database, user, password, table, ignore):
        self._host = host
        self._port = port
        self._database = database
        self._user = user
        self._password = password
        self._table = table
        self._ignore = ignore

    @property
    def host(self):
        return self._host

    @host.setter
    def host(self, value):
        self._host = value

    @property
    def port(self):
        return self._port

    @port.setter
    def port(self, value):
        self._port = value

    @property
    def database(self):
        return self._database

    @database.setter
    def database(self, value):
        self._database = value

    @property
    def user(self):
        return self._user

    @user.setter
    def user(self, value):
        self._user = value

    @property
    def password(self):
        return self._password

    @password.setter
    def password(self, value):
        self._password = value

    @property
    def table(self):
        return self._table

    @table.setter
    def table(self, value):
        self._table = value

    @property
    def ignore(self):
        return self._ignore

    @ignore.setter
    def ignore(self, value):
        self._ignore = value

    def to_json(self):
        """
        Return the config as a dict; ``Port`` and ``Ignore`` are emitted through
        ``to_str`` / ``bool_to_str`` rather than as raw int / bool.
        """
        data = {
            "Host": self._host,
            "Port": to_str(self._port),
            "Ignore": bool_to_str(self._ignore),
            "Database": self._database,
            "User": self._user,
            "Password": self._password,
            "Table": self._table,
        }
        return data

    @classmethod
    def from_dict(cls, dict_):
        """
        Build a config from a response dict.

        Missing text fields default to ``''``, ``Port`` to ``0`` and ``Ignore`` to ``True``.
        A textual ``Ignore`` of ``'false'``, ``'0'`` or ``''`` (case-insensitive) is read
        as ``False``; any other text is read as ``True``.
        """
        host = dict_.get('Host', '')
        port = int(dict_.get('Port', '0'))
        database = dict_.get('Database', '')
        user = dict_.get('User', '')
        password = dict_.get('Password', '')
        table = dict_.get('Table', '')
        raw_ignore = dict_.get('Ignore', 'True')
        if isinstance(raw_ignore, six.string_types):
            # bool() on text only tests for emptiness, so 'false' would be read as True.
            ignore = raw_ignore.strip().lower() not in ('false', '0', '')
        else:
            ignore = bool(raw_ignore)
        return cls(host, port, database, user, password, table, ignore)
class EsConnectorConfig(ConnectorConfig):
    """
    Connector config for ElasticSearch

    Members:
        index (:class:`str`): index
        endpoint (:class:`str`): endpoint
        user (:class:`str`): user
        password (:class:`str`): password
        id_fields (:class:`list`): id fields
        type_fields (:class:`list`): type fields
        proxy_mode (:class:`bool`): proxy mode
    """

    __slots__ = ('_index', '_endpoint', '_user', '_password', '_id_fields',
                 '_type_fields', '_proxy_mode')

    def __init__(self, index, endpoint, user, password, id_fields, type_fields, proxy_mode):
        self._index = index
        self._endpoint = endpoint
        self._user = user
        self._password = password
        self._id_fields = id_fields
        self._type_fields = type_fields
        self._proxy_mode = proxy_mode

    @property
    def index(self):
        return self._index

    @index.setter
    def index(self, value):
        self._index = value

    @property
    def endpoint(self):
        return self._endpoint

    @endpoint.setter
    def endpoint(self, value):
        self._endpoint = value

    @property
    def user(self):
        return self._user

    @user.setter
    def user(self, value):
        self._user = value

    @property
    def password(self):
        return self._password

    @password.setter
    def password(self, value):
        self._password = value

    @property
    def id_fields(self):
        return self._id_fields

    @id_fields.setter
    def id_fields(self, value):
        self._id_fields = value

    @property
    def type_fields(self):
        return self._type_fields

    @type_fields.setter
    def type_fields(self, value):
        self._type_fields = value

    @property
    def proxy_mode(self):
        return self._proxy_mode

    @proxy_mode.setter
    def proxy_mode(self, value):
        self._proxy_mode = value

    def to_json(self):
        """
        Return the config as a dict; ``ProxyMode`` is emitted through ``bool_to_str``.
        """
        data = {
            "Index": self._index,
            "Endpoint": self._endpoint,
            "User": self._user,
            "Password": self._password,
            "IDFields": self._id_fields,
            "TypeFields": self._type_fields,
            "ProxyMode": bool_to_str(self._proxy_mode)
        }
        return data

    @classmethod
    def from_dict(cls, dict_):
        """
        Build a config from a response dict.

        ``IDFields`` and ``TypeFields`` are parsed as JSON text and default to an empty
        list when absent. ``ProxyMode`` defaults to ``False``; a textual value of
        ``'false'``, ``'0'`` or ``''`` (case-insensitive) is read as ``False`` and any
        other text as ``True``.
        """
        index = dict_.get('Index', '')
        endpoint = dict_.get('Endpoint', '')
        user = dict_.get('User', '')
        password = dict_.get('Password', '')
        # '' is not valid JSON, so an absent field must default to an empty JSON array.
        id_fields = json.loads(dict_.get('IDFields', '[]'))
        type_fields = json.loads(dict_.get('TypeFields', '[]'))
        raw_proxy_mode = dict_.get('ProxyMode', 'False')
        if isinstance(raw_proxy_mode, six.string_types):
            # bool() on text only tests for emptiness, so 'False' would be read as True.
            proxy_mode = raw_proxy_mode.strip().lower() not in ('false', '0', '')
        else:
            proxy_mode = bool(raw_proxy_mode)
        return cls(index, endpoint, user, password, id_fields, type_fields, proxy_mode)
class FcConnectorConfig(ConnectorConfig):
    """
    Connector config for FunctionCompute

    Members:
        endpoint (:class:`str`): endpoint
        service (:class:`str`): service
        func (:class:`str`): function
        auth_mode (:class:`datahub.models.connector.AuthMode`): auth mode
        access_id (:class:`str`): access id
        access_key (:class:`str`): access key
    """

    __slots__ = ('_endpoint', '_service', '_func', '_auth_mode', '_access_id', '_access_key')

    def __init__(self, endpoint, service, func, auth_mode, access_id='', access_key=''):
        self._endpoint = endpoint
        self._service = service
        self._func = func
        self._auth_mode = auth_mode
        self._access_id = access_id
        self._access_key = access_key

    @property
    def endpoint(self):
        return self._endpoint

    @endpoint.setter
    def endpoint(self, value):
        self._endpoint = value

    @property
    def service(self):
        return self._service

    @service.setter
    def service(self, value):
        self._service = value

    @property
    def func(self):
        return self._func

    @func.setter
    def func(self, value):
        self._func = value

    @property
    def auth_mode(self):
        return self._auth_mode

    @auth_mode.setter
    def auth_mode(self, value):
        self._auth_mode = value

    @property
    def access_id(self):
        return self._access_id

    @access_id.setter
    def access_id(self, value):
        self._access_id = value

    @property
    def access_key(self):
        return self._access_key

    @access_key.setter
    def access_key(self, value):
        self._access_key = value

    def to_json(self):
        """
        Return the config as a dict.

        ``AccessId`` / ``AccessKey`` are included only when the auth mode is
        ``AuthMode.AK`` and both credentials are non-empty.
        """
        data = {
            "Endpoint": self._endpoint,
            "Service": self._service,
            "Function": self._func,
            "AuthMode": self._auth_mode.value
        }
        if self._auth_mode == AuthMode.AK and self.access_id and self.access_key:
            data["AccessId"] = self.access_id
            data["AccessKey"] = self.access_key
        return data

    @classmethod
    def from_dict(cls, dict_):
        """
        Build a config from a response dict; missing text fields default to ``''``
        and a missing ``AuthMode`` to ``'sts'``.
        """
        endpoint = dict_.get('Endpoint', '')
        service = dict_.get('Service', '')
        func = dict_.get('Function', '')
        auth_mode = AuthMode(dict_.get('AuthMode', 'sts'))
        access_id = dict_.get('AccessId', '')
        access_key = dict_.get('AccessKey', '')
        return cls(endpoint, service, func, auth_mode, access_id, access_key)
class OssConnectorConfig(ConnectorConfig):
    """
    Connector config for ObjectStoreService

    Members:
        endpoint (:class:`str`): endpoint
        bucket (:class:`str`): bucket
        prefix (:class:`str`): prefix
        time_format (:class:`str`): time format
        time_range (:class:`int`): time range
        auth_mode (:class:`datahub.models.connector.AuthMode`): auth mode
        access_id (:class:`str`): access id
        access_key (:class:`str`): access key
    """

    __slots__ = ('_endpoint', '_bucket', '_prefix', '_time_format', '_time_range',
                 '_auth_mode', '_access_id', '_access_key')

    def __init__(self, endpoint, bucket, prefix, time_format, time_range, auth_mode, access_id='', access_key=''):
        self._endpoint = endpoint
        self._bucket = bucket
        self._prefix = prefix
        self._time_format = time_format
        self._time_range = time_range
        self._auth_mode = auth_mode
        self._access_id = access_id
        self._access_key = access_key

    @property
    def endpoint(self):
        return self._endpoint

    @endpoint.setter
    def endpoint(self, value):
        self._endpoint = value

    @property
    def bucket(self):
        return self._bucket

    @bucket.setter
    def bucket(self, value):
        self._bucket = value

    @property
    def prefix(self):
        return self._prefix

    @prefix.setter
    def prefix(self, value):
        self._prefix = value

    @property
    def time_format(self):
        return self._time_format

    @time_format.setter
    def time_format(self, value):
        self._time_format = value

    @property
    def time_range(self):
        return self._time_range

    @time_range.setter
    def time_range(self, value):
        self._time_range = value

    @property
    def auth_mode(self):
        return self._auth_mode

    @auth_mode.setter
    def auth_mode(self, value):
        self._auth_mode = value

    @property
    def access_id(self):
        return self._access_id

    @access_id.setter
    def access_id(self, value):
        self._access_id = value

    @property
    def access_key(self):
        return self._access_key

    @access_key.setter
    def access_key(self, value):
        self._access_key = value

    def to_json(self):
        """
        Return the config as a dict.

        ``AccessId`` / ``AccessKey`` are included only when the auth mode is
        ``AuthMode.AK`` and both credentials are non-empty.
        """
        data = {
            "Endpoint": self._endpoint,
            "Bucket": self._bucket,
            "Prefix": self._prefix,
            "TimeFormat": self._time_format,
            "TimeRange": self._time_range,
            "AuthMode": self._auth_mode.value
        }
        if self._auth_mode == AuthMode.AK and self.access_id and self.access_key:
            data["AccessId"] = self.access_id
            data["AccessKey"] = self.access_key
        return data

    @classmethod
    def from_dict(cls, dict_):
        """
        Build a config from a response dict; missing text fields default to ``''``,
        ``TimeRange`` to ``-1`` (converted with ``int``) and ``AuthMode`` to ``'sts'``.
        """
        endpoint = dict_.get('Endpoint', '')
        bucket = dict_.get('Bucket', '')
        prefix = dict_.get('Prefix', '')
        time_format = dict_.get('TimeFormat', '')
        time_range = int(dict_.get('TimeRange', '-1'))
        auth_mode = AuthMode(dict_.get('AuthMode', 'sts'))
        access_id = dict_.get('AccessId', '')
        access_key = dict_.get('AccessKey', '')
        return cls(endpoint, bucket, prefix, time_format, time_range, auth_mode, access_id, access_key)
class OtsConnectorConfig(ConnectorConfig):
    """
    Connector config for OpenTableStore

    Members:
        endpoint (:class:`str`): endpoint
        instance (:class:`str`): instance
        table (:class:`str`): table
        auth_mode (:class:`datahub.models.connector.AuthMode`): auth mode
        access_id (:class:`str`): access id
        access_key (:class:`str`): access key
        write_mode (:class:`datahub.models.connector.WriteMode`): write mode
    """

    __slots__ = ('_endpoint', '_instance', '_table', '_auth_mode', '_access_id', '_access_key', '_write_mode')

    def __init__(self, endpoint, instance, table, auth_mode, access_id='', access_key='', write_mode=WriteMode.PUT):
        self._endpoint = endpoint
        self._instance = instance
        self._table = table
        self._auth_mode = auth_mode
        self._access_id = access_id
        self._access_key = access_key
        self._write_mode = write_mode

    @property
    def endpoint(self):
        return self._endpoint

    @endpoint.setter
    def endpoint(self, value):
        self._endpoint = value

    @property
    def instance(self):
        return self._instance

    @instance.setter
    def instance(self, value):
        self._instance = value

    @property
    def table(self):
        return self._table

    @table.setter
    def table(self, value):
        self._table = value

    @property
    def auth_mode(self):
        return self._auth_mode

    @auth_mode.setter
    def auth_mode(self, value):
        self._auth_mode = value

    @property
    def access_id(self):
        return self._access_id

    @access_id.setter
    def access_id(self, value):
        self._access_id = value

    @property
    def access_key(self):
        return self._access_key

    @access_key.setter
    def access_key(self, value):
        self._access_key = value

    @property
    def write_mode(self):
        return self._write_mode

    @write_mode.setter
    def write_mode(self, value):
        self._write_mode = value

    def to_json(self):
        """
        Return the config as a dict.

        ``AccessId`` / ``AccessKey`` are included only when the auth mode is
        ``AuthMode.AK`` and both credentials are non-empty.
        """
        data = {
            "Endpoint": self._endpoint,
            "InstanceName": self._instance,
            "TableName": self._table,
            "AuthMode": self._auth_mode.value,
            "WriteMode": self._write_mode.value
        }
        if self._auth_mode == AuthMode.AK and self.access_id and self.access_key:
            data["AccessId"] = self.access_id
            data["AccessKey"] = self.access_key
        return data

    @classmethod
    def from_dict(cls, dict_):
        """
        Build a config from a response dict; missing text fields default to ``''``,
        ``AuthMode`` to ``'sts'`` and ``WriteMode`` to ``'PUT'``.
        """
        endpoint = dict_.get('Endpoint', '')
        instance = dict_.get('InstanceName', '')
        table = dict_.get('TableName', '')
        auth_mode = AuthMode(dict_.get('AuthMode', 'sts'))
        access_id = dict_.get('AccessId', '')
        access_key = dict_.get('AccessKey', '')
        write_mode = WriteMode(dict_.get('WriteMode', 'PUT'))
        return cls(endpoint, instance, table, auth_mode, access_id, access_key, write_mode)
class HologresConnectorConfig(ConnectorConfig):
    """
    Connector config for Hologres

    Members:
        project (:class:`str`): project
        topic (:class:`str`): topic
        endpoint (:class:`str`): endpoint
        instance_id (:class:`str`): instance id
        auth_mode (:class:`datahub.models.connector.AuthMode`): auth mode
        timestamp_unit (:class:`str`): timestamp unit
        access_id (:class:`str`): access id
        access_key (:class:`str`): access key
    """

    __slots__ = (
        '_project',
        '_topic',
        '_endpoint',
        '_instance_id',
        '_auth_mode',
        '_timestamp_unit',
        '_access_id',
        '_access_key',
    )

    def __init__(self, project, topic, endpoint, instance_id, auth_mode, timestamp_unit, access_id='', access_key=''):
        self._project = project
        self._topic = topic
        self._endpoint = endpoint
        self._instance_id = instance_id
        self._auth_mode = auth_mode
        self._timestamp_unit = timestamp_unit
        self._access_id = access_id
        self._access_key = access_key

    @property
    def project(self):
        return self._project

    @project.setter
    def project(self, new_project):
        self._project = new_project

    @property
    def topic(self):
        return self._topic

    @topic.setter
    def topic(self, new_topic):
        self._topic = new_topic

    @property
    def endpoint(self):
        return self._endpoint

    @endpoint.setter
    def endpoint(self, new_endpoint):
        self._endpoint = new_endpoint

    @property
    def instance_id(self):
        return self._instance_id

    @instance_id.setter
    def instance_id(self, new_instance_id):
        self._instance_id = new_instance_id

    @property
    def auth_mode(self):
        return self._auth_mode

    @auth_mode.setter
    def auth_mode(self, new_auth_mode):
        self._auth_mode = new_auth_mode

    @property
    def timestamp_unit(self):
        return self._timestamp_unit

    @timestamp_unit.setter
    def timestamp_unit(self, new_unit):
        self._timestamp_unit = new_unit

    @property
    def access_id(self):
        return self._access_id

    @access_id.setter
    def access_id(self, new_access_id):
        self._access_id = new_access_id

    @property
    def access_key(self):
        return self._access_key

    @access_key.setter
    def access_key(self, new_access_key):
        self._access_key = new_access_key

    def to_json(self):
        """
        Return the config as a dict.

        ``AccessId`` / ``AccessKey`` are included only when the auth mode is
        ``AuthMode.AK`` and both credentials are non-empty.
        """
        payload = {
            "Endpoint": self._endpoint,
            "Project": self._project,
            "Topic": self._topic,
            "InstanceId": self._instance_id,
            "AuthMode": self._auth_mode.value,
            "TimestampUnit": self._timestamp_unit
        }
        # Credentials are only relevant for AK authentication.
        if not (self._auth_mode == AuthMode.AK):
            return payload
        if self.access_id and self.access_key:
            payload["AccessId"] = self.access_id
            payload["AccessKey"] = self.access_key
        return payload

    @classmethod
    def from_dict(cls, dict_):
        """
        Build a config from a response dict; missing text fields default to ``''``
        and a missing ``AuthMode`` to ``'ak'``.
        """
        fetch = dict_.get
        return cls(
            fetch('Project', ''),
            fetch('Topic', ''),
            fetch('Endpoint', ''),
            fetch('InstanceId', ''),
            AuthMode(fetch('AuthMode', 'ak')),
            fetch('TimestampUnit', ''),
            fetch('AccessId', ''),
            fetch('AccessKey', ''),
        )
class DataHubConnectorConfig(ConnectorConfig):
    """
    Connector config for DataHub

    Members:
        endpoint (:class:`str`): endpoint
        project (:class:`str`): project
        topic (:class:`str`): topic
        auth_mode (:class:`datahub.models.connector.AuthMode`): auth mode
        access_id (:class:`str`): access id
        access_key (:class:`str`): access key
    """

    __slots__ = (
        '_endpoint',
        '_project',
        '_topic',
        '_auth_mode',
        '_access_id',
        '_access_key',
    )

    def __init__(self, endpoint, project, topic, auth_mode, access_id='', access_key=''):
        self._endpoint = endpoint
        self._project = project
        self._topic = topic
        self._auth_mode = auth_mode
        self._access_id = access_id
        self._access_key = access_key

    @property
    def endpoint(self):
        return self._endpoint

    @endpoint.setter
    def endpoint(self, new_endpoint):
        self._endpoint = new_endpoint

    @property
    def project(self):
        return self._project

    @project.setter
    def project(self, new_project):
        self._project = new_project

    @property
    def topic(self):
        return self._topic

    @topic.setter
    def topic(self, new_topic):
        self._topic = new_topic

    @property
    def auth_mode(self):
        return self._auth_mode

    @auth_mode.setter
    def auth_mode(self, new_auth_mode):
        self._auth_mode = new_auth_mode

    @property
    def access_id(self):
        return self._access_id

    @access_id.setter
    def access_id(self, new_access_id):
        self._access_id = new_access_id

    @property
    def access_key(self):
        return self._access_key

    @access_key.setter
    def access_key(self, new_access_key):
        self._access_key = new_access_key

    def to_json(self):
        """
        Return the config as a dict.

        ``AccessId`` / ``AccessKey`` are included only when the auth mode is
        ``AuthMode.AK`` and both credentials are non-empty.
        """
        payload = {
            "Endpoint": self._endpoint,
            "Project": self._project,
            "Topic": self._topic,
            "AuthMode": self._auth_mode.value
        }
        # Credentials are only relevant for AK authentication.
        if not (self._auth_mode == AuthMode.AK):
            return payload
        if self.access_id and self.access_key:
            payload["AccessId"] = self.access_id
            payload["AccessKey"] = self.access_key
        return payload

    @classmethod
    def from_dict(cls, dict_):
        """
        Build a config from a response dict; missing text fields default to ``''``
        and a missing ``AuthMode`` to ``'sts'``.
        """
        fetch = dict_.get
        return cls(
            fetch('Endpoint', ''),
            fetch('Project', ''),
            fetch('Topic', ''),
            AuthMode(fetch('AuthMode', 'sts')),
            fetch('AccessId', ''),
            fetch('AccessKey', ''),
        )
# Registry mapping each connector type to the config class that handles it.
# The ADS and MySQL sinks both use DatabaseConnectorConfig.
connector_config_dict = {
    ConnectorType.SINK_ODPS: OdpsConnectorConfig,
    ConnectorType.SINK_ADS: DatabaseConnectorConfig,
    ConnectorType.SINK_ES: EsConnectorConfig,
    ConnectorType.SINK_FC: FcConnectorConfig,
    ConnectorType.SINK_MYSQL: DatabaseConnectorConfig,
    ConnectorType.SINK_OSS: OssConnectorConfig,
    ConnectorType.SINK_OTS: OtsConnectorConfig,
    ConnectorType.SINK_HOLOGRES: HologresConnectorConfig,
    ConnectorType.SINK_DATAHUB: DataHubConnectorConfig,
}
def get_connector_builder_by_type(connector_type):
    """
    Look up the config class registered for a connector type

    :param connector_type: key looked up in ``connector_config_dict``
    :return: the config class registered for ``connector_type``
    :raise: :class:`datahub.exceptions.DatahubException` when nothing is registered for it
    """
    config_class = connector_config_dict.get(connector_type)
    if config_class:
        return config_class
    raise DatahubException('unsupported connector type')