"""
Module defining `EventMonitor` and `SpikeMonitor`.
"""
import numpy as np
from brian2.core.functions import timestep
from brian2.core.names import Nameable
from brian2.core.spikesource import SpikeSource
from brian2.core.variables import Variables
from brian2.groups.group import CodeRunner
from brian2.monitors.ratemonitor import RateMonitor
from brian2.units.allunits import hertz, second
from brian2.units.fundamentalunits import Quantity, check_units
__all__ = ["EventMonitor", "SpikeMonitor"]
class EventMonitor(RateMonitor):
    """
    Record events from a `NeuronGroup` or another event source.

    The recorded events can be accessed in various ways:
    the attributes `~EventMonitor.i` and `~EventMonitor.t` store all the indices
    and event times, respectively. Alternatively, you can get a dictionary
    mapping neuron indices to event trains, by calling the `event_trains`
    method.

    Parameters
    ----------
    source : `NeuronGroup`, `SpikeSource`
        The source of events to record.
    event : str
        The name of the event to record
    variables : str or sequence of str, optional
        Which variables to record at the time of the event (in addition to the
        index of the neuron). Can be the name of a variable or a list of names.
    record : bool, optional
        Whether or not to record each event in `i` and `t` (the `count` will
        always be recorded). Defaults to ``True``.
    when : str, optional
        When to record the events, by default records events in the same slot
        where the event is emitted. See :ref:`scheduling` for possible values.
    order : int, optional
        The priority of this group for operations occurring at the same time
        step and in the same scheduling slot. Defaults to the order where the
        event is emitted + 1, i.e. it will be recorded directly afterwards.
    name : str, optional
        A unique name for the object, otherwise will use
        ``source.name+'_eventmonitor_0'``, etc.
    codeobj_class : class, optional
        The `CodeObject` class to run code with.

    See Also
    --------
    SpikeMonitor
    """

    invalidates_magic_network = False
    add_to_magic_network = True

    def __init__(
        self,
        source,
        event,
        variables=None,
        record=True,
        when=None,
        order=None,
        name="eventmonitor*",
        codeobj_class=None,
    ):
        if not isinstance(source, SpikeSource):
            raise TypeError(
                f"{self.__class__.__name__} can only monitor groups "
                "producing spikes (such as NeuronGroup), but the given "
                f"argument is of type {type(source)}."
            )
        #: The source we are recording from
        self.source = source
        #: Whether to record times and indices of events
        self.record = record
        #: The array of event counts (length = size of target group)
        self.count = None
        del self.count  # this is handled by the Variable mechanism

        if event not in source.events:
            if event == "spike":
                threshold_text = " Did you forget to set a 'threshold'?"
            else:
                threshold_text = ""
            raise ValueError(
                f"Recorded group '{source.name}' does not define an event "
                f"'{event}'.{threshold_text}"
            )

        if when is None:
            if order is not None:
                raise ValueError("Cannot specify order if when is not specified.")
            # TODO: Would be nicer if there was a common way of accessing the
            # relevant object for NeuronGroup and SpikeGeneratorGroup
            if hasattr(source, "thresholder"):
                parent_obj = source.thresholder[event]
            else:
                parent_obj = source
            when = parent_obj.when
            order = parent_obj.order + 1
        elif order is None:
            order = 0

        #: The event that we are listening to
        self.event = event

        # Normalise ``variables`` to a tuple. Materialising it once means that
        # one-shot iterables (e.g. generators) are still validated below
        # instead of being exhausted by the ``set`` call.
        if variables is None:
            variables = ()
        elif isinstance(variables, str):
            variables = (variables,)
        else:
            variables = tuple(variables)

        #: The additional variables that will be recorded
        self.record_variables = set(variables)

        for variable in variables:
            if variable not in source.variables:
                raise ValueError(
                    f"'{variable}' is not a variable of the recorded group"
                )

        if self.record:
            self.record_variables |= {"i", "t"}

        # Some dummy code so that code generation takes care of the indexing
        # and subexpressions
        code = [f"_to_record_{v} = _source_{v}" for v in sorted(self.record_variables)]
        code = "\n".join(code)

        self.codeobj_class = codeobj_class

        # Since this now works for general events not only spikes, we have to
        # pass the information about which variable to use to the template,
        # it can no longer simply refer to "_spikespace"
        eventspace_name = f"_{event}space"

        # Handle subgroups correctly
        start = getattr(source, "start", 0)
        stop = getattr(source, "stop", len(source))
        source_N = getattr(source, "_source_N", len(source))

        Nameable.__init__(self, name=name)

        self.variables = Variables(self)
        self.variables.add_reference(eventspace_name, source)

        for variable in self.record_variables:
            source_var = source.variables[variable]
            self.variables.add_reference(f"_source_{variable}", source, variable)
            self.variables.add_auxiliary_variable(
                f"_to_record_{variable}",
                dimensions=source_var.dim,
                dtype=source_var.dtype,
            )
            self.variables.add_dynamic_array(
                variable,
                size=0,
                dimensions=source_var.dim,
                dtype=source_var.dtype,
                read_only=True,
            )
        self.variables.add_arange("_source_idx", size=len(source))
        self.variables.add_array(
            "count",
            size=len(source),
            dtype=np.int32,
            read_only=True,
            index="_source_idx",
        )
        self.variables.add_constant("_source_start", start)
        self.variables.add_constant("_source_stop", stop)
        self.variables.add_constant("_source_N", source_N)
        self.variables.add_array(
            "N", size=1, dtype=np.int32, read_only=True, scalar=True
        )

        record_variables = {
            varname: self.variables[varname] for varname in self.record_variables
        }
        template_kwds = {
            "eventspace_variable": source.variables[eventspace_name],
            "record_variables": record_variables,
            "record": self.record,
        }
        needed_variables = {eventspace_name} | self.record_variables
        CodeRunner.__init__(
            self,
            group=self,
            code=code,
            template="spikemonitor",
            name=None,  # The name has already been initialized
            clock=source.clock,
            when=when,
            order=order,
            needed_variables=needed_variables,
            template_kwds=template_kwds,
        )

        self.variables.create_clock_variables(self._clock, prefix="_clock_")
        self.add_dependency(source)
        self.written_readonly_vars = {
            self.variables[varname] for varname in self.record_variables
        }
        self._enable_group_attributes()

    def resize(self, new_size):
        """
        Resize the dynamic arrays of all recorded variables to ``new_size``.
        """
        # Note that this does not set N, this has to be done in the template
        # since we use a restricted pointer to access it (which promises that
        # we only change the value through this pointer)
        for variable in self.record_variables:
            self.variables[variable].resize(new_size)

    def reinit(self):
        """
        Clearing the recorded events is not implemented; always raises
        `NotImplementedError`.
        """
        raise NotImplementedError()

    def _require_recorded(self):
        """
        Raise an `AttributeError` if individual events are not recorded
        (i.e. the monitor was created with ``record=False``).
        """
        if not self.record:
            raise AttributeError(
                "Indices and times have not been recorded. "
                "Set the record argument to True to record "
                "them."
            )

    def _sorted_event_layout(self):
        """
        Group the recorded events by neuron index.

        Returns
        -------
        first_pos, sort_indices, used_indices : tuple of `ndarray`
            ``sort_indices`` sorts the recorded events by neuron index,
            ``used_indices`` are the distinct neuron indices that emitted at
            least one event and ``first_pos`` gives, for each of them, the
            position of its first event in the sorted arrays.
        """
        indices = self.i[:]
        # We have to make sure that the sort is stable, otherwise our spike
        # times do not necessarily remain sorted.
        sort_indices = np.argsort(indices, kind="mergesort")
        used_indices, first_pos = np.unique(indices[sort_indices], return_index=True)
        return first_pos, sort_indices, used_indices

    @property
    def it(self):
        """
        Returns the pair (`i`, `t`).
        """
        self._require_recorded()
        return self.i, self.t

    @property
    def it_(self):
        """
        Returns the pair (`i`, `t_`).
        """
        self._require_recorded()
        return self.i, self.t_

    def _values_dict(self, first_pos, sort_indices, used_indices, var):
        """
        Build the dictionary mapping every index of the source to the values
        of ``var`` recorded for that index (empty for indices without events).

        ``first_pos``, ``sort_indices`` and ``used_indices`` describe the
        events grouped by neuron index, see `_sorted_event_layout`.
        """
        sorted_values = self.state(var, use_units=False)[sort_indices]
        dim = self.variables[var].dim
        event_values = {}
        current_pos = 0  # position in the used_indices array
        for idx in range(len(self.source)):
            if current_pos < len(used_indices) and used_indices[current_pos] == idx:
                if current_pos < len(used_indices) - 1:
                    event_values[idx] = Quantity(
                        sorted_values[
                            first_pos[current_pos] : first_pos[current_pos + 1]
                        ],
                        dim=dim,
                    )
                else:
                    # Last index with events: take everything that is left
                    event_values[idx] = Quantity(
                        sorted_values[first_pos[current_pos] :], dim=dim
                    )
                current_pos += 1
            else:
                event_values[idx] = Quantity([], dim=dim)
        return event_values

    def values(self, var):
        """
        Return a dictionary mapping neuron indices to arrays of variable values
        at the time of the events (sorted by time).

        Parameters
        ----------
        var : str
            The name of the variable.

        Returns
        -------
        values : dict
            Dictionary mapping each neuron index to an array of variable
            values at the time of the events

        Raises
        ------
        AttributeError
            If the monitor was created with ``record=False``.

        Examples
        --------
        >>> from brian2 import *
        >>> G = NeuronGroup(2, '''counter1 : integer
        ...                       counter2 : integer
        ...                       max_value : integer''',
        ...                    threshold='counter1 >= max_value',
        ...                    reset='counter1 = 0')
        >>> G.run_regularly('counter1 += 1; counter2 += 1')  # doctest: +ELLIPSIS
        CodeRunner(...)
        >>> G.max_value = [50, 100]
        >>> mon = EventMonitor(G, event='spike', variables='counter2')
        >>> run(10*ms)
        >>> counter2_values = mon.values('counter2')
        >>> print(counter2_values[0])
        [ 50 100]
        >>> print(counter2_values[1])
        [100]
        """
        self._require_recorded()
        first_pos, sort_indices, used_indices = self._sorted_event_layout()
        return self._values_dict(first_pos, sort_indices, used_indices, var)

    def all_values(self):
        """
        Return a dictionary mapping recorded variable names (including ``t``)
        to a dictionary mapping neuron indices to arrays of variable values at
        the time of the events (sorted by time). This is equivalent to (but more
        efficient than) calling `values` for each variable and storing the
        result in a dictionary.

        Returns
        -------
        all_values : dict
            Dictionary mapping variable names to dictionaries which themselves
            are mapping neuron indices to arrays of variable values at the
            time of the events.

        Raises
        ------
        AttributeError
            If the monitor was created with ``record=False``.

        Examples
        --------
        >>> from brian2 import *
        >>> G = NeuronGroup(2, '''counter1 : integer
        ...                       counter2 : integer
        ...                       max_value : integer''',
        ...                    threshold='counter1 >= max_value',
        ...                    reset='counter1 = 0')
        >>> G.run_regularly('counter1 += 1; counter2 += 1')  # doctest: +ELLIPSIS
        CodeRunner(...)
        >>> G.max_value = [50, 100]
        >>> mon = EventMonitor(G, event='spike', variables='counter2')
        >>> run(10*ms)
        >>> all_values = mon.all_values()
        >>> print(all_values['counter2'][0])
        [ 50 100]
        >>> print(all_values['t'][1])
        [ 9.9] ms
        """
        self._require_recorded()
        # The grouping by neuron index is computed once and shared by all
        # recorded variables
        first_pos, sort_indices, used_indices = self._sorted_event_layout()
        all_values_dict = {}
        for varname in self.record_variables - {"i"}:
            all_values_dict[varname] = self._values_dict(
                first_pos, sort_indices, used_indices, varname
            )
        return all_values_dict

    def event_trains(self):
        """
        Return a dictionary mapping neuron indices to arrays of event times.
        Equivalent to calling ``values('t')``.

        Returns
        -------
        event_trains : dict
            Dictionary that stores an array with the event times for each
            neuron index.

        See Also
        --------
        SpikeMonitor.spike_trains
        """
        return self.values("t")

    @check_units(bin_size=second)
    def binned_rate(self, bin_size):
        """
        Return the event rates binned with the given bin size.

        Parameters
        ----------
        bin_size : `Quantity`
            The size of the bins in seconds. Has to be a positive multiple of
            dt.

        Returns
        -------
        bins : `Quantity`
            The start time of the bins.
        binned_values : `Quantity`
            The binned rates as a 2D array (neurons × bins) in Hz.

        Raises
        ------
        ValueError
            If ``bin_size`` is not a positive multiple of dt.

        Notes
        -----
        The returned bin times represent the **start** of each bin interval, not the center.
        This is consistent with how Brian2 records spike times and other temporal data.
        For example, a spike recorded at time `t` occurred during the interval `[t, t+dt)`.

        Only bins that are completely covered by the time elapsed on the clock
        are returned; events falling into a trailing, incomplete bin are not
        counted. If the monitor does not record individual events
        (``record=False``), all returned rates are zero.

        For plotting purposes, especially with larger bin sizes, you may want to use bin
        centers instead of bin starts for a more intuitive visualization. You can easily
        calculate the bin centers by adding half the bin size::

            >> bins, rates = monitor.binned_rate(10*ms)
            >> bin_centers = bins + 10*ms / 2
            >> plt.plot(bin_centers, rates)

        This adjustment is particularly helpful when the bins are large relative to the
        time scale of interest, as it better represents where the rate measurement applies
        within each time window.
        """
        dt = self.clock.dt
        ratio = float(bin_size / dt)
        if ratio <= 0:
            raise ValueError("bin_size has to be positive.")
        # Compare with the nearest integer instead of using ``% 1``: a ratio
        # such as 2.9999999999999996 (0.3*ms / 0.1*ms) has a remainder close
        # to 1 and would wrongly be rejected by a plain modulo test.
        nearest = round(ratio)
        if nearest < 1 or abs(ratio - nearest) > 1e-6:
            raise ValueError("bin_size has to be a multiple of dt.")

        # Get the total duration and number of (complete) bins
        bin_timesteps = timestep(bin_size, dt)
        num_bins = int(self.clock.timestep // bin_timesteps)
        bin_starts_timesteps = (np.arange(num_bins)) * bin_timesteps
        bins = bin_starts_timesteps * dt
        num_neurons = len(self.source)

        # Now we initialize the binned values array (neurons × bins)
        binned_values = np.zeros((num_neurons, num_bins))
        if self.record and len(self.t) > 0:
            # Get the event times and indices
            event_timesteps = np.asarray(timestep(self.t[:], dt))
            bin_indices = event_timesteps // bin_timesteps
            neuron_indices = np.asarray(self.i[:])
            # Events after the last complete bin would index past the end of
            # the array, leave them out
            in_complete_bin = bin_indices < num_bins
            np.add.at(
                binned_values,
                (neuron_indices[in_complete_bin], bin_indices[in_complete_bin]),
                1,
            )

        # Convert counts to rates (Hz)
        binned_values = binned_values / float(bin_size)
        return bins, Quantity(binned_values, dim=hertz.dim)

    @property
    def num_events(self):
        """
        Returns the total number of recorded events.
        """
        return self.N[:]

    def __repr__(self):
        classname = self.__class__.__name__
        return f"<{classname}, recording event '{self.event}' from '{self.group.name}'>"

    def after_run(self):
        """
        Synchronise the ``size`` of the recorded variables with the length of
        their underlying arrays after a run.
        """
        super().after_run()
        # In Cython runtime mode, we directly update the underlying dynamic array,
        # so the size attribute of the Variable does not get updated automatically
        for var in self.record_variables:
            try:
                self.variables[var].size = len(self.variables[var].get_value())
            except NotImplementedError:
                pass  # Does not apply to standalone mode
class SpikeMonitor(EventMonitor):
    """
    Record spikes from a `NeuronGroup` or other spike source.

    The recorded spikes can be accessed in various ways (see Examples below):
    the attributes `~SpikeMonitor.i` and `~SpikeMonitor.t` store all the indices
    and spike times, respectively. Alternatively, you can get a dictionary
    mapping neuron indices to spike trains, by calling the `spike_trains`
    method. If you record additional variables with the ``variables`` argument,
    these variables can be accessed by their name (see Examples).

    Parameters
    ----------
    source : (`NeuronGroup`, `SpikeSource`)
        The source of spikes to record.
    variables : str or sequence of str, optional
        Which variables to record at the time of the spike (in addition to the
        index of the neuron). Can be the name of a variable or a list of names.
    record : bool, optional
        Whether or not to record each spike in `i` and `t` (the `count` will
        always be recorded). Defaults to ``True``.
    when : str, optional
        When to record the events, by default records events in the same slot
        where the event is emitted. See :ref:`scheduling` for possible values.
    order : int, optional
        The priority of this group for operations occurring at the same time
        step and in the same scheduling slot. Defaults to the order where the
        event is emitted + 1, i.e. it will be recorded directly afterwards.
    name : str, optional
        A unique name for the object, otherwise will use
        ``source.name+'_spikemonitor_0'``, etc.
    codeobj_class : class, optional
        The `CodeObject` class to run code with.

    Examples
    --------
    >>> from brian2 import *
    >>> spikes = SpikeGeneratorGroup(3, [0, 1, 2], [0, 1, 2]*ms)
    >>> spike_mon = SpikeMonitor(spikes)
    >>> net = Network(spikes, spike_mon)
    >>> net.run(3*ms)
    >>> print(spike_mon.i[:])
    [0 1 2]
    >>> print(spike_mon.t[:])
    [ 0. 1. 2.] ms
    >>> print(spike_mon.t_[:])
    [ 0. 0.001 0.002]
    >>> from brian2 import *
    >>> G = NeuronGroup(2, '''counter1 : integer
    ...                       counter2 : integer
    ...                       max_value : integer''',
    ...                    threshold='counter1 >= max_value',
    ...                    reset='counter1 = 0')
    >>> G.run_regularly('counter1 += 1; counter2 += 1')  # doctest: +ELLIPSIS
    CodeRunner(...)
    >>> G.max_value = [50, 100]
    >>> mon = SpikeMonitor(G, variables='counter2')
    >>> net = Network(G, mon)
    >>> net.run(10*ms)
    >>> print(mon.i[:])
    [0 0 1]
    >>> print(mon.counter2[:])
    [ 50 100 100]
    """

    def __init__(
        self,
        source,
        variables=None,
        record=True,
        when=None,
        order=None,
        name="spikemonitor*",
        codeobj_class=None,
    ):
        #: The array of spike counts (length = size of target group)
        self.count = None
        del self.count  # this is handled by the Variable mechanism

        # A spike monitor is an event monitor fixed to the "spike" event;
        # every other option is handed on unchanged.
        super().__init__(
            source,
            "spike",
            variables=variables,
            record=record,
            when=when,
            order=order,
            name=name,
            codeobj_class=codeobj_class,
        )

    @property
    def num_spikes(self):
        """
        Returns the total number of recorded spikes.
        """
        total = self.num_events
        return total

    # The methods below only delegate to their `EventMonitor` counterparts.
    # They are repeated here to provide spike-specific doc strings (and to
    # make sure that they show up in the reference documentation for
    # SpikeMonitor).

    def spike_trains(self):
        """
        Return a dictionary mapping neuron indices to arrays of spike times.

        Returns
        -------
        spike_trains : dict
            Dictionary that stores an array with the spike times for each
            neuron index.

        Examples
        --------
        >>> from brian2 import *
        >>> spikes = SpikeGeneratorGroup(3, [0, 1, 2], [0, 1, 2]*ms)
        >>> spike_mon = SpikeMonitor(spikes)
        >>> run(3*ms)
        >>> spike_trains = spike_mon.spike_trains()
        >>> spike_trains[1]
        array([ 1.]) * msecond
        """
        trains = self.event_trains()
        return trains

    def values(self, var):
        """
        Return a dictionary mapping neuron indices to arrays of variable values
        at the time of the spikes (sorted by time).

        Parameters
        ----------
        var : str
            The name of the variable.

        Returns
        -------
        values : dict
            Dictionary mapping each neuron index to an array of variable
            values at the time of the spikes.

        Examples
        --------
        >>> from brian2 import *
        >>> G = NeuronGroup(2, '''counter1 : integer
        ...                       counter2 : integer
        ...                       max_value : integer''',
        ...                    threshold='counter1 >= max_value',
        ...                    reset='counter1 = 0')
        >>> G.run_regularly('counter1 += 1; counter2 += 1')  # doctest: +ELLIPSIS
        CodeRunner(...)
        >>> G.max_value = [50, 100]
        >>> mon = SpikeMonitor(G, variables='counter2')
        >>> run(10*ms)
        >>> counter2_values = mon.values('counter2')
        >>> print(counter2_values[0])
        [ 50 100]
        >>> print(counter2_values[1])
        [100]
        """
        per_neuron = super().values(var)
        return per_neuron

    def all_values(self):
        """
        Return a dictionary mapping recorded variable names (including ``t``)
        to a dictionary mapping neuron indices to arrays of variable values at
        the time of the spikes (sorted by time). This is equivalent to (but more
        efficient than) calling `values` for each variable and storing the
        result in a dictionary.

        Returns
        -------
        all_values : dict
            Dictionary mapping variable names to dictionaries which themselves
            are mapping neuron indices to arrays of variable values at the
            time of the spikes.

        Examples
        --------
        >>> from brian2 import *
        >>> G = NeuronGroup(2, '''counter1 : integer
        ...                       counter2 : integer
        ...                       max_value : integer''',
        ...                    threshold='counter1 >= max_value',
        ...                    reset='counter1 = 0')
        >>> G.run_regularly('counter1 += 1; counter2 += 1')  # doctest: +ELLIPSIS
        CodeRunner(...)
        >>> G.max_value = [50, 100]
        >>> mon = SpikeMonitor(G, variables='counter2')
        >>> run(10*ms)
        >>> all_values = mon.all_values()
        >>> print(all_values['counter2'][0])
        [ 50 100]
        >>> print(all_values['t'][1])
        [ 9.9] ms
        """
        per_variable = super().all_values()
        return per_variable

    def __repr__(self):
        source_name = self.group.name
        return f"<{self.__class__.__name__}, recording from '{source_name}'>"