Inzynierka/Lib/site-packages/pandas/_libs/tslibs/vectorized.pyx

380 lines
11 KiB
Cython
Raw Permalink Normal View History

2023-06-02 12:51:02 +02:00
cimport cython
from cpython.datetime cimport (
date,
datetime,
time,
tzinfo,
)
import numpy as np
cimport numpy as cnp
from numpy cimport (
int64_t,
ndarray,
)
cnp.import_array()
from .dtypes import Resolution
from .dtypes cimport (
c_Resolution,
periods_per_day,
)
from .nattype cimport (
NPY_NAT,
c_NaT as NaT,
)
from .np_datetime cimport (
NPY_DATETIMEUNIT,
NPY_FR_ns,
npy_datetimestruct,
pandas_datetime_to_datetimestruct,
)
from .period cimport get_period_ordinal
from .timestamps cimport create_timestamp_from_ts
from .timezones cimport is_utc
from .tzconversion cimport Localizer
@cython.boundscheck(False)
@cython.wraparound(False)
def tz_convert_from_utc(ndarray stamps, tzinfo tz, NPY_DATETIMEUNIT reso=NPY_FR_ns):
    # stamps is int64_t, arbitrary ndim
    """
    Translate UTC epoch values (i8) into the local values for ``tz``.

    Parameters
    ----------
    stamps : ndarray[int64]
    tz : tzinfo
    reso : NPY_DATETIMEUNIT, default NPY_FR_ns
        Passed to the Localizer as ``creso``.

    Returns
    -------
    ndarray[int64]
        Same shape as ``stamps``.  NaT entries are carried over unchanged.
    """
    cdef:
        Localizer localizer = Localizer(tz, creso=reso)
        Py_ssize_t n_elems = stamps.size
        Py_ssize_t idx
        Py_ssize_t pos = -1  # only written by the Localizer, never read here
        int64_t utc_i8, shifted

        ndarray result
        cnp.broadcast pair_it

    if tz is None or is_utc(tz) or n_elems == 0:
        # Nothing to shift: a plain copy is much faster than the
        # element-by-element pattern below.
        return stamps.copy()

    result = cnp.PyArray_EMPTY(stamps.ndim, stamps.shape, cnp.NPY_INT64, 0)
    # Operand 0 of the multi-iterator is the output, operand 1 the input.
    pair_it = cnp.PyArray_MultiIterNew2(result, stamps)

    for idx in range(n_elems):
        # Analogous to: utc_i8 = stamps[idx]
        utc_i8 = (<int64_t*>cnp.PyArray_MultiIter_DATA(pair_it, 1))[0]

        if utc_i8 != NPY_NAT:
            shifted = localizer.utc_val_to_local_val(utc_i8, &pos)
        else:
            shifted = NPY_NAT

        # Analogous to: result[idx] = shifted
        (<int64_t*>cnp.PyArray_MultiIter_DATA(pair_it, 0))[0] = shifted

        cnp.PyArray_MultiIter_NEXT(pair_it)

    return result
# -------------------------------------------------------------------------
@cython.wraparound(False)
@cython.boundscheck(False)
def ints_to_pydatetime(
    ndarray stamps,
    tzinfo tz=None,
    str box="datetime",
    NPY_DATETIMEUNIT reso=NPY_FR_ns,
) -> np.ndarray:
    # stamps is int64, arbitrary ndim
    """
    Box an i8 array into an object ndarray of datetime, date, time or Timestamp.

    Parameters
    ----------
    stamps : array of i8
    tz : tzinfo, optional
        Timezone to convert to.  Must be None when ``box == "date"``.
    box : {'datetime', 'timestamp', 'date', 'time'}, default 'datetime'
        * 'datetime' : elements are datetime.datetime
        * 'date' : elements are datetime.date
        * 'time' : elements are datetime.time
        * 'timestamp' : elements are pandas.Timestamp
    reso : NPY_DATETIMEUNIT, default NPY_FR_ns
        Unit of the values in ``stamps``.

    Returns
    -------
    ndarray[object]
        Same shape as ``stamps``; NaT inputs become ``NaT``, everything else
        is of the type selected by ``box``.

    Raises
    ------
    ValueError
        If ``box`` is not one of the four supported strings.
    """
    cdef:
        Localizer localizer = Localizer(tz, creso=reso)
        int64_t utc_i8, wall_i8
        Py_ssize_t idx
        Py_ssize_t n_elems = stamps.size
        # Filled in by the Localizer; only read back on the pytz path below.
        Py_ssize_t pos = -1
        npy_datetimestruct fields
        tzinfo elem_tz
        bint as_pydatetime = False, as_timestamp = False, as_date = False
        object boxed
        bint fold = 0

        # `result` is allocated C-ordered, so its ravel is a flat view of it,
        # and the flat iterator walks `stamps` in C order as well; position
        # `idx` therefore refers to the same element in both.
        # See discussion at
        # github.com/pandas-dev/pandas/pull/46886#discussion_r860261305
        ndarray result = cnp.PyArray_EMPTY(
            stamps.ndim, stamps.shape, cnp.NPY_OBJECT, 0
        )
        object[::1] out_flat = result.ravel()  # should NOT be a copy
        cnp.flatiter src_it = cnp.PyArray_IterNew(stamps)

    # Resolve the requested box type once, outside the loop.
    if box == "datetime":
        as_pydatetime = True
    elif box == "timestamp":
        as_timestamp = True
    elif box == "date":
        assert (tz is None), "tz should be None when converting to date"
        as_date = True
    elif box == "time":
        # No flag needed: "time" is the fall-through case in the loop.
        pass
    else:
        raise ValueError(
            "box must be one of 'datetime', 'date', 'time' or 'timestamp'"
        )

    for idx in range(n_elems):
        # Analogous to: utc_i8 = stamps[idx]
        utc_i8 = (<int64_t*>cnp.PyArray_ITER_DATA(src_it))[0]

        if utc_i8 == NPY_NAT:
            boxed = <object>NaT
        else:
            elem_tz = tz
            wall_i8 = localizer.utc_val_to_local_val(utc_i8, &pos, &fold)
            if localizer.use_pytz:
                # find right representation of dst etc in pytz timezone
                elem_tz = tz._tzinfos[tz._transition_info[pos]]

            pandas_datetime_to_datetimestruct(wall_i8, reso, &fields)

            if as_pydatetime:
                boxed = datetime(
                    fields.year,
                    fields.month,
                    fields.day,
                    fields.hour,
                    fields.min,
                    fields.sec,
                    fields.us,
                    elem_tz,
                    fold=fold,
                )
            elif as_timestamp:
                # Timestamp is built from the original UTC value plus the
                # already-decomposed local fields.
                boxed = create_timestamp_from_ts(
                    utc_i8, fields, elem_tz, fold, reso=reso
                )
            elif as_date:
                boxed = date(fields.year, fields.month, fields.day)
            else:
                boxed = time(
                    fields.hour, fields.min, fields.sec, fields.us, elem_tz, fold=fold
                )

        # Writing through the flat view is used here instead of the
        # multi-iterator data-pointer pattern of the int64 functions; that
        # pattern does not seem to work with object dtype.
        # See discussion at
        # github.com/pandas-dev/pandas/pull/46886#discussion_r860261305
        out_flat[idx] = boxed

        cnp.PyArray_ITER_NEXT(src_it)

    return result
# -------------------------------------------------------------------------
cdef c_Resolution _reso_stamp(npy_datetimestruct *dts):
    # Classify one decomposed timestamp by its smallest non-zero field,
    # checking from the `ps` field upwards; a stamp whose ps/us/sec/min/hour
    # fields are all zero is reported as RESO_DAY.
    if dts.ps != 0:
        return c_Resolution.RESO_NS

    if dts.us != 0:
        # A microsecond count that is a whole multiple of 1000 is reported
        # as millisecond resolution.
        if dts.us % 1000 != 0:
            return c_Resolution.RESO_US
        return c_Resolution.RESO_MS

    if dts.sec != 0:
        return c_Resolution.RESO_SEC

    if dts.min != 0:
        return c_Resolution.RESO_MIN

    if dts.hour != 0:
        return c_Resolution.RESO_HR

    return c_Resolution.RESO_DAY
@cython.wraparound(False)
@cython.boundscheck(False)
def get_resolution(
    ndarray stamps, tzinfo tz=None, NPY_DATETIMEUNIT reso=NPY_FR_ns
) -> Resolution:
    # stamps is int64_t, any ndim
    """
    Determine the Resolution shared by all non-NaT values in ``stamps``.

    Each value is localized to ``tz``, decomposed into fields and classified
    by ``_reso_stamp``; the smallest ``c_Resolution`` value encountered wins.

    Parameters
    ----------
    stamps : ndarray[int64]
    tz : tzinfo, optional
    reso : NPY_DATETIMEUNIT, default NPY_FR_ns
        Unit of the values in ``stamps``.

    Returns
    -------
    Resolution
        ``Resolution(RESO_DAY)`` when ``stamps`` is empty or all-NaT.
    """
    cdef:
        Localizer info = Localizer(tz, creso=reso)
        int64_t utc_val, local_val
        Py_ssize_t i, n = stamps.size
        Py_ssize_t pos = -1  # unused, avoid not-initialized warning
        cnp.flatiter it = cnp.PyArray_IterNew(stamps)

        npy_datetimestruct dts
        c_Resolution pd_reso = c_Resolution.RESO_DAY, curr_reso

    for i in range(n):
        # Analogous to: utc_val = stamps[i]
        # Read the int64 straight from the iterator's data pointer (as
        # ints_to_pydatetime does) rather than via PyArray_GETITEM, which
        # would box every element into a Python object first.
        utc_val = (<int64_t*>cnp.PyArray_ITER_DATA(it))[0]

        if utc_val != NPY_NAT:
            local_val = info.utc_val_to_local_val(utc_val, &pos)

            pandas_datetime_to_datetimestruct(local_val, reso, &dts)
            curr_reso = _reso_stamp(&dts)
            # Keep the smallest enum value seen so far.
            if curr_reso < pd_reso:
                pd_reso = curr_reso

        cnp.PyArray_ITER_NEXT(it)

    return Resolution(pd_reso)
# -------------------------------------------------------------------------
@cython.cdivision(False)
@cython.wraparound(False)
@cython.boundscheck(False)
cpdef ndarray normalize_i8_timestamps(ndarray stamps, tzinfo tz, NPY_DATETIMEUNIT reso):
    # stamps is int64_t, arbitrary ndim
    """
    Round each timestamp down to the start of its day (midnight) in ``tz``.

    Every non-NaT value is first localized to ``tz`` and then has its
    within-day remainder (modulo ``periods_per_day(reso)``) subtracted.

    Parameters
    ----------
    stamps : int64 ndarray
    tz : tzinfo or None
    reso : NPY_DATETIMEUNIT
        Unit of the values in ``stamps``.

    Returns
    -------
    result : int64 ndarray
        Normalized local values, same shape as ``stamps``; NaT stays NaT.
    """
    cdef:
        Localizer localizer = Localizer(tz, creso=reso)
        int64_t utc_i8, wall_i8, floored
        Py_ssize_t idx
        Py_ssize_t n_elems = stamps.size
        Py_ssize_t pos = -1  # unused, avoid not-initialized warning

        ndarray result = cnp.PyArray_EMPTY(
            stamps.ndim, stamps.shape, cnp.NPY_INT64, 0
        )
        # Operand 0 of the multi-iterator is the output, operand 1 the input.
        cnp.broadcast pair_it = cnp.PyArray_MultiIterNew2(result, stamps)
        int64_t per_day = periods_per_day(reso)

    for idx in range(n_elems):
        # Analogous to: utc_i8 = stamps[idx]
        utc_i8 = (<int64_t*>cnp.PyArray_MultiIter_DATA(pair_it, 1))[0]

        if utc_i8 != NPY_NAT:
            wall_i8 = localizer.utc_val_to_local_val(utc_i8, &pos)
            # cdivision is disabled for this function, so `%` follows Python
            # semantics and the subtraction floors negative values too.
            floored = wall_i8 - (wall_i8 % per_day)
        else:
            floored = NPY_NAT

        # Analogous to: result[idx] = floored
        (<int64_t*>cnp.PyArray_MultiIter_DATA(pair_it, 0))[0] = floored

        cnp.PyArray_MultiIter_NEXT(pair_it)

    return result
@cython.wraparound(False)
@cython.boundscheck(False)
def is_date_array_normalized(ndarray stamps, tzinfo tz, NPY_DATETIMEUNIT reso) -> bool:
    # stamps is int64_t, arbitrary ndim
    """
    Check whether every timestamp falls exactly on midnight in ``tz``.

    A value counts as normalized when, after localizing it to ``tz``, it is
    an exact multiple of ``periods_per_day(reso)``.  The scan stops at the
    first value that is not.

    Parameters
    ----------
    stamps : int64 ndarray
    tz : tzinfo or None
    reso : NPY_DATETIMEUNIT
        Unit of the values in ``stamps``.

    Returns
    -------
    is_normalized : bool
        True if all stamps are normalized (including when ``stamps`` is
        empty).

    Notes
    -----
    NaT is not special-cased: it goes through the same localize-and-modulo
    check as any other value.
    """
    cdef:
        Localizer info = Localizer(tz, creso=reso)
        int64_t utc_val, local_val
        Py_ssize_t i, n = stamps.size
        Py_ssize_t pos = -1  # unused, avoid not-initialized warning
        cnp.flatiter it = cnp.PyArray_IterNew(stamps)
        int64_t ppd = periods_per_day(reso)

    for i in range(n):
        # Analogous to: utc_val = stamps[i]
        # Read the int64 straight from the iterator's data pointer (as
        # ints_to_pydatetime does) rather than via PyArray_GETITEM, which
        # would box every element into a Python object first.
        utc_val = (<int64_t*>cnp.PyArray_ITER_DATA(it))[0]

        local_val = info.utc_val_to_local_val(utc_val, &pos)

        if local_val % ppd != 0:
            return False

        cnp.PyArray_ITER_NEXT(it)

    return True
# -------------------------------------------------------------------------
@cython.wraparound(False)
@cython.boundscheck(False)
def dt64arr_to_periodarr(
    ndarray stamps, int freq, tzinfo tz, NPY_DATETIMEUNIT reso=NPY_FR_ns
):
    # stamps is int64_t, arbitrary ndim
    """
    Convert i8 datetime values to period ordinals for frequency ``freq``.

    Each non-NaT value is localized to ``tz``, decomposed into datetime
    fields and handed to ``get_period_ordinal``.

    Parameters
    ----------
    stamps : int64 ndarray
    freq : int
        Frequency code forwarded to ``get_period_ordinal``.
    tz : tzinfo or None
    reso : NPY_DATETIMEUNIT, default NPY_FR_ns
        Unit of the values in ``stamps``.

    Returns
    -------
    ndarray[int64]
        Period ordinals, same shape as ``stamps``; NaT stays NaT.
    """
    cdef:
        Localizer localizer = Localizer(tz, creso=reso)
        Py_ssize_t idx
        Py_ssize_t n_elems = stamps.size
        Py_ssize_t pos = -1  # unused, avoid not-initialized warning
        int64_t utc_i8, wall_i8, ordinal
        npy_datetimestruct fields

        ndarray result = cnp.PyArray_EMPTY(
            stamps.ndim, stamps.shape, cnp.NPY_INT64, 0
        )
        # Operand 0 of the multi-iterator is the output, operand 1 the input.
        cnp.broadcast pair_it = cnp.PyArray_MultiIterNew2(result, stamps)

    for idx in range(n_elems):
        # Analogous to: utc_i8 = stamps[idx]
        utc_i8 = (<int64_t*>cnp.PyArray_MultiIter_DATA(pair_it, 1))[0]

        if utc_i8 != NPY_NAT:
            wall_i8 = localizer.utc_val_to_local_val(utc_i8, &pos)
            pandas_datetime_to_datetimestruct(wall_i8, reso, &fields)
            ordinal = get_period_ordinal(&fields, freq)
        else:
            ordinal = NPY_NAT

        # Analogous to: result[idx] = ordinal
        (<int64_t*>cnp.PyArray_MultiIter_DATA(pair_it, 0))[0] = ordinal

        cnp.PyArray_MultiIter_NEXT(pair_it)

    return result