#!/usr/bin/perl
#
use strict;
use Class::Autouse qw(LJ::Event);
use lib "$ENV{LJHOME}/cgi-bin";
require "ljlib.pl";
require "ljmail.pl";
package LJ::Cmdbuffer;
# Built-in command types, keyed by command name.  Each entry holds the
# handler for the "run" mode plus optional properties that
# LJ::Cmdbuffer::flush looks up through get_property.
%LJ::Cmdbuffer::cmds = (

    # weblogscom: ping weblogs.com when a journal has been updated
    weblogscom => {
        # handler, invoked by flush as ($dbh, $db, $c)
        run           => \&LJ::Cmdbuffer::_weblogscom,

        # one run covers every queued job of this type for the same journal
        once_per_user => 1,

        # jobs older than 2 hours are deleted unrun (qbufferd not running?)
        too_old       => 2 * 60 * 60,
    },

);
#
# name: LJ::Cmdbuffer::flush
# des: Flush queued rows of a given command type from the [dbtable[cmdbuffer]] table, fetching them in batches of up to 500 rows.
# args: dbh, db, cmd, userid?
# des-dbh: master database handle
# des-db: database cluster master
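# des-cmd: a command type registered in %LJ::Cmdbuffer::cmds or provided by "cmdbuf:" hooks; an optional ":mode" suffix (e.g. "cmd:start") selects that mode's handler instead of "run"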
# des-userid: optional userid to which flush should be constrained
# returns: 1 on success, 0 on failure
#
sub LJ::Cmdbuffer::flush
{
    # Run (and then delete) queued cmdbuffer jobs of one command type.
    #
    # Arguments: ($dbh, $db, $cmd, $userid)
    #   $dbh    - master database handle; handed to the handler and used to
    #             quote the WHERE clause
    #   $db     - database cluster master; every cmdbuffer query and named
    #             lock is issued on this handle
    #   $cmd    - command type, optionally followed by ":mode" to call that
    #             mode's handler instead of the default "run" handler
    #   $userid - optional; restrict the flush to rows with this journalid
    #
    # Returns 1 on success.  Returns 0 when $cmd is empty, when no handler
    # exists for the command/mode, or when a per-job lock cannot be obtained.
    my ($dbh, $db, $cmd, $userid) = @_;
    return 0 unless $cmd;

    # split an optional ":mode" suffix off the command name
    my $mode = "run";
    if ($cmd =~ s/:(\w+)//) {
        $mode = $1;
    }

    # built-in commands win; otherwise take the first hook registered for
    # this command/mode.  The hook list is tested before it is dereferenced
    # so that a lookup for an unknown command does not create an empty
    # entry in %LJ::HOOKS.
    my $code;
    if (my $builtin = $LJ::Cmdbuffer::cmds{$cmd}) {
        $code = $builtin->{$mode};
    } elsif (my $hooks = $LJ::HOOKS{"cmdbuf:$cmd:$mode"}) {
        $code = $hooks->[0];
    }
    return 0 unless $code;

    # start/finish modes
    if ($mode ne "run") {
        $code->($dbh);
        return 1;
    }

    # 0 = never too old
    my $too_old = LJ::Cmdbuffer::get_property($cmd, 'too_old') || 0;

    # 0 == okay to run more than once per user
    my $once_per_user = LJ::Cmdbuffer::get_property($cmd, 'once_per_user') || 0;

    # 'url' = urlencode, 'raw' = don't urlencode
    my $arg_format = LJ::Cmdbuffer::get_property($cmd, 'arg_format') || 'url';

    # 0 == order of the jobs matters, process oldest first
    my $unordered = LJ::Cmdbuffer::get_property($cmd, 'unordered') || 0;

    my $where = "cmd=" . $dbh->quote($cmd);
    if ($userid) {
        $where .= " AND journalid=" . $dbh->quote($userid);
    }

    # always a defined string, so it can be interpolated into the query
    my $orderby = $unordered ? "" : "ORDER BY cbid";

    my $LIMIT = 500;

    while (1) {
        my $clist = $db->selectall_arrayref("SELECT cbid, UNIX_TIMESTAMP() - UNIX_TIMESTAMP(instime), journalid ".
                                            "FROM cmdbuffer ".
                                            "WHERE $where $orderby LIMIT $LIMIT");
        last unless $clist && @$clist;

        # Remember how many rows this batch returned.  A full batch means
        # there may be more rows queued, whatever happens to individual
        # jobs below.
        my $fetched = scalar @$clist;

        my @expired;    # cbids past the too_old age: deleted without running
        my @jobs;       # [ cbid, journalid ] pairs still to be run

        # citem: [ cbid, age, journalid ]
        foreach my $citem (@$clist) {
            my ($cbid, $age, $journalid) = @$citem;
            if ($too_old && $age > $too_old) {
                push @expired, $cbid;
            } else {
                push @jobs, [ $cbid, $journalid ];
            }
        }

        if (@expired) {
            $db->do("DELETE FROM cmdbuffer WHERE cbid IN (" . join(",", @expired) . ")");
        }

        # journals whose once_per_user job already ran in this batch
        my %journal_done;

        foreach my $job (@jobs) {
            my ($cbid, $journalid) = @$job;

            # the rows for this journal were deleted when its job ran, so
            # there is nothing left to lock or look up
            next if $once_per_user && $journal_done{$journalid};

            my $got_lock = $db->selectrow_array("SELECT GET_LOCK('cbid-$cbid',10)");
            return 0 unless $got_lock;

            # sadly, we have to do another query here to verify the job hasn't been
            # done by another thread.  (otherwise we could've done it above, instead
            # of just getting the id)
            my $c = $db->selectrow_hashref("SELECT cbid, journalid, cmd, instime, args " .
                                           "FROM cmdbuffer WHERE cbid=?", undef, $cbid);
            unless ($c) {
                # the row is gone; give back the lock taken above instead
                # of leaving it held
                $db->do("SELECT RELEASE_LOCK('cbid-$cbid')");
                next;
            }

            if ($arg_format eq "url") {
                my $args = {};
                LJ::decode_url_string($c->{'args'}, $args);
                $c->{'args'} = $args;
            }
            # otherwise, arg_format eq "raw"

            # run handler
            $code->($dbh, $db, $c);

            # if this task is to be run once per user, go ahead and delete any jobs
            # for this user of this type and skip them for the rest of the batch
            my $wh = "cbid=$cbid";
            if ($once_per_user) {
                $wh = "cmd=" . $db->quote($cmd) . " AND journalid=" . $db->quote($c->{journalid});
                $journal_done{ $c->{journalid} } = 1;
            }

            $db->do("DELETE FROM cmdbuffer WHERE $wh");
            $db->do("SELECT RELEASE_LOCK('cbid-$cbid')");
        }

        # a short batch means the queue has been drained
        last unless $fetched == $LIMIT;
    }

    return 1;
}
#
# name: LJ::Cmdbuffer::get_property
# des: Get a property of an async job type, either built-in or site-specific.
# args: cmd, prop
# des-cmd: a registered async job type
# des-prop: the property name to look up
# returns: Value of property (whatever it may be) on success; undef on failure.
#
sub get_property {
    # Look up one property of an async job type.
    #   $cmd  - job type name
    #   $prop - property name
    # Built-in job types (entries in %LJ::Cmdbuffer::cmds) answer from their
    # own definition; any other job type is resolved through the
    # "cmdbuf:$cmd:$prop" hook when one is registered.  Returns undef when
    # either argument is false or nothing provides the property.
    my ($cmd, $prop) = @_;

    return undef if !$cmd || !$prop;

    # a built-in entry is authoritative, even when it lacks the property
    my $builtin = $LJ::Cmdbuffer::cmds{$cmd};
    return $builtin->{$prop} if $builtin;

    # site-specific job types supply their properties via hooks
    my $hook_name = "cmdbuf:$cmd:$prop";
    return LJ::run_hook($hook_name) if LJ::are_hooks($hook_name);

    return undef;
}
sub _weblogscom {
    # "run" handler for the weblogscom command: send a weblogUpdates.ping
    # XML-RPC call to rpc.weblogs.com for the journal described by the job.
    #
    # Called as ($dbh, $db, $c); only $c->{'args'} is used, and from it the
    # keys 'user', 'title' and 'url'.
    #
    # Best effort only: if XMLRPC::Lite cannot be loaded the ping is skipped,
    # any error raised while pinging is discarded, and 1 is always returned.
    my ($dbh, $db, $c) = @_;
    my $args = $c->{'args'};

    eval {
        # load the client on demand; a failed load leaves $@ set
        eval "use XMLRPC::Lite;";

        if (! $@) {
            my $client = XMLRPC::Lite->new( proxy   => "http://rpc.weblogs.com/RPC2",
                                            timeout => 5 );

            my $weblog_name = LJ::ehtml($args->{'title'}) . " \@ $LJ::SITENAMESHORT";
            my $changes_url = "$LJ::SITEROOT/misc/weblogs-change.bml?user=$args->{'user'}";

            # xml-rpc method call
            $client->call('weblogUpdates.ping',
                          $weblog_name,
                          $args->{'url'},
                          $changes_url);
        }
    };

    return 1;
}
1;