173 lines
4.6 KiB
Python
173 lines
4.6 KiB
Python
|
"""
|
||
|
Tests for the pandas custom headers in http(s) requests
|
||
|
"""
|
||
|
from functools import partial
|
||
|
import gzip
|
||
|
from io import BytesIO
|
||
|
|
||
|
import pytest
|
||
|
|
||
|
import pandas.util._test_decorators as td
|
||
|
|
||
|
import pandas as pd
|
||
|
import pandas._testing as tm
|
||
|
|
||
|
# Module-level marks: pytest applies every entry in ``pytestmark`` to each
# test collected from this module.
pytestmark = [
    pytest.mark.single_cpu,
    pytest.mark.network,
    pytest.mark.filterwarnings(
        "ignore:Passing a BlockManager to DataFrame:DeprecationWarning"
    ),
]
|
||
|
|
||
|
|
||
|
def gzip_bytes(response_bytes):
    """
    Return ``response_bytes`` compressed into a gzip stream.

    Parameters
    ----------
    response_bytes : bytes
        Raw payload to compress.

    Returns
    -------
    bytes
        The gzip-compressed payload.
    """
    with BytesIO() as bio:
        with gzip.GzipFile(fileobj=bio, mode="w") as zipper:
            zipper.write(response_bytes)
        # Read the buffer only after the GzipFile has been closed (so the
        # compressed stream is complete) but before the BytesIO is closed.
        return bio.getvalue()
|
||
|
|
||
|
|
||
|
def csv_responder(df):
    """Return ``df`` serialized as UTF-8 encoded CSV bytes, without the index."""
    return df.to_csv(index=False).encode("utf-8")
|
||
|
|
||
|
|
||
|
def gz_csv_responder(df):
    """Return the ``csv_responder`` payload for ``df``, gzip-compressed."""
    return gzip_bytes(csv_responder(df))
|
||
|
|
||
|
|
||
|
def json_responder(df):
    """Return ``df`` serialized with ``DataFrame.to_json`` as UTF-8 bytes."""
    return df.to_json().encode("utf-8")
|
||
|
|
||
|
|
||
|
def gz_json_responder(df):
    """Return the ``json_responder`` payload for ``df``, gzip-compressed."""
    return gzip_bytes(json_responder(df))
|
||
|
|
||
|
|
||
|
def html_responder(df):
    """Return ``df`` rendered as an HTML table (no index), UTF-8 encoded."""
    return df.to_html(index=False).encode("utf-8")
|
||
|
|
||
|
|
||
|
def parquetpyarrow_reponder(df):
    """
    Return ``df`` serialized to parquet with the pyarrow engine.

    No path is passed to ``to_parquet``, so the serialized content is
    returned rather than written to a file.
    """
    return df.to_parquet(index=False, engine="pyarrow")
|
||
|
|
||
|
|
||
|
def parquetfastparquet_responder(df):
    """
    Return ``df`` serialized to parquet bytes with the fastparquet engine.

    The frame is written to an fsspec ``memory://`` URL and the stored bytes
    are then read back, instead of writing into an in-memory buffer directly.
    """
    # The fastparquet engine doesn't like to write to a buffer (per the
    # original note here, it closes the buffer it is given, which wipes the
    # contents), so round-trip through an in-memory fsspec URL instead.

    # fsspec is optional: the test parameter that uses this responder is
    # guarded by skip marks for both fastparquet and fsspec.
    import fsspec

    df.to_parquet(
        "memory://fastparquet_user_agent.parquet",
        index=False,
        engine="fastparquet",
        compression=None,
    )
    with fsspec.open("memory://fastparquet_user_agent.parquet", "rb") as f:
        return f.read()
|
||
|
|
||
|
|
||
|
def pickle_respnder(df):
    """Return ``df`` pickled via ``DataFrame.to_pickle`` as bytes."""
    with BytesIO() as bio:
        df.to_pickle(bio)
        # Grab the contents before the context manager closes the buffer.
        return bio.getvalue()
|
||
|
|
||
|
|
||
|
def stata_responder(df):
    """Return ``df`` written in Stata format (index not written) as bytes."""
    with BytesIO() as bio:
        df.to_stata(bio, write_index=False)
        # Grab the contents before the context manager closes the buffer.
        return bio.getvalue()
|
||
|
|
||
|
|
||
|
@pytest.mark.parametrize(
    "responder, read_method",
    [
        (csv_responder, pd.read_csv),
        (json_responder, pd.read_json),
        (
            html_responder,
            lambda *args, **kwargs: pd.read_html(*args, **kwargs)[0],
        ),
        pytest.param(
            parquetpyarrow_reponder,
            partial(pd.read_parquet, engine="pyarrow"),
            marks=td.skip_if_no("pyarrow"),
        ),
        pytest.param(
            parquetfastparquet_responder,
            partial(pd.read_parquet, engine="fastparquet"),
            # TODO(ArrayManager) fastparquet
            marks=[
                td.skip_if_no("fastparquet"),
                td.skip_if_no("fsspec"),
                td.skip_array_manager_not_yet_implemented,
            ],
        ),
        (pickle_respnder, pd.read_pickle),
        (stata_responder, pd.read_stata),
        (gz_csv_responder, pd.read_csv),
        (gz_json_responder, pd.read_json),
    ],
)
@pytest.mark.parametrize(
    "storage_options",
    [
        None,
        {"User-Agent": "foo"},
        {"User-Agent": "foo", "Auth": "bar"},
    ],
)
def test_request_headers(responder, read_method, httpserver, storage_options):
    """
    Check that ``storage_options`` are sent as request headers by the readers.

    The serialized frame is served from ``httpserver``, read back with
    ``read_method``, and compared with the original. The first recorded
    request must carry exactly the default headers plus the keys given in
    ``storage_options``, with the ``storage_options`` values intact.
    """
    expected = pd.DataFrame({"a": ["b"]})
    default_headers = ["Accept-Encoding", "Host", "Connection", "User-Agent"]
    if "gz" in responder.__name__:
        extra = {"Content-Encoding": "gzip"}
        # Merge into a new dict instead of updating in place: the dicts in
        # the parametrize list are the same objects for every invocation, so
        # an in-place update would leak the gzip header into other cases.
        storage_options = {**(storage_options or {}), **extra}
    else:
        extra = None
    expected_headers = set(default_headers).union(
        storage_options.keys() if storage_options else []
    )
    httpserver.serve_content(content=responder(expected), headers=extra)
    result = read_method(httpserver.url, storage_options=storage_options)
    tm.assert_frame_equal(result, expected)

    request_headers = dict(httpserver.requests[0].headers)
    for header in expected_headers:
        # pop() both checks presence (KeyError if the header is missing) and
        # removes it so that leftovers can be detected below.
        exp = request_headers.pop(header)
        if storage_options and header in storage_options:
            assert exp == storage_options[header]
    # No extra headers added
    assert not request_headers
|
||
|
|
||
|
|
||
|
@pytest.mark.parametrize(
    "engine",
    [
        "pyarrow",
        "fastparquet",
    ],
)
def test_to_parquet_to_disk_with_storage_options(engine):
    """
    Check that ``to_parquet`` rejects ``storage_options`` for a local path.

    A ``ValueError`` matching one of the two expected messages must be
    raised; the test is skipped when ``engine`` cannot be imported.
    """
    headers = {
        "User-Agent": "custom",
        "Auth": "other_custom",
    }

    pytest.importorskip(engine)

    true_df = pd.DataFrame({"column_name": ["column_value"]})
    # Two alternative messages are accepted (regex alternation).
    msg = (
        "storage_options passed with file object or non-fsspec file path|"
        "storage_options passed with buffer, or non-supported URL"
    )
    with pytest.raises(ValueError, match=msg):
        true_df.to_parquet("/tmp/junk.parquet", storage_options=headers, engine=engine)
|