Skip to content
Snippets Groups Projects
Verified Commit 181b62cd authored by Andrej Ramašeuski's avatar Andrej Ramašeuski
Browse files

Prepnuti pubsub s postgres na redis

parent 81c13c01
No related branches found
No related tags found
No related merge requests found
Pipeline #5925 passed
...@@ -2,7 +2,7 @@ image: docker:20.10.9 ...@@ -2,7 +2,7 @@ image: docker:20.10.9
variables: variables:
DOCKER_TLS_CERTDIR: "/certs" DOCKER_TLS_CERTDIR: "/certs"
IMAGE_VER: 2.1.2 IMAGE_VER: 2.2.0
services: services:
- docker:20.10.9-dind - docker:20.10.9-dind
......
...@@ -26,6 +26,7 @@ RUN cpanm \ ...@@ -26,6 +26,7 @@ RUN cpanm \
Media::Info \ Media::Info \
Mojolicious@8.73 \ Mojolicious@8.73 \
Mojo::Pg \ Mojo::Pg \
Mojo::Redis \
Mojo::JWT \ Mojo::JWT \
Mojolicious::Plugin::Authentication \ Mojolicious::Plugin::Authentication \
Mojolicious::Plugin::Authorization Mojolicious::Plugin::Authorization
...@@ -39,5 +40,4 @@ RUN npm update && npm install && npm run build && rm -rf mode_modules ...@@ -39,5 +40,4 @@ RUN npm update && npm install && npm run build && rm -rf mode_modules
USER nobody USER nobody
EXPOSE 3000 EXPOSE 3000
#CMD hypnotoad -f /opt/PiTube/script/pitube
CMD /opt/PiTube/script/pitube daemon CMD /opt/PiTube/script/pitube daemon
...@@ -2,10 +2,10 @@ package PiTube; ...@@ -2,10 +2,10 @@ package PiTube;
use Mojo::Base 'Mojolicious'; use Mojo::Base 'Mojolicious';
use Mojo::Pg; use Mojo::Pg;
use Mojo::Redis;
use Mojolicious::Plugin::Authentication; use Mojolicious::Plugin::Authentication;
use Mojolicious::Plugin::Authorization; use Mojolicious::Plugin::Authorization;
use Net::OAuth2::Profile::WebServer; use Net::OAuth2::Profile::WebServer;
use Redis;
use PiTube::Schema; use PiTube::Schema;
sub startup { sub startup {
...@@ -24,7 +24,10 @@ sub startup { ...@@ -24,7 +24,10 @@ sub startup {
$self->plugin('PiTube::Helpers::OAuth2'); $self->plugin('PiTube::Helpers::OAuth2');
# Pripojeni na redis # Pripojeni na redis
my $redis = Redis->new( %{ $cfg->{redis} } ); # my $redis = Redis->new( %{ $cfg->{redis} } );
# $self->helper( redis => sub { return $redis; } );
my $redis = Mojo::Redis->new( 'redis://' . $cfg->{redis}{server} );
$self->helper( redis => sub { return $redis; } ); $self->helper( redis => sub { return $redis; } );
# migrace schematu # migrace schematu
......
package PiTube::Controller::Nginx; package PiTube::Controller::Nginx;
use Mojo::Base 'Mojolicious::Controller'; use Mojo::Base 'Mojolicious::Controller';
use Mojo::Pg::PubSub;
use feature 'signatures'; use feature 'signatures';
no warnings qw{ experimental::signatures }; no warnings qw{ experimental::signatures };
...@@ -9,7 +8,6 @@ no warnings qw{ experimental::signatures }; ...@@ -9,7 +8,6 @@ no warnings qw{ experimental::signatures };
use constant HLS => qr/hls\/([a-z0-9\-]+)(_\w+)?(\/\w+)?\.(m3u8|ts)$/i; use constant HLS => qr/hls\/([a-z0-9\-]+)(_\w+)?(\/\w+)?\.(m3u8|ts)$/i;
sub callback_rtmp($c) { sub callback_rtmp($c) {
my $pubsub = Mojo::Pg::PubSub->new(pg => $c->pg);
if ( $c->param('call') =~ /publish/ ) { if ( $c->param('call') =~ /publish/ ) {
my $name = $c->param('name'); my $name = $c->param('name');
...@@ -43,7 +41,7 @@ sub callback_rtmp($c) { ...@@ -43,7 +41,7 @@ sub callback_rtmp($c) {
is_live => ( $c->param('call') =~ /done/ ) ? 'f':'t', is_live => ( $c->param('call') =~ /done/ ) ? 'f':'t',
}); });
$pubsub->json('streams')->notify( streams => { $c->redis->pubsub->json('streams')->notify( streams => {
call => $c->param('call'), call => $c->param('call'),
stream_id => $stream->id, stream_id => $stream->id,
}); });
......
package PiTube::Controller::Websockets; package PiTube::Controller::Websockets;
use Mojo::Base 'Mojolicious::Controller'; use Mojo::Base 'Mojolicious::Controller';
use Mojo::Pg::PubSub; use Mojo::JSON qw(encode_json);
use feature 'signatures'; use feature 'signatures';
no warnings qw{ experimental::signatures }; no warnings qw{ experimental::signatures };
...@@ -10,7 +10,7 @@ use constant SOCKET_INACTIVITY_TIMEOUT => 300; ...@@ -10,7 +10,7 @@ use constant SOCKET_INACTIVITY_TIMEOUT => 300;
sub main { sub main {
my $c = shift; my $c = shift;
my $ip = $c->tx->remote_address; # my $ip = $c->tx->remote_address;
my $key = $c->req->headers->header('Sec-WebSocket-Key'); my $key = $c->req->headers->header('Sec-WebSocket-Key');
$c->inactivity_timeout(SOCKET_INACTIVITY_TIMEOUT); $c->inactivity_timeout(SOCKET_INACTIVITY_TIMEOUT);
...@@ -20,25 +20,25 @@ sub main { ...@@ -20,25 +20,25 @@ sub main {
$c->res->headers->add('Sec-WebSocket-Extensions' => 'permessage-deflate'); $c->res->headers->add('Sec-WebSocket-Extensions' => 'permessage-deflate');
} }
my $pubsub = Mojo::Pg::PubSub->new(pg => $c->pg); $c->redis->pubsub->json('streams')->listen(
streams => sub($pubsub, $payload) {
$pubsub->listen(streams => sub($pubsub, $payload) { $c->send({json => $payload});
$c->send($payload); }
}); );
$c->on(json => sub( $c, $message ) { $c->on(json => sub( $c, $message ) {
if ( $message->{stream} ) { if ( $message->{stream} ) {
$c->redis->set( $c->redis->db->set(
join (':', ('live', $message->{stream}, $key)), join (':', ('live', $message->{stream}, $key)),
'live', 'EX', 16 'live', 'EX', 16
); );
my $count = $c->redis->keys( 'live:' . $message->{stream} . ':*' ); my $keys = $c->redis->db->keys( 'live:' . $message->{stream} . ':*' );
$c->send({json => { watchers => $count }}); $c->send(encode_json({ watchers => scalar @{$keys} }));
} }
}); });
$c->on(finish => sub ($c, $code, $reason = undef) { $c->on(finish => sub ($c, $code, $reason = undef) {
$pubsub->unlisten('streams'); $c->redis->pubsub->unlisten('streams');
$c->app->log->debug("WebSocket closed with status $code"); $c->app->log->debug("WebSocket closed with status $code");
}); });
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment