百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

linkedin/coral 代码速读

bigegpt 2024-08-20 11:06 2 浏览

前几天纷飞的大雪还未消融,周末又赶上降温。于是踏实关在屋里,快速读了一遍 linkedin/coral1 的代码,这篇笔记记录下整体流程。

1. 背景:SQL 重写的需求

大数据领域,随着数据量变大、时效性要求越来越多样化,SQL 计算引擎也越来越多,从原来的 HiveSQL,到如今的 Presto/Trino、Flink、Spark。同时,随着 storage format、table format、table schema 各个方向的精细发展,SQL 的形式也越来越多,短期内也很难出现事实上的统一标准。

SQL 往往需要在不同执行计算引擎间变更,比如:

  1. 分析师的 HiveSQL 运行很慢,希望能够修改为 TrinoSQL 执行
  2. 数仓工程师的 HiveSQL,希望能够统一修改为 SparkSQL 执行

由于不同 SQL 间存在语法差异,SQL 工程师就需要重新学习新 SQL 的特有语法,类比于开发工程师重新学习一门编程语言。

SparkSQL 和 HiveSQL 间语法差异很多,诸如 order by 别名、hash方法、返回值、grouping sets、时间函数、ambiguous的语义等。HiveSQL 和 PrestoSQL 也是如此。

另外,笔者看到的实际情况,往往是一份数据多处存储。例如对于一份数据,ClickHouse 以宽表的形式存在,Hive 则定义成多张表(或者 join 成 view)。如果想要统一 SQL,就需要建立 access layer:

               ┌──────────────┐
               │ access layer │
               └──────┬───────┘
                      │
                      │
   ┌─────────┬────────┼───────────┬─────────────┐
   │         │        │           │             │
   │         │        │           │             │
┌──┴──┐   ┌──┴──┐   ┌─┴──┐   ┌────┴──┐   ┌──────┴───┐
│Trino│   │Kafka│   │Hive│   │Iceberg│   │ClickHouse│
└─────┘   └─────┘   └────┘   └───────┘   └──────────┘

这一层是逻辑上的表,通过元数据关联到具体存储层(Trino、Kafka、Hive、Iceberg、CH、etc.)的表。

这里有点像 Flink 之前一直推广的流批一体2,不同的是其思路是想用 FlinkSQL 统一所有计算,而不是将 SQL 按不同引擎重写。

SQL 重写,python 里的小工具比较多,例如tobymao/sqlglot,没有过多依赖,用于简单的 SQL 重写、元数据提取就足够了。复杂 SQL 的场景,Coral 是一个比较好的解决方案。

2. 思路:Coral 介绍

linkedin 对 Coral 的定义是:SQL translation, analysis, and rewrite engine3.
Coral 的核心思路是将 HiveSQL 转为 calcite 的 RelNode(文档里的Coral IR?),然后再转为其他方言的 SQL.

由于存在 views 这一层,因此 coral-hive 需要负责读取其中的元信息:database/table/view(schema)、字段信息(例如 select * 提取出字段) 等。使用 Hive 的能力将 sql 转为 AST(Abstract Syntax Tree),然后遍历转换为 calcite 的 RelNode.(注:关于 RelNode 介绍Calcite笔记之三:处理流程的代码例子)

然后再将 RelNode 重写为 PrestoSQL

3. 实现:Coral 代码

如何用 ANTLR 解析和重写SQL笔记里,介绍过如何重写 SQL : SELECT size(ARRAY (1, 2)),这里仍然复用该 SQL 说明下 Coral 里重写的过程:

public class HiveToTrinoConverterTest {
   @Test
   public void testTypeCastForCardinalityFunction() {
      RelToTrinoConverter relToTrinoConverter = TestUtils.getRelToTrinoConverter();

      RelNode relNode = TestUtils.getHiveToRelConverter().convertSql("SELECT size(ARRAY (1, 2))");
      String targetSql = "SELECT CAST(CARDINALITY(ARRAY[1, 2]) AS INTEGER)\n" + "FROM (VALUES  (0)) AS \"t\" (\"ZERO\")";
      String expandedSql = relToTrinoConverter.convert(relNode);
      assertEquals(expandedSql, targetSql);
   }
}

代码分为两个步骤:

  1. hiveSQL -> relNode: 通过RelNode HiveToRelConverter.convertSql(String sql)实现
  2. relNode -> trinoSQL: 通过String RelToTrinoConverter.convert(RelNode relNode)实现

3.1. hiveSQL -> relNode

这个过程用一句话概括,就是先根据 hive 的 antlr 语法文件解析成 ASTNode,然后遍历节点逐个转为 SqlNode,再调用 calcite 的方法转为 RelNode.

整体的流程:hiveSQL -> ASTNode -> SqlNode -> RelNode

ToRelConverter.convertSql
 │
 ├─?ToRelConverter.toSqlNode    // hiveSql -> SqlNode
 │   │
 │   ├──?ParseTreeBuilder.process
 │   │    │
 │   │    ├─?CoralParseDriver.parse    // hiveSql -> ASTNode
 │   │    │
 │   │    └─?ParseTreeBuilder.processAST    // ASTNode -> SqlNode
 │   │        │
 │   │        └─?AbstractASTVistor.visit
 │   │
 │   ├──?sqlNode.accept(FuzzyUnionSqlRewriter...)
 │   │
 │   └──?sqlNode.accept(HiveSqlNodeToCoralSqlNodeConverter...)
 │
 └─?ToRelConverter.toRel        // SqlNode -> RelNode
     │
     └─?HiveSqlToRelConverter.convertQuery
public class ParseTreeBuilder extends AbstractASTVisitor<SqlNode, ParseTreeBuilder.ParseContext> {

  public SqlNode process(String sql, @Nullable Table hiveView) {
    ParseDriver pd = new CoralParseDriver();
    try {
      ASTNode root = pd.parse(sql);
      return processAST(root, hiveView);
    } catch (ParseException e) {
      throw new RuntimeException(e);
    }
  }
}

第一步,CoralParseDriver基本复用了 Hive 原生org.apache.hadoop.hive.ql.parse.ParseDriver代码,调用 antlr 的 HiveParser/HiveLexer. pd.parse解析出的 ASTNode 树结构形如:

nil
   TOK_QUERY
      TOK_INSERT
         TOK_DESTINATION
            TOK_DIR
               TOK_TMP_FILE
         TOK_SELECT
            TOK_SELEXPR
               TOK_FUNCTION
                  size
                  TOK_FUNCTION
                     ARRAY
                     1
                     2
   <EOF>

注意这里还是 antlr3 的语法

第二步,processAST遍历 ASTNode 根据node.getType()调用不同的 visit 方法,逐个转换为 SqlNode。

例如对于TOK_SELECT,则调用visitSelect: 先遍历所有子节点,然后构造SqlNodeList返回:

  @Override
  protected SqlNode visitSelect(ASTNode node, ParseContext ctx) {
    List<SqlNode> sqlNodes = visitChildren(node, ctx);
    ctx.selects = new SqlNodeList(sqlNodes, ZERO);
    return ctx.selects;
  }

对于TOK_FUNCTION,则调用visitFunction -> visitFunctionInternal方法:

public class ParseTreeBuilder extends AbstractASTVisitor<SqlNode, ParseTreeBuilder.ParseContext> {
   protected SqlNode visitFunction(ASTNode node, ParseContext ctx) {
      return visitFunctionInternal(node, ctx, null);
   }

   private SqlNode visitFunctionInternal(ASTNode node, ParseContext ctx, SqlLiteral quantifier) {
      ArrayList<Node> children = node.getChildren();
      checkState(children.size() > 0);
      // 对 TOK_FUNCTION,首节点即为函数本身,取出函数名
      ASTNode functionNode = (ASTNode) children.get(0);
      String functionName = functionNode.getText();
      // 遍历所有子节点转换为 SqlNode 列表
      List<SqlNode> sqlOperands = visitChildren(children, ctx);
      // 根据函数名、函数参数个数,查找对应的 hiveFunction
      Function hiveFunction = functionResolver.tryResolve(functionName, ctx.hiveTable.orElse(null),
              // The first element of sqlOperands is the operator itself. The actual # of operands is sqlOperands.size() - 1
              sqlOperands.size() - 1);

      // Special treatment for Window Function
      SqlNode lastSqlOperand = sqlOperands.get(sqlOperands.size() - 1);
      if (lastSqlOperand instanceof SqlWindow) {
         // ...
      }

      if (functionName.equalsIgnoreCase("SUBSTRING")) {
         // ...
      }

      // 创建 SqlBasicCall 返回
      return hiveFunction.createCall(sqlOperands.get(0), sqlOperands.subList(1, sqlOperands.size()), quantifier);
   }

}

HiveFunctionResolver functionResolver内部维护StaticHiveFunctionRegistry对象,注册了所有的 hive built-in UDF 到 SqlOperator 的映射,例如对于 “size” 和 “array”:

  /**
   * The ARRAY Value Constructor. e.g. "<code>ARRAY[1, 2, 3]</code>".
   */
  public static final SqlArrayValueConstructor ARRAY_VALUE_CONSTRUCTOR =
      new SqlArrayValueConstructor.SqlArrayValueConstructorAllowingEmpty();
        
// Complex type constructors
addFunctionEntry("array", ARRAY_VALUE_CONSTRUCTOR);

  /**
   * The CARDINALITY operator, used to retrieve the number of elements in a
   * MULTISET, ARRAY or MAP.
   */
  public static final SqlFunction CARDINALITY =
      new SqlFunction(
          "CARDINALITY",
          SqlKind.OTHER_FUNCTION,
          ReturnTypes.INTEGER_NULLABLE,
          null,
          OperandTypes.COLLECTION_OR_MAP,
          SqlFunctionCategory.SYSTEM);
        
// Collection functions
addFunctionEntry("size", CARDINALITY);

functionResolver.tryResolve根据函数名查找对应的Function对象,该对象记录了函数名、SqlOperator,然后调用hiveFunction.createCall创建对应的SqlCall对象。

例如对于树里的两个TOP_FUNCTION节点,转换后的对象如图:

很多类型例如nil TOK_QUERY TOK_INSERT TOK_SELEXPR等则直接调用visitChildren然后返回首节点。

这一步得到的SqlNode 树结构形如:

SqlSelect =(SELECT CARDINALITY(ARRAY[1,2])
│
└──?SqlNodeList = CARDINALITY(ARRAY[1,2])
    │
    └──?SqlBasicCall = CARDINALITY(ARRAY[1,2])
        │
        ├──?SqlFunction = CARDINALITY
        │
        └──?SqlBasicCall = ARRAY[1,2]
            │
            ├──?SqlArrayValueConsturctorAllowingEmpty
            │
            ├──?SqlNumericLiteral = 1
            │
            └──?SqlNumericListeral = 2

这一步的 SqlNode 还需要经过两次遍历优化:

  1. FuzzyUnionSqlRewriter: 比如两个 UNION ALL 表 struct 结构不一致, b struct<b1:string> vs b struct<b1: string, b2:int>,那 SQL 会改写为:CAST(row("b"."b1") as row("b1" varchar)) AS "b"的形式。注:这种 incompatible 的类型确实需要兼容么?
  2. HiveSqlNodeToCoralSqlNodeConverter: ShiftArrayIndexTransformer解决 hiveSQL 数组下标从0开始而 prestoSQL 下标从 1 开始的 diff.

第三步,ToRelConverter.toRel转化为 SqlNode 到 RelNode。

org.apache.calcite.sql2rel.SqlToRelConverter在之前笔记Calcite笔记之三:处理流程的代码例子介绍过,coral 使用了自定义的HiveSqlToRelConverter:

    if (sqlToRelConverter == null) {
      sqlToRelConverter =
          new HiveSqlToRelConverter(new HiveViewExpander(this), getSqlValidator(), getCalciteCatalogReader(),
              RelOptCluster.create(new VolcanoPlanner(), new HiveRexBuilder(getRelBuilder().getTypeFactory())),
              getConvertletTable(),
              SqlToRelConverter.configBuilder().withRelBuilderFactory(HiveRelBuilder.LOGICAL_BUILDER).build());
    }
    return sqlToRelConverter;
  1. HiveViewExpander.expandView方法使用HiveMetaStoreClient访问 hive 元数据,如果 tableType 是 VIRTUAL_VIEW,则返回getViewExpandedText即 view 的定义,如果是 MANAGED_TABLE EXTERNAL_TABLE INDEX_TABLE 等则返回SELECT * FROM $dbName.$tableName。
  2. 使用了VolcanoPlanner基于 CBO 优化 SQL

转换后的 RelNode Plan 为:

[RelNode Plan]
LogicalProject(EXPR$0=[CARDINALITY(ARRAY(1, 2))]): rowcount = 1.0, cumulative cost = {2.0 rows, 2.0 cpu, 0.0 io}, id = 1
  LogicalValues(tuples=[[{ 0 }]]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io}, id = 0

RelNode 树结构:

3.2. relNode -> trinoSQL

和上一节的顺序正好相反,这一节的数据结构转换过程:RelNode -> SqlNode -> trinoSQL

public class RelToTrinoConverter extends RelToSqlConverter {
  public String convert(RelNode relNode) {
    RelNode rel = convertRel(relNode, configs);
    SqlNode sqlNode = convertToSqlNode(rel);

    SqlNode sqlNodeWithRelDataTypeDerivedConversions =
        sqlNode.accept(new DataTypeDerivedSqlCallConverter(_hiveMetastoreClient, sqlNode));

    SqlNode sqlNodeWithUDFOperatorConverted =
        sqlNodeWithRelDataTypeDerivedConversions.accept(new CoralToTrinoSqlCallConverter(configs));
    return sqlNodeWithUDFOperatorConverted.accept(new TrinoSqlRewriter()).toSqlString(TrinoSqlDialect.INSTANCE)
        .toString();
  }
}

DataTypeDerivedSqlCallConverter、CoralToTrinoSqlCallConverter定义了众多的SqlCallTransformer,例如以下两个 transformer 的作用:

public class ConcatOperatorTransformer extends SqlCallTransformer {
    @Override
    // 当 sqlCall.getOperator().getName() 为 concat 时调用该方法
    protected SqlCall transform(SqlCall sqlCall) {
        List<SqlNode> updatedOperands = new ArrayList<>();

        // 遍历所有参数节点
        for (SqlNode operand : sqlCall.getOperandList()) {
            RelDataType type = deriveRelDatatype(operand);
            // 如果不是 VARCHAR CHAR 类型,则强制转换 CAST (xxx AS VARCHAR(65535))
            if (!OPERAND_SQL_TYPE_NAMES.contains(type.getSqlTypeName())) {
                SqlNode castOperand = SqlStdOperatorTable.CAST.createCall(POS,
                        new ArrayList<>(Arrays.asList(operand, VARCHAR_SQL_DATA_TYPE_SPEC)));
                updatedOperands.add(castOperand);
            } else {
                updatedOperands.add(operand);
            }
        }
        return sqlCall.getOperator().createCall(POS, updatedOperands);
    }
}

该方法产生如下的转换效果:

  1. Input - SqlCall: `concat`(CURRENT_DATE, ‘|’, CURRENT_DATE, ‘-00’)
  2. Output - SqlCall: `concat`(CAST(CURRENT_DATE AS VARCHAR(65535)), ‘|’, CAST(CURRENT_DATE AS VARCHAR(65535)), ‘-00’)
public class OperatorRenameSqlCallTransformer extends SourceOperatorMatchSqlCallTransformer {
    @Override
    protected SqlCall transform(SqlCall sqlCall) {
        return createSqlOperator(targetOpName, sourceOperator.getReturnTypeInference())
                .createCall(new SqlNodeList(sqlCall.getOperandList(), SqlParserPos.ZERO));
    }
}

则将RLIKE转换为REGEXP_LIKE,RAND转换为RANDOM(),SUBSTRING转换为SUBSTR等。

TrinoSqlDialect.INSTANCE定义了 trino 一些特殊写法,例如concat -> "concat"。

对于上一节的 SQL,由于并不复杂,实际上在这一步只用calcite的原始接口(RelNode -> SqlString -> String)也生成正确的 trinoSQL.

4. 总结

SQL 重写的工具很多,如果要比较重写的效果,HiveToTrinoConverterTest单测的 SQL 是个不错的选择,例如:

HiveSQL

TrinoSQL

SELECT RAND()

SELECT “RANDOM”()

SELECT size(ARRAY (1, 2))

SELECT CAST(CARDINALITY(ARRAY[1, 2]) AS INTEGER)

select concat(current_date(), ‘|’, current_date(), ‘-00’)

SELECT “concat”(CAST(CURRENT_DATE AS VARCHAR(65535)), ‘|’, CAST(CURRENT_DATE AS VARCHAR(65535)), ‘-00’)

但是由于 hive/trino/presto 版本多,很难做到100%准确的转换,就像将一门编程语言准确的转换为另一种一样,更多的是作为参考。
Coral项目本身代码非常值得一读,对 calcite 框架、hive/trino/sparkSQL 都能有更深的理解。快速读了一遍,我还是有很多疑问,比如同样是遍历重写 SqlNode,为什么分别在转换 RelNode 前后实现?跟 view 有关?为什么同样是 SqlCallTransformer,实现在了多个类里?

5. 参考资料

  1. linkedin/coral
  2. 流批一体技术在天猫双11的应用
  3. Coral: A SQL translation, analysis, and rewrite engine for modern data lakehouses

相关推荐

悠悠万事,吃饭为大(悠悠万事吃饭为大,什么意思)

新媒体编辑:杜岷赵蕾初审:程秀娟审核:汤小俊审签:周星...

高铁扒门事件升级版!婚宴上‘冲喜’老人团:我们抢的是社会资源

凌晨两点改方案时,突然收到婚庆团队发来的视频——胶东某酒店宴会厅,三个穿大红棉袄的中年妇女跟敢死队似的往前冲,眼瞅着就要扑到新娘的高额钻石项链上。要不是门口小伙及时阻拦,这婚礼造型团队熬了三个月的方案...

微服务架构实战:商家管理后台与sso设计,SSO客户端设计

SSO客户端设计下面通过模块merchant-security对SSO客户端安全认证部分的实现进行封装,以便各个接入SSO的客户端应用进行引用。安全认证的项目管理配置SSO客户端安全认证的项目管理使...

还在为 Spring Boot 配置类加载机制困惑?一文为你彻底解惑

在当今微服务架构盛行、项目复杂度不断攀升的开发环境下,SpringBoot作为Java后端开发的主流框架,无疑是我们手中的得力武器。然而,当我们在享受其自动配置带来的便捷时,是否曾被配置类加载...

Seata源码—6.Seata AT模式的数据源代理二

大纲1.Seata的Resource资源接口源码2.Seata数据源连接池代理的实现源码3.Client向Server发起注册RM的源码4.Client向Server注册RM时的交互源码5.数据源连接...

30分钟了解K8S(30分钟了解微积分)

微服务演进方向o面向分布式设计(Distribution):容器、微服务、API驱动的开发;o面向配置设计(Configuration):一个镜像,多个环境配置;o面向韧性设计(Resista...

SpringBoot条件化配置(@Conditional)全面解析与实战指南

一、条件化配置基础概念1.1什么是条件化配置条件化配置是Spring框架提供的一种基于特定条件来决定是否注册Bean或加载配置的机制。在SpringBoot中,这一机制通过@Conditional...

一招解决所有依赖冲突(克服依赖)

背景介绍最近遇到了这样一个问题,我们有一个jar包common-tool,作为基础工具包,被各个项目在引用。突然某一天发现日志很多报错。一看是NoSuchMethodError,意思是Dis...

你读过Mybatis的源码?说说它用到了几种设计模式

学习设计模式时,很多人都有类似的困扰——明明概念背得滚瓜烂熟,一到写代码就完全想不起来怎么用。就像学了一堆游泳技巧,却从没下过水实践,很难真正掌握。其实理解一个知识点,就像看立体模型,单角度观察总...

golang对接阿里云私有Bucket上传图片、授权访问图片

1、为什么要设置私有bucket公共读写:互联网上任何用户都可以对该Bucket内的文件进行访问,并且向该Bucket写入数据。这有可能造成您数据的外泄以及费用激增,若被人恶意写入违法信息还可...

spring中的资源的加载(spring加载原理)

最近在网上看到有人问@ContextConfiguration("classpath:/bean.xml")中除了classpath这种还有其他的写法么,看他的意思是想从本地文件...

Android资源使用(android资源文件)

Android资源管理机制在Android的开发中,需要使用到各式各样的资源,这些资源往往是一些静态资源,比如位图,颜色,布局定义,用户界面使用到的字符串,动画等。这些资源统统放在项目的res/独立子...

如何深度理解mybatis?(如何深度理解康乐服务质量管理的5个维度)

深度自定义mybatis回顾mybatis的操作的核心步骤编写核心类SqlSessionFacotryBuild进行解析配置文件深度分析解析SqlSessionFacotryBuild干的核心工作编写...

@Autowired与@Resource原理知识点详解

springIOCAOP的不多做赘述了,说下IOC:SpringIOC解决的是对象管理和对象依赖的问题,IOC容器可以理解为一个对象工厂,我们都把该对象交给工厂,工厂管理这些对象的创建以及依赖关系...

java的redis连接工具篇(java redis client)

在Java里,有不少用于连接Redis的工具,下面为你介绍一些主流的工具及其特点:JedisJedis是Redis官方推荐的Java连接工具,它提供了全面的Redis命令支持,且...