SubchunkableApplyFlow
- zetta_utils.mazepa_layer_processing.common.subchunkable_apply_flow.build_subchunkable_apply_flow(dst, dst_resolution, processing_chunk_sizes, processing_gap=(0, 0, 0), processing_crop_pads=(0, 0, 0), processing_blend_pads=(0, 0, 0), processing_blend_modes='quadratic', level_intermediaries_dirs=None, skip_intermediaries=False, max_reduction_chunk_size=None, expand_bbox_resolution=False, expand_bbox_backend=False, expand_bbox_processing=True, shrink_processing_chunk=False, auto_divisibility=False, allow_cache_up_to_level=None, op_worker_type=None, reduction_worker_type=None, print_summary=True, generate_ng_link=False, fn=None, fn_semaphores=None, op=None, op_args=(), op_kwargs=None, bbox=None, start_coord=None, end_coord=None, coord_resolution=None, auto_bbox=False, dst_tighten_bounds=False)
The helper constructor for a flow that applies any function or operation with a Tensor-valued output in a chunkwise fashion, allowing for arbitrary subchunking with cropping and blending. Chunks are processed, written to an intermediary location temporarily, and then combined (reduced) with weights if necessary to produce the output.
- Parameters
  - dst (VolumetricBasedLayerProtocol | None) – The destination VolumetricBasedLayerProtocol. May be None, in which case skip_intermediaries must be True, expand_bbox_backend must be False, and defer may not be used in processing_blend_modes.
  - dst_resolution (Sequence[float]) – The resolution of the destination VolumetricBasedLayerProtocol (or the resolution to use for computation, even if dst is None).
  - dst_tighten_bounds (bool) – Tighten the VolumetricBasedLayerProtocol's bounds to the given bbox (whether given as a bbox or start_coord, end_coord, and coord_resolution) before any expansions EXCEPT for expand_bbox_resolution. In that case, processing happens in the bbox expanded according to all expansions, while writing only happens within the tight bounds. Note that this only happens at the dst_resolution, and may result in odd behaviour if the bounds at multiple resolutions are used in subsequent processes. Requires dst.
  - processing_chunk_sizes (Sequence[Sequence[int]]) – The base chunk size at each subchunking level in X, Y, Z, from the largest to the smallest. Subject to divisibility requirements (see Divisibility Requirement below). When auto_divisibility is used, the chunk sizes other than the bottom level chunk size are treated as upper bounds and rounded down to satisfy divisibility. Must be even.
  - processing_gap (Sequence[int]) – Extra unprocessed space to be skipped between chunks at the top level. When used, blend_pad cannot be used at the top level. Cannot be used with auto_divisibility, expand_bbox_backend, expand_bbox_resolution, or shrink_processing_chunk.
  - processing_crop_pads (Union[Sequence[int], Sequence[Sequence[int]]]) – Pixels to crop per processing chunk at each subchunking level in X, Y, Z, from the largest to the smallest. Affects divisibility requirements (see below). Given as a padding: a (10, 10, 0) crop_pad with a (1024, 1024, 1) processing_chunk_size means that a (1044, 1044, 1) area will be processed and then cropped by (10, 10, 0) to output a (1024, 1024, 1) chunk.
  - processing_blend_pads (Union[Sequence[int], Sequence[Sequence[int]]]) – Pixels to blend per processing chunk at each subchunking level in X, Y, Z, from the largest to the smallest. Affects divisibility requirements (see below). Given as a padding: a (10, 10, 0) blend_pad with a (1024, 1024, 1) processing_chunk_size means that a (1044, 1044, 1) area will be processed and then overlapped by (20, 20, 0) between each (1024, 1024, 1) chunk. Must be less than or equal to half of the processing_chunk_size in each dimension.
  - processing_blend_modes (Union[Literal['linear', 'quadratic', 'max', 'defer'], Sequence[Literal['linear', 'quadratic', 'max', 'defer']]]) – Which blend mode to use at each subchunking level. linear sums the blended areas weighted linearly by position; quadratic sums the blended areas weighted quadratically by position; max takes the maximum value of any layer in the overlap area; defer can only be supplied as the blend mode for the top level, and skips the final reduction stage, leaving the final intermediary files for the user to handle. If defer is used, then skip_intermediaries cannot be used.
  - max_reduction_chunk_size (Optional[Sequence[int]]) – The upper bound of the chunk size used for the reduction step. During the reduction step, backend chunks in the area to be reduced are combined into larger chunks up to this limit. Reduction chunks are only used to combine already computed outputs, so larger is better to cut down on the number of tasks. Must be larger than both the top level processing_chunk_size and the dst chunk size.
  - level_intermediaries_dirs (Optional[Sequence[str | None]]) – Intermediary directories for temporary layers at each subchunking level, used for handling blending, cropping, and rechunking for backends. Only used if the level uses blending, if the level above uses crop, or if it is the top level and skip_intermediaries has not been set. If running remotely, the top level must also be remote, as the worker performing the reduction step may be different from the worker that wrote the processing output. The other levels are recommended to be local.
  - skip_intermediaries (bool) – Skips all intermediaries. This means that no blending is allowed anywhere, and that only the bottom level may have crop. You MUST ensure that your output is aligned to the backend chunks yourself when this option is used. Cannot be used if defer is used in processing_blend_modes.
  - expand_bbox_resolution (bool) – Expands bbox (whether given as a bbox or start_coord, end_coord, and coord_resolution) to be integral in the dst_resolution. Cannot be used with processing_gap.
  - expand_bbox_backend (bool) – Expands bbox (whether given as a bbox or start_coord, end_coord, and coord_resolution) to be aligned to the dst layer's backend chunk size and offset at dst_resolution. Requires bbox to be integral in dst_resolution. Cannot be used with processing_gap, expand_bbox_processing, or auto_divisibility.
  - expand_bbox_processing (bool) – Expands bbox (whether given as a bbox or start_coord, end_coord, and coord_resolution) to be an integer multiple of the top level processing_chunk_size, holding the top left corner fixed. Requires bbox to be integral in dst_resolution. Cannot be used with expand_bbox_backend or shrink_processing_chunk.
  - shrink_processing_chunk (bool) – Shrinks the top level processing_chunk_size to fit the bbox. Does not affect other levels, so divisibility requirements may be affected. Requires bbox to be integral in dst_resolution. Cannot be used with processing_gap, expand_bbox_processing, or auto_divisibility.
  - auto_divisibility (bool) – Automatically chooses processing_chunk_sizes that are divisible, while respecting the bottom level processing_chunk_size as well as every level's processing_crop_pads and processing_blend_pads. The user-provided processing_chunk_sizes are treated as upper bounds. Requires bbox to be integral in dst_resolution. Requires expand_bbox_processing. Cannot be used with processing_gap, expand_bbox_backend, or shrink_processing_chunk.
  - allow_cache_up_to_level (int | None) – The subchunking level (smallest is 0) at which the cache for the various remote layers should be cleared after the processing is done. Recommended to keep this at the level of the largest subchunks (the default).
  - op_worker_type (str | None) – The worker type required by the op. The subchunked tasks will be routed only to workers that have the requested worker type.
  - reduction_worker_type (str | None) – The worker type required by the reduction process. The reduction tasks will be routed only to workers that have the requested worker type.
  - print_summary (bool) – Whether a summary should be printed.
  - generate_ng_link (bool) – Whether a neuroglancer link should be generated in the summary. Requires print_summary.
  - fn (Optional[Callable[[ParamSpec], Tensor]]) – The function to be run on each chunk. Cannot be used with op.
  - fn_semaphores (Optional[Sequence[Literal['read', 'write', 'cuda', 'cpu']]]) – The semaphores to be acquired while the function runs. Supports read, write, cuda, and cpu. Cannot be used with op.
  - op (Optional[VolumetricOpProtocol[ParamSpec, None, Any]]) – The operation to be run on each chunk. Cannot be used with fn.
  - op_args (Iterable) – Only used for typing. Do not use: will raise an exception if nonempty.
  - op_kwargs (Optional[Mapping[str, Any]]) – Any keyword arguments taken by the fn or the op.
  - bbox (BBox3D | None) – The bounding box for the operation. Cannot be used with start_coord, end_coord, and coord_resolution, or with auto_bbox.
  - start_coord (Optional[Sequence[int]]) – The start coordinate for the bounding box. Must be used with end_coord and coord_resolution; cannot be used with bbox or auto_bbox.
  - end_coord (Optional[Sequence[int]]) – The end coordinate for the bounding box. Must be used with start_coord and coord_resolution; cannot be used with bbox or auto_bbox.
  - coord_resolution (Optional[Sequence]) – The resolution in which the coordinates are given for the bounding box. Must be used with start_coord and end_coord; cannot be used with bbox.
  - auto_bbox (bool) – Sets the bbox to the bounds of dst at dst_resolution. Note that this may result in a significant overhead if the dst bounds have not been tightened. Requires dst. Cannot be used with bbox or start_coord, end_coord, and coord_resolution.
- Return type
  Flow
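For orientation, here is a minimal sketch of a typical two-level invocation. Treat it as an illustration under assumptions rather than a verbatim recipe: the paths are placeholders, layer construction options are elided, and the behaviour noted in the comments (layers passed via op_kwargs being read per chunk) is assumed for this sketch.

```python
# A minimal sketch, assuming CloudVolume-backed layers; paths and sizes
# are placeholders, and layer construction options are elided.
import torch

from zetta_utils.layer.volumetric.cloudvol import build_cv_layer
from zetta_utils.mazepa_layer_processing.common.subchunkable_apply_flow import (
    build_subchunkable_apply_flow,
)

def threshold_fn(src: torch.Tensor) -> torch.Tensor:
    # Runs on each bottom-level chunk and must return a Tensor.
    return (src > 0.5).to(src.dtype)

src = build_cv_layer(path="gs://my-bucket/input")    # hypothetical path
dst = build_cv_layer(path="gs://my-bucket/output")   # hypothetical path

flow = build_subchunkable_apply_flow(
    dst=dst,
    dst_resolution=(16, 16, 40),
    # Two levels: 4096^2 superchunks subchunked into 1024^2 chunks.
    # Divisibility: 1024 evenly divides the unpadded 4096 top chunk,
    # and the 8192^2 bbox is an even multiple of the top chunk.
    processing_chunk_sizes=[(4096, 4096, 1), (1024, 1024, 1)],
    processing_blend_pads=[(0, 0, 0), (64, 64, 0)],
    # Blending at the bottom level requires intermediaries; the top-level
    # directory must be remote when running remotely.
    level_intermediaries_dirs=["gs://my-bucket/tmp", "/tmp/subchunkable"],
    fn=threshold_fn,
    # Assumed for this sketch: layers passed via op_kwargs are read
    # per chunk and handed to fn as tensors.
    op_kwargs={"src": src},
    start_coord=(0, 0, 0),
    end_coord=(8192, 8192, 10),
    coord_resolution=(16, 16, 40),
)
# The returned Flow is then handed off to the mazepa scheduler to run.
```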
Divisibility Requirement
At each subchunking level, the processing_ parameters of SubchunkableApplyFlow are subject to divisibility requirements. Specifically, the processing chunk at each level must evenly divide the crop- and blend-padded processing chunk of the level above. Furthermore, the top level processing chunk must evenly divide the bbox.
This requirement exists because SubchunkableApplyFlow recursively subchunks the padded ROI into the processing chunks of the level below. auto_divisibility exists to automatically find the correct processing chunk sizes, but it may have an adverse impact on performance.
Note
If calculating the processing_chunk_sizes by hand, make sure to multiply the crop_pad and blend_pad by 2 before adding them to the processing_chunk_size.
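As a worked example with illustrative numbers, expressed as a quick Python check:

```python
# Illustrative check of the divisibility rule for a two-level setup.
# The level 0 chunk must evenly divide the crop- and blend-padded
# level 1 chunk: l1 + 2 * (crop_pad + blend_pad).
l1_chunk = (1024, 1024, 1)
l1_crop_pad = (128, 128, 0)
l1_blend_pad = (0, 0, 0)

padded_l1 = tuple(
    c + 2 * (cp + bp) for c, cp, bp in zip(l1_chunk, l1_crop_pad, l1_blend_pad)
)  # -> (1280, 1280, 1)

l0_chunk = (256, 256, 1)
assert all(p % c == 0 for p, c in zip(padded_l1, l0_chunk))  # 1280 == 5 * 256
```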
Note on Architecture
This subsection is intended as an in-depth discussion of the SubchunkableApplyFlow architecture.
All of the volumes that zetta_utils expects to work with are backed by chunked storage. This means that if we want to use padding of any form in our processing, whether that is blending or cropping, we run into two major issues:
1. Assuming that the processing chunks (hereafter referred to as punks) are exactly the size of the backend chunks (bunks), adding even a single pixel of padding in XY incurs a 9x read penalty, since the neighbouring chunks must be read.
2. Since writing is also done in chunks, blending creates a race condition on writes: 9, 27, or even more (if a bunk is small) padded punks can overlap a single bunk.
Solving problem #1 is the motivation for why SubchunkableApplyFlow needs to exist, and solving #2 is the motivation for its architecture.
To solve the first problem, we can use caching. However, naive chunking means that cache locality cannot be guaranteed. A single node in a large run might see more than 10,000 tasks, and a cache might only be large enough for about 100 bunks, meaning that a downloaded chunk will be evicted after 10 tasks since 9 chunks need to be downloaded for each task.
Subchunking so that each node sees a “superchunk” that fits entirely in memory guarantees that the cache is actually used, thereby massively reducing the amount of reading required. In fact, we can even use disk-backed caching on a “super-superchunk”, with each “superchunk” fitting in memory, if we want to reduce the overhead from caching even further.
In addition, the subchunking allows task generation to be naturally parallelised, since each node is responsible for splitting up the chunks it receives into subchunks. This is a bigger boon than it might seem at first: if generating a single task takes a millisecond, then scheduling 1e8 tasks serially takes 28 hours.
Solving the second problem is a little harder: the easy solution is to just emit the tasks in 9 or 27 waves and add to the existing output (with blending weights) at every write, but this has its own issues:
- Emitting tasks in waves is very inefficient, since there will always be straggler tasks. A thousand nodes should not be held back because one node was preempted.
- If a node gets preempted, each bunk needs to be tracked to see whether the existing data should be ignored.
- Each write now incurs a read call as well.
- Each node requires 9 or 27 read and write calls.
Thus, the better solution is actually to just write to different temporary layers and sum with the weights at the end. This might seem expensive, but thanks to subchunking we can use local storage or memory as temporary layers, and just write to remote temporary layers for the top level. (This relies on the fact that local I/O is practically free, even when chunked.)
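As a toy illustration of this weighted reduction (NumPy, not the actual implementation), two overlapping 1-D chunk outputs written to separate temporary “layers” are summed with linear weights that total one everywhere in the overlap:

```python
import numpy as np

# Toy 1-D illustration of blending two overlapping chunk outputs.
# Each chunk covers 8 pixels and overlaps its neighbour by 4 (blend_pad=2,
# so overlap = 2 * blend_pad). Linear weights ramp across the overlap and
# sum to 1 at every output pixel; quadratic weighting would use a
# quadratic ramp instead.
chunk_a = np.ones(8)          # written by task A to its temporary layer
chunk_b = np.ones(8) * 3.0    # written by task B to its temporary layer

overlap = 4
ramp = np.linspace(0.0, 1.0, overlap + 2)[1:-1]  # strictly inside (0, 1)
w_a = np.concatenate([np.ones(8 - overlap), 1.0 - ramp])
w_b = np.concatenate([ramp, np.ones(8 - overlap)])

out = np.zeros(12)
out[:8] += w_a * chunk_a      # reduction: weighted sum over temp layers
out[4:] += w_b * chunk_b      # overlap [4:8] ramps smoothly from 1 to 3
```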
From here on, we will refer to the bottom level processing chunk and the bottom level backend chunk as the L0 punk and L0 bunk, respectively. The superchunks that consist of L(n) punks will be denoted L(n+1) punks, with the entire ROI to be processed being one level higher than the highest level.
Whenever blending is used in SubchunkableApplyFlow, 8 temporary layers are created for checkerboarding in X, Y, Z. Each of these temporary layers has a smaller bunk than the original layer, at half of the given punk, and aligned to the punks. This ensures that there are no race conditions within any one of these 8 layers, as long as blend_pad is at most half of the punk. The tasks can be emitted all at once, and once the processing tasks are done, there is a reduction task for each original bunk that collects all the processing outputs that intersect that bunk and sums them with the correct weights. When blending is not used, a single temporary layer is created to solve the problem of the punk not being aligned to the bunk, or the bunk being too large (resulting in a race condition).
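The 8 temporary layers correspond to the 2 × 2 × 2 parity classes of punk indices in X, Y, Z. Here is a conceptual sketch of that assignment (not the library's internal naming):

```python
from itertools import product

# Conceptual sketch: with blending, the punk at index (i, j, k) writes to
# the temporary layer selected by its parity in each axis. Punks two apart
# in any axis share a layer but can never overlap (blend_pad <= punk / 2),
# so no two writers ever race on the same backend chunk within a layer.
def temp_layer_index(i: int, j: int, k: int) -> int:
    return (i % 2) + 2 * (j % 2) + 4 * (k % 2)

layers = list(product((0, 1), repeat=3))  # the 8 parity classes
assert len(layers) == 8
assert temp_layer_index(3, 2, 5) == 1 + 0 + 4
```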
So how does this checkerboarding interact with the subchunking? SubchunkableApplyFlow is fully recursive; here is the rundown for any given level:
1. Start with the level N+1 bunk and the crop_padded level N+1 punk. We are given the level N punk, crop_pad, and blend_pad.
2. Divide each crop_padded N+1 punk into level N punks (divisibility is checked).
3. Make 8 temporary layers, each with bunks that are aligned to the crop_padded N+1 punk and half the size of the level N punks.
4. Pad the level N punks, and recursively schedule and execute the level N tasks.
5. Reduce the results for each level N+1 bunk.
6. Return.
In practice, to save time, the reduction happens not for each bunk but for each superbunk, consisting of bunks that have been aggregated up to some size specified by max_reduction_chunk_size. Note that the L0 crop_pad is handled by the function or the operation.
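To make the recursion concrete, here is a runnable 1-D toy that mirrors the rundown above. All names and numbers are illustrative; the real flow operates on 3-D bounding boxes, temporary layers, and mazepa tasks:

```python
# Runnable 1-D toy mirroring the rundown; everything here is illustrative.
# Lists are indexed by level number, with L0 (bottom) first — note that the
# real processing_chunk_sizes argument goes from largest to smallest.
PUNKS = [256, 1280]   # processing chunk per level
CROPS = [32, 128]     # crop_pad per level
BLENDS = [0, 0]       # blend_pad per level

def schedule(roi_start: int, roi_size: int, level: int) -> None:
    punk, crop, blend = PUNKS[level], CROPS[level], BLENDS[level]
    assert roi_size % punk == 0, "divisibility requirement"
    for start in range(roi_start, roi_start + roi_size, punk):
        lo = start - crop - blend                     # pad the level punk
        size = punk + 2 * (crop + blend)
        if level == 0:
            print(f"L0 task on [{lo}, {lo + size})")  # fn/op handles L0 crop
        else:
            schedule(lo, size, level - 1)             # recursively subchunk
    print(f"L{level} reduction over [{roi_start}, {roi_start + roi_size})")

schedule(0, 2560, level=1)  # the bbox must be divisible by the top punk
```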
This design makes it possible to use arbitrary cropping and blending at each level.
In addition, the intermediary layers are only made when they are necessary:
- At the top level, because the backend chunk size of the destination layer might not be a divisor of the output size (thus creating a race condition).
- At the bottom level, to allow arbitrary output sizes, since the output size will likely be smaller than the destination backend chunk size.
- At all other levels where either blending is used for that level or crop is used for the level above.
This can be overridden with skip_intermediaries.