SubchunkableApplyFlow

zetta_utils.mazepa_layer_processing.common.subchunkable_apply_flow.build_subchunkable_apply_flow(dst, dst_resolution, processing_chunk_sizes, processing_gap=(0, 0, 0), processing_crop_pads=(0, 0, 0), processing_blend_pads=(0, 0, 0), processing_blend_modes='quadratic', level_intermediaries_dirs=None, skip_intermediaries=False, max_reduction_chunk_size=None, expand_bbox_resolution=False, expand_bbox_backend=False, expand_bbox_processing=True, shrink_processing_chunk=False, auto_divisibility=False, allow_cache_up_to_level=None, op_worker_type=None, reduction_worker_type=None, print_summary=True, generate_ng_link=False, fn=None, fn_semaphores=None, op=None, op_args=(), op_kwargs=None, bbox=None, start_coord=None, end_coord=None, coord_resolution=None, auto_bbox=False, dst_tighten_bounds=False)[source]

The helper constructor for a flow that applies any function or operation with a Tensor-valued output in a chunkwise fashion, allowing arbitrary subchunking with cropping and blending. Chunks are processed, written to a temporary intermediary location, and then combined (reduced) with weights where necessary to produce the output.

Parameters
  • dst (UnionType[VolumetricBasedLayerProtocol, None]) – The destination VolumetricBasedLayerProtocol. May be None, in which case skip_intermediaries must be True, expand_bbox_backend must be False, and “defer” may not be used in processing_blend_modes.

  • dst_resolution (Sequence[float]) – The resolution of the destination VolumetricBasedLayerProtocol (or resolution to use for computation, even if dst is None).

  • dst_tighten_bounds (bool) – Tighten the VolumetricBasedLayerProtocol’s bounds to the given bbox (whether given as a bbox or start_coord, end_coord, and coord_resolution) before any expansions EXCEPT expand_bbox_resolution. When set, processing happens in the bbox expanded by all expansions, but writing only happens within the tightened bounds. Note that the bounds are only tightened at dst_resolution, which may result in odd behaviour if the bounds at other resolutions are used in subsequent processing. Requires dst.

  • processing_chunk_sizes (Sequence[Sequence[int]]) – The base chunk size at each subchunking level in X, Y, Z, from the largest to the smallest. Subject to divisibility requirements (see bottom). When auto_divisibility is used, the chunk sizes other than the bottom level chunk size will be treated as an upper bound, and rounded down to satisfy divisibility. Must be even.

  • processing_gap (Sequence[int]) – Extra unprocessed space to be skipped between chunks at the top level. When used, blend_pad cannot be used at the top level. Cannot be used with auto_divisibility, expand_bbox_backend, expand_bbox_resolution, or shrink_processing_chunk.

  • processing_crop_pads (Union[Sequence[int], Sequence[Sequence[int]]]) – Pixels to crop per processing chunk at each subchunking level in X, Y, Z, from the largest to the smallest. Affects divisibility requirements (see bottom). Given as a padding: a (10, 10, 0) crop_pad with a (1024, 1024, 1) processing_chunk_size means that a (1044, 1044, 1) area will be processed and then cropped by (10, 10, 0) to output a (1024, 1024, 1) chunk.

  • processing_blend_pads (Union[Sequence[int], Sequence[Sequence[int]]]) – Pixels to blend per processing chunk at each subchunking level in X, Y, Z, from the largest to the smallest. Affects divisibility requirements (see bottom). Given as a padding: a (10, 10, 0) blend_pad with a (1024, 1024, 1) processing_chunk_size means that a (1044, 1044, 1) area will be processed and then overlapped by (20, 20, 0) between adjacent (1024, 1024, 1) chunks. Must be less than or equal to half of the processing_chunk_size in each dimension.

  • processing_blend_modes (Union[Literal[‘linear’, ‘quadratic’, ‘max’, ‘defer’], Sequence[Literal[‘linear’, ‘quadratic’, ‘max’, ‘defer’]]]) – Which blend mode to use at each subchunking level. linear sums the blended areas weighted linearly by the position. quadratic sums the blended areas weighted quadratically by the position. max takes the maximum value of any layer in the overlap area. defer can only be supplied as the blend mode for the top level, and skips the final reduction stage, leaving the final intermediary files for the user to handle. If defer is used, then skip_intermediaries cannot be used.

  • max_reduction_chunk_size (Optional[Sequence[int]]) – The upper bound on the size of chunks used for the reduction step. During the reduction step, backend chunks in the area to be reduced are combined into larger chunks up to this limit. Reduction chunks are only used to combine already computed outputs, so larger is better to cut down on the number of tasks. Must be larger than both the top level processing_chunk_size and the dst chunk size.

  • level_intermediaries_dirs (Optional[Sequence[UnionType[str, None]]]) – Intermediary directories for temporary layers at each subchunking level, used for handling blending, cropping, and rechunking for backends. Only used if the level is using blending and/or if the level above has crop, or if it is the top level and skip_intermediaries has not been set. If running remotely, the top level directory must also be remote, since the worker performing the reduction step may be different from the worker that wrote the processing output. The other levels are recommended to be local.

  • skip_intermediaries (bool) – Skips all intermediaries. This means that no blending is allowed anywhere, and that only the bottom level may have crop. You MUST ensure that your output is aligned to the backend chunk yourself when this option is used. Cannot be used if defer is used in processing_blend_modes.

  • expand_bbox_resolution (bool) – Expands bbox (whether given as a bbox or start_coord, end_coord, and coord_resolution) to be integral in the dst_resolution. Cannot be used with processing_gap.

  • expand_bbox_backend (bool) – Expands bbox (whether given as a bbox or start_coord, end_coord, and coord_resolution) to be aligned to the dst layer’s backend chunk size and offset at dst_resolution. Requires bbox to be integral in dst_resolution. Cannot be used with processing_gap, expand_bbox_processing, or auto_divisibility.

  • expand_bbox_processing (bool) – Expands bbox (whether given as a bbox or start_coord, end_coord, and coord_resolution) to be an integer multiple of the top level processing_chunk_size, holding the top left corner fixed. Requires bbox to be integral in dst_resolution. Cannot be used with expand_bbox_backend or shrink_processing_chunk.

  • shrink_processing_chunk (bool) – Shrinks the top level processing_chunk_size to fit the bbox. Does not affect other levels, so divisibility requirements may be affected. Requires bbox to be integral in dst_resolution. Cannot be used with processing_gap, expand_bbox_processing, or auto_divisibility.

  • auto_divisibility (bool) – Automatically chooses processing_chunk_sizes that are divisible, while respecting the bottom level processing_chunk_size as well as every level’s processing_crop_pads and processing_blend_pads. The user-provided processing_chunk_sizes are treated as an upper bound. Requires bbox to be integral in dst_resolution. Requires expand_bbox_processing. Cannot be used with processing_gap, expand_bbox_backend, or shrink_processing_chunk.

  • allow_cache_up_to_level (UnionType[int, None]) – The subchunking level (smallest is 0) where the cache for different remote layers should be cleared after the processing is done. Recommended to keep this at the level of the largest subchunks (default).

  • op_worker_type (UnionType[str, None]) – The worker type required by the op. The subchunked tasks will be routed to only the workers that have the requested worker type.

  • reduction_worker_type (UnionType[str, None]) – The worker type required by the reduction process. The reduction tasks will be routed to only the workers that have the requested worker type.

  • print_summary (bool) – Whether a summary should be printed.

  • generate_ng_link (bool) – Whether a neuroglancer link should be generated in the summary. Requires print_summary.

  • fn (Optional[Callable[[ParamSpec], Tensor]]) – The function to be run on each chunk. Cannot be used with op.

  • fn_semaphores (Optional[Sequence[Literal[‘read’, ‘write’, ‘cuda’, ‘cpu’]]]) – The semaphores to be acquired while the function runs. Supports read, write, cuda, and cpu. Cannot be used with op.

  • op (Optional[VolumetricOpProtocol[ParamSpec, None, Any]]) – The operation to be run on each chunk. Cannot be used with fn.

  • op_args (Iterable) – Only used for typing. Do not use: will raise an exception if nonempty.

  • op_kwargs (Optional[Mapping[str, Any]]) – Any keyword arguments taken by the fn or the op.

  • bbox (UnionType[BBox3D, None]) – The bounding box for the operation. Cannot be used with start_coord, end_coord, and coord_resolution, or auto_bbox.

  • start_coord (Optional[Sequence[int]]) – The start coordinate for the bounding box. Must be used with end_coord and coord_resolution; cannot be used with bbox or auto_bbox.

  • end_coord (Optional[Sequence[int]]) – The end coordinate for the bounding box. Must be used with start_coord and coord_resolution; cannot be used with bbox or auto_bbox.

  • coord_resolution (Optional[Sequence]) – The resolution in which the coordinates are given for the bounding box. Must be used with start_coord and end_coord; cannot be used with bbox or auto_bbox.

  • auto_bbox (bool) – Sets the bbox to the bounds of dst at dst_resolution. Note that this may result in a significant overhead if the dst bounds have not been tightened. Requires dst. Cannot be used with bbox or start_coord, end_coord, and coord_resolution.

Return type

Flow
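
Example

A minimal sketch of a two-level invocation. The layer helper build_cv_layer, its info_reference_path parameter, and the mazepa.execute entry point are assumptions not documented on this page, and the bucket paths are placeholders:

    # Minimal sketch; paths are placeholders, and build_cv_layer /
    # mazepa.execute are assumed from elsewhere in zetta_utils.
    from zetta_utils.layer.volumetric.cloudvol import build_cv_layer  # assumed path
    from zetta_utils.mazepa_layer_processing.common.subchunkable_apply_flow import (
        build_subchunkable_apply_flow,
    )
    from zetta_utils import mazepa  # assumed execution entry point

    src = build_cv_layer(path="gs://my-bucket/src")
    dst = build_cv_layer(
        path="gs://my-bucket/dst",
        info_reference_path="gs://my-bucket/src",  # assumed parameter
    )

    flow = build_subchunkable_apply_flow(
        dst=dst,
        dst_resolution=(16, 16, 40),
        # Two subchunking levels, largest to smallest; (1024, 1024, 1)
        # evenly divides the unpadded (4096, 4096, 1) chunk above it,
        # and (4096, 4096, 1) evenly divides the bbox below.
        processing_chunk_sizes=[(4096, 4096, 1), (1024, 1024, 1)],
        # Remote intermediary for the top level, local for the bottom.
        level_intermediaries_dirs=["gs://my-bucket/tmp", "~/.zetta_utils/tmp"],
        fn=lambda src: src + 1,  # any Tensor-valued callable
        op_kwargs={"src": src},
        start_coord=(0, 0, 0),
        end_coord=(8192, 8192, 10),
        coord_resolution=(16, 16, 40),
    )
    mazepa.execute(flow)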

Divisibility Requirement

At each subchunking level, the processing_ parameters of SubchunkableApplyFlow are subject to divisibility requirements. Specifically, the processing chunk at each level must evenly divide the crop- and blend-padded processing chunk of the level above. Furthermore, the top level processing chunk must evenly divide the bbox.

This requirement exists because SubchunkableApplyFlow recursively subchunks the padded ROI into the processing chunks of the level below. auto_divisibility exists to automatically find the correct processing chunk sizes, but it may have an adverse impact on performance.

Note

If calculating the processing_chunk_sizes by hand, make sure to multiply the crop_pad and blend_pad by 2 before adding them to the processing_chunk_size, since both pads are applied on each side of the chunk.
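
For example, a quick divisibility sanity check done by hand (an illustrative sketch, not part of zetta_utils):

    # Illustrative divisibility check; not part of zetta_utils.
    def padded_size(chunk, crop_pad, blend_pad):
        # Both pads are applied on each side, hence the factor of 2.
        return tuple(c + 2 * (cp + bp) for c, cp, bp in zip(chunk, crop_pad, blend_pad))

    def divides(lower_chunk, upper_padded):
        return all(p % c == 0 for c, p in zip(lower_chunk, upper_padded))

    # A (2048, 2048, 1) processing chunk with (10, 10, 0) crop_pad and
    # (2, 2, 0) blend_pad is processed as a (2072, 2072, 1) region, which a
    # (1036, 1036, 1) chunk at the level below divides evenly.
    assert divides((1036, 1036, 1), padded_size((2048, 2048, 1), (10, 10, 0), (2, 2, 0)))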

Note on Architecture

This subsection is intended as an in-depth discussion of the SubchunkableApplyFlow architecture.

All of the volumes that zetta_utils expects to work with are backed by chunked storage. This means that if we want to use padding of any form in our processing, whether for blending or cropping, we run into two major issues:

  1. Assuming that the processing chunks (hereafter referred to as punks) are exactly the size of the backend chunks (bunks), adding even a single pixel of padding in XY incurs a 9x read penalty since the neighbouring chunks must be read.

  2. Since the writing is also done to chunks, we have a race condition in writes when using blending. 9, 27, or even more (if a bunk is small) punks with padding can overlap in a bunk, which results in a race condition.

Solving problem #1 is the motivation for why SubchunkableApplyFlow needs to exist, and solving #2 is the motivation for its architecture.

To solve the first problem, we can use caching. However, naive chunking means that cache locality cannot be guaranteed. A single node in a large run might see more than 10,000 tasks, and a cache might only be large enough for about 100 bunks, meaning that a downloaded chunk will be evicted after 10 tasks since 9 chunks need to be downloaded for each task.

Subchunking so that each node sees a “superchunk” that fits entirely in memory guarantees that the cache is actually used, massively reducing the amount of reading required. In fact, we can even use disk-backed caching on a “super-superchunk”, with each “superchunk” fitting in memory, if we want to reduce the caching overhead even further.

In addition, subchunking allows task generation to be naturally parallelised, since each node is responsible for splitting up the chunks it receives into subchunks. This is a bigger boon than it might seem at first: if generating a single task takes a millisecond, then scheduling 1e8 tasks takes 28 hours.

Solving the second problem is a little harder: the easy solution is to just emit the tasks in 9 or 27 waves and add to the existing output (with blending weights) at every write, but this has its own issues:

  1. Emitting tasks in waves is very inefficient since there will always be straggler tasks. A thousand nodes should not be held back because one node was preempted.

  2. If a node gets preempted, each bunk needs to be tracked to see if the existing data should be ignored.

  3. Each write now incurs a read call as well.

  4. Each bunk requires 9 or 27 read and write calls.

Thus, the better solution is to write to different temporary layers and sum them with the weights at the end. This might seem expensive, but thanks to subchunking we can use local storage or memory for the temporary layers, and only write to remote temporary layers for the top level. (This relies on the fact that local I/O is practically free, even when chunked.)
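
To make the weights concrete, here is a 1-D sketch of overlap ramps whose mirrored pairs sum to one; the exact formulas used by zetta_utils may differ:

    # Illustrative 1-D blend ramps; the actual zetta_utils formulas may differ.
    def ramp(t, mode):
        """Weight at relative position t in [0, 1] across an overlap region.

        For both modes, ramp(t) + ramp(1 - t) == 1, so two chunks overlapping
        with mirrored ramps sum back to the original intensity.
        """
        if mode == "linear":
            return t
        # "quadratic": piecewise parabola, flatter near the overlap edges.
        return 2 * t * t if t < 0.5 else 1 - 2 * (1 - t) ** 2

    assert ramp(0.25, "linear") + ramp(0.75, "linear") == 1
    assert ramp(0.25, "quadratic") + ramp(0.75, "quadratic") == 1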

From here on, we will refer to the bottom level processing chunk and the bottom level backend chunk as the L0 punk and L0 bunk, respectively. The superchunks that consist of L(n) punks will be denoted L(n+1) punks, with the entire ROI to be processed being one level higher than the highest level.

Whenever blending is used in SubchunkableApplyFlow, 8 temporary layers are created for checkerboarding in X, Y, Z. Each of these temporary layers has a smaller bunk than the original layer, at half of the given punk, aligned to the punks. This ensures that there are no race conditions within any one of these 8 layers, as long as blend_pad is at most half of the punk. The tasks can be emitted all at once, and once the processing tasks are done, there is a reduction task for each original bunk that collects all the processing outputs that intersect that bunk and sums them with the correct weights. When blending is not used, a single temporary layer is created to solve the problem of the punk not being aligned to the bunk, or the bunk being too large (resulting in a race condition).
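
Conceptually, the assignment of punks to the 8 temporary layers is a parity checkerboard. A sketch (illustrative, not the actual implementation):

    # Illustrative checkerboard assignment; not the actual zetta_utils code.
    def checkerboard_layer(ix, iy, iz):
        """Map a punk's grid index to one of 8 temporary layers by parity.

        Within any single layer, punks are two punk-widths apart in every
        dimension, so with blend_pad <= punk / 2 their padded extents
        (punk + 2 * blend_pad) never overlap, and writes are race-free.
        """
        return (ix % 2) + 2 * (iy % 2) + 4 * (iz % 2)

    assert checkerboard_layer(0, 0, 0) == 0
    assert checkerboard_layer(1, 1, 1) == 7
    assert checkerboard_layer(2, 0, 0) == 0  # same layer, two punks apart in X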

So how does this checkerboarding interact with the subchunking? SubchunkableApplyFlow is fully recursive, and here is the rundown for any given level:

  1. Start with the level N+1 bunk and crop_padded punk. We are given the level N punk, crop_pad and blend_pad.

  2. Divide each crop_padded N+1 punk into level N punks (divisibility is checked).

  3. Make 8 temporary layers, each with bunks that are aligned to the crop_padded N+1 punk and with half the size of level N punks.

  4. Pad the level N punks, and recursively schedule and execute level N tasks.

  5. Reduce the results for each level N+1 bunk.

  6. Return.

In practice, to save time, the reduction happens not for each bunk, but for each superbunk consisting of bunks that have been aggregated up to some size, specified by max_reduction_chunk_size. Note that the L0 crop_pad is handled by the function or the operation.
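
The recursion in the list above can be reduced to a runnable 1-D toy (illustrative only; the real flow schedules tasks, manages the 8 temporary layers, and reduces with weights):

    # Runnable 1-D toy of the recursive subchunking; illustrative only.
    def process_level(level, start, size, punks, pads, op):
        """Process [start, start + size) at `level`; pads = crop_pad + blend_pad."""
        punk, pad = punks[level], pads[level]
        assert size % punk == 0, "each punk must divide the padded punk above it"
        outputs = []
        for chunk_start in range(start, start + size, punk):
            padded_start, padded_size = chunk_start - pad, punk + 2 * pad
            if level == 0:
                # The L0 crop_pad is handled by the fn/op itself.
                outputs.append(op(padded_start, padded_size))
            else:
                outputs.extend(
                    process_level(level - 1, padded_start, padded_size, punks, pads, op)
                )
        # In the real flow, outputs land in checkerboarded temporary layers, and
        # a reduction sums overlaps per bunk, aggregated up to
        # max_reduction_chunk_size.
        return outputs

    # punks/pads are indexed smallest level first here: the padded top punk
    # (1024 + 2 * 8 = 1040) is evenly divided by the 260-wide L0 punk.
    chunks = process_level(1, 0, 2048, punks=[260, 1024], pads=[0, 8], op=lambda s, n: (s, n))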

This design makes it possible to use arbitrary cropping and blending at each level.

In addition, the intermediary layers are only made when they are necessary:

  1. At the top level. This is because the backend chunk size of the destination layer might not be a divisor of the output size (thus creating a race condition).

  2. At the bottom level. This is to allow arbitrary output sizes since the output size will likely be smaller than the destination backend size.

  3. At any other level where blending is used at that level or crop is used at the level above.

This can be overridden with skip_intermediaries.