Presto Execution Plan - Dispatching Statements to Different QueryExecutions

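These are notes for sorting out the logic while reading the source code; they have little value as reading material.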

Obtaining a QueryExecution

A QueryExecution represents one execution of a query. It is used to start, stop, and manage the query, and to collect statistics about it.

After the ANTLR 4 parser has parsed the SQL text, the result is a Node, which is converted to a Statement and then wrapped in a PreparedQuery.

Following the code further, the dispatchQueryFactory.createDispatchQuery() method dispatches each Statement according to its type. The class responsible for this is QueryExecutionFactory.

io.trino.dispatcher.DispatchManager.createQueryInternal()
preparedQuery = queryPreparer.prepareQuery(session, query);
// ....
DispatchQuery dispatchQuery = dispatchQueryFactory.createDispatchQuery(
        session,
        query,
        preparedQuery,
        slug,
        selectionContext.getResourceGroupId());

io.trino.dispatcher.LocalDispatchQueryFactory
@Override
public DispatchQuery createDispatchQuery(
        Session session,
        String query,
        PreparedQuery preparedQuery,
        Slug slug,
        ResourceGroupId resourceGroup)
{
    WarningCollector warningCollector = warningCollectorFactory.create();
    QueryStateMachine stateMachine = QueryStateMachine.begin(
            query,
            preparedQuery.getPrepareSql(),
            session,
            locationFactory.createQueryLocation(session.getQueryId()),
            resourceGroup,
            isTransactionControlStatement(preparedQuery.getStatement()),
            transactionManager,
            accessControl,
            executor,
            metadata,
            warningCollector,
            StatementUtils.getQueryType(preparedQuery.getStatement().getClass()));

    // It is important that `queryCreatedEvent` is called here. Moving it past the `executor.submit` below
    // can result in delivering query-created event after query analysis has already started.
    // That can result in misbehaviour of plugins called during analysis phase (e.g. access control auditing)
    // which depend on the contract that event was already delivered.
    //
    // Note that for immediate and in-order delivery of query events we depend on synchronous nature of
    // QueryMonitor and EventListenerManager.
    queryMonitor.queryCreatedEvent(stateMachine.getBasicQueryInfo(Optional.empty()));

    ListenableFuture<QueryExecution> queryExecutionFuture = executor.submit(() -> {
        // A different QueryExecution is created here depending on the class of the statement.
        // DataDefinitionExecution: used for DDL operations such as creating a view, creating a table, or renaming a column.
        // SqlQueryExecution: used for queries, SHOW statements, INSERT/DELETE and similar operations.
        // See the listing further below for details.
        QueryExecutionFactory<?> queryExecutionFactory = executionFactories.get(preparedQuery.getStatement().getClass());
        if (queryExecutionFactory == null) {
            throw new TrinoException(NOT_SUPPORTED, "Unsupported statement type: " + preparedQuery.getStatement().getClass().getSimpleName());
        }

        try {
            return queryExecutionFactory.createQueryExecution(preparedQuery, stateMachine, slug, warningCollector);
        }
        catch (Throwable e) {
            stateMachine.transitionToFailed(e);
            throw e;
        }
    });

    return new LocalDispatchQuery(
            stateMachine,
            queryExecutionFuture,
            queryMonitor,
            clusterSizeMonitor,
            executor,
            queryManager::createQuery);
}
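
The way createDispatchQuery builds the QueryExecution on an executor and marks the query as failed when creation throws can be sketched in isolation. This is only a minimal illustration using java.util.concurrent instead of Guava's ListenableFuture; AsyncCreationSketch, StateMachine, createAsync, and demo are hypothetical names, not Trino classes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

final class AsyncCreationSketch {
    // Hypothetical stand-in for QueryStateMachine: it only tracks a state label.
    static final class StateMachine {
        private final AtomicReference<String> state = new AtomicReference<>("QUEUED");

        void transitionToFailed(Throwable cause) {
            state.set("FAILED");
        }

        String state() {
            return state.get();
        }
    }

    // Runs the factory on the executor; on any failure the state machine is
    // moved to FAILED first, then the error is rethrown into the future.
    static Future<String> createAsync(ExecutorService executor, StateMachine stateMachine, Callable<String> factory) {
        return executor.submit(() -> {
            try {
                return factory.call();
            }
            catch (Throwable e) {
                stateMachine.transitionToFailed(e);
                throw e;
            }
        });
    }

    // Small driver: the factory always fails, so the final state is FAILED.
    static String demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        StateMachine stateMachine = new StateMachine();
        try {
            Future<String> future = createAsync(executor, stateMachine, () -> {
                throw new IllegalStateException("analysis failed");
            });
            future.get();
        }
        catch (ExecutionException | InterruptedException e) {
            // The failure also reaches the caller through the future.
        }
        finally {
            executor.shutdown();
        }
        return stateMachine.state();
    }
}
```

The point of the pattern is that the caller gets its dispatch object back immediately, while a failure during creation is recorded on the state machine and is also visible through the future.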

QueryExecutionFactory has two implementations, DataDefinitionExecutionFactory and SqlQueryExecutionFactory. How Statements are assigned to them is listed below:

QueryExecutionFactory implementation: DataDefinitionExecutionFactory
Every Statement in the DataDefinitionExecution category has a corresponding Task.
Statement implementations: AddColumn, Call, Comment, CreateRole, CreateSchema, CreateTable, CreateView, Deallocate, DropColumn, DropRole, DropSchema, DropTable, DropView, CreateMaterializedView, DropMaterializedView, Grant, GrantRoles, Prepare, RenameColumn, RenameTable, RenameView, ResetSession, Revoke, RevokeRoles, Rollback, SetPath, SetRole, SetSchemaAuthorization, SetSession, SetTableAuthorization, SetViewAuthorization, StartTransaction, Use
As of September 2021; for more, see the configure method of the QueryExecutionFactoryModule class.

QueryExecutionFactory implementation: SqlQueryExecutionFactory
Statement implementations: Query, Explain, Analyze, CreateTableAsSelect, Insert, Delete, Update, ShowCatalogs, ShowCreate, ShowFunctions, ShowGrants, ShowRoles, ShowRoleGrants, ShowSchemas, ShowSession, ShowStats, ShowTables, ShowColumns, DescribeInput, DescribeOutput, CreateSchema, DropSchema, RenameSchema, SetSchemaAuthorization, AddColumn, SetTableAuthorization, CreateTable, RenameTable, Comment, RenameColumn, DropColumn, DropTable, CreateView, RenameView, SetViewAuthorization, DropView, CreateMaterializedView, RefreshMaterializedView, DropMaterializedView, Use, SetSession, ResetSession, StartTransaction, Commit, Rollback, Call, CreateRole, DropRole, GrantRoles, RevokeRoles, SetRole, Grant, Revoke, Prepare, Deallocate, SetPath
As of September 2021; for more, see the static {} block of the StatementUtils class. This list also repeats statements already listed under DataDefinitionExecutionFactory (for example CreateSchema and Use).
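
The lookup in createDispatchQuery, executionFactories.get(statement.getClass()), is a map keyed by the Statement class. A minimal sketch of that routing follows. Everything in it is a simplified, hypothetical stand-in (DispatchSketch, ExecutionFactory, and the three Statement classes), and the factories return a label rather than a real QueryExecution.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class DispatchSketch {
    // Hypothetical stand-ins for the Statement hierarchy; not the real Trino classes.
    interface Statement {}

    static final class Query implements Statement {}

    static final class AddColumn implements Statement {}

    static final class Unregistered implements Statement {}

    // Simplified factory: returns a label instead of a QueryExecution.
    interface ExecutionFactory {
        String create(Statement statement);
    }

    private final Map<Class<? extends Statement>, ExecutionFactory> factories = new LinkedHashMap<>();

    DispatchSketch() {
        // Registration by concrete class, as an assumed miniature of the real bindings.
        factories.put(Query.class, statement -> "SqlQueryExecution");
        factories.put(AddColumn.class, statement -> "DataDefinitionExecution");
    }

    // Looks up the factory by the statement's runtime class and fails for
    // statement types that were never registered.
    String dispatch(Statement statement) {
        ExecutionFactory factory = factories.get(statement.getClass());
        if (factory == null) {
            throw new UnsupportedOperationException(
                    "Unsupported statement type: " + statement.getClass().getSimpleName());
        }
        return factory.create(statement);
    }
}
```

Because the key is the exact runtime class, a statement type with no registration is rejected rather than falling back to a default, which mirrors the NOT_SUPPORTED branch in the listing above.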

Starting a QueryExecution

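Once the QueryExecution has been obtained, SqlQueryQueueManager matches it against the configured query queue rules. If a rule matches and the queue is not full, the QueryExecution is added to the matching queue. Each queue schedules its queries in FIFO order. Finally, the QueryExecution is started.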
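
The match, capacity check, and FIFO steps described above can be sketched as follows. This is not the SqlQueryQueueManager implementation; QueryQueueSketch and its methods are hypothetical, the rule is assumed to match on the submitting user, and a query whose matching queue is full is assumed to be rejected.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.function.Predicate;

final class QueryQueueSketch {
    // One configured queue: a matching rule, a capacity, and the pending queries.
    static final class RuleQueue {
        final String name;
        final Predicate<String> userRule; // assumption: rules match on the user name
        final int maxQueued;
        final Queue<String> pending = new ArrayDeque<>();

        RuleQueue(String name, Predicate<String> userRule, int maxQueued) {
            this.name = name;
            this.userRule = userRule;
            this.maxQueued = maxQueued;
        }
    }

    private final List<RuleQueue> queues = new ArrayList<>();

    void addQueue(String name, Predicate<String> userRule, int maxQueued) {
        queues.add(new RuleQueue(name, userRule, maxQueued));
    }

    // Returns the name of the queue that accepted the query, or empty when
    // no rule matched or the first matching queue is already full.
    Optional<String> submit(String user, String queryId) {
        for (RuleQueue queue : queues) {
            if (queue.userRule.test(user)) {
                if (queue.pending.size() >= queue.maxQueued) {
                    return Optional.empty();
                }
                queue.pending.add(queryId);
                return Optional.of(queue.name);
            }
        }
        return Optional.empty();
    }

    // Removes and returns the oldest pending query of the named queue (FIFO).
    Optional<String> startNext(String queueName) {
        for (RuleQueue queue : queues) {
            if (queue.name.equals(queueName)) {
                return Optional.ofNullable(queue.pending.poll());
            }
        }
        return Optional.empty();
    }
}
```

Rules are checked in registration order, so a catch-all rule would normally be registered last.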

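Starting a DataDefinitionExecution simply calls the execute method of the DataDefinitionTask implementation bound to it. Take AddColumn as an example: because AddColumn is bound to AddColumnTask, the execute method of AddColumnTask is run.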

AddColumnTask.execute() 
@Override
public ListenableFuture<?> execute(
        AddColumn statement,
        TransactionManager transactionManager,
        Metadata metadata,
        AccessControl accessControl,
        QueryStateMachine stateMachine,
        List<Expression> parameters,
        WarningCollector warningCollector)
{
    Session session = stateMachine.getSession();
    // Resolve the fully qualified table name
    QualifiedObjectName tableName = createQualifiedObjectName(session, statement, statement.getName());
    // Look up the TableHandle in metadata by table name.
    // A TableHandle contains the CatalogName, ConnectorTableHandle, and ConnectorTransactionHandle
    Optional<TableHandle> tableHandle = metadata.getTableHandle(session, tableName);
    if (tableHandle.isEmpty()) {
        if (!statement.isTableExists()) {
            throw semanticException(TABLE_NOT_FOUND, statement, "Table '%s' does not exist", tableName);
        }
        return immediateFuture(null);
    }

    CatalogName catalogName = metadata.getCatalogHandle(session, tableName.getCatalogName())
            .orElseThrow(() -> new TrinoException(NOT_FOUND, "Catalog does not exist: " + tableName.getCatalogName()));
    // Check whether adding columns is allowed
    accessControl.checkCanAddColumns(session.toSecurityContext(), tableName);

    // Fetch the existing column handles through the metadata API
    Map<String, ColumnHandle> columnHandles = metadata.getColumnHandles(session, tableHandle.get());
    // Add the column.
    // Trino/Presto defines the standard add-column interface (SPI); the implementation lives in the connector
    ColumnDefinition element = statement.getColumn();
    // Validate the data type, etc.
    Type type;
    try {
        type = metadata.getType(toTypeSignature(element.getType()));
    }
    catch (TypeNotFoundException e) {
        throw semanticException(TYPE_NOT_FOUND, element, "Unknown type '%s' for column '%s'", element.getType(), element.getName());
    }
    if (type.equals(UNKNOWN)) {
        throw semanticException(COLUMN_TYPE_UNKNOWN, element, "Unknown type '%s' for column '%s'", element.getType(), element.getName());
    }
    if (columnHandles.containsKey(element.getName().getValue().toLowerCase(ENGLISH))) {
        if (!statement.isColumnNotExists()) {
            throw semanticException(COLUMN_ALREADY_EXISTS, statement, "Column '%s' already exists", element.getName());
        }
        return immediateFuture(null);
    }
    if (!element.isNullable() && !metadata.getConnectorCapabilities(session, catalogName).contains(NOT_NULL_COLUMN_CONSTRAINT)) {
        throw semanticException(NOT_SUPPORTED, element, "Catalog '%s' does not support NOT NULL for column '%s'", catalogName.getCatalogName(), element.getName());
    }

    Map<String, Expression> sqlProperties = mapFromProperties(element.getProperties());
    Map<String, Object> columnProperties = metadata.getColumnPropertyManager().getProperties(
            catalogName,
            tableName.getCatalogName(),
            sqlProperties,
            session,
            metadata,
            accessControl,
            parameterExtractor(statement, parameters));
    // Package the column information
    ColumnMetadata column = ColumnMetadata.builder()
            .setName(element.getName().getValue())
            .setType(type)
            .setNullable(element.isNullable())
            .setComment(element.getComment())
            .setProperties(columnProperties)
            .build();
    // Call addColumn on the interface; the real implementation is in the connector
    metadata.addColumn(session, tableHandle.get(), column);

    return immediateFuture(null);
}

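As the code above shows, all of these operations involve metadata. Metadata is an interface defined in the io.trino.metadata package, and its only implementation is MetadataManager. That class implements its metadata operations on top of the ConnectorMetadata interface. Every connector for a pluggable data source implements that interface through the SPI mechanism (possibly only in part, depending on the capabilities it supports). This is how AddColumnTask, through the Metadata implementation, ends up calling the concrete method in the connector plugin. (For more on the Metadata interface, see the Metadata API part of the chapter on basic concepts of execution plans.)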
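
The delegation from the engine-side Metadata implementation to a connector that may implement only part of the SPI can be sketched as follows. The types here are heavily simplified stand-ins that borrow the names ConnectorMetadata and MetadataManager for readability; the signatures, the two connector classes, and registerCatalog are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MetadataSpiSketch {
    // Simplified stand-in for the connector-side SPI. The default method means
    // "not supported", so a connector only overrides what it can actually do.
    interface ConnectorMetadata {
        default void addColumn(String table, String column) {
            throw new UnsupportedOperationException("This connector does not support adding columns");
        }
    }

    // Hypothetical connector that supports adding columns.
    static final class InMemoryConnectorMetadata implements ConnectorMetadata {
        final List<String> columns = new ArrayList<>();

        @Override
        public void addColumn(String table, String column) {
            columns.add(table + "." + column);
        }
    }

    // Hypothetical connector that keeps the default and therefore rejects the call.
    static final class ReadOnlyConnectorMetadata implements ConnectorMetadata {}

    // Simplified engine-side manager: resolves the catalog and delegates.
    static final class MetadataManager {
        private final Map<String, ConnectorMetadata> connectorsByCatalog = new HashMap<>();

        void registerCatalog(String catalog, ConnectorMetadata connector) {
            connectorsByCatalog.put(catalog, connector);
        }

        void addColumn(String catalog, String table, String column) {
            ConnectorMetadata connector = connectorsByCatalog.get(catalog);
            if (connector == null) {
                throw new IllegalArgumentException("Catalog does not exist: " + catalog);
            }
            connector.addColumn(table, column);
        }
    }
}
```

With this shape the task code never needs to know which connector it is talking to; whether the operation succeeds depends only on what the connector behind the catalog overrides.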

Starting a SqlQueryExecution is more involved: it has to build the query plan, optimize it, and then execute it in stages.

