聊聊flink的MemoryBackendCheckpointStorage
bigegpt 2024-09-24 07:20 4 浏览
序
本文主要研究一下flink的MemoryBackendCheckpointStorage
CheckpointStorage
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorage.java
/** * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, * created by this class. */ public interface CheckpointStorage { ? ? boolean supportsHighlyAvailableStorage(); ? boolean hasDefaultSavepointLocation(); ? CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException; ? CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException; ? CheckpointStorageLocation initializeLocationForSavepoint( long checkpointId, @Nullable String externalLocationPointer) throws IOException; ? CheckpointStreamFactory resolveCheckpointStorageLocation( long checkpointId, CheckpointStorageLocationReference reference) throws IOException; ? CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException; }
- CheckpointStorage接口主要定义了持久化checkpoint data及metadata streams的基本方法;supportsHighlyAvailableStorage方法返回该backend是否支持highly available storage;hasDefaultSavepointLocation方法是否有默认的savepoint location;resolveCheckpoint方法用于解析checkpoint location返回CompletedCheckpointStorageLocation;initializeLocationForCheckpoint方法根据checkpointId来初始化storage location;initializeLocationForSavepoint方法用于根据checkpointId来初始化savepoint的storage location;resolveCheckpointStorageLocation方法解析CheckpointStorageLocationReference返回CheckpointStreamFactory;createTaskOwnedStateStream方法用于打开一个stream来持久化checkpoint state
AbstractFsCheckpointStorage
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
/** * An implementation of durable checkpoint storage to file systems. */ public abstract class AbstractFsCheckpointStorage implements CheckpointStorage { ? // ------------------------------------------------------------------------ // Constants // ------------------------------------------------------------------------ ? /** The prefix of the directory containing the data exclusive to a checkpoint. */ public static final String CHECKPOINT_DIR_PREFIX = "chk-"; ? /** The name of the directory for shared checkpoint state. */ public static final String CHECKPOINT_SHARED_STATE_DIR = "shared"; ? /** The name of the directory for state not owned/released by the master, but by the TaskManagers. */ public static final String CHECKPOINT_TASK_OWNED_STATE_DIR = "taskowned"; ? /** The name of the metadata files in checkpoints / savepoints. */ public static final String METADATA_FILE_NAME = "_metadata"; ? /** The magic number that is put in front of any reference. */ private static final byte[] REFERENCE_MAGIC_NUMBER = new byte[] { 0x05, 0x5F, 0x3F, 0x18 }; ? // ------------------------------------------------------------------------ // Fields and properties // ------------------------------------------------------------------------ ? /** The jobId, written into the generated savepoint directories. */ private final JobID jobId; ? /** The default location for savepoints. Null, if none is configured. */ @Nullable private final Path defaultSavepointDirectory; ? @Override public boolean hasDefaultSavepointLocation() { return defaultSavepointDirectory != null; } ? @Override public CompletedCheckpointStorageLocation resolveCheckpoint(String checkpointPointer) throws IOException { return resolveCheckpointPointer(checkpointPointer); } ? /** * Creates a file system based storage location for a savepoint. * * <p>This methods implements the logic that decides which location to use (given optional * parameters for a configured location and a location passed for this specific savepoint) * and how to name and initialize the savepoint directory. * * @param externalLocationPointer The target location pointer for the savepoint. * Must be a valid URI. Null, if not supplied. * @param checkpointId The checkpoint ID of the savepoint. * * @return The checkpoint storage location for the savepoint. * * @throws IOException Thrown if the target directory could not be created. */ @Override public CheckpointStorageLocation initializeLocationForSavepoint( @SuppressWarnings("unused") long checkpointId, @Nullable String externalLocationPointer) throws IOException { ? // determine where to write the savepoint to ? final Path savepointBasePath; if (externalLocationPointer != null) { savepointBasePath = new Path(externalLocationPointer); } else if (defaultSavepointDirectory != null) { savepointBasePath = defaultSavepointDirectory; } else { throw new IllegalArgumentException("No savepoint location given and no default location configured."); } ? // generate the savepoint directory ? final FileSystem fs = savepointBasePath.getFileSystem(); final String prefix = "savepoint-" + jobId.toString().substring(0, 6) + '-'; ? Exception latestException = null; for (int attempt = 0; attempt < 10; attempt++) { final Path path = new Path(savepointBasePath, FileUtils.getRandomFilename(prefix)); ? try { if (fs.mkdirs(path)) { // we make the path qualified, to make it independent of default schemes and authorities final Path qp = path.makeQualified(fs); ? return createSavepointLocation(fs, qp); } } catch (Exception e) { latestException = e; } } ? throw new IOException("Failed to create savepoint directory at " + savepointBasePath, latestException); } ? protected abstract CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException; ? //...... }
- AbstractFsCheckpointStorage主要是实现了CheckpointStorage接口的hasDefaultSavepointLocation、resolveCheckpoint、initializeLocationForSavepoint方法
- resolveCheckpoint方法主要做两件事情,一个是解析checkpoint/savepoint path,一个是解析checkpoint/savepoint的metadata path,获取他们的FileStatus,然后创建FsCompletedCheckpointStorageLocation
- initializeLocationForSavepoint方法主要是给savepoint创建一个CheckpointStorageLocation,它可以根据externalLocationPointer来创建,该值为null的话则使用defaultSavepointDirectory,该方法里头调用了createSavepointLocation抽象方法,由子类去实现
MemoryBackendCheckpointStorage
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java
/** * An implementation of a checkpoint storage for the {@link MemoryStateBackend}. * Depending on whether this is created with a checkpoint location, the setup supports * durable checkpoints (durable metadata) or not. */ public class MemoryBackendCheckpointStorage extends AbstractFsCheckpointStorage { ? /** The target directory for checkpoints (here metadata files only). Null, if not configured. */ @Nullable private final Path checkpointsDirectory; ? /** The file system to persist the checkpoints to. Null if this does not durably persist checkpoints. */ @Nullable private final FileSystem fileSystem; ? /** The maximum size of state stored in a state handle. */ private final int maxStateSize; ? /** * Creates a new MemoryBackendCheckpointStorage. * * @param jobId The ID of the job writing the checkpoints. * @param checkpointsBaseDirectory The directory to write checkpoints to. May be null, * in which case this storage does not support durable persistence. * @param defaultSavepointLocation The default savepoint directory, or null, if none is set. * @param maxStateSize The maximum size of each individual piece of state. * * @throws IOException Thrown if a checkpoint base directory is given configured and the * checkpoint directory cannot be created within that directory. */ public MemoryBackendCheckpointStorage( JobID jobId, @Nullable Path checkpointsBaseDirectory, @Nullable Path defaultSavepointLocation, int maxStateSize) throws IOException { ? super(jobId, defaultSavepointLocation); ? checkArgument(maxStateSize > 0); this.maxStateSize = maxStateSize; ? if (checkpointsBaseDirectory == null) { checkpointsDirectory = null; fileSystem = null; } else { this.fileSystem = checkpointsBaseDirectory.getFileSystem(); this.checkpointsDirectory = getCheckpointDirectoryForJob(checkpointsBaseDirectory, jobId); ? fileSystem.mkdirs(checkpointsDirectory); } } ? // ------------------------------------------------------------------------ // Properties // ------------------------------------------------------------------------ ? /** * Gets the size (in bytes) that a individual chunk of state may have at most. */ public int getMaxStateSize() { return maxStateSize; } ? // ------------------------------------------------------------------------ // Checkpoint Storage // ------------------------------------------------------------------------ ? @Override public boolean supportsHighlyAvailableStorage() { return checkpointsDirectory != null; } ? @Override public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException { checkArgument(checkpointId >= 0); ? if (checkpointsDirectory != null) { // configured for durable metadata // prepare all the paths needed for the checkpoints checkState(fileSystem != null); ? final Path checkpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId); ? // create the checkpoint exclusive directory fileSystem.mkdirs(checkpointDir); ? return new PersistentMetadataCheckpointStorageLocation(fileSystem, checkpointDir, maxStateSize); } else { // no durable metadata - typical in IDE or test setup case return new NonPersistentMetadataCheckpointStorageLocation(maxStateSize); } } ? @Override public CheckpointStreamFactory resolveCheckpointStorageLocation( long checkpointId, CheckpointStorageLocationReference reference) throws IOException { ? // no matter where the checkpoint goes, we always return the storage location that stores // state inline with the state handles. return new MemCheckpointStreamFactory(maxStateSize); } ? @Override public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException { return new MemoryCheckpointOutputStream(maxStateSize); } ? @Override protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException { return new PersistentMetadataCheckpointStorageLocation(fs, location, maxStateSize); } ? // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ ? @Override public String toString() { return "MemoryBackendCheckpointStorage {" + "checkpointsDirectory=" + checkpointsDirectory + ", fileSystem=" + fileSystem + ", maxStateSize=" + maxStateSize + '}'; } }
- MemoryBackendCheckpointStorage继承了AbstractFsCheckpointStorage,实现了它定义的createSavepointLocation方法,这里返回的是PersistentMetadataCheckpointStorageLocation
- MemoryBackendCheckpointStorage还实现了CheckpointStorage接口定义的AbstractFsCheckpointStorage未实现的几个方法:supportsHighlyAvailableStorage、initializeLocationForCheckpoint、resolveCheckpointStorageLocation、createTaskOwnedStateStream
- supportsHighlyAvailableStorage是根据是否有配置checkpointsDirectory来判断;initializeLocationForCheckpoint这个根据checkpointsDirectory是否有设置来创建,为null的话,创建的是NonPersistentMetadataCheckpointStorageLocation,不为null创建的是PersistentMetadataCheckpointStorageLocation;resolveCheckpointStorageLocation这里创建的是MemCheckpointStreamFactory;而createTaskOwnedStateStream创建的是MemoryCheckpointOutputStream
小结
- CheckpointStorage接口主要定义了持久化checkpoint data及metadata streams的基本方法;AbstractFsCheckpointStorage主要是实现了CheckpointStorage接口的hasDefaultSavepointLocation、resolveCheckpoint、initializeLocationForSavepoint方法,同时定义了一个抽象方法createSavepointLocation
- MemoryBackendCheckpointStorage继承了AbstractFsCheckpointStorage,实现了它定义的createSavepointLocation方法,同时还实现了CheckpointStorage接口定义的AbstractFsCheckpointStorage未实现的几个方法:supportsHighlyAvailableStorage、initializeLocationForCheckpoint、resolveCheckpointStorageLocation、createTaskOwnedStateStream
- 这里可以看到MemoryBackendCheckpointStorage虽然是memory的,但是如果有配置checkpointsDirectory(highly available storage),checkpoint location使用的是PersistentMetadataCheckpointStorageLocation,否则使用NonPersistentMetadataCheckpointStorageLocation;而savepoint location使用的是PersistentMetadataCheckpointStorageLocation(checkpiont可以选择是否使用文件存储,而savepoint只能使用文件存储)
doc
- The MemoryStateBackend
相关推荐
- 5分钟调色大片的方法(5分钟调色大片的方法有哪些)
-
哈喽大家好。在大家印象中一定觉得ps非常难学非常难。大家不要着急,小编的教学都是针对ps零基础的同学的,而且非常实用哦。只要大家跟着图文练习一两遍,保证大家立马学会~!好了,废话少说,下面开始我们今天...
- 闪白特效原来是这么用的(闪白特效怎么使用)
-
作者|高艳侠订阅|010-86092062闪白特效是影视作品中应用比较多的效果之一,那么具体该在哪些场景使用闪白特效?具体该如何操作?下面就以AdobePremiere(以下简称PR)为例,...
- ppt常用小图标去哪里找?3个矢量素材网站推荐!
-
ppt是一个注重可视化表达的演示载体,除了高清图片,ppt中另一类常用的素材是各种小图标,也叫矢量图标,巧妙运用小图标能提升整体美观度和表现力,那么ppt常用小图标去哪里找呢?为方便各位快速找到合适的...
- 有什么好用的截图录屏工具?试试这9款
-
经常有朋友反馈苦于缺乏截屏和录屏的趁手工具,本期我们分享几个相当好用的截屏和录屏工具,希望能帮到大家。ScreenToGifScreenToGif是一款免费且开源的录屏工具。此款工具最大的特点是可以...
- 配色苦手福音!专业快速色环配色PS插件
-
今天橘子老师给的大家介绍的是一款快速配色的插件,非常强大配色苦手福音来啦!(获取方式见文末)【插件介绍】配色在后期设计中占有主导地位,好的配色能让作品更加抢眼Coolorus这款专业的配色插件,能够...
- 如何用PS抠主体?(ps怎么抠主体)
-
1.主体法抠图-抠花苞和花梗导入一张荷花苞的照片,点击上图中顶部“选择”菜单栏,下拉单击“主体”。可以看到,只有花苞被选中,但是花梗并没有被选中。接下来单击上图中左侧工具栏的“快速选择工具”,上图中顶...
- 2799元的4K电视,有保障吗?(买4k电视机哪个品牌好)
-
在上一期《电脑报》的3·15专题报道中,我们揭露了一款不靠谱的42英寸4K智能电视——TCLD42A561U。这款售价2699元的4K智能电视不仅4K画质方面存在严重问题,而且各种功能和应用体验也不理...
- 苹果电脑的Touch Bar推出一段时间了 这款工具可以帮你开发适用于它的APP
-
距离苹果推出带有TouchBar的MacBookPro已经有一段时间了,除了那些像Adobe、Google和Microsoft大公司在开发适用于TouchBar的应用之外,其实还有很多独立的开...
- 如魔法般吸取颜色的桌灯(如魔法般吸取颜色的桌灯叫什么)
-
色彩为生活带来的感官刺激,逐渐被视为理所当然。一盏桌灯运用它的神奇力量,将隐藏于物件中的颜色逐一释放,成为装点环境的空间魔法师。ColorUp是一款可以改变颜色的吸色台灯,沿用传统灯泡的造型,融入了拾...
- 一篇文章带你用jquery mobile设计颜色拾取器
-
【一、项目背景】现实生活中,我们经常会遇到配色的问题,这个时候去百度一下RGB表。而RGB表只提供相对于的颜色的RGB值而没有可以验证的模块。我们可以通过jquerymobile去设计颜色的拾取器...
- ps拾色器快捷键是什么?(ps2019拾色器快捷键)
-
ps拾色器快捷键是什么?文章末尾有获取方式,按照以下步骤就能自动获得!学会制作PS特效需要一定程度的耐心和毅力。初学者可以从基本的工具和技术开始学习,逐渐提高他们的技能水平。同时,观看更多优秀的特效作...
- 免费开源的 Windows 截图录屏工具,支持 OCR 识别和滚动截图等
-
功能很强大、安装很小巧的免费截图、录屏工具,提供很多使用的工具来帮我么能解决问题,推荐给大家。关于ShareXShareX是一款免费的windows工具,起初是一个小巧的截图工具,经过多年的迭...
- 入门到精通系列PS教程:第13篇 · 拾色器、颜色问题说明及补充
-
入门到精通系列PS教程:第13篇·拾色器、颜色问题说明及补充作者|侯潇问题说明我的第12篇教程里,有个小问题没有说清楚。要说是错误,又不算是错误,只是没有说准确。写完那篇教程后,因为已经到了深...
- PS冷知识:用吸管工具吸取屏幕上的任意颜色
-
今天,我们给大家介绍PS中的一个冷知识:用吸管工具可以吸取屏幕上的任意颜色。其实,操作起来是非常简单的。大多数情况下,我们认为,PS的吸管工具只能吸取PS软件作图区域范围内的颜色,最多加上画布四周的...
- Windows 11 将提供内置颜色选择器工具
-
Windows11内置了颜色选择器,可以扫描并识别屏幕上的颜色并生成颜色代码。此外,微软还利用人工智能技术,让屏幕上的文本扫描和选择变得更加便捷。这两项功能均已在SnippingToolv1...
- 一周热门
- 最近发表
- 标签列表
-
- mybatiscollection (79)
- mqtt服务器 (88)
- keyerror (78)
- c#map (65)
- xftp6 (83)
- bt搜索 (75)
- c#var (76)
- xcode-select (66)
- mysql授权 (74)
- 下载测试 (70)
- linuxlink (65)
- pythonwget (67)
- androidinclude (65)
- libcrypto.so (74)
- linux安装minio (74)
- ubuntuunzip (67)
- vscode使用技巧 (83)
- secure-file-priv (67)
- vue阻止冒泡 (67)
- jquery跨域 (68)
- php写入文件 (73)
- kafkatools (66)
- mysql导出数据库 (66)
- jquery鼠标移入移出 (71)
- 取小数点后两位的函数 (73)