| Filename | /appl/netdisco/netdisco_github_official/lib/App/Netdisco/JobQueue/PostgreSQL.pm |
| Statements | Executed 515 statements in 1.74ms |
| Calls | P | F | Exclusive Time |
Inclusive Time |
Subroutine |
|---|---|---|---|---|---|
| 57 | 1 | 1 | 1.49ms | 31.5s | App::Netdisco::JobQueue::PostgreSQL::_get_denied_actions |
| 684 | 1 | 1 | 523µs | 523µs | UNIVERSAL::can (xsub) |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::BEGIN |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::__ANON__[:199] |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::__ANON__[:202] |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::__ANON__[:230] |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::__ANON__[:232] |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::__ANON__[:235] |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::__ANON__[:265] |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::__ANON__[:267] |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::__ANON__[:271] |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::__ANON__[:314] |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::__ANON__[:316] |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::__ANON__[:319] |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::__ANON__[:330] |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::__ANON__[:335] |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::__ANON__[:85] |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::jq_complete |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::jq_defer |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::jq_delete |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::jq_getsome |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::jq_insert |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::jq_lock |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::jq_locked |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::jq_log |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::jq_queued |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::jq_userlog |
| 0 | 0 | 0 | 0s | 0s | App::Netdisco::JobQueue::PostgreSQL::jq_warm_thrusters |
| Line | State ments |
Time on line |
Calls | Time in subs |
Code |
|---|---|---|---|---|---|
| 1 | package App::Netdisco::JobQueue::PostgreSQL; | ||||
| 2 | |||||
| 3 | use Dancer qw/:moose :syntax :script/; | ||||
| 4 | use Dancer::Plugin::DBIC 'schema'; | ||||
| 5 | |||||
| 6 | use App::Netdisco::Util::Device | ||||
| 7 | qw/is_discoverable is_macsuckable is_arpnipable/; | ||||
| 8 | use App::Netdisco::Backend::Job; | ||||
| 9 | |||||
| 10 | use Module::Load (); | ||||
| 11 | use Try::Tiny; | ||||
| 12 | |||||
| 13 | use base 'Exporter'; | ||||
| 14 | our @EXPORT = (); | ||||
| 15 | our @EXPORT_OK = qw/ | ||||
| 16 | jq_warm_thrusters | ||||
| 17 | jq_getsome | ||||
| 18 | jq_locked | ||||
| 19 | jq_queued | ||||
| 20 | jq_lock | ||||
| 21 | jq_defer | ||||
| 22 | jq_complete | ||||
| 23 | jq_log | ||||
| 24 | jq_userlog | ||||
| 25 | jq_insert | ||||
| 26 | jq_delete | ||||
| 27 | /; | ||||
| 28 | our %EXPORT_TAGS = ( all => \@EXPORT_OK ); | ||||
| 29 | |||||
| 30 | # given a device, tests if any of the primary acls applies | ||||
| 31 | # returns a list of job actions to be denied/skipped on this host. | ||||
| 32 | # spent 31.5s (1.49ms+31.5) within App::Netdisco::JobQueue::PostgreSQL::_get_denied_actions which was called 57 times, avg 553ms/call:
# 57 times (1.49ms+31.5s) by App::Netdisco::JobQueue::PostgreSQL::jq_warm_thrusters at line 57, avg 553ms/call | ||||
| 33 | 57 | 35µs | my $device = shift; | ||
| 34 | 57 | 30µs | my @badactions = (); | ||
| 35 | 57 | 572µs | 57 | 663µs | return @badactions unless $device; # spent 663µs making 57 calls to App::Netdisco::DB::Result::Device::__ANON__[App/Netdisco/DB/Result/Device.pm:13], avg 12µs/call |
| 36 | |||||
| 37 | 57 | 219µs | 68 | 10.5s | push @badactions, ('discover', @{ setting('job_prio')->{high} }) # spent 10.5s making 57 calls to App::Netdisco::Util::Device::is_discoverable, avg 184ms/call
# spent 181µs making 11 calls to Dancer::setting, avg 16µs/call |
| 38 | if not is_discoverable($device); | ||||
| 39 | |||||
| 40 | 57 | 181µs | 57 | 10.5s | push @badactions, (qw/macsuck nbtstat/) # spent 10.5s making 57 calls to App::Netdisco::Util::Device::is_macsuckable, avg 184ms/call |
| 41 | if not is_macsuckable($device); | ||||
| 42 | |||||
| 43 | 57 | 174µs | 57 | 10.5s | push @badactions, 'arpnip' # spent 10.5s making 57 calls to App::Netdisco::Util::Device::is_arpnipable, avg 184ms/call |
| 44 | if not is_arpnipable($device); | ||||
| 45 | |||||
| 46 | 57 | 191µs | return @badactions; | ||
| 47 | } | ||||
| 48 | |||||
| 49 | sub jq_warm_thrusters { | ||||
| 50 | #my @devices = schema('netdisco')->resultset('Device')->all; | ||||
| 51 | my @devices = schema('netdisco')->resultset('Device')->search({ name => {like => 'KR%'}}); | ||||
| 52 | my $rs = schema('netdisco')->resultset('DeviceSkip'); | ||||
| 53 | my %actionset = (); | ||||
| 54 | |||||
| 55 | 1 | 900ns | DB::enable_profile("/tmp/nytprof_warm_thrusters"); | ||
| 56 | 1 | 2µs | foreach my $d (@devices) { | ||
| 57 | 57 | 199µs | 57 | 31.5s | my @badactions = _get_denied_actions($d); # spent 31.5s making 57 calls to App::Netdisco::JobQueue::PostgreSQL::_get_denied_actions, avg 553ms/call |
| 58 | 57 | 138µs | 11 | 78µs | $actionset{$d->ip} = \@badactions if scalar @badactions; # spent 78µs making 11 calls to App::Netdisco::DB::Result::Device::ip, avg 7µs/call |
| 59 | } | ||||
| 60 | DB::finish_profile(); | ||||
| 61 | warning "=========== NYTProf data written, save to kill process now ==========="; | ||||
| 62 | |||||
| 63 | schema('netdisco')->txn_do(sub { | ||||
| 64 | $rs->search({ | ||||
| 65 | backend => setting('workers')->{'BACKEND'}, | ||||
| 66 | }, { for => 'update' }, )->update({ actionset => [] }); | ||||
| 67 | |||||
| 68 | my $deferrals = setting('workers')->{'max_deferrals'} - 1; | ||||
| 69 | $rs->search({ | ||||
| 70 | backend => setting('workers')->{'BACKEND'}, | ||||
| 71 | deferrals => { '>' => $deferrals }, | ||||
| 72 | }, { for => 'update' }, )->update({ deferrals => $deferrals }); | ||||
| 73 | |||||
| 74 | $rs->search({ | ||||
| 75 | backend => setting('workers')->{'BACKEND'}, | ||||
| 76 | actionset => { -value => [] }, | ||||
| 77 | deferrals => 0, | ||||
| 78 | })->delete; | ||||
| 79 | |||||
| 80 | $rs->update_or_create({ | ||||
| 81 | backend => setting('workers')->{'BACKEND'}, | ||||
| 82 | device => $_, | ||||
| 83 | actionset => $actionset{$_}, | ||||
| 84 | }, { key => 'primary' }) for keys %actionset; | ||||
| 85 | }); | ||||
| 86 | } | ||||
| 87 | |||||
| 88 | sub jq_getsome { | ||||
| 89 | my $num_slots = shift; | ||||
| 90 | return () unless $num_slots and $num_slots > 0; | ||||
| 91 | |||||
| 92 | my $jobs = schema('netdisco')->resultset('Admin'); | ||||
| 93 | my @returned = (); | ||||
| 94 | |||||
| 95 | my $tasty = schema('netdisco')->resultset('Virtual::TastyJobs') | ||||
| 96 | ->search(undef,{ bind => [ | ||||
| 97 | setting('workers')->{'BACKEND'}, setting('job_prio')->{'high'}, | ||||
| 98 | setting('workers')->{'BACKEND'}, setting('workers')->{'max_deferrals'}, | ||||
| 99 | setting('workers')->{'retry_after'}, $num_slots, | ||||
| 100 | ]}); | ||||
| 101 | |||||
| 102 | while (my $job = $tasty->next) { | ||||
| 103 | if ($job->device) { | ||||
| 104 | # need to handle device discovered since backend daemon started | ||||
| 105 | # and the skiplist was primed. these should be checked against | ||||
| 106 | # the various acls and have device_skip entry added if needed, | ||||
| 107 | # and return false if it should have been skipped. | ||||
| 108 | my @badactions = _get_denied_actions($job->device); | ||||
| 109 | if (scalar @badactions) { | ||||
| 110 | schema('netdisco')->resultset('DeviceSkip')->find_or_create({ | ||||
| 111 | backend => setting('workers')->{'BACKEND'}, device => $job->device, | ||||
| 112 | },{ key => 'device_skip_pkey' })->add_to_actionset(@badactions); | ||||
| 113 | |||||
| 114 | # will now not be selected in a future _getsome() | ||||
| 115 | next if scalar grep {$_ eq $job->action} @badactions; | ||||
| 116 | } | ||||
| 117 | } | ||||
| 118 | |||||
| 119 | # remove any duplicate jobs, incuding possibly this job if there | ||||
| 120 | # is already an equivalent job running | ||||
| 121 | |||||
| 122 | # note that the self-removal of a job has an unhelpful log: it is | ||||
| 123 | # reported as a duplicate of itself! however what's happening is that | ||||
| 124 | # netdisco has seen another running job with same params (but the query | ||||
| 125 | # cannot see that ID to use it in the message). | ||||
| 126 | |||||
| 127 | my %job_properties = ( | ||||
| 128 | action => $job->action, | ||||
| 129 | port => $job->port, | ||||
| 130 | subaction => $job->subaction, | ||||
| 131 | -or => [ | ||||
| 132 | { device => $job->device }, | ||||
| 133 | ($job->device_key ? ({ device_key => $job->device_key }) : ()), | ||||
| 134 | ], | ||||
| 135 | ); | ||||
| 136 | |||||
| 137 | my $gone = $jobs->search({ | ||||
| 138 | status => 'queued', | ||||
| 139 | -and => [ | ||||
| 140 | %job_properties, | ||||
| 141 | -or => [{ | ||||
| 142 | job => { '!=' => $job->id }, | ||||
| 143 | },{ | ||||
| 144 | job => $job->id, | ||||
| 145 | -exists => $jobs->search({ | ||||
| 146 | status => { -like => 'queued-%' }, | ||||
| 147 | started => \[q/> (now() - ?::interval)/, setting('jobs_stale_after')], | ||||
| 148 | %job_properties, | ||||
| 149 | })->as_query, | ||||
| 150 | }], | ||||
| 151 | ], | ||||
| 152 | }, {for => 'update'}) | ||||
| 153 | ->update({ status => 'error', log => (sprintf 'duplicate of %s', $job->id) }); | ||||
| 154 | |||||
| 155 | debug sprintf 'getsome: cancelled %s duplicate(s) of job %s', ($gone || 0), $job->id; | ||||
| 156 | push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns }); | ||||
| 157 | } | ||||
| 158 | |||||
| 159 | return @returned; | ||||
| 160 | } | ||||
| 161 | |||||
| 162 | sub jq_locked { | ||||
| 163 | my @returned = (); | ||||
| 164 | my $rs = schema('netdisco')->resultset('Admin')->search({ | ||||
| 165 | status => ('queued-'. setting('workers')->{'BACKEND'}), | ||||
| 166 | started => \[q/> (now() - ?::interval)/, setting('jobs_stale_after')], | ||||
| 167 | }); | ||||
| 168 | |||||
| 169 | while (my $job = $rs->next) { | ||||
| 170 | push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns }); | ||||
| 171 | } | ||||
| 172 | return @returned; | ||||
| 173 | } | ||||
| 174 | |||||
| 175 | sub jq_queued { | ||||
| 176 | my $job_type = shift; | ||||
| 177 | |||||
| 178 | return schema('netdisco')->resultset('Admin')->search({ | ||||
| 179 | device => { '!=' => undef}, | ||||
| 180 | action => $job_type, | ||||
| 181 | status => { -like => 'queued%' }, | ||||
| 182 | })->get_column('device')->all; | ||||
| 183 | } | ||||
| 184 | |||||
| 185 | sub jq_lock { | ||||
| 186 | my $job = shift; | ||||
| 187 | my $happy = false; | ||||
| 188 | |||||
| 189 | # lock db row and update to show job has been picked | ||||
| 190 | try { | ||||
| 191 | my $updated = schema('netdisco')->resultset('Admin') | ||||
| 192 | ->search({ job => $job->id, status => 'queued' }, { for => 'update' }) | ||||
| 193 | ->update({ | ||||
| 194 | status => ('queued-'. setting('workers')->{'BACKEND'}), | ||||
| 195 | started => \"now()", | ||||
| 196 | }); | ||||
| 197 | |||||
| 198 | $happy = true if $updated > 0; | ||||
| 199 | } | ||||
| 200 | catch { | ||||
| 201 | error $_; | ||||
| 202 | }; | ||||
| 203 | |||||
| 204 | return $happy; | ||||
| 205 | } | ||||
| 206 | |||||
| 207 | sub jq_defer { | ||||
| 208 | my $job = shift; | ||||
| 209 | my $happy = false; | ||||
| 210 | |||||
| 211 | # note this taints all actions on the device. for example if both | ||||
| 212 | # macsuck and arpnip are allowed, but macsuck fails 10 times, then | ||||
| 213 | # arpnip (and every other action) will be prevented on the device. | ||||
| 214 | |||||
| 215 | # seeing as defer is only triggered by an SNMP connect failure, this | ||||
| 216 | # behaviour seems reasonable, to me (or desirable, perhaps). | ||||
| 217 | |||||
| 218 | try { | ||||
| 219 | schema('netdisco')->txn_do(sub { | ||||
| 220 | if ($job->device) { | ||||
| 221 | schema('netdisco')->resultset('DeviceSkip')->find_or_create({ | ||||
| 222 | backend => setting('workers')->{'BACKEND'}, device => $job->device, | ||||
| 223 | },{ key => 'device_skip_pkey' })->increment_deferrals; | ||||
| 224 | } | ||||
| 225 | |||||
| 226 | # lock db row and update to show job is available | ||||
| 227 | schema('netdisco')->resultset('Admin') | ||||
| 228 | ->find($job->id, {for => 'update'}) | ||||
| 229 | ->update({ status => 'queued', started => undef }); | ||||
| 230 | }); | ||||
| 231 | $happy = true; | ||||
| 232 | } | ||||
| 233 | catch { | ||||
| 234 | error $_; | ||||
| 235 | }; | ||||
| 236 | |||||
| 237 | return $happy; | ||||
| 238 | } | ||||
| 239 | |||||
| 240 | sub jq_complete { | ||||
| 241 | my $job = shift; | ||||
| 242 | my $happy = false; | ||||
| 243 | |||||
| 244 | # lock db row and update to show job is done/error | ||||
| 245 | |||||
| 246 | # now that SNMP connect failures are deferrals and not errors, any complete | ||||
| 247 | # status, whether success or failure, indicates an SNMP connect. reset the | ||||
| 248 | # connection failures counter to forget about occasional connect glitches. | ||||
| 249 | |||||
| 250 | try { | ||||
| 251 | schema('netdisco')->txn_do(sub { | ||||
| 252 | if ($job->device) { | ||||
| 253 | schema('netdisco')->resultset('DeviceSkip')->find_or_create({ | ||||
| 254 | backend => setting('workers')->{'BACKEND'}, device => $job->device, | ||||
| 255 | },{ key => 'device_skip_pkey' })->update({ deferrals => 0 }); | ||||
| 256 | } | ||||
| 257 | |||||
| 258 | schema('netdisco')->resultset('Admin') | ||||
| 259 | ->find($job->id, {for => 'update'})->update({ | ||||
| 260 | status => $job->status, | ||||
| 261 | log => $job->log, | ||||
| 262 | started => $job->started, | ||||
| 263 | finished => $job->finished, | ||||
| 264 | }); | ||||
| 265 | }); | ||||
| 266 | $happy = true; | ||||
| 267 | } | ||||
| 268 | catch { | ||||
| 269 | # use DDP; p $job; | ||||
| 270 | error $_; | ||||
| 271 | }; | ||||
| 272 | |||||
| 273 | return $happy; | ||||
| 274 | } | ||||
| 275 | |||||
| 276 | sub jq_log { | ||||
| 277 | return schema('netdisco')->resultset('Admin')->search({ | ||||
| 278 | 'me.log' => { '-not_like' => 'duplicate of %' }, | ||||
| 279 | }, { | ||||
| 280 | prefetch => 'target', | ||||
| 281 | order_by => { -desc => [qw/entered device action/] }, | ||||
| 282 | rows => (setting('jobs_qdepth') || 50), | ||||
| 283 | })->with_times->hri->all; | ||||
| 284 | } | ||||
| 285 | |||||
| 286 | sub jq_userlog { | ||||
| 287 | my $user = shift; | ||||
| 288 | return schema('netdisco')->resultset('Admin')->search({ | ||||
| 289 | username => $user, | ||||
| 290 | log => { '-not_like' => 'duplicate of %' }, | ||||
| 291 | finished => { '>' => \"(now() - interval '5 seconds')" }, | ||||
| 292 | })->with_times->all; | ||||
| 293 | } | ||||
| 294 | |||||
| 295 | sub jq_insert { | ||||
| 296 | my $jobs = shift; | ||||
| 297 | $jobs = [$jobs] if ref [] ne ref $jobs; | ||||
| 298 | my $happy = false; | ||||
| 299 | |||||
| 300 | try { | ||||
| 301 | schema('netdisco')->txn_do(sub { | ||||
| 302 | schema('netdisco')->resultset('Admin')->populate([ | ||||
| 303 | map {{ | ||||
| 304 | device => $_->{device}, | ||||
| 305 | device_key => $_->{device_key}, | ||||
| 306 | port => $_->{port}, | ||||
| 307 | action => $_->{action}, | ||||
| 308 | subaction => ($_->{extra} || $_->{subaction}), | ||||
| 309 | username => $_->{username}, | ||||
| 310 | userip => $_->{userip}, | ||||
| 311 | status => 'queued', | ||||
| 312 | }} @$jobs | ||||
| 313 | ]); | ||||
| 314 | }); | ||||
| 315 | $happy = true; | ||||
| 316 | } | ||||
| 317 | catch { | ||||
| 318 | error $_; | ||||
| 319 | }; | ||||
| 320 | |||||
| 321 | return $happy; | ||||
| 322 | } | ||||
| 323 | |||||
| 324 | sub jq_delete { | ||||
| 325 | my $id = shift; | ||||
| 326 | |||||
| 327 | if ($id) { | ||||
| 328 | schema('netdisco')->txn_do(sub { | ||||
| 329 | schema('netdisco')->resultset('Admin')->find($id)->delete(); | ||||
| 330 | }); | ||||
| 331 | } | ||||
| 332 | else { | ||||
| 333 | schema('netdisco')->txn_do(sub { | ||||
| 334 | schema('netdisco')->resultset('Admin')->delete(); | ||||
| 335 | }); | ||||
| 336 | } | ||||
| 337 | } | ||||
| 338 | |||||
| 339 | true; | ||||
# spent 523µs within UNIVERSAL::can which was called 684 times, avg 764ns/call:
# 684 times (523µs+0s) by App::Netdisco::Util::Permission::check_acl at line 95 of App/Netdisco/Util/Permission.pm, avg 764ns/call |