-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Multiprocessing interpolation and reprojection #661
base: main
Are you sure you want to change the base?
Conversation
Hi @vschaffn @adebardo, nice to see the progress! Summary: Upon reading the code in Why? Moving forward: geoutils/geoutils/raster/delayed.py Line 783 in b4da49b
So all the projected grid matching calculations to map the input chunks to output chunks written above that line (using the GeoGrid classes I mentioned) can simply stay the same 😉
|
Also note that Rasterio has reprojection errors that can appear (especially when comparing a tiled reprojection to a reprojection on the full raster), due to internal inconsistencies... |
To clarify one more remark: In Due to shape deformations, the mapping of input-output tiles between any CRS requires more than an overlap value, and also depends on the input/output chunksizes (what the You can essentially turn this list comprehension here into your loop for geoutils/geoutils/raster/delayed.py Line 784 in b4da49b
And let reproject_block stick the pieces of input tiles in the right places (you might not have a perfect square with all the tiles you opened), and give you the output: geoutils/geoutils/raster/delayed.py Line 644 in b4da49b
|
I should have thought about this last week (I was mostly off work and only popped for the comment above, so I didn't think about the big picture, sorry!): It is only |
9b7fe6c
to
06fbcfc
Compare
dst_raster_tile = get_raster_tile(dst_raster, dst_tile) | ||
|
||
try: | ||
raster_tile = raster_unload.crop(bbox=dst_raster_tile) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I've gone through all the code again.
This might not be the source of the differences you see in your test (it would create gaps but not create differences), but it is a step that will definitely not work between 2 CRSs, because of two things:
- It requires overlap to support resampling during reprojection.
- It uses
dst_raster_tile.bounds
projected onraster_tile.crs
throughcrop()
. These are the projected bounds and not the projected footprint (with densified lines), which gives quite a different intersection. (Actually, we might have to think of changing this incrop()
altogether)
See the drawing of what a projected square can look in a different CRS, to understand the difference:
So we need to work on polygon intersections like is done in the GeoGrid
classes.
I looked at the Dask code again, and it didn't look too hard to adapt for Multiprocessing when already familiar with the underlying functions, so I gave it a go to save you time 😉. It's only about 10 lines of new or adapted code (ignoring the writing part of the Multiprocessing that we write the same as in #669). The rest is an exact copy of the Dask code. The most critical aspect was to call Here it is, with comments explaining the changes with the Dask code: ### (NO CHANGE) BLOCK FUNCTION FULLY COPIED FROM THE DASK ONE ABOVE, JUST WITHOUT THE @DELAYED DECORATOR
def _multiproc_reproject_per_block(
*src_arrs: tuple[NDArrayNum], block_ids: list[dict[str, int]], combined_meta: dict[str, Any], **kwargs: Any
) -> NDArrayNum:
"""
Delayed reprojection per destination block (also rebuilds a square array combined from intersecting source blocks).
"""
# If no source chunk intersects, we return a chunk of destination nodata values
if len(src_arrs) == 0:
# We can use float32 to return NaN, will be cast to other floating type later if that's not source array dtype
dst_arr = np.zeros(combined_meta["dst_shape"], dtype=np.dtype("float32"))
dst_arr[:] = kwargs["dst_nodata"]
return dst_arr
# First, we build an empty array with the combined shape, only with nodata values
comb_src_arr = np.ones((combined_meta["src_shape"]), dtype=src_arrs[0].dtype)
comb_src_arr[:] = kwargs["src_nodata"]
# Then fill it with the source chunks values
for i, arr in enumerate(src_arrs):
bid = block_ids[i]
comb_src_arr[bid["rys"] : bid["rye"], bid["rxs"] : bid["rxe"]] = arr
# Now, we can simply call Rasterio!
# We build the combined transform from tuple
src_transform = rio.transform.Affine(*combined_meta["src_transform"])
dst_transform = rio.transform.Affine(*combined_meta["dst_transform"])
# Reproject
dst_arr = np.zeros(combined_meta["dst_shape"], dtype=comb_src_arr.dtype)
_ = rio.warp.reproject(
comb_src_arr,
dst_arr,
src_transform=src_transform,
src_crs=kwargs["src_crs"],
dst_transform=dst_transform,
dst_crs=kwargs["dst_crs"],
resampling=kwargs["resampling"],
src_nodata=kwargs["src_nodata"],
dst_nodata=kwargs["dst_nodata"],
num_threads=1, # Force the number of threads to 1 to avoid Dask/Rasterio conflicting on multi-threading
)
return dst_arr
### (NEW WRAPPER) TO RUN BLOCK FUNCTION FOR MULTIPROC CALL: ADD READING FOR RASTER BLOCK + RETURN DESTINATION BLOCK ID
def _wrapper_multiproc_reproject_per_block(rst, src_block_ids, dst_block_id, idx_d2s, block_ids, combined_meta, kwargs):
"""Wrapper to use reproject_per_block for multiprocessing."""
# Get source array block for each destination block
s = src_block_ids
src_arrs = (rst.icrop(bbox=(s[idx]["xs"], s[idx]["ys"], s[idx]["xe"], s[idx]["ye"])).data for idx in idx_d2s)
# Call reproject per block
dst_block_arr = _multiproc_reproject_per_block(*src_arrs, block_ids=block_ids, combined_meta=combined_meta, **kwargs)
return dst_block_arr, dst_block_id
### (SMALL CHANGES, MOSTLY AT THE END) FINAL REPROJECT FUNCTION
def reproject_multiproc(
rst: Any, # NEW INPUT FOR MULTIPROC (INSTEAD OF DASK ARRAY)
config: Any, # NEW INPUT FOR MULTIPROC
src_chunksizes: tuple[int, int], # NEW INPUT FOR MULTIPROC (INSTEAD OF BEING SAVED WITHIN DASK ARRAY)
src_transform: rio.transform.Affine,
src_crs: rio.crs.CRS,
dst_transform: rio.transform.Affine,
dst_shape: tuple[int, int],
dst_crs: rio.crs.CRS,
resampling: rio.enums.Resampling,
src_nodata: int | float | None = None,
dst_nodata: int | float | None = None,
dst_chunksizes: tuple[int, int] | None = None,
**kwargs: Any,
) -> None:
"""
Reproject georeferenced raster on out-of-memory chunks with multiprocessing
"""
# 1/ Define source and destination chunked georeferenced grid through simple classes storing CRS/transform/shape,
# which allow to consistently derive shape/transform for each block and their CRS-projected footprints
# Define georeferenced grids for source/destination array
src_geogrid = GeoGrid(transform=src_transform, shape=rst.shape, crs=src_crs)
dst_geogrid = GeoGrid(transform=dst_transform, shape=dst_shape, crs=dst_crs)
# Add the chunking
# For source, we can use the .chunks attribute
# SMALL CHANGE: GENERATE THE TUPLES OF CHUNKS SIMILARLY AS FOR DASK, BASED ON CHUNKSIZE INPUT
chunks_x = tuple((src_chunksizes[0] if i<=rst.shape[0] else rst.shape[0] % src_chunksizes[0])
for i in np.arange(src_chunksizes[0], rst.shape[0] + src_chunksizes[0], src_chunksizes[0]))
chunks_y = tuple((src_chunksizes[1] if i<=rst.shape[1] else rst.shape[1] % src_chunksizes[1])
for i in np.arange(src_chunksizes[1], rst.shape[1] + src_chunksizes[1], src_chunksizes[1]))
src_chunks = (chunks_x, chunks_y)
src_geotiling = ChunkedGeoGrid(grid=src_geogrid, chunks=src_chunks)
# For destination, we need to create the chunks based on destination chunksizes
if dst_chunksizes is None:
dst_chunksizes = src_chunksizes
dst_chunks = _chunks2d_from_chunksizes_shape(chunksizes=dst_chunksizes, shape=dst_shape)
dst_geotiling = ChunkedGeoGrid(grid=dst_geogrid, chunks=dst_chunks)
# 2/ Get footprints of tiles in CRS of destination array, with a buffer of 2 pixels for destination ones to ensure
# overlap, then map indexes of source blocks that intersect a given destination block
src_footprints = src_geotiling.get_block_footprints(crs=dst_crs)
dst_footprints = dst_geotiling.get_block_footprints().buffer(2 * max(dst_geogrid.res))
dest2source = [np.where(dst.intersects(src_footprints).values)[0] for dst in dst_footprints]
# 3/ To reconstruct a square source array during chunked reprojection, we need to derive the combined shape and
# transform of each tuples of source blocks
src_block_ids = np.array(src_geotiling.get_block_locations())
meta_params = [
(
_combined_blocks_shape_transform(sub_block_ids=src_block_ids[sbid], src_geogrid=src_geogrid) # type: ignore
if len(sbid) > 0
else ({}, [])
)
for sbid in dest2source
]
# We also add the output transform/shape for this destination chunk in the combined meta
# (those are the only two that are chunk-specific)
dst_block_geogrids = dst_geotiling.get_blocks_as_geogrids()
for i, (c, _) in enumerate(meta_params):
c.update({"dst_shape": dst_block_geogrids[i].shape, "dst_transform": tuple(dst_block_geogrids[i].transform)})
# 4/ Call a delayed function that uses rio.warp to reproject the combined source block(s) to each destination block
# Add fixed arguments to keywords
kwargs.update(
{
"src_nodata": src_nodata,
"dst_nodata": dst_nodata,
"resampling": resampling,
"src_crs": src_crs,
"dst_crs": dst_crs,
}
)
### FROM HERE: ADAPTED CODE FOR MULTIPROC
# SMALL CHANGE: RETURN BLOCK LOCATIONS TO EASILY WRITE OUTPUT
# (WASN'T NEEDED FOR DASK THAT JUST CONCATENATED BLOCKS IN THE RIGHT ORDER)
# Get location of destination blocks to write file
dst_block_ids = np.array(dst_geotiling.get_block_locations())
# MODIFY DASK LIST COMPREHENSION INTO LOOP TO LAUNCH TASKS
# ADDING SRC BLOCK IDS TO THE WRAPPER CALL TO LOAD WITH ICROP
# Create tasks for multiprocessing
tasks = []
for i in range(len(dest2source)):
tasks.append(
config.cluster.launch_task(
fun=_wrapper_multiproc_reproject_per_block, args=[rst, src_block_ids, dst_block_ids[i], dest2source[i], meta_params[i][1], meta_params[i][0], kwargs],
)
)
result_list = []
# get first tile to retrieve dtype and nodata
result_tile0, _ = config.cluster.get_res(tasks[0])
# WRITE OUTPUT AS IN GENERIC MULTIPROC PR
# Create a new raster file to save the processed results
with rio.open(
config.outfile,
"w",
driver="GTiff",
height=dst_shape[0],
width=dst_shape[1],
count=1,
dtype=rst.dtype,
crs=dst_crs,
transform=dst_transform,
nodata=dst_nodata,
) as dst:
try:
# Iterate over the tasks and retrieve the processed tiles
for results in tasks:
dst_block_arr, dst_block_id = config.cluster.get_res(results)
# Define the window in the output file where the tile should be written
dst_window = rio.windows.Window(
col_off=dst_block_id["xs"],
row_off=dst_block_id["ys"],
width=dst_block_id["xe"] - dst_block_id["xs"],
height=dst_block_id["ye"] - dst_block_id["ys"],
)
# Cast to 3D
dst_block_arr = dst_block_arr[np.newaxis, :, :]
# Write the processed tile to the appropriate location in the output file
dst.write(dst_block_arr, window=dst_window)
except Exception as e:
raise RuntimeError(f"Error retrieving data from multiprocessing tasks: {e}") |
It runs but I didn't test it thoroughly, so hopefully I didn't add any bug and it works well quickly 😉. Also, I should justify why
This way, they don't have to go through the _user_input_reproject and _get_reproj_params steps which stay consistent no matter if the method runs on the full array, or on chunks of the array.
I think we should do the same with the final |
Also, if you want to test the internals of this geoutils/tests/test_raster/test_delayed.py Line 409 in 4d38e65
This will define all the arguments needed to run the |
@rhugonnet many thanks for your work and your explications. Here is my test code: import matplotlib.pyplot as plt
import rasterio as rio
import geoutils as gu
from geoutils import Raster
from geoutils.raster.distributed_computing import MultiprocConfig
from geoutils.raster.distributed_computing.delayed_multiproc import reproject_multiproc
example = gu.examples.get_path("exploradores_aster_dem")
outfile = "test.tif"
config = MultiprocConfig(chunk_size=200, outfile=outfile)
r = Raster(example)
# - Test reprojection with CRS change -
out_crs = rio.crs.CRS.from_epsg(4326)
# Single-process reprojection
r_single = r.reproject(crs=out_crs)
# Multiprocessing reprojection
reproject_multiproc(r, config, crs=out_crs)
r_multi = Raster(outfile)
plt.figure()
r_single.plot()
plt.figure()
r_multi.plot()
plt.figure()
diff1 = r_single - r_multi
diff1.plot()
plt.show() Here is the ### (SMALL CHANGES, MOSTLY AT THE END) FINAL REPROJECT FUNCTION
def reproject_multiproc(
rst: RasterType,
config: MultiprocConfig,
ref: RasterType | str | None = None,
crs: CRS | str | int | None = None,
res: float | abc.Iterable[float] | None = None,
grid_size: tuple[int, int] | None = None,
bounds: rio.coords.BoundingBox | None = None,
nodata: int | float | None = None,
dtype: DTypeLike | None = None,
resampling: Resampling | str = Resampling.bilinear,
force_source_nodata: int | float | None = None,
**kwargs: Any,
) -> None:
"""
Reproject georeferenced raster on out-of-memory chunks with multiprocessing
"""
# Process user inputs
dst_crs, dst_dtype, src_nodata, dst_nodata, dst_res, dst_bounds = _user_input_reproject(
source_raster=rst,
ref=ref,
crs=crs,
bounds=bounds,
res=res,
nodata=nodata,
dtype=dtype,
force_source_nodata=force_source_nodata,
)
# Retrieve transform and grid_size
dst_transform, dst_grid_size = _get_target_georeferenced_grid(
rst, crs=dst_crs, grid_size=grid_size, res=dst_res, bounds=dst_bounds
)
dst_width, dst_height = dst_grid_size
dst_shape = (dst_height, dst_width)
# 1/ Define source and destination chunked georeferenced grid through simple classes storing CRS/transform/shape,
# which allow to consistently derive shape/transform for each block and their CRS-projected footprints
# Define georeferenced grids for source/destination array
src_geogrid = GeoGrid(transform=rst.transform, shape=rst.shape, crs=rst.crs)
dst_geogrid = GeoGrid(transform=dst_transform, shape=dst_shape, crs=dst_crs)
# Add the chunking
# For source, we can use the .chunks attribute
# SMALL CHANGE: GENERATE THE TUPLES OF CHUNKS SIMILARLY AS FOR DASK, BASED ON CHUNKSIZE INPUT
chunks_x = tuple((config.chunk_size if i<=rst.shape[0] else rst.shape[0] % config.chunk_size)
for i in np.arange(config.chunk_size, rst.shape[0] + config.chunk_size, config.chunk_size))
chunks_y = tuple((config.chunk_size if i<=rst.shape[1] else rst.shape[1] % config.chunk_size)
for i in np.arange(config.chunk_size, rst.shape[1] + config.chunk_size, config.chunk_size))
src_chunks = (chunks_x, chunks_y)
src_geotiling = ChunkedGeoGrid(grid=src_geogrid, chunks=src_chunks)
# For destination, we need to create the chunks based on destination chunksizes
dst_chunks = _chunks2d_from_chunksizes_shape(chunksizes=(config.chunk_size, config.chunk_size), shape=dst_shape)
dst_geotiling = ChunkedGeoGrid(grid=dst_geogrid, chunks=dst_chunks)
# 2/ Get footprints of tiles in CRS of destination array, with a buffer of 2 pixels for destination ones to ensure
# overlap, then map indexes of source blocks that intersect a given destination block
src_footprints = src_geotiling.get_block_footprints(crs=dst_crs)
dst_footprints = dst_geotiling.get_block_footprints().buffer(2 * max(dst_geogrid.res))
dest2source = [np.where(dst.intersects(src_footprints).values)[0] for dst in dst_footprints]
# 3/ To reconstruct a square source array during chunked reprojection, we need to derive the combined shape and
# transform of each tuples of source blocks
src_block_ids = np.array(src_geotiling.get_block_locations())
meta_params = [
(
_combined_blocks_shape_transform(sub_block_ids=src_block_ids[sbid], src_geogrid=src_geogrid) # type: ignore
if len(sbid) > 0
else ({}, [])
)
for sbid in dest2source
]
# We also add the output transform/shape for this destination chunk in the combined meta
# (those are the only two that are chunk-specific)
dst_block_geogrids = dst_geotiling.get_blocks_as_geogrids()
for i, (c, _) in enumerate(meta_params):
c.update({"dst_shape": dst_block_geogrids[i].shape, "dst_transform": tuple(dst_block_geogrids[i].transform)})
# 4/ Call a delayed function that uses rio.warp to reproject the combined source block(s) to each destination block
# Add fixed arguments to keywords
kwargs.update(
{
"src_nodata": src_nodata,
"dst_nodata": dst_nodata,
"resampling": resampling,
"src_crs": rst.crs,
"dst_crs": dst_crs,
}
)
### FROM HERE: ADAPTED CODE FOR MULTIPROC
# SMALL CHANGE: RETURN BLOCK LOCATIONS TO EASILY WRITE OUTPUT
# (WASN'T NEEDED FOR DASK THAT JUST CONCATENATED BLOCKS IN THE RIGHT ORDER)
# Get location of destination blocks to write file
dst_block_ids = np.array(dst_geotiling.get_block_locations())
# MODIFY DASK LIST COMPREHENSION INTO LOOP TO LAUNCH TASKS
# ADDING SRC BLOCK IDS TO THE WRAPPER CALL TO LOAD WITH ICROP
# Create tasks for multiprocessing
tasks = []
for i in range(len(dest2source)):
tasks.append(
config.cluster.launch_task(
fun=_wrapper_multiproc_reproject_per_block, args=[rst, src_block_ids, dst_block_ids[i], dest2source[i], meta_params[i][1], meta_params[i][0], kwargs],
)
)
result_list = []
# get first tile to retrieve dtype and nodata
result_tile0, _ = config.cluster.get_res(tasks[0])
# WRITE OUTPUT AS IN GENERIC MULTIPROC PR
# Create a new raster file to save the processed results
with rio.open(
config.outfile,
"w",
driver="GTiff",
height=dst_height,
width=dst_width,
count=1,
dtype=rst.dtype,
crs=dst_crs,
transform=dst_transform,
nodata=dst_nodata,
) as dst:
try:
# Iterate over the tasks and retrieve the processed tiles
for results in tasks:
dst_block_arr, dst_block_id = config.cluster.get_res(results)
# Define the window in the output file where the tile should be written
dst_window = rio.windows.Window(
col_off=dst_block_id["xs"],
row_off=dst_block_id["ys"],
width=dst_block_id["xe"] - dst_block_id["xs"],
height=dst_block_id["ye"] - dst_block_id["ys"],
)
# Cast to 3D
dst_block_arr = dst_block_arr[np.newaxis, :, :]
# Write the processed tile to the appropriate location in the output file
dst.write(dst_block_arr, window=dst_window)
except Exception as e:
raise RuntimeError(f"Error retrieving data from multiprocessing tasks: {e}") |
@vschaffn I can't reproduce this, my diff plot is as large as the DEMs, looks like the tiles might be inverted. Did you fix something in my code that is not above? |
@rhugonnet it certainly comes from the |
Resolves #648.
Description
This PR introduces raster interpolation and reprojection using multiprocessing to optimize memory usage and improve performance.
Code changes
multiproc_interp_points
leverages multiprocessing to create a separate process for each point to be interpolated. For each point, the function calculates the smallest possible raster window to open based on the interpolation method and point coordinates, and use theraster.crop
method to open the window. Theraster.interpolate
method is then applied to the cropped raster window, allowing for efficient interpolation without loading the full raster into memory.multiproc_reproject
utilizes raster tiling (see Implement Tiling for Multiprocessing #652) to perform multiprocessing-based reprojection. A separate process is created for each tile of the raster. Each process opens the corresponding tile using theraster.icrop
method, calculates the bounds of the reprojected tile, snaps the bounds to the target reprojected grid, and reprojects the tile using theraster.reproject
method. The reprojected tiles are written separately to disk, with safeguards in place to prevent data overwriting.Tests
multiproc_interp_points
andmultiproc_reproject
. The test results are compared against the behavior of theraster.interpolate
andraster.reproject
methods to ensure consistency.Note:
Currently, there are some tile alignment issues when performing reprojection operations that involve more than just translation. Further investigation is required to address these challenges.
Difference between tiled reprojection and classic reprojection (exploradores_aster_dem):