util

Utilities for FastEstimator.

Suppressor

Bases: object

A class which can be used to silence output of function calls.

This class is intentionally not @traceable.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| allow_pyprint | bool | Whether to still allow python printing within this scope (and therefore only silence printing from non-python sources like C). | False |
| show_if_exception | bool | Whether to retroactively show print messages if an exception is raised from within the suppressor scope. | False |

x = lambda: print("hello")
x()  # "hello"
with fe.util.Suppressor():
    x()  #
x()  # "hello"
Source code in fastestimator/fastestimator/util/util.py
class Suppressor(object):
    """A class which can be used to silence output of function calls.

    This class is intentionally not @traceable.

    Args:
        allow_pyprint: Whether to allow python printing to occur still within this scope (and therefore only silence
            printing from non-python sources like c).
        show_if_exception: Whether to retroactively show print messages if an exception is raised from within the
            suppressor scope.

    ```python
    x = lambda: print("hello")
    x()  # "hello"
    with fe.util.Suppressor():
        x()  #
    x()  # "hello"
    ```
    """
    # Only create one file to save on disk IO
    stash_fd, stash_name = tempfile.mkstemp()
    os.close(stash_fd)
    tf_print_fd, tf_print_name = tempfile.mkstemp()
    os.close(tf_print_fd)
    tf_print_name_f = 'file://' + tf_print_name

    def __init__(self, allow_pyprint: bool = False, show_if_exception: bool = False):
        self.allow_pyprint = allow_pyprint
        self.show_if_exception = show_if_exception

    def __enter__(self) -> None:
        # This is not necessary to block printing, but lets the system know what's happening
        self.py_reals = [sys.stdout, sys.stderr]
        sys.stdout = sys.stderr = self
        # This part does the heavy lifting
        if self.show_if_exception:
            self.fake = os.open(self.stash_name, os.O_RDWR)
        else:
            self.fake = os.open(os.devnull, os.O_RDWR)
        self.reals = [os.dup(1), os.dup(2)]  # [stdout, stderr]
        os.dup2(self.fake, 1)
        os.dup2(self.fake, 2)
        # This avoids "OSError: [WinError 6] The handle is invalid" while logging tensorflow information in windows
        for handler in tf.get_logger().handlers:
            handler.setStream(sys.stderr)
        if self.allow_pyprint:
            tf.print = _custom_tf_print

    def __exit__(self, *exc: Tuple[Optional[Type], Optional[Exception], Optional[Any]]) -> None:
        # If there was an error, display any print messages
        if exc[0] is not None and self.show_if_exception and not isinstance(exc[1],
                                                                            (StopIteration, StopAsyncIteration)):
            for line in open(self.stash_name):
                os.write(self.reals[0], line.encode('utf-8'))
        # Set the print pointers back
        os.dup2(self.reals[0], 1)
        os.dup2(self.reals[1], 2)
        # Set the python pointers back too
        sys.stdout, sys.stderr = self.py_reals[0], self.py_reals[1]

        for handler in tf.get_logger().handlers:
            handler.setStream(sys.stderr)

        # Clean up the descriptors
        for fd in self.reals:
            os.close(fd)
        os.close(self.fake)
        if self.show_if_exception:
            # Clear the file
            open(self.stash_name, 'w').close()
        if self.allow_pyprint:
            tf.print = print_v2
            with open(self.tf_print_name, 'r') as f:
                for line in f:
                    print(line, end='')  # Endings already included from tf.print
            open(self.tf_print_name, 'w').close()

    def write(self, dummy: str) -> None:
        """A function which is invoked during print calls.

        Args:
            dummy: The string which is to be printed.
        """
        if self.allow_pyprint:
            os.write(self.reals[0], dummy.encode('utf-8'))
        elif self.show_if_exception:
            os.write(self.fake, dummy.encode('utf-8'))

    def flush(self) -> None:
        """A function to empty the current print buffer. No-op in this case.
        """

    @staticmethod
    @atexit.register
    def teardown() -> None:
        """Clean up the stash files when the program exists
        """
        try:
            os.remove(Suppressor.stash_name)
            os.remove(Suppressor.tf_print_name)
        except FileNotFoundError:
            pass
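
A minimal sketch of how the two flags behave, based on the example and source above; the printed strings and the output noted in the comments are purely illustrative.

```python
import fastestimator as fe

# allow_pyprint=True: python prints pass through; only non-python (e.g. C-level) output is silenced.
with fe.util.Suppressor(allow_pyprint=True):
    print("visible")  # "visible"

# show_if_exception=True: prints are stashed and replayed only if the block raises.
try:
    with fe.util.Suppressor(show_if_exception=True):
        print("hidden for now")
        raise RuntimeError("something went wrong")
except RuntimeError:
    pass  # "hidden for now" was echoed back to stdout before the exception propagated
```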

flush

A function to empty the current print buffer. No-op in this case.

Source code in fastestimator/fastestimator/util/util.py
def flush(self) -> None:
    """A function to empty the current print buffer. No-op in this case.
    """

teardown staticmethod

Clean up the stash files when the program exits

Source code in fastestimator/fastestimator/util/util.py
@staticmethod
@atexit.register
def teardown() -> None:
    """Clean up the stash files when the program exists
    """
    try:
        os.remove(Suppressor.stash_name)
        os.remove(Suppressor.tf_print_name)
    except FileNotFoundError:
        pass

write

A function which is invoked during print calls.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dummy | str | The string which is to be printed. | required |

Source code in fastestimator/fastestimator/util/util.py
def write(self, dummy: str) -> None:
    """A function which is invoked during print calls.

    Args:
        dummy: The string which is to be printed.
    """
    if self.allow_pyprint:
        os.write(self.reals[0], dummy.encode('utf-8'))
    elif self.show_if_exception:
        os.write(self.fake, dummy.encode('utf-8'))

Timer

Bases: ContextDecorator

A class that can be used to time things.

This class is intentionally not @traceable.

x = lambda: list(map(lambda i: i + i/2, list(range(int(1e6)))))
with fe.util.Timer():
    x()  # Task took 0.1639 seconds
@fe.util.Timer("T2")
def func():
    return x()
func()  # T2 took 0.14819 seconds
Source code in fastestimator/fastestimator/util/util.py
class Timer(ContextDecorator):
    """A class that can be used to time things.

    This class is intentionally not @traceable.

    ```python
    x = lambda: list(map(lambda i: i + i/2, list(range(int(1e6)))))
    with fe.util.Timer():
        x()  # Task took 0.1639 seconds
    @fe.util.Timer("T2")
    def func():
        return x()
    func()  # T2 took 0.14819 seconds
    ```
    """
    def __init__(self, name="Task") -> None:
        self.name = name
        self.start = None
        self.end = None
        self.interval = None

    def __enter__(self) -> 'Timer':
        self.start = time.perf_counter()
        return self

    def __exit__(self, *exc: Tuple[Optional[Type], Optional[Exception], Optional[Any]]) -> None:
        self.end = time.perf_counter()
        self.interval = self.end - self.start
        tf.print("{} took {} seconds".format(self.name, self.interval))

cpu_count cached

Determine the number of available CPUs (correcting for docker container limits).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| limit | Optional[int] | If provided, the TF and Torch backends will be told to use limit number of threads, or the available number of cpus if the latter is lower (limit cannot raise the number of threads). A limit can only be enforced once per python session, before starting anything like pipeline which requires multiprocessing. | None |

Returns:

| Type | Description |
| --- | --- |
| int | The number of available CPUs (correcting for docker container limits), or the user provided limit. |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If a limit is provided which doesn't match previously enforced limits. |

Source code in fastestimator/fastestimator/util/util.py
@lru_cache()
def cpu_count(limit: Optional[int] = None) -> int:
    """Determine the number of available CPUs (correcting for docker container limits).

    Args:
        limit: If provided, the TF and Torch backends will be told to use `limit` number of threads, or the available
            number of cpus if the latter is lower (`limit` cannot raise the number of threads). A limit can only be
            enforced once per python session, before starting anything like pipeline which requires multiprocessing.

    Returns:
        The number of available CPUs (correcting for docker container limits), or the user provided `limit`.

    Raises:
        ValueError: If a `limit` is provided which doesn't match previously enforced limits.
    """
    existing_limit = os.environ.get('FE_NUM_THREADS_', None)  # This variable is used internally to indicate whether cpu
    # limits have already been enforced in this python session
    if existing_limit:
        try:
            existing_limit = int(existing_limit)
        except ValueError as err:
            print("FastEstimator-Error: FE_NUM_THREADS_ is an internal variable. Use FE_NUM_THREADS (no underscore)")
            raise err
        if limit and limit != existing_limit:
            raise ValueError(f"Tried to enforce a cpu limit of {limit}, but {existing_limit} was already set.")
        return existing_limit
    # Check if user provided an environment variable limit on the number of threads
    env_limit = os.environ.get('FE_NUM_THREADS', None)  # User might set this one in a bash script
    if env_limit:
        try:
            env_limit = int(env_limit)
        except ValueError as err:
            warn(f"FE_NUM_THREADS variable must be an integer, but was set to: {env_limit}")
            raise err
    try:
        # In docker containers which have --cpuset-cpus, the limit won't be reflected by normal os.cpu_count() call
        cores = len(os.sched_getaffinity(0))
    except AttributeError:
        # Running on Mac or Windows where the above method isn't available, so use the regular way
        cores = os.cpu_count()
    cores = min(cores, limit or cores, env_limit or cores)
    if cores < 1:
        raise ValueError(f"At least 1 core is required for training, but found {cores}")
    os.environ['FE_NUM_THREADS_'] = f"{cores}"  # Remember the value so we don't try to re-set the frameworks later
    os.environ['OMP_NUM_THREADS'] = f"{cores}"
    os.environ['MKL_NUM_THREADS'] = f"{cores}"
    os.environ['TF_NUM_INTEROP_THREADS'] = f"{cores}"
    os.environ['TF_NUM_INTRAOP_THREADS'] = f"{cores}"
    torch.set_num_threads(cores)
    torch.set_num_interop_threads(cores)
    return cores
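
A usage sketch, importing from the source file shown above. The limit value is illustrative and, per the docstring, a limit can only be applied once per python session, before any multiprocessing work begins.

```python
from fastestimator.util.util import cpu_count

# Without arguments: the usable core count (respects docker --cpuset-cpus and FE_NUM_THREADS).
print(cpu_count())

# With a cap (illustrative value). The first call fixes the limit for the whole session,
# so a later call with a different limit would raise a ValueError.
# print(cpu_count(limit=4))
```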

detach_tensors

Detach tensor (collections) from current graph recursively.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | T | The data to be detached. | required |

Returns:

| Type | Description |
| --- | --- |
| T | Output data. |

Source code in fastestimator/fastestimator/util/util.py
def detach_tensors(data: T) -> T:
    """Detach tensor (collections) from current graph recursively.

    Args:
        data: The data to be detached.

    Returns:
        Output data.
    """
    if isinstance(data, dict):
        return {key: detach_tensors(value) for (key, value) in data.items()}
    elif isinstance(data, list):
        return [detach_tensors(val) for val in data]
    elif isinstance(data, tuple):
        return tuple([detach_tensors(val) for val in data])
    elif isinstance(data, set):
        return set([detach_tensors(val) for val in data])
    elif isinstance(data, torch.Tensor):
        return data.detach()
    return data
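
A small sketch of the recursive behavior, importing from the source file shown above; the keys and shapes are illustrative.

```python
import torch
from fastestimator.util.util import detach_tensors

batch = {"x": torch.ones(2, requires_grad=True), "y": [torch.zeros(3, requires_grad=True)]}
detached = detach_tensors(batch)
print(detached["x"].requires_grad)     # False
print(detached["y"][0].requires_grad)  # False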

draw

Print our name.

Source code in fastestimator/fastestimator/util/util.py
def draw() -> None:
    """Print our name.
    """
    print(Figlet(font="slant").renderText("FastEstimator"))

get_batch_size

Infer batch size from a batch dictionary. Dictionary values whose data type doesn't have a "shape" attribute are ignored.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | Dict[str, Any] | The batch dictionary. | required |

Returns:

| Type | Description |
| --- | --- |
| int | batch size. |

Source code in fastestimator/fastestimator/util/util.py
def get_batch_size(data: Dict[str, Any]) -> int:
    """Infer batch size from a batch dictionary. It will ignore all dictionary value with data type that
    doesn't have "shape" attribute.

    Args:
        data: The batch dictionary.

    Returns:
        batch size.
    """
    assert isinstance(data, dict), "data input must be a dictionary"
    batch_size = set(data[key].shape[0] for key in data if hasattr(data[key], "shape") and list(data[key].shape))
    assert len(batch_size) == 1, "invalid batch size: {}".format(batch_size)
    return batch_size.pop()
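
A short sketch importing from the source file shown above; the keys and shapes are illustrative.

```python
import numpy as np
from fastestimator.util.util import get_batch_size

batch = {"x": np.zeros((8, 28, 28)), "y": np.zeros((8,)), "id": "not-an-array"}
print(get_batch_size(batch))  # 8 ("id" has no shape attribute, so it is ignored)
```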

get_device cached

Get the torch device for the current hardware.

Returns:

| Type | Description |
| --- | --- |
| device | The torch device most appropriate for the current hardware. |

Source code in fastestimator/fastestimator/util/util.py
@lru_cache()
def get_device() -> torch.device:
    """Get the torch device for the current hardware.

    Returns:
        The torch device most appropriate for the current hardware.
    """
    if torch.backends.mps.is_available():
        device = torch.device("mps")
    elif torch.cuda.is_available():
        device = torch.device("cuda:0")
    else:
        device = torch.device("cpu")
    return device
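
A typical usage sketch, moving a small model and some data onto whichever device is detected; the layer sizes are illustrative.

```python
import torch
from fastestimator.util.util import get_device

device = get_device()  # torch.device("cuda:0"), torch.device("mps"), or torch.device("cpu")
model = torch.nn.Linear(4, 2).to(device)
x = torch.randn(3, 4, device=device)
print(model(x).device)
```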

get_gpu_info cached

Get summaries of all of the GPUs accessible on this machine.

Returns:

| Type | Description |
| --- | --- |
| List[str] | A formatted summary of the GPUs available on the machine (one list entry per GPU). |

Source code in fastestimator/fastestimator/util/util.py
@lru_cache()
def get_gpu_info() -> List[str]:
    """Get summaries of all of the GPUs accessible on this machine.

    Returns:
        A formatted summary of the GPUs available on the machine (one list entry per GPU).
    """
    if shutil.which('nvidia-smi') is not None:
        nvidia_command = ['nvidia-smi', '--query-gpu=gpu_name,memory.total,driver_version', '--format=csv']
        output = subprocess.check_output(nvidia_command)
        output = output.decode('utf-8')
        lines = output.strip().split(os.linesep)[1:]
        names = []
        for line in lines:
            name, mem, driver = line.strip().split(',')
            names.append(f"{name.strip()} ({mem.strip()}, Driver={driver.strip()})")
        return names
    elif torch.backends.mps.is_available():
        output = subprocess.check_output(["ioreg", "-l"])
        output = output.decode('utf-8')
        core_count = re.search(r'"gpu-core-count"[ ]*=[ ]*(\d*)', output)
        core_count = "???" if not core_count else core_count.group(1)
        output = subprocess.check_output(["sysctl", "-n", "hw.memsize"])
        output = output.decode('utf-8')
        gpu_mem = f"{float(output)*1e-9:0.2f} GB"  # Convert from bytes to GB
        return [f"{get_cpu_info()['brand_raw']} ({gpu_mem}, {core_count} Cores)"]  # On mac the CPU name is the GPU name
    return []
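
A short usage sketch; the format noted in the comment is only an example of the strings produced above.

```python
from fastestimator.util.util import get_gpu_info

for idx, summary in enumerate(get_gpu_info()):
    print(f"GPU {idx}: {summary}")  # e.g. "GPU 0: <name> (<memory>, Driver=<version>)"; nothing prints if no GPU is found
```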

get_num_devices cached

Determine the number of available GPUs.

Returns:

| Type | Description |
| --- | --- |
| int | The number of available GPUs, or 1 if none are found. |

Source code in fastestimator/fastestimator/util/util.py
@lru_cache()
def get_num_devices() -> int:
    """Determine the number of available GPUs.

    Returns:
        The number of available GPUs, or 1 if none are found.
    """
    return max(torch.cuda.device_count(), 1)

get_num_gpus cached

Get the number of GPUs available.

Returns:

| Type | Description |
| --- | --- |
| int | The number of GPUs available. |

Source code in fastestimator/fastestimator/util/util.py
@lru_cache()
def get_num_gpus() -> int:
    """Get the number of GPUs available.

    Returns:
        The number of GPUs available.
    """
    if torch.backends.mps.is_available():
        return 1
    elif torch.cuda.is_available():
        return torch.cuda.device_count()
    else:
        return 0
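
The two helpers above differ when no GPU is present; a small sketch of that contrast, importing from the source file shown above.

```python
from fastestimator.util.util import get_num_devices, get_num_gpus

# On a CPU-only machine:
print(get_num_gpus())     # 0  (no MPS or CUDA device found)
print(get_num_devices())  # 1  (returns at least 1, per the docstring above)
```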

is_valid_file

Check whether a file is valid (it exists and, if it is an archive, it can be read).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| file_path | str | location of the input file. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | Whether the file is valid. |

Source code in fastestimator/fastestimator/util/util.py
def is_valid_file(file_path: str) -> bool:
    """Validate whether file is valid or not.

    Args:
        file_path: location of the input file.

    Returns:
        Whether the file is valid.
    """
    if not os.path.exists(file_path):
        return False
    suffix = Path(file_path).suffix
    try:
        if suffix == '.zip':
            import zipfile
            zip_file = zipfile.ZipFile(file_path)
            _ = zip_file.namelist()
        elif suffix == '.gz':
            if file_path.endswith('.tar.gz'):
                import tarfile
                with tarfile.open(file_path) as img_tar:
                    _ = img_tar.getmembers()
            else:
                import gzip
                f = gzip.open(file_path, 'rb')
                _ = f.read()
        return True
    except Exception as e:
        print(e)
        return False
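
A usage sketch; the archive path below is hypothetical.

```python
from fastestimator.util.util import is_valid_file

archive = "/tmp/data.tar.gz"  # hypothetical path
if not is_valid_file(archive):
    print("File is missing or unreadable; consider re-downloading it")
```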

move_tensors_to_device

Move torch tensor (collections) between gpu and cpu recursively.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | T | The input data to be moved. | required |
| device | Union[str, device] | The target device. | required |

Returns:

| Type | Description |
| --- | --- |
| T | Output data. |

Source code in fastestimator/fastestimator/util/util.py
def move_tensors_to_device(data: T, device: Union[str, torch.device]) -> T:
    """Move torch tensor (collections) between gpu and cpu recursively.

    Args:
        data: The input data to be moved.
        device: The target device.

    Returns:
        Output data.
    """
    if isinstance(data, dict):
        return {key: move_tensors_to_device(value, device) for (key, value) in data.items()}
    elif isinstance(data, list):
        return [move_tensors_to_device(val, device) for val in data]
    elif isinstance(data, tuple):
        return tuple([move_tensors_to_device(val, device) for val in data])
    elif isinstance(data, set):
        return set([move_tensors_to_device(val, device) for val in data])
    elif isinstance(data, torch.Tensor):
        return data.to(device)
    else:
        return data
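
A small sketch combining this helper with get_device from above; the batch contents are illustrative.

```python
import torch
from fastestimator.util.util import get_device, move_tensors_to_device

batch = {"x": torch.ones(4, 3), "y": (torch.zeros(4), torch.zeros(4))}
batch = move_tensors_to_device(batch, device=get_device())
print(batch["x"].device, batch["y"][0].device)
```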

pad_batch

A function to pad a batch of data in-place by appending to the ends of the tensors. The tensors must be numpy arrays; values of other types are ignored (tf.Tensor and torch.Tensor values will cause an error).

data = [{"x": np.ones((2, 2)), "y": 8}, {"x": np.ones((3, 1)), "y": 4}]
fe.util.pad_batch(data, pad_value=0)
print(data)  # [{'x': [[1., 1.], [1., 1.], [0., 0.]], 'y': 8}, {'x': [[1., 0.], [1., 0.], [1., 0.]], 'y': 4}]

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| batch | List[MutableMapping[str, ndarray]] | A list of data to be padded. | required |
| pad_value | Union[float, int] | The value to pad with. | required |

Raises:

| Type | Description |
| --- | --- |
| AssertionError | If the data within the batch do not have matching rank, or have different keys |

Source code in fastestimator/fastestimator/util/util.py
def pad_batch(batch: List[MutableMapping[str, np.ndarray]], pad_value: Union[float, int]) -> None:
    """A function to pad a batch of data in-place by appending to the ends of the tensors. Tensor type needs to be
    numpy array otherwise would get ignored. (tf.Tensor and torch.Tensor will cause error)

    ```python
    data = [{"x": np.ones((2, 2)), "y": 8}, {"x": np.ones((3, 1)), "y": 4}]
    fe.util.pad_batch(data, pad_value=0)
    print(data)  # [{'x': [[1., 1.], [1., 1.], [0., 0.]], 'y': 8}, {'x': [[1., 0.], [1., 0.], [1., 0.]], 'y': 4}]
    ```

    Args:
        batch: A list of data to be padded.
        pad_value: The value to pad with.

    Raises:
        AssertionError: If the data within the batch do not have matching rank, or have different keys
    """
    keys = batch[0].keys()
    for one_batch in batch:
        assert one_batch.keys() == keys, "data within batch must have same keys"

    for key in keys:
        shapes = [data[key].shape for data in batch if hasattr(data[key], "shape")]
        if len(set(shapes)) > 1:
            assert len(set(len(shape) for shape in shapes)) == 1, "data within batch must have same rank"
            max_shapes = tuple(np.max(np.array(shapes), axis=0))
            for data in batch:
                data[key] = pad_data(data[key], max_shapes, pad_value)

pad_data

Pad data by appending pad_values along its dimensions until the target_shape is reached. All entries of target_shape should be at least as large as the corresponding entries of data.shape, and have the same rank.

x = np.ones((1,2))
x = fe.util.pad_data(x, target_shape=(3, 3), pad_value = -2)  # [[1, 1, -2], [-2, -2, -2], [-2, -2, -2]]
x = fe.util.pad_data(x, target_shape=(3, 3, 3), pad_value = -2) # error
x = fe.util.pad_data(x, target_shape=(4, 1), pad_value = -2) # error

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | ndarray | The data to be padded. | required |
| target_shape | Tuple[int, ...] | The desired shape for data. Should have the same rank as data, with each dimension being >= the size of the data dimension. | required |
| pad_value | Union[float, int] | The value to insert into data if padding is required to achieve the target_shape. | required |

Returns:

| Type | Description |
| --- | --- |
| ndarray | The data, padded to the target_shape. |

Source code in fastestimator/fastestimator/util/util.py
def pad_data(data: np.ndarray, target_shape: Tuple[int, ...], pad_value: Union[float, int]) -> np.ndarray:
    """Pad `data` by appending `pad_value`s along it's dimensions until the `target_shape` is reached. All entries of
    target_shape should be larger than the data.shape, and have the same rank.

    ```python
    x = np.ones((1,2))
    x = fe.util.pad_data(x, target_shape=(3, 3), pad_value = -2)  # [[1, 1, -2], [-2, -2, -2], [-2, -2, -2]]
    x = fe.util.pad_data(x, target_shape=(3, 3, 3), pad_value = -2) # error
    x = fe.util.pad_data(x, target_shape=(4, 1), pad_value = -2) # error
    ```

    Args:
        data: The data to be padded.
        target_shape: The desired shape for `data`. Should have the same rank as `data`, with each dimension being >=
            the size of the `data` dimension.
        pad_value: The value to insert into `data` if padding is required to achieve the `target_shape`.

    Returns:
        The `data`, padded to the `target_shape`.
    """
    shape_difference = np.array(target_shape) - np.array(data.shape)
    padded_shape = np.array([np.zeros_like(shape_difference), shape_difference]).T
    return np.pad(data, padded_shape, 'constant', constant_values=pad_value)

to_number

Convert an input value into a Numpy ndarray.

This method can be used with Python and Numpy data:

b = fe.backend.to_number(5)  # 5 (type==np.ndarray)
b = fe.backend.to_number(4.0)  # 4.0 (type==np.ndarray)
n = np.array([1, 2, 3])
b = fe.backend.to_number(n)  # [1, 2, 3] (type==np.ndarray)

This method can be used with TensorFlow tensors:

t = tf.constant([1, 2, 3])
b = fe.backend.to_number(t)  # [1, 2, 3] (type==np.ndarray)

This method can be used with PyTorch tensors:

p = torch.tensor([1, 2, 3])
b = fe.backend.to_number(p)  # [1, 2, 3] (type==np.ndarray)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | Union[tf.Tensor, torch.Tensor, ndarray, int, float, str] | The value to be converted into a np.ndarray. | required |

Returns:

| Type | Description |
| --- | --- |
| ndarray | An ndarray corresponding to the given data. |

Source code in fastestimator/fastestimator/util/util.py
def to_number(data: Union[tf.Tensor, torch.Tensor, np.ndarray, int, float, str]) -> np.ndarray:
    """Convert an input value into a Numpy ndarray.

    This method can be used with Python and Numpy data:
    ```python
    b = fe.backend.to_number(5)  # 5 (type==np.ndarray)
    b = fe.backend.to_number(4.0)  # 4.0 (type==np.ndarray)
    n = np.array([1, 2, 3])
    b = fe.backend.to_number(n)  # [1, 2, 3] (type==np.ndarray)
    ```

    This method can be used with TensorFlow tensors:
    ```python
    t = tf.constant([1, 2, 3])
    b = fe.backend.to_number(t)  # [1, 2, 3] (type==np.ndarray)
    ```

    This method can be used with PyTorch tensors:
    ```python
    p = torch.tensor([1, 2, 3])
    b = fe.backend.to_number(p)  # [1, 2, 3] (type==np.ndarray)
    ```

    Args:
        data: The value to be converted into a np.ndarray.

    Returns:
        An ndarray corresponding to the given `data`.
    """
    if tf.is_tensor(data):
        data = data.numpy()
    elif isinstance(data, torch.Tensor):
        if data.requires_grad:
            data = data.detach().numpy()
        else:
            data = data.numpy()
    return np.array(data)