root/trunk/lib/Perlbal/AIO.pm

Revision 687, 7.9 kB (checked in by jacques, 2 years ago)

Bump copyright and fixed typo in Interactive

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1# AIO abstraction layer
2#
3# Copyright 2004, Danga Interactive, Inc.
4# Copyright 2005-2007, Six Apart, Ltd.
5
6package Perlbal::AIO;
7
8use strict;
9use POSIX qw(ENOENT EACCES EBADF);
10use Fcntl qw(SEEK_CUR SEEK_SET SEEK_END O_RDWR O_CREAT O_TRUNC);
11
12# Try and use IO::AIO, if it's around.
13BEGIN {
14    $Perlbal::OPTMOD_IO_AIO        = eval "use IO::AIO 1.6 (); 1;";
15}
16
17END {
18    IO::AIO::max_parallel(0)
19        if $Perlbal::OPTMOD_IO_AIO;
20}
21
22$Perlbal::AIO_MODE = "none";
23$Perlbal::AIO_MODE = "ioaio" if $Perlbal::OPTMOD_IO_AIO;
24
25############################################################################
26# AIO functions available to callers
27############################################################################
28
29sub aio_readahead {
30    my ($fh, $offset, $length, $user_cb) = @_;
31
32    aio_channel_push(get_chan(), $user_cb, sub {
33        my $cb = shift;
34        if ($Perlbal::AIO_MODE eq "ioaio") {
35            IO::AIO::aio_readahead($fh, $offset, $length, $cb);
36        } else {
37            $cb->();
38        }
39    });
40}
41
42sub aio_stat {
43    my ($file, $user_cb) = @_;
44
45    aio_channel_push(get_chan($file), $user_cb, sub {
46        my $cb = shift;
47        if ($Perlbal::AIO_MODE eq "ioaio") {
48            IO::AIO::aio_stat($file, $cb);
49        } else {
50            stat($file);
51            $cb->();
52        }
53    });
54}
55
56sub aio_open {
57    my ($file, $flags, $mode, $user_cb) = @_;
58
59    aio_channel_push(get_chan($file), $user_cb, sub {
60        my $cb = shift;
61
62        if ($Perlbal::AIO_MODE eq "ioaio") {
63            IO::AIO::aio_open($file, $flags, $mode, $cb);
64        } else {
65            my $fh;
66            my $rv = sysopen($fh, $file, $flags, $mode);
67            $cb->($rv ? $fh : undef);
68        }
69    });
70}
71
72sub aio_unlink {
73    my ($file, $user_cb) = @_;
74    aio_channel_push(get_chan($file), $user_cb, sub {
75        my $cb = shift;
76
77        if ($Perlbal::AIO_MODE eq "ioaio") {
78            IO::AIO::aio_unlink($file, $cb);
79        } else {
80            my $rv = unlink($file);
81            $rv = $rv ? 0 : -1;
82            $cb->($rv);
83        }
84    });
85}
86
87sub aio_write {
88    #   0    1        2        3(data) 4
89    my ($fh, $offset, $length, undef,  $user_cb) = @_;
90    return no_fh($user_cb) unless $fh;
91    my $alist = \@_;
92
93    aio_channel_push(get_chan(), $user_cb, sub {
94        my $cb = shift;
95        if ($Perlbal::AIO_MODE eq "ioaio") {
96            IO::AIO::aio_write($fh, $offset, $length, $alist->[3], 0, $cb);
97        } else {
98            my $old_off = sysseek($fh, 0, SEEK_CUR);
99            sysseek($fh, $offset, 0);
100            my $rv = syswrite($fh, $alist->[3], $length, 0);
101            sysseek($fh, $old_off, SEEK_SET);
102            $cb->($rv);
103        }
104    });
105}
106
107sub aio_read {
108    #   0    1        2        3(data) 4
109    my ($fh, $offset, $length, undef,  $user_cb) = @_;
110    return no_fh($user_cb) unless $fh;
111    my $alist = \@_;
112
113    aio_channel_push(get_chan(), $user_cb, sub {
114        my $cb = shift;
115        if ($Perlbal::AIO_MODE eq "ioaio") {
116            IO::AIO::aio_read($fh, $offset, $length, $alist->[3], 0, $cb);
117        } else {
118            my $old_off = sysseek($fh, 0, SEEK_CUR);
119            sysseek($fh, $offset, 0);
120            my $rv = sysread($fh, $alist->[3], $length, 0);
121            sysseek($fh, $old_off, SEEK_SET);
122            $cb->($rv);
123        }
124    });
125}
126
127############################################################################
128# AIO channel stuff
129#    prevents all AIO threads from being consumed by requests for same
130#    failing/overloaded disk by isolating them into separate 'channels' in
131#    parent process and not dispatching more than the max in-flight count
132#    allows.  think of a channel as a named queue.  or in reality, a disk.
133############################################################################
134
135my %chan_outstanding;  # $channel_name -> $num_in_flight
136my %chan_pending;      # $channel_name -> [ [$subref, $cb], .... ]
137my %chan_hitmaxdepth;  # $channel_name -> $times_enqueued   (not dispatched immediately)
138my %chan_submitct;     # $channel_name -> $times_submitted  (total AIO requests for this channel)
139my $use_aio_chans = 0; # keep them off for now, until mogstored code is ready to use them
140my $file_to_chan_hook; # coderef that returns $chan_name given a $filename
141
142my %chan_concurrency;  # $channel_name -> concurrency per channel
143                       #  (cache. definitive version via function call)
144
145sub get_aio_stats {
146    my $ret = {};
147    foreach my $c (keys %chan_outstanding) {
148        $ret->{$c} = {
149            cur_running  => $chan_outstanding{$c},
150            ctr_queued   => $chan_hitmaxdepth{$c} || 0,
151            ctr_total    => $chan_submitct{$c},
152        };
153    }
154
155    foreach my $c (keys %chan_pending) {
156        my $rec = $ret->{$c} ||= {};
157        $rec->{cur_queued} = scalar @{$chan_pending{$c}};
158    }
159
160    return $ret;
161}
162
163# (external API).  set trans hook, but also enables AIO channels.
164sub set_file_to_chan_hook {
165    $file_to_chan_hook = shift;   # coderef that returns $chan_name given a $filename
166    $use_aio_chans     = 1;
167}
168
169# internal API:
170sub aio_channel_push {
171    my ($chan, $user_cb, $action) = @_;
172
173    # if we were to do it immediately, bypassing AIO channels (future option?)
174    unless ($use_aio_chans) {
175        $action->($user_cb);
176        return;
177    }
178
179    # IO::AIO/etc only take one callback.  so we wrap the user
180    # (caller) function with our own that first calls theirs, then
181    # does our bookkeeping and queue management afterwards.
182    my $wrapped_cb = sub {
183        $user_cb->(@_);
184        $chan_outstanding{$chan}--;
185        aio_channel_cond_run($chan);
186    };
187
188    # in case this is the first time this queue has been used, init stuff:
189    my $chanpend = ($chan_pending{$chan} ||= []);
190    $chan_outstanding{$chan} ||= 0;
191    $chan_submitct{$chan}++;
192
193    my $max_out  = $chan_concurrency{$chan} ||= aio_chan_max_concurrent($chan);
194
195    if ($chan_outstanding{$chan} < $max_out) {
196        $chan_outstanding{$chan}++;
197        $action->($wrapped_cb);
198        return;
199    } else {
200        # too deep.  enqueue.
201        $chan_hitmaxdepth{$chan}++;
202        push @$chanpend, [$action, $wrapped_cb];
203    }
204}
205
206sub aio_chan_max_concurrent {
207    my ($chan) = @_;
208    return 100 if $chan eq '[default]';
209    return 10;
210}
211
212sub aio_channel_cond_run {
213    my ($chan) = @_;
214
215    my $chanpend = $chan_pending{$chan} or return;
216    my $max_out  = $chan_concurrency{$chan} ||= aio_chan_max_concurrent($chan);
217
218    my $job;
219    while ($chan_outstanding{$chan} < $max_out && ($job = shift @$chanpend)) {
220        $chan_outstanding{$chan}++;
221        $job->[0]->($job->[1]);
222    }
223}
224
225my $next_chan;
226sub set_channel {
227    $next_chan = shift;
228}
229
230sub set_file_for_channel {
231    my ($file) = @_;
232    if ($file_to_chan_hook) {
233        $next_chan = $file_to_chan_hook->($file);
234    } else {
235        $next_chan = undef;
236    }
237}
238
239# gets currently-set channel, then clears it.  or if none set,
240# lets registered hook set the channel name from the optional
241# $file parameter.  the default channel, '[default]' has no limits
242sub get_chan {
243    return undef unless $use_aio_chans;
244    my ($file) = @_;
245    set_file_for_channel($file) if $file;
246
247    if (my $chan = $next_chan) {
248        $next_chan = undef;
249        return $chan;
250    }
251
252    return "[default]";
253}
254
255############################################################################
256# misc util functions
257############################################################################
258
259sub _fh_of_fd_mode {
260    my ($fd, $mode) = @_;
261    return undef unless defined $fd && $fd >= 0;
262
263    #TODO: use the write MODE for the given $mode;
264    my $fh = IO::Handle->new_from_fd($fd, 'r+');
265    my $num = fileno($fh);
266    return $fh;
267}
268
269sub no_fh {
270    my $cb = shift;
271
272    my $i = 1;
273    my $stack_trace = "";
274    while (my ($pkg, $filename, $line, $subroutine, $hasargs,
275               $wantarray, $evaltext, $is_require, $hints, $bitmask) = caller($i++)) {
276        $stack_trace .= " at $filename:$line $subroutine\n";
277    }
278
279    Perlbal::log("crit", "Undef \$fh: $stack_trace");
280    $cb->(undef);
281    return undef;
282}
283
2841;
Note: See TracBrowser for help on using the browser.