2021-01-10 11:09:45 +01:00
|
|
|
|
|
|
|
def createReadStream(file):
    """Open *file* for binary reading and return the open file object.

    The handle is not closed here; whoever receives it is responsible for
    closing it once the data has been consumed.
    """
    return open(file, 'rb')
|
|
|
|
|
|
|
|
|
|
|
|
def process_find_loudness(file_path: str, threshold_at_point: int, inertia_samples: float, label: str):
    """Split a raw byte file into alternating quiet/loud segments.

    Every byte is treated as one sample centred on 128; its volume is the
    absolute distance from that centre. A sample is loud when its volume
    exceeds ``threshold_at_point``. After a loud sample the current state is
    held for ``inertia_samples`` further positions, so short dips below the
    threshold do not end a loud segment.

    Args:
        file_path: Path of the file to analyse; read fully in binary mode.
        threshold_at_point: Volume above which a sample counts as loud.
        inertia_samples: Number of positions a loud sample keeps the state
            from being re-evaluated.
        label: Value copied verbatim into every reported segment.

    Returns:
        A list of dicts with the keys ``position_start``, ``duration``,
        ``loud`` and ``label``, in file order, covering the whole file
        (including the final run up to the last byte). An empty file gives
        an empty list. If the scan itself fails, the error is printed and
        ``None`` is returned.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    print("Start process to find loudness in:")
    print(
        F"\tfile_path: {file_path}\n"
        F"\tthreshold_at_point: {threshold_at_point}\n"
        F"\tinertia_samples: {inertia_samples}\n"
        F"\tlabel: {label}\n"
    )

    # The context manager guarantees the handle is closed, even when the
    # read fails part-way through.
    with open(file_path, 'rb') as stream:
        print("Read chunks")
        chunks = stream.read()
    print(F"Length: {len(chunks)}")

    results = []
    position = 0  # stays 0 for an empty file, so no trailing segment is emitted
    last_swap_position = 0
    keep_loud_until = 0
    was_loud_last_time = False

    try:
        # Positions are 1-based: a state change detected at byte index i is
        # reported at position i + 1, matching the historical output.
        for position, byte in enumerate(chunks, start=1):
            is_loud = abs(byte - 128) > threshold_at_point

            # The state may only flip once the inertia window has elapsed.
            if position >= keep_loud_until and is_loud != was_loud_last_time:
                results.append(
                    _loudness_segment(last_swap_position, position, was_loud_last_time, label)
                )
                last_swap_position = position
                was_loud_last_time = is_loud

            # Every loud sample (re)starts the inertia window.
            if is_loud:
                keep_loud_until = position + inertia_samples

        # Report the run that is still open when the data ends; without it
        # the tail of the file (or a file with no state change) is lost.
        if position > last_swap_position:
            results.append(
                _loudness_segment(last_swap_position, position, was_loud_last_time, label)
            )
    except Exception as err:
        # Best-effort contract kept for existing callers: report and give up.
        print(err)
        return None

    return results


def _loudness_segment(start, end, loud, label):
    """Build the result record for the run of samples from *start* to *end*."""
    return {
        'position_start': start,
        'duration': end - start,
        'loud': loud,
        'label': label,
    }
|