matrix-nio/tests/encryption_test.py

1946 lines
67 KiB
Python

# -*- coding: utf-8 -*-
import copy
import json
import os
import time
from datetime import datetime, timedelta
import pytest
from helpers import faker
from olm import Account, OlmMessage, OlmPreKeyMessage, OutboundGroupSession
from nio.crypto import (
DeviceStore,
GroupSessionStore,
InboundGroupSession,
Olm,
OlmDevice,
OutboundSession,
OutgoingKeyRequest,
Session,
SessionStore,
)
from nio.event_builders import RoomKeyRequestMessage
from nio.events import (
DummyEvent,
ForwardedRoomKeyEvent,
MegolmEvent,
OlmEvent,
RoomKeyEvent,
RoomKeyRequest,
RoomKeyRequestCancellation,
RoomMessageText,
ToDeviceEvent,
UnknownBadEvent,
)
from nio.exceptions import EncryptionError, GroupEncryptionError, OlmTrustError
from nio.responses import KeysClaimResponse, KeysQueryResponse, KeysUploadResponse
from nio.store import DefaultStore, Ed25519Key, Key, KeyStore
AliceId = "@alice:example.org"
Alice_device = "ALDEVICE"
BobId = "@bob:example.org"
Bob_device = "BOBDEVICE"
MaloryId = "@malory:example.org"
Malory_device = "MALORYDEVICE"
PICKLE_KEY = "DEFAULT_KEY"
TEST_ROOM = "!test_room"
ephemeral_dir = os.path.join(os.curdir, "tests/data/encryption")
def ephemeral(func):
def wrapper(*args, **kwargs):
try:
ret = func(*args, **kwargs)
finally:
os.remove(os.path.join(ephemeral_dir, "ephemeral_DEVICEID.db"))
return ret
return wrapper
@pytest.fixture
def olm_account(tempdir):
return Olm(
faker.mx_id(), faker.device_id(), DefaultStore("ephemeral", "DEVICEID", tempdir)
)
@pytest.fixture
def bob_account(tempdir):
return Olm(
faker.mx_id(), faker.device_id(), DefaultStore("ephemeral", "DEVICEID", tempdir)
)
class TestClass:
@staticmethod
def _load_response(filename):
with open(filename) as f:
return json.loads(f.read())
def _get_store(self, user_id, device_id, pickle_key=""):
return DefaultStore(user_id, device_id, ephemeral_dir, pickle_key)
@staticmethod
def olm_message_to_event(message_dict, recipient, sender):
olm_content = message_dict["messages"][recipient.user_id][recipient.device_id]
return {
"sender": sender.user_id,
"type": "m.room.encrypted",
"content": olm_content,
}
@property
def ephemeral_olm(self):
user_id = "ephemeral"
device_id = "DEVICEID"
return Olm(user_id, device_id, self._get_store(user_id, device_id))
@ephemeral
def test_new_account_creation(self):
olm = self.ephemeral_olm
assert isinstance(olm.account, Account)
def _load(self, user_id, device_id, pickle_key=""):
return Olm(user_id, device_id, self._get_store(user_id, device_id, pickle_key))
def test_account_loading(self):
olm = self._load("example", "DEVICEID", PICKLE_KEY)
assert isinstance(olm.account, Account)
assert (
olm.account.identity_keys["curve25519"]
== "Xjuu9d2KjHLGIHpCOCHS7hONQahapiwI1MhVmlPlCFM"
)
assert (
olm.account.identity_keys["ed25519"]
== "FEfrmWlasr4tcMtbNX/BU5lbdjmpt3ptg8ApTD8YAh4"
)
def test_fingerprint_store(self, monkeypatch):
def mocksave(self):
return
monkeypatch.setattr(KeyStore, "_save", mocksave)
store = KeyStore(os.path.join(ephemeral_dir, "ephemeral_devices"))
account = Account()
device = OlmDevice("example", "DEVICEID", account.identity_keys)
key = Key.from_olmdevice(device)
assert key not in store
assert store.add(key)
assert key in store
assert store.remove(key)
assert store.check(key) is False
def test_fingerprint_store_loading(self):
store = KeyStore(os.path.join(ephemeral_dir, "known_devices"))
key = Ed25519Key(
"example", "DEVICEID", "2MX1WOCAmE9eyywGdiMsQ4RxL2SIKVeyJXiSjVFycpA"
)
assert key in store
def test_invalid_store_entry_equality(self):
entry = Ed25519Key(
"example", "DEVICEID", "2MX1WOCAmE9eyywGdiMsQ4RxL2SIKVeyJXiSjVFycpA"
)
assert entry != 1
def test_differing_store_entries(self):
alice = Ed25519Key(
"alice", "DEVICEID", "2MX1WOCAmE9eyywGdiMsQ4RxL2SIKVeyJXiSjVFycpA"
)
bob = Ed25519Key(
"bob", "DEVICEDI", "3MX1WOCAmE9eyywGdiMsQ4RxL2SIKVeyJXiSjVFycpA"
)
assert alice != bob
def _create_session(self):
alice = Account()
bob = Account()
bob.generate_one_time_keys(1)
one_time = list(bob.one_time_keys["curve25519"].values())[0]
id_key = bob.identity_keys["curve25519"]
s = OutboundSession(alice, id_key, one_time)
return alice, bob, s
def test_session_store(self):
alice, bob, s = self._create_session()
store = SessionStore()
store.add(bob.identity_keys["curve25519"], s)
assert s in store
def test_session_store_sort(self):
alice, bob, s = self._create_session()
bob.generate_one_time_keys(1)
one_time = list(bob.one_time_keys["curve25519"].values())[0]
curve_key = bob.identity_keys["curve25519"]
s2 = OutboundSession(alice, curve_key, one_time)
store = SessionStore()
store.add(curve_key, s)
store.add(curve_key, s2)
if s.use_time > s2.use_time:
assert s == store.get(curve_key)
else:
assert s2 == store.get(curve_key)
def test_device_store(self):
alice = OlmDevice(
"example",
"DEVICEID",
{
"ed25519": "2MX1WOCAmE9eyywGdiMsQ4RxL2SIKVeyJXiSjVFycpA",
"curve25519": "3MX1WOCAmE9eyywGdiMsQ4RxL2SIKVeyJXiSjVFycpA",
},
)
store = DeviceStore()
assert store.add(alice)
assert store.add(alice) is False
assert alice in store
@ephemeral
def test_olm_outbound_session_create(self):
bob = Account()
bob.generate_one_time_keys(1)
one_time = list(bob.one_time_keys["curve25519"].values())[0]
bob_device = OlmDevice(BobId, Bob_device, bob.identity_keys)
olm = self.ephemeral_olm
olm.device_store[bob_device.user_id][bob_device.id] = bob_device
olm.create_session(one_time, bob_device.curve25519)
assert isinstance(
olm.session_store.get(bob.identity_keys["curve25519"]), OutboundSession
)
def test_olm_session_load(self):
olm = self._load("example", "DEVICEID", PICKLE_KEY)
bob_session = olm.session_store.get(
"+Qs131S/odNdWG6VJ8hiy9YZW0us24wnsDjYQbaxLk4"
)
assert bob_session
assert bob_session.id == "EeEiqT9LjCtECaN7WTqcBQ7D5Dwm4+/L9Uxr1IyPAts"
@ephemeral
def test_olm_group_session_store(self):
olm = self.ephemeral_olm
bob_account = Account()
outbound_session = OutboundGroupSession()
olm.create_group_session(
bob_account.identity_keys["curve25519"],
bob_account.identity_keys["ed25519"],
"!test_room",
outbound_session.id,
outbound_session.session_key,
)
del olm
olm = self.ephemeral_olm
bob_session = olm.inbound_group_store.get(
"!test_room", bob_account.identity_keys["curve25519"], outbound_session.id
)
assert bob_session
assert bob_session.id == outbound_session.id
@ephemeral
def test_keys_query(self):
olm = self.ephemeral_olm
parsed_dict = TestClass._load_response("tests/data/keys_query.json")
response = KeysQueryResponse.from_dict(parsed_dict)
assert isinstance(response, KeysQueryResponse)
olm.handle_response(response)
device = olm.device_store["@alice:example.org"]["JLAFKJWSCS"]
assert device.ed25519 == "nE6W2fCblxDcOFmeEtCHNl8/l8bXcu7GKyAswA4r3mM"
del olm
olm = self.ephemeral_olm
device = olm.device_store["@alice:example.org"]["JLAFKJWSCS"]
assert device.ed25519 == "nE6W2fCblxDcOFmeEtCHNl8/l8bXcu7GKyAswA4r3mM"
@ephemeral
def test_same_query_response_twice(self):
olm = self.ephemeral_olm
parsed_dict = TestClass._load_response("tests/data/keys_query.json")
response = KeysQueryResponse.from_dict(parsed_dict)
olm.handle_response(response)
assert response.changed
# TODO check out why this fails under python2 if we remove the copy()
# call.
response2 = copy.copy(response)
olm.handle_response(response)
assert response2.changed
def test_olm_inbound_session(self, monkeypatch):
def mocksave(self):
return
monkeypatch.setattr(KeyStore, "_save", mocksave)
# create three new accounts
alice = self._load(AliceId, Alice_device)
bob = self._load(BobId, Bob_device)
malory = self._load(BobId, Bob_device)
# create olm devices for each others known devices list
alice_device = OlmDevice(AliceId, Alice_device, alice.account.identity_keys)
bob_device = OlmDevice(BobId, Bob_device, bob.account.identity_keys)
malory_device = OlmDevice(MaloryId, Malory_device, malory.account.identity_keys)
# add the devices to the device list
alice.device_store.add(bob_device)
alice.device_store.add(malory_device)
bob.device_store.add(alice_device)
# bob creates one time keys
bob.account.generate_one_time_keys(1)
one_time = list(bob.account.one_time_keys["curve25519"].values())[0]
# Mark the keys as published
bob.account.mark_keys_as_published()
# alice creates an outbound olm session with bob
alice.create_session(one_time, bob_device.curve25519)
# alice creates an group session
alice.create_outbound_group_session("!test:example.org")
group_session = alice.outbound_group_sessions["!test:example.org"]
# alice shares the group session with bob, but bob isn't verified
with pytest.raises(OlmTrustError):
sharing_with, to_device = alice.share_group_session(
"!test:example.org", [BobId]
)
alice.verify_device(bob_device)
# alice shares the group session with bob and malory, but malory isn't
# blocked
with pytest.raises(OlmTrustError):
sharing_with, to_device = alice.share_group_session(
"!test:example.org", [BobId, MaloryId]
)
alice.blacklist_device(malory_device)
sharing_with, to_device = alice.share_group_session(
"!test:example.org", [BobId, MaloryId]
)
# check that we aren't sharing the group session with malory
with pytest.raises(KeyError):
to_device["messages"][MaloryId][malory_device.id]["ciphertext"]
ciphertext = to_device["messages"][BobId][bob_device.id]["ciphertext"]
olm_event_dict = {
"sender": AliceId,
"type": "m.room.encrypted",
"content": {
"algorithm": Olm._olm_algorithm,
"sender_key": alice_device.curve25519,
"ciphertext": ciphertext,
},
}
olm_event = OlmEvent.from_dict(olm_event_dict)
assert isinstance(olm_event, OlmEvent)
# bob decrypts the message and creates a new inbound session with alice
try:
# pdb.set_trace()
bob.decrypt_event(olm_event)
# we check that the session is there
assert bob.session_store.get(alice_device.curve25519)
# we check that the group session is there
assert bob.inbound_group_store.get(
"!test:example.org",
alice_device.curve25519,
group_session.id,
)
# Test another round of sharing, this time with an existing session
alice.create_outbound_group_session(TEST_ROOM)
group_session = alice.outbound_group_sessions[TEST_ROOM]
sharing_with, to_device = alice.share_group_session(
TEST_ROOM, [BobId, MaloryId]
)
ciphertext = to_device["messages"][BobId][bob_device.id]["ciphertext"]
olm_event_dict = {
"sender": AliceId,
"type": "m.room.encrypted",
"content": {
"algorithm": Olm._olm_algorithm,
"sender_key": alice_device.curve25519,
"ciphertext": ciphertext,
},
}
olm_event = OlmEvent.from_dict(olm_event_dict)
assert isinstance(olm_event, OlmEvent)
event = bob.decrypt_event(olm_event)
assert event
assert bob.inbound_group_store.get(
TEST_ROOM,
alice_device.curve25519,
group_session.id,
)
finally:
# remove the databases, the known devices store is handled by
# monkeypatching
os.remove(os.path.join(ephemeral_dir, f"{AliceId}_{Alice_device}.db"))
os.remove(os.path.join(ephemeral_dir, f"{BobId}_{Bob_device}.db"))
def test_group_session_sharing(self, monkeypatch):
def mocksave(self):
return
monkeypatch.setattr(KeyStore, "_save", mocksave)
# create three new accounts
alice = self._load(AliceId, Alice_device)
bob = self._load(BobId, Bob_device)
malory = self._load(BobId, Bob_device)
# create olm devices for each others known devices list
alice_device = OlmDevice(AliceId, Alice_device, alice.account.identity_keys)
bob_device = OlmDevice(BobId, Bob_device, bob.account.identity_keys)
malory_device = OlmDevice(MaloryId, Malory_device, malory.account.identity_keys)
# add the devices to the device list
alice.device_store.add(bob_device)
alice.device_store.add(malory_device)
bob.device_store.add(alice_device)
# bob creates one time keys
bob.account.generate_one_time_keys(1)
one_time = list(bob.account.one_time_keys["curve25519"].values())[0]
# Mark the keys as published
bob.account.mark_keys_as_published()
# alice creates an outbound olm session with bob
alice.create_session(one_time, bob_device.curve25519)
alice.verify_device(bob_device)
alice.verify_device(malory_device)
alice._maxToDeviceMessagesPerRequest = 1
sharing_with, to_device = alice.share_group_session(
"!test:example.org", [BobId, MaloryId]
)
group_session = alice.outbound_group_sessions["!test:example.org"]
assert group_session
assert len(sharing_with) == 1
assert not group_session.users_shared_with
group_session.users_shared_with.update(sharing_with)
sharing_with, to_device = alice.share_group_session(
"!test:example.org", [BobId, MaloryId]
)
assert len(sharing_with) == 1
os.remove(os.path.join(ephemeral_dir, f"{AliceId}_{Alice_device}.db"))
os.remove(os.path.join(ephemeral_dir, f"{BobId}_{Bob_device}.db"))
@ephemeral
def test_room_key_event(self):
olm = self.ephemeral_olm
session = OutboundGroupSession()
payload = {
"sender": BobId,
"sender_device": Bob_device,
"type": "m.room_key",
"content": {
"algorithm": "m.megolm.v1.aes-sha2",
"room_id": TEST_ROOM,
"session_id": session.id,
"session_key": session.session_key,
},
"keys": {},
}
bad_event = olm._handle_room_key_event(
BobId, "Xjuu9d2KjHLGIHpCOCHS7hONQahapiwI1MhVmlPlCFM", {}
)
assert isinstance(bad_event, UnknownBadEvent)
event = olm._handle_room_key_event(
BobId, "Xjuu9d2KjHLGIHpCOCHS7hONQahapiwI1MhVmlPlCFM", payload
)
assert not event
payload["keys"] = {"ed25519": "FEfrmWlasr4tcMtbNX/BU5lbdjmpt3ptg8ApTD8YAh4"}
event = olm._handle_room_key_event(
BobId, "Xjuu9d2KjHLGIHpCOCHS7hONQahapiwI1MhVmlPlCFM", payload
)
assert isinstance(event, RoomKeyEvent)
@ephemeral
def test_forwarded_room_key_event(self):
olm = self.ephemeral_olm
session = OutboundGroupSession()
session = InboundGroupSession(
session.session_key,
"FEfrmWlasr4tcMtbNX/BU5lbdjmpt3ptg8ApTD8YAh4",
"Xjuu9d2KjHLGIHpCOCHS7hONQahapiwI1MhVmlPlCFM",
TEST_ROOM,
)
payload = {
"sender": BobId,
"sender_device": Bob_device,
"type": "m.forwarded_room_key",
"content": {
"algorithm": "m.megolm.v1.aes-sha2",
"room_id": session.room_id,
"session_id": session.id,
"session_key": session.export_session(session.first_known_index),
"sender_key": session.sender_key,
"sender_claimed_ed25519_key": session.ed25519,
"forwarding_curve25519_key_chain": session.forwarding_chain,
},
"keys": {"ed25519": session.ed25519},
}
bad_event = olm._handle_room_key_event(
BobId, "Xjuu9d2KjHLGIHpCOCHS7hONQahapiwI1MhVmlPlCFM", {}
)
assert isinstance(bad_event, UnknownBadEvent)
event = olm._handle_forwarded_room_key_event(
BobId, "Xjuu9d2KjHLGIHpCOCHS7hONQahapiwI1MhVmlPlCFM", payload
)
assert not event
key_request = OutgoingKeyRequest(
session.id,
session.id,
session.room_id,
"m.megolm.v1.aes-sha2",
)
olm.outgoing_key_requests[session.id] = key_request
event = olm._handle_olm_event(
BobId, "Xjuu9d2KjHLGIHpCOCHS7hONQahapiwI1MhVmlPlCFM", payload
)
assert isinstance(event, ForwardedRoomKeyEvent)
def test_user_verification_status(self, monkeypatch):
def mocksave(self):
return
monkeypatch.setattr(KeyStore, "_save", mocksave)
# create three new accounts
alice = self._load(AliceId, Alice_device)
bob = self._load(BobId, Bob_device)
# create olm devices for each others known devices list
bob_device = OlmDevice(BobId, Bob_device, bob.account.identity_keys)
bob2_device = OlmDevice(BobId, Malory_device, bob.account.identity_keys)
alice.device_store.add(bob_device)
assert not alice.user_fully_verified(BobId)
alice.verify_device(bob_device)
assert alice.user_fully_verified(BobId)
alice.device_store.add(bob2_device)
assert not alice.user_fully_verified(BobId)
alice.verify_device(bob2_device)
assert alice.user_fully_verified(BobId)
os.remove(os.path.join(ephemeral_dir, f"{AliceId}_{Alice_device}.db"))
os.remove(os.path.join(ephemeral_dir, f"{BobId}_{Bob_device}.db"))
@ephemeral
def test_group_decryption(self):
olm = self.ephemeral_olm
olm.create_outbound_group_session(TEST_ROOM)
message = {
"type": "m.room.message",
"content": {
"msgtype": "m.text",
"body": "hello wordl",
},
}
with pytest.raises(GroupEncryptionError):
encrypted_dict = olm.group_encrypt(TEST_ROOM, message)
session = olm.outbound_group_sessions[TEST_ROOM]
session.shared = True
encrypted_dict = olm.group_encrypt(TEST_ROOM, message)
megolm = {"type": "m.room.encrypted", "content": encrypted_dict}
megolm_event = MegolmEvent.from_dict(megolm)
assert isinstance(megolm_event, UnknownBadEvent)
megolm["event_id"] = "1"
megolm["sender"] = "@ephemeral:example.org"
megolm["origin_server_ts"] = 0
megolm_event = MegolmEvent.from_dict(megolm)
assert isinstance(megolm_event, MegolmEvent)
with pytest.raises(EncryptionError):
event = olm.decrypt_megolm_event(megolm_event)
session_store = olm.inbound_group_store
olm.inbound_group_store = GroupSessionStore()
with pytest.raises(EncryptionError):
event = olm.decrypt_megolm_event(megolm_event)
olm.inbound_group_store = session_store
megolm_event.room_id = TEST_ROOM
event = olm.decrypt_event(megolm_event)
assert isinstance(event, RoomMessageText)
assert event.decrypted
@ephemeral
def test_key_sharing(self):
olm = self.ephemeral_olm
assert olm.should_upload_keys
to_share = olm.share_keys()
assert "device_keys" in to_share
assert "one_time_keys" in to_share
assert len(to_share["one_time_keys"]) == olm.account.max_one_time_keys // 2
response = KeysUploadResponse.from_dict(
{
"one_time_key_counts": {
"curve25519": 0,
"signed_curve25519": olm.account.max_one_time_keys // 2,
}
}
)
olm.handle_response(response)
assert not olm.should_upload_keys
with pytest.raises(ValueError):
to_share = olm.share_keys()
olm.uploaded_key_count -= 1
assert olm.should_upload_keys
to_share = olm.share_keys()
assert "device_keys" not in to_share
assert "one_time_keys" in to_share
assert len(to_share["one_time_keys"]) == 1
def test_outbound_session_creation(self, monkeypatch):
def mocksave(self):
return
monkeypatch.setattr(KeyStore, "_save", mocksave)
alice = self._load(AliceId, Alice_device)
bob = self._load(BobId, Bob_device)
bob_device = OlmDevice(BobId, Bob_device, bob.account.identity_keys)
assert not alice.get_missing_sessions([BobId])
alice.device_store.add(bob_device)
missing = alice.get_missing_sessions([BobId])
assert not alice.session_store.get(bob_device.curve25519)
assert BobId in missing
assert Bob_device in missing[BobId]
to_share = bob.share_keys()
one_time_key = list(to_share["one_time_keys"].items())[0]
key_claim_dict = {
"one_time_keys": {
BobId: {
Bob_device: {one_time_key[0]: one_time_key[1]},
},
},
"failures": {},
}
response = KeysClaimResponse.from_dict(key_claim_dict, TEST_ROOM)
assert isinstance(response, KeysClaimResponse)
print(response)
alice.handle_response(response)
assert not alice.get_missing_sessions([BobId])
assert alice.session_store.get(bob_device.curve25519)
os.remove(os.path.join(ephemeral_dir, f"{AliceId}_{Alice_device}.db"))
os.remove(os.path.join(ephemeral_dir, f"{BobId}_{Bob_device}.db"))
def test_group_session_sharing_new(self, olm_account, bob_account):
alice = olm_account
bob = bob_account
alice_device = OlmDevice(
alice.user_id, alice.device_id, alice.account.identity_keys
)
bob_device = OlmDevice(bob.user_id, bob.device_id, bob.account.identity_keys)
alice.device_store.add(bob_device)
bob.device_store.add(alice_device)
bob.account.generate_one_time_keys(1)
one_time = list(bob.account.one_time_keys["curve25519"].values())[0]
bob.account.mark_keys_as_published()
alice.create_session(one_time, bob_device.curve25519)
sharing_with, to_device = alice.share_group_session(
"!test:example.org", [bob.user_id], ignore_unverified_devices=True
)
assert len(sharing_with) == 1
assert alice.outbound_group_sessions["!test:example.org"]
assert alice.is_device_ignored(bob_device)
def test_session_unwedging(self, olm_account, bob_account):
alice = olm_account
bob = bob_account
alice_device = OlmDevice(
alice.user_id, alice.device_id, alice.account.identity_keys
)
bob_device = OlmDevice(bob.user_id, bob.device_id, bob.account.identity_keys)
alice.device_store.add(bob_device)
bob.device_store.add(alice_device)
bob.account.generate_one_time_keys(1)
one_time = list(bob.account.one_time_keys["curve25519"].values())[0]
bob.account.mark_keys_as_published()
alice.create_session(one_time, bob_device.curve25519)
# Let us pickle our session with bob here so we can later unpickle it
# and wedge our session.
alice_pickle = alice.session_store[bob_device.curve25519][0].pickle("")
# Share a initial olm encrypted message
_, to_device = alice.share_group_session(
TEST_ROOM, [bob.user_id], ignore_unverified_devices=True
)
outbound_session = alice.outbound_group_sessions[TEST_ROOM]
olm_message = self.olm_message_to_event(to_device, bob, alice)
# Pass the to-device event to bob and make sure we get the right events
event = ToDeviceEvent.parse_event(olm_message)
assert isinstance(event, OlmEvent)
decrypted_event = bob.decrypt_event(event)
assert isinstance(decrypted_event, RoomKeyEvent)
# Make sure bob got the room-key
assert bob.inbound_group_store
bob_session = bob.inbound_group_store.get(
TEST_ROOM, alice_device.curve25519, outbound_session.id
)
assert bob_session.id == outbound_session.id
# Now bob shares a room-key with alice
_, to_device = bob.share_group_session(
TEST_ROOM, [alice.user_id], ignore_unverified_devices=True
)
olm_message = self.olm_message_to_event(to_device, alice, bob)
event = ToDeviceEvent.parse_event(olm_message)
assert isinstance(event, OlmEvent)
decrypted_event = alice.decrypt_event(event)
assert isinstance(decrypted_event, RoomKeyEvent)
# Let us wedge the session now
session = alice.session_store[bob_device.curve25519][0]
alice.session_store[bob_device.curve25519][0] = Session.from_pickle(
alice_pickle, session.creation_time, "", session.use_time
)
alice.rotate_outbound_group_session(TEST_ROOM)
# Try to share a room-key now
_, to_device = alice.share_group_session(
TEST_ROOM, [bob.user_id], ignore_unverified_devices=True
)
# Set the creation time to be older than an hour, otherwise we will not
# be able to unwedge the session.
alice_session = bob.session_store.get(alice_device.curve25519)
alice_session.creation_time = datetime.now() - timedelta(hours=2)
olm_message = self.olm_message_to_event(to_device, bob, alice)
# Pass the to-device event to bob and make sure we get the right events
event = ToDeviceEvent.parse_event(olm_message)
assert isinstance(event, OlmEvent)
decrypted_event = bob.decrypt_event(event)
# Make sure that decryption failed
assert decrypted_event is None
# Make sure that we have queued a m.dummy message to be sent out as a
# to-device message
assert alice_device in bob.wedged_devices
# Bob should now claim new keys from alice, we're simulating this over
# here since the olm machine doesn't know how to do requests.
to_share = alice.share_keys()
one_time_key = list(to_share["one_time_keys"].items())[0]
key_claim_dict = {
"one_time_keys": {
alice.user_id: {
alice.device_id: {one_time_key[0]: one_time_key[1]},
},
},
"failures": {},
}
response = KeysClaimResponse.from_dict(key_claim_dict, TEST_ROOM)
assert not bob.outgoing_to_device_messages
assert isinstance(response, KeysClaimResponse)
bob.handle_response(response)
# After we claimed the keys a new Olm session will be created and a
# to-device message will be prepared for alice.
assert bob.outgoing_to_device_messages
message = bob.outgoing_to_device_messages[0]
assert message.type == "m.room.encrypted"
assert message.recipient == alice.user_id
assert message.recipient_device == alice.device_id
# Forward the message to alice.
event = ToDeviceEvent.parse_event(
self.olm_message_to_event(message.as_dict(), alice, bob)
)
assert isinstance(event, OlmEvent)
# Take out our currently used session for bob.
wedged_session = alice.session_store.get(bob_device.curve25519)
decrypted_event = alice.decrypt_event(event)
assert isinstance(decrypted_event, DummyEvent)
# Check that the dummy event created a new Olm session and that it is
# the preferred one.
new_session = alice.session_store.get(bob_device.curve25519)
assert wedged_session.use_time < new_session.use_time
assert wedged_session != new_session
# Try to mark the device again to be unwedged, this should fail since
# our creation time isn't old enough.
alice._mark_device_for_unwedging(alice_device.user_id, alice_device.curve25519)
assert alice_device not in bob.wedged_devices
def test_device_renaming(self, olm_account):
parsed_dict = TestClass._load_response("tests/data/keys_query.json")
response = KeysQueryResponse.from_dict(parsed_dict)
assert isinstance(response, KeysQueryResponse)
olm_account.handle_response(response)
device = olm_account.device_store["@alice:example.org"]["JLAFKJWSCS"]
assert device.ed25519 == "nE6W2fCblxDcOFmeEtCHNl8/l8bXcu7GKyAswA4r3mM"
assert device.display_name == "Alice's mobile phone"
parsed_dict["device_keys"]["@alice:example.org"]["JLAFKJWSCS"]["unsigned"][
"device_display_name"
] = "Phoney"
response = KeysQueryResponse.from_dict(parsed_dict)
olm_account.handle_response(response)
assert device.display_name == "Phoney"
def test_replay_attack_protection(self, olm_account, bob_account):
alice = olm_account
bob = bob_account
alice_device = OlmDevice(
alice.user_id, alice.device_id, alice.account.identity_keys
)
bob_device = OlmDevice(bob.user_id, bob.device_id, bob.account.identity_keys)
alice.device_store.add(bob_device)
bob.device_store.add(alice_device)
bob.account.generate_one_time_keys(1)
one_time = list(bob.account.one_time_keys["curve25519"].values())[0]
bob.account.mark_keys_as_published()
alice.create_session(one_time, bob_device.curve25519)
# Share a initial olm encrypted message
_, to_device = alice.share_group_session(
TEST_ROOM, [bob.user_id], ignore_unverified_devices=True
)
outbound_session = alice.outbound_group_sessions[TEST_ROOM]
outbound_session.shared = True
olm_message = self.olm_message_to_event(to_device, bob, alice)
# Pass the to-device event to bob and make sure we get the right events
event = ToDeviceEvent.parse_event(olm_message)
assert isinstance(event, OlmEvent)
decrypted_event = bob.decrypt_event(event)
assert isinstance(decrypted_event, RoomKeyEvent)
message = {
"type": "m.room.message",
"content": {"msgtype": "m.text", "body": "It's a secret to everybody."},
}
encrypted_content = alice.group_encrypt(TEST_ROOM, message)
encrypted_message = {
"event_id": "!event_id",
"type": "m.room.encrypted",
"sender": alice.user_id,
"origin_server_ts": int(time.time()),
"content": encrypted_content,
"room_id": TEST_ROOM,
}
event = MegolmEvent.from_dict(encrypted_message)
decrypted_event = bob.decrypt_event(event)
assert decrypted_event.body == message["content"]["body"]
# Let us now replay the event.
encrypted_message["event_id"] = "!new_event_id"
event = MegolmEvent.from_dict(encrypted_message)
with pytest.raises(EncryptionError):
bob.decrypt_megolm_event(event)
encrypted_message["event_id"] = "!event_id"
old_time = encrypted_message["origin_server_ts"]
encrypted_message["origin_server_ts"] += 100
event = MegolmEvent.from_dict(encrypted_message)
with pytest.raises(EncryptionError):
bob.decrypt_megolm_event(event)
# Let us now check that normal messages from the room history decrypt
# again.
encrypted_message["origin_server_ts"] = old_time
event = MegolmEvent.from_dict(encrypted_message)
decrypted_event = bob.decrypt_event(event)
assert decrypted_event.body == message["content"]["body"]
def test_key_forwards(self, olm_account, bob_account):
alice = olm_account
bob = bob_account
alice_device = OlmDevice(
alice.user_id, alice.device_id, alice.account.identity_keys
)
bob_device = OlmDevice(bob.user_id, bob.device_id, bob.account.identity_keys)
alice.device_store.add(bob_device)
bob.device_store.add(alice_device)
bob.account.generate_one_time_keys(1)
one_time = list(bob.account.one_time_keys["curve25519"].values())[0]
bob.account.mark_keys_as_published()
alice.create_session(one_time, bob_device.curve25519)
_, to_device = alice.share_group_session(
TEST_ROOM, [bob.user_id], ignore_unverified_devices=True
)
# Setup a working olm session by sharing a key from alice to bob
olm_message = self.olm_message_to_event(to_device, bob, alice)
event = ToDeviceEvent.parse_event(olm_message)
bob.decrypt_event(event)
# Bob shares a room session as well but alice never receives the
# session.
bob.share_group_session(
TEST_ROOM, [alice.user_id], ignore_unverified_devices=True
)
session = bob.outbound_group_sessions[TEST_ROOM]
session.shared = True
session.users_shared_with.add((alice.user_id, alice.device_id))
message = {
"type": "m.room.message",
"content": {"msgtype": "m.text", "body": "It's a secret to everybody."},
}
encrypted_content = bob.group_encrypt(TEST_ROOM, message)
encrypted_message = {
"event_id": "!event_id",
"type": "m.room.encrypted",
"sender": bob.user_id,
"origin_server_ts": int(time.time()),
"content": encrypted_content,
"room_id": TEST_ROOM,
}
event = MegolmEvent.from_dict(encrypted_message)
# Alice tries to decrypt the event but can't.
decrypted_event = alice.decrypt_event(event)
assert decrypted_event is None
key_request = event.as_key_request(
bob.user_id,
alice.device_id,
event.session_id,
)
outgoing_key_request = OutgoingKeyRequest(
event.session_id, event.session_id, TEST_ROOM, event.algorithm
)
alice.outgoing_key_requests[event.session_id] = outgoing_key_request
key_request = {
"sender": alice.user_id,
"type": "m.room_key_request",
"content": key_request.as_dict()["messages"][bob.user_id]["*"],
}
key_request_event = RoomKeyRequest.from_dict(key_request)
assert isinstance(key_request_event, RoomKeyRequest)
assert not bob.outgoing_to_device_messages
# Bob receives the event and queues it up for collection.
bob.handle_to_device_event(key_request_event)
assert key_request_event in bob.received_key_requests.values()
# Convert the key request event into a to-device message.
bob.collect_key_requests()
# Check that the message is now queued.
assert bob.outgoing_to_device_messages
to_device = bob.outgoing_to_device_messages[0]
# Let us now share the to-device message with Alice
olm_message = self.olm_message_to_event(to_device.as_dict(), alice, bob)
forwarded_key_event = ToDeviceEvent.parse_event(olm_message)
assert isinstance(forwarded_key_event, OlmEvent)
# Decrypt the olm event and check that we received a forwarded room
# key.
decrypted_event = alice.handle_to_device_event(forwarded_key_event)
assert isinstance(decrypted_event, ForwardedRoomKeyEvent)
# Alice tries to decrypt the previous event again.
decrypted_event = alice.decrypt_event(event)
assert isinstance(decrypted_event, RoomMessageText)
assert decrypted_event.body == "It's a secret to everybody."
def test_key_forwards_with_ourselves(self, olm_account, bob_account):
alice = olm_account
bob = bob_account
bob.user_id = alice.user_id
alice_device = OlmDevice(
alice.user_id, alice.device_id, alice.account.identity_keys
)
bob_device = OlmDevice(bob.user_id, bob.device_id, bob.account.identity_keys)
alice.device_store.add(bob_device)
bob.device_store.add(alice_device)
bob.verify_device(alice_device)
bob.account.generate_one_time_keys(1)
one_time = list(bob.account.one_time_keys["curve25519"].values())[0]
bob.account.mark_keys_as_published()
alice.create_session(one_time, bob_device.curve25519)
_, to_device = alice.share_group_session(
TEST_ROOM, [bob.user_id], ignore_unverified_devices=True
)
# Setup a working olm session by sharing a key from alice to bob
olm_message = self.olm_message_to_event(to_device, bob, alice)
event = ToDeviceEvent.parse_event(olm_message)
bob.decrypt_event(event)
# Bob shares a room session as well but alice never receives the
# session.
bob.share_group_session(
TEST_ROOM, [alice.user_id], ignore_unverified_devices=True
)
session = bob.outbound_group_sessions[TEST_ROOM]
session.shared = True
session.users_shared_with.add((alice.user_id, alice.device_id))
message = {
"type": "m.room.message",
"content": {"msgtype": "m.text", "body": "It's a secret to everybody."},
}
encrypted_content = bob.group_encrypt(TEST_ROOM, message)
encrypted_message = {
"event_id": "!event_id",
"type": "m.room.encrypted",
"sender": bob.user_id,
"origin_server_ts": int(time.time()),
"content": encrypted_content,
"room_id": TEST_ROOM,
}
event = MegolmEvent.from_dict(encrypted_message)
# Alice tries to decrypt the event but can't.
decrypted_event = alice.decrypt_event(event)
assert decrypted_event is None
key_request = event.as_key_request(
bob.user_id,
alice.device_id,
event.session_id,
)
outgoing_key_request = OutgoingKeyRequest(
event.session_id, event.session_id, TEST_ROOM, event.algorithm
)
alice.outgoing_key_requests[event.session_id] = outgoing_key_request
key_request = {
"sender": alice.user_id,
"type": "m.room_key_request",
"content": key_request.as_dict()["messages"][bob.user_id]["*"],
}
key_request_event = RoomKeyRequest.from_dict(key_request)
assert isinstance(key_request_event, RoomKeyRequest)
assert not bob.outgoing_to_device_messages
# Bob receives the event and queues it up for collection.
bob.handle_to_device_event(key_request_event)
assert key_request_event in bob.received_key_requests.values()
# Convert the key request event into a to-device message.
bob.collect_key_requests()
# Check that the message is now queued.
assert bob.outgoing_to_device_messages
to_device = bob.outgoing_to_device_messages[0]
# Let us now share the to-device message with Alice
olm_message = self.olm_message_to_event(to_device.as_dict(), alice, bob)
forwarded_key_event = ToDeviceEvent.parse_event(olm_message)
assert isinstance(forwarded_key_event, OlmEvent)
# Decrypt the olm event and check that we received a forwarded room
# key.
decrypted_event = alice.handle_to_device_event(forwarded_key_event)
assert isinstance(decrypted_event, ForwardedRoomKeyEvent)
# Alice tries to decrypt the previous event again.
decrypted_event = alice.decrypt_event(event)
assert isinstance(decrypted_event, RoomMessageText)
assert decrypted_event.body == "It's a secret to everybody."
def test_key_forwards_missing_session(self, olm_account, bob_account):
alice = olm_account
bob = bob_account
bob.user_id = alice.user_id
alice_device = OlmDevice(
alice.user_id, alice.device_id, alice.account.identity_keys
)
bob_device = OlmDevice(bob.user_id, bob.device_id, bob.account.identity_keys)
alice.device_store.add(bob_device)
bob.device_store.add(alice_device)
bob.verify_device(alice_device)
bob.create_outbound_group_session(TEST_ROOM)
session = bob.outbound_group_sessions[TEST_ROOM]
session.shared = True
message = {
"type": "m.room.message",
"content": {"msgtype": "m.text", "body": "It's a secret to everybody."},
}
encrypted_content = bob.group_encrypt(TEST_ROOM, message)
encrypted_message = {
"event_id": "!event_id",
"type": "m.room.encrypted",
"sender": bob.user_id,
"origin_server_ts": int(time.time()),
"content": encrypted_content,
"room_id": TEST_ROOM,
}
event = MegolmEvent.from_dict(encrypted_message)
# Alice tries to decrypt the event but can't.
decrypted_event = alice.decrypt_event(event)
assert decrypted_event is None
key_request = event.as_key_request(
bob.user_id,
alice.device_id,
event.session_id,
)
outgoing_key_request = OutgoingKeyRequest(
event.session_id, event.session_id, TEST_ROOM, event.algorithm
)
alice.outgoing_key_requests[event.session_id] = outgoing_key_request
key_request = {
"sender": alice.user_id,
"type": "m.room_key_request",
"content": key_request.as_dict()["messages"][bob.user_id]["*"],
}
key_request_event = RoomKeyRequest.from_dict(key_request)
assert isinstance(key_request_event, RoomKeyRequest)
assert not bob.outgoing_to_device_messages
# Bob receives the event and queues it up for collection.
bob.handle_to_device_event(key_request_event)
assert key_request_event in bob.received_key_requests.values()
# Convert the key request event into a to-device message.
bob.collect_key_requests()
# Check that the message is not queued. We are missing a Olm session.
assert not bob.outgoing_to_device_messages
assert alice_device in bob.key_request_devices_no_session
assert (
key_request_event
in bob.key_requests_waiting_for_session[
alice_device.user_id, alice_device.id
].values()
)
# Let us do a key claim request.
to_share = alice.share_keys()
one_time_key = list(to_share["one_time_keys"].items())[0]
key_claim_dict = {
"one_time_keys": {
alice.user_id: {
alice.device_id: {one_time_key[0]: one_time_key[1]},
},
},
"failures": {},
}
response = KeysClaimResponse.from_dict(key_claim_dict)
bob.handle_response(response)
# We got a session now, the device is not waiting for a session anymore
assert alice_device not in bob.key_request_devices_no_session
# The key request is neither waiting for a session anymore.
assert (
key_request_event
not in bob.key_requests_waiting_for_session[
alice_device.user_id, alice_device.id
].values()
)
# The key request is now waiting to be collected again.
assert key_request_event in bob.received_key_requests.values()
# Let us collect it now.
bob.collect_key_requests()
# We found a to-device message now.
to_device = bob.outgoing_to_device_messages[0]
# Let us now share the to-device message with Alice
olm_message = self.olm_message_to_event(to_device.as_dict(), alice, bob)
forwarded_key_event = ToDeviceEvent.parse_event(olm_message)
assert isinstance(forwarded_key_event, OlmEvent)
# Decrypt the olm event and check that we received a forwarded room
# key.
decrypted_event = alice.handle_to_device_event(forwarded_key_event)
assert isinstance(decrypted_event, ForwardedRoomKeyEvent)
# Alice tries to decrypt the previous event again.
decrypted_event = alice.decrypt_event(event)
assert isinstance(decrypted_event, RoomMessageText)
assert decrypted_event.body == "It's a secret to everybody."
def test_key_forward_untrusted_device(self, olm_account, bob_account):
alice = olm_account
bob = bob_account
bob.user_id = alice.user_id
alice_device = OlmDevice(
alice.user_id, alice.device_id, alice.account.identity_keys
)
bob_device = OlmDevice(bob.user_id, bob.device_id, bob.account.identity_keys)
alice.device_store.add(bob_device)
bob.device_store.add(alice_device)
bob.account.generate_one_time_keys(1)
one_time = list(bob.account.one_time_keys["curve25519"].values())[0]
bob.account.mark_keys_as_published()
alice.create_session(one_time, bob_device.curve25519)
_, to_device = alice.share_group_session(
TEST_ROOM, [bob.user_id], ignore_unverified_devices=True
)
# Setup a working olm session by sharing a key from alice to bob
olm_message = self.olm_message_to_event(to_device, bob, alice)
event = ToDeviceEvent.parse_event(olm_message)
bob.decrypt_event(event)
# Bob shares a room session as well but alice never receives the
# session.
bob.share_group_session(
TEST_ROOM, [alice.user_id], ignore_unverified_devices=True
)
session = bob.outbound_group_sessions[TEST_ROOM]
session.shared = True
session.users_shared_with.add((alice.user_id, alice.device_id))
message = {
"type": "m.room.message",
"content": {"msgtype": "m.text", "body": "It's a secret to everybody."},
}
encrypted_content = bob.group_encrypt(TEST_ROOM, message)
encrypted_message = {
"event_id": "!event_id",
"type": "m.room.encrypted",
"sender": bob.user_id,
"origin_server_ts": int(time.time()),
"content": encrypted_content,
"room_id": TEST_ROOM,
}
event = MegolmEvent.from_dict(encrypted_message)
# Alice tries to decrypt the event but can't.
decrypted_event = alice.decrypt_event(event)
assert decrypted_event is None
key_request = event.as_key_request(
bob.user_id,
alice.device_id,
event.session_id,
)
outgoing_key_request = OutgoingKeyRequest(
event.session_id, event.session_id, TEST_ROOM, event.algorithm
)
alice.outgoing_key_requests[event.session_id] = outgoing_key_request
key_request = {
"sender": alice.user_id,
"type": "m.room_key_request",
"content": key_request.as_dict()["messages"][bob.user_id]["*"],
}
key_request_event = RoomKeyRequest.from_dict(key_request)
assert isinstance(key_request_event, RoomKeyRequest)
assert not bob.outgoing_to_device_messages
# Bob receives the event and queues it up for collection.
bob.handle_to_device_event(key_request_event)
assert key_request_event in bob.received_key_requests.values()
# Convert the key request event into a to-device message.
collected_requests = bob.collect_key_requests()
# The message could not be queued because the device is not trusted
assert not bob.outgoing_to_device_messages
assert key_request_event in bob.key_request_from_untrusted.values()
assert key_request_event in collected_requests
# Let us try to continue the key share without verifying the device.
assert not bob.continue_key_share(key_request_event)
# Let us now verify the device and tell our Olm machine that we should
# resume.
bob.verify_device(alice_device)
assert bob.continue_key_share(key_request_event)
assert key_request_event not in bob.key_request_from_untrusted.values()
# There is now a key queued up to be sent as a to-device message.
assert bob.outgoing_to_device_messages
to_device = bob.outgoing_to_device_messages[0]
# Let us now share the to-device message with Alice
olm_message = self.olm_message_to_event(to_device.as_dict(), alice, bob)
forwarded_key_event = ToDeviceEvent.parse_event(olm_message)
assert isinstance(forwarded_key_event, OlmEvent)
# Decrypt the olm event and check that we received a forwarded room
# key.
decrypted_event = alice.handle_to_device_event(forwarded_key_event)
assert isinstance(decrypted_event, ForwardedRoomKeyEvent)
# Alice tries to decrypt the previous event again.
decrypted_event = alice.decrypt_event(event)
assert isinstance(decrypted_event, RoomMessageText)
assert decrypted_event.body == "It's a secret to everybody."
def test_key_forward_cancelling(self, olm_account, bob_account):
alice = olm_account
bob = bob_account
bob.user_id = alice.user_id
alice_device = OlmDevice(
alice.user_id, alice.device_id, alice.account.identity_keys
)
bob_device = OlmDevice(bob.user_id, bob.device_id, bob.account.identity_keys)
alice.device_store.add(bob_device)
bob.device_store.add(alice_device)
# bob.verify_device(alice_device)
bob.create_outbound_group_session(TEST_ROOM)
session = bob.outbound_group_sessions[TEST_ROOM]
session.shared = True
message = {
"type": "m.room.message",
"content": {"msgtype": "m.text", "body": "It's a secret to everybody."},
}
encrypted_content = bob.group_encrypt(TEST_ROOM, message)
encrypted_message = {
"event_id": "!event_id",
"type": "m.room.encrypted",
"sender": bob.user_id,
"origin_server_ts": int(time.time()),
"content": encrypted_content,
"room_id": TEST_ROOM,
}
event = MegolmEvent.from_dict(encrypted_message)
# Alice tries to decrypt the event but can't.
decrypted_event = alice.decrypt_event(event)
assert decrypted_event is None
key_request = event.as_key_request(
bob.user_id,
alice.device_id,
event.session_id,
)
outgoing_key_request = OutgoingKeyRequest(
event.session_id, event.session_id, TEST_ROOM, event.algorithm
)
alice.outgoing_key_requests[event.session_id] = outgoing_key_request
key_request = {
"sender": alice.user_id,
"type": "m.room_key_request",
"content": key_request.as_dict()["messages"][bob.user_id]["*"],
}
key_request_event = RoomKeyRequest.from_dict(key_request)
assert isinstance(key_request_event, RoomKeyRequest)
assert not bob.outgoing_to_device_messages
cancellation = RoomKeyRequestCancellation(
{},
key_request_event.sender,
key_request_event.requesting_device_id,
key_request_event.request_id,
)
# Bob receives the event and queues it up for collection.
bob.handle_to_device_event(key_request_event)
assert key_request_event in bob.received_key_requests.values()
# Cancel the request immediatelly.
bob.handle_to_device_event(cancellation)
assert key_request_event not in bob.received_key_requests.values()
# Bob receives the event again
bob.handle_to_device_event(key_request_event)
# This time we collect the event.
assert cancellation not in bob.collect_key_requests()
# Check that the message is not queued. We are missing a Olm session.
assert not bob.outgoing_to_device_messages
assert alice_device in bob.key_request_devices_no_session
assert (
key_request_event
in bob.key_requests_waiting_for_session[
alice_device.user_id, alice_device.id
].values()
)
# We cancel again.
bob.handle_to_device_event(cancellation)
assert cancellation not in bob.collect_key_requests()
assert alice_device not in bob.key_request_devices_no_session
assert (
key_request_event
not in bob.key_requests_waiting_for_session[
alice_device.user_id, alice_device.id
].values()
)
# Let us do another round
bob.handle_to_device_event(key_request_event)
bob.collect_key_requests()
# Let us do a key claim request.
to_share = alice.share_keys()
one_time_key = list(to_share["one_time_keys"].items())[0]
key_claim_dict = {
"one_time_keys": {
alice.user_id: {
alice.device_id: {one_time_key[0]: one_time_key[1]},
},
},
"failures": {},
}
response = KeysClaimResponse.from_dict(key_claim_dict)
bob.handle_response(response)
# We got a session now, the device is not waiting for a session anymore
assert alice_device not in bob.key_request_devices_no_session
# The key request is neither waiting for a session anymore.
assert (
key_request_event
not in bob.key_requests_waiting_for_session[
alice_device.user_id, alice_device.id
].values()
)
# The key request is now waiting to be collected again.
assert key_request_event in bob.received_key_requests.values()
# Let us collect it now.
bob.collect_key_requests()
# Still no, device isn't verified.
assert not bob.outgoing_to_device_messages
assert key_request_event in bob.key_request_from_untrusted.values()
# Cancel again, now we're going to get the cancellation event in the
# collect output
bob.handle_to_device_event(cancellation)
assert cancellation in bob.collect_key_requests()
# Let us finally check out if bob can also reject the sharing of the
# key.
bob.handle_to_device_event(key_request_event)
event_for_user = bob.collect_key_requests()[0]
assert not bob.outgoing_to_device_messages
assert key_request_event in bob.key_request_from_untrusted.values()
assert bob.cancel_key_share(event_for_user)
assert key_request_event not in bob.key_request_from_untrusted.values()
def test_invalid_key_requests(self, olm_account, bob_account):
alice = olm_account
bob = bob_account
alice_device = OlmDevice(
alice.user_id, alice.device_id, alice.account.identity_keys
)
bob_device = OlmDevice(bob.user_id, bob.device_id, bob.account.identity_keys)
alice.device_store.add(bob_device)
bob.device_store.add(alice_device)
# bob.verify_device(alice_device)
bob.create_outbound_group_session(TEST_ROOM)
session = bob.outbound_group_sessions[TEST_ROOM]
session.shared = True
message = {
"type": "m.room.message",
"content": {"msgtype": "m.text", "body": "It's a secret to everybody."},
}
encrypted_content = bob.group_encrypt(TEST_ROOM, message)
encrypted_message = {
"event_id": "!event_id",
"type": "m.room.encrypted",
"sender": bob.user_id,
"origin_server_ts": int(time.time()),
"content": encrypted_content,
"room_id": TEST_ROOM,
}
event = MegolmEvent.from_dict(encrypted_message)
# Alice tries to decrypt the event but can't.
decrypted_event = alice.decrypt_event(event)
assert decrypted_event is None
key_request = event.as_key_request(
bob.user_id,
alice.device_id,
event.session_id,
)
outgoing_key_request = OutgoingKeyRequest(
event.session_id, event.session_id, TEST_ROOM, event.algorithm
)
alice.outgoing_key_requests[event.session_id] = outgoing_key_request
key_request = {
"sender": alice.user_id,
"type": "m.room_key_request",
"content": key_request.as_dict()["messages"][bob.user_id]["*"],
}
key_request_event = RoomKeyRequest.from_dict(key_request)
assert isinstance(key_request_event, RoomKeyRequest)
assert not bob.outgoing_to_device_messages
key_request_event.session_id = "fake_id"
bob.handle_to_device_event(key_request_event)
assert key_request_event in bob.received_key_requests.values()
assert not bob.outgoing_to_device_messages
bob.collect_key_requests()
assert not bob.outgoing_to_device_messages
key_request_event.session_id = session.id
key_request_event.requesting_device_id = "FAKE_ID"
bob.handle_to_device_event(key_request_event)
assert key_request_event in bob.received_key_requests.values()
assert not bob.outgoing_to_device_messages
bob.collect_key_requests()
assert not bob.outgoing_to_device_messages
alice_device.deleted = True
key_request_event.requesting_device_id = alice.device_id
bob.handle_to_device_event(key_request_event)
assert key_request_event in bob.received_key_requests.values()
assert not bob.outgoing_to_device_messages
bob.collect_key_requests()
assert not bob.outgoing_to_device_messages
bob.user_id = alice.user_id
key_request_event.session_id = "fake_id"
bob.handle_to_device_event(key_request_event)
assert key_request_event in bob.received_key_requests.values()
assert not bob.outgoing_to_device_messages
bob.collect_key_requests()
assert not bob.outgoing_to_device_messages
key_request_event.session_id = session.id
key_request_event.requesting_device_id = "FAKE_ID"
bob.handle_to_device_event(key_request_event)
assert key_request_event in bob.received_key_requests.values()
assert not bob.outgoing_to_device_messages
bob.collect_key_requests()
assert not bob.outgoing_to_device_messages
def test_key_re_request(self, olm_account, bob_account):
alice = olm_account
bob = bob_account
alice_device = OlmDevice(
alice.user_id, alice.device_id, alice.account.identity_keys
)
bob_device = OlmDevice(bob.user_id, bob.device_id, bob.account.identity_keys)
alice.device_store.add(bob_device)
bob.device_store.add(alice_device)
alice.verify_device(bob_device)
bob.account.generate_one_time_keys(1)
one_time = list(bob.account.one_time_keys["curve25519"].values())[0]
bob.account.mark_keys_as_published()
alice.create_session(one_time, bob_device.curve25519)
# Let us pickle our session with bob here so we can later unpickle it
# and wedge our session.
alice_pickle = alice.session_store[bob_device.curve25519][0].pickle("")
# Share a initial olm encrypted message
_, to_device = alice.share_group_session(
TEST_ROOM, [bob.user_id], ignore_unverified_devices=True
)
outbound_session = alice.outbound_group_sessions[TEST_ROOM]
olm_message = self.olm_message_to_event(to_device, bob, alice)
# Pass the to-device event to bob and make sure we get the right events
event = ToDeviceEvent.parse_event(olm_message)
assert isinstance(event, OlmEvent)
decrypted_event = bob.decrypt_event(event)
assert isinstance(decrypted_event, RoomKeyEvent)
# Make sure bob got the room-key
assert bob.inbound_group_store
bob_session = bob.inbound_group_store.get(
TEST_ROOM, alice_device.curve25519, outbound_session.id
)
assert bob_session.id == outbound_session.id
# Now bob shares a room-key with alice
_, to_device = bob.share_group_session(
TEST_ROOM, [alice.user_id], ignore_unverified_devices=True
)
olm_message = self.olm_message_to_event(to_device, alice, bob)
event = ToDeviceEvent.parse_event(olm_message)
assert isinstance(event, OlmEvent)
decrypted_event = alice.decrypt_event(event)
assert isinstance(decrypted_event, RoomKeyEvent)
# Let us wedge the session now
session = alice.session_store[bob_device.curve25519][0]
alice.session_store[bob_device.curve25519][0] = Session.from_pickle(
alice_pickle, session.creation_time, "", session.use_time
)
alice.rotate_outbound_group_session(TEST_ROOM)
# Try to share a room-key now
_, to_device = alice.share_group_session(
TEST_ROOM, [bob.user_id], ignore_unverified_devices=True
)
# Set the creation time to be older than an hour, otherwise we will not
# be able to unwedge the session.
alice_session = bob.session_store.get(alice_device.curve25519)
alice_session.creation_time = datetime.now() - timedelta(hours=2)
olm_message = self.olm_message_to_event(to_device, bob, alice)
# Pass the to-device event to bob and make sure we get the right events
event = ToDeviceEvent.parse_event(olm_message)
assert isinstance(event, OlmEvent)
decrypted_event = bob.decrypt_event(event)
# Make sure that decryption failed
assert decrypted_event is None
# Make sure that we have queued a m.dummy message to be sent out as a
# to-device message
assert alice_device in bob.wedged_devices
# Alice now encrypts a room message for bob.
# Mark the session as shared.
session = alice.outbound_group_sessions[TEST_ROOM]
session.shared = True
session.users_shared_with.add((bob.user_id, bob.device_id))
message = {
"type": "m.room.message",
"content": {
"msgtype": "m.text",
"body": "hello wordl",
},
}
encrypted_content = alice.group_encrypt(TEST_ROOM, message)
encrypted_message = {
"event_id": "!event_id",
"type": "m.room.encrypted",
"sender": alice.user_id,
"origin_server_ts": int(time.time()),
"content": encrypted_content,
"room_id": TEST_ROOM,
}
megolm_event = MegolmEvent.from_dict(encrypted_message)
# Bob tries to decrypt the event, but no key is found. Since the Olm
# session was marked as wedged, this event should get cached until we
# send out the dummy event.
with pytest.raises(EncryptionError):
bob.decrypt_megolm_event(megolm_event)
# Let's check if bob had cached the event for a key re-request.
assert (
megolm_event
in bob.key_re_requests_events[(megolm_event.sender, megolm_event.device_id)]
)
# Bob should now claim new keys from alice, we're simulating this over
# here since the olm machine doesn't know how to do requests.
to_share = alice.share_keys()
one_time_key = list(to_share["one_time_keys"].items())[0]
key_claim_dict = {
"one_time_keys": {
alice.user_id: {
alice.device_id: {one_time_key[0]: one_time_key[1]},
},
},
"failures": {},
}
response = KeysClaimResponse.from_dict(key_claim_dict, TEST_ROOM)
assert not bob.outgoing_to_device_messages
assert isinstance(response, KeysClaimResponse)
bob.handle_response(response)
# After we claimed the keys a new Olm session will be created and a
# to-device message will be prepared for alice.
assert bob.outgoing_to_device_messages
message = bob.outgoing_to_device_messages[0]
assert message.type == "m.room.encrypted"
assert message.recipient == alice.user_id
assert message.recipient_device == alice.device_id
# Forward the message to alice.
event = ToDeviceEvent.parse_event(
self.olm_message_to_event(message.as_dict(), alice, bob)
)
# Alice receives the message and successfully decrypts it with the new
# session.
assert isinstance(event, OlmEvent)
decrypted_event = alice.decrypt_event(event)
assert isinstance(decrypted_event, DummyEvent)
# Bob marks the message as sent, we check that we have a new to-device
# message queued. It's the key re-request for the previous megolm event
# that failed to decrypt.
bob._mark_to_device_message_as_sent(message)
assert bob.outgoing_to_device_messages
message = bob.outgoing_to_device_messages[0]
# Bob sends out the key request.
assert isinstance(message, RoomKeyRequestMessage)
bob._mark_to_device_message_as_sent(message)
assert not bob.outgoing_to_device_messages
key_request = {
"sender": bob.user_id,
"type": "m.room_key_request",
"content": message.as_dict()["messages"][alice.user_id][alice.device_id],
}
key_request_event = RoomKeyRequest.from_dict(key_request)
assert isinstance(key_request_event, RoomKeyRequest)
# Alice receives the key request.
assert not alice.outgoing_to_device_messages
alice.handle_to_device_event(key_request_event)
assert key_request_event in alice.received_key_requests.values()
# Convert the key request event into a to-device message.
alice.collect_key_requests()
assert alice.outgoing_to_device_messages
message = alice.outgoing_to_device_messages[0]
olm_message = self.olm_message_to_event(message.as_dict(), bob, alice)
forwarded_key_event = ToDeviceEvent.parse_event(olm_message)
assert isinstance(forwarded_key_event, OlmEvent)
bob.handle_to_device_event(forwarded_key_event)
event = bob.decrypt_megolm_event(megolm_event)
assert isinstance(event, RoomMessageText)