File: //usr/share/perl5/MongoDB/Role/_TopologyMonitoring.pm
# Copyright 2018 - present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
use strict;
use warnings;
package MongoDB::Role::_TopologyMonitoring;
# MongoDB role to add topology monitoring support
use version;
our $VERSION = 'v2.2.2';
use Moo::Role;
use namespace::clean;
# These are used to cache data
has old_topology_desc => ( is => 'rw' );
has old_server_desc => ( is => 'rw' );
sub publish_topology_opening {
my $self = shift;
my $event = {
topologyId => "$self",
type => "topology_opening_event"
};
eval { $self->monitoring_callback->($event) };
}
sub publish_topology_closing {
my $self = shift;
my $event = {
topologyId => "$self",
type => "topology_closed_event"
};
eval { $self->monitoring_callback->($event) };
}
sub publish_server_opening {
my ( $self, $address ) = @_;
my $event = {
topologyId => "$self",
address => $address,
type => "server_opening_event"
};
eval { $self->monitoring_callback->($event) };
}
sub publish_server_closing {
my ( $self, $address ) = @_;
my $event = {
topologyId => "$self",
address => $address,
type => "server_closed_event"
};
eval { $self->monitoring_callback->($event) };
}
sub publish_server_heartbeat_started {
my ($self, $link) = @_;
my $event = {
connectionId => $link->address,
type => "server_heartbeat_started_event"
};
eval { $self->monitoring_callback->($event) };
}
sub publish_server_heartbeat_succeeded {
my ($self, $link, $rtt_sec_fail, $is_master) = @_;
my $event = {
duration => $rtt_sec_fail,
reply => $is_master,
connectionId => $link->address,
type => "server_heartbeat_succeeded_event"
};
eval { $self->monitoring_callback->($event) };
}
sub publish_server_heartbeat_failed {
my ($self, $link, $rtt_sec_fail, $e) = @_;
my $event = {
duration => $rtt_sec_fail,
failure => $e,
connectionId => $link->address,
type => "server_heartbeat_failed_event"
};
eval { $self->monitoring_callback->($event) };
}
sub __create_server_description {
my ($self, $server) = @_;
my $server_desc;
if (defined $server->is_master) {
$server_desc = {
address => $server->address,
error => $server->error,
roundTripTime => $server->rtt_sec,
lastWriteDate => $server->is_master->{lastWrite}->{lastWriteDate},
opTime => $server->is_master->{opTime},
type => $server->type || "Unknown",
minWireVersion => $server->is_master->{min_wire_version},
maxWireVersion => $server->is_master->{max_wire_version},
me => $server->me,
arbiters => $server->arbiters,
hosts => $server->hosts,
passives => $server->passives,
(defined $server->is_master->{tags} ?
(tags => $server->is_master->{tags}) : ()),
($server->primary ne "" ? (primary => $server->primary) : ()),
(defined $server->is_master->{setName} ?
(setName => $server->is_master->{setName}) : ()),
(defined $server->is_master->{setVersion} ?
(setVersion => $server->is_master->{setVersion}) : ()),
(defined $server->is_master->{electionId} ?
(electionId => $server->is_master->{electionId}) : ()),
(defined $server->is_master->{logicalSessionTimeoutMinutes} ?
(logicalSessionTimeoutMinutes =>
$server->is_master->{logicalSessionTimeoutMinutes}) : ()),
};
} else {
$server_desc = {
address => $server->address,
error => $server->error,
roundTripTime => $server->rtt_sec,
type => $server->type || "Unknown",
me => $server->me,
arbiters => $server->arbiters,
hosts => $server->hosts,
passives => $server->passives,
#TODO figure out what tags should be
tags => undef,
($server->primary ne "" ? (primary => $server->primary) : ()),
};
}
return $server_desc;
}
sub __has_changed_servers {
my ($self, $new_server ) = @_;
# Fields considered Server Description equality
my $equal_servers = 1;
my %equality_fields = (
address => 1,
type => 1,
minWireVersion => 1,
minWireVersion => 1,
me => 1,
arbiters => 1,
hosts => 1,
passives => 1,
tags => 1,
primary => 1,
setName => 1,
setVersion => 1,
electionId => 1,
logicalSessionTimeoutMinutes => 1,
);
my $new_server_desc = $self->__create_server_description($new_server);
my %oldhash = %{$self->old_server_desc};
my %newhash = %{$new_server_desc};
foreach my $key (keys %newhash) {
if (exists($equality_fields{$key})) {
if (!exists($oldhash{$key})) {
$equal_servers = 0;
last;
} elsif (!defined($newhash{$key}) &&
!defined($oldhash{$key})) {
next;
} elsif ($newhash{$key} ne $oldhash{$key}) {
$equal_servers = 0;
last;
}
}
}
unless ( $equal_servers ) {
my $event_server = {
topologyId => "$self",
address => $new_server->address,
previousDescription => $self->old_server_desc,
newDescription => $new_server_desc,
type => "server_description_changed_event"
};
eval { $self->monitoring_callback->($event_server) };
}
}
sub publish_old_topology_desc {
my ( $self, $address, $new_server ) = @_;
if ( $address ) {
my $server = $self->servers->{$address};
my $old_server = $self->__create_server_description($server);
$self->old_server_desc($old_server);
}
if ( $new_server ) {
$self->__has_changed_servers($new_server);
}
$self->old_topology_desc( $self->__create_topology_description );
}
sub publish_new_topology_desc {
my $self = shift;
my $event_topology = {
topologyId => "$self",
previousDescription => $self->old_topology_desc,
newDescription => $self->__create_topology_description,
type => "topology_description_changed_event"
};
eval { $self->monitoring_callback->($event_topology) };
}
sub __create_topology_description {
my ( $self ) = @_;
my @servers = map { $self->__create_server_description($_) } $self->all_servers;
return {
topologyType => $self->type,
( $self->replica_set_name ne ""
? ( setName => $self->replica_set_name )
: ()
),
( defined $self->max_set_version
? ( maxSetVersion => $self->max_set_version )
: ()
),
( defined $self->max_election_id
? ( maxElectionId => $self->max_election_id )
: ()
),
servers => \@servers,
stale => $self->stale,
( defined $self->is_compatible
? ( compatible => $self->is_compatible )
: ()
),
( defined $self->compatibility_error
? ( compatibilityError => $self->compatibility_error )
: ()
),
( defined $self->logical_session_timeout_minutes
? ( logicalSessionTimeoutMinutes => $self->logical_session_timeout_minutes )
: ()
),
};
}
1;