mirror of https://github.com/iv-org/invidious.git
Batch user notifications together
This commit is contained in:
parent
3e17d04875
commit
5d0149844f
|
@ -184,8 +184,9 @@ if CONFIG.popular_enabled
|
||||||
Invidious::Jobs.register Invidious::Jobs::PullPopularVideosJob.new(PG_DB)
|
Invidious::Jobs.register Invidious::Jobs::PullPopularVideosJob.new(PG_DB)
|
||||||
end
|
end
|
||||||
|
|
||||||
CONNECTION_CHANNEL = ::Channel({Bool, ::Channel(PQ::Notification)}).new(32)
|
NOTIFICATION_CHANNEL = ::Channel(VideoNotification).new(32)
|
||||||
Invidious::Jobs.register Invidious::Jobs::NotificationJob.new(CONNECTION_CHANNEL, CONFIG.database_url)
|
CONNECTION_CHANNEL = ::Channel({Bool, ::Channel(PQ::Notification)}).new(32)
|
||||||
|
Invidious::Jobs.register Invidious::Jobs::NotificationJob.new(NOTIFICATION_CHANNEL, CONNECTION_CHANNEL, CONFIG.database_url)
|
||||||
|
|
||||||
Invidious::Jobs.register Invidious::Jobs::ClearExpiredItemsJob.new
|
Invidious::Jobs.register Invidious::Jobs::ClearExpiredItemsJob.new
|
||||||
|
|
||||||
|
|
|
@ -249,11 +249,7 @@ def fetch_channel(ucid, pull_all_videos : Bool)
|
||||||
|
|
||||||
if was_insert
|
if was_insert
|
||||||
LOGGER.trace("fetch_channel: #{ucid} : video #{video_id} : Inserted, updating subscriptions")
|
LOGGER.trace("fetch_channel: #{ucid} : video #{video_id} : Inserted, updating subscriptions")
|
||||||
if CONFIG.enable_user_notifications
|
NOTIFICATION_CHANNEL.send(VideoNotification.from_video(video))
|
||||||
Invidious::Database::Users.add_notification(video)
|
|
||||||
else
|
|
||||||
Invidious::Database::Users.feed_needs_update(video)
|
|
||||||
end
|
|
||||||
else
|
else
|
||||||
LOGGER.trace("fetch_channel: #{ucid} : video #{video_id} : Updated")
|
LOGGER.trace("fetch_channel: #{ucid} : video #{video_id} : Updated")
|
||||||
end
|
end
|
||||||
|
@ -285,11 +281,7 @@ def fetch_channel(ucid, pull_all_videos : Bool)
|
||||||
if Time.utc - video.published > 1.minute
|
if Time.utc - video.published > 1.minute
|
||||||
was_insert = Invidious::Database::ChannelVideos.insert(video)
|
was_insert = Invidious::Database::ChannelVideos.insert(video)
|
||||||
if was_insert
|
if was_insert
|
||||||
if CONFIG.enable_user_notifications
|
NOTIFICATION_CHANNEL.send(VideoNotification.from_video(video))
|
||||||
Invidious::Database::Users.add_notification(video)
|
|
||||||
else
|
|
||||||
Invidious::Database::Users.feed_needs_update(video)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -119,15 +119,15 @@ module Invidious::Database::Users
|
||||||
# Update (notifs)
|
# Update (notifs)
|
||||||
# -------------------
|
# -------------------
|
||||||
|
|
||||||
def add_notification(video : ChannelVideo)
|
def add_multiple_notifications(channel_id : String, video_ids : Array(String))
|
||||||
request = <<-SQL
|
request = <<-SQL
|
||||||
UPDATE users
|
UPDATE users
|
||||||
SET notifications = array_append(notifications, $1),
|
SET notifications = array_cat(notifications, $1),
|
||||||
feed_needs_update = true
|
feed_needs_update = true
|
||||||
WHERE $2 = ANY(subscriptions)
|
WHERE $2 = ANY(subscriptions)
|
||||||
SQL
|
SQL
|
||||||
|
|
||||||
PG_DB.exec(request, video.id, video.ucid)
|
PG_DB.exec(request, video_ids, channel_id)
|
||||||
end
|
end
|
||||||
|
|
||||||
def remove_notification(user : User, vid : String)
|
def remove_notification(user : User, vid : String)
|
||||||
|
@ -154,14 +154,14 @@ module Invidious::Database::Users
|
||||||
# Update (misc)
|
# Update (misc)
|
||||||
# -------------------
|
# -------------------
|
||||||
|
|
||||||
def feed_needs_update(video : ChannelVideo)
|
def feed_needs_update(channel_id : String)
|
||||||
request = <<-SQL
|
request = <<-SQL
|
||||||
UPDATE users
|
UPDATE users
|
||||||
SET feed_needs_update = true
|
SET feed_needs_update = true
|
||||||
WHERE $1 = ANY(subscriptions)
|
WHERE $1 = ANY(subscriptions)
|
||||||
SQL
|
SQL
|
||||||
|
|
||||||
PG_DB.exec(request, video.ucid)
|
PG_DB.exec(request, channel_id)
|
||||||
end
|
end
|
||||||
|
|
||||||
def update_preferences(user : User)
|
def update_preferences(user : User)
|
||||||
|
|
|
@ -1,8 +1,32 @@
|
||||||
|
struct VideoNotification
|
||||||
|
getter video_id : String
|
||||||
|
getter channel_id : String
|
||||||
|
getter published : Time
|
||||||
|
|
||||||
|
def_hash @channel_id, @video_id
|
||||||
|
|
||||||
|
def ==(other)
|
||||||
|
video_id == other.video_id
|
||||||
|
end
|
||||||
|
|
||||||
|
def self.from_video(video : ChannelVideo) : self
|
||||||
|
VideoNotification.new(video.id, video.ucid, video.published)
|
||||||
|
end
|
||||||
|
|
||||||
|
def initialize(@video_id, @channel_id, @published)
|
||||||
|
end
|
||||||
|
|
||||||
|
def clone : VideoNotification
|
||||||
|
VideoNotification.new(video_id.clone, channel_id.clone, published.clone)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
class Invidious::Jobs::NotificationJob < Invidious::Jobs::BaseJob
|
class Invidious::Jobs::NotificationJob < Invidious::Jobs::BaseJob
|
||||||
|
private getter notification_channel : ::Channel(VideoNotification)
|
||||||
private getter connection_channel : ::Channel({Bool, ::Channel(PQ::Notification)})
|
private getter connection_channel : ::Channel({Bool, ::Channel(PQ::Notification)})
|
||||||
private getter pg_url : URI
|
private getter pg_url : URI
|
||||||
|
|
||||||
def initialize(@connection_channel, @pg_url)
|
def initialize(@notification_channel, @connection_channel, @pg_url)
|
||||||
end
|
end
|
||||||
|
|
||||||
def begin
|
def begin
|
||||||
|
@ -10,6 +34,58 @@ class Invidious::Jobs::NotificationJob < Invidious::Jobs::BaseJob
|
||||||
|
|
||||||
PG.connect_listen(pg_url, "notifications") { |event| connections.each(&.send(event)) }
|
PG.connect_listen(pg_url, "notifications") { |event| connections.each(&.send(event)) }
|
||||||
|
|
||||||
|
# hash of channels to their videos (id+published) that need notifying
|
||||||
|
to_notify = Hash(String, Set(VideoNotification)).new(->(hash : Hash(String, Set(VideoNotification)), key : String) { hash[key] = Set(VideoNotification).new })
|
||||||
|
|
||||||
|
# fiber to locally cache all incoming notifications (from pubsub webhooks and refresh channels job)
|
||||||
|
spawn do
|
||||||
|
begin
|
||||||
|
loop do
|
||||||
|
notification = notification_channel.receive
|
||||||
|
to_notify[notification.channel_id] << notification
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
# fiber to regularly persist all cached notifications
|
||||||
|
spawn do
|
||||||
|
loop do
|
||||||
|
begin
|
||||||
|
LOGGER.debug("NotificationJob: waking up")
|
||||||
|
cloned = to_notify.clone
|
||||||
|
to_notify.clear
|
||||||
|
|
||||||
|
cloned.each do |channel_id, notifications|
|
||||||
|
if notifications.empty?
|
||||||
|
next
|
||||||
|
end
|
||||||
|
|
||||||
|
LOGGER.info("NotificationJob: updating channel #{channel_id} with #{notifications.size} notifications")
|
||||||
|
if CONFIG.enable_user_notifications
|
||||||
|
video_ids = notifications.map { |n| n.video_id }
|
||||||
|
Invidious::Database::Users.add_multiple_notifications(channel_id, video_ids)
|
||||||
|
notifications.each do |n|
|
||||||
|
# Deliver notifications to `/api/v1/auth/notifications`
|
||||||
|
payload = {
|
||||||
|
"topic" => n.channel_id,
|
||||||
|
"videoId" => n.video_id,
|
||||||
|
"published" => n.published.to_unix,
|
||||||
|
}.to_json
|
||||||
|
PG_DB.exec("NOTIFY notifications, E'#{payload}'")
|
||||||
|
end
|
||||||
|
else
|
||||||
|
Invidious::Database::Users.feed_needs_update(channel_id)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
LOGGER.trace("NotificationJob: Done, sleeping")
|
||||||
|
rescue ex
|
||||||
|
LOGGER.error("NotificationJob: #{ex.message}")
|
||||||
|
end
|
||||||
|
sleep 1.minute
|
||||||
|
Fiber.yield
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
loop do
|
loop do
|
||||||
action, connection = connection_channel.receive
|
action, connection = connection_channel.receive
|
||||||
|
|
||||||
|
|
|
@ -425,16 +425,6 @@ module Invidious::Routes::Feeds
|
||||||
next # skip this video since it raised an exception (e.g. it is a scheduled live event)
|
next # skip this video since it raised an exception (e.g. it is a scheduled live event)
|
||||||
end
|
end
|
||||||
|
|
||||||
if CONFIG.enable_user_notifications
|
|
||||||
# Deliver notifications to `/api/v1/auth/notifications`
|
|
||||||
payload = {
|
|
||||||
"topic" => video.ucid,
|
|
||||||
"videoId" => video.id,
|
|
||||||
"published" => published.to_unix,
|
|
||||||
}.to_json
|
|
||||||
PG_DB.exec("NOTIFY notifications, E'#{payload}'")
|
|
||||||
end
|
|
||||||
|
|
||||||
video = ChannelVideo.new({
|
video = ChannelVideo.new({
|
||||||
id: id,
|
id: id,
|
||||||
title: video.title,
|
title: video.title,
|
||||||
|
@ -450,11 +440,7 @@ module Invidious::Routes::Feeds
|
||||||
|
|
||||||
was_insert = Invidious::Database::ChannelVideos.insert(video, with_premiere_timestamp: true)
|
was_insert = Invidious::Database::ChannelVideos.insert(video, with_premiere_timestamp: true)
|
||||||
if was_insert
|
if was_insert
|
||||||
if CONFIG.enable_user_notifications
|
NOTIFICATION_CHANNEL.send(VideoNotification.from_video(video))
|
||||||
Invidious::Database::Users.add_notification(video)
|
|
||||||
else
|
|
||||||
Invidious::Database::Users.feed_needs_update(video)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in New Issue