Traktor/myenv/Lib/site-packages/fsspec/implementations/tests/test_archive.py
2024-05-26 05:12:46 +02:00

383 lines
13 KiB
Python

import bz2
import gzip
import lzma
import os
import pickle
import tarfile
import tempfile
import zipfile
from contextlib import contextmanager
from io import BytesIO
import pytest
import fsspec
# The blueprint to create synthesized archive files from.
archive_data = {"a": b"", "b": b"hello", "deeply/nested/path": b"stuff"}
@contextmanager
def tempzip(data=None):
"""
Provide test cases with temporary synthesized Zip archives.
"""
data = data or {}
f = tempfile.mkstemp(suffix=".zip")[1]
with zipfile.ZipFile(f, mode="w") as z:
for k, v in data.items():
z.writestr(k, v)
try:
yield f
finally:
try:
os.remove(f)
except OSError:
pass
@contextmanager
def temparchive(data=None):
"""
Provide test cases with temporary synthesized 7-Zip archives.
"""
data = data or {}
libarchive = pytest.importorskip("libarchive")
f = tempfile.mkstemp(suffix=".7z")[1]
with libarchive.file_writer(f, "7zip") as archive:
for k, v in data.items():
archive.add_file_from_memory(entry_path=k, entry_size=len(v), entry_data=v)
try:
yield f
finally:
try:
os.remove(f)
except OSError:
pass
@contextmanager
def temptar(data=None, mode="w", suffix=".tar"):
"""
Provide test cases with temporary synthesized .tar archives.
"""
data = data or {}
fn = tempfile.mkstemp(suffix=suffix)[1]
with tarfile.TarFile.open(fn, mode=mode) as t:
touched = {}
for name, data in data.items():
# Create directory hierarchy.
# https://bugs.python.org/issue22208#msg225558
if "/" in name and name not in touched:
parts = os.path.dirname(name).split("/")
for index in range(1, len(parts) + 1):
info = tarfile.TarInfo("/".join(parts[:index]))
info.type = tarfile.DIRTYPE
t.addfile(info)
touched[name] = True
# Add file content.
info = tarfile.TarInfo(name=name)
info.size = len(data)
t.addfile(info, BytesIO(data))
try:
yield fn
finally:
try:
os.remove(fn)
except OSError:
pass
@contextmanager
def temptargz(data=None, mode="w", suffix=".tar.gz"):
"""
Provide test cases with temporary synthesized .tar.gz archives.
"""
with temptar(data=data, mode=mode) as tarname:
fn = tempfile.mkstemp(suffix=suffix)[1]
with open(tarname, "rb") as tar:
cf = gzip.GzipFile(filename=fn, mode=mode)
cf.write(tar.read())
cf.close()
try:
yield fn
finally:
try:
os.remove(fn)
except OSError:
pass
@contextmanager
def temptarbz2(data=None, mode="w", suffix=".tar.bz2"):
"""
Provide test cases with temporary synthesized .tar.bz2 archives.
"""
with temptar(data=data, mode=mode) as tarname:
fn = tempfile.mkstemp(suffix=suffix)[1]
with open(tarname, "rb") as tar:
cf = bz2.BZ2File(filename=fn, mode=mode)
cf.write(tar.read())
cf.close()
try:
yield fn
finally:
try:
os.remove(fn)
except OSError:
pass
@contextmanager
def temptarxz(data=None, mode="w", suffix=".tar.xz"):
"""
Provide test cases with temporary synthesized .tar.xz archives.
"""
with temptar(data=data, mode=mode) as tarname:
fn = tempfile.mkstemp(suffix=suffix)[1]
with open(tarname, "rb") as tar:
cf = lzma.open(filename=fn, mode=mode, format=lzma.FORMAT_XZ)
cf.write(tar.read())
cf.close()
try:
yield fn
finally:
try:
os.remove(fn)
except OSError:
pass
class ArchiveTestScenario:
"""
Describe a test scenario for any type of archive.
"""
def __init__(self, protocol=None, provider=None, variant=None):
# The filesystem protocol identifier. Any of "zip", "tar" or "libarchive".
self.protocol = protocol
# A contextmanager function to provide temporary synthesized archives.
self.provider = provider
# The filesystem protocol variant identifier. Any of "gz", "bz2" or "xz".
self.variant = variant
def pytest_generate_tests(metafunc):
"""
Generate test scenario parametrization arguments with appropriate labels (idlist).
On the one hand, this yields an appropriate output like::
fsspec/implementations/tests/test_archive.py::TestArchive::test_empty[zip] PASSED # noqa
On the other hand, it will support perfect test discovery, like::
pytest fsspec -vvv -k "zip or tar or libarchive"
https://docs.pytest.org/en/latest/example/parametrize.html#a-quick-port-of-testscenarios
"""
idlist = []
argnames = ["scenario"]
argvalues = []
for scenario in metafunc.cls.scenarios:
scenario: ArchiveTestScenario = scenario
label = scenario.protocol
if scenario.variant:
label += "-" + scenario.variant
idlist.append(label)
argvalues.append([scenario])
metafunc.parametrize(argnames, argvalues, ids=idlist, scope="class")
# Define test scenarios.
scenario_zip = ArchiveTestScenario(protocol="zip", provider=tempzip)
scenario_tar = ArchiveTestScenario(protocol="tar", provider=temptar)
scenario_targz = ArchiveTestScenario(protocol="tar", provider=temptargz, variant="gz")
scenario_tarbz2 = ArchiveTestScenario(
protocol="tar", provider=temptarbz2, variant="bz2"
)
scenario_tarxz = ArchiveTestScenario(protocol="tar", provider=temptarxz, variant="xz")
scenario_libarchive = ArchiveTestScenario(protocol="libarchive", provider=temparchive)
class TestAnyArchive:
"""
Validate that all filesystem adapter implementations for archive files
will adhere to the same specification.
"""
scenarios = [
scenario_zip,
scenario_tar,
scenario_targz,
scenario_tarbz2,
scenario_tarxz,
scenario_libarchive,
]
def test_repr(self, scenario: ArchiveTestScenario):
with scenario.provider() as archive:
fs = fsspec.filesystem(scenario.protocol, fo=archive)
assert repr(fs).startswith("<Archive-like object")
def test_empty(self, scenario: ArchiveTestScenario):
with scenario.provider() as archive:
fs = fsspec.filesystem(scenario.protocol, fo=archive)
assert fs.find("") == []
assert fs.find("", withdirs=True) == []
with pytest.raises(FileNotFoundError):
fs.info("")
assert fs.ls("") == []
def test_glob(self, scenario: ArchiveTestScenario):
with scenario.provider(archive_data) as archive:
fs = fsspec.filesystem(scenario.protocol, fo=archive)
assert fs.glob("*/*/*th") == ["deeply/nested/path"]
def test_mapping(self, scenario: ArchiveTestScenario):
with scenario.provider(archive_data) as archive:
fs = fsspec.filesystem(scenario.protocol, fo=archive)
m = fs.get_mapper()
assert list(m) == ["a", "b", "deeply/nested/path"]
assert m["b"] == archive_data["b"]
def test_pickle(self, scenario: ArchiveTestScenario):
with scenario.provider(archive_data) as archive:
fs = fsspec.filesystem(scenario.protocol, fo=archive)
fs2 = pickle.loads(pickle.dumps(fs))
assert fs2.cat("b") == b"hello"
def test_all_dirnames(self, scenario: ArchiveTestScenario):
with scenario.provider(archive_data) as archive:
fs = fsspec.filesystem(scenario.protocol, fo=archive)
# fx are files, dx are a directories
assert fs._all_dirnames([]) == set()
assert fs._all_dirnames(["f1"]) == set()
assert fs._all_dirnames(["f1", "f2"]) == set()
assert fs._all_dirnames(["f1", "f2", "d1/f1"]) == {"d1"}
assert fs._all_dirnames(["f1", "d1/f1", "d1/f2"]) == {"d1"}
assert fs._all_dirnames(["f1", "d1/f1", "d2/f1"]) == {"d1", "d2"}
assert fs._all_dirnames(["d1/d1/d1/f1"]) == {"d1", "d1/d1", "d1/d1/d1"}
def test_ls(self, scenario: ArchiveTestScenario):
with scenario.provider(archive_data) as archive:
fs = fsspec.filesystem(scenario.protocol, fo=archive)
assert fs.ls("", detail=False) == ["a", "b", "deeply"]
assert fs.ls("/") == fs.ls("")
assert fs.ls("deeply", detail=False) == ["deeply/nested"]
assert fs.ls("deeply/") == fs.ls("deeply")
assert fs.ls("deeply/nested", detail=False) == ["deeply/nested/path"]
assert fs.ls("deeply/nested/") == fs.ls("deeply/nested")
def test_find(self, scenario: ArchiveTestScenario):
with scenario.provider(archive_data) as archive:
fs = fsspec.filesystem(scenario.protocol, fo=archive)
assert fs.find("") == ["a", "b", "deeply/nested/path"]
assert fs.find("", withdirs=True) == [
"a",
"b",
"deeply",
"deeply/nested",
"deeply/nested/path",
]
assert fs.find("deeply") == ["deeply/nested/path"]
assert fs.find("deeply/") == fs.find("deeply")
@pytest.mark.parametrize("topdown", [True, False])
@pytest.mark.parametrize("prune_nested", [True, False])
def test_walk(self, scenario: ArchiveTestScenario, topdown, prune_nested):
with scenario.provider(archive_data) as archive:
fs = fsspec.filesystem(scenario.protocol, fo=archive)
expected = [
# (dirname, list of subdirs, list of files)
("", ["deeply"], ["a", "b"]),
("deeply", ["nested"], []),
]
if not topdown or not prune_nested:
expected.append(("deeply/nested", [], ["path"]))
if not topdown:
expected.reverse()
result = []
for path, dirs, files in fs.walk("", topdown=topdown):
result.append((path, dirs.copy(), files))
# Bypass the "nested" dir
if prune_nested and "nested" in dirs:
dirs.remove("nested")
# prior py3.10 zip() does not support strict=True, we need
# a manual len check here
assert len(result) == len(expected)
for lhs, rhs in zip(result, expected):
assert lhs[0] == rhs[0]
assert sorted(lhs[1]) == sorted(rhs[1])
assert sorted(lhs[2]) == sorted(rhs[2])
def test_info(self, scenario: ArchiveTestScenario):
# https://github.com/Suor/funcy/blob/1.15/funcy/colls.py#L243-L245
def project(mapping, keys):
"""Leaves only given keys in mapping."""
return {k: mapping[k] for k in keys if k in mapping}
with scenario.provider(archive_data) as archive:
fs = fsspec.filesystem(scenario.protocol, fo=archive)
with pytest.raises(FileNotFoundError):
fs.info("i-do-not-exist")
# Iterate over all directories.
for d in fs._all_dirnames(archive_data.keys()):
lhs = project(fs.info(d), ["name", "size", "type"])
expected = {"name": f"{d}", "size": 0, "type": "directory"}
assert lhs == expected
# Iterate over all files.
for f, v in archive_data.items():
lhs = fs.info(f)
assert lhs["name"] == f
assert lhs["size"] == len(v)
assert lhs["type"] == "file"
@pytest.mark.parametrize("scale", [128, 512, 4096])
def test_isdir_isfile(self, scenario: ArchiveTestScenario, scale: int):
def make_nested_dir(i):
x = f"{i}"
table = x.maketrans("0123456789", "ABCDEFGHIJ")
return "/".join(x.translate(table))
scaled_data = {f"{make_nested_dir(i)}/{i}": b"" for i in range(1, scale + 1)}
with scenario.provider(scaled_data) as archive:
fs = fsspec.filesystem(scenario.protocol, fo=archive)
lhs_dirs, lhs_files = (
fs._all_dirnames(scaled_data.keys()),
scaled_data.keys(),
)
# Warm-up the Cache, this is done in both cases anyways...
fs._get_dirs()
entries = lhs_files | lhs_dirs
assert lhs_dirs == {e for e in entries if fs.isdir(e)}
assert lhs_files == {e for e in entries if fs.isfile(e)}
def test_read_empty_file(self, scenario: ArchiveTestScenario):
with scenario.provider(archive_data) as archive:
fs = fsspec.filesystem(scenario.protocol, fo=archive)
assert fs.open("a").read() == b""