mirror of https://github.com/poljar/matrix-nio.git
1309 lines
41 KiB
Python
1309 lines
41 KiB
Python
# -*- coding: utf-8 -*-
|
|
import json
|
|
import random
|
|
from uuid import uuid4
|
|
|
|
import pytest
|
|
from helpers import FrameFactory, ephemeral, ephemeral_dir, faker
|
|
|
|
from nio import (
|
|
Client,
|
|
ClientConfig,
|
|
DeviceList,
|
|
DeviceOneTimeKeyCount,
|
|
DownloadResponse,
|
|
EncryptionError,
|
|
FullyReadEvent,
|
|
HttpClient,
|
|
InviteInfo,
|
|
InviteMemberEvent,
|
|
JoinedMembersResponse,
|
|
KeysQueryResponse,
|
|
KeysUploadResponse,
|
|
LocalProtocolError,
|
|
LoginResponse,
|
|
LogoutResponse,
|
|
MegolmEvent,
|
|
PresenceEvent,
|
|
ProfileGetAvatarResponse,
|
|
ProfileGetDisplayNameResponse,
|
|
ProfileGetResponse,
|
|
ProfileSetAvatarResponse,
|
|
ProfileSetDisplayNameResponse,
|
|
PushRulesEvent,
|
|
Receipt,
|
|
ReceiptEvent,
|
|
RoomCreateResponse,
|
|
RoomEncryptionEvent,
|
|
RoomForgetResponse,
|
|
RoomInfo,
|
|
RoomKeyRequestResponse,
|
|
RoomMember,
|
|
RoomMemberEvent,
|
|
RoomRedactResponse,
|
|
Rooms,
|
|
RoomSummary,
|
|
RoomTypingResponse,
|
|
ShareGroupSessionResponse,
|
|
SyncResponse,
|
|
TagEvent,
|
|
ThumbnailResponse,
|
|
Timeline,
|
|
TransportType,
|
|
TypingNoticeEvent,
|
|
)
|
|
from nio.event_builders import ToDeviceMessage
|
|
|
|
HOST = "example.org"
|
|
USER = "example"
|
|
DEVICE_ID = "DEVICEID"
|
|
|
|
BOB_ID = "@bob:example.org"
|
|
TEST_ROOM_ID = "!testroom:example.org"
|
|
TEST_EVENT_ID = "$15163622445EBvZJ:localhost"
|
|
|
|
ALICE_ID = "@alice:example.org"
|
|
ALICE_DEVICE_ID = "JLAFKJWSCS"
|
|
|
|
CAROL_ID = "@carol:example.org"
|
|
|
|
|
|
@pytest.fixture
|
|
def synced_client(tempdir):
|
|
http_client = HttpClient("example.org", "ephemeral", "DEVICEID", tempdir)
|
|
http_client.connect(TransportType.HTTP2)
|
|
|
|
http_client.login("1234")
|
|
http_client.receive(TestClass().login_byte_response)
|
|
response = http_client.next_response()
|
|
assert isinstance(response, LoginResponse)
|
|
assert http_client.access_token == "ABCD"
|
|
|
|
http_client.sync()
|
|
http_client.receive(TestClass().sync_byte_response)
|
|
response = http_client.next_response()
|
|
assert isinstance(response, SyncResponse)
|
|
assert http_client.access_token == "ABCD"
|
|
|
|
return http_client
|
|
|
|
|
|
class TestClass:
|
|
example_response_headers = [(":status", "200"), ("server", "fake-serv/0.1.0")]
|
|
|
|
@property
|
|
def login_response(self):
|
|
return LoginResponse("@ephemeral:example.org", "DEVICEID", "abc123")
|
|
|
|
@property
|
|
def logout_response(self):
|
|
return LogoutResponse()
|
|
|
|
@staticmethod
|
|
def _load_response(filename):
|
|
with open(filename) as f:
|
|
return json.loads(f.read())
|
|
|
|
@staticmethod
|
|
def _load_byte_response(filename):
|
|
with open(filename, "rb") as f:
|
|
return f.read()
|
|
|
|
@property
|
|
def login_byte_response(self):
|
|
frame_factory = FrameFactory()
|
|
|
|
f = frame_factory.build_headers_frame(
|
|
headers=self.example_response_headers, stream_id=1
|
|
)
|
|
|
|
login_body = json.dumps(
|
|
{
|
|
"user_id": "@ephemeral:example.org",
|
|
"access_token": "ABCD",
|
|
"device_id": "DEVICEID",
|
|
}
|
|
).encode("utf-8")
|
|
|
|
data = frame_factory.build_data_frame(
|
|
data=login_body, stream_id=1, flags=["END_STREAM"]
|
|
)
|
|
|
|
return f.serialize() + data.serialize()
|
|
|
|
@property
|
|
def sync_byte_response(self):
|
|
frame_factory = FrameFactory()
|
|
|
|
f = frame_factory.build_headers_frame(
|
|
headers=self.example_response_headers, stream_id=3
|
|
)
|
|
|
|
body = self._load_byte_response("tests/data/sync.json")
|
|
|
|
data = frame_factory.build_data_frame(
|
|
data=body, stream_id=3, flags=["END_STREAM"]
|
|
)
|
|
|
|
return f.serialize() + data.serialize()
|
|
|
|
def file_byte_response(self, stream_id=5, header_filename=""):
|
|
frame_factory = FrameFactory()
|
|
|
|
headers = self.example_response_headers + [("content-type", "image/png")]
|
|
|
|
if header_filename:
|
|
headers.append(
|
|
(
|
|
"content-disposition",
|
|
f'inline; filename="{header_filename}"',
|
|
),
|
|
)
|
|
|
|
f = frame_factory.build_headers_frame(headers=headers, stream_id=stream_id)
|
|
|
|
body = self._load_byte_response("tests/data/file_response")
|
|
|
|
data = frame_factory.build_data_frame(
|
|
data=body, stream_id=stream_id, flags=["END_STREAM"]
|
|
)
|
|
|
|
return f.serialize() + data.serialize()
|
|
|
|
def empty_response(self, stream_id=5):
|
|
frame_factory = FrameFactory()
|
|
|
|
f = frame_factory.build_headers_frame(
|
|
headers=self.example_response_headers, stream_id=stream_id
|
|
)
|
|
|
|
body = b"{}"
|
|
|
|
data = frame_factory.build_data_frame(
|
|
data=body, stream_id=stream_id, flags=["END_STREAM"]
|
|
)
|
|
|
|
return f.serialize() + data.serialize()
|
|
|
|
def room_id_response(self, stream_id=5, room_id=TEST_ROOM_ID):
|
|
frame_factory = FrameFactory()
|
|
|
|
f = frame_factory.build_headers_frame(
|
|
headers=self.example_response_headers, stream_id=stream_id
|
|
)
|
|
|
|
body = json.dumps({"room_id": room_id}).encode()
|
|
|
|
data = frame_factory.build_data_frame(
|
|
data=body, stream_id=stream_id, flags=["END_STREAM"]
|
|
)
|
|
|
|
return f.serialize() + data.serialize()
|
|
|
|
def event_id_response(self, stream_id=5, event_id=TEST_EVENT_ID):
|
|
frame_factory = FrameFactory()
|
|
|
|
f = frame_factory.build_headers_frame(
|
|
headers=self.example_response_headers, stream_id=stream_id
|
|
)
|
|
|
|
body = json.dumps({"event_id": event_id}).encode()
|
|
|
|
data = frame_factory.build_data_frame(
|
|
data=body,
|
|
stream_id=stream_id,
|
|
flags=["END_STREAM"],
|
|
)
|
|
|
|
return f.serialize() + data.serialize()
|
|
|
|
def get_displayname_byte_response(self, displayname, stream_id=5):
|
|
frame_factory = FrameFactory()
|
|
|
|
f = frame_factory.build_headers_frame(
|
|
headers=self.example_response_headers, stream_id=stream_id
|
|
)
|
|
|
|
body = json.dumps({"displayname": displayname}).encode("utf-8")
|
|
|
|
data = frame_factory.build_data_frame(
|
|
data=body, stream_id=stream_id, flags=["END_STREAM"]
|
|
)
|
|
|
|
return f.serialize() + data.serialize()
|
|
|
|
def get_avatar_byte_response(self, avatar_url, stream_id=5):
|
|
frame_factory = FrameFactory()
|
|
|
|
f = frame_factory.build_headers_frame(
|
|
headers=self.example_response_headers, stream_id=stream_id
|
|
)
|
|
|
|
body = json.dumps({"avatar_url": avatar_url}).encode("utf-8")
|
|
|
|
data = frame_factory.build_data_frame(
|
|
data=body, stream_id=stream_id, flags=["END_STREAM"]
|
|
)
|
|
|
|
return f.serialize() + data.serialize()
|
|
|
|
def get_profile_byte_response(self, displayname, avatar_url, stream_id=5):
|
|
frame_factory = FrameFactory()
|
|
|
|
f = frame_factory.build_headers_frame(
|
|
headers=self.example_response_headers, stream_id=stream_id
|
|
)
|
|
|
|
body = json.dumps(
|
|
{"displayname": displayname, "avatar_url": avatar_url}
|
|
).encode("utf-8")
|
|
|
|
data = frame_factory.build_data_frame(
|
|
data=body, stream_id=stream_id, flags=["END_STREAM"]
|
|
)
|
|
|
|
return f.serialize() + data.serialize()
|
|
|
|
@property
|
|
def sync_response(self):
|
|
timeline = Timeline(
|
|
[
|
|
RoomMemberEvent(
|
|
{
|
|
"event_id": "event_id_1",
|
|
"sender": ALICE_ID,
|
|
"origin_server_ts": 1516809890615,
|
|
},
|
|
ALICE_ID,
|
|
"join",
|
|
None,
|
|
{"membership": "join"},
|
|
),
|
|
RoomMemberEvent(
|
|
{
|
|
"event_id": "event_id_2",
|
|
"sender": ALICE_ID,
|
|
"origin_server_ts": 1516809890615,
|
|
},
|
|
CAROL_ID,
|
|
"invite",
|
|
None,
|
|
{"membership": "invite"},
|
|
),
|
|
RoomEncryptionEvent(
|
|
{
|
|
"event_id": "event_id_3",
|
|
"sender": ALICE_ID,
|
|
"origin_server_ts": 1516809890615,
|
|
}
|
|
),
|
|
],
|
|
False,
|
|
"prev_batch_token",
|
|
)
|
|
test_room_info = RoomInfo(
|
|
timeline=timeline,
|
|
state=[],
|
|
ephemeral=[
|
|
TypingNoticeEvent([ALICE_ID]),
|
|
ReceiptEvent(
|
|
[
|
|
Receipt(
|
|
event_id="event_id_3",
|
|
receipt_type="m.read",
|
|
user_id=ALICE_ID,
|
|
timestamp=1516809890615,
|
|
)
|
|
]
|
|
),
|
|
],
|
|
account_data=[
|
|
FullyReadEvent(event_id="event_id_2"),
|
|
TagEvent(tags={"u.test": {"order": 1}}),
|
|
],
|
|
summary=RoomSummary(
|
|
invited_member_count=1,
|
|
joined_member_count=2,
|
|
),
|
|
)
|
|
rooms = Rooms(invite={}, join={TEST_ROOM_ID: test_room_info}, leave={})
|
|
return SyncResponse(
|
|
next_batch="token123",
|
|
rooms=rooms,
|
|
device_key_count=DeviceOneTimeKeyCount(49, 50),
|
|
device_list=DeviceList([ALICE_ID], []),
|
|
to_device_events=[
|
|
RoomEncryptionEvent(
|
|
{
|
|
"event_id": "event_id_2",
|
|
"sender": ALICE_ID,
|
|
"origin_server_ts": 1516809890615,
|
|
}
|
|
)
|
|
],
|
|
presence_events=[
|
|
PresenceEvent(ALICE_ID, "online", 1337, True, "I am here.")
|
|
],
|
|
account_data_events=[
|
|
PushRulesEvent(),
|
|
],
|
|
)
|
|
|
|
@property
|
|
def sync_invite_response(self):
|
|
state = [
|
|
InviteMemberEvent(
|
|
{},
|
|
"@BOB:example.org",
|
|
ALICE_ID,
|
|
"invite",
|
|
None,
|
|
{
|
|
"membership": "invite",
|
|
"display_name": None,
|
|
},
|
|
)
|
|
]
|
|
|
|
test_room_info = InviteInfo(state)
|
|
rooms = Rooms({TEST_ROOM_ID: test_room_info}, {}, {})
|
|
return SyncResponse(
|
|
"token123",
|
|
rooms,
|
|
DeviceOneTimeKeyCount(49, 50),
|
|
DeviceList([ALICE_ID], []),
|
|
[],
|
|
[],
|
|
)
|
|
|
|
@property
|
|
def downgrade_sync(self):
|
|
timeline = Timeline(
|
|
[
|
|
RoomMemberEvent(
|
|
{
|
|
"event_id": "event_id_1",
|
|
"sender": ALICE_ID,
|
|
"origin_server_ts": 1516809890615,
|
|
},
|
|
ALICE_ID,
|
|
"join",
|
|
None,
|
|
{"membership": "join"},
|
|
),
|
|
],
|
|
False,
|
|
"prev_batch_token",
|
|
)
|
|
test_room_info = RoomInfo(timeline, [], [], [], RoomSummary(1, 2, []))
|
|
rooms = Rooms({}, {TEST_ROOM_ID: test_room_info}, {})
|
|
return SyncResponse(
|
|
"token123",
|
|
rooms,
|
|
DeviceOneTimeKeyCount(49, 50),
|
|
DeviceList([ALICE_ID], []),
|
|
[],
|
|
[],
|
|
)
|
|
|
|
@property
|
|
def second_sync(self):
|
|
timeline = Timeline(
|
|
[
|
|
RoomMemberEvent(
|
|
{
|
|
"event_id": "event_id_1",
|
|
"sender": ALICE_ID,
|
|
"origin_server_ts": 1516809890615,
|
|
},
|
|
ALICE_ID,
|
|
"join",
|
|
None,
|
|
{"membership": "join"},
|
|
),
|
|
RoomEncryptionEvent(
|
|
{
|
|
"event_id": "event_id_2",
|
|
"sender": ALICE_ID,
|
|
"origin_server_ts": 1516809890615,
|
|
}
|
|
),
|
|
],
|
|
True,
|
|
"prev_batch_token",
|
|
)
|
|
test_room_info = RoomInfo(timeline, [], [], [], RoomSummary(1, 2, []))
|
|
rooms = Rooms({}, {TEST_ROOM_ID: test_room_info}, {})
|
|
return SyncResponse(
|
|
"token123", rooms, DeviceOneTimeKeyCount(49, 50), DeviceList([], []), [], []
|
|
)
|
|
|
|
@property
|
|
def keys_query_response(self):
|
|
parsed_dict = TestClass._load_response("tests/data/keys_query.json")
|
|
return KeysQueryResponse.from_dict(parsed_dict)
|
|
|
|
@property
|
|
def joined_members(self):
|
|
return JoinedMembersResponse(
|
|
[
|
|
RoomMember(BOB_ID, None, None), # joined
|
|
RoomMember(ALICE_ID, None, None), # joined
|
|
RoomMember(CAROL_ID, None, None), # invited
|
|
],
|
|
TEST_ROOM_ID,
|
|
)
|
|
|
|
def test_client_protocol_error(self):
|
|
client = Client(USER, DEVICE_ID)
|
|
|
|
with pytest.raises(LocalProtocolError):
|
|
client.olm_account_shared
|
|
|
|
with pytest.raises(LocalProtocolError):
|
|
client.blacklist_device(faker.olm_device())
|
|
|
|
with pytest.raises(LocalProtocolError):
|
|
client.unblacklist_device(faker.olm_device())
|
|
|
|
with pytest.raises(LocalProtocolError):
|
|
client.verify_device(faker.olm_device())
|
|
|
|
with pytest.raises(LocalProtocolError):
|
|
client.unverify_device(faker.olm_device())
|
|
|
|
with pytest.raises(LocalProtocolError):
|
|
client.decrypt_event(None)
|
|
|
|
with pytest.raises(LocalProtocolError):
|
|
client.decrypt_event(None)
|
|
|
|
with pytest.raises(LocalProtocolError):
|
|
client.device_store
|
|
|
|
client = HttpClient(HOST, USER, DEVICE_ID)
|
|
|
|
with pytest.raises(LocalProtocolError):
|
|
client.share_group_session(None)
|
|
|
|
with pytest.raises(LocalProtocolError):
|
|
client.keys_claim(None)
|
|
|
|
with pytest.raises(LocalProtocolError):
|
|
client.keys_query(None)
|
|
|
|
def test_client_create(self, client):
|
|
assert isinstance(client, Client)
|
|
assert not client.store
|
|
|
|
def test_client_invalid_response(self, client):
|
|
with pytest.raises(ValueError):
|
|
client.receive_response(None)
|
|
|
|
def test_client_login(self, client):
|
|
assert not client.access_token
|
|
assert not client.store
|
|
assert not client.olm
|
|
|
|
client.receive_response(self.login_response)
|
|
|
|
assert client.access_token
|
|
assert client.store
|
|
assert client.olm
|
|
|
|
def test_client_restore_login(self, tempdir):
|
|
client = Client(BOB_ID, store_path=tempdir)
|
|
assert not client.user_id
|
|
assert not client.device_id
|
|
assert not client.access_token
|
|
assert not client.store
|
|
assert not client.olm
|
|
|
|
client.restore_login(BOB_ID, DEVICE_ID, "ABCD")
|
|
|
|
assert client.user_id
|
|
assert client.device_id
|
|
assert client.access_token
|
|
assert client.store
|
|
assert client.olm
|
|
|
|
def test_client_logout(self, client):
|
|
client.receive_response(self.login_response)
|
|
assert client.access_token
|
|
|
|
client.receive_response(self.logout_response)
|
|
|
|
assert client.access_token == ""
|
|
|
|
def test_client_account_sharing(self, client):
|
|
client.receive_response(self.login_response)
|
|
|
|
with pytest.raises(ValueError):
|
|
client.decrypt_event(None)
|
|
|
|
assert not client.olm_account_shared
|
|
assert client.should_upload_keys
|
|
assert client.device_store
|
|
|
|
client.receive_response(KeysUploadResponse(49, 49))
|
|
assert client.should_upload_keys
|
|
client.receive_response(KeysUploadResponse(50, 50))
|
|
assert not client.should_upload_keys
|
|
|
|
def test_client_room_creation(self, client):
|
|
client.receive_response(self.login_response)
|
|
client.receive_response(KeysUploadResponse(50, 50))
|
|
|
|
assert not client.should_query_keys
|
|
client.receive_response(self.sync_response)
|
|
|
|
assert client.rooms[TEST_ROOM_ID]
|
|
room = client.rooms[TEST_ROOM_ID]
|
|
|
|
assert room.encrypted
|
|
assert client.should_query_keys
|
|
|
|
def test_device_store(self, tempdir):
|
|
client = Client("ephemeral", "DEVICEID", tempdir)
|
|
client.receive_response(self.login_response)
|
|
client.receive_response(KeysUploadResponse(50, 50))
|
|
|
|
assert not client.should_query_keys
|
|
|
|
client.receive_response(self.sync_response)
|
|
client.receive_response(self.keys_query_response)
|
|
|
|
assert list(client.device_store.users) == [ALICE_ID, CAROL_ID]
|
|
alice_device = client.device_store[ALICE_ID][ALICE_DEVICE_ID]
|
|
assert alice_device
|
|
|
|
client = Client("ephemeral", "DEVICEID", tempdir)
|
|
client.receive_response(self.login_response)
|
|
assert list(client.device_store.users) == [ALICE_ID]
|
|
alice_device = client.device_store[ALICE_ID][ALICE_DEVICE_ID]
|
|
assert alice_device
|
|
|
|
def test_client_key_query(self, client):
|
|
assert not client.should_query_keys
|
|
|
|
client.receive_response(self.login_response)
|
|
client.receive_response(KeysUploadResponse(50, 50))
|
|
|
|
assert not client.should_query_keys
|
|
client.receive_response(self.sync_response)
|
|
|
|
assert not client.device_store.users
|
|
|
|
assert client.rooms[TEST_ROOM_ID]
|
|
room = client.rooms[TEST_ROOM_ID]
|
|
|
|
assert room.encrypted
|
|
assert room.summary
|
|
assert len(room.users) == 2
|
|
assert room.invited_count == 1
|
|
assert room.joined_count == 2
|
|
assert room.member_count == 3
|
|
assert room.summary.invited_member_count == 1
|
|
assert room.summary.joined_member_count == 2
|
|
assert client.should_query_keys
|
|
assert not client.device_store.users
|
|
|
|
client.receive_response(self.keys_query_response)
|
|
|
|
assert not client.should_query_keys
|
|
assert client.device_store.users
|
|
|
|
assert not room.members_synced
|
|
|
|
client.receive_response(self.joined_members)
|
|
|
|
assert room.members_synced
|
|
assert client.should_query_keys
|
|
|
|
assert client.users_for_key_query == {BOB_ID}
|
|
|
|
@ephemeral
|
|
def test_query_rule(self):
|
|
client = Client("ephemeral", "DEVICEID", ephemeral_dir)
|
|
client.receive_response(self.login_response)
|
|
assert client.store is not None
|
|
client.receive_response(KeysUploadResponse(50, 50))
|
|
assert not client.should_query_keys
|
|
|
|
client.receive_response(self.sync_response)
|
|
assert client.should_query_keys
|
|
client.receive_response(self.keys_query_response)
|
|
assert client.olm.tracked_users == {ALICE_ID, CAROL_ID}
|
|
assert list(client.device_store.users) == [ALICE_ID, CAROL_ID]
|
|
assert not client.should_query_keys
|
|
|
|
del client
|
|
|
|
client = Client("ephemeral", "DEVICEID", ephemeral_dir)
|
|
client.receive_response(self.login_response)
|
|
assert not client.should_upload_keys
|
|
assert not client.should_query_keys
|
|
|
|
assert list(client.device_store.users) == [ALICE_ID]
|
|
assert client.device_store.active_user_devices(ALICE_ID)
|
|
|
|
alice_device = client.device_store[ALICE_ID][ALICE_DEVICE_ID]
|
|
assert alice_device
|
|
|
|
client.receive_response(self.second_sync)
|
|
assert client.should_query_keys
|
|
|
|
client.users_for_key_query == {ALICE_ID}
|
|
|
|
client.receive_response(self.joined_members)
|
|
|
|
client.users_for_key_query == {ALICE_ID, BOB_ID}
|
|
|
|
client.receive_response(self.keys_query_response)
|
|
assert client.olm.tracked_users == {ALICE_ID, CAROL_ID}
|
|
assert client.users_for_key_query == {BOB_ID}
|
|
assert client.should_query_keys
|
|
|
|
@ephemeral
|
|
def test_early_store_loading(self):
|
|
client = Client("ephemeral")
|
|
|
|
with pytest.raises(LocalProtocolError):
|
|
client.load_store()
|
|
|
|
client = Client("ephemeral", store_path=ephemeral_dir)
|
|
client.user_id = "@ephemeral:example.org"
|
|
|
|
with pytest.raises(LocalProtocolError):
|
|
client.load_store()
|
|
|
|
client.user_id = None
|
|
client.device_id = "DEVICEID"
|
|
|
|
with pytest.raises(LocalProtocolError):
|
|
client.load_store()
|
|
|
|
client.receive_response(self.login_response)
|
|
|
|
del client
|
|
client = Client("ephemeral", "DEVICEID", ephemeral_dir)
|
|
client.user_id = "@ephemeral:example.org"
|
|
|
|
assert not client.store
|
|
assert not client.olm
|
|
|
|
client.load_store()
|
|
assert client.store
|
|
assert client.olm
|
|
|
|
def test_marking_sessions_as_shared(self, client):
|
|
client.receive_response(self.login_response)
|
|
client.receive_response(self.sync_response)
|
|
client.receive_response(self.joined_members)
|
|
client.receive_response(self.keys_query_response)
|
|
|
|
room = client.rooms[TEST_ROOM_ID]
|
|
|
|
assert room.encrypted
|
|
assert len(room.users) == 3
|
|
assert ALICE_ID in client.device_store.users
|
|
assert BOB_ID not in client.device_store.users
|
|
|
|
with pytest.raises(EncryptionError):
|
|
client.olm.share_group_session(TEST_ROOM_ID, room.users)
|
|
|
|
shared_with, to_device = client.olm.share_group_session(
|
|
TEST_ROOM_ID, room.users, True
|
|
)
|
|
|
|
session = client.olm.outbound_group_sessions[TEST_ROOM_ID]
|
|
assert (ALICE_ID, ALICE_DEVICE_ID) in session.users_ignored
|
|
|
|
response = ShareGroupSessionResponse.from_dict({}, TEST_ROOM_ID, set())
|
|
client.receive_response(response)
|
|
|
|
assert session.shared
|
|
|
|
def test_storing_room_encryption_state(self, client):
|
|
client.receive_response(self.login_response)
|
|
assert not client.encrypted_rooms
|
|
|
|
client.receive_response(self.sync_response)
|
|
assert TEST_ROOM_ID in client.encrypted_rooms
|
|
|
|
encrypted_rooms = client.store.load_encrypted_rooms()
|
|
assert TEST_ROOM_ID in encrypted_rooms
|
|
|
|
client2 = Client(client.user, client.device_id, client.store_path)
|
|
client2.receive_response(self.login_response)
|
|
assert TEST_ROOM_ID in client2.encrypted_rooms
|
|
|
|
client2.receive_response(self.downgrade_sync)
|
|
room = client2.rooms[TEST_ROOM_ID]
|
|
|
|
assert room.encrypted
|
|
|
|
def test_http_client_login(self, http_client):
|
|
http_client.connect(TransportType.HTTP2)
|
|
|
|
_, _ = http_client.login("1234")
|
|
|
|
http_client.receive(self.login_byte_response)
|
|
response = http_client.next_response()
|
|
|
|
assert isinstance(response, LoginResponse)
|
|
assert http_client.access_token == "ABCD"
|
|
|
|
def test_http_client_login_raw(self, http_client):
|
|
http_client.connect(TransportType.HTTP2)
|
|
auth_dict = {
|
|
"type": "m.login.password",
|
|
"identifier": {
|
|
"type": "m.id.thirdparty",
|
|
"medium": "email",
|
|
"address": "testemail@mail.org",
|
|
},
|
|
"password": "PASSWORDABCD",
|
|
"initial_device_display_name": "Citadel bot",
|
|
}
|
|
_, _ = http_client.login_raw(auth_dict)
|
|
|
|
http_client.receive(self.login_byte_response)
|
|
response = http_client.next_response()
|
|
|
|
assert isinstance(response, LoginResponse)
|
|
assert http_client.access_token == "ABCD"
|
|
|
|
def test_http_client_login_raw_with_empty_dict(self, http_client):
|
|
http_client.connect(TransportType.HTTP2)
|
|
auth_dict = {}
|
|
|
|
with pytest.raises(ValueError):
|
|
_, _ = http_client.login_raw(auth_dict)
|
|
|
|
assert not http_client.access_token == "ABCD"
|
|
|
|
def test_http_client_login_raw_with_none_dict(self, http_client):
|
|
http_client.connect(TransportType.HTTP2)
|
|
auth_dict = None
|
|
|
|
with pytest.raises(ValueError):
|
|
_, _ = http_client.login_raw(auth_dict)
|
|
|
|
assert not http_client.access_token == "ABCD"
|
|
|
|
def test_http_client_sync(self, http_client):
|
|
http_client.connect(TransportType.HTTP2)
|
|
|
|
_, _ = http_client.login("1234")
|
|
|
|
http_client.receive(self.login_byte_response)
|
|
response = http_client.next_response()
|
|
|
|
assert isinstance(response, LoginResponse)
|
|
assert http_client.access_token == "ABCD"
|
|
|
|
_, _ = http_client.sync()
|
|
|
|
http_client.receive(self.sync_byte_response)
|
|
response = http_client.next_response()
|
|
|
|
assert isinstance(response, SyncResponse)
|
|
assert http_client.access_token == "ABCD"
|
|
|
|
def test_http_client_keys_query(self, http_client):
|
|
http_client.connect(TransportType.HTTP2)
|
|
|
|
_, _ = http_client.login("1234")
|
|
|
|
http_client.receive(self.login_byte_response)
|
|
response = http_client.next_response()
|
|
|
|
assert isinstance(response, LoginResponse)
|
|
assert http_client.access_token == "ABCD"
|
|
|
|
_, _ = http_client.sync()
|
|
|
|
http_client.receive(self.sync_byte_response)
|
|
response = http_client.next_response()
|
|
|
|
assert isinstance(response, SyncResponse)
|
|
assert http_client.access_token == "ABCD"
|
|
|
|
event = MegolmEvent.from_dict(
|
|
self._load_response("tests/data/events/megolm.json")
|
|
)
|
|
|
|
http_client.request_room_key(event)
|
|
http_client.receive(self.empty_response(5))
|
|
response = http_client.next_response()
|
|
|
|
assert isinstance(response, RoomKeyRequestResponse)
|
|
assert (
|
|
"X3lUlvLELLYxeTx4yOVu6UDpasGEVO0Jbu+QFnm0cKQ"
|
|
in http_client.outgoing_key_requests
|
|
)
|
|
|
|
def test_http_client_room_create(self, http_client):
|
|
http_client.connect(TransportType.HTTP2)
|
|
|
|
_, _ = http_client.login("1234")
|
|
|
|
http_client.receive(self.login_byte_response)
|
|
response = http_client.next_response()
|
|
|
|
assert isinstance(response, LoginResponse)
|
|
assert http_client.access_token == "ABCD"
|
|
|
|
_, _ = http_client.sync()
|
|
|
|
http_client.receive(self.sync_byte_response)
|
|
response = http_client.next_response()
|
|
|
|
assert isinstance(response, SyncResponse)
|
|
assert http_client.access_token == "ABCD"
|
|
|
|
_, _ = http_client.room_create()
|
|
|
|
http_client.receive(self.room_id_response(5))
|
|
response = http_client.next_response()
|
|
|
|
assert isinstance(response, RoomCreateResponse)
|
|
assert response.room_id == TEST_ROOM_ID
|
|
|
|
def test_http_client_room_forget(self, http_client):
|
|
http_client.connect(TransportType.HTTP2)
|
|
|
|
_, _ = http_client.login("1234")
|
|
|
|
http_client.receive(self.login_byte_response)
|
|
response = http_client.next_response()
|
|
|
|
assert isinstance(response, LoginResponse)
|
|
assert http_client.access_token == "ABCD"
|
|
|
|
_, _ = http_client.sync()
|
|
|
|
http_client.receive(self.sync_byte_response)
|
|
response = http_client.next_response()
|
|
|
|
assert isinstance(response, SyncResponse)
|
|
assert http_client.access_token == "ABCD"
|
|
|
|
room_id = next(iter(http_client.rooms))
|
|
_, _ = http_client.room_forget(room_id)
|
|
|
|
http_client.receive(self.empty_response(5))
|
|
response = http_client.next_response()
|
|
|
|
assert isinstance(response, RoomForgetResponse)
|
|
|
|
def test_http_client_room_redact(self, synced_client):
|
|
room_id = next(iter(synced_client.rooms))
|
|
event_id = "$15163622445EBvZJ:localhost"
|
|
tx_id = uuid4()
|
|
reason = "for no reason"
|
|
|
|
synced_client.room_redact(room_id, event_id, reason, tx_id)
|
|
synced_client.receive(self.event_id_response(5))
|
|
response = synced_client.next_response()
|
|
assert isinstance(response, RoomRedactResponse)
|
|
|
|
def test_http_client_room_typing(self, http_client):
|
|
http_client.connect(TransportType.HTTP2)
|
|
|
|
_, _ = http_client.login("1234")
|
|
|
|
http_client.receive(self.login_byte_response)
|
|
response = http_client.next_response()
|
|
|
|
assert isinstance(response, LoginResponse)
|
|
assert http_client.access_token == "ABCD"
|
|
|
|
_, _ = http_client.sync()
|
|
|
|
http_client.receive(self.sync_byte_response)
|
|
response = http_client.next_response()
|
|
|
|
assert isinstance(response, SyncResponse)
|
|
assert http_client.access_token == "ABCD"
|
|
|
|
assert http_client.rooms
|
|
room_id = list(http_client.rooms.keys())[0]
|
|
_, _ = http_client.room_typing(room_id, typing_state=False)
|
|
|
|
http_client.receive(self.empty_response(5))
|
|
response = http_client.next_response()
|
|
|
|
assert isinstance(response, RoomTypingResponse)
|
|
|
|
def test_http_client_download(self, http_client):
|
|
http_client.connect(TransportType.HTTP2)
|
|
|
|
server_name = "example.og"
|
|
media_id = ("ascERGshawAWawugaAcauga",)
|
|
filename = "example&.png" # has unsafe character to test % encoding
|
|
|
|
_, _ = http_client.download(server_name, media_id, allow_remote=False)
|
|
|
|
http_client.receive(self.file_byte_response(1))
|
|
response = http_client.next_response()
|
|
|
|
assert isinstance(response, DownloadResponse)
|
|
assert response.body == self._load_byte_response("tests/data/file_response")
|
|
assert response.content_type == "image/png"
|
|
assert response.filename is None
|
|
|
|
_, _ = http_client.download(server_name, media_id, filename)
|
|
|
|
http_client.receive(self.file_byte_response(3, filename))
|
|
response = http_client.next_response()
|
|
|
|
assert isinstance(response, DownloadResponse)
|
|
assert response.body == self._load_byte_response("tests/data/file_response")
|
|
assert response.content_type == "image/png"
|
|
assert response.filename == filename
|
|
|
|
def test_http_client_thumbnail(self, http_client):
|
|
http_client.connect(TransportType.HTTP2)
|
|
|
|
_, _ = http_client.thumbnail(
|
|
"example.org", "ascERGshawAWawugaAcauga", 32, 32, allow_remote=False
|
|
)
|
|
|
|
http_client.receive(self.file_byte_response(1))
|
|
response = http_client.next_response()
|
|
|
|
assert isinstance(response, ThumbnailResponse)
|
|
assert response.body == self._load_byte_response("tests/data/file_response")
|
|
assert response.content_type == "image/png"
|
|
|
|
def test_http_client_get_profile(self, http_client: HttpClient):
|
|
http_client.connect(TransportType.HTTP2)
|
|
|
|
name = faker.name()
|
|
avatar = faker.avatar_url().replace("#auto", "")
|
|
|
|
http_client.user_id = ALICE_ID
|
|
|
|
_, _ = http_client.get_profile()
|
|
http_client.receive(self.get_profile_byte_response(name, avatar, 1))
|
|
response = http_client.next_response()
|
|
|
|
assert isinstance(response, ProfileGetResponse)
|
|
assert response.displayname == name
|
|
assert response.avatar_url.replace("#auto", "") == avatar
|
|
|
|
def test_http_client_get_set_displayname(self, http_client):
|
|
http_client.connect(TransportType.HTTP2)
|
|
|
|
_, _ = http_client.login("1234")
|
|
http_client.receive(self.login_byte_response)
|
|
response = http_client.next_response()
|
|
assert isinstance(response, LoginResponse)
|
|
assert http_client.access_token == "ABCD"
|
|
|
|
_, _ = http_client.sync()
|
|
http_client.receive(self.sync_byte_response)
|
|
response = http_client.next_response()
|
|
assert isinstance(response, SyncResponse)
|
|
assert http_client.access_token == "ABCD"
|
|
|
|
_, _ = http_client.get_displayname()
|
|
http_client.receive(self.get_displayname_byte_response(None, 5))
|
|
response = http_client.next_response()
|
|
assert isinstance(response, ProfileGetDisplayNameResponse)
|
|
assert not response.displayname
|
|
|
|
new_name = faker.name()
|
|
_, _ = http_client.set_displayname(new_name)
|
|
http_client.receive(self.empty_response(7))
|
|
response = http_client.next_response()
|
|
assert isinstance(response, ProfileSetDisplayNameResponse)
|
|
|
|
_, _ = http_client.get_displayname()
|
|
http_client.receive(self.get_displayname_byte_response(new_name, 9))
|
|
response = http_client.next_response()
|
|
assert isinstance(response, ProfileGetDisplayNameResponse)
|
|
assert response.displayname == new_name
|
|
|
|
def test_http_client_get_set_avatar(self, http_client):
|
|
http_client.connect(TransportType.HTTP2)
|
|
|
|
_, _ = http_client.login("1234")
|
|
http_client.receive(self.login_byte_response)
|
|
response = http_client.next_response()
|
|
assert isinstance(response, LoginResponse)
|
|
assert http_client.access_token == "ABCD"
|
|
|
|
_, _ = http_client.sync()
|
|
http_client.receive(self.sync_byte_response)
|
|
response = http_client.next_response()
|
|
assert isinstance(response, SyncResponse)
|
|
assert http_client.access_token == "ABCD"
|
|
|
|
_, _ = http_client.get_avatar()
|
|
http_client.receive(self.get_avatar_byte_response(None, 5))
|
|
response = http_client.next_response()
|
|
assert isinstance(response, ProfileGetAvatarResponse)
|
|
assert not response.avatar_url
|
|
|
|
new_avatar = faker.avatar_url().replace("#auto", "")
|
|
_, _ = http_client.set_avatar(new_avatar)
|
|
http_client.receive(self.empty_response(7))
|
|
response = http_client.next_response()
|
|
assert isinstance(response, ProfileSetAvatarResponse)
|
|
|
|
_, _ = http_client.get_avatar()
|
|
http_client.receive(self.get_avatar_byte_response(new_avatar, 9))
|
|
response = http_client.next_response()
|
|
assert isinstance(response, ProfileGetAvatarResponse)
|
|
assert response.avatar_url.replace("#auto", "") == new_avatar
|
|
|
|
def test_event_callback(self, client):
|
|
client.receive_response(self.login_response)
|
|
|
|
class CallbackException(Exception):
|
|
pass
|
|
|
|
def cb(room, event):
|
|
if isinstance(event, RoomMemberEvent):
|
|
raise CallbackException()
|
|
|
|
client.add_event_callback(cb, (RoomMemberEvent, RoomEncryptionEvent))
|
|
|
|
with pytest.raises(CallbackException):
|
|
client.receive_response(self.sync_response)
|
|
|
|
def test_to_device_cb(self, client):
|
|
client.receive_response(self.login_response)
|
|
|
|
class CallbackException(Exception):
|
|
pass
|
|
|
|
def cb(event):
|
|
if isinstance(event, RoomEncryptionEvent):
|
|
raise CallbackException()
|
|
|
|
client.add_to_device_callback(cb, RoomEncryptionEvent)
|
|
|
|
with pytest.raises(CallbackException):
|
|
client.receive_response(self.sync_response)
|
|
|
|
def test_ephemeral_cb(self, client):
|
|
client.receive_response(self.login_response)
|
|
|
|
class CallbackException(Exception):
|
|
pass
|
|
|
|
def cb(_, event):
|
|
raise CallbackException()
|
|
|
|
client.add_ephemeral_callback(cb, TypingNoticeEvent)
|
|
|
|
with pytest.raises(CallbackException):
|
|
client.receive_response(self.sync_response)
|
|
|
|
def test_many_ephemeral_cb(self, client):
|
|
"""Test that callbacks for multiple ephemeral events are properly handled.
|
|
|
|
Generates a random selection of ephemeral events and produces unique
|
|
callbacks and exceptions for each. Verifies that all of the callbacks
|
|
are called, including for duplicate events.
|
|
"""
|
|
client.receive_response(self.login_response)
|
|
ephemeral_events = [TypingNoticeEvent, ReceiptEvent]
|
|
|
|
event_selection = random.choices(
|
|
population=ephemeral_events,
|
|
# By the pigeonhole princple, we'll have at least one duplicate Event
|
|
k=len(ephemeral_events) + 1,
|
|
)
|
|
# This will only print during a failure, at which point we want to know
|
|
# what event selection caused an error.
|
|
print(f"Random selection of EphemeralEvents: {event_selection}")
|
|
|
|
exceptions = []
|
|
for index, event in enumerate(event_selection):
|
|
exception_class = type(
|
|
f"CbException{event.__name__}_{index}", (Exception,), {}
|
|
)
|
|
exceptions.append(exception_class)
|
|
|
|
def callback(_, event):
|
|
raise exception_class()
|
|
|
|
client.add_ephemeral_callback(callback, event)
|
|
|
|
with pytest.raises(tuple(exceptions)):
|
|
client.receive_response(self.sync_response)
|
|
|
|
def test_room_account_data_cb(self, client):
|
|
client.receive_response(self.login_response)
|
|
|
|
class CallbackException(Exception):
|
|
pass
|
|
|
|
def cb(_, event):
|
|
raise CallbackException()
|
|
|
|
client.add_room_account_data_callback(cb, FullyReadEvent)
|
|
|
|
with pytest.raises(CallbackException):
|
|
client.receive_response(self.sync_response)
|
|
|
|
def test_global_account_data_cb(self, client):
|
|
client.receive_response(self.login_response)
|
|
|
|
class CallbackCalled(Exception):
|
|
pass
|
|
|
|
def cb(_event):
|
|
raise CallbackCalled()
|
|
|
|
client.add_global_account_data_callback(cb, PushRulesEvent)
|
|
|
|
with pytest.raises(CallbackCalled):
|
|
client.receive_response(self.sync_response)
|
|
|
|
def test_handle_account_data(self, client):
|
|
client.receive_response(self.login_response)
|
|
client.receive_response(self.sync_response)
|
|
|
|
room = client.rooms[TEST_ROOM_ID]
|
|
assert room.fully_read_marker == "event_id_2"
|
|
assert room.tags == {"u.test": {"order": 1}}
|
|
|
|
def test_no_encryption(self, client_no_e2e):
|
|
client_no_e2e.receive_response(self.login_response)
|
|
assert client_no_e2e.logged_in
|
|
|
|
assert not client_no_e2e.olm
|
|
client_no_e2e.receive_response(self.sync_response)
|
|
|
|
assert len(client_no_e2e.rooms) == 1
|
|
|
|
room = list(client_no_e2e.rooms.values())[0]
|
|
|
|
assert room.encrypted
|
|
client_no_e2e.receive_response(self.second_sync)
|
|
|
|
with pytest.raises(LocalProtocolError):
|
|
client_no_e2e.device_store
|
|
|
|
with pytest.raises(LocalProtocolError):
|
|
client_no_e2e.olm_account_shared
|
|
|
|
assert not client_no_e2e.should_query_keys
|
|
|
|
assert not client_no_e2e.users_for_key_query
|
|
assert not client_no_e2e.key_verifications
|
|
assert not client_no_e2e.outgoing_to_device_messages
|
|
assert not client_no_e2e.get_active_sas(ALICE_ID, ALICE_DEVICE_ID)
|
|
|
|
ToDeviceMessage("m.test", ALICE_ID, ALICE_DEVICE_ID, {})
|
|
|
|
client_no_e2e.room_contains_unverified(room.room_id)
|
|
|
|
with pytest.raises(LocalProtocolError):
|
|
client_no_e2e.invalidate_outbound_session(room.room_id)
|
|
|
|
client_no_e2e.receive_response(self.keys_query_response)
|
|
|
|
def test_event_cb_for_invited_rooms(self, client):
|
|
client.receive_response(self.login_response)
|
|
|
|
class CallbackException(Exception):
|
|
pass
|
|
|
|
def cb(_, event):
|
|
raise CallbackException()
|
|
|
|
client.add_event_callback(cb, InviteMemberEvent)
|
|
|
|
with pytest.raises(CallbackException):
|
|
client.receive_response(self.sync_invite_response)
|
|
|
|
def test_homeserver_url_parsing(self):
|
|
host, path = HttpClient._parse_homeserver("https://example.org:8080")
|
|
assert host == "example.org:8080"
|
|
assert path == ""
|
|
|
|
host, path = HttpClient._parse_homeserver("example.org:8080")
|
|
assert host == "example.org:8080"
|
|
assert path == ""
|
|
|
|
host, path = HttpClient._parse_homeserver("example.org/_matrix")
|
|
assert host == "example.org:443"
|
|
assert path == "_matrix"
|
|
|
|
host, path = HttpClient._parse_homeserver("https://example.org:8008/_matrix")
|
|
assert host == "example.org:8008"
|
|
assert path == "_matrix"
|
|
|
|
def test_room_devices(self, client):
|
|
client.receive_response(self.login_response)
|
|
client.receive_response(self.sync_response)
|
|
client.receive_response(self.keys_query_response)
|
|
|
|
room_devices = client.room_devices(TEST_ROOM_ID)
|
|
|
|
assert ALICE_ID in room_devices
|
|
assert ALICE_DEVICE_ID in room_devices[ALICE_ID]
|
|
|
|
alice_device = room_devices[ALICE_ID][ALICE_DEVICE_ID]
|
|
|
|
assert alice_device
|
|
|
|
def test_soft_logout(self, client):
|
|
client.receive_response(self.login_response)
|
|
|
|
assert client.logged_in
|
|
|
|
error_response = SyncResponse.from_dict(
|
|
{
|
|
"errcode": "M_UNKNOWN_TOKEN",
|
|
"error": "Access token has expired",
|
|
"soft_logout": True,
|
|
}
|
|
)
|
|
client.receive_response(error_response)
|
|
|
|
assert not client.logged_in
|
|
|
|
def test_sync_token_restoring(self, client):
|
|
user = client.user_id
|
|
device_id = client.device_id
|
|
path = client.store_path
|
|
del client
|
|
|
|
config = ClientConfig(store_sync_tokens=True)
|
|
client = Client(user, device_id, path, config=config)
|
|
|
|
client.receive_response(self.login_response)
|
|
assert not client.next_batch
|
|
assert not client.loaded_sync_token
|
|
client.receive_response(self.sync_response)
|
|
assert client.next_batch
|
|
|
|
client = Client(user, device_id, path, config=config)
|
|
client.receive_response(self.login_response)
|
|
assert client.loaded_sync_token
|
|
|
|
def test_presence_callback(self, client):
|
|
client.receive_response(self.login_response)
|
|
|
|
class CallbackException(Exception):
|
|
pass
|
|
|
|
def cb(event):
|
|
if isinstance(event, PresenceEvent):
|
|
raise CallbackException()
|
|
|
|
client.add_presence_callback(cb, PresenceEvent)
|
|
|
|
client.add_presence_callback(cb, PresenceEvent)
|
|
|
|
with pytest.raises(CallbackException):
|
|
client.receive_response(self.sync_response)
|