A1-1 帧数目计算与补零
无重叠窗可以视作跳距等于帧长度的窗。满足关系: FRAME_LENGTH × NUM_FRAMES = TOTAL_NUM_SAMPLES
下面的代码将传入的音频采样点序列进行补零,直到满足被帧长度整除。
# 如果传入的音频采样点序列不能被帧长度所整除,就对它补零,直到能被帧长度整除,并返回新的序列。
def pad_zero_for_make_frames(x, frame_length):
num_samples_needed = frame_length - (len(x) % frame_length)
if num_samples_needed != frame_length:
return np.concatenate((x, np.zeros(num_samples_needed)))
else:
return x
A1-2 帧划分实现
下面的函数能根据给定的帧长度进行简单帧划分。
# 传入的音频采样点序列(假设其已经是 numpy ndarray)根据给定的帧长度进行划分。
def make_frames(x, frame_length):
num_frames = len(x) // frame_length
frames = np.zeros((num_frames, frame_length))
for n in range(num_frames):
frames[n] = x[n*frame_length:(n+1)*frame_length]
return frames
或者下面是一个使用 numpy.lib.stride_tricks 工具的版本。
ndarray.strides
返回一个元组,表示各个维度上每次应跳跃多少 byte。stride_tricks.as_strided
是一个直接操纵内存的方法,需要万分谨慎地使用。它会根据给出的目标 shape和跳距 strides 依次取出项目。而且并不会拷贝原始数据,只是提供一个 view。# 传入的音频采样点序列(假设其已经是 numpy ndarray)根据给定的帧长度进行划分。
def make_frames(x, frame_length):
hop_length = frame_length
num_samples = x.shape[0]
num_frames = num_samples // frame_length
return stride_tricks.as_strided(
x,
shape=(num_frames, frame_length),
strides=(hop_length * x.strides[0], x.strides[0])
)
A1-3 从采样点位置倒推帧位置
TIME_IN_SAMPLES // FRAME_LENGTH
就能得到帧下标,然后 TIME_IN_SAMPLES % FRAME_LENGTH / FRAME_LENGTH
得到帧位置。
A2-1 帧数目计算与补零
有重叠窗要分别指定跳距和帧长度。
代码是类似的。
# 如果传入的音频采样点序列不能被帧长度所整除,就对它补零,直到能被帧长度整除,并返回新的序列。
def pad_zero_for_make_overlap_frames(x, frame_length, hop_length, pad_before=0, pad_after=0):
if pad_before > 0:
x = np.concatenate((np.zeros(pad_before), x))
if pad_after > 0:
x = np.concatenate((x, np.zeros(pad_after)))
num_samples_needed = frame_length - ((len(x) - hop_length) % frame_length)
if num_samples_needed != frame_length:
x = np.concatenate((x, np.zeros(num_samples_needed)))
return x
A2-2 帧划分实现
还是基于 stride。
# 传入的音频采样点序列(假设其已经是 numpy ndarray)根据给定的帧长度和跳距进行划分。
def make_overlap_frames(x, frame_length, hop_length):
# 计算步长,等同于 (len(x) - frame_length + hop_length) // hop_length
num_frames = 1 + (len(x) - frame_length) // hop_length
return stride_tricks.as_strided(
x,
shape=(num_frames, frame_length),
strides=(hop_length * x.strides[0], x.strides[0])
)
A2-3 从采样点位置倒推帧位置
从时间点倒推其涉及的帧的位置的时候,可能出现多个帧位置,请悉知。
# 从采样点时间 time_in_samples 得到其帧信息。
def get_frame_indices_info(time_in_samples, FRAME_LENGTH, HOP_LENGTH, frame_count):
time_in_samples = int(round(time_in_samples))
# 确定帧序号范围
frame_end = int(time_in_samples // HOP_LENGTH)
frame_begin = int((time_in_samples - FRAME_LENGTH) // HOP_LENGTH + 1)
frame_begin = max(frame_begin, 0)
frame_end = min(frame_end, frame_count-1)
indices = np.arange(frame_begin, frame_end + 1)
# 确定采样点各帧中位置
positions = [0 for i in range(len(indices))]
for i_pos, i in enumerate(indices):
begin_sample = i * HOP_LENGTH
positions[i_pos] = time_in_samples - begin_sample
return pd.DataFrame.from_dict({"index": indices, 'position': positions}, dtype=int)
效果如图。
如果是为了重建信号之类等目的,需要获得原始音频每个采样点处涉及的帧信息,调用那么多次上述函数肯定不现实。这是一个在分帧的同时记录每个采样点位置的帧信息的方法。
# 从采样点时间 time_in_samples 得到其帧信息。
def make_overlap_frames_and_get_frame_profile(x, frame_length, hop_length, debug=True):
# 计算步长,等同于 (len(x) - frame_length + hop_length) // hop_length
num_samples = len(x)
num_frames = 1 + (num_samples - frame_length) // hop_length
frames = stride_tricks.as_strided(
x,
shape=(num_frames, frame_length),
strides=(hop_length * x.strides[0], x.strides[0])
)
frame_profile_of_samples = [[] for i in range(num_samples)]
for m in tqdm(range(num_frames), ascii=True) if debug else range(num_frames):
begin_point = m * hop_length
oend_point = begin_point + frame_length
for pt in range(begin_point, oend_point):
frame_profile_of_samples[pt].append({
'index': m,
'position': pt - begin_point,
})
for i in tqdm(range(num_samples), ascii=True) if debug else range(num_samples):
frame_profile_of_samples[i] = pd.DataFrame.from_records(frame_profile_of_samples[i])
return frames, frame_profile_of_samples
frame_profile_of_samples[n]
会是空的(因为最后会有多余部分不对应任何帧)。如果后续需要进行任何处理,需要判断保证它不是空的。A2-4 从所有帧逐段合成所有采样点数据
另一种方法就是为每帧的数据叠加上一个汉明窗后,并一段段地加到空白序列上。唯一可惜的地方就是各采样点之间没有标准化。还是得靠每个采样点位置的汉明窗之和来进行标准化。
samples_num = (len(frames) - 1) * hop_length + frame_length
h_signal = np.zeros(samples_num, dtype=np.float64)
hamming_window = np.hamming(frame_length)
for m in range(len(frames)):
begin_sample = m * hop_length
oend_sample = begin_sample + frame_length
h_signal[begin_sample:oend_sample] += h_frames[m] * hamming_window
# if standardation needed
for n in range(samples_num):
h_signal[n] /= frame_profile_of_samples[n]['hamming'].sum()
# 此方法使用 ChatGPT o1-mini 模型辅助产出。
def get_center_frames(x: np.ndarray, start_sample: int, hop_sample: int, half_frame_len: int, pad_type: str='zero', axis: int=-1):
"""
提取信号 x 以 start_sample 为起点、hop_sample 为跳距、half_frame_len 为半长度的帧。对超出部分进行必要的填充。
支持 x 为多维信号,这时将在指定的第 axis 轴进行切割。
参数:
- x (np.ndarray): 输入的多维信号数组。
- start_sample (int): 第一个帧的中心位置索引。
- hop_sample (int): 帧之间的跳距。
- half_frame_len (int): 窗口半长度,窗口长度为 2 * half_frame_len + 1。
- pad_type (str): 填充类型,'zero' 表示补零,'mirror' 表示镜像填充。
- axis (int): 要切帧的轴,默认为最后一轴。
返回:
- frames (np.ndarray): 提取的帧,形状为 x.shape[:axis] + (n_frames, 2 * half_frame_len + 1) + x.shape[axis+1:]。
"""
if not isinstance(x, np.ndarray):
raise TypeError("输入 x 必须是 NumPy 数组。")
if x.ndim < 1:
raise ValueError("输入 x 至少应为一维。")
if pad_type not in ['zero', 'mirror']:
raise ValueError("pad_type 必须是 'zero' 或 'mirror'。")
if not (-x.ndim <= axis < x.ndim):
raise ValueError(f"axis 必须在 {-x.ndim} 到 {x.ndim - 1} 之间,当前为 {axis}。")
if start_sample < 0:
raise ValueError(f"start_sample 必须大于等于 0,当前为 {start_sample}。")
if hop_sample <= 0:
raise ValueError(f"hop_sample 必须大于 0,当前为 {hop_sample}。")
# 规范化 axis
axis = axis % x.ndim
# 计算帧宽度
frame_length = 2 * half_frame_len + 1
# 计算帧总数
num_frames = (x.shape[axis] - start_sample + hop_sample - 1) // hop_sample
if num_frames <= 0:
raise ValueError("给定参数下没有可提取的帧。")
# 计算填充宽度
pad_width = [(0, 0)] * x.ndim
pad_width[axis] = (half_frame_len, half_frame_len)
# 选择填充模式
if pad_type == 'zero':
x_padded = np.pad(x, pad_width, mode='constant', constant_values=0)
elif pad_type == 'mirror':
x_padded = np.pad(x, pad_width, mode='symmetric')
slicer = [slice(None)] * x.ndim
slicer[axis] = slice(start_sample, start_sample + hop_sample * (num_frames-1) + frame_length)
slicer = tuple(slicer)
x_neated = x_padded[slicer]
frames = np.lib.stride_tricks.as_strided(
x_neated,
shape = x_neated.shape[:axis] + (num_frames, frame_length) + x_neated.shape[axis+1:],
strides = tuple(x_neated.strides[:axis] + (hop_sample * x_neated.strides[axis], x_neated.strides[axis]) + x_neated.strides[axis+1:])
)
return frames
频谱泄露 (Spectrum Leakage):矩形窗的频谱有很大的旁瓣。时域中窗函数 $w[t]$ 和信号 $x[t]$相乘相当于在频域上作卷积,若旁瓣较大会造成加窗后对应部分频谱差异较大,称为频谱泄露。
汉明窗 (Hamming Window) 或汉宁窗 (Hann Window) 有较小的旁瓣,造成的泄露也较小。下面是它们的时域波形与频域响应。
对于一段 $N$ 点音频帧,Hamming 窗的公式为:
$$ w_{hamming} [n] = 0.54 - 0.46 \cos \left( \frac {2 \pi n} {N-1} \right), \ \ \ 0 \le n \le N-1 $$
GPT: 应用汉宁窗后,频谱主瓣的宽度会有所增加,主瓣以外的区域的幅度会下降。
加窗的方法也很简单,numpy 提供了 hamming 和 hanning 等函数用于加窗,乘到各个帧上面就可以了。
frames = make_overlap_frames(x1, FRAME_LENGTH, HOP_LENGTH) * np.hamming(FRAME_LENGTH).reshape(1, -1)
如果想要从帧位置得到汉明窗权重,可以使用上述公式进行运算。
def get_hamming_info(frame_indices_info, FRAME_LENGTH):
return 0.54 - 0.46 * np.cos((2 * np.pi / (FRAME_LENGTH - 1)) * frame_indices_info['position'])
frame_indices_info['hamming'] = get_hamming_info(frame_indices_info, FRAME_LENGTH)
效果如图。
整合到 make_overlap_frames_and_get_frame_profile
函数里面的版本如下。
# 从采样点时间 time_in_samples 得到其帧信息。
def make_overlap_frames_and_get_frame_profile(x, frame_length, hop_length, debug=True, window_type='hamming'):
# 计算步长,等同于 (len(x) - frame_length + hop_length) // hop_length
num_samples = len(x)
num_frames = 1 + (num_samples - frame_length) // hop_length
frames = stride_tricks.as_strided(
x,
shape=(num_frames, frame_length),
strides=(hop_length * x.strides[0], x.strides[0])
)
frame_profile_of_samples = [[] for i in range(num_samples)]
if window_type == 'hamming':
window_func = np.hamming(frame_length)
elif window_type is None:
pass
else:
raise ValueError('invalid param window_type: ' + window_type)
for m in tqdm(range(num_frames), ascii=True) if debug else range(num_frames):
begin_point = m * hop_length
oend_point = begin_point + frame_length
for pt in range(begin_point, oend_point):
if window_type is None:
frame_profile_of_samples[pt].append({
'index': m,
'position': pt - begin_point,
})
else:
frame_profile_of_samples[pt].append({
'index': m,
'position': pt - begin_point,
window_type: window_func[pt - begin_point],
})
for i in tqdm(range(num_samples), ascii=True) if debug else range(num_samples):
frame_profile_of_samples[i] = pd.DataFrame.from_records(frame_profile_of_samples[i])
return frames, frame_profile_of_samples