Windows bugfix: safe rename if renaming file onto itself (#43456)
* Generally use os.replace on Windows and Linux * Windows behavior for os.replace differs when the destination exists and is a symlink to a directory: on Linux the dst is replaced and on Windows this fails - this PR makes Windows behave like Linux (by deleting the dst before doing the rename unless src and dst are the same)
This commit is contained in:
@@ -198,15 +198,32 @@ def getuid():
|
|||||||
return os.getuid()
|
return os.getuid()
|
||||||
|
|
||||||
|
|
||||||
|
def _win_rename(src, dst):
|
||||||
|
# os.replace will still fail if on Windows (but not POSIX) if the dst
|
||||||
|
# is a symlink to a directory (all other cases have parity Windows <-> Posix)
|
||||||
|
if os.path.islink(dst) and os.path.isdir(os.path.realpath(dst)):
|
||||||
|
if os.path.samefile(src, dst):
|
||||||
|
# src and dst are the same
|
||||||
|
# do nothing and exit early
|
||||||
|
return
|
||||||
|
# If dst exists and is a symlink to a directory
|
||||||
|
# we need to remove dst and then perform rename/replace
|
||||||
|
# this is safe to do as there's no chance src == dst now
|
||||||
|
os.remove(dst)
|
||||||
|
os.replace(src, dst)
|
||||||
|
|
||||||
|
|
||||||
@system_path_filter
|
@system_path_filter
|
||||||
def rename(src, dst):
|
def rename(src, dst):
|
||||||
# On Windows, os.rename will fail if the destination file already exists
|
# On Windows, os.rename will fail if the destination file already exists
|
||||||
|
# os.replace is the same as os.rename on POSIX and is MoveFileExW w/
|
||||||
|
# the MOVEFILE_REPLACE_EXISTING flag on Windows
|
||||||
|
# Windows invocation is abstracted behind additonal logic handling
|
||||||
|
# remaining cases of divergent behavior accross platforms
|
||||||
if sys.platform == "win32":
|
if sys.platform == "win32":
|
||||||
# Windows path existence checks will sometimes fail on junctions/links/symlinks
|
_win_rename(src, dst)
|
||||||
# so check for that case
|
else:
|
||||||
if os.path.exists(dst) or islink(dst):
|
os.replace(src, dst)
|
||||||
os.remove(dst)
|
|
||||||
os.rename(src, dst)
|
|
||||||
|
|
||||||
|
|
||||||
@system_path_filter
|
@system_path_filter
|
||||||
|
|||||||
@@ -9,6 +9,7 @@
|
|||||||
import shutil
|
import shutil
|
||||||
import stat
|
import stat
|
||||||
import sys
|
import sys
|
||||||
|
from contextlib import contextmanager
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
@@ -908,3 +909,93 @@ def test_find_first_file(tmpdir, bfs_depth):
|
|||||||
|
|
||||||
# Should find first dir
|
# Should find first dir
|
||||||
assert os.path.samefile(fs.find_first(root, "a", bfs_depth=bfs_depth), os.path.join(root, "a"))
|
assert os.path.samefile(fs.find_first(root, "a", bfs_depth=bfs_depth), os.path.join(root, "a"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_rename_dest_exists(tmpdir):
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def setup_test_files():
|
||||||
|
a = tmpdir.join("a", "file1")
|
||||||
|
b = tmpdir.join("a", "file2")
|
||||||
|
fs.touchp(a)
|
||||||
|
fs.touchp(b)
|
||||||
|
with open(a, "w") as oa, open(b, "w") as ob:
|
||||||
|
oa.write("I am A")
|
||||||
|
ob.write("I am B")
|
||||||
|
yield a, b
|
||||||
|
shutil.rmtree(tmpdir.join("a"))
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def setup_test_dirs():
|
||||||
|
a = tmpdir.join("d", "a")
|
||||||
|
b = tmpdir.join("d", "b")
|
||||||
|
fs.mkdirp(a)
|
||||||
|
fs.mkdirp(b)
|
||||||
|
yield a, b
|
||||||
|
shutil.rmtree(tmpdir.join("d"))
|
||||||
|
|
||||||
|
# test standard behavior of rename
|
||||||
|
# smoke test
|
||||||
|
with setup_test_files() as files:
|
||||||
|
a, b = files
|
||||||
|
fs.rename(str(a), str(b))
|
||||||
|
assert os.path.exists(b)
|
||||||
|
assert not os.path.exists(a)
|
||||||
|
with open(b, "r") as ob:
|
||||||
|
content = ob.read()
|
||||||
|
assert content == "I am A"
|
||||||
|
|
||||||
|
# test relatitve paths
|
||||||
|
# another sanity check/smoke test
|
||||||
|
with setup_test_files() as files:
|
||||||
|
a, b = files
|
||||||
|
with fs.working_dir(str(tmpdir)):
|
||||||
|
fs.rename(os.path.join("a", "file1"), os.path.join("a", "file2"))
|
||||||
|
assert os.path.exists(b)
|
||||||
|
assert not os.path.exists(a)
|
||||||
|
with open(b, "r") as ob:
|
||||||
|
content = ob.read()
|
||||||
|
assert content == "I am A"
|
||||||
|
|
||||||
|
# Test rename symlinks to same file
|
||||||
|
c = tmpdir.join("a", "file1")
|
||||||
|
a = tmpdir.join("a", "link1")
|
||||||
|
b = tmpdir.join("a", "link2")
|
||||||
|
fs.touchp(c)
|
||||||
|
symlink(c, a)
|
||||||
|
symlink(c, b)
|
||||||
|
fs.rename(str(a), str(b))
|
||||||
|
assert os.path.exists(b)
|
||||||
|
assert not os.path.exists(a)
|
||||||
|
assert os.path.realpath(b) == c
|
||||||
|
shutil.rmtree(tmpdir.join("a"))
|
||||||
|
|
||||||
|
# test rename onto itself
|
||||||
|
a = tmpdir.join("a", "file1")
|
||||||
|
b = a
|
||||||
|
fs.touchp(a)
|
||||||
|
with open(a, "w") as oa:
|
||||||
|
oa.write("I am A")
|
||||||
|
fs.rename(str(a), str(b))
|
||||||
|
# check a, or b, doesn't matter, same file
|
||||||
|
assert os.path.exists(a)
|
||||||
|
# ensure original file was not duplicated
|
||||||
|
assert len(os.listdir(tmpdir.join("a"))) == 1
|
||||||
|
with open(a, "r") as oa:
|
||||||
|
assert oa.read()
|
||||||
|
shutil.rmtree(tmpdir.join("a"))
|
||||||
|
|
||||||
|
# test rename onto symlink
|
||||||
|
# to directory from symlink to directory
|
||||||
|
# (this is something spack does when regenerating views)
|
||||||
|
with setup_test_dirs() as dirs:
|
||||||
|
a, b = dirs
|
||||||
|
link1 = tmpdir.join("f", "link1")
|
||||||
|
link2 = tmpdir.join("f", "link2")
|
||||||
|
fs.mkdirp(tmpdir.join("f"))
|
||||||
|
symlink(a, link1)
|
||||||
|
symlink(b, link2)
|
||||||
|
fs.rename(str(link1), str(link2))
|
||||||
|
assert os.path.exists(link2)
|
||||||
|
assert os.path.realpath(link2) == a
|
||||||
|
shutil.rmtree(tmpdir.join("f"))
|
||||||
|
|||||||
Reference in New Issue
Block a user