Presto 动态数据源改造方案

Presto 动态数据源改造方案

Presto 动态数据源改造方案

Presto 数据源加载机制原生是基于文件系统的,从配置目录中加载 数据源配置文件,从而实现不同源的数据库连接,而且整个过程是在presto集群启动时执行。而我们需要动态的增加/删除 数据源,并且不重启整个presto集群,所以对presto 加载数据源部分做了改造:

  1. 对外新增两个EndPoint:增加数据源API 和 删除数据源API
  2. 数据源的配置从文件系统迁移到数据库
  3. 在不重启集群的前提下,动态更新presto 数据源连接信息

动态增加/删除数据源,新增catalog endpoint

新增两个接口,用于向presto/trino 动态的增加和删除数据源:

  • POST http://{presto-api-base-url}/v1/catalog/add

  • DELETE http://{presto-api-base-url}/v1/catalog/delete

处理的流程图:

其中较为关键的是要区分 coordinator 和worker:

coordinator 除了维护自己节点的catalog信息,还需要接收 外部管理catalog 的api,并且将 catalog的变更需要同步通知给worker

worker 只需要接受coordinator的请求,维护自己节点的catalog信息即可

还有一个方案,就是在presto启动过程中,新开一个线程来定时监控 数据库的catalog表,如果这个表发生变更,那么就重新刷新presto 的catalog 信息。

这个方案不适合添加后立刻就能用的场景,所以换成由coordinator 来同步通知所有active 的worker节点

为了让catalog更快生效,那么需要更加迅速的将catalog变更信息同步到各个worker,所以可以引入线程池并发通知

举例:向coordinator调用添加一个db2的数据源:

1
2
3
4
curl -X POST \
http://{presto-api-base-url}/v1/catalog/add \
-H 'content-type: application/json' \
-d '{"catalogName":"db2","connectorName":"db2","properties":{"connection-url":"jdbc:db2://{db2-jdbc-url}","connection-user":"{user}","connection-password":"{password}"}}'

举例:向coordinator申请删除名称为db2的数据源:

1
curl -X DELETE http://{presto-api-base-url}/v1/catalog/delete?catalogName=db2

后续优化可以给两个API 加上权限控制

仅有上面的方案,使用过程还是会存在一定的问题:比如 新增了worker 节点或者 在调用新增/删除API时有部分worker 离线了,那么就会导致有些worker无法参与动态数据源的计算查询了,所以为了解决 catalog一致性问题,可以引入Mysql 之类的数据库,将catalog 信息统一维护到数据库中,这样worker 重启后还是能拿到同一份catalog 列表

从中间件加载Presto catalog数据

原生catalog 加载逻辑

  1. 当presto/trino 启动时,会调用Server.class 中的start(),然后实例化ServerMainModule类
  2. ServerMainModule类会实例化 StaticCatalog 相关的类
  3. 然后Server.class 会获取StaticCatalogStore的实例,然后调用loadCatalogs()方法
  4. StaticCatalogStore的loadCatalogs()方法是从 etc/catalog/目录读取所有*.properteis
  5. 然后根据配置文件循环调用ConnectorManager.class 的createCatalog()方法

动态Catalog逻辑

改造的逻辑同原生逻辑没有太大区别,最大的不同就是引入了MysqlJDBC实例,然后从数据库中加载catalog信息,最后还是通过ConnectorManager.class 的createCatalog()方法完成catalog 的加载

mysql 可以替换成其他中间件,主要看当前集群环境中有啥,比如可以放到zookeeper中或者其他db中

实施步骤

  1. 新增元数据表:
1
2
3
4
5
6
7
8
9
10
CREATE TABLE `catalog` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`catalogName` varchar(200) NOT NULL,
`connectorName` varchar(200) NOT NULL,
`properties` varchar(2000) DEFAULT NULL,
`status` smallint(6) NOT NULL DEFAULT '1',
`ctime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`mtime` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
  1. 新增配置项 etc/config.properteis
1
2
3
4
5
6
7
8
trino.extension.jdbc.enable=true
trino.extension.jdbc.driver=com.mysql.jdbc.Driver
trino.extension.jdbc.url=jdbc:mysql://{jdbcurl}
trino.extension.jdbc.username={username}
trino.extension.jdbc.password={password}
trino.extension.jdbc.max-pool-size=2
dynamic-catalog.enable=true
dynamic-catalog.table-name={database}.catalog
  1. 重启集群

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×