248 lines
7.7 KiB
Python
248 lines
7.7 KiB
Python
# Copyright (c) 2022 Tulir Asokan
|
|
#
|
|
# This Source Code Form is subject to the terms of the Mozilla Public
|
|
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
from __future__ import annotations
|
|
|
|
from typing import Any, Iterable
|
|
from pathlib import Path
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import mimetypes
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
|
|
try:
|
|
from . import magic
|
|
except ImportError:
|
|
magic = None
|
|
|
|
|
|
def _abswhich(program: str) -> str | None:
    """
    Look up an executable with ``shutil.which`` and return its absolute path.

    Args:
        program: The name of the executable to look for.

    Returns:
        The absolute path to the executable, or ``None`` if ``shutil.which`` found nothing.
    """
    located = shutil.which(program)
    if not located:
        return None
    return os.path.abspath(located)
|
|
|
|
|
|
class ConverterError(ChildProcessError):
    """Raised when an ffmpeg or ffprobe invocation fails."""
|
|
|
|
|
|
class NotInstalledError(ConverterError):
    """Raised when the executable needed for an operation was not found."""

    def __init__(self) -> None:
        message = "failed to transcode media: ffmpeg is not installed"
        super().__init__(message)
|
|
|
|
|
|
# Absolute path to the ffmpeg executable, or None if it could not be found.
ffmpeg_path = _abswhich("ffmpeg")
# Arguments placed right after the executable in every ffmpeg invocation.
ffmpeg_default_params = ("-hide_banner", "-loglevel", "warning", "-y")

# Absolute path to the ffprobe executable, or None if it could not be found.
ffprobe_path = _abswhich("ffprobe")
# Arguments placed right after the executable in every ffprobe invocation.
# Flags that take a value are kept on the same line as that value.
ffprobe_default_params = (
    "-loglevel", "quiet",
    "-print_format", "json",
    "-show_optional_fields", "1",
    "-show_format",
    "-show_streams",
)
|
|
|
|
|
|
async def probe_path(
    input_file: os.PathLike[str] | str,
    logger: logging.Logger | None = None,
) -> Any:
    """
    Probes a media file on the disk using ffprobe.

    Args:
        input_file: The full path to the file.
        logger: An optional logger. If given, anything ffprobe writes to stderr
            on a successful run is logged as a warning.

    Returns:
        A Python object containing the parsed JSON response from ffprobe

    Raises:
        NotInstalledError: if ``ffprobe_path`` is ``None``.
        ConverterError: if ffprobe returns a non-zero exit code.
    """
    if ffprobe_path is None:
        raise NotInstalledError()

    input_file = Path(input_file)
    proc = await asyncio.create_subprocess_exec(
        ffprobe_path,
        *ffprobe_default_params,
        str(input_file),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    # stderr is not guaranteed to be valid UTF-8 (it may echo arbitrary file names or
    # metadata), so decode leniently: a strict decode could raise UnicodeDecodeError
    # and hide the actual ffprobe failure.
    stderr_text = stderr.decode("utf-8", errors="replace") if stderr else ""
    if proc.returncode != 0:
        err_text = stderr_text if stderr else f"unknown ({proc.returncode})"
        raise ConverterError(f"ffprobe error: {err_text}")
    elif stderr and logger:
        logger.warning(f"ffprobe warning: {stderr_text}")
    return json.loads(stdout)
|
|
|
|
|
|
async def probe_bytes(
    data: bytes,
    input_mime: str | None = None,
    logger: logging.Logger | None = None,
) -> Any:
    """
    Probe media file data using ffprobe.

    The data is written to a file in a temporary directory, which is then handed to
    :func:`probe_path`. The temporary directory is removed before returning.

    Args:
        data: The bytes of the file to probe.
        input_mime: The mime type of the input data. If not specified, will be guessed using magic.
        logger: An optional logger, passed through to :func:`probe_path`.

    Returns:
        A Python object containing the parsed JSON response from ffprobe

    Raises:
        NotInstalledError: if ``ffprobe_path`` is ``None``.
        ValueError: if ``input_mime`` was not specified and magic is not available.
        ConverterError: if ffprobe returns a non-zero exit code.
    """
    if ffprobe_path is None:
        raise NotInstalledError()

    if input_mime is None:
        if magic is None:
            raise ValueError("input_mime was not specified and magic is not installed")
        input_mime = magic.mimetype(data)
    # guess_extension() returns None for mime types it doesn't know. Fall back to no
    # extension at all so the temporary file isn't literally named "dataNone".
    input_extension = mimetypes.guess_extension(input_mime) or ""
    with tempfile.TemporaryDirectory(prefix="mautrix_ffmpeg_") as tmpdir:
        input_file = Path(tmpdir) / f"data{input_extension}"
        input_file.write_bytes(data)
        return await probe_path(input_file=input_file, logger=logger)
|
|
|
|
|
|
async def convert_path(
    input_file: os.PathLike[str] | str,
    output_extension: str | None,
    input_args: Iterable[str] | None = None,
    output_args: Iterable[str] | None = None,
    remove_input: bool = False,
    output_path_override: os.PathLike[str] | str | None = None,
    logger: logging.Logger | None = None,
) -> Path | bytes:
    """
    Convert a media file on the disk using ffmpeg.

    Args:
        input_file: The full path to the file.
        output_extension: The extension that the output file should be.
        input_args: Arguments to tell ffmpeg how to parse the input file.
        output_args: Arguments to tell ffmpeg how to convert the file to reach the wanted output.
        remove_input: Whether the input file should be removed after converting.
            Not compatible with ``output_path_override``.
        output_path_override: A custom output path to use
            (instead of using the input path with a different extension).
        logger: An optional logger. If given, anything ffmpeg writes to stderr
            on a successful run is logged as a warning.

    Returns:
        The path to the converted file, or the stdout if ``output_path_override`` was set to ``-``.
        When ``output_path_override`` is set to anything else, it is returned as given.

    Raises:
        NotInstalledError: if ``ffmpeg_path`` is ``None``.
        ValueError: if ``remove_input`` is combined with ``output_path_override``,
            or if neither ``output_extension`` nor ``output_path_override`` is set.
        ConverterError: if ffmpeg returns a non-zero exit code.
    """
    if ffmpeg_path is None:
        raise NotInstalledError()

    if output_path_override:
        if remove_input:
            raise ValueError("remove_input can't be specified with output_path_override")
        output_file = output_path_override
    elif not output_extension:
        raise ValueError("output_extension or output_path_override is required")
    else:
        input_file = Path(input_file)
        output_file = input_file.parent / f"{input_file.stem}{output_extension}"
        if input_file == output_file:
            # The input already has the wanted extension: write to a sibling file
            # instead of asking ffmpeg to overwrite the file it is reading.
            output_file = output_file.parent / f"{output_file.stem}-new{output_extension}"

    proc = await asyncio.create_subprocess_exec(
        ffmpeg_path,
        *ffmpeg_default_params,
        *(input_args or ()),
        "-i",
        str(input_file),
        *(output_args or ()),
        str(output_file),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    # stderr is not guaranteed to be valid UTF-8 (it may echo arbitrary file names or
    # metadata), so decode leniently: a strict decode could raise UnicodeDecodeError
    # and hide the actual ffmpeg failure.
    stderr_text = stderr.decode("utf-8", errors="replace") if stderr else ""
    if proc.returncode != 0:
        err_text = stderr_text if stderr else f"unknown ({proc.returncode})"
        raise ConverterError(f"ffmpeg error: {err_text}")
    elif stderr and logger:
        logger.warning(f"ffmpeg warning: {stderr_text}")
    # input_file is only a Path here when the override branch was not taken.
    if remove_input and isinstance(input_file, Path):
        input_file.unlink(missing_ok=True)
    return stdout if output_file == "-" else output_file
|
|
|
|
|
|
async def convert_bytes(
    data: bytes,
    output_extension: str,
    input_args: Iterable[str] | None = None,
    output_args: Iterable[str] | None = None,
    input_mime: str | None = None,
    logger: logging.Logger | None = None,
) -> bytes:
    """
    Convert media file data using ffmpeg.

    The data is written to a file in a temporary directory, converted there with
    :func:`convert_path`, and the result is read back into memory. The temporary
    directory is removed before returning.

    Args:
        data: The bytes of the file to convert.
        output_extension: The extension that the output file should be.
        input_args: Arguments to tell ffmpeg how to parse the input file.
        output_args: Arguments to tell ffmpeg how to convert the file to reach the wanted output.
        input_mime: The mime type of the input data. If not specified, will be guessed using magic.
        logger: An optional logger, passed through to :func:`convert_path`.

    Returns:
        The converted file as bytes.

    Raises:
        NotInstalledError: if ``ffmpeg_path`` is ``None``.
        ValueError: if ``input_mime`` was not specified and magic is not available.
        ConverterError: if ffmpeg returns a non-zero exit code.
    """
    if ffmpeg_path is None:
        raise NotInstalledError()

    if input_mime is None:
        if magic is None:
            raise ValueError("input_mime was not specified and magic is not installed")
        input_mime = magic.mimetype(data)
    # guess_extension() returns None for mime types it doesn't know. Fall back to no
    # extension at all so the temporary file isn't literally named "dataNone".
    input_extension = mimetypes.guess_extension(input_mime) or ""
    with tempfile.TemporaryDirectory(prefix="mautrix_ffmpeg_") as tmpdir:
        input_file = Path(tmpdir) / f"data{input_extension}"
        input_file.write_bytes(data)
        output_file = await convert_path(
            input_file=input_file,
            output_extension=output_extension,
            input_args=input_args,
            output_args=output_args,
            logger=logger,
        )
        # No output_path_override is passed, so convert_path returns a filesystem path
        # inside tmpdir; read it before the directory is cleaned up.
        return Path(output_file).read_bytes()
|
|
|
|
|
|
# Public names of this module, grouped by kind.
__all__ = [
    # Module-level ffmpeg configuration
    "ffmpeg_path", "ffmpeg_default_params",
    # Exceptions
    "ConverterError", "NotInstalledError",
    # Conversion helpers
    "convert_bytes", "convert_path",
    # Probing helpers
    "probe_bytes", "probe_path",
]
|