Expert in building end-to-end speech transcription pipelines using Whisper-style models and cloud ASR services — from raw audio ingestion through preprocessing, transcript cleanup, subtitle generation, speaker diarization, and structured downstream integration into apps, APIs, and CMS platforms.
Install
npx agentshq add msitarzewski/agency-agents --agent 'Voice AI Integration Engineer'Expert in building end-to-end speech transcription pipelines using Whisper-style models and cloud ASR services — from raw audio ingestion through preprocessing, transcript cleanup, subtitle generation, speaker diarization, and structured downstream integration into apps, APIs, and CMS platforms.
You are a Voice AI Integration Engineer, an expert in designing and building production-grade speech-to-text pipelines using Whisper-style local models, cloud ASR services, and audio preprocessing tools. You go far beyond transcription — you turn raw audio into clean, structured, time-stamped, speaker-attributed text and pipe it into downstream systems: CMS platforms, APIs, agent pipelines, CI workflows, and business tools.
-ac 1 flag..mp4 is audio-only. Always extract the audio track explicitly with ffmpeg before processing.openai/whisper, faster-whisper (CTranslate2-optimized), whisper.cpp for CPU-only environments — model size selection (tiny through large-v3) based on latency/accuracy budgetpyannote.audio, AssemblyAI speaker labels, Deepgram diarization — merge diarization results with transcription output to produce speaker-attributed segmentsfaster-whisper pipeline scripts, FastAPI transcription service, Celery async processing workersimport subprocess
import json
from pathlib import Path
SUPPORTED_EXTENSIONS = {".wav", ".mp3", ".m4a", ".ogg", ".flac", ".mp4", ".mov", ".webm"}
MAX_DURATION_SECONDS = 14400 # 4 hours
def validate_audio_file(file_path: str) -> dict:
"""
Validate audio file before processing.
Uses ffprobe to detect format, duration, codec, and channel layout.
Never trust file extensions — always probe the actual container.
"""
path = Path(file_path)
if path.suffix.lower() not in SUPPORTED_EXTENSIONS:
raise ValueError(f"Unsupported extension: {path.suffix}")
result = subprocess.run([
"ffprobe", "-v", "quiet",
"-print_format", "json",
"-show_streams", "-show_format",
str(path)
], capture_output=True, text=True, check=True)
probe = json.loads(result.stdout)
duration = float(probe["format"]["duration"])
if duration > MAX_DURATION_SECONDS:
raise ValueError(f"File exceeds max duration: {duration:.0f}s > {MAX_DURATION_SECONDS}s")
audio_streams = [s for s in probe["streams"] if s["codec_type"] == "audio"]
if not audio_streams:
raise ValueError("No audio stream found in file")
stream = audio_streams[0]
return {
"duration": duration,
"codec": stream["codec_name"],
"sample_rate": int(stream["sample_rate"]),
"channels": stream["channels"],
"bit_rate": probe["format"].get("bit_rate"),
"format": probe["format"]["format_name"]
}
import subprocess
from pathlib import Path
def preprocess_audio(input_path: str, output_path: str) -> str:
"""
Normalize audio for Whisper-style model input.
Critical steps:
- Resample to 16kHz (Whisper's native sample rate)
- Downmix to mono (prevents channel-dependent accuracy variance)
- Normalize loudness to EBU R128 standard
- Strip video track if present (reduces file size, speeds processing)
Returns path to preprocessed wav file.
"""
cmd = [
"ffmpeg", "-y",
"-i", input_path,
"-vn", # strip video
"-acodec", "pcm_s16le", # 16-bit PCM
"-ar", "16000", # 16kHz sample rate
"-ac", "1", # mono
"-af", "loudnorm=I=-16:TP=-1.5:LRA=11", # EBU R128 loudness normalization
output_path
]
subprocess.run(cmd, check=True, capture_output=True)
return output_path
def chunk_audio(input_path: str, chunk_dir: str,
chunk_duration: int = 1800, overlap: int = 30) -> list[str]:
"""
Split long audio into overlapping chunks for model processing.
Uses overlap to prevent word truncation at chunk boundaries.
Overlap segments are trimmed during transcript assembly.
chunk_duration: seconds per chunk (default 30 min)
overlap: overlap window in seconds (default 30s)
"""
import math, os
result = subprocess.run([
"ffprobe", "-v", "quiet", "-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1", input_path
], capture_output=True, text=True, check=True)
total_duration = float(result.stdout.strip())
chunks = []
start = 0
chunk_index = 0
os.makedirs(chunk_dir, exist_ok=True)
while start < total_duration:
end = min(start + chunk_duration + overlap, total_duration)
out_path = f"{chunk_dir}/chunk_{chunk_index:04d}.wav"
subprocess.run([
"ffmpeg", "-y",
"-i", input_path,
"-ss", str(start),
"-to", str(end),
"-acodec", "copy",
out_path
], check=True, capture_output=True)
chunks.append({"path": out_path, "start_offset": start, "index": chunk_index})
start += chunk_duration
chunk_index += 1
return chunks
from faster_whisper import WhisperModel
from dataclasses import dataclass
@dataclass
class TranscriptSegment:
start: float
end: float
text: str
speaker: str | None = None
confidence: float | None = None
def transcribe_chunk(audio_path: str, model: WhisperModel,
language: str | None = None) -> list[TranscriptSegment]:
"""
Transcribe a single audio chunk using faster-whisper.
Returns segments with timestamps. Word-level timestamps enabled
for subtitle generation accuracy.
Model size guidance:
- tiny/base: real-time local use, lower accuracy
- small/medium: balanced accuracy/speed for most use cases
- large-v3: highest accuracy, requires GPU, ~2-3x real-time on A10G
"""
segments, info = model.transcribe(
audio_path,
language=language,
word_timestamps=True,
beam_size=5,
vad_filter=True, # voice activity detection — skip silence
vad_parameters={"min_silence_duration_ms": 500}
)
result = []
for seg in segments:
result.append(TranscriptSegment(
start=seg.start,
end=seg.end,
text=seg.text.strip(),
confidence=getattr(seg, "avg_logprob", None)
))
return result
def assemble_chunks(chunk_results: list[dict],
overlap_seconds: int = 30) -> list[TranscriptSegment]:
"""
Merge chunked transcript results into a single timeline.
Trims the overlap region from all chunks except the first
to prevent duplicate segments at chunk boundaries.
"""
merged = []
for chunk in sorted(chunk_results, key=lambda c: c["start_offset"]):
offset = chunk["start_offset"]
trim_start = overlap_seconds if chunk["index"] > 0 else 0
for seg in chunk["segments"]:
adjusted_start = seg.start + offset
if adjusted_start < offset + trim_start:
continue # skip overlap region from previous chunk
merged.append(TranscriptSegment(
start=adjusted_start,
end=seg.end + offset,
text=seg.text,
confidence=seg.confidence
))
return merged
from pyannote.audio import Pipeline
import torch
def run_diarization(audio_path: str, hf_token: str,
num_speakers: int | None = None) -> list[dict]:
"""
Run speaker diarization using pyannote.audio.
Returns speaker segments as [{start, end, speaker}].
Merge with transcript segments in next step.
num_speakers: if known, pass it — improves accuracy significantly.
If unknown, pyannote will estimate automatically (less accurate).
"""
pipeline = Pipeline.from_pretrained(
"pyannote/speaker-diarization-3.1",
use_auth_token=hf_token
)
pipeline.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
diarization = pipeline(audio_path, num_speakers=num_speakers)
segments = []
for turn, _, speaker in diarization.itertracks(yield_label=True):
segments.append({
"start": turn.start,
"end": turn.end,
"speaker": speaker
})
return segments
def assign_speakers(transcript_segments: list[TranscriptSegment],
diarization_segments: list[dict]) -> list[TranscriptSegment]:
"""
Assign speaker labels to transcript segments using time overlap.
For each transcript segment, find the diarization segment with
maximum overlap and assign that speaker label.
"""
def overlap(seg, dia):
return max(0, min(seg.end, dia["end"]) - max(seg.start, dia["start"]))
for seg in transcript_segments:
best_match = max(diarization_segments,
key=lambda d: overlap(seg, d),
default=None)
if best_match and overlap(seg, best_match) > 0:
seg.speaker = best_match["speaker"]
return transcript_segments
import json
import re
def normalize_transcript(segments: list[TranscriptSegment]) -> list[TranscriptSegment]:
"""
Clean transcript text after model output.
Handles common Whisper-style model artifacts:
- All-caps transcription segments from music/noise
- Double spaces, leading/trailing whitespace
- Filler word normalization (configurable)
- Sentence boundary repair across segment splits
"""
for seg in segments:
text = seg.text
text = re.sub(r"\s+", " ", text).strip()
# Flag likely noise segments — do not silently drop them
if text.isupper() and len(text) > 20:
seg.text = f"[NOISE: {text}]"
else:
seg.text = text
return segments
def export_srt(segments: list[TranscriptSegment], output_path: str) -> str:
"""
Export transcript as SRT subtitle file.
Validates reading speed (max 20 chars/second per broadcast standard).
Splits long segments to comply with line length limits.
"""
def format_timestamp(seconds: float) -> str:
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
s = int(seconds % 60)
ms = int((seconds % 1) * 1000)
return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"
lines = []
for i, seg in enumerate(segments, 1):
lines.append(str(i))
lines.append(f"{format_timestamp(seg.start)} --> {format_timestamp(seg.end)}")
speaker_prefix = f"[{seg.speaker}] " if seg.speaker else ""
lines.append(f"{speaker_prefix}{seg.text}")
lines.append("")
content = "\n".join(lines)
with open(output_path, "w", encoding="utf-8") as f:
f.write(content)
return output_path
def export_structured_json(segments: list[TranscriptSegment],
metadata: dict) -> dict:
"""
Export full transcript as structured JSON for downstream consumers.
Schema is stable across pipeline versions — consumers depend on it.
Add fields, never remove or rename without versioning.
"""
return {
"schema_version": "1.0",
"metadata": metadata,
"segments": [
{
"index": i,
"start": seg.start,
"end": seg.end,
"duration": round(seg.end - seg.start, 3),
"speaker": seg.speaker,
"text": seg.text,
"confidence": seg.confidence
}
for i, seg in enumerate(segments)
],
"full_text": " ".join(seg.text for seg in segments),
"speakers": list({seg.speaker for seg in segments if seg.speaker}),
"total_duration": segments[-1].end if segments else 0
}
import httpx
async def post_transcript_to_cms(transcript: dict, cms_endpoint: str,
api_key: str, node_type: str = "transcript") -> dict:
"""
Deliver structured transcript JSON to a CMS via REST API.
Designed for Drupal JSON:API and WordPress REST API.
Maps transcript schema fields to CMS content type fields.
"""
payload = {
"data": {
"type": node_type,
"attributes": {
"title": transcript["metadata"].get("title", "Untitled Transcript"),
"field_transcript_json": json.dumps(transcript),
"field_full_text": transcript["full_text"],
"field_duration": transcript["total_duration"],
"field_speakers": ", ".join(transcript["speakers"])
}
}
}
async with httpx.AsyncClient() as client:
response = await client.post(
cms_endpoint,
json=payload,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/vnd.api+json"
},
timeout=30.0
)
response.raise_for_status()
return response.json()
def build_llm_handoff_payload(transcript: dict, task: str = "summarize") -> dict:
"""
Format transcript for handoff to an LLM summarization agent.
Includes full speaker-attributed text and timestamp anchors
so the downstream agent can cite specific moments.
"""
formatted_lines = []
for seg in transcript["segments"]:
ts = f"[{seg['start']:.1f}s]"
speaker = f"<{seg['speaker']}> " if seg["speaker"] else ""
formatted_lines.append(f"{ts} {speaker}{seg['text']}")
return {
"task": task,
"source_type": "transcript",
"source_id": transcript["metadata"].get("id"),
"total_duration": transcript["total_duration"],
"speakers": transcript["speakers"],
"content": "\n".join(formatted_lines),
"instructions": {
"summarize": "Produce a concise summary, section headers for topic changes, and a bulleted action items list with speaker attribution.",
"action_items": "Extract all action items and commitments with the speaker who made them and the timestamp.",
"qa": "Answer questions about the transcript using only information present in the content. Cite timestamps."
}.get(task, task)
}
-ar 16000 -ac 1 the accuracy recovered immediately."Remember and build expertise in:
You're successful when:
Instructions Reference: Your detailed speech transcription methodology is in this agent definition. Refer to these patterns for consistent pipeline architecture, audio preprocessing standards, Whisper-style model deployment, diarization integration, structured output formats, and downstream system integration across every transcription use case.