# pandas/tests/io/test_http_headers.py
"""
Tests for the pandas custom headers in http(s) requests
"""
from functools import partial
import gzip
from io import BytesIO
import pytest
import pandas.util._test_decorators as td
import pandas as pd
import pandas._testing as tm
# Module-wide marks: each test spins up a local HTTP server (no parallel
# workers, network-marked) and silences the BlockManager deprecation noise
# triggered by some readers.
pytestmark = [
    pytest.mark.single_cpu,
    pytest.mark.network,
    pytest.mark.filterwarnings(
        "ignore:Passing a BlockManager to DataFrame:DeprecationWarning"
    ),
]
def gzip_bytes(response_bytes):
    """Return *response_bytes* compressed with gzip."""
    buffer = BytesIO()
    with gzip.GzipFile(fileobj=buffer, mode="w") as compressor:
        compressor.write(response_bytes)
    return buffer.getvalue()
def csv_responder(df):
    """Serve *df* as UTF-8 encoded CSV bytes (no index column)."""
    return bytes(df.to_csv(index=False), "utf-8")
def gz_csv_responder(df):
    """Serve *df* as gzip-compressed CSV bytes."""
    csv_payload = csv_responder(df)
    return gzip_bytes(csv_payload)
def json_responder(df):
    """Serve *df* as UTF-8 encoded JSON bytes (default orient)."""
    payload = df.to_json()
    return payload.encode("utf-8")
def gz_json_responder(df):
    """Serve *df* as gzip-compressed JSON bytes."""
    json_payload = json_responder(df)
    return gzip_bytes(json_payload)
def html_responder(df):
    """Serve *df* as UTF-8 encoded HTML table bytes (no index column)."""
    markup = df.to_html(index=False)
    return markup.encode("utf-8")
def parquetpyarrow_reponder(df):
    """Serve *df* as parquet bytes written with the pyarrow engine.

    NOTE(review): the name misspells "responder"; kept as-is because the
    parametrize list below references it.
    """
    payload = df.to_parquet(index=False, engine="pyarrow")
    return payload
def parquetfastparquet_responder(df):
    """Serve *df* as parquet bytes written with the fastparquet engine.

    The fastparquet engine doesn't like to write to a buffer: it can do so
    via the ``open_with`` function, but it automatically calls the close
    method and wipes the buffer.  Writing to fsspec's in-memory filesystem
    and reading the bytes back sidesteps that.  Protected by an
    importorskip in the respective test.
    """
    import fsspec

    target = "memory://fastparquet_user_agent.parquet"
    df.to_parquet(target, index=False, engine="fastparquet", compression=None)
    with fsspec.open(target, "rb") as handle:
        return handle.read()
def pickle_respnder(df):
    """Serve *df* as pickle bytes.

    NOTE(review): the name misspells "responder"; kept as-is because the
    parametrize list below references it.
    """
    buffer = BytesIO()
    df.to_pickle(buffer)
    return buffer.getvalue()
def stata_responder(df):
    """Serve *df* as Stata .dta bytes (index not written)."""
    buffer = BytesIO()
    df.to_stata(buffer, write_index=False)
    return buffer.getvalue()
@pytest.mark.parametrize(
    "responder, read_method",
    [
        (csv_responder, pd.read_csv),
        (json_responder, pd.read_json),
        (
            html_responder,
            lambda *args, **kwargs: pd.read_html(*args, **kwargs)[0],
        ),
        pytest.param(
            parquetpyarrow_reponder,
            partial(pd.read_parquet, engine="pyarrow"),
            marks=td.skip_if_no("pyarrow"),
        ),
        pytest.param(
            parquetfastparquet_responder,
            partial(pd.read_parquet, engine="fastparquet"),
            # TODO(ArrayManager) fastparquet
            marks=[
                td.skip_if_no("fastparquet"),
                td.skip_if_no("fsspec"),
                td.skip_array_manager_not_yet_implemented,
            ],
        ),
        (pickle_respnder, pd.read_pickle),
        (stata_responder, pd.read_stata),
        (gz_csv_responder, pd.read_csv),
        (gz_json_responder, pd.read_json),
    ],
)
@pytest.mark.parametrize(
    "storage_options",
    [
        None,
        {"User-Agent": "foo"},
        {"User-Agent": "foo", "Auth": "bar"},
    ],
)
def test_request_headers(responder, read_method, httpserver, storage_options):
    """Round-trip a frame through a local HTTP server and check that the
    request carried exactly the default headers plus any ``storage_options``
    entries — with the custom values actually sent.
    """
    expected = pd.DataFrame({"a": ["b"]})
    default_headers = ["Accept-Encoding", "Host", "Connection", "User-Agent"]
    if "gz" in responder.__name__:
        extra = {"Content-Encoding": "gzip"}
        if storage_options is None:
            storage_options = extra
        else:
            # Build a NEW dict rather than using ``|=``: pytest passes the
            # same parametrized dict object to every test combination, and
            # in-place mutation would leak "Content-Encoding" into the
            # non-gzip test cases that share it.
            storage_options = {**storage_options, **extra}
    else:
        extra = None
    expected_headers = set(default_headers).union(
        storage_options.keys() if storage_options else []
    )
    httpserver.serve_content(content=responder(expected), headers=extra)
    result = read_method(httpserver.url, storage_options=storage_options)
    tm.assert_frame_equal(result, expected)
    # Every expected header must be present; custom ones must carry the
    # user-supplied value.
    request_headers = dict(httpserver.requests[0].headers)
    for header in expected_headers:
        exp = request_headers.pop(header)
        if storage_options and header in storage_options:
            assert exp == storage_options[header]
    # No extra headers added
    assert not request_headers
@pytest.mark.parametrize(
    "engine",
    [
        "pyarrow",
        "fastparquet",
    ],
)
def test_to_parquet_to_disk_with_storage_options(engine):
    """Writing parquet to a plain local path must reject storage_options."""
    pytest.importorskip(engine)
    headers = {
        "User-Agent": "custom",
        "Auth": "other_custom",
    }
    df = pd.DataFrame({"column_name": ["column_value"]})
    msg = (
        "storage_options passed with file object or non-fsspec file path|"
        "storage_options passed with buffer, or non-supported URL"
    )
    with pytest.raises(ValueError, match=msg):
        df.to_parquet("/tmp/junk.parquet", storage_options=headers, engine=engine)