From 93e84f8feb3df9c8da34c1ca393689b9be6f3c5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Rama=C5=A1euski?= <andrej@x2.cz> Date: Wed, 5 Jan 2022 23:35:32 +0100 Subject: [PATCH] Configured message brocker --- .gitlab-ci.yml | 2 +- lib/CF/Controller/Websockets.pm | 19 ++++++++++++++----- lib/CF/Helpers/Core.pm | 13 ++++++++++--- 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 23e435b..fa18d79 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -2,7 +2,7 @@ image: docker:20.10.9 variables: DOCKER_TLS_CERTDIR: "/certs" - IMAGE_VER: 2.5.4 + IMAGE_VER: 2.6.0 services: - docker:20.10.9-dind diff --git a/lib/CF/Controller/Websockets.pm b/lib/CF/Controller/Websockets.pm index 0c8c469..7c821e8 100644 --- a/lib/CF/Controller/Websockets.pm +++ b/lib/CF/Controller/Websockets.pm @@ -23,11 +23,20 @@ sub main { $c->res->headers->add('Sec-WebSocket-Extensions' => 'permessage-deflate'); } - $c->redis->pubsub->listen( - notify => sub($pubsub, $payload) { + my $pubsub; + + if ($c->cfg->{message_broker} eq 'redis') { + $pubsub = $c->redis->pubsub; + $pubsub->listen(notify => sub($pubsub, $payload) { $c->send(decode("UTF-8", $payload)); - } - ); + }); + } + else { + $pubsub = Mojo::Pg::PubSub->new(pg => $c->pg); + $pubsub->listen(notify => sub($pubsub, $payload) { + $c->send($payload); + }); + } $c->on(json => sub( $c, $message ) { if ( $message->{event} eq 'KEEPALIVE' ) { @@ -99,7 +108,7 @@ sub main { }); $c->on(finish => sub ($c, $code, $reason = undef) { - $c->redis->pubsub->unlisten('notify'); + $pubsub->unlisten('notify'); $c->app->log->debug("WebSocket closed with status $code"); }); } diff --git a/lib/CF/Helpers/Core.pm b/lib/CF/Helpers/Core.pm index 36cbc28..a952083 100644 --- a/lib/CF/Helpers/Core.pm +++ b/lib/CF/Helpers/Core.pm @@ -203,12 +203,19 @@ sub register ($class, $self, $conf) { $self->helper( notify => sub ( $c, $event, $payload ) { - my $content = encode_json ({ + my $content = { event => $event, payload => $payload, - }); + }; + + if ($self->cfg->{message_broker} eq 'redis' ) { + $self->redis->pubsub->notify( notify => encode_json($content) ); + } + else { + my $pubsub = Mojo::Pg::PubSub->new(pg => $self->pg); + $pubsub->json('notify')->notify( notify => $content ); + } - $self->redis->pubsub->notify( notify => $content ); }); } -- GitLab