first commit

This commit is contained in:
Ruslan Grak
2025-01-07 10:00:02 +03:00
commit 626d8d3c56
349 changed files with 44175 additions and 0 deletions

6
queue_job/tests/__init__.py Executable file
View File

@@ -0,0 +1,6 @@
from . import test_runner_channels
from . import test_runner_runner
from . import test_json_field
from . import test_model_job_channel
from . import test_model_job_function
from . import test_queue_job_protected_write

149
queue_job/tests/common.py Executable file
View File

@@ -0,0 +1,149 @@
# Copyright 2019 Camptocamp
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import doctest
import logging
import sys
from contextlib import contextmanager
import mock
from ..job import Job
class JobCounter:
    """Snapshot the ``queue.job`` records existing at instantiation time.

    Afterwards, the helper methods can tell apart the jobs that existed
    before the snapshot from the ones created since.
    """

    def __init__(self, env):
        super().__init__()
        self.env = env
        # Records present at snapshot time: anything found later that is
        # not in this set counts as "created".
        self.existing = self.search_all()

    def search_all(self):
        """Return every queue.job record currently in the database."""
        return self.env["queue.job"].search([])

    def search_created(self):
        """Return only the records created after the snapshot."""
        return self.search_all() - self.existing

    def count_all(self):
        """Total number of queue.job records currently present."""
        return len(self.search_all())

    def count_created(self):
        """Number of records created since the snapshot."""
        return len(self.search_created())

    def count_existing(self):
        """Number of records captured in the snapshot."""
        return len(self.existing)
class JobMixin:
    """Mixin for test cases: count queued jobs and run them on demand."""

    def job_counter(self):
        """Return a :class:`JobCounter` snapshotting the current jobs."""
        return JobCounter(self.env)

    def perform_jobs(self, jobs):
        """Execute every job created since the *jobs* counter was taken."""
        created = jobs.search_created()
        for record in created:
            job = Job.load(self.env, record.uuid)
            job.perform()
@contextmanager
def mock_with_delay():
    """Context Manager mocking ``with_delay()``.

    Mocking this method means we can decorrelate the tests in:

    * the part that delays the job with the expected arguments
    * the execution of the job itself

    The first kind of test does not need to actually create the jobs in the
    database, as we can inspect how the Mocks were called.

    The second kind of test calls directly the method decorated by ``@job``
    with the arguments that we want to test.

    The context manager returns 2 mocks:

    * the first allows to check that with_delay() was called and with which
      arguments
    * the second to check which job method was called and with which
      arguments.

    Example of test::

        def test_export(self):
            with mock_with_delay() as (delayable_cls, delayable):
                # inside this method, there is a call
                # partner.with_delay(priority=15).export_record('test')
                self.record.run_export()

                # check 'with_delay()' part:
                self.assertEqual(delayable_cls.call_count, 1)
                # arguments passed in 'with_delay()'
                delay_args, delay_kwargs = delayable_cls.call_args
                self.assertEqual(
                    delay_args, (self.env['res.partner'],)
                )
                self.assertDictEqual(delay_kwargs, {"priority": 15})

                # check what's passed to the job method 'export_record'
                self.assertEqual(delayable.export_record.call_count, 1)
                delay_args, delay_kwargs = delayable.export_record.call_args
                self.assertEqual(delay_args, ('test',))
                self.assertDictEqual(delay_kwargs, {})

    An example of the first kind of test:
    https://github.com/camptocamp/connector-jira/blob/0ca4261b3920d5e8c2ae4bb0fc352ea3f6e9d2cd/connector_jira/tests/test_batch_timestamp_import.py#L43-L76 # noqa
    And the second kind:
    https://github.com/camptocamp/connector-jira/blob/0ca4261b3920d5e8c2ae4bb0fc352ea3f6e9d2cd/connector_jira/tests/test_import_task.py#L34-L46 # noqa
    """
    with mock.patch(
        "odoo.addons.queue_job.models.base.DelayableRecordset",
        name="DelayableRecordset",
        spec=True,
    ) as delayable_cls:
        # prepare the mocks: with_delay() resolves to ``delayable_cls`` and
        # the delayed method is then looked up on ``delayable``
        delayable = mock.MagicMock(name="DelayableBinding")
        delayable_cls.return_value = delayable
        yield delayable_cls, delayable
class OdooDocTestCase(doctest.DocTestCase):
"""
We need a custom DocTestCase class in order to:
- define test_tags to run as part of standard tests
- output a more meaningful test name than default "DocTestCase.runTest"
"""
def __init__(self, doctest, optionflags=0, setUp=None, tearDown=None, checker=None):
super().__init__(
doctest._dt_test,
optionflags=optionflags,
setUp=setUp,
tearDown=tearDown,
checker=checker,
)
def setUp(self):
"""Log an extra statement which test is started."""
super(OdooDocTestCase, self).setUp()
logging.getLogger(__name__).info("Running tests for %s", self._dt_test.name)
def load_doctests(module):
    """
    Generates a tests loading method for the doctests of the given module

    https://docs.python.org/3/library/unittest.html#load-tests-protocol
    """

    def load_tests(loader, tests, ignore):
        """
        Apply the 'test_tags' attribute to each DocTestCase found by the
        DocTestSuite.

        Also extend the DocTestCase class trivially to fit the class teardown
        that Odoo backported for its own test classes from Python 3.8.
        """
        # Pre-3.8 interpreters lack the class-cleanup hooks Odoo expects;
        # stub them out so teardown does not crash there.
        if sys.version_info < (3, 8):
            doctest.DocTestCase.doClassCleanups = lambda: None
            doctest.DocTestCase.tearDown_exceptions = []
        suite = doctest.DocTestSuite(module)
        for case in suite:
            wrapped = OdooDocTestCase(case)
            wrapped.test_tags = {"standard", "at_install", "queue_job", "doctest"}
            tests.addTest(wrapped)
        return tests

    return load_tests

View File

@@ -0,0 +1,140 @@
# copyright 2016 Camptocamp
# license lgpl-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
import json
from datetime import date, datetime
from lxml import etree
from odoo.tests import common
# pylint: disable=odoo-addons-relative-import
# we are testing, we want to test as we were an external consumer of the API
from odoo.addons.queue_job.fields import JobDecoder, JobEncoder
class TestJson(common.TransactionCase):
    """Round-trip tests for queue_job's JSON encoder and decoder.

    ``JobEncoder``/``JobDecoder`` extend plain JSON with tagged payloads
    for Odoo recordsets, ``datetime``/``date`` values and lxml ``etree``
    elements.
    """

    def test_encoder_recordset(self):
        """A recordset encodes to a dict tagged ``odoo_recordset``."""
        demo_user = self.env.ref("base.user_demo")
        partner = self.env(user=demo_user).ref("base.main_partner")
        value = partner
        value_json = json.dumps(value, cls=JobEncoder)
        expected = {
            "uid": demo_user.id,
            "_type": "odoo_recordset",
            "model": "res.partner",
            "ids": [partner.id],
            "su": False,
        }
        self.assertEqual(json.loads(value_json), expected)

    def test_encoder_recordset_list(self):
        """Recordsets nested inside a list are encoded in place."""
        demo_user = self.env.ref("base.user_demo")
        partner = self.env(user=demo_user).ref("base.main_partner")
        value = ["a", 1, partner]
        value_json = json.dumps(value, cls=JobEncoder)
        expected = [
            "a",
            1,
            {
                "uid": demo_user.id,
                "_type": "odoo_recordset",
                "model": "res.partner",
                "ids": [partner.id],
                "su": False,
            },
        ]
        self.assertEqual(json.loads(value_json), expected)

    def test_decoder_recordset(self):
        """A tagged dict decodes back to the recordset and its user."""
        demo_user = self.env.ref("base.user_demo")
        partner = self.env(user=demo_user).ref("base.main_partner")
        value_json = (
            '{"_type": "odoo_recordset",'
            '"model": "res.partner",'
            '"su": false,'
            '"ids": [%s],"uid": %s}' % (partner.id, demo_user.id)
        )
        expected = partner
        value = json.loads(value_json, cls=JobDecoder, env=self.env)
        self.assertEqual(value, expected)
        self.assertEqual(demo_user, expected.env.user)

    def test_decoder_recordset_list(self):
        """Tagged dicts nested inside a list decode in place."""
        demo_user = self.env.ref("base.user_demo")
        partner = self.env(user=demo_user).ref("base.main_partner")
        value_json = (
            '["a", 1, '
            '{"_type": "odoo_recordset",'
            '"model": "res.partner",'
            '"su": false,'
            '"ids": [%s],"uid": %s}]' % (partner.id, demo_user.id)
        )
        expected = ["a", 1, partner]
        value = json.loads(value_json, cls=JobDecoder, env=self.env)
        self.assertEqual(value, expected)
        self.assertEqual(demo_user, expected[2].env.user)

    def test_decoder_recordset_list_without_user(self):
        """Without a "uid" key, the record decodes with the root user."""
        value_json = (
            '["a", 1, {"_type": "odoo_recordset",' '"model": "res.users", "ids": [1]}]'
        )
        expected = ["a", 1, self.env.ref("base.user_root")]
        value = json.loads(value_json, cls=JobDecoder, env=self.env)
        self.assertEqual(value, expected)

    def test_encoder_datetime(self):
        """datetimes encode as tagged ISO-format strings."""
        value = ["a", 1, datetime(2017, 4, 19, 8, 48, 50, 1)]
        value_json = json.dumps(value, cls=JobEncoder)
        expected = [
            "a",
            1,
            {"_type": "datetime_isoformat", "value": "2017-04-19T08:48:50.000001"},
        ]
        self.assertEqual(json.loads(value_json), expected)

    def test_decoder_datetime(self):
        """Tagged ISO datetime strings decode back to datetime objects."""
        value_json = (
            '["a", 1, {"_type": "datetime_isoformat",'
            '"value": "2017-04-19T08:48:50.000001"}]'
        )
        expected = ["a", 1, datetime(2017, 4, 19, 8, 48, 50, 1)]
        value = json.loads(value_json, cls=JobDecoder, env=self.env)
        self.assertEqual(value, expected)

    def test_encoder_date(self):
        """dates encode as tagged ISO-format strings."""
        value = ["a", 1, date(2017, 4, 19)]
        value_json = json.dumps(value, cls=JobEncoder)
        expected = ["a", 1, {"_type": "date_isoformat", "value": "2017-04-19"}]
        self.assertEqual(json.loads(value_json), expected)

    def test_decoder_date(self):
        """Tagged ISO date strings decode back to date objects."""
        value_json = '["a", 1, {"_type": "date_isoformat",' '"value": "2017-04-19"}]'
        expected = ["a", 1, date(2017, 4, 19)]
        value = json.loads(value_json, cls=JobDecoder, env=self.env)
        self.assertEqual(value, expected)

    def test_encoder_etree(self):
        """lxml elements encode as tagged serialized-XML strings."""
        etree_el = etree.Element("root", attr="val")
        etree_el.append(etree.Element("child", attr="val"))
        value = ["a", 1, etree_el]
        value_json = json.dumps(value, cls=JobEncoder)
        expected = [
            "a",
            1,
            {
                "_type": "etree_element",
                "value": '<root attr="val"><child attr="val"/></root>',
            },
        ]
        self.assertEqual(json.loads(value_json), expected)

    def test_decoder_etree(self):
        """Tagged XML strings decode back to lxml elements."""
        value_json = '["a", 1, {"_type": "etree_element", "value": \
            "<root attr=\\"val\\"><child attr=\\"val\\"/></root>"}]'
        etree_el = etree.Element("root", attr="val")
        etree_el.append(etree.Element("child", attr="val"))
        expected = ["a", 1, etree.tostring(etree_el)]
        value = json.loads(value_json, cls=JobDecoder, env=self.env)
        # elements do not compare by value; compare the serialized form
        value[2] = etree.tostring(value[2])
        self.assertEqual(value, expected)

View File

@@ -0,0 +1,50 @@
# copyright 2018 Camptocamp
# license lgpl-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
from psycopg2 import IntegrityError
import odoo
from odoo.tests import common
class TestJobChannel(common.TransactionCase):
    """Behavior of the ``queue.job.channel`` hierarchy."""

    def setUp(self):
        super().setUp()
        # Handles on the model and on the mandatory root channel.
        self.Channel = self.env["queue.job.channel"]
        self.root_channel = self.Channel.search([("name", "=", "root")])

    def test_channel_new(self):
        """A channel not yet stored has neither name nor complete name."""
        record = self.Channel.new()
        self.assertFalse(record.name)
        self.assertFalse(record.complete_name)

    def test_channel_create(self):
        """The complete name is the dotted path down from the root."""
        sub = self.Channel.create({"name": "sub", "parent_id": self.root_channel.id})
        self.assertEqual(sub.name, "sub")
        self.assertEqual(sub.complete_name, "root.sub")
        subsub = self.Channel.create({"name": "sub", "parent_id": sub.id})
        self.assertEqual(subsub.name, "sub")
        self.assertEqual(subsub.complete_name, "root.sub.sub")

    @odoo.tools.mute_logger("odoo.sql_db")
    def test_channel_complete_name_uniq(self):
        """Two channels may not share the same complete name."""
        sub = self.Channel.create({"name": "sub", "parent_id": self.root_channel.id})
        self.assertEqual(sub.name, "sub")
        self.assertEqual(sub.complete_name, "root.sub")
        self.Channel.create({"name": "sub", "parent_id": self.root_channel.id})
        with self.assertRaises(IntegrityError):
            # Flushing processes the pending recomputations and pushes the
            # pending updates to the database (normally done on commit),
            # which triggers the SQL unique constraint.
            self.env["base"].flush()

    def test_channel_name_get(self):
        """name_get displays the complete dotted name."""
        sub = self.Channel.create({"name": "sub", "parent_id": self.root_channel.id})
        self.assertEqual(sub.name_get(), [(sub.id, "root.sub")])

View File

@@ -0,0 +1,56 @@
# copyright 2020 Camptocamp
# license lgpl-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
from odoo import exceptions
from odoo.tests import common
class TestJobFunction(common.SavepointCase):
    """Tests for the ``queue.job.function`` model."""

    def test_function_name_compute(self):
        """The function name is computed as ``<model>.method``."""
        rec = self.env["queue.job.function"].create(
            {"model_id": self.env.ref("base.model_res_users").id, "method": "read"}
        )
        self.assertEqual(rec.name, "<res.users>.read")

    def test_function_name_inverse(self):
        """Writing the name fills ``model_id`` and ``method`` back."""
        rec = self.env["queue.job.function"].create({"name": "<res.users>.read"})
        self.assertEqual(rec.model_id.model, "res.users")
        self.assertEqual(rec.method, "read")

    def test_function_name_inverse_invalid_regex(self):
        """A malformed name (missing closing bracket) is rejected."""
        with self.assertRaises(exceptions.UserError):
            self.env["queue.job.function"].create({"name": "<res.users.read"})

    def test_function_name_inverse_model_not_found(self):
        """A name referencing an unknown model is rejected."""
        with self.assertRaises(exceptions.UserError):
            self.env["queue.job.function"].create(
                {"name": "<this.model.does.not.exist>.read"}
            )

    def test_function_job_config(self):
        """job_config exposes channel, retry pattern and related action."""
        channel = self.env["queue.job.channel"].create(
            {"name": "foo", "parent_id": self.env.ref("queue_job.channel_root").id}
        )
        self.env["queue.job.function"].create(
            {
                "model_id": self.env.ref("base.model_res_users").id,
                "method": "read",
                "channel_id": channel.id,
                "edit_retry_pattern": "{1: 2, 3: 4}",
                "edit_related_action": (
                    '{"enable": True,'
                    ' "func_name": "related_action_foo",'
                    ' "kwargs": {"b": 1}}'
                ),
            }
        )
        self.assertEqual(
            self.env["queue.job.function"].job_config("<res.users>.read"),
            self.env["queue.job.function"].JobConfig(
                channel="root.foo",
                retry_pattern={1: 2, 3: 4},
                related_action_enable=True,
                related_action_func_name="related_action_foo",
                related_action_kwargs={"b": 1},
            ),
        )

View File

@@ -0,0 +1,25 @@
# copyright 2020 Camptocamp
# license lgpl-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
from odoo import exceptions
from odoo.tests import common
class TestJobWriteProtected(common.SavepointCase):
    """Direct writes on protected ``queue.job`` fields must be denied."""

    def test_create_error(self):
        """Creating a queue.job record by hand is forbidden."""
        with self.assertRaises(exceptions.AccessError):
            self.env["queue.job"].create(
                {"uuid": "test", "model_name": "res.partner", "method_name": "write"}
            )

    def test_write_protected_field_error(self):
        """Changing a protected field on an existing job is forbidden."""
        delayed = self.env["res.partner"].with_delay().create({"name": "test"})
        record = delayed.db_record()
        with self.assertRaises(exceptions.AccessError):
            record.method_name = "unlink"

    def test_write_allow_no_protected_field_error(self):
        """Unprotected fields such as priority remain writable."""
        delayed = self.env["res.partner"].with_delay().create({"name": "test"})
        record = delayed.db_record()
        record.priority = 30
        self.assertEqual(record.priority, 30)

View File

@@ -0,0 +1,10 @@
# Copyright 2015-2016 Camptocamp SA
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
# pylint: disable=odoo-addons-relative-import
# we are testing, we want to test as we were an external consumer of the API
from odoo.addons.queue_job.jobrunner import channels
from .common import load_doctests
# Hook the doctests of the jobrunner ``channels`` module into unittest's
# load_tests protocol so they run with the addon's test suite.
load_tests = load_doctests(channels)

View File

@@ -0,0 +1,10 @@
# Copyright 2015-2016 Camptocamp SA
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
# pylint: disable=odoo-addons-relative-import
# we are testing, we want to test as we were an external consumer of the API
from odoo.addons.queue_job.jobrunner import runner
from .common import load_doctests
# Hook the doctests of the jobrunner ``runner`` module into unittest's
# load_tests protocol so they run with the addon's test suite.
load_tests = load_doctests(runner)