百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

聊聊flink的MemoryBackendCheckpointStorage

bigegpt 2024-09-24 07:20 4 浏览

本文主要研究一下flink的MemoryBackendCheckpointStorage

CheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorage.java

/**
 * CheckpointStorage implements the durable storage of checkpoint data and metadata streams.
 * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation},
 * created by this class.
 */
public interface CheckpointStorage {
?
?
 boolean supportsHighlyAvailableStorage();
?
 boolean hasDefaultSavepointLocation();
?
 CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException;
?
 CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException;
?
 CheckpointStorageLocation initializeLocationForSavepoint(
 long checkpointId,
 @Nullable String externalLocationPointer) throws IOException;
?
 CheckpointStreamFactory resolveCheckpointStorageLocation(
 long checkpointId,
 CheckpointStorageLocationReference reference) throws IOException;
?
 CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException;
}
  • CheckpointStorage接口主要定义了持久化checkpoint data及metadata streams的基本方法;supportsHighlyAvailableStorage方法返回该backend是否支持highly available storage;hasDefaultSavepointLocation方法是否有默认的savepoint location;resolveCheckpoint方法用于解析checkpoint location返回CompletedCheckpointStorageLocation;initializeLocationForCheckpoint方法根据checkpointId来初始化storage location;initializeLocationForSavepoint方法用于根据checkpointId来初始化savepoint的storage location;resolveCheckpointStorageLocation方法解析CheckpointStorageLocationReference返回CheckpointStreamFactory;createTaskOwnedStateStream方法用于打开一个stream来持久化checkpoint state

AbstractFsCheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java

/**
 * An implementation of durable checkpoint storage to file systems.
 */
public abstract class AbstractFsCheckpointStorage implements CheckpointStorage {
?
 // ------------------------------------------------------------------------
 // Constants
 // ------------------------------------------------------------------------
?
 /** The prefix of the directory containing the data exclusive to a checkpoint. */
 public static final String CHECKPOINT_DIR_PREFIX = "chk-";
?
 /** The name of the directory for shared checkpoint state. */
 public static final String CHECKPOINT_SHARED_STATE_DIR = "shared";
?
 /** The name of the directory for state not owned/released by the master, but by the TaskManagers. */
 public static final String CHECKPOINT_TASK_OWNED_STATE_DIR = "taskowned";
?
 /** The name of the metadata files in checkpoints / savepoints. */
 public static final String METADATA_FILE_NAME = "_metadata";
?
 /** The magic number that is put in front of any reference. */
 private static final byte[] REFERENCE_MAGIC_NUMBER = new byte[] { 0x05, 0x5F, 0x3F, 0x18 };
?
 // ------------------------------------------------------------------------
 // Fields and properties
 // ------------------------------------------------------------------------
?
 /** The jobId, written into the generated savepoint directories. */
 private final JobID jobId;
?
 /** The default location for savepoints. Null, if none is configured. */
 @Nullable
 private final Path defaultSavepointDirectory;
?
 @Override
 public boolean hasDefaultSavepointLocation() {
 return defaultSavepointDirectory != null;
 }
?
 @Override
 public CompletedCheckpointStorageLocation resolveCheckpoint(String checkpointPointer) throws IOException {
 return resolveCheckpointPointer(checkpointPointer);
 }
?
 /**
 * Creates a file system based storage location for a savepoint.
 *
 * <p>This methods implements the logic that decides which location to use (given optional
 * parameters for a configured location and a location passed for this specific savepoint)
 * and how to name and initialize the savepoint directory.
 *
 * @param externalLocationPointer The target location pointer for the savepoint.
 * Must be a valid URI. Null, if not supplied.
 * @param checkpointId The checkpoint ID of the savepoint.
 *
 * @return The checkpoint storage location for the savepoint.
 *
 * @throws IOException Thrown if the target directory could not be created.
 */
 @Override
 public CheckpointStorageLocation initializeLocationForSavepoint(
 @SuppressWarnings("unused") long checkpointId,
 @Nullable String externalLocationPointer) throws IOException {
?
 // determine where to write the savepoint to
?
 final Path savepointBasePath;
 if (externalLocationPointer != null) {
 savepointBasePath = new Path(externalLocationPointer);
 }
 else if (defaultSavepointDirectory != null) {
 savepointBasePath = defaultSavepointDirectory;
 }
 else {
 throw new IllegalArgumentException("No savepoint location given and no default location configured.");
 }
?
 // generate the savepoint directory
?
 final FileSystem fs = savepointBasePath.getFileSystem();
 final String prefix = "savepoint-" + jobId.toString().substring(0, 6) + '-';
?
 Exception latestException = null;
 for (int attempt = 0; attempt < 10; attempt++) {
 final Path path = new Path(savepointBasePath, FileUtils.getRandomFilename(prefix));
?
 try {
 if (fs.mkdirs(path)) {
 // we make the path qualified, to make it independent of default schemes and authorities
 final Path qp = path.makeQualified(fs);
?
 return createSavepointLocation(fs, qp);
 }
 } catch (Exception e) {
 latestException = e;
 }
 }
?
 throw new IOException("Failed to create savepoint directory at " + savepointBasePath, latestException);
 }
?
 protected abstract CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException;
?
 //......
}
  • AbstractFsCheckpointStorage主要是实现了CheckpointStorage接口的hasDefaultSavepointLocation、resolveCheckpoint、initializeLocationForSavepoint方法
  • resolveCheckpoint方法主要做两件事情,一个是解析checkpoint/savepoint path,一个是解析checkpoint/savepoint的metadata path,获取他们的FileStatus,然后创建FsCompletedCheckpointStorageLocation
  • initializeLocationForSavepoint方法主要是给savepoint创建一个CheckpointStorageLocation,它可以根据externalLocationPointer来创建,该值为null的话则使用defaultSavepointDirectory,该方法里头调用了createSavepointLocation抽象方法,由子类去实现

MemoryBackendCheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java

/**
 * An implementation of a checkpoint storage for the {@link MemoryStateBackend}.
 * Depending on whether this is created with a checkpoint location, the setup supports
 * durable checkpoints (durable metadata) or not.
 */
public class MemoryBackendCheckpointStorage extends AbstractFsCheckpointStorage {
?
 /** The target directory for checkpoints (here metadata files only). Null, if not configured. */
 @Nullable
 private final Path checkpointsDirectory;
?
 /** The file system to persist the checkpoints to. Null if this does not durably persist checkpoints. */
 @Nullable
 private final FileSystem fileSystem;
?
 /** The maximum size of state stored in a state handle. */
 private final int maxStateSize;
?
 /**
 * Creates a new MemoryBackendCheckpointStorage.
 *
 * @param jobId The ID of the job writing the checkpoints.
 * @param checkpointsBaseDirectory The directory to write checkpoints to. May be null,
 * in which case this storage does not support durable persistence.
 * @param defaultSavepointLocation The default savepoint directory, or null, if none is set.
 * @param maxStateSize The maximum size of each individual piece of state.
 *
 * @throws IOException Thrown if a checkpoint base directory is given configured and the
 * checkpoint directory cannot be created within that directory.
 */
 public MemoryBackendCheckpointStorage(
 JobID jobId,
 @Nullable Path checkpointsBaseDirectory,
 @Nullable Path defaultSavepointLocation,
 int maxStateSize) throws IOException {
?
 super(jobId, defaultSavepointLocation);
?
 checkArgument(maxStateSize > 0);
 this.maxStateSize = maxStateSize;
?
 if (checkpointsBaseDirectory == null) {
 checkpointsDirectory = null;
 fileSystem = null;
 }
 else {
 this.fileSystem = checkpointsBaseDirectory.getFileSystem();
 this.checkpointsDirectory = getCheckpointDirectoryForJob(checkpointsBaseDirectory, jobId);
?
 fileSystem.mkdirs(checkpointsDirectory);
 }
 }
?
 // ------------------------------------------------------------------------
 // Properties
 // ------------------------------------------------------------------------
?
 /**
 * Gets the size (in bytes) that a individual chunk of state may have at most.
 */
 public int getMaxStateSize() {
 return maxStateSize;
 }
?
 // ------------------------------------------------------------------------
 // Checkpoint Storage
 // ------------------------------------------------------------------------
?
 @Override
 public boolean supportsHighlyAvailableStorage() {
 return checkpointsDirectory != null;
 }
?
 @Override
 public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
 checkArgument(checkpointId >= 0);
?
 if (checkpointsDirectory != null) {
 // configured for durable metadata
 // prepare all the paths needed for the checkpoints
 checkState(fileSystem != null);
?
 final Path checkpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId);
?
 // create the checkpoint exclusive directory
 fileSystem.mkdirs(checkpointDir);
?
 return new PersistentMetadataCheckpointStorageLocation(fileSystem, checkpointDir, maxStateSize);
 }
 else {
 // no durable metadata - typical in IDE or test setup case
 return new NonPersistentMetadataCheckpointStorageLocation(maxStateSize);
 }
 }
?
 @Override
 public CheckpointStreamFactory resolveCheckpointStorageLocation(
 long checkpointId,
 CheckpointStorageLocationReference reference) throws IOException {
?
 // no matter where the checkpoint goes, we always return the storage location that stores
 // state inline with the state handles.
 return new MemCheckpointStreamFactory(maxStateSize);
 }
?
 @Override
 public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
 return new MemoryCheckpointOutputStream(maxStateSize);
 }
?
 @Override
 protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException {
 return new PersistentMetadataCheckpointStorageLocation(fs, location, maxStateSize);
 }
?
 // ------------------------------------------------------------------------
 // Utilities
 // ------------------------------------------------------------------------
?
 @Override
 public String toString() {
 return "MemoryBackendCheckpointStorage {" +
 "checkpointsDirectory=" + checkpointsDirectory +
 ", fileSystem=" + fileSystem +
 ", maxStateSize=" + maxStateSize +
 '}';
 }
}
  • MemoryBackendCheckpointStorage继承了AbstractFsCheckpointStorage,实现了它定义的createSavepointLocation方法,这里返回的是PersistentMetadataCheckpointStorageLocation
  • MemoryBackendCheckpointStorage还实现了CheckpointStorage接口定义的AbstractFsCheckpointStorage未实现的几个方法:supportsHighlyAvailableStorage、initializeLocationForCheckpoint、resolveCheckpointStorageLocation、createTaskOwnedStateStream
  • supportsHighlyAvailableStorage是根据是否有配置checkpointsDirectory来判断;initializeLocationForCheckpoint这个根据checkpointsDirectory是否有设置来创建,为null的话,创建的是NonPersistentMetadataCheckpointStorageLocation,不为null创建的是PersistentMetadataCheckpointStorageLocation;resolveCheckpointStorageLocation这里创建的是MemCheckpointStreamFactory;而createTaskOwnedStateStream创建的是MemoryCheckpointOutputStream

小结

  • CheckpointStorage接口主要定义了持久化checkpoint data及metadata streams的基本方法;AbstractFsCheckpointStorage主要是实现了CheckpointStorage接口的hasDefaultSavepointLocation、resolveCheckpoint、initializeLocationForSavepoint方法,同时定义了一个抽象方法createSavepointLocation
  • MemoryBackendCheckpointStorage继承了AbstractFsCheckpointStorage,实现了它定义的createSavepointLocation方法,同时还实现了CheckpointStorage接口定义的AbstractFsCheckpointStorage未实现的几个方法:supportsHighlyAvailableStorage、initializeLocationForCheckpoint、resolveCheckpointStorageLocation、createTaskOwnedStateStream
  • 这里可以看到MemoryBackendCheckpointStorage虽然是memory的,但是如果有配置checkpointsDirectory(highly available storage),checkpoint location使用的是PersistentMetadataCheckpointStorageLocation,否则使用NonPersistentMetadataCheckpointStorageLocation;而savepoint location使用的是PersistentMetadataCheckpointStorageLocation(checkpiont可以选择是否使用文件存储,而savepoint只能使用文件存储)

doc

  • The MemoryStateBackend

相关推荐

5分钟调色大片的方法(5分钟调色大片的方法有哪些)

哈喽大家好。在大家印象中一定觉得ps非常难学非常难。大家不要着急,小编的教学都是针对ps零基础的同学的,而且非常实用哦。只要大家跟着图文练习一两遍,保证大家立马学会~!好了,废话少说,下面开始我们今天...

闪白特效原来是这么用的(闪白特效怎么使用)

作者|高艳侠订阅|010-86092062闪白特效是影视作品中应用比较多的效果之一,那么具体该在哪些场景使用闪白特效?具体该如何操作?下面就以AdobePremiere(以下简称PR)为例,...

ppt常用小图标去哪里找?3个矢量素材网站推荐!

ppt是一个注重可视化表达的演示载体,除了高清图片,ppt中另一类常用的素材是各种小图标,也叫矢量图标,巧妙运用小图标能提升整体美观度和表现力,那么ppt常用小图标去哪里找呢?为方便各位快速找到合适的...

有什么好用的截图录屏工具?试试这9款

经常有朋友反馈苦于缺乏截屏和录屏的趁手工具,本期我们分享几个相当好用的截屏和录屏工具,希望能帮到大家。ScreenToGifScreenToGif是一款免费且开源的录屏工具。此款工具最大的特点是可以...

配色苦手福音!专业快速色环配色PS插件

今天橘子老师给的大家介绍的是一款快速配色的插件,非常强大配色苦手福音来啦!(获取方式见文末)【插件介绍】配色在后期设计中占有主导地位,好的配色能让作品更加抢眼Coolorus这款专业的配色插件,能够...

如何用PS抠主体?(ps怎么抠主体)

1.主体法抠图-抠花苞和花梗导入一张荷花苞的照片,点击上图中顶部“选择”菜单栏,下拉单击“主体”。可以看到,只有花苞被选中,但是花梗并没有被选中。接下来单击上图中左侧工具栏的“快速选择工具”,上图中顶...

2799元的4K电视,有保障吗?(买4k电视机哪个品牌好)

在上一期《电脑报》的3·15专题报道中,我们揭露了一款不靠谱的42英寸4K智能电视——TCLD42A561U。这款售价2699元的4K智能电视不仅4K画质方面存在严重问题,而且各种功能和应用体验也不理...

苹果电脑的Touch Bar推出一段时间了 这款工具可以帮你开发适用于它的APP

距离苹果推出带有TouchBar的MacBookPro已经有一段时间了,除了那些像Adobe、Google和Microsoft大公司在开发适用于TouchBar的应用之外,其实还有很多独立的开...

如魔法般吸取颜色的桌灯(如魔法般吸取颜色的桌灯叫什么)

色彩为生活带来的感官刺激,逐渐被视为理所当然。一盏桌灯运用它的神奇力量,将隐藏于物件中的颜色逐一释放,成为装点环境的空间魔法师。ColorUp是一款可以改变颜色的吸色台灯,沿用传统灯泡的造型,融入了拾...

一篇文章带你用jquery mobile设计颜色拾取器

【一、项目背景】现实生活中,我们经常会遇到配色的问题,这个时候去百度一下RGB表。而RGB表只提供相对于的颜色的RGB值而没有可以验证的模块。我们可以通过jquerymobile去设计颜色的拾取器...

ps拾色器快捷键是什么?(ps2019拾色器快捷键)

ps拾色器快捷键是什么?文章末尾有获取方式,按照以下步骤就能自动获得!学会制作PS特效需要一定程度的耐心和毅力。初学者可以从基本的工具和技术开始学习,逐渐提高他们的技能水平。同时,观看更多优秀的特效作...

免费开源的 Windows 截图录屏工具,支持 OCR 识别和滚动截图等

功能很强大、安装很小巧的免费截图、录屏工具,提供很多使用的工具来帮我么能解决问题,推荐给大家。关于ShareXShareX是一款免费的windows工具,起初是一个小巧的截图工具,经过多年的迭...

入门到精通系列PS教程:第13篇 · 拾色器、颜色问题说明及补充

入门到精通系列PS教程:第13篇·拾色器、颜色问题说明及补充作者|侯潇问题说明我的第12篇教程里,有个小问题没有说清楚。要说是错误,又不算是错误,只是没有说准确。写完那篇教程后,因为已经到了深...

PS冷知识:用吸管工具吸取屏幕上的任意颜色

今天,我们给大家介绍PS中的一个冷知识:用吸管工具可以吸取屏幕上的任意颜色。其实,操作起来是非常简单的。大多数情况下,我们认为,PS的吸管工具只能吸取PS软件作图区域范围内的颜色,最多加上画布四周的...

Windows 11 将提供内置颜色选择器工具

Windows11内置了颜色选择器,可以扫描并识别屏幕上的颜色并生成颜色代码。此外,微软还利用人工智能技术,让屏幕上的文本扫描和选择变得更加便捷。这两项功能均已在SnippingToolv1...