diff --git a/Makefile b/Makefile index 99b55d3762..73fca6ea8c 100644 --- a/Makefile +++ b/Makefile @@ -2,10 +2,26 @@ # PRODUCTION COMMANDS ######################################### ############################################################### # These are production commands which may be invoked in deployments -altprodserver: NUM_PROCS:=3 -altprodserver: NUM_THREADS:=5 -altprodserver: collectstatic compilemessages - cd contentcuration/ && gunicorn contentcuration.wsgi:application --timeout=4000 --error-logfile=/var/log/gunicorn-error.log --workers=${NUM_PROCS} --threads=${NUM_THREADS} --bind=0.0.0.0:8081 --pid=/tmp/contentcuration.pid --log-level=debug || sleep infinity +#altprodserver: NUM_PROCS:=3 +#altprodserver: NUM_THREADS:=5 +#altprodserver: collectstatic compilemessages +# cd contentcuration/ && gunicorn contentcuration.wsgi:application --timeout=4000 --error-logfile=/var/log/gunicorn-error.log --workers=${NUM_PROCS} --threads=${NUM_THREADS} --bind=0.0.0.0:8081 --pid=/tmp/contentcuration.pid --log-level=debug || sleep infinity + +altprodserver: + $(MAKE) -j 2 gunicornanddapneserver + +gunicornanddapneserver: gunicornserver daphneserver + +daphneserver: + cd contentcuration/ && daphne -b 0.0.0.0 -p 8082 contentcuration.asgi:application + +gunicornserver: NUM_PROCS:=1 +gunicornserver: + cd contentcuration/ && gunicorn contentcuration.wsgi:application --timeout=4000 --error-logfile=/var/log/gunicorn-error.log --workers=${NUM_PROCS} --bind=0.0.0.0:8081 --pid=/tmp/contentcuration.pid --log-level=debug || sleep infinity + + +contentnodegc: + cd contentcuration/ && python manage.py garbage_collect prodceleryworkers: cd contentcuration/ && celery -A contentcuration worker -l info --concurrency=3 --task-events @@ -20,9 +36,6 @@ migrate: python contentcuration/manage.py migrate || true python contentcuration/manage.py loadconstants -contentnodegc: - python contentcuration/manage.py garbage_collect - filedurations: python contentcuration/manage.py set_file_duration diff --git a/contentcuration/contentcuration/apps.py b/contentcuration/contentcuration/apps.py index 6d0fa858ff..6f5fb0bb23 100644 --- a/contentcuration/contentcuration/apps.py +++ b/contentcuration/contentcuration/apps.py @@ -8,6 +8,9 @@ class ContentConfig(AppConfig): name = 'contentcuration' def ready(self): + # signals for websockets + import contentcuration.viewsets.websockets.signals # noqa + if settings.AWS_AUTO_CREATE_BUCKET and not is_gcs_backend(): from contentcuration.utils.minio_utils import ensure_storage_bucket_public ensure_storage_bucket_public() diff --git a/contentcuration/contentcuration/asgi.py b/contentcuration/contentcuration/asgi.py new file mode 100644 index 0000000000..8a63bc229a --- /dev/null +++ b/contentcuration/contentcuration/asgi.py @@ -0,0 +1,25 @@ +import django +from channels.auth import AuthMiddlewareStack +from channels.routing import ProtocolTypeRouter +from channels.routing import URLRouter +from django.conf import settings + +from contentcuration.viewsets.websockets.routing import http_urlpatterns +from contentcuration.viewsets.websockets.routing import websocket_urlpatterns + +django.setup(set_prefix=False) + +protocol_config = { + "websocket": + AuthMiddlewareStack( + URLRouter( + websocket_urlpatterns + ) + ), +} + +# production settings to add healthcheck +if not settings.DEBUG: + protocol_config.update(http=URLRouter(http_urlpatterns)) + +application = ProtocolTypeRouter(protocol_config) diff --git a/contentcuration/contentcuration/frontend/shared/data/applyRemoteChanges.js b/contentcuration/contentcuration/frontend/shared/data/applyRemoteChanges.js index 3e5b8a79d9..d6ce310cc8 100644 --- a/contentcuration/contentcuration/frontend/shared/data/applyRemoteChanges.js +++ b/contentcuration/contentcuration/frontend/shared/data/applyRemoteChanges.js @@ -1,8 +1,8 @@ import Dexie from 'dexie'; import sortBy from 'lodash/sortBy'; -import { CHANGE_TYPES, IGNORED_SOURCE, TABLE_NAMES } from './constants'; import db from './db'; import { INDEXEDDB_RESOURCES } from './registry'; +import { CHANGE_TYPES, IGNORED_SOURCE, TABLE_NAMES } from './constants'; const { CREATED, DELETED, UPDATED, MOVED, PUBLISHED, SYNCED } = CHANGE_TYPES; diff --git a/contentcuration/contentcuration/frontend/shared/data/resources.js b/contentcuration/contentcuration/frontend/shared/data/resources.js index bd8796a8fe..01432fafe3 100644 --- a/contentcuration/contentcuration/frontend/shared/data/resources.js +++ b/contentcuration/contentcuration/frontend/shared/data/resources.js @@ -2,6 +2,7 @@ import Dexie from 'dexie'; import Mutex from 'mutex-js'; import findIndex from 'lodash/findIndex'; import flatMap from 'lodash/flatMap'; +import intersection from 'lodash/intersection'; import isArray from 'lodash/isArray'; import isNumber from 'lodash/isNumber'; import isString from 'lodash/isString'; @@ -14,19 +15,19 @@ import uniqBy from 'lodash/uniqBy'; import { v4 as uuidv4 } from 'uuid'; import { - CHANGE_TYPES, + ACTIVE_CHANNELS, CHANGES_TABLE, + CHANGE_TYPES, + CHANNEL_SYNC_KEEP_ALIVE_INTERVAL, + COPYING_FLAG, + CURRENT_USER, + CREATION_CHANGE_TYPES, IGNORED_SOURCE, + LAST_FETCHED, + MAX_REV_KEY, RELATIVE_TREE_POSITIONS, TABLE_NAMES, - COPYING_FLAG, TASK_ID, - CURRENT_USER, - ACTIVE_CHANNELS, - CHANNEL_SYNC_KEEP_ALIVE_INTERVAL, - MAX_REV_KEY, - LAST_FETCHED, - CREATION_CHANGE_TYPES, TREE_CHANGE_TYPES, } from './constants'; import applyChanges, { applyMods, collectChanges } from './applyRemoteChanges'; @@ -34,9 +35,15 @@ import mergeAllChanges from './mergeChanges'; import db, { channelScope, CLIENTID, Collection } from './db'; import { API_RESOURCES, INDEXEDDB_RESOURCES } from './registry'; import { DELAYED_VALIDATION, fileErrors, NEW_OBJECT } from 'shared/constants'; -import client, { paramsSerializer } from 'shared/client'; -import { currentLanguage } from 'shared/i18n'; import urls from 'shared/urls'; +import { currentLanguage } from 'shared/i18n'; +import client, { paramsSerializer } from 'shared/client'; + +/** + * Task names for which it is only useful to keep the most recent task object + * @type {string[]} + */ +const SINGULAR_TASKS = ['export-channel', 'sync-channel']; // Number of seconds after which data is considered stale. const REFRESH_INTERVAL = 5; @@ -1847,13 +1854,15 @@ export const Task = new IndexedDBResource({ task.channel_id = task.channel_id.replace('-', ''); } return this.transaction({ mode: 'rw', source: IGNORED_SOURCE }, () => { - return this.table - .where(this.idField) - .noneOf(tasks.map(t => t[this.idField])) - .delete() - .then(() => { - return this.table.bulkPut(tasks); - }); + let deletePromise = Promise.resolve(); + let taskDeletes = intersection( + SINGULAR_TASKS, + tasks.map(t => t.task_name) + ); + if (taskDeletes.length) { + deletePromise = this.table.filter(t => taskDeletes.includes(t.task_name)).delete(); + } + return deletePromise.then(() => this.table.bulkPut(tasks)); }); }, }); diff --git a/contentcuration/contentcuration/frontend/shared/data/serverSync.js b/contentcuration/contentcuration/frontend/shared/data/serverSync.js index e114b8cd19..59ad8042e8 100644 --- a/contentcuration/contentcuration/frontend/shared/data/serverSync.js +++ b/contentcuration/contentcuration/frontend/shared/data/serverSync.js @@ -1,37 +1,34 @@ import debounce from 'lodash/debounce'; import findLastIndex from 'lodash/findLastIndex'; import get from 'lodash/get'; -import pick from 'lodash/pick'; import omit from 'lodash/omit'; import orderBy from 'lodash/orderBy'; +import pick from 'lodash/pick'; import uniq from 'lodash/uniq'; +import mergeAllChanges from './mergeChanges'; +import db from './db'; import applyChanges from './applyRemoteChanges'; +import { INDEXEDDB_RESOURCES } from './registry'; +import { Channel, Session, Task } from './resources'; import { - CHANGE_TYPES, + ACTIVE_CHANNELS, CHANGES_TABLE, - IGNORED_SOURCE, + CHANGE_TYPES, CHANNEL_SYNC_KEEP_ALIVE_INTERVAL, - ACTIVE_CHANNELS, + IGNORED_SOURCE, MAX_REV_KEY, LAST_FETCHED, COPYING_FLAG, TASK_ID, } from './constants'; -import db from './db'; -import mergeAllChanges from './mergeChanges'; -import { INDEXEDDB_RESOURCES } from './registry'; -import { Channel, Session, Task } from './resources'; import client from 'shared/client'; import urls from 'shared/urls'; // When this many seconds pass without a syncable // change being registered, sync changes! -const SYNC_IF_NO_CHANGES_FOR = 2; - -// When this many seconds pass, repoll the backend to -// check for any updates to active channels, or the user and sync any current changes -const SYNC_POLL_INTERVAL = 5; +const SYNC_IF_NO_CHANGES_FOR = 0.5; +let socket; // Flag to check if a sync is currently active. let syncActive = false; @@ -94,10 +91,9 @@ function trimChangeForSync(change) { return payload; } -function handleDisallowed(response) { +function handleDisallowed(disallowed) { // The disallowed property is an array of any changes that were sent to the server, // that were rejected. - const disallowed = get(response, ['data', 'disallowed'], []); if (disallowed.length) { // Collect all disallowed const disallowedRevs = disallowed.map(d => Number(d.rev)); @@ -111,10 +107,9 @@ function handleDisallowed(response) { return Promise.resolve(); } -function handleAllowed(response) { +function handleAllowed(allowed) { // The allowed property is an array of any rev and server_rev for any changes sent to // the server that were accepted - const allowed = get(response, ['data', 'allowed'], []); if (allowed.length) { const revMap = {}; for (let obj of allowed) { @@ -130,21 +125,19 @@ function handleAllowed(response) { return Promise.resolve(); } -function handleReturnedChanges(response) { +function handleReturnedChanges(returnedChanges) { // The changes property is an array of any changes from the server to apply in the // client. - const returnedChanges = get(response, ['data', 'changes'], []); if (returnedChanges.length) { return applyChanges(returnedChanges); } return Promise.resolve(); } -function handleErrors(response) { +function handleErrors(errors) { // The errors property is an array of any changes that were sent to the server, // that were rejected, with an additional errors property that describes // the error. - const errors = get(response, ['data', 'errors'], []); if (errors.length) { const errorMap = {}; for (let error of errors) { @@ -162,10 +155,13 @@ function handleErrors(response) { return Promise.resolve(); } -function handleSuccesses(response) { +function handleTasks(tasks) { + return Task.setTasks(tasks); +} + +function handleSuccesses(successes) { // The successes property is an array of server_revs for any previously synced changes // that have now been successfully applied on the server. - const successes = get(response, ['data', 'successes'], []); if (successes.length) { return db[CHANGES_TABLE].where('server_rev') .anyOf(successes.map(c => c.server_rev)) @@ -174,14 +170,8 @@ function handleSuccesses(response) { return Promise.resolve(); } -function handleMaxRevs(response, userId) { - const allChanges = orderBy( - get(response, ['data', 'changes'], []) - .concat(get(response, ['data', 'errors'], [])) - .concat(get(response, ['data', 'successes'], [])), - 'server_rev', - 'desc' - ); +function handleMaxRevs(changes, userId) { + const allChanges = orderBy(changes, 'server_rev', 'desc'); const channelIds = uniq(allChanges.map(c => c.channel_id)).filter(Boolean); const maxRevs = {}; const promises = []; @@ -214,11 +204,71 @@ function handleMaxRevs(response, userId) { return Promise.all(promises); } -function handleTasks(response) { - const tasks = get(response, ['data', 'tasks'], []); - return Task.setTasks(tasks); -} +async function WebsocketSendChanges() { + // Note: we could in theory use Dexie syncable for what + // we are doing here, but I can't find a good way to make + // it ignore our regular API calls for seeding the database + // Also, the pattern it expects for server interactions would + // require greater backend rearchitecting to focus our server-client + // interactions on changes to objects, with consistent and resolvable + // revisions. We will do this for now, but we have the option of doing + // something more involved and better architectured in the future. + + syncActive = true; + + // Track the maxRevision at this moment so that we can ignore any changes that + // might have come in during processing - leave them for the next cycle. + // This is the primary key of the change objects, so the collection is ordered by this + // by default - if we just grab the last object, we can get the key from there. + const [lastChange, user] = await Promise.all([ + db[CHANGES_TABLE].orderBy('rev').last(), + Session.getSession(), + ]); + if (!user) { + // If not logged in, nothing to do. + return; + } + + const now = Date.now(); + const channelIds = Object.entries(user[ACTIVE_CHANNELS] || {}) + .filter(([id, time]) => id && time > now - CHANNEL_SYNC_KEEP_ALIVE_INTERVAL) + .map(([id]) => id); + const channel_revs = {}; + for (let channelId of channelIds) { + channel_revs[channelId] = get(user, [MAX_REV_KEY, channelId], 0); + } + + const requestPayload = { + changes: [], + channel_revs, + user_rev: user.user_rev || 0, + }; + if (lastChange) { + const changesMaxRevision = lastChange.rev; + const syncableChanges = db[CHANGES_TABLE].where('rev') + .belowOrEqual(changesMaxRevision) + .filter(c => !c.synced); + const changesToSync = await syncableChanges.toArray(); + // By the time we get here, our changesToSync Array should + // have every change we want to sync to the server, so we + // can now trim it down to only what is needed to transmit over the wire. + // TODO: remove moves when a delete change is present for an object, + // because a delete will wipe out the move. + const changes = changesToSync.map(trimChangeForSync); + // Create a promise for the sync - if there is nothing to sync just resolve immediately, + // in order to still call our change cleanup code. + if (changes.length) { + requestPayload.changes = changes; + socket.send( + JSON.stringify({ + payload: requestPayload, + }) + ); + } + } + syncActive = false; +} async function syncChanges() { // Note: we could in theory use Dexie syncable for what // we are doing here, but I can't find a good way to make @@ -291,16 +341,23 @@ async function syncChanges() { // "errors": [], // "successes": [], // } + const response = await client.post(urls['sync'](), requestPayload); + try { await Promise.all([ - handleDisallowed(response), - handleAllowed(response), - handleReturnedChanges(response), - handleErrors(response), - handleSuccesses(response), - handleMaxRevs(response, user.id), - handleTasks(response), + handleDisallowed(get(response, ['data', 'disallowed'], [])), + handleAllowed(get(response, ['data', 'allowed'], [])), + handleReturnedChanges(get(response, ['data', 'changes'], [])), + handleErrors(get(response, ['data', 'errors'], [])), + handleSuccesses(get(response, ['data', 'successes'], [])), + handleMaxRevs( + get(response, ['data', 'changes'], []) + .concat(get(response, ['data', 'errors'], [])) + .concat(get(response, ['data', 'successes'], [])), + user.id + ), + handleTasks(get(response, ['data', 'tasks'], [])), ]); } catch (err) { console.error('There was an error updating change status: ', err); // eslint-disable-line no-console @@ -333,7 +390,7 @@ async function handleChanges(changes) { // MOVE, COPY, PUBLISH, and SYNC changes where we may be executing them inside an IGNORED_SOURCE // because they also make UPDATE and CREATE changes that we wish to make in the client only. // Only do this for changes that are not marked as synced. - const newChangeTableEntries = changes.some( + const newChangeTableEntries = changes.filter( c => c.table === CHANGES_TABLE && c.type === CHANGE_TYPES.CREATED && !c.obj.synced ); @@ -353,8 +410,11 @@ async function handleChanges(changes) { // If we detect changes were written to the changes table // then we'll trigger sync - if (syncableChanges.length || newChangeTableEntries) { - debouncedSyncChanges(); + + if (newChangeTableEntries.length) { + if (!syncActive) { + WebsocketSendChanges(); + } } } @@ -364,8 +424,71 @@ export function startSyncing() { // Initiate a sync immediately in case any data // is left over in the database. debouncedSyncChanges(); - // Start the sync interval - intervalTimer = setInterval(debouncedSyncChanges, SYNC_POLL_INTERVAL * 1000); + const websocketUrl = new URL( + `/ws/sync_socket/${window.CHANNEL_EDIT_GLOBAL.channel_id}/`, + window.location.href + ); + websocketUrl.protocol = window.location.protocol == 'https:' ? 'wss:' : 'ws:'; + + socket = new WebSocket(websocketUrl); + + // Connection opened + socket.addEventListener('open', () => { + console.log('Websocket connected'); + }); + + // Listen for any errors due to which connection may be closed. + socket.addEventListener('error', event => { + console.log('WebSocket error: ', event); + }); + + socket.addEventListener('message', async e => { + const data = JSON.parse(e.data); + const user = await Session.getSession(); + console.log(data); + if (data.task) { + Task.setTasks([data.task]); + } + if (data.response_payload && data.response_payload.allowed) { + try { + handleAllowed(data.response_payload.allowed); + } catch (err) { + console.log(err); + } + } + if (data.response_payload && data.response_payload.disallowed) { + try { + handleDisallowed(data.response_payload.disallowed); + } catch (err) { + console.log(err); + } + } + if (data.change) { + try { + handleReturnedChanges([data.change]); + handleMaxRevs([data.change], user.id); + } catch (err) { + console.log(err); + } + } + if (data.errored) { + try { + handleErrors([data.errored]); + handleMaxRevs([data.errored], user.id); + } catch (err) { + console.log(err); + } + } + if (data.success) { + try { + handleSuccesses([data.success]); + handleMaxRevs([data.success], user.id); + } catch (err) { + console.log(err); + } + } + }); + db.on('changes', handleChanges); } diff --git a/contentcuration/contentcuration/models.py b/contentcuration/contentcuration/models.py index 8c772e46af..eb19955c7f 100644 --- a/contentcuration/contentcuration/models.py +++ b/contentcuration/contentcuration/models.py @@ -2405,7 +2405,7 @@ def _create_from_change(cls, created_by_id=None, channel_id=None, user_id=None, ) @classmethod - def create_changes(cls, changes, created_by_id=None, session_key=None, applied=False): + def create_changes(cls, changes, created_by_id, session_key=None, applied=False): change_models = [] for change in changes: change_models.append(cls._create_from_change(created_by_id=created_by_id, session_key=session_key, applied=applied, **change)) @@ -2414,7 +2414,7 @@ def create_changes(cls, changes, created_by_id=None, session_key=None, applied=F return change_models @classmethod - def create_change(cls, change, created_by_id=None, session_key=None, applied=False): + def create_change(cls, change, created_by_id, session_key=None, applied=False): obj = cls._create_from_change(created_by_id=created_by_id, session_key=session_key, applied=applied, **change) obj.save() return obj diff --git a/contentcuration/contentcuration/settings.py b/contentcuration/contentcuration/settings.py index deb5894eab..bb540d3527 100644 --- a/contentcuration/contentcuration/settings.py +++ b/contentcuration/contentcuration/settings.py @@ -88,6 +88,7 @@ 'mathfilters', 'django.contrib.postgres', 'django_celery_results', + 'channels', ) SESSION_ENGINE = "django.contrib.sessions.backends.cached_db" @@ -203,7 +204,17 @@ ] WSGI_APPLICATION = 'contentcuration.wsgi.application' +ASGI_APPLICATION = 'contentcuration.asgi.application' +CHANNELS_DB = 2 +CHANNEL_LAYERS = { + 'default': { + 'BACKEND': 'channels_redis.core.RedisChannelLayer', + 'CONFIG': { + "hosts": ["{url}{db}".format(url=REDIS_URL, db=CHANNELS_DB)], + }, + }, +} # Database # https://docs.djangoproject.com/en/1.8/ref/settings/#databases diff --git a/contentcuration/contentcuration/tests/test_change_signals.py b/contentcuration/contentcuration/tests/test_change_signals.py new file mode 100644 index 0000000000..5182330803 --- /dev/null +++ b/contentcuration/contentcuration/tests/test_change_signals.py @@ -0,0 +1,112 @@ +from django.core.management import call_command +from django.test import TestCase +from mock import patch + +from contentcuration.models import Change +from contentcuration.tests import testdata +from contentcuration.tests.base import BucketTestMixin +from contentcuration.tests.utils.websocket_helper import create_channel_specific_change_object +from contentcuration.tests.utils.websocket_helper import create_channel_user_common_change_object +from contentcuration.tests.utils.websocket_helper import create_errored_change_object +from contentcuration.tests.utils.websocket_helper import create_user_specific_change_object +from contentcuration.tests.viewsets.base import generate_update_event +from contentcuration.viewsets.sync.constants import CHANNEL + + +class ChangeSignalTestCase(TestCase, BucketTestMixin): + def setUp(self): + call_command("loadconstants") + if not self.persist_bucket: + self.create_bucket() + self.user = testdata.user("mrtest@testy.com") + self.channel = testdata.channel() + self.channel.editors.add(self.user) + self.client.force_login(user=self.user) + + def tearDown(self): + if not self.persist_bucket: + self.delete_bucket() + self.user.delete() + + @patch('contentcuration.viewsets.websockets.signals.broadcast_new_change_model') + def test_change_signal_handler(self, mock_signal): + """ + Test if signal is getting triggered when an change object gets created. + """ + self.client.force_login(self.user) + new_name = "This is not the old name" + Change.create_change(generate_update_event(self.channel.id, CHANNEL, {"name": new_name}, channel_id=self.channel.id), created_by_id=self.user.id) + assert mock_signal.call_count == 1 + + @patch('contentcuration.viewsets.websockets.signals.async_to_sync') + @patch('contentcuration.viewsets.websockets.signals.get_channel_layer') + def test_signal_handler_channel_specific_changes(self, mock_get_channel_layer, mock_async_to_sync): + """ + Test changes that are specific to channel(change channel name) only. + """ + change_serialized = create_channel_specific_change_object(self.user, self.channel) + channel_layer = mock_get_channel_layer.return_value + assert 2 == mock_async_to_sync.call_count + mock_async_to_sync.assert_called_with(channel_layer.group_send) + async_mock_return_value = mock_async_to_sync.return_value + async_mock_return_value.assert_any_call(str(self.user.id), { + 'type': 'broadcast_success', + 'success': change_serialized + }) + async_mock_return_value.assert_any_call(self.channel.id, { + 'type': 'broadcast_changes', + 'change': change_serialized + }) + + @patch('contentcuration.viewsets.websockets.signals.async_to_sync') + @patch('contentcuration.viewsets.websockets.signals.get_channel_layer') + def test_signal_handler_user_specific_changes(self, mock_get_channel_layer, mock_async_to_sync): + """ + Test changes that are specific to user(bookmarks) only. + """ + change_serialized = create_user_specific_change_object(self.user, self.channel) + channel_layer = mock_get_channel_layer.return_value + mock_async_to_sync.assert_called_once_with(channel_layer.group_send) + async_mock_return_value = mock_async_to_sync.return_value + async_mock_return_value.assert_any_call(str(self.user.id), { + 'type': 'broadcast_success', + 'success': change_serialized + }) + + @patch('contentcuration.viewsets.websockets.signals.async_to_sync') + @patch('contentcuration.viewsets.websockets.signals.get_channel_layer') + def test_signal_handler_user_channel_common_changes(self, mock_get_channel_layer, mock_async_to_sync): + """ + Test changes that are common to both channel and user(invitations). + """ + editor = self.user + change_serialized = create_channel_user_common_change_object(editor, self.channel) + channel_layer = mock_get_channel_layer.return_value + assert 2 == mock_async_to_sync.call_count + mock_async_to_sync.assert_called_with(channel_layer.group_send) + async_mock_return_value = mock_async_to_sync.return_value + assert 2 == mock_async_to_sync.call_count + async_mock_return_value.assert_any_call(self.channel.id, { + 'type': 'broadcast_changes', + 'change': change_serialized + }) + async_mock_return_value.assert_any_call(str(self.user.id), { + 'type': 'broadcast_success', + 'success': change_serialized + }) + + @patch('contentcuration.viewsets.websockets.signals.async_to_sync') + @patch('contentcuration.viewsets.websockets.signals.get_channel_layer') + def test_signal_handler_errored_changes(self, mock_get_channel_layer, mock_async_to_sync): + """ + Test changes that are errored! + """ + change_serialized = create_errored_change_object(self.user, self.channel) + channel_layer = mock_get_channel_layer.return_value + mock_async_to_sync.assert_called_once_with(channel_layer.group_send) + async_mock_return_value = mock_async_to_sync.return_value + assert 1 == mock_async_to_sync.call_count + async_mock_return_value.assert_called_once_with(str(self.user.id), { + 'type': 'broadcast_errors', + 'errored': change_serialized + }) diff --git a/contentcuration/contentcuration/tests/test_serializers.py b/contentcuration/contentcuration/tests/test_serializers.py index 1eed06db9e..36a006e962 100644 --- a/contentcuration/contentcuration/tests/test_serializers.py +++ b/contentcuration/contentcuration/tests/test_serializers.py @@ -146,12 +146,15 @@ class Meta: nested_writes = True def test_save__create(self): + class request: + user = self.user + context = {"request": request} s = self.ChannelSerializer( data=dict( name="New test channel", description="This is the best test channel", content_defaults=dict(author="Buster"), - ) + ), context=context ) self.assertTrue(s.is_valid()) @@ -162,6 +165,9 @@ def test_save__create(self): self.assertEqual(defaults, c.content_defaults) def test_save__update(self): + class request: + user = self.user + context = {"request": request} c = Channel( name="New test channel", description="This is the best test channel", @@ -170,7 +176,7 @@ def test_save__update(self): c.save() s = self.ChannelSerializer( - c, data=dict(content_defaults=dict(license="Special Permissions")) + c, data=dict(content_defaults=dict(license="Special Permissions")), context=context ) self.assertTrue(s.is_valid()) diff --git a/contentcuration/contentcuration/tests/test_websocket_consumer.py b/contentcuration/contentcuration/tests/test_websocket_consumer.py new file mode 100644 index 0000000000..73ae2deccc --- /dev/null +++ b/contentcuration/contentcuration/tests/test_websocket_consumer.py @@ -0,0 +1,101 @@ +import os + +import pytest +from channels.layers import get_channel_layer +from channels.testing import WebsocketCommunicator +from django.core.management import call_command +from django.test import override_settings +from django.test import TransactionTestCase + +from contentcuration.asgi import application +from contentcuration.tests import testdata + +os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true" + + +class WebsocketTestCase(TransactionTestCase): + def setUp(self): + call_command("loadconstants") + self.user = testdata.user("mrtest@testy.com") + + def tearDown(self): + self.user.delete() + + @pytest.mark.asyncio + async def test_authenticated_user_websocket_connection(self): + self.client.force_login(self.user) + headers = [(b'cookie', self.client.cookies.output(attrs=["value"], header='', sep='; ').encode())] + communicator = WebsocketCommunicator(application, 'ws/sync_socket/12312312312123/', headers) + connected = await communicator.connect() + assert connected + await communicator.disconnect() + + @pytest.mark.asyncio + async def test_unauthenticated_user_websocket_connection(self): + headers = [(b'cookie', self.client.cookies.output(attrs=["value"], header='', sep='; ').encode())] + communicator = WebsocketCommunicator(application, 'ws/sync_socket/12312312312123/', headers) + connected, _ = await communicator.connect() + assert connected is False + await communicator.disconnect() + + @pytest.mark.asyncio + async def test_disconnect_websockets(self): + self.client.force_login(self.user) + headers = [(b'cookie', self.client.cookies.output(attrs=["value"], header='', sep='; ').encode())] + channel_layers_setting = { + "default": {"BACKEND": "channels.layers.InMemoryChannelLayer"} + } + with override_settings(CHANNEL_LAYERS=channel_layers_setting): + communicator = WebsocketCommunicator(application, 'ws/sync_socket/12312312312123/', headers) + connected, _ = await communicator.connect() + channel_layer = get_channel_layer() + assert connected + await communicator.disconnect() + assert channel_layer.groups == {} + + @pytest.mark.asyncio + async def test_send_payload_websockets(self): + self.client.force_login(self.user) + headers = [(b'cookie', self.client.cookies.output(attrs=["value"], header='', sep='; ').encode())] + communicator = WebsocketCommunicator(application, 'ws/sync_socket/12312312312123/', headers) + connected = await communicator.connect() + assert connected + await communicator.send_json_to({ + "payload": { + "changes": [ + { + "type": 2, + "key": "7ae83505f20a4642a004fadde7f151ed", + "table": "channel", + "rev": 253, + "channel_id": "7ae83505f20a4642a004fadde7f151ed", + "mods": { + "name": "test" + } + } + ], + "channel_revs": { + "7ae83505f20a4642a004fadde7f151ed": 51 + }, + "user_rev": 0 + }}) + response = await communicator.receive_json_from() + assert response["response_payload"] + await communicator.disconnect() + + @pytest.mark.asyncio + async def test_channels_groups(self): + self.client.force_login(self.user) + headers = [(b'cookie', self.client.cookies.output(attrs=["value"], header='', sep='; ').encode())] + channel_layers_setting = { + "default": {"BACKEND": "channels.layers.InMemoryChannelLayer"} + } + with override_settings(CHANNEL_LAYERS=channel_layers_setting): + communicator = WebsocketCommunicator(application, 'ws/sync_socket/12312312312123/', headers) + connected, _ = await communicator.connect() + channel_layer = get_channel_layer() + # check the grou for channel exist + assert channel_layer.groups['12312312312123'] + assert channel_layer.groups[f"{self.user.id}"] + assert connected + await communicator.disconnect() diff --git a/contentcuration/contentcuration/tests/utils/websocket_helper.py b/contentcuration/contentcuration/tests/utils/websocket_helper.py new file mode 100644 index 0000000000..a87b816ab3 --- /dev/null +++ b/contentcuration/contentcuration/tests/utils/websocket_helper.py @@ -0,0 +1,72 @@ +import uuid + +from le_utils.constants import content_kinds + +from contentcuration import models +from contentcuration.models import Change +from contentcuration.tests.viewsets.base import generate_create_event +from contentcuration.tests.viewsets.base import generate_update_event +from contentcuration.viewsets.sync.constants import BOOKMARK +from contentcuration.viewsets.sync.constants import CHANNEL +from contentcuration.viewsets.sync.constants import CONTENTNODE +from contentcuration.viewsets.sync.constants import EDITOR_M2M + + +def bookmark_metadata(channel): + return { + "channel": channel.id, + } + + +def contentnode_db_metadata(channel): + return { + "title": "Aron's cool contentnode", + "id": uuid.uuid4().hex, + "kind_id": content_kinds.VIDEO, + "description": "coolest contentnode this side of the Pacific", + "parent_id": channel.main_tree_id, + } + + +def create_user_specific_change_object(user, channel): + bookmark = bookmark_metadata(channel) + change_obj = Change.create_change(generate_create_event( + bookmark["channel"], + BOOKMARK, + bookmark, + user_id=user.id, + ), created_by_id=user.id) + change_obj.applied = True + change_obj.save() + change_serialized = Change.serialize(change_obj) + return change_serialized + + +def create_channel_specific_change_object(user, channel): + new_name = "This is not the old name" + change_obj = Change.create_change(generate_update_event(channel.id, CHANNEL, {"name": new_name}, channel_id=channel.id), created_by_id=user.id) + change_obj.applied = True + change_obj.save() + change_serialized = Change.serialize(change_obj) + return change_serialized + + +def create_channel_user_common_change_object(user, channel): + editor = user + change_obj = Change.create_change(generate_create_event([editor.id, channel.id], EDITOR_M2M, {}, + channel_id=channel.id, user_id=editor.id), created_by_id=user.id) + change_obj.applied = True + change_obj.save() + change_serialized = Change.serialize(change_obj) + return change_serialized + + +def create_errored_change_object(user, channel): + contentnode = models.ContentNode.objects.create(**contentnode_db_metadata(channel)) + tag = "howzat!" + change_obj = Change.create_change(generate_update_event(contentnode.id, CONTENTNODE, { + "tags": [tag]}, channel_id=channel.id), created_by_id=user.id) + change_obj.errored = True + change_obj.save() + change_serialized = Change.serialize(change_obj) + return change_serialized diff --git a/contentcuration/contentcuration/tests/viewsets/test_contentnode.py b/contentcuration/contentcuration/tests/viewsets/test_contentnode.py index a70f1a43ac..882b5644a9 100644 --- a/contentcuration/contentcuration/tests/viewsets/test_contentnode.py +++ b/contentcuration/contentcuration/tests/viewsets/test_contentnode.py @@ -373,7 +373,6 @@ def test_consolidate_extra_fields(self): self.viewset_url(pk=contentnode.id), format="json", ) self.assertEqual(response.status_code, 200, response.content) - print(response.data["extra_fields"]) self.assertEqual(response.data["extra_fields"]["options"]["completion_criteria"]["threshold"]["m"], 3) self.assertEqual(response.data["extra_fields"]["options"]["completion_criteria"]["threshold"]["n"], 6) self.assertEqual(response.data["extra_fields"]["options"]["completion_criteria"]["threshold"]["mastery_model"], exercises.M_OF_N) diff --git a/contentcuration/contentcuration/viewsets/base.py b/contentcuration/contentcuration/viewsets/base.py index ef430b0fee..df3c90e031 100644 --- a/contentcuration/contentcuration/viewsets/base.py +++ b/contentcuration/contentcuration/viewsets/base.py @@ -3,7 +3,9 @@ import uuid from contextlib import contextmanager +from asgiref.sync import async_to_sync from celery import states +from channels.layers import get_channel_layer from django.core.exceptions import ObjectDoesNotExist from django.db.models import Q from django.http import Http404 @@ -12,6 +14,7 @@ from django_filters.constants import EMPTY_VALUES from django_filters.rest_framework import DjangoFilterBackend from django_filters.rest_framework import FilterSet +from rest_framework.exceptions import NotAuthenticated from rest_framework.filters import OrderingFilter from rest_framework.generics import get_object_or_404 from rest_framework.response import Response @@ -199,8 +202,13 @@ def create(self, validated_data): def save(self, **kwargs): instance = super(BulkModelSerializer, self).save(**kwargs) + if "request" in self.context and not self.context["request"].user.is_anonymous: + user = self.context["request"].user + else: + raise NotAuthenticated() + if self.changes: - Change.create_changes(self.changes, applied=True) + Change.create_changes(self.changes, applied=True, created_by_id=user.id) return instance @@ -894,38 +902,98 @@ def create_change_tracker(pk, table, channel_id, user, task_name): # Clean up any previous tasks specific to this in case there were failures. meta = json.dumps(dict(pk=pk, table=table)) TaskResult.objects.filter(channel_id=channel_id, task_name=task_name, meta=meta).delete() + progress = 0 + status = states.STARTED + + channel_layer = get_channel_layer() + room_group_name = channel_id task_id = uuid.uuid4().hex - task_object = TaskResult.objects.create( - task_id=task_id, - status=states.STARTED, - channel_id=channel_id, - task_name=task_name, - user=user, - meta=meta + + async_to_sync(channel_layer.group_send)( + str(room_group_name), + { + 'type': 'broadcast_tasks', + 'tasks': { + 'pk': pk, + 'table': table, + 'task_id': task_id, + 'task_name': task_name, + 'traceback': None, + 'progress': progress, + 'channel_id': channel_id, + 'status': status, + } + } ) def update_progress(progress=None): if progress: - task_object.progress = progress - task_object.save() + async_to_sync(channel_layer.group_send)( + str(room_group_name), + { + 'type': 'broadcast_tasks', + 'tasks': { + 'pk': pk, + 'table': table, + 'task_id': task_id, + 'task_name': task_name, + 'traceback': None, + 'progress': progress, + 'channel_id': channel_id, + 'status': status, + } + } + ) Change.create_change( - generate_update_event(pk, table, {TASK_ID: task_object.task_id}, channel_id=channel_id), applied=True + generate_update_event(pk, table, {TASK_ID: task_id}, channel_id=channel_id), applied=True, created_by_id=user.id ) tracker = ProgressTracker(task_id, update_progress) try: yield tracker - except Exception: - task_object.status = states.FAILURE - task_object.traceback = traceback.format_exc() - task_object.save() + except Exception as e: + status = states.FAILURE + traceback_str = traceback.format_exc() + async_to_sync(channel_layer.group_send)( + str(room_group_name), + { + 'type': 'broadcast_tasks', + 'tasks': { + 'pk': pk, + 'table': table, + 'task_id': task_id, + 'task_name': task_name, + 'traceback': traceback_str, + 'progress': progress, + 'channel_id': channel_id, + 'status': status, + } + } + ) finally: - if task_object.status == states.STARTED: + if status == states.STARTED: + progress = 100 + async_to_sync(channel_layer.group_send)( + str(room_group_name), + { + 'type': 'broadcast_tasks', + 'tasks': { + 'pk': pk, + 'table': table, + 'task_id': task_id, + 'task_name': task_name, + 'traceback': None, + 'progress': progress, + 'channel_id': channel_id, + 'status': states.SUCCESS, + } + } + ) # No error reported, cleanup. Change.create_change( - generate_update_event(pk, table, {TASK_ID: None}, channel_id=channel_id), applied=True + generate_update_event(pk, table, {TASK_ID: None}, channel_id=channel_id), applied=True, created_by_id=user.id + ) - task_object.delete() diff --git a/contentcuration/contentcuration/viewsets/channel.py b/contentcuration/contentcuration/viewsets/channel.py index c028405130..a6375cb6c1 100644 --- a/contentcuration/contentcuration/viewsets/channel.py +++ b/contentcuration/contentcuration/viewsets/channel.py @@ -481,23 +481,22 @@ def publish(self, pk, version_notes="", language=None): progress_tracker=progress_tracker, language=language ) - Change.create_changes([ + Change.create_change( generate_update_event( channel.id, CHANNEL, { "published": True, "publishing": False, "primary_token": channel.get_human_token().token, - "last_published": channel.last_published, - "unpublished_changes": _unpublished_changes_query(channel.id).exists() + "last_published": str(channel.last_published), + "unpublished_changes": bool(_unpublished_changes_query(channel.id).exists()) }, channel_id=channel.id - ), - ], applied=True) + ), created_by_id=self.request.user.id, applied=True) except Exception: Change.create_changes([ generate_update_event( channel.id, CHANNEL, {"publishing": False, "unpublished_changes": True}, channel_id=channel.id ), - ], applied=True) + ], created_by_id=self.request.user.id, applied=True) raise def sync_from_changes(self, changes): diff --git a/contentcuration/contentcuration/viewsets/sync/endpoint.py b/contentcuration/contentcuration/viewsets/sync/endpoint.py index a7b1e657fb..947aba611e 100644 --- a/contentcuration/contentcuration/viewsets/sync/endpoint.py +++ b/contentcuration/contentcuration/viewsets/sync/endpoint.py @@ -3,7 +3,6 @@ and deals with processing all the changes to make appropriate bulk creates, updates, and deletes. """ -from celery import states from django.db.models import Q from rest_framework.authentication import SessionAuthentication from rest_framework.permissions import IsAuthenticated @@ -12,7 +11,6 @@ from contentcuration.models import Change from contentcuration.models import Channel -from contentcuration.models import TaskResult from contentcuration.tasks import apply_channel_changes_task from contentcuration.tasks import apply_user_changes_task from contentcuration.viewsets.sync.constants import CHANNEL @@ -122,22 +120,6 @@ def return_changes(self, request, channel_revs): return {"changes": changes, "errors": errors, "successes": successes} - def return_tasks(self, request, channel_revs): - tasks = TaskResult.objects.filter( - channel_id__in=channel_revs.keys(), - status__in=[states.STARTED, states.FAILURE], - ).exclude(task_name__in=[apply_channel_changes_task.name, apply_user_changes_task.name]) - return { - "tasks": tasks.values( - "task_id", - "task_name", - "traceback", - "progress", - "channel_id", - "status", - ) - } - def post(self, request): response_payload = { "disallowed": [], @@ -154,6 +136,4 @@ def post(self, request): response_payload.update(self.return_changes(request, channel_revs)) - response_payload.update(self.return_tasks(request, channel_revs)) - return Response(response_payload) diff --git a/contentcuration/contentcuration/viewsets/websockets/consumers.py b/contentcuration/contentcuration/viewsets/websockets/consumers.py new file mode 100644 index 0000000000..722281be12 --- /dev/null +++ b/contentcuration/contentcuration/viewsets/websockets/consumers.py @@ -0,0 +1,185 @@ +import json +import logging as logger + +from asgiref.sync import async_to_sync +from channels.generic.http import AsyncHttpConsumer +from channels.generic.websocket import WebsocketConsumer + +from contentcuration.viewsets.sync.constants import CHANNEL +from contentcuration.viewsets.sync.constants import CREATED + + +logging = logger.getLogger(__name__) + + +class SyncConsumer(WebsocketConsumer): + # Initial reset + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.room_group_name = None + self.indiviual_room_group_name = None + + @property + def user(self): + return self.scope["user"] + + # Checks permissions + def check_authentication(self): + return self.user.is_authenticated + + def connect(self): + """ + Executes when a user tries to make a websocket connection. + - Creates and joins a group for indiviual user + - Joins a public group based on channel_id provided in url + """ + # Extract the channel_id from url + self.room_group_name = self.scope['url_route']['kwargs']['channel_id'] + + logging.debug("Connected to channel_id: " + self.room_group_name) + + self.indiviual_room_group_name = str(self.user.id) + + logging.debug("Connected to user " + str(self.user)) + + if self.check_authentication(): + # Join room group based on channel_id + async_to_sync(self.channel_layer.group_add)( + self.room_group_name, + self.channel_name + ) + + # Join private room group for indiviual user + async_to_sync(self.channel_layer.group_add)( + self.indiviual_room_group_name, + self.channel_name + ) + + self.accept() + + else: + self.close() + + def disconnect(self, close_code): + """ + Executed to leave indiviual-user and channel group + """ + # Leave channel_id room group + async_to_sync(self.channel_layer.group_discard)( + self.room_group_name, + self.channel_name + ) + + # Leave indiviual room group + async_to_sync(self.channel_layer.group_discard)( + self.indiviual_room_group_name, + self.channel_name + ) + + def receive(self, text_data): + """ + Executes when data is received from websocket + """ + from contentcuration.models import Change + from contentcuration.models import Channel + from contentcuration.tasks import apply_channel_changes_task + from contentcuration.tasks import apply_user_changes_task + + response_payload = { + "disallowed": [], + "allowed": [], + } + user_id = self.user.id + session_key = self.scope['cookies']['kolibri_studio_sessionid'] + text_data_json = json.loads(text_data) + changes = text_data_json["payload"]["changes"] + + change_channel_ids = set(x.get("channel_id") for x in changes if x.get("channel_id")) + # Channels that have been created on the client side won't exist on the server yet, so we need to add a special exception for them. + created_channel_ids = set(x.get("channel_id") for x in changes if x.get("channel_id") and x.get("table") == CHANNEL and x.get("type") == CREATED) + # However, this would also give people a mechanism to edit existing channels on the server side by adding a channel create event for an + # already existing channel, so we have to filter out the channel ids that are already created on the server side, regardless of whether + # the user making the requests has permissions for those channels. + created_channel_ids = created_channel_ids.difference( + set(Channel.objects.filter(id__in=created_channel_ids).values_list("id", flat=True).distinct()) + ) + allowed_ids = set( + Channel.filter_edit_queryset(Channel.objects.filter(id__in=change_channel_ids), self.user).values_list("id", flat=True).distinct() + ).union(created_channel_ids) + # Allow changes that are either: + # Not related to a channel and instead related to the user if the user is the current user. + user_only_changes = [] + # Related to a channel that the user is an editor for. + channel_changes = [] + # Changes that cannot be made + disallowed_changes = [] + for c in changes: + if c.get("channel_id") is None and c.get("user_id") == user_id: + user_only_changes.append(c) + elif c.get("channel_id") in allowed_ids: + channel_changes.append(c) + else: + disallowed_changes.append(c) + change_models = Change.create_changes(user_only_changes + channel_changes, created_by_id=user_id, session_key=session_key) + if user_only_changes: + apply_user_changes_task.fetch_or_enqueue(self.user, user_id=user_id) + for channel_id in allowed_ids: + apply_channel_changes_task.fetch_or_enqueue(self.user, channel_id=channel_id) + allowed_changes = [{"rev": c.client_rev, "server_rev": c.server_rev} for c in change_models] + response_payload.update({"disallowed": disallowed_changes, "allowed": allowed_changes}) + + self.send(json.dumps({ + 'response_payload': response_payload + })) + + def broadcast_changes(self, event): + """ + Receive message events sent to the subscribed groups from our Django signal handlers, and relay the messages to the frontend + """ + change = event['change'] + + # Send message to WebSocket + self.send(text_data=json.dumps({ + 'change': change + })) + + def broadcast_errors(self, event): + """ + Broadcast any errors to frontend + """ + error = event['errored'] + + # Send message to WebSocket + self.send(text_data=json.dumps({ + 'errored': error + })) + + def broadcast_tasks(self, event): + """ + Broadcast tasks to frontend + """ + task = event['tasks'] + + # Send message to WebSocket + self.send(text_data=json.dumps({ + 'task': task + })) + + def broadcast_success(self, event): + """ + Broadcast success to required user + """ + success = event['success'] + + # Send message to WebSocket + self.send(text_data=json.dumps({ + 'success': success + })) + + +class HealthCheckHttpConsumer(AsyncHttpConsumer): + """ + Consumer for handeling the only http request related with Health check + """ + async def handle(self, body): + await self.send_response(200, b"OK") diff --git a/contentcuration/contentcuration/viewsets/websockets/routing.py b/contentcuration/contentcuration/viewsets/websockets/routing.py new file mode 100644 index 0000000000..2f2b894478 --- /dev/null +++ b/contentcuration/contentcuration/viewsets/websockets/routing.py @@ -0,0 +1,11 @@ +from django.urls import re_path + +from . import consumers + +websocket_urlpatterns = [ + re_path(r'ws/sync_socket/(?P\w+)/$', consumers.SyncConsumer.as_asgi()), +] + +http_urlpatterns = [ + re_path(r'healthz$', consumers.HealthCheckHttpConsumer.as_asgi()), +] diff --git a/contentcuration/contentcuration/viewsets/websockets/signals.py b/contentcuration/contentcuration/viewsets/websockets/signals.py new file mode 100644 index 0000000000..600fd27039 --- /dev/null +++ b/contentcuration/contentcuration/viewsets/websockets/signals.py @@ -0,0 +1,114 @@ +import logging as logger + +from asgiref.sync import async_to_sync +from channels.layers import get_channel_layer +from django.db.models.signals import post_save +from django.dispatch import receiver + +from contentcuration.models import Change +from contentcuration.utils.sentry import report_exception + +logging = logger.getLogger(__name__) + + +class NoneCreatedByIdError(Exception): + """ + Use to log change object whose created_by_id is set to none. We don't raise this error, + just feed it to Sentry for reporting. + """ + + def __init__(self, instance): + + self.change_object = instance + message = ( + "The change object did not have a created_by_id {}" + ) + self.message = message.format( + instance.pk + ) + + super(NoneCreatedByIdError, self).__init__(self.message) + + +@receiver(post_save, sender=Change, weak=False) +def broadcast_new_change_model_handler(sender, instance, created, **kwargs): + broadcast_new_change_model(instance) + + +def broadcast_new_change_model(instance): + channel_layer = get_channel_layer() + serialized_change_object = Change.serialize(instance) + # Name of channel group + room_group_name = instance.channel_id + + # name of indiviual_user group + indiviual_room_group_name = instance.user_id + + if instance.created_by_id is None and instance.change_type == 2 and instance.errored is True: + async_to_sync(channel_layer.group_send)( + str(room_group_name), + { + 'type': 'broadcast_errors', + 'errored': serialized_change_object + } + ) + + if instance.created_by_id is None: + try: + raise NoneCreatedByIdError(instance) + except NoneCreatedByIdError as e: + report_exception(e) + logging.error("Missing expected Change.created_by_id") + return + + # if the change object is errored then we broadcast the info back to indiviual user + if instance.errored: + async_to_sync(channel_layer.group_send)( + str(instance.created_by_id), + { + 'type': 'broadcast_errors', + 'errored': serialized_change_object + } + ) + if instance.applied: + # if the change is related to channel we broadcast changes to channel group + if not indiviual_room_group_name and room_group_name: + async_to_sync(channel_layer.group_send)( + str(instance.created_by_id), + { + 'type': 'broadcast_success', + 'success': serialized_change_object + } + ) + async_to_sync(channel_layer.group_send)( + str(room_group_name), + { + 'type': 'broadcast_changes', + 'change': serialized_change_object + } + ) + # if the change is only related to indiviual user + elif indiviual_room_group_name and not room_group_name: + async_to_sync(channel_layer.group_send)( + str(indiviual_room_group_name), + { + 'type': 'broadcast_success', + 'success': serialized_change_object + } + ) + # if the change is realted to both user and channel then we will broadcast to both of the groups + elif indiviual_room_group_name and room_group_name: + async_to_sync(channel_layer.group_send)( + str(room_group_name), + { + 'type': 'broadcast_changes', + 'change': serialized_change_object + } + ) + async_to_sync(channel_layer.group_send)( + str(indiviual_room_group_name), + { + 'type': 'broadcast_success', + 'success': serialized_change_object + } + ) diff --git a/deploy/nginx.conf.jinja2 b/deploy/nginx.conf.jinja2 index c762a077b3..8c10268884 100644 --- a/deploy/nginx.conf.jinja2 +++ b/deploy/nginx.conf.jinja2 @@ -24,6 +24,11 @@ http { server 127.0.0.1:8081; } + # Proxy to daphne for websocket requests + upstream ws_server { + server 127.0.0.1:8082; + } + # Allow-list filter for content catalog paths server { listen 8080; @@ -84,6 +89,16 @@ http { proxy_set_header Host $host; } + location /ws/ { + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + proxy_redirect off; + proxy_pass http://ws_server; + gzip off; + } + + location /content/ { proxy_http_version 1.1; proxy_pass {{ $aws_s3_endpoint_url }}/{{ $aws_s3_bucket_name }}/; diff --git a/requirements-dev.txt b/requirements-dev.txt index f683adee29..9bf9443073 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -141,7 +141,9 @@ mock==4.0.3 # -r requirements-dev.in # django-concurrent-test-helper msgpack==1.0.4 - # via locust + # via + # -c requirements.txt + # locust nodeenv==1.6.0 # via # -r requirements-dev.in @@ -260,7 +262,9 @@ tomli==1.2.3 # coverage # pep517 typing-extensions==4.1.1 - # via locust + # via + # -c requirements.txt + # locust uritemplate==3.0.1 # via # coreapi @@ -289,7 +293,9 @@ zipp==3.4.1 zope-event==4.5.0 # via gevent zope-interface==5.4.0 - # via gevent + # via + # -c requirements.txt + # gevent # The following packages are considered to be unsafe in a requirements file: # pip diff --git a/requirements.in b/requirements.in index 2704539269..9ba054c973 100644 --- a/requirements.in +++ b/requirements.in @@ -40,4 +40,6 @@ pillow==9.2.0 python-dateutil>=2.8.1 jsonschema>=3.2.0 importlib-metadata==1.7.0 +channels==3.0.4 +channels-redis==3.3.1 django-celery-results diff --git a/requirements.txt b/requirements.txt index e6068899d1..88bfa0a52c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,16 +4,31 @@ # # pip-compile requirements.in # +aioredis==1.3.1 + # via channels-redis amqp==5.1.1 # via kombu asgiref==3.3.4 - # via django + # via + # channels + # channels-redis + # daphne + # django async-timeout==4.0.2 - # via redis + # via + # aioredis + # redis attrs==19.3.0 # via # -r requirements.in + # automat # jsonschema + # service-identity + # twisted +autobahn==21.2.1 + # via daphne +automat==20.2.0 + # via twisted backoff==2.2.1 # via -r requirements.in backports-abc==0.5 @@ -37,7 +52,15 @@ certifi==2020.12.5 # requests # sentry-sdk cffi==1.14.5 - # via google-crc32c + # via + # cryptography + # google-crc32c +channels==3.0.4 + # via + # -r requirements.in + # channels-redis +channels-redis==3.3.1 + # via -r requirements.in chardet==4.0.0 # via requests click==8.1.3 @@ -54,11 +77,21 @@ click-repl==0.2.0 # via celery confusable-homoglyphs==3.2.0 # via django-registration +constantly==15.1.0 + # via twisted +cryptography==37.0.2 + # via + # autobahn + # pyopenssl + # service-identity +daphne==3.0.2 + # via channels deprecated==1.2.13 # via redis django==3.2.14 # via # -r requirements.in + # channels # django-bulk-update # django-db-readonly # django-filter @@ -146,16 +179,27 @@ grpcio==1.37.1 # grpc-google-iam-v1 gunicorn==20.1.0 # via -r requirements.in +hiredis==2.0.0 + # via aioredis html5lib==1.1 # via -r requirements.in httplib2==0.19.1 # via # django-postmark # oauth2client +hyperlink==21.0.0 + # via + # autobahn + # twisted idna==2.10 - # via requests + # via + # hyperlink + # requests + # twisted importlib-metadata==1.7.0 # via -r requirements.in +incremental==21.3.0 + # via twisted jmespath==0.10.0 # via # boto3 @@ -168,6 +212,8 @@ kombu==5.2.4 # via celery le-utils==0.1.42 # via -r requirements.in +msgpack==1.0.4 + # via channels-redis newrelic==6.2.0.156 # via -r requirements.in oauth2client==4.1.3 @@ -203,12 +249,16 @@ pyasn1==0.4.8 # oauth2client # pyasn1-modules # rsa + # service-identity pyasn1-modules==0.2.8 # via # google-auth # oauth2client + # service-identity pycparser==2.20 # via cffi +pyopenssl==22.0.0 + # via twisted pyparsing==2.4.7 # via # httplib2 @@ -246,8 +296,11 @@ s3transfer==0.4.2 # via boto3 sentry-sdk==1.9.0 # via -r requirements.in +service-identity==21.1.0 + # via twisted six==1.16.0 # via + # automat # click-repl # google-api-core # google-auth @@ -259,8 +312,15 @@ six==1.16.0 # progressbar2 # python-dateutil # python-utils + # service-identity sqlparse==0.4.1 # via django +twisted[tls]==22.4.0 + # via daphne +txaio==22.2.1 + # via autobahn +typing-extensions==4.1.1 + # via twisted urllib3==1.26.5 # via # botocore @@ -279,6 +339,8 @@ wrapt==1.14.1 # via deprecated zipp==3.4.1 # via importlib-metadata +zope-interface==5.4.0 + # via twisted # The following packages are considered to be unsafe in a requirements file: # setuptools