Atomicity

Utilities for creating and finalizing temporary paths.

Note that this configurability does not let users define their own temporary path structure. The current implementation is mainly a refactoring of older logic that created and finalized temporary directories separately. It does not touch the logic that detects and cleans up temporary checkpoints (primarily located in orbax.checkpoint.path.step and CheckpointManager).

Ordinarily, atomic logic defaults to AtomicRenameTemporaryPath, which uses an atomic rename to indicate checkpoint completion. However, not all filesystems support atomic rename, so CommitFileTemporaryPath is provided as an alternative, which uses a “commit_success” file to indicate completion.
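The two completion signals can be illustrated with a minimal standard-library sketch. This is not Orbax's implementation: the function names, the `.tmp` suffix, and the step directory names are hypothetical, and the marker filename `commit_success` is taken from the quoted name above (the exact on-disk filename is an assumption).

```python
import os
import tempfile
from pathlib import Path

COMMIT_MARKER = "commit_success"  # assumed marker filename, for illustration only


def finalize_by_rename(tmp_dir: Path, final_dir: Path) -> None:
    """Signals completion by renaming the tmp directory to its final name."""
    # Atomic on POSIX when source and destination share a filesystem.
    os.rename(tmp_dir, final_dir)


def finalize_by_commit_file(final_dir: Path) -> None:
    """Signals completion by writing a marker file as the very last step."""
    (final_dir / COMMIT_MARKER).write_text("ok")


def is_committed(final_dir: Path) -> bool:
    """Under the commit-file scheme, a directory is complete only if the marker exists."""
    return (final_dir / COMMIT_MARKER).exists()


def demo() -> dict:
    root = Path(tempfile.mkdtemp())

    # Rename scheme: data lives under a tmp name until the rename.
    tmp = root / "step_1.tmp"
    tmp.mkdir()
    (tmp / "data.bin").write_bytes(b"weights")
    finalize_by_rename(tmp, root / "step_1")

    # Commit-file scheme: data is written in place and the marker comes last.
    step2 = root / "step_2"
    step2.mkdir()
    (step2 / "data.bin").write_bytes(b"weights")
    before = is_committed(step2)
    finalize_by_commit_file(step2)

    return {
        "renamed_exists": (root / "step_1").is_dir(),
        "tmp_gone": not tmp.exists(),
        "committed_before": before,
        "committed_after": is_committed(step2),
    }
```

With the rename scheme, a reader never sees a partially written directory under the final name. With the commit-file scheme, a reader has to check for the marker before trusting the directory contents.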

Ideally, we would standardize on a single behavior, but this is difficult to achieve, largely for legacy reasons. There are also many other ways of ensuring save atomicity. We have therefore opted for a more flexible approach that lets users configure the behavior they want.

Configuration can be done in the following way:

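import orbax.checkpoint as ocp
from orbax.checkpoint.path import atomicity

# Option 1: configure the checkpointer directly.
ocp.AsyncCheckpointer(
    ocp.StandardCheckpointHandler(),
    temporary_path_class=atomicity.CommitFileTemporaryPath,
)

# OR

# Option 2: configure through CheckpointManagerOptions.
# `directory` is the root directory under which checkpoints are saved.
ocp.CheckpointManager(
    directory,
    item_names=('state', 'dataset'),
    options=ocp.CheckpointManagerOptions(
        temporary_path_class=atomicity.CommitFileTemporaryPath
    ),
)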

AtomicRenameTemporaryPath

class orbax.checkpoint.path.atomicity.AtomicRenameTemporaryPath(temporary_path, final_path, *, checkpoint_metadata_store=None, file_options=None, snapshot_type=None)

TemporaryPath implementation that uses atomic rename.

async classmethod validate(temporary_path)

Validates the temporary path or raises a ValidationError.

async classmethod validate_final(final_path)

Validates the final path or raises a ValidationError.

classmethod from_temporary(temporary_path, *, file_options=None, snapshot_type=None)

Creates a TemporaryPath from a temporary path.

Return type:

AtomicRenameTemporaryPath

classmethod from_final(final_path, *, checkpoint_metadata_store=None, file_options=None, snapshot_type=None)

Creates a TemporaryPath from a final path.

Return type:

AtomicRenameTemporaryPath

get_final()

Returns the final path without creating it.

Return type:

Path

async create()

Creates a non-deterministic tmp directory for saving to the given final_dir.

Also writes checkpoint metadata in the tmp directory.

NOTE: This function does not include any barrier syncs, and calling it directly from multiprocess code can lead to race conditions. Prefer to use atomicity.create_all in such cases.

Return type:

Path

Returns:

The tmp directory.

Raises:

FileExistsError – if tmp directory already exists.
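The race condition mentioned in the NOTE above can be made concrete with a small sketch. It is only an analogy under stated assumptions: threads stand in for processes, `threading.Barrier` stands in for a cross-process barrier sync, and the function and file names are hypothetical. It does not show how atomicity.create_all is implemented.

```python
import tempfile
import threading
from pathlib import Path


def save_shards(num_processes: int = 4) -> list[str]:
    """Simulates processes with threads: one creates the directory, all write into it."""
    tmp_dir = Path(tempfile.mkdtemp()) / "step_1.tmp"
    barrier = threading.Barrier(num_processes)

    def worker(process_index: int) -> None:
        if process_index == 0:
            # Only the primary "process" creates the tmp directory.
            tmp_dir.mkdir()
        # Nobody proceeds until every participant has arrived, which
        # guarantees the directory exists before any shard is written.
        barrier.wait()
        (tmp_dir / f"shard_{process_index}").write_text("data")

    threads = [
        threading.Thread(target=worker, args=(i,)) for i in range(num_processes)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(p.name for p in tmp_dir.iterdir())
```

Without the `barrier.wait()` call, a non-primary worker could try to write its shard before the directory exists, which is the kind of race a barrier sync is meant to prevent.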

async finalize()

Finalizes atomic save by renaming tmp_dir.

Updates checkpoint metadata with commit_timestamp_nsecs.
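The create-then-finalize lifecycle documented above can be sketched with a toy stand-in class. `ToyRenameTemporaryPath` only mirrors the documented method names (from_final, get_final, create, finalize); it is not the Orbax class, it skips checkpoint metadata entirely, and the random-suffix naming scheme is an assumption made for illustration.

```python
import asyncio
import tempfile
import uuid
from pathlib import Path


class ToyRenameTemporaryPath:
    """Toy stand-in that mirrors the documented method names; not the Orbax class."""

    def __init__(self, temporary_path: Path, final_path: Path):
        self._tmp = temporary_path
        self._final = final_path

    @classmethod
    def from_final(cls, final_path: Path) -> "ToyRenameTemporaryPath":
        # Assumed naming scheme: a random suffix makes the tmp name non-deterministic.
        tmp = final_path.with_name(f"{final_path.name}.tmp-{uuid.uuid4().hex}")
        return cls(tmp, final_path)

    def get_final(self) -> Path:
        # Returns the final path without touching the filesystem.
        return self._final

    async def create(self) -> Path:
        # Path.mkdir raises FileExistsError if the tmp directory already exists.
        await asyncio.to_thread(self._tmp.mkdir, parents=True)
        return self._tmp

    async def finalize(self) -> None:
        # The rename is the single step that makes the checkpoint visible.
        await asyncio.to_thread(self._tmp.rename, self._final)


async def save(final_path: Path) -> Path:
    tmp_path = ToyRenameTemporaryPath.from_final(final_path)
    tmp_dir = await tmp_path.create()
    (tmp_dir / "data.bin").write_bytes(b"weights")
    await tmp_path.finalize()
    return tmp_path.get_final()
```

A caller would run this with `asyncio.run(save(some_dir / "step_1"))`. Until `finalize()` completes, nothing exists under the final name, so an interrupted save leaves only a tmp directory behind.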

CommitFileTemporaryPath

class orbax.checkpoint.path.atomicity.CommitFileTemporaryPath(temporary_path, final_path, *, checkpoint_metadata_store=None, file_options=None, snapshot_type=None)

TemporaryPath implementation that uses a commit file.

async classmethod validate(temporary_path)

Validates the temporary path or raises a ValidationError.

async classmethod validate_final(final_path)

Validates the final path or raises a ValidationError.

classmethod from_temporary(temporary_path, *, file_options=None, snapshot_type=None)

Creates a TemporaryPath from a temporary path.

Return type:

CommitFileTemporaryPath

classmethod from_final(final_path, *, checkpoint_metadata_store=None, file_options=None, snapshot_type=None)

Creates a TemporaryPath from a final path.

Return type:

CommitFileTemporaryPath

get_final()

Returns the final path without creating it.

Return type:

Path

async create()

Creates a non-deterministic tmp directory for saving to the given final_dir.

Also writes checkpoint metadata in the tmp directory.

NOTE: This function does not include any barrier syncs, and calling it directly from multiprocess code can lead to race conditions. Prefer to use atomicity.create_all in such cases.

Return type:

Path

Returns:

The tmp directory.

Raises:

FileExistsError – if tmp directory already exists.

async finalize()

Finalizes atomic save by writing a success file.

Updates checkpoint metadata with commit_timestamp_nsecs.

Helper functions

async orbax.checkpoint.path.atomicity.on_commit_callback(tmp_dir, *, checkpoint_start_time)

Commits the save operation by atomically finalizing the step directory.

Records save duration and lineage-logs step dir.

Parameters:
  • tmp_dir (TemporaryPath) – A temporary checkpoint directory, where the checkpoint data is currently saved.

  • checkpoint_start_time (float) – The time at which checkpoint saving began.

  • tree_verity_options – Options to configure checkpoint signing and integrity verification using

  • set_immutable – Whether to mark all files as immutable. This is only