mirror of https://github.com/poljar/matrix-nio.git
158 lines
4.9 KiB
Python
158 lines
4.9 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright 2018 Zil0
|
|
# Copyright © 2019 Damir Jelić <poljar@termina.org.uk>
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
# This function is part of the matrix-python-sdk and is distributed
|
|
# under the APACHE 2.0 licence.
|
|
|
|
"""Matrix encryption algorithms for file uploads."""
|
|
|
|
from binascii import Error as BinAsciiError
|
|
from typing import Any, Dict, Generator, Iterable, Tuple, Union
|
|
|
|
import unpaddedbase64
|
|
from Crypto import Random
|
|
from Crypto.Cipher import AES
|
|
from Crypto.Hash import SHA256
|
|
from Crypto.Util import Counter
|
|
|
|
from ..exceptions import EncryptionError
|
|
|
|
DataT = Union[bytes, Iterable[bytes]]
|
|
|
|
|
|
def decrypt_attachment(ciphertext: bytes, key: str, hash: str, iv: str) -> bytes:
    """Decrypt an encrypted attachment.

    The SHA-256 digest of the ciphertext is verified before any decryption
    is attempted.

    Args:
        ciphertext (bytes): The data to decrypt.
        key (str): Base64 encoded AES-CTR key, i.e. the ``k`` value of the
            JWK key object.
        hash (str): Base64 encoded SHA-256 hash of the ciphertext.
        iv (str): Base64 encoded 16 byte AES-CTR IV.

    Returns:
        The plaintext bytes.

    Raises:
        EncryptionError if the integrity check fails, if the hash, key or IV
        cannot be base64 decoded, or if the AES cipher cannot be constructed
        from the decoded key and IV.
    """
    # Guard the hash decode the same way as the key and IV below, so that a
    # malformed hash surfaces as an EncryptionError like every other failure.
    try:
        expected_hash: bytes = unpaddedbase64.decode_base64(hash)
    except (BinAsciiError, TypeError) as e:
        raise EncryptionError("Error decoding hash.") from e

    h = SHA256.new()
    h.update(ciphertext)

    if h.digest() != expected_hash:
        raise EncryptionError("Mismatched SHA-256 digest.")

    try:
        byte_key: bytes = unpaddedbase64.decode_base64(key)
    except (BinAsciiError, TypeError) as e:
        raise EncryptionError("Error decoding key.") from e

    try:
        byte_iv: bytes = unpaddedbase64.decode_base64(iv)
    except (BinAsciiError, TypeError) as e:
        raise EncryptionError("Error decoding initial values.") from e

    # The first 8 bytes of the IV are the fixed counter prefix; the remaining
    # bytes are the big-endian initial value of the 64 bit counter.
    prefix: bytes = byte_iv[:8]
    cnt: int = int.from_bytes(byte_iv[8:], "big")
    ctr = Counter.new(64, prefix=prefix, initial_value=cnt)

    try:
        cipher = AES.new(byte_key, AES.MODE_CTR, counter=ctr)
    except ValueError as e:
        raise EncryptionError(e) from e

    return cipher.decrypt(ciphertext)
|
|
|
|
|
|
def encrypt_attachment(plaintext: bytes) -> Tuple[bytes, Dict[str, Any]]:
    """Encrypt data in order to send it as an encrypted attachment.

    This eagerly drains ``encrypted_attachment_generator()`` and joins the
    encrypted chunks it yields into a single bytes object.

    Args:
        plaintext (bytes): The data to encrypt.

    Returns:
        A tuple with the encrypted bytes and a dict containing the info needed
        to decrypt data. See ``encrypted_attachment_generator()`` for the keys.
    """
    # Everything the generator yields is ciphertext, except the final item
    # which is the decryption info dict.
    *chunks, info = encrypted_attachment_generator(plaintext)
    ciphertext: bytes = b"".join(chunks)  # type: ignore
    decryption_info: Dict[str, Any] = info  # type: ignore
    return (ciphertext, decryption_info)
|
|
|
|
|
|
def encrypted_attachment_generator(
    data: DataT,
) -> Generator[Union[bytes, Dict[str, Any]], None, None]:
    """Lazily encrypt data in order to send it as an encrypted attachment.

    In contrast to ``encrypt_attachment()``, the data is encrypted and yielded
    chunk by chunk. When an iterable of bytes is passed as data, large files
    can therefore be encrypted without loading them fully into memory.

    Args:
        data (bytes/Iterable[bytes]): The data to encrypt.

    Yields:
        The encrypted bytes for each chunk of data.
        The last yielded value will be a dict containing the info needed to
        decrypt data. The keys are:
        | key: AES-CTR JWK key object.
        | iv: Base64 encoded 16 byte AES-CTR IV.
        | hashes.sha256: Base64 encoded SHA-256 hash of the ciphertext.
    """
    # Fresh random 32 byte AES key for this attachment.
    aes_key = Random.new().read(32)
    # Random 8 byte IV; it becomes the prefix of an 8 byte counter that
    # starts at zero.
    iv_prefix = Random.new().read(8)
    counter = Counter.new(64, prefix=iv_prefix, initial_value=0)

    cipher = AES.new(aes_key, AES.MODE_CTR, counter=counter)
    digest = SHA256.new()

    # A single bytes object is handled as an iterable holding one chunk.
    chunks = [data] if isinstance(data, bytes) else data

    for plain_chunk in chunks:
        cipher_chunk = cipher.encrypt(plain_chunk)
        # The hash is computed over the ciphertext, not the plaintext.
        digest.update(cipher_chunk)
        yield cipher_chunk

    yield _get_decryption_info_dict(aes_key, iv_prefix, digest)
|
|
|
|
|
|
def _get_decryption_info_dict(
    key: bytes, iv: bytes, sha256: SHA256.SHA256Hash
) -> Dict[str, Any]:
    """Build the dict holding the info needed to decrypt an attachment.

    Args:
        key (bytes): The raw AES key.
        iv (bytes): The IV bytes; eight zero bytes are appended to them
            before encoding.
        sha256 (SHA256Hash): Hash object whose digest is stored under
            ``hashes.sha256``.

    Returns:
        A dict with the keys ``v``, ``key``, ``iv`` and ``hashes``.
    """
    encoded_key = unpaddedbase64.encode_base64(key, urlsafe=True)
    # Send IV concatenated with counter
    encoded_iv = unpaddedbase64.encode_base64(iv + b"\x00" * 8)
    encoded_digest = unpaddedbase64.encode_base64(sha256.digest())

    jwk: Dict[str, Any] = {
        "kty": "oct",
        "alg": "A256CTR",
        "ext": True,
        "k": encoded_key,
        "key_ops": ["encrypt", "decrypt"],
    }

    info: Dict[str, Any] = {
        "v": "v2",
        "key": jwk,
        "iv": encoded_iv,
        "hashes": {"sha256": encoded_digest},
    }
    return info
|