← Index
NYTProf Performance Profile   « line view »
For nd2: #2 manager: init
  Run on Thu May 2 17:38:58 2019
Reported on Thu May 2 17:40:49 2019

Filename/appl/netdisco/netdisco_github_official/lib/App/Netdisco/JobQueue/PostgreSQL.pm
StatementsExecuted 515 statements in 1.74ms
Subroutines
Calls P F Exclusive
Time
Inclusive
Time
Subroutine
57111.49ms31.5sApp::Netdisco::JobQueue::PostgreSQL::::_get_denied_actionsApp::Netdisco::JobQueue::PostgreSQL::_get_denied_actions
68411523µs523µsUNIVERSAL::::can UNIVERSAL::can (xsub)
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::BEGINApp::Netdisco::JobQueue::PostgreSQL::BEGIN
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::__ANON__[:199]App::Netdisco::JobQueue::PostgreSQL::__ANON__[:199]
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::__ANON__[:202]App::Netdisco::JobQueue::PostgreSQL::__ANON__[:202]
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::__ANON__[:230]App::Netdisco::JobQueue::PostgreSQL::__ANON__[:230]
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::__ANON__[:232]App::Netdisco::JobQueue::PostgreSQL::__ANON__[:232]
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::__ANON__[:235]App::Netdisco::JobQueue::PostgreSQL::__ANON__[:235]
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::__ANON__[:265]App::Netdisco::JobQueue::PostgreSQL::__ANON__[:265]
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::__ANON__[:267]App::Netdisco::JobQueue::PostgreSQL::__ANON__[:267]
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::__ANON__[:271]App::Netdisco::JobQueue::PostgreSQL::__ANON__[:271]
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::__ANON__[:314]App::Netdisco::JobQueue::PostgreSQL::__ANON__[:314]
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::__ANON__[:316]App::Netdisco::JobQueue::PostgreSQL::__ANON__[:316]
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::__ANON__[:319]App::Netdisco::JobQueue::PostgreSQL::__ANON__[:319]
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::__ANON__[:330]App::Netdisco::JobQueue::PostgreSQL::__ANON__[:330]
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::__ANON__[:335]App::Netdisco::JobQueue::PostgreSQL::__ANON__[:335]
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::__ANON__[:85]App::Netdisco::JobQueue::PostgreSQL::__ANON__[:85]
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::jq_completeApp::Netdisco::JobQueue::PostgreSQL::jq_complete
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::jq_deferApp::Netdisco::JobQueue::PostgreSQL::jq_defer
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::jq_deleteApp::Netdisco::JobQueue::PostgreSQL::jq_delete
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::jq_getsomeApp::Netdisco::JobQueue::PostgreSQL::jq_getsome
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::jq_insertApp::Netdisco::JobQueue::PostgreSQL::jq_insert
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::jq_lockApp::Netdisco::JobQueue::PostgreSQL::jq_lock
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::jq_lockedApp::Netdisco::JobQueue::PostgreSQL::jq_locked
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::jq_logApp::Netdisco::JobQueue::PostgreSQL::jq_log
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::jq_queuedApp::Netdisco::JobQueue::PostgreSQL::jq_queued
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::jq_userlogApp::Netdisco::JobQueue::PostgreSQL::jq_userlog
0000s0sApp::Netdisco::JobQueue::PostgreSQL::::jq_warm_thrustersApp::Netdisco::JobQueue::PostgreSQL::jq_warm_thrusters
Call graph for these subroutines as a Graphviz dot language file.
Line State
ments
Time
on line
Calls Time
in subs
Code
1package App::Netdisco::JobQueue::PostgreSQL;
2
3use Dancer qw/:moose :syntax :script/;
4use Dancer::Plugin::DBIC 'schema';
5
6use App::Netdisco::Util::Device
7 qw/is_discoverable is_macsuckable is_arpnipable/;
8use App::Netdisco::Backend::Job;
9
10use Module::Load ();
11use Try::Tiny;
12
13use base 'Exporter';
14our @EXPORT = ();
15our @EXPORT_OK = qw/
16 jq_warm_thrusters
17 jq_getsome
18 jq_locked
19 jq_queued
20 jq_lock
21 jq_defer
22 jq_complete
23 jq_log
24 jq_userlog
25 jq_insert
26 jq_delete
27/;
28our %EXPORT_TAGS = ( all => \@EXPORT_OK );
29
30# given a device, tests if any of the primary acls applies
31# returns a list of job actions to be denied/skipped on this host.
32
# spent 31.5s (1.49ms+31.5) within App::Netdisco::JobQueue::PostgreSQL::_get_denied_actions which was called 57 times, avg 553ms/call: # 57 times (1.49ms+31.5s) by App::Netdisco::JobQueue::PostgreSQL::jq_warm_thrusters at line 57, avg 553ms/call
sub _get_denied_actions {
335735µs my $device = shift;
345730µs my @badactions = ();
3557572µs57663µs return @badactions unless $device;
36
3757219µs6810.5s push @badactions, ('discover', @{ setting('job_prio')->{high} })
# spent 10.5s making 57 calls to App::Netdisco::Util::Device::is_discoverable, avg 184ms/call # spent 181µs making 11 calls to Dancer::setting, avg 16µs/call
38 if not is_discoverable($device);
39
4057181µs5710.5s push @badactions, (qw/macsuck nbtstat/)
# spent 10.5s making 57 calls to App::Netdisco::Util::Device::is_macsuckable, avg 184ms/call
41 if not is_macsuckable($device);
42
4357174µs5710.5s push @badactions, 'arpnip'
# spent 10.5s making 57 calls to App::Netdisco::Util::Device::is_arpnipable, avg 184ms/call
44 if not is_arpnipable($device);
45
4657191µs return @badactions;
47}
48
49sub jq_warm_thrusters {
50 #my @devices = schema('netdisco')->resultset('Device')->all;
51 my @devices = schema('netdisco')->resultset('Device')->search({ name => {like => 'KR%'}});
52 my $rs = schema('netdisco')->resultset('DeviceSkip');
53 my %actionset = ();
54
551900ns DB::enable_profile("/tmp/nytprof_warm_thrusters");
5612µs foreach my $d (@devices) {
5757199µs5731.5s my @badactions = _get_denied_actions($d);
# spent 31.5s making 57 calls to App::Netdisco::JobQueue::PostgreSQL::_get_denied_actions, avg 553ms/call
5857138µs1178µs $actionset{$d->ip} = \@badactions if scalar @badactions;
# spent 78µs making 11 calls to App::Netdisco::DB::Result::Device::ip, avg 7µs/call
59 }
60 DB::finish_profile();
61 warning "=========== NYTProf data written, save to kill process now ===========";
62
63 schema('netdisco')->txn_do(sub {
64 $rs->search({
65 backend => setting('workers')->{'BACKEND'},
66 }, { for => 'update' }, )->update({ actionset => [] });
67
68 my $deferrals = setting('workers')->{'max_deferrals'} - 1;
69 $rs->search({
70 backend => setting('workers')->{'BACKEND'},
71 deferrals => { '>' => $deferrals },
72 }, { for => 'update' }, )->update({ deferrals => $deferrals });
73
74 $rs->search({
75 backend => setting('workers')->{'BACKEND'},
76 actionset => { -value => [] },
77 deferrals => 0,
78 })->delete;
79
80 $rs->update_or_create({
81 backend => setting('workers')->{'BACKEND'},
82 device => $_,
83 actionset => $actionset{$_},
84 }, { key => 'primary' }) for keys %actionset;
85 });
86}
87
88sub jq_getsome {
89 my $num_slots = shift;
90 return () unless $num_slots and $num_slots > 0;
91
92 my $jobs = schema('netdisco')->resultset('Admin');
93 my @returned = ();
94
95 my $tasty = schema('netdisco')->resultset('Virtual::TastyJobs')
96 ->search(undef,{ bind => [
97 setting('workers')->{'BACKEND'}, setting('job_prio')->{'high'},
98 setting('workers')->{'BACKEND'}, setting('workers')->{'max_deferrals'},
99 setting('workers')->{'retry_after'}, $num_slots,
100 ]});
101
102 while (my $job = $tasty->next) {
103 if ($job->device) {
104 # need to handle device discovered since backend daemon started
105 # and the skiplist was primed. these should be checked against
106 # the various acls and have device_skip entry added if needed,
107 # and return false if it should have been skipped.
108 my @badactions = _get_denied_actions($job->device);
109 if (scalar @badactions) {
110 schema('netdisco')->resultset('DeviceSkip')->find_or_create({
111 backend => setting('workers')->{'BACKEND'}, device => $job->device,
112 },{ key => 'device_skip_pkey' })->add_to_actionset(@badactions);
113
114 # will now not be selected in a future _getsome()
115 next if scalar grep {$_ eq $job->action} @badactions;
116 }
117 }
118
119 # remove any duplicate jobs, incuding possibly this job if there
120 # is already an equivalent job running
121
122 # note that the self-removal of a job has an unhelpful log: it is
123 # reported as a duplicate of itself! however what's happening is that
124 # netdisco has seen another running job with same params (but the query
125 # cannot see that ID to use it in the message).
126
127 my %job_properties = (
128 action => $job->action,
129 port => $job->port,
130 subaction => $job->subaction,
131 -or => [
132 { device => $job->device },
133 ($job->device_key ? ({ device_key => $job->device_key }) : ()),
134 ],
135 );
136
137 my $gone = $jobs->search({
138 status => 'queued',
139 -and => [
140 %job_properties,
141 -or => [{
142 job => { '!=' => $job->id },
143 },{
144 job => $job->id,
145 -exists => $jobs->search({
146 status => { -like => 'queued-%' },
147 started => \[q/> (now() - ?::interval)/, setting('jobs_stale_after')],
148 %job_properties,
149 })->as_query,
150 }],
151 ],
152 }, {for => 'update'})
153 ->update({ status => 'error', log => (sprintf 'duplicate of %s', $job->id) });
154
155 debug sprintf 'getsome: cancelled %s duplicate(s) of job %s', ($gone || 0), $job->id;
156 push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns });
157 }
158
159 return @returned;
160}
161
162sub jq_locked {
163 my @returned = ();
164 my $rs = schema('netdisco')->resultset('Admin')->search({
165 status => ('queued-'. setting('workers')->{'BACKEND'}),
166 started => \[q/> (now() - ?::interval)/, setting('jobs_stale_after')],
167 });
168
169 while (my $job = $rs->next) {
170 push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns });
171 }
172 return @returned;
173}
174
175sub jq_queued {
176 my $job_type = shift;
177
178 return schema('netdisco')->resultset('Admin')->search({
179 device => { '!=' => undef},
180 action => $job_type,
181 status => { -like => 'queued%' },
182 })->get_column('device')->all;
183}
184
185sub jq_lock {
186 my $job = shift;
187 my $happy = false;
188
189 # lock db row and update to show job has been picked
190 try {
191 my $updated = schema('netdisco')->resultset('Admin')
192 ->search({ job => $job->id, status => 'queued' }, { for => 'update' })
193 ->update({
194 status => ('queued-'. setting('workers')->{'BACKEND'}),
195 started => \"now()",
196 });
197
198 $happy = true if $updated > 0;
199 }
200 catch {
201 error $_;
202 };
203
204 return $happy;
205}
206
207sub jq_defer {
208 my $job = shift;
209 my $happy = false;
210
211 # note this taints all actions on the device. for example if both
212 # macsuck and arpnip are allowed, but macsuck fails 10 times, then
213 # arpnip (and every other action) will be prevented on the device.
214
215 # seeing as defer is only triggered by an SNMP connect failure, this
216 # behaviour seems reasonable, to me (or desirable, perhaps).
217
218 try {
219 schema('netdisco')->txn_do(sub {
220 if ($job->device) {
221 schema('netdisco')->resultset('DeviceSkip')->find_or_create({
222 backend => setting('workers')->{'BACKEND'}, device => $job->device,
223 },{ key => 'device_skip_pkey' })->increment_deferrals;
224 }
225
226 # lock db row and update to show job is available
227 schema('netdisco')->resultset('Admin')
228 ->find($job->id, {for => 'update'})
229 ->update({ status => 'queued', started => undef });
230 });
231 $happy = true;
232 }
233 catch {
234 error $_;
235 };
236
237 return $happy;
238}
239
240sub jq_complete {
241 my $job = shift;
242 my $happy = false;
243
244 # lock db row and update to show job is done/error
245
246 # now that SNMP connect failures are deferrals and not errors, any complete
247 # status, whether success or failure, indicates an SNMP connect. reset the
248 # connection failures counter to forget about occasional connect glitches.
249
250 try {
251 schema('netdisco')->txn_do(sub {
252 if ($job->device) {
253 schema('netdisco')->resultset('DeviceSkip')->find_or_create({
254 backend => setting('workers')->{'BACKEND'}, device => $job->device,
255 },{ key => 'device_skip_pkey' })->update({ deferrals => 0 });
256 }
257
258 schema('netdisco')->resultset('Admin')
259 ->find($job->id, {for => 'update'})->update({
260 status => $job->status,
261 log => $job->log,
262 started => $job->started,
263 finished => $job->finished,
264 });
265 });
266 $happy = true;
267 }
268 catch {
269 # use DDP; p $job;
270 error $_;
271 };
272
273 return $happy;
274}
275
276sub jq_log {
277 return schema('netdisco')->resultset('Admin')->search({
278 'me.log' => { '-not_like' => 'duplicate of %' },
279 }, {
280 prefetch => 'target',
281 order_by => { -desc => [qw/entered device action/] },
282 rows => (setting('jobs_qdepth') || 50),
283 })->with_times->hri->all;
284}
285
286sub jq_userlog {
287 my $user = shift;
288 return schema('netdisco')->resultset('Admin')->search({
289 username => $user,
290 log => { '-not_like' => 'duplicate of %' },
291 finished => { '>' => \"(now() - interval '5 seconds')" },
292 })->with_times->all;
293}
294
295sub jq_insert {
296 my $jobs = shift;
297 $jobs = [$jobs] if ref [] ne ref $jobs;
298 my $happy = false;
299
300 try {
301 schema('netdisco')->txn_do(sub {
302 schema('netdisco')->resultset('Admin')->populate([
303 map {{
304 device => $_->{device},
305 device_key => $_->{device_key},
306 port => $_->{port},
307 action => $_->{action},
308 subaction => ($_->{extra} || $_->{subaction}),
309 username => $_->{username},
310 userip => $_->{userip},
311 status => 'queued',
312 }} @$jobs
313 ]);
314 });
315 $happy = true;
316 }
317 catch {
318 error $_;
319 };
320
321 return $happy;
322}
323
324sub jq_delete {
325 my $id = shift;
326
327 if ($id) {
328 schema('netdisco')->txn_do(sub {
329 schema('netdisco')->resultset('Admin')->find($id)->delete();
330 });
331 }
332 else {
333 schema('netdisco')->txn_do(sub {
334 schema('netdisco')->resultset('Admin')->delete();
335 });
336 }
337}
338
339true;
 
# spent 523µs within UNIVERSAL::can which was called 684 times, avg 764ns/call: # 684 times (523µs+0s) by App::Netdisco::Util::Permission::check_acl at line 95 of App/Netdisco/Util/Permission.pm, avg 764ns/call
sub UNIVERSAL::can; # xsub