Building an Advanced WhisperX Voice Pipeline for Transcription, Alignment, Analysis, and Export
Dependencies and Quick Setup
Install WhisperX and the supporting libraries, then import the modules and define a CONFIG that detects CUDA availability and sets the compute type, model size, and batch size.
!pip install -q git+https://github.com/m-bain/whisperX.git
!pip install -q pandas matplotlib seaborn
import whisperx
import torch
import gc
import os
import json
import pandas as pd
from pathlib import Path
from IPython.display import Audio, display, HTML
import warnings
warnings.filterwarnings('ignore')
CONFIG = {
"device": "cuda" if torch.cuda.is_available() else "cpu",
"compute_type": "float16" if torch.cuda.is_available() else "int8",
"batch_size": 16,
"model_size": "base",
"language": None,
}
print(f" Running on: {CONFIG['device']}")
print(f" Compute type: {CONFIG['compute_type']}")
print(f" Model: {CONFIG['model_size']}")
This prepares the environment for both a Colab GPU and a plain CPU.
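If you hit out-of-memory errors on a smaller GPU, the usual first step is to lower the batch size or fall back to int8. A minimal, optional sketch (the 8 GB threshold is an arbitrary example, not a WhisperX requirement):
if CONFIG["device"] == "cuda":
    # Arbitrary heuristic: shrink the batch and quantize on small GPUs
    gpu_mem_gb = torch.cuda.get_device_properties(0).total_memory / 1e9
    if gpu_mem_gb < 8:
        CONFIG["batch_size"] = 4
        CONFIG["compute_type"] = "int8"
    print(f"GPU: {torch.cuda.get_device_name(0)} ({gpu_mem_gb:.1f} GB)")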
Loading and Analyzing Sample Audio
One helper downloads a test file; a second loads the audio, reports its duration, and plays it right in the notebook.
def download_sample_audio():
"""Download a sample audio file for testing"""
!wget -q -O sample.mp3 https://github.com/mozilla-extensions/speaktome/raw/master/content/cv-valid-dev/sample-000000.mp3
print(" Sample audio downloaded")
return "sample.mp3"
def load_and_analyze_audio(audio_path):
"""Load audio and display basic info"""
audio = whisperx.load_audio(audio_path)
duration = len(audio) / 16000
print(f" Audio: {Path(audio_path).name}")
print(f" Duration: {duration:.2f} seconds")
print(f" Sample rate: 16000 Hz")
display(Audio(audio_path))
return audio, duration
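As a quick sanity check: whisperx.load_audio returns a mono float32 NumPy array resampled to 16 kHz, which is why the duration is computed as len(audio) / 16000. A usage sketch, commented out like the examples at the end:
# audio_path = download_sample_audio()
# audio, duration = load_and_analyze_audio(audio_path)
# print(type(audio), audio.dtype, audio.shape)  # ndarray, float32, (n_samples,)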
Transcription with WhisperX (Batched Inference)
Load the WhisperX model and run batched transcription on the audio. The result contains the segments and the detected language. The function also frees the model's memory once transcription finishes.
def transcribe_audio(audio, model_size=CONFIG["model_size"], language=None):
"""Transcribe audio using WhisperX (batched inference)"""
print("\n STEP 1: Transcribing audio...")
model = whisperx.load_model(
model_size,
CONFIG["device"],
compute_type=CONFIG["compute_type"]
)
transcribe_kwargs = {
"batch_size": CONFIG["batch_size"]
}
if language:
transcribe_kwargs["language"] = language
result = model.transcribe(audio, **transcribe_kwargs)
total_segments = len(result["segments"])
del model
gc.collect()
if CONFIG["device"] == "cuda":
torch.cuda.empty_cache()
print(f" Transcription complete!")
print(f" Language: {result['language']}")
print(f" Segments: {total_segments}")
print(f" Total text length: {sum(len(seg['text']) for seg in result['segments'])} characters")
return result
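Each entry in result["segments"] is a dict with at least start, end, and text keys. A commented sketch for peeking at the first few segments:
# result = transcribe_audio(audio)
# for seg in result["segments"][:3]:
#     print(f"[{seg['start']:6.2f} -> {seg['end']:6.2f}] {seg['text'].strip()}")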
Alignment for Word-Level Timestamps
Load the alignment model and apply it to the segments to obtain precise start/end times for each word. If alignment fails, fall back to segment-level timestamps so the pipeline keeps running.
def align_transcription(segments, audio, language_code):
"""Align transcription for accurate word-level timestamps"""
print("\n STEP 2: Aligning for word-level timestamps...")
try:
model_a, metadata = whisperx.load_align_model(
language_code=language_code,
device=CONFIG["device"]
)
result = whisperx.align(
segments,
model_a,
metadata,
audio,
CONFIG["device"],
return_char_alignments=False
)
total_words = sum(len(seg.get("words", [])) for seg in result["segments"])
del model_a
gc.collect()
if CONFIG["device"] == "cuda":
torch.cuda.empty_cache()
print(f" Alignment complete!")
print(f" Aligned words: {total_words}")
return result
except Exception as e:
print(f" Alignment failed: {str(e)}")
print(" Continuing with segment-level timestamps only...")
return {"segments": segments, "word_segments": []}
Alignment is essential for subtitle generation and for precisely synchronizing text with audio.
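After alignment, each segment carries a "words" list whose entries hold the word text, start/end times, and a confidence score. Tokens such as bare digits can come back without timestamps, so the commented sketch below checks for them:
# aligned = align_transcription(result["segments"], audio, result["language"])
# for word in aligned["segments"][0].get("words", [])[:5]:
#     if "start" in word and "end" in word:
#         print(f"{word['word']:>15} {word['start']:.2f}-{word['end']:.2f} score={word.get('score', 0):.2f}")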
Analyzing the Transcription
Compute the total duration and the segment, word, and character counts, then derive words per minute, inter-segment pauses, and average word duration.
def analyze_transcription(result):
"""Generate statistics about the transcription"""
print("\n TRANSCRIPTION STATISTICS")
print("="*70)
segments = result["segments"]
total_duration = max(seg["end"] for seg in segments) if segments else 0
total_words = sum(len(seg.get("words", [])) for seg in segments)
total_chars = sum(len(seg["text"].strip()) for seg in segments)
print(f"Total duration: {total_duration:.2f} seconds")
print(f"Total segments: {len(segments)}")
print(f"Total words: {total_words}")
print(f"Total characters: {total_chars}")
if total_duration > 0:
print(f"Words per minute: {(total_words / total_duration * 60):.1f}")
pauses = []
for i in range(len(segments) - 1):
pause = segments[i+1]["start"] - segments[i]["end"]
if pause > 0:
pauses.append(pause)
if pauses:
print(f"Average pause between segments: {sum(pauses)/len(pauses):.2f}s")
print(f"Longest pause: {max(pauses):.2f}s")
word_durations = []
for seg in segments:
if "words" in seg:
for word in seg["words"]:
duration = word["end"] - word["start"]
word_durations.append(duration)
if word_durations:
print(f"Average word duration: {sum(word_durations)/len(word_durations):.3f}s")
print("="*70)
These metrics help you gauge speech tempo and pausing patterns.
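matplotlib was installed at the top but is not used by the pipeline itself; here is a minimal optional sketch (an addition, not part of the original pipeline) that plots the word-duration distribution of an aligned result:
import matplotlib.pyplot as plt

def plot_word_durations(result):
    """Histogram of word durations from an aligned result (optional sketch)."""
    durations = [w["end"] - w["start"]
                 for seg in result["segments"]
                 for w in seg.get("words", [])
                 if "start" in w and "end" in w]
    if not durations:
        print("No word-level timestamps to plot.")
        return
    plt.figure(figsize=(8, 3))
    plt.hist(durations, bins=30, edgecolor="black")
    plt.xlabel("Word duration (s)")
    plt.ylabel("Count")
    plt.title("Word duration distribution")
    plt.tight_layout()
    plt.show()

# plot_word_durations(aligned_result)  # run after the full pipeline below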
Displaying and Exporting Results
These functions format the results as tables for in-notebook viewing and export them to JSON, SRT, VTT, TXT, and CSV.
def display_results(result, show_words=False, max_rows=50):
"""Display transcription results in formatted table"""
data = []
for seg in result["segments"]:
text = seg["text"].strip()
start = f"{seg['start']:.2f}s"
end = f"{seg['end']:.2f}s"
duration = f"{seg['end'] - seg['start']:.2f}s"
if show_words and "words" in seg:
for word in seg["words"]:
data.append({
"Start": f"{word['start']:.2f}s",
"End": f"{word['end']:.2f}s",
"Duration": f"{word['end'] - word['start']:.3f}s",
"Text": word["word"],
"Score": f"{word.get('score', 0):.2f}"
})
else:
data.append({
"Start": start,
"End": end,
"Duration": duration,
"Text": text
})
df = pd.DataFrame(data)
if len(df) > max_rows:
print(f"Showing first {max_rows} rows of {len(df)} total...")
display(HTML(df.head(max_rows).to_html(index=False)))
else:
display(HTML(df.to_html(index=False)))
return df
def export_results(result, output_dir="output", filename="transcript"):
"""Export results in multiple formats"""
os.makedirs(output_dir, exist_ok=True)
json_path = f"{output_dir}/{filename}.json"
with open(json_path, "w", encoding="utf-8") as f:
json.dump(result, f, indent=2, ensure_ascii=False)
srt_path = f"{output_dir}/{filename}.srt"
with open(srt_path, "w", encoding="utf-8") as f:
for i, seg in enumerate(result["segments"], 1):
start = format_timestamp(seg["start"])
end = format_timestamp(seg["end"])
f.write(f"{i}\n{start} --> {end}\n{seg['text'].strip()}\n\n")
vtt_path = f"{output_dir}/{filename}.vtt"
with open(vtt_path, "w", encoding="utf-8") as f:
f.write("WEBVTT\n\n")
for i, seg in enumerate(result["segments"], 1):
start = format_timestamp_vtt(seg["start"])
end = format_timestamp_vtt(seg["end"])
f.write(f"{start} --> {end}\n{seg['text'].strip()}\n\n")
txt_path = f"{output_dir}/{filename}.txt"
with open(txt_path, "w", encoding="utf-8") as f:
for seg in result["segments"]:
f.write(f"{seg['text'].strip()}\n")
csv_path = f"{output_dir}/{filename}.csv"
df_data = []
for seg in result["segments"]:
df_data.append({
"start": seg["start"],
"end": seg["end"],
"text": seg["text"].strip()
})
pd.DataFrame(df_data).to_csv(csv_path, index=False)
print(f"\n Results exported to '{output_dir}/' directory:")
print(f" ✓ {filename}.json (full structured data)")
print(f" ✓ {filename}.srt (subtitles)")
print(f" ✓ {filename}.vtt (web video subtitles)")
print(f" ✓ {filename}.txt (plain text)")
print(f" ✓ {filename}.csv (timestamps + text)")
def format_timestamp(seconds):
"""Convert seconds to SRT timestamp format"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
millis = int((seconds % 1) * 1000)
return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
def format_timestamp_vtt(seconds):
"""Convert seconds to VTT timestamp format"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
millis = int((seconds % 1) * 1000)
return f"{hours:02d}:{minutes:02d}:{secs:02d}.{millis:03d}"
Batch Processing and Keyword Extraction
Functions for batch-processing files and for a simple frequency count of the most common words (with stop-word filtering) help you pull quick insights from many recordings.
def batch_process_files(audio_files, output_dir="batch_output"):
"""Process multiple audio files in batch"""
print(f"\n Batch processing {len(audio_files)} files...")
results = {}
for i, audio_path in enumerate(audio_files, 1):
print(f"\n[{i}/{len(audio_files)}] Processing: {Path(audio_path).name}")
try:
result, _ = process_audio_file(audio_path, show_output=False)
results[audio_path] = result
filename = Path(audio_path).stem
export_results(result, output_dir, filename)
except Exception as e:
print(f" Error processing {audio_path}: {str(e)}")
results[audio_path] = None
print(f"\n Batch processing complete! Processed {len(results)} files.")
return results
def extract_keywords(result, top_n=10):
"""Extract most common words from transcription"""
from collections import Counter
import re
text = " ".join(seg["text"] for seg in result["segments"])
words = re.findall(r'\b\w+\b', text.lower())
stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
'of', 'with', 'is', 'was', 'are', 'were', 'be', 'been', 'being',
'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could',
'should', 'may', 'might', 'must', 'can', 'this', 'that', 'these', 'those'}
filtered_words = [w for w in words if w not in stop_words and len(w) > 2]
word_counts = Counter(filtered_words).most_common(top_n)
print(f"\n Top {top_n} Keywords:")
for word, count in word_counts:
print(f" {word}: {count}")
return word_counts
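Note that the stop-word list is English-only; for transcripts in other languages, substitute an appropriate list. A commented usage sketch:
# top = extract_keywords(result, top_n=5)
# print(top)  # list of (word, count) pairs, e.g. [("audio", 4), ...]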
The Full Pipeline and Examples
The process_audio_file function ties every step together: audio loading, transcription, alignment, analysis, display, and export. The comments below give usage examples and scaling options.
def process_audio_file(audio_path, show_output=True, analyze=True):
"""Complete WhisperX pipeline"""
if show_output:
print("="*70)
print(" WhisperX Advanced Tutorial")
print("="*70)
audio, duration = load_and_analyze_audio(audio_path)
result = transcribe_audio(audio, CONFIG["model_size"], CONFIG["language"])
aligned_result = align_transcription(
result["segments"],
audio,
result["language"]
)
if analyze and show_output:
analyze_transcription(aligned_result)
extract_keywords(aligned_result)
if show_output:
print("\n" + "="*70)
print(" TRANSCRIPTION RESULTS")
print("="*70)
df = display_results(aligned_result, show_words=False)
export_results(aligned_result)
else:
df = None
return aligned_result, df
# Example 1: Process sample audio
# audio_path = download_sample_audio()
# result, df = process_audio_file(audio_path)
# Example 2: Show word-level details
# result, df = process_audio_file(audio_path)
# word_df = display_results(result, show_words=True)
# Example 3: Process your own audio
# audio_path = "your_audio.wav" # or .mp3, .m4a, etc.
# result, df = process_audio_file(audio_path)
# Example 4: Batch process multiple files
# audio_files = ["audio1.mp3", "audio2.wav", "audio3.m4a"]
# results = batch_process_files(audio_files)
# Example 5: Use a larger model for better accuracy
# CONFIG["model_size"] = "large-v2"
# result, df = process_audio_file("audio.mp3")
print("\n Setup complete! Uncomment examples above to run.")
Use these building blocks as a foundation for integrating WhisperX into your data pipelines, subtitle generation, or audio analytics.