设为首页 - 加入收藏 廊坊站长网 (http://www.0316zz.com)- 国内知名站长资讯网站,提供最新最全的站长资讯,创业经验,网站建设等!
热搜: 巨头 如何 张勇 安全
当前位置: 首页 > 运营中心 > 网站设计 > 教程 > 正文

Apache Spark Delta Lake写数据使用及实现原理代码解析

发布时间:2019-10-01 13:39 所属栏目:[教程] 来源:明惠
导读:Delta Lake 写数据是其最基本的功能,而且其使用和现有的 Spark 写 Parquet 文件基本一致,在介绍 Delta Lake 实现原理之前先来看看如何使用它,具体使用如下: df.write.format(delta).save(/data/yangping.wyp/delta/test/) //数据按照dt分区 df.write.f

Apache Spark Delta Lake 写数据使用及实现原理代码解析

Delta Lake 写数据是其最基本的功能,而且其使用和现有的 Spark 写 Parquet 文件基本一致,在介绍 Delta Lake 实现原理之前先来看看如何使用它,具体使用如下:

  1. df.write.format("delta").save("/data/yangping.wyp/delta/test/")?
  2. ??
  3. //数据按照?dt?分区?
  4. df.write.format("delta").partitionBy("dt").save("/data/yangping.wyp/delta/test/")?
  5. ??
  6. //?覆盖之前的数据?
  7. df.write.format("delta").mode(SaveMode.Overwrite).save("/data/yangping.wyp/delta/test/")?

大家可以看出,使用写 Delta 数据是非常简单的,这也是 Delte Lake 介绍的 100% 兼容 Spark。

Delta Lake 写数据原理

前面简单了解了如何使用 Delta Lake 来写数据,本小结我们将深入介绍 Delta Lake 是如何保证写数据的基本原理以及如何保证事务性。

得益于 Apache Spark 强大的数据源 API,我们可以很方便的给 Spark 添加任何数据源,Delta Lake 也不例外。Delta Lake 就是使用 DataSource V1 版本的 API 实现的一种新的数据源,我们调用 df.write.format("delta") 其实底层调用的是 org.apache.spark.sql.delta.sources.DeltaDataSource 类。为了简单起见,本文介绍的是 Delta Lake 批量写的实现,实时流写 Delta Lake 本文不涉及,后面有机会再介绍。 Delta Lake 批量写扩展了 org.apache.spark.sql.sources.CreatableRelationProvider 特质,并实现了其中的方法。我们调用上面的写数据方法首先会调用 DeltaDataSource 类的 createRelation 方法,它的具体实现如下:

  1. override?def?createRelation(?
  2. ????sqlContext:?SQLContext,?
  3. ????mode:?SaveMode,?
  4. ????parameters:?Map[String,?String],?
  5. ????data:?DataFrame):?BaseRelation?=?{?
  6. ??
  7. ??//?写数据的路径?
  8. ??val?path?=?parameters.getOrElse("path",?{?
  9. ????throw?DeltaErrors.pathNotSpecifiedException?
  10. ??})?
  11. ??
  12. ??//?分区字段?
  13. ??val?partitionColumns?=?parameters.get(DeltaSourceUtils.PARTITIONING_COLUMNS_KEY)?
  14. ????.map(DeltaDataSource.decodePartitioningColumns)?
  15. ????.getOrElse(Nil)?
  16. ??
  17. ??
  18. ??//?事务日志对象?
  19. ??val?deltaLog?=?DeltaLog.forTable(sqlContext.sparkSession,?path)?
  20. ??
  21. ??//?真正的写操作过程?
  22. ??WriteIntoDelta(?
  23. ????deltaLog?=?deltaLog,?
  24. ????mode?=?mode,?
  25. ????new?DeltaOptions(parameters,?sqlContext.sparkSession.sessionState.conf),?
  26. ????partitionColumns?=?partitionColumns,?
  27. ????configuration?=?Map.empty,?
  28. ????data?=?data).run(sqlContext.sparkSession)?
  29. ??
  30. ??deltaLog.createRelation()?
  31. }?

其中 mode 就是保持数据的模式,支持 Append、Overwrite、ErrorIfExists 以及 Ignore 等。parameters 这个传递的参数,比如分区字段、数据保存路径以及 Delta 支持的一些参数(replaceWhere、mergeSchema、overwriteSchema 等,具体参见 org.apache.spark.sql.delta.DeltaOptions);data 就是我们需要保存的数据。

createRelation 方法紧接着就是获取数据保存的路径,分区字段等信息。然后初始化 deltaLog,deltaLog 的初始化会做很多事情,比如会读取磁盘所有的事务日志(_delta_log 目录下),并构建最新事务日志的最新快照,里面可以拿到最新数据的版本。由于 deltaLog 的初始化成本比较高,所以 deltaLog 初始化完之后会缓存到 deltaLogCache 中,这是一个使用 Guava 的 CacheBuilder 类实现的一个缓存,缓存的数据保持一小时,缓存大小可以通过 delta.log.cacheSize 参数进行设置。只要写数据的路径是一样的,就只需要初始化一次 deltaLog,后面直接从缓存中拿即可。除非之前缓存的 deltaLog 被清理了,或者无效才会再次初始化。DeltaLog 类是 Delta Lake 中最重要的类之一,涉及的内容非常多,所以我们会单独使用一篇文章进行介绍。

紧接着初始化 WriteIntoDelta,WriteIntoDelta 扩展自 RunnableCommand,Delta Lake 中的更新、删除、合并都是扩展这个类的。初始化完 WriteIntoDelta 之后,就会调用 run 方法执行真正的写数据操作。WriteIntoDelta 的 run 方法实现如下:

  1. override?def?run(sparkSession:?SparkSession):?Seq[Row]?=?{?
  2. ????deltaLog.withNewTransaction?{?txn?=>?
  3. ??????val?actions?=?write(txn,?sparkSession)?
  4. ??????val?operation?=?DeltaOperations.Write(mode,?Option(partitionColumns),?options.replaceWhere)?
  5. ??????txn.commit(actions,?operation)?
  6. ????}?
  7. ????Seq.empty?
  8. }?

【免责声明】本站内容转载自互联网,其相关言论仅代表作者个人观点绝非权威,不代表本站立场。如您发现内容存在版权问题,请提交相关链接至邮箱:bqsm@foxmail.com,我们将及时予以处理。

网友评论
推荐文章