synapse/tests/storage/test_base.py

807 lines
28 KiB
Python

# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import OrderedDict
from typing import Generator
from unittest.mock import Mock, call, patch
from twisted.internet import defer
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool
from synapse.storage.engines import create_engine
from tests import unittest
from tests.server import TestHomeServer
from tests.utils import USE_POSTGRES_FOR_TESTS, default_config
class SQLBaseStoreTestCase(unittest.TestCase):
"""Test the "simple" SQL generating methods in SQLBaseStore."""
def setUp(self) -> None:
# This is the Twisted connection pool.
conn_pool = Mock(spec=["runInteraction", "runWithConnection"])
self.mock_txn = Mock()
if USE_POSTGRES_FOR_TESTS:
# To avoid testing psycopg2 itself, patch execute_batch/execute_values
# to assert how it is called.
from psycopg2 import extras
self.mock_execute_batch = Mock()
self.execute_batch_patcher = patch.object(
extras, "execute_batch", new=self.mock_execute_batch
)
self.execute_batch_patcher.start()
self.mock_execute_values = Mock()
self.execute_values_patcher = patch.object(
extras, "execute_values", new=self.mock_execute_values
)
self.execute_values_patcher.start()
self.mock_conn = Mock(
spec_set=[
"cursor",
"rollback",
"commit",
"closed",
"reconnect",
"set_session",
"encoding",
]
)
self.mock_conn.encoding = "UNICODE"
else:
self.mock_conn = Mock(spec_set=["cursor", "rollback", "commit"])
self.mock_conn.cursor.return_value = self.mock_txn
self.mock_txn.connection = self.mock_conn
self.mock_conn.rollback.return_value = None
# Our fake runInteraction just runs synchronously inline
def runInteraction(func, *args, **kwargs) -> defer.Deferred: # type: ignore[no-untyped-def]
return defer.succeed(func(self.mock_txn, *args, **kwargs))
conn_pool.runInteraction = runInteraction
def runWithConnection(func, *args, **kwargs): # type: ignore[no-untyped-def]
return defer.succeed(func(self.mock_conn, *args, **kwargs))
conn_pool.runWithConnection = runWithConnection
config = default_config(name="test", parse=True)
hs = TestHomeServer("test", config=config)
if USE_POSTGRES_FOR_TESTS:
db_config = {"name": "psycopg2", "args": {}}
else:
db_config = {"name": "sqlite3"}
engine = create_engine(db_config)
fake_engine = Mock(wraps=engine)
fake_engine.in_transaction.return_value = False
fake_engine.module.OperationalError = engine.module.OperationalError
fake_engine.module.DatabaseError = engine.module.DatabaseError
fake_engine.module.IntegrityError = engine.module.IntegrityError
# Don't convert param style to make assertions easier.
fake_engine.convert_param_style = lambda sql: sql
# To fix isinstance(...) checks.
fake_engine.__class__ = engine.__class__ # type: ignore[assignment]
db = DatabasePool(Mock(), Mock(config=db_config), fake_engine)
db._db_pool = conn_pool
self.datastore = SQLBaseStore(db, None, hs) # type: ignore[arg-type]
def tearDown(self) -> None:
if USE_POSTGRES_FOR_TESTS:
self.execute_batch_patcher.stop()
self.execute_values_patcher.stop()
@defer.inlineCallbacks
def test_insert_1col(self) -> Generator["defer.Deferred[object]", object, None]:
self.mock_txn.rowcount = 1
yield defer.ensureDeferred(
self.datastore.db_pool.simple_insert(
table="tablename", values={"columname": "Value"}
)
)
self.mock_txn.execute.assert_called_once_with(
"INSERT INTO tablename (columname) VALUES(?)", ("Value",)
)
@defer.inlineCallbacks
def test_insert_3cols(self) -> Generator["defer.Deferred[object]", object, None]:
self.mock_txn.rowcount = 1
yield defer.ensureDeferred(
self.datastore.db_pool.simple_insert(
table="tablename",
# Use OrderedDict() so we can assert on the SQL generated
values=OrderedDict([("colA", 1), ("colB", 2), ("colC", 3)]),
)
)
self.mock_txn.execute.assert_called_once_with(
"INSERT INTO tablename (colA, colB, colC) VALUES(?, ?, ?)", (1, 2, 3)
)
@defer.inlineCallbacks
def test_insert_many(self) -> Generator["defer.Deferred[object]", object, None]:
yield defer.ensureDeferred(
self.datastore.db_pool.simple_insert_many(
table="tablename",
keys=(
"col1",
"col2",
),
values=[
(
"val1",
"val2",
),
("val3", "val4"),
],
desc="",
)
)
if USE_POSTGRES_FOR_TESTS:
self.mock_execute_values.assert_called_once_with(
self.mock_txn,
"INSERT INTO tablename (col1, col2) VALUES ?",
[("val1", "val2"), ("val3", "val4")],
template=None,
fetch=False,
)
else:
self.mock_txn.executemany.assert_called_once_with(
"INSERT INTO tablename (col1, col2) VALUES(?, ?)",
[("val1", "val2"), ("val3", "val4")],
)
@defer.inlineCallbacks
def test_insert_many_no_iterable(
self,
) -> Generator["defer.Deferred[object]", object, None]:
yield defer.ensureDeferred(
self.datastore.db_pool.simple_insert_many(
table="tablename",
keys=(
"col1",
"col2",
),
values=[],
desc="",
)
)
if USE_POSTGRES_FOR_TESTS:
self.mock_execute_values.assert_not_called()
else:
self.mock_txn.executemany.assert_not_called()
@defer.inlineCallbacks
def test_select_one_1col(self) -> Generator["defer.Deferred[object]", object, None]:
self.mock_txn.rowcount = 1
self.mock_txn.__iter__ = Mock(return_value=iter([("Value",)]))
value = yield defer.ensureDeferred(
self.datastore.db_pool.simple_select_one_onecol(
table="tablename", keyvalues={"keycol": "TheKey"}, retcol="retcol"
)
)
self.assertEqual("Value", value)
self.mock_txn.execute.assert_called_once_with(
"SELECT retcol FROM tablename WHERE keycol = ?", ["TheKey"]
)
@defer.inlineCallbacks
def test_select_one_3col(self) -> Generator["defer.Deferred[object]", object, None]:
self.mock_txn.rowcount = 1
self.mock_txn.fetchone.return_value = (1, 2, 3)
ret = yield defer.ensureDeferred(
self.datastore.db_pool.simple_select_one(
table="tablename",
keyvalues={"keycol": "TheKey"},
retcols=["colA", "colB", "colC"],
)
)
self.assertEqual((1, 2, 3), ret)
self.mock_txn.execute.assert_called_once_with(
"SELECT colA, colB, colC FROM tablename WHERE keycol = ?", ["TheKey"]
)
@defer.inlineCallbacks
def test_select_one_missing(
self,
) -> Generator["defer.Deferred[object]", object, None]:
self.mock_txn.rowcount = 0
self.mock_txn.fetchone.return_value = None
ret = yield defer.ensureDeferred(
self.datastore.db_pool.simple_select_one(
table="tablename",
keyvalues={"keycol": "Not here"},
retcols=["colA"],
allow_none=True,
)
)
self.assertIsNone(ret)
@defer.inlineCallbacks
def test_select_list(self) -> Generator["defer.Deferred[object]", object, None]:
self.mock_txn.rowcount = 3
self.mock_txn.fetchall.return_value = [(1,), (2,), (3,)]
self.mock_txn.description = (("colA", None, None, None, None, None, None),)
ret = yield defer.ensureDeferred(
self.datastore.db_pool.simple_select_list(
table="tablename", keyvalues={"keycol": "A set"}, retcols=["colA"]
)
)
self.assertEqual([(1,), (2,), (3,)], ret)
self.mock_txn.execute.assert_called_once_with(
"SELECT colA FROM tablename WHERE keycol = ?", ["A set"]
)
@defer.inlineCallbacks
def test_select_many_batch(
self,
) -> Generator["defer.Deferred[object]", object, None]:
self.mock_txn.rowcount = 3
self.mock_txn.fetchall.side_effect = [[(1,), (2,)], [(3,)]]
ret = yield defer.ensureDeferred(
self.datastore.db_pool.simple_select_many_batch(
table="tablename",
column="col1",
iterable=("val1", "val2", "val3"),
retcols=("col2",),
keyvalues={"col3": "val4"},
batch_size=2,
)
)
self.mock_txn.execute.assert_has_calls(
[
call(
"SELECT col2 FROM tablename WHERE col1 = ANY(?) AND col3 = ?",
[["val1", "val2"], "val4"],
),
call(
"SELECT col2 FROM tablename WHERE col1 = ANY(?) AND col3 = ?",
[["val3"], "val4"],
),
],
)
self.assertEqual([(1,), (2,), (3,)], ret)
def test_select_many_no_iterable(self) -> None:
self.mock_txn.rowcount = 3
self.mock_txn.fetchall.side_effect = [(1,), (2,)]
ret = self.datastore.db_pool.simple_select_many_txn(
self.mock_txn,
table="tablename",
column="col1",
iterable=(),
retcols=("col2",),
keyvalues={"col3": "val4"},
)
self.mock_txn.execute.assert_not_called()
self.assertEqual([], ret)
@defer.inlineCallbacks
def test_update_one_1col(self) -> Generator["defer.Deferred[object]", object, None]:
self.mock_txn.rowcount = 1
yield defer.ensureDeferred(
self.datastore.db_pool.simple_update_one(
table="tablename",
keyvalues={"keycol": "TheKey"},
updatevalues={"columnname": "New Value"},
)
)
self.mock_txn.execute.assert_called_once_with(
"UPDATE tablename SET columnname = ? WHERE keycol = ?",
["New Value", "TheKey"],
)
@defer.inlineCallbacks
def test_update_one_4cols(
self,
) -> Generator["defer.Deferred[object]", object, None]:
self.mock_txn.rowcount = 1
yield defer.ensureDeferred(
self.datastore.db_pool.simple_update_one(
table="tablename",
keyvalues=OrderedDict([("colA", 1), ("colB", 2)]),
updatevalues=OrderedDict([("colC", 3), ("colD", 4)]),
)
)
self.mock_txn.execute.assert_called_once_with(
"UPDATE tablename SET colC = ?, colD = ? WHERE" " colA = ? AND colB = ?",
[3, 4, 1, 2],
)
@defer.inlineCallbacks
def test_update_many(self) -> Generator["defer.Deferred[object]", object, None]:
yield defer.ensureDeferred(
self.datastore.db_pool.simple_update_many(
table="tablename",
key_names=("col1", "col2"),
key_values=[("val1", "val2")],
value_names=("col3",),
value_values=[("val3",)],
desc="",
)
)
if USE_POSTGRES_FOR_TESTS:
self.mock_execute_batch.assert_called_once_with(
self.mock_txn,
"UPDATE tablename SET col3 = ? WHERE col1 = ? AND col2 = ?",
[("val3", "val1", "val2")],
)
else:
self.mock_txn.executemany.assert_called_once_with(
"UPDATE tablename SET col3 = ? WHERE col1 = ? AND col2 = ?",
[("val3", "val1", "val2")],
)
# key_values and value_values must be the same length.
with self.assertRaises(ValueError):
yield defer.ensureDeferred(
self.datastore.db_pool.simple_update_many(
table="tablename",
key_names=("col1", "col2"),
key_values=[("val1", "val2")],
value_names=("col3",),
value_values=[],
desc="",
)
)
@defer.inlineCallbacks
def test_update_many_no_iterable(
self,
) -> Generator["defer.Deferred[object]", object, None]:
yield defer.ensureDeferred(
self.datastore.db_pool.simple_update_many(
table="tablename",
key_names=("col1", "col2"),
key_values=[],
value_names=("col3",),
value_values=[],
desc="",
)
)
if USE_POSTGRES_FOR_TESTS:
self.mock_execute_batch.assert_not_called()
else:
self.mock_txn.executemany.assert_not_called()
@defer.inlineCallbacks
def test_delete_one(self) -> Generator["defer.Deferred[object]", object, None]:
self.mock_txn.rowcount = 1
yield defer.ensureDeferred(
self.datastore.db_pool.simple_delete_one(
table="tablename", keyvalues={"keycol": "Go away"}
)
)
self.mock_txn.execute.assert_called_once_with(
"DELETE FROM tablename WHERE keycol = ?", ["Go away"]
)
@defer.inlineCallbacks
def test_delete_many(self) -> Generator["defer.Deferred[object]", object, None]:
self.mock_txn.rowcount = 2
result = yield defer.ensureDeferred(
self.datastore.db_pool.simple_delete_many(
table="tablename",
column="col1",
iterable=("val1", "val2"),
keyvalues={"col2": "val3"},
desc="",
)
)
self.mock_txn.execute.assert_called_once_with(
"DELETE FROM tablename WHERE col1 = ANY(?) AND col2 = ?",
[["val1", "val2"], "val3"],
)
self.assertEqual(result, 2)
@defer.inlineCallbacks
def test_delete_many_no_iterable(
self,
) -> Generator["defer.Deferred[object]", object, None]:
result = yield defer.ensureDeferred(
self.datastore.db_pool.simple_delete_many(
table="tablename",
column="col1",
iterable=(),
keyvalues={"col2": "val3"},
desc="",
)
)
self.mock_txn.execute.assert_not_called()
self.assertEqual(result, 0)
@defer.inlineCallbacks
def test_delete_many_no_keyvalues(
self,
) -> Generator["defer.Deferred[object]", object, None]:
self.mock_txn.rowcount = 2
result = yield defer.ensureDeferred(
self.datastore.db_pool.simple_delete_many(
table="tablename",
column="col1",
iterable=("val1", "val2"),
keyvalues={},
desc="",
)
)
self.mock_txn.execute.assert_called_once_with(
"DELETE FROM tablename WHERE col1 = ANY(?)", [["val1", "val2"]]
)
self.assertEqual(result, 2)
@defer.inlineCallbacks
def test_upsert(self) -> Generator["defer.Deferred[object]", object, None]:
self.mock_txn.rowcount = 1
result = yield defer.ensureDeferred(
self.datastore.db_pool.simple_upsert(
table="tablename",
keyvalues={"columnname": "oldvalue"},
values={"othercol": "newvalue"},
)
)
self.mock_txn.execute.assert_called_once_with(
"INSERT INTO tablename (columnname, othercol) VALUES (?, ?) ON CONFLICT (columnname) DO UPDATE SET othercol=EXCLUDED.othercol",
["oldvalue", "newvalue"],
)
self.assertTrue(result)
@defer.inlineCallbacks
def test_upsert_no_values(
self,
) -> Generator["defer.Deferred[object]", object, None]:
self.mock_txn.rowcount = 1
result = yield defer.ensureDeferred(
self.datastore.db_pool.simple_upsert(
table="tablename",
keyvalues={"columnname": "value"},
values={},
insertion_values={"columnname": "value"},
)
)
self.mock_txn.execute.assert_called_once_with(
"INSERT INTO tablename (columnname) VALUES (?) ON CONFLICT (columnname) DO NOTHING",
["value"],
)
self.assertTrue(result)
@defer.inlineCallbacks
def test_upsert_with_insertion(
self,
) -> Generator["defer.Deferred[object]", object, None]:
self.mock_txn.rowcount = 1
result = yield defer.ensureDeferred(
self.datastore.db_pool.simple_upsert(
table="tablename",
keyvalues={"columnname": "oldvalue"},
values={"othercol": "newvalue"},
insertion_values={"thirdcol": "insertionval"},
)
)
self.mock_txn.execute.assert_called_once_with(
"INSERT INTO tablename (columnname, thirdcol, othercol) VALUES (?, ?, ?) ON CONFLICT (columnname) DO UPDATE SET othercol=EXCLUDED.othercol",
["oldvalue", "insertionval", "newvalue"],
)
self.assertTrue(result)
@defer.inlineCallbacks
def test_upsert_with_where(
self,
) -> Generator["defer.Deferred[object]", object, None]:
self.mock_txn.rowcount = 1
result = yield defer.ensureDeferred(
self.datastore.db_pool.simple_upsert(
table="tablename",
keyvalues={"columnname": "oldvalue"},
values={"othercol": "newvalue"},
where_clause="thirdcol IS NULL",
)
)
self.mock_txn.execute.assert_called_once_with(
"INSERT INTO tablename (columnname, othercol) VALUES (?, ?) ON CONFLICT (columnname) WHERE thirdcol IS NULL DO UPDATE SET othercol=EXCLUDED.othercol",
["oldvalue", "newvalue"],
)
self.assertTrue(result)
@defer.inlineCallbacks
def test_upsert_many(self) -> Generator["defer.Deferred[object]", object, None]:
yield defer.ensureDeferred(
self.datastore.db_pool.simple_upsert_many(
table="tablename",
key_names=["keycol1", "keycol2"],
key_values=[["keyval1", "keyval2"], ["keyval3", "keyval4"]],
value_names=["valuecol3"],
value_values=[["val5"], ["val6"]],
desc="",
)
)
if USE_POSTGRES_FOR_TESTS:
self.mock_execute_values.assert_called_once_with(
self.mock_txn,
"INSERT INTO tablename (keycol1, keycol2, valuecol3) VALUES ? ON CONFLICT (keycol1, keycol2) DO UPDATE SET valuecol3=EXCLUDED.valuecol3",
[("keyval1", "keyval2", "val5"), ("keyval3", "keyval4", "val6")],
template=None,
fetch=False,
)
else:
self.mock_txn.executemany.assert_called_once_with(
"INSERT INTO tablename (keycol1, keycol2, valuecol3) VALUES (?, ?, ?) ON CONFLICT (keycol1, keycol2) DO UPDATE SET valuecol3=EXCLUDED.valuecol3",
[("keyval1", "keyval2", "val5"), ("keyval3", "keyval4", "val6")],
)
@defer.inlineCallbacks
def test_upsert_many_no_values(
self,
) -> Generator["defer.Deferred[object]", object, None]:
yield defer.ensureDeferred(
self.datastore.db_pool.simple_upsert_many(
table="tablename",
key_names=["columnname"],
key_values=[["oldvalue"]],
value_names=[],
value_values=[],
desc="",
)
)
if USE_POSTGRES_FOR_TESTS:
self.mock_execute_values.assert_called_once_with(
self.mock_txn,
"INSERT INTO tablename (columnname) VALUES ? ON CONFLICT (columnname) DO NOTHING",
[("oldvalue",)],
template=None,
fetch=False,
)
else:
self.mock_txn.executemany.assert_called_once_with(
"INSERT INTO tablename (columnname) VALUES (?) ON CONFLICT (columnname) DO NOTHING",
[("oldvalue",)],
)
@defer.inlineCallbacks
def test_upsert_emulated_no_values_exists(
self,
) -> Generator["defer.Deferred[object]", object, None]:
self.datastore.db_pool._unsafe_to_upsert_tables.add("tablename")
self.mock_txn.fetchall.return_value = [(1,)]
result = yield defer.ensureDeferred(
self.datastore.db_pool.simple_upsert(
table="tablename",
keyvalues={"columnname": "value"},
values={},
insertion_values={"columnname": "value"},
)
)
if USE_POSTGRES_FOR_TESTS:
self.mock_txn.execute.assert_has_calls(
[
call("LOCK TABLE tablename in EXCLUSIVE MODE", ()),
call("SELECT 1 FROM tablename WHERE columnname = ?", ["value"]),
]
)
else:
self.mock_txn.execute.assert_called_once_with(
"SELECT 1 FROM tablename WHERE columnname = ?", ["value"]
)
self.assertFalse(result)
@defer.inlineCallbacks
def test_upsert_emulated_no_values_not_exists(
self,
) -> Generator["defer.Deferred[object]", object, None]:
self.datastore.db_pool._unsafe_to_upsert_tables.add("tablename")
self.mock_txn.fetchall.return_value = []
self.mock_txn.rowcount = 1
result = yield defer.ensureDeferred(
self.datastore.db_pool.simple_upsert(
table="tablename",
keyvalues={"columnname": "value"},
values={},
insertion_values={"columnname": "value"},
)
)
self.mock_txn.execute.assert_has_calls(
[
call(
"SELECT 1 FROM tablename WHERE columnname = ?",
["value"],
),
call("INSERT INTO tablename (columnname) VALUES (?)", ["value"]),
],
)
self.assertTrue(result)
@defer.inlineCallbacks
def test_upsert_emulated_with_insertion_exists(
self,
) -> Generator["defer.Deferred[object]", object, None]:
self.datastore.db_pool._unsafe_to_upsert_tables.add("tablename")
self.mock_txn.rowcount = 1
result = yield defer.ensureDeferred(
self.datastore.db_pool.simple_upsert(
table="tablename",
keyvalues={"columnname": "oldvalue"},
values={"othercol": "newvalue"},
insertion_values={"thirdcol": "insertionval"},
)
)
if USE_POSTGRES_FOR_TESTS:
self.mock_txn.execute.assert_has_calls(
[
call("LOCK TABLE tablename in EXCLUSIVE MODE", ()),
call(
"UPDATE tablename SET othercol = ? WHERE columnname = ?",
["newvalue", "oldvalue"],
),
]
)
else:
self.mock_txn.execute.assert_called_once_with(
"UPDATE tablename SET othercol = ? WHERE columnname = ?",
["newvalue", "oldvalue"],
)
self.assertTrue(result)
@defer.inlineCallbacks
def test_upsert_emulated_with_insertion_not_exists(
self,
) -> Generator["defer.Deferred[object]", object, None]:
self.datastore.db_pool._unsafe_to_upsert_tables.add("tablename")
self.mock_txn.rowcount = 0
result = yield defer.ensureDeferred(
self.datastore.db_pool.simple_upsert(
table="tablename",
keyvalues={"columnname": "oldvalue"},
values={"othercol": "newvalue"},
insertion_values={"thirdcol": "insertionval"},
)
)
self.mock_txn.execute.assert_has_calls(
[
call(
"UPDATE tablename SET othercol = ? WHERE columnname = ?",
["newvalue", "oldvalue"],
),
call(
"INSERT INTO tablename (columnname, othercol, thirdcol) VALUES (?, ?, ?)",
["oldvalue", "newvalue", "insertionval"],
),
]
)
self.assertTrue(result)
@defer.inlineCallbacks
def test_upsert_emulated_with_where(
self,
) -> Generator["defer.Deferred[object]", object, None]:
self.datastore.db_pool._unsafe_to_upsert_tables.add("tablename")
self.mock_txn.rowcount = 1
result = yield defer.ensureDeferred(
self.datastore.db_pool.simple_upsert(
table="tablename",
keyvalues={"columnname": "oldvalue"},
values={"othercol": "newvalue"},
where_clause="thirdcol IS NULL",
)
)
if USE_POSTGRES_FOR_TESTS:
self.mock_txn.execute.assert_has_calls(
[
call("LOCK TABLE tablename in EXCLUSIVE MODE", ()),
call(
"UPDATE tablename SET othercol = ? WHERE columnname = ? AND thirdcol IS NULL",
["newvalue", "oldvalue"],
),
]
)
else:
self.mock_txn.execute.assert_called_once_with(
"UPDATE tablename SET othercol = ? WHERE columnname = ? AND thirdcol IS NULL",
["newvalue", "oldvalue"],
)
self.assertTrue(result)
@defer.inlineCallbacks
def test_upsert_emulated_with_where_no_values(
self,
) -> Generator["defer.Deferred[object]", object, None]:
self.datastore.db_pool._unsafe_to_upsert_tables.add("tablename")
self.mock_txn.rowcount = 1
result = yield defer.ensureDeferred(
self.datastore.db_pool.simple_upsert(
table="tablename",
keyvalues={"columnname": "oldvalue"},
values={},
where_clause="thirdcol IS NULL",
)
)
if USE_POSTGRES_FOR_TESTS:
self.mock_txn.execute.assert_has_calls(
[
call("LOCK TABLE tablename in EXCLUSIVE MODE", ()),
call(
"SELECT 1 FROM tablename WHERE columnname = ? AND thirdcol IS NULL",
["oldvalue"],
),
]
)
else:
self.mock_txn.execute.assert_called_once_with(
"SELECT 1 FROM tablename WHERE columnname = ? AND thirdcol IS NULL",
["oldvalue"],
)
self.assertFalse(result)