mautrix-python/mautrix/util/ffmpeg.py

248 lines
7.7 KiB
Python

# Copyright (c) 2022 Tulir Asokan
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import annotations
from typing import Any, Iterable
from pathlib import Path
import asyncio
import json
import logging
import mimetypes
import os
import shutil
import tempfile
try:
from . import magic
except ImportError:
magic = None
def _abswhich(program: str) -> str | None:
path = shutil.which(program)
return os.path.abspath(path) if path else None
class ConverterError(ChildProcessError):
pass
class NotInstalledError(ConverterError):
def __init__(self) -> None:
super().__init__("failed to transcode media: ffmpeg is not installed")
ffmpeg_path = _abswhich("ffmpeg")
ffmpeg_default_params = ("-hide_banner", "-loglevel", "warning", "-y")
ffprobe_path = _abswhich("ffprobe")
ffprobe_default_params = (
"-loglevel",
"quiet",
"-print_format",
"json",
"-show_optional_fields",
"1",
"-show_format",
"-show_streams",
)
async def probe_path(
input_file: os.PathLike[str] | str,
logger: logging.Logger | None = None,
) -> Any:
"""
Probes a media file on the disk using ffprobe.
Args:
input_file: The full path to the file.
Returns:
A Python object containing the parsed JSON response from ffprobe
Raises:
ConverterError: if ffprobe returns a non-zero exit code.
"""
if ffprobe_path is None:
raise NotInstalledError()
input_file = Path(input_file)
proc = await asyncio.create_subprocess_exec(
ffprobe_path,
*ffprobe_default_params,
str(input_file),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
err_text = stderr.decode("utf-8") if stderr else f"unknown ({proc.returncode})"
raise ConverterError(f"ffprobe error: {err_text}")
elif stderr and logger:
logger.warning(f"ffprobe warning: {stderr.decode('utf-8')}")
return json.loads(stdout)
async def probe_bytes(
data: bytes,
input_mime: str | None = None,
logger: logging.Logger | None = None,
) -> Any:
"""
Probe media file data using ffprobe.
Args:
data: The bytes of the file to probe.
input_mime: The mime type of the input data. If not specified, will be guessed using magic.
Returns:
A Python object containing the parsed JSON response from ffprobe
Raises:
ConverterError: if ffprobe returns a non-zero exit code.
"""
if ffprobe_path is None:
raise NotInstalledError()
if input_mime is None:
if magic is None:
raise ValueError("input_mime was not specified and magic is not installed")
input_mime = magic.mimetype(data)
input_extension = mimetypes.guess_extension(input_mime)
with tempfile.TemporaryDirectory(prefix="mautrix_ffmpeg_") as tmpdir:
input_file = Path(tmpdir) / f"data{input_extension}"
with open(input_file, "wb") as file:
file.write(data)
return await probe_path(input_file=input_file, logger=logger)
async def convert_path(
input_file: os.PathLike[str] | str,
output_extension: str | None,
input_args: Iterable[str] | None = None,
output_args: Iterable[str] | None = None,
remove_input: bool = False,
output_path_override: os.PathLike[str] | str | None = None,
logger: logging.Logger | None = None,
) -> Path | bytes:
"""
Convert a media file on the disk using ffmpeg.
Args:
input_file: The full path to the file.
output_extension: The extension that the output file should be.
input_args: Arguments to tell ffmpeg how to parse the input file.
output_args: Arguments to tell ffmpeg how to convert the file to reach the wanted output.
remove_input: Whether the input file should be removed after converting.
Not compatible with ``output_path_override``.
output_path_override: A custom output path to use
(instead of using the input path with a different extension).
Returns:
The path to the converted file, or the stdout if ``output_path_override`` was set to ``-``.
Raises:
ConverterError: if ffmpeg returns a non-zero exit code.
"""
if ffmpeg_path is None:
raise NotInstalledError()
if output_path_override:
output_file = output_path_override
if remove_input:
raise ValueError("remove_input can't be specified with output_path_override")
elif not output_extension:
raise ValueError("output_extension or output_path_override is required")
else:
input_file = Path(input_file)
output_file = input_file.parent / f"{input_file.stem}{output_extension}"
if input_file == output_file:
output_file = Path(output_file)
output_file = output_file.parent / f"{output_file.stem}-new{output_extension}"
proc = await asyncio.create_subprocess_exec(
ffmpeg_path,
*ffmpeg_default_params,
*(input_args or ()),
"-i",
str(input_file),
*(output_args or ()),
str(output_file),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
err_text = stderr.decode("utf-8") if stderr else f"unknown ({proc.returncode})"
raise ConverterError(f"ffmpeg error: {err_text}")
elif stderr and logger:
logger.warning(f"ffmpeg warning: {stderr.decode('utf-8')}")
if remove_input and isinstance(input_file, Path):
input_file.unlink(missing_ok=True)
return stdout if output_file == "-" else output_file
async def convert_bytes(
data: bytes,
output_extension: str,
input_args: Iterable[str] | None = None,
output_args: Iterable[str] | None = None,
input_mime: str | None = None,
logger: logging.Logger | None = None,
) -> bytes:
"""
Convert media file data using ffmpeg.
Args:
data: The bytes of the file to convert.
output_extension: The extension that the output file should be.
input_args: Arguments to tell ffmpeg how to parse the input file.
output_args: Arguments to tell ffmpeg how to convert the file to reach the wanted output.
input_mime: The mime type of the input data. If not specified, will be guessed using magic.
Returns:
The converted file as bytes.
Raises:
ConverterError: if ffmpeg returns a non-zero exit code.
"""
if ffmpeg_path is None:
raise NotInstalledError()
if input_mime is None:
if magic is None:
raise ValueError("input_mime was not specified and magic is not installed")
input_mime = magic.mimetype(data)
input_extension = mimetypes.guess_extension(input_mime)
with tempfile.TemporaryDirectory(prefix="mautrix_ffmpeg_") as tmpdir:
input_file = Path(tmpdir) / f"data{input_extension}"
with open(input_file, "wb") as file:
file.write(data)
output_file = await convert_path(
input_file=input_file,
output_extension=output_extension,
input_args=input_args,
output_args=output_args,
logger=logger,
)
with open(output_file, "rb") as file:
return file.read()
__all__ = [
"ffmpeg_path",
"ffmpeg_default_params",
"ConverterError",
"NotInstalledError",
"convert_bytes",
"convert_path",
"probe_bytes",
"probe_path",
]