# Source code for orbax.checkpoint._src.handlers.async_checkpoint_handler
# Copyright 2026 The Orbax Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""AsyncCheckpointHandler interface."""
import abc
from typing import List, Optional
from etils import epath
from orbax.checkpoint._src.futures import future
from orbax.checkpoint._src.handlers import checkpoint_handler
from orbax.checkpoint._src.path import types as path_types
# [docs] -- link marker left over from the rendered documentation page; not part of the module.
class AsyncCheckpointHandler(checkpoint_handler.CheckpointHandler):
  """Handler interface exposing the async methods used with AsyncCheckpointer.

  Builds on `checkpoint_handler.CheckpointHandler` by declaring the abstract
  coroutine `async_save`.
  """

  @abc.abstractmethod
  async def async_save(
      self, directory: epath.Path, *args, **kwargs
  ) -> Optional[List[future.Future]]:
    """Saves the given item to the provided directory.

    Args:
      directory: The directory to save to.
      *args: Additional positional arguments for save.
      **kwargs: Additional keyword arguments for save.

    Returns:
      A list of commit futures which can be awaited upon to complete the save
      operation. The return annotation also permits `None`.
    """
class DeferredPathAsyncCheckpointHandler(AsyncCheckpointHandler):
  """Async handler interface whose save target may be a deferred path.

  Compared with AsyncCheckpointHandler, `async_save` here takes either an
  epath.Path or a PathAwaitingCreation. This lets handlers work with deferred
  paths (e.g., TFHub), where the actual path is allocated asynchronously.

  Handlers implementing this interface can:

  1. Receive a deferred path representation before the path is allocated.
  2. Wait for the STEP_DIRECTORY_CREATION signal inside their CommitFuture.
  3. Access the path via await_creation() or .path after the signal.
  """

  @abc.abstractmethod
  async def async_save(
      self,
      directory: epath.Path | path_types.PathAwaitingCreation,
      *args,
      **kwargs,
  ) -> Optional[List[future.Future]]:
    """Constructs a save operation with support for deferred paths.

    `directory` may be an epath.Path or a PathAwaitingCreation. When a deferred
    path is passed, handler coroutines should wait for the
    STEP_DIRECTORY_CREATION signal before accessing the path.

    Args:
      directory: The directory to save to, given as an epath.Path or a
        PathAwaitingCreation. For deferred paths, await_creation() or signal
        ordering ensures the path is available.
      *args: Additional positional arguments for save.
      **kwargs: Additional keyword arguments for save.

    Returns:
      A list of futures that will commit the data when awaited. The return
      annotation also permits `None`.
    """