Traktor/myenv/Lib/site-packages/fsspec/tests/test_fuse.py
2024-05-26 05:12:46 +02:00

148 lines
3.7 KiB
Python

import os
import subprocess
import time
from multiprocessing import Process
import pytest
# Skip this whole module when the ``fuse`` bindings are unusable.
try:
    pytest.importorskip("fuse")  # noqa: E402
except OSError:
    # The ``fuse`` import can be found yet still raise OSError while loading.
    # Asking importorskip for a module name that is not expected to exist
    # turns that failure into a regular module-level skip.
    pytest.importorskip("nonexistent")  # noqa: E402
from fsspec.fuse import main, run
from fsspec.implementations.memory import MemoryFileSystem
def host_fuse(mountdir):
    """Expose a small in-memory filesystem through FUSE at ``mountdir``.

    Used as the target of a child ``Process``. The filesystem contains one
    marker file, ``/mounted/testfile``, and its ``/mounted/`` directory is
    what gets passed to ``run`` together with the mount point.
    """
    memory_fs = MemoryFileSystem()
    # Marker file that the test polls for to detect that the mount is live.
    memory_fs.touch("/mounted/testfile")
    run(memory_fs, "/mounted/", mountdir)
def test_basic(tmpdir, capfd):
    """Exercise basic file and directory operations through a FUSE mount.

    A child process serves an in-memory filesystem (see ``host_fuse``) at a
    temporary mount point. Once the marker file ``testfile`` becomes visible,
    the test writes/reads/removes a file, creates nested directories, checks
    that removing a non-empty directory raises ``OSError``, and verifies that
    no traceback was printed to stdout/stderr.

    The test is skipped if the mount does not appear within about 10 seconds.
    The child process is always terminated (and killed if needed) on exit.
    """
    mountdir = str(tmpdir.mkdir("mount"))

    fuse_process = Process(target=host_fuse, args=(mountdir,))
    fuse_process.start()

    try:
        # Poll once per second, at most 10 times, for the marker file.
        for _ in range(10):
            try:
                # can fail with device not ready while waiting for fuse
                if "testfile" in os.listdir(mountdir):
                    break
            except OSError:
                pass
            time.sleep(1)
        else:
            # Positional argument: the keyword was renamed from ``msg`` to
            # ``reason`` across pytest versions, positional works with both.
            pytest.skip("fuse didn't come live")

        fn = os.path.join(mountdir, "test")
        with open(fn, "wb") as f:
            f.write(b"data")

        with open(fn) as f:
            assert f.read() == "data"

        os.remove(fn)

        # Reuse the same name as a directory now that the file is gone.
        os.mkdir(fn)
        assert os.listdir(fn) == []

        os.mkdir(fn + "/inner")

        # Removing a non-empty directory must fail...
        with pytest.raises(OSError):
            os.rmdir(fn)

        # ...and must not have produced a traceback on either stream.
        captured = capfd.readouterr()
        assert "Traceback" not in captured.out
        assert "Traceback" not in captured.err

        os.rmdir(fn + "/inner")
        os.rmdir(fn)
    finally:
        # Ask the child to stop; escalate to kill if it does not exit in time.
        fuse_process.terminate()
        fuse_process.join(timeout=10)
        if fuse_process.is_alive():
            fuse_process.kill()
            fuse_process.join()
def host_mount_local(source_dir, mount_dir, debug_log):
    """Invoke the fsspec FUSE ``main`` entry point for a ``local`` mount.

    Used as the target of a child ``Process``. Passes ``source_dir`` and
    ``mount_dir`` as positional arguments, ``debug_log`` as the value of
    ``-l``, and enables ``--ready-file``.
    """
    cli_args = ["local", source_dir, mount_dir, "-l", debug_log, "--ready-file"]
    main(cli_args)
@pytest.fixture()
def mount_local(tmpdir):
    """Mount a temporary local directory through FUSE for one test.

    Creates ``source`` and ``local`` directories under ``tmpdir``, starts a
    child process running ``host_mount_local`` and waits (up to about
    2 seconds) for ``<mount_dir>/.fuse_ready`` to contain ``b"ready"``.

    Yields:
        tuple: ``(source_dir, mount_dir)`` as returned by ``tmpdir.mkdir``.

    The child process is terminated (and killed if needed) on teardown, also
    when waiting for the mount raises. As before, the fixture still yields if
    the ready file never shows up within the polling window.
    """
    source_dir = tmpdir.mkdir("source")
    mount_dir = tmpdir.mkdir("local")
    debug_log = tmpdir / "debug.log"
    fuse_process = Process(
        target=host_mount_local, args=(str(source_dir), str(mount_dir), str(debug_log))
    )
    fuse_process.start()
    try:
        ready_file = mount_dir / ".fuse_ready"
        for _ in range(20):
            try:
                if ready_file.exists():
                    # Read in binary mode: the expected marker is bytes, and a
                    # text-mode read would never compare equal to it.
                    with open(ready_file, "rb") as fh:
                        if fh.read() == b"ready":
                            break
            except OSError:
                # The mount point may be briefly unusable while FUSE starts.
                pass
            time.sleep(0.1)

        yield (source_dir, mount_dir)
    finally:
        # Ask the child to stop; escalate to kill if it does not exit in time.
        fuse_process.terminate()
        fuse_process.join(timeout=10)
        if fuse_process.is_alive():
            fuse_process.kill()
            fuse_process.join()
def test_mount(mount_local):
    """Check that a directory made via the mount shows up in the source."""
    source_dir, mount_dir = mount_local
    views = (mount_dir, source_dir)

    # Both the mount point and the backing directory start out empty.
    for view in views:
        assert os.listdir(view) == []

    mount_dir.mkdir("a")

    # The new directory is visible from both sides.
    for view in views:
        assert os.listdir(view) == ["a"]
def test_chmod(mount_local):
    """Copy a file inside the mount with ``cp`` and verify the result.

    Writes ``text`` through the mount, copies it to ``new`` using the external
    ``cp`` command, and asserts that ``cp`` printed nothing to stdout/stderr,
    that both files exist in the source directory, and that the copy has the
    same content.
    """
    source_dir, mount_dir = mount_local
    # Close the handle explicitly so the data is flushed before it is checked.
    with open(mount_dir / "text", "w") as f:
        f.write("test")
    assert os.listdir(source_dir) == ["text"]

    cp = subprocess.run(
        ["cp", str(mount_dir / "text"), str(mount_dir / "new")],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )

    # Any complaint from cp is expected to show up as output.
    assert cp.stderr == b""
    assert cp.stdout == b""
    assert set(os.listdir(source_dir)) == {"text", "new"}
    with open(mount_dir / "new") as f:
        assert f.read() == "test"
def test_seek_rw(mount_local):
    """Verify seek-then-write and seek-then-read on a file in the mount.

    Writes ``"teST"``, seeks back to offset 2 and overwrites with ``"st"``,
    then reopens the file and checks both a full read and a read from
    offset 2.
    """
    _source_dir, mount_dir = mount_local
    # Context managers guarantee the handles are closed even if an
    # assertion fails part-way through.
    with open(mount_dir / "text", "w") as fh:
        fh.write("teST")
        fh.seek(2)
        fh.write("st")

    with open(mount_dir / "text", "r") as fh:
        assert fh.read() == "test"
        fh.seek(2)
        assert fh.read() == "st"