Source code for planktos._swarm

'''
Swarm class of Planktos.

Created on Tues Jan 24 2017

Author: Christopher Strickland

Email: cstric12@utk.edu
'''

import sys, os, warnings
from pathlib import Path
import numpy as np
import numpy.ma as ma
from scipy import stats
import pandas as pd
if sys.platform == 'darwin': # OSX backend does not support blitting
    import matplotlib
    matplotlib.use('Qt5Agg')
import matplotlib.pyplot as plt
from matplotlib import animation, colors
from matplotlib.path import Path as mPath

from ._environment import Environment
from . import _dataio, _geom, _ibc, motion

__author__ = "Christopher Strickland"
__email__ = "cstric12@utk.edu"
__copyright__ = "Copyright 2017, Christopher Strickland"

[docs] class Swarm: ''' Fundamental Planktos object describing a group of similar agents. The Swarm class (alongside the Environment class) provides the main agent functionality of Planktos. Each Swarm object should be thought of as a group of similar (though not necessarily identical) agents. Planktos implements agents in this way rather than as individual objects for speed purposes; it is easier to vectorize a swarm of agents than individual agent objects, and also much easier to plot them all, get data on them all, etc. The Swarm object contains all information on the agents' positions, properties, and movement algorithm. It also handles plotting of the agents and saving of agent data to file for further analysis. Initial agent velocities will be set as the local fluid velocity if present, otherwise zero. Assignment to the velocities attribute can be made directly for other initial conditions. NOTE: To customize agent behavior, subclass this class and re-implement the method apply_agent_model (do not change the call signature). Parameters ---------- swarm_size : int, default=100 Number of agents in the Swarm. ignored when using the 'grid' init method. envir : Environment object, optional Environment for the Swarm to exist in. Defaults to a newly initialized Environment with all of the defaults. init : {'random', 'grid', ndarray}, default='random' Method for initalizing agent positions. * 'random': Uniform random distribution throughout the domain * 'grid': Uniform grid on interior of the domain, including capability to leave out closed immersed structures. In this case, swarm_size is ignored since it is determined by the grid dimensions. Requires the additional keyword parameters grid_dim and testdir. * 1D array: All positions set to a single point given by the x,y,[z] coordinates of this array * 2D array: All positions as specified. Shape of array should be NxD, where N is the number of agents and D is spatial dimension. In this case, swarm_size is ignored. ib_condition : {None, 'sliding' (default), 'sticky'} Boundary condition for immersed boundaries * None: Will turn off all interactions with immersed boundaries * 'sliding': (default) No flux in the direction normal to the boundary; any movement across the boundary will be subject to vector projection onto the boundary within a given time step * 'sticky': The velocity at the boundary is zero - anything that hits the boundary stops for the remainder of the time step. seed : int, optional Seed for random number generator shared_props : dictionary, optional dictionary of properties shared by all agents as name-value pairs. If none are provided, four default properties will be created, 'mu' and 'cov', corresponding to intrinsic mean drift and a covariance matrix for brownian motion respectively, and 'name' and 'color corresponding to the name of the swarm and its default color for plotting. 'mu' will be set to an array of zeros with length matching the spatial dimension, and 'cov' will be set to an identity matrix of appropriate size according to the spatial dimension. This allows the default agent behavior to be unbiased brownian motion. Examples: * diam: diameter of the particles * m: mass of the particles * Cd: drag coefficient of the particles * cross_sec: cross-sectional area of the particles * R: density ratio props : Pandas dataframe of individual agent properties, optional Pandas dataframe of individual agent properties that vary between agents. This is the method by which individual variation among the agents should be specified. The number of rows in the dataframe should match the number of agents. If no dataframe is supplied, an empty one will be created. A special property (column) can be specified called 'angle' which, if props_history is being stored, will plot as the agent heading in 2D. store_prop_history : bool Whether or not to keep a history of props at all time points name : string, optional Name of this swarm. Stored in shared_props. color : matplotlib color format Default plotting color for swarm (see https://matplotlib.org/stable/tutorials/colors/colors.html). Stored in shared_props. Can be overridden by supplying individual (and even time varying!) agent colors in a 'color' column of the props DataFrame. pool : object with a .map method, optional Worker pool used to parallelize per-agent immersed-boundary collision detection (the main runtime bottleneck when immersed meshes are present). Any object exposing ``.map(func, iterable)`` works, including ``multiprocessing.Pool``, ``concurrent.futures.ProcessPoolExecutor``, and ``concurrent.futures.ThreadPoolExecutor``; choose threads (shares memory, no pickling) or processes (true multi-core) just by which one you attach. Defaults to None, which runs serially and reproduces the unparallelized behavior exactly. The pool is created and owned by the user (Planktos does not shut it down). A process-based pool (e.g. ProcessPoolExecutor) is recommended and is mainly beneficial for the expensive moving-boundary case; for cheap static-mesh collisions, or for thread-based pools, the per-agent dispatch overhead can outweigh the work and reduce performance, so benchmark before relying on it. The pool is also accessible as ``self.pool`` inside apply_agent_model for user subclasses, but the default agent model does not use it; note that parallelizing a custom stochastic agent model requires independent per-worker RNG streams (e.g. ``np.random.SeedSequence().spawn()``) to avoid shared-RNG contention. **kwargs : dict, optional keyword arguments to be used in the 'grid' initialization method or values to be set as a Swarm object property. In the latter case, these values can be floats, ndarrays, or iterables, but keep in mind that problems will result with parsing if the number of agents is equal to the spatial dimension - this is to be avoided. This method of specifying agent properties is depreciated: use the shared_props dictionary instead. Other Parameters ---------------- grid_dim : tuple of int (x, y, [z]) number of grid points in x, y, [and z] directions for 'grid' initialization testdir : {'x0', 'x1', 'y0', 'y1', ['z0'], ['z1']}, optional two character string for testing if grid points are in the interior of an immersed structure and if so, masking them in the grid initialization. The first char is x,y, or z denoting the dimensional direction of the search ray, the second is either 0 or 1 denoting the direction (backward vs. forward) along that direction. See documentation of Swarm.grid_init for more information. Attributes ---------- envir : Environment object Environment that this Swarm belongs to positions : masked array, shape Nx2 (2D) or Nx3 (3D) spatial location of all the agents in the swarm. the mask is False for any row corresponding to an agent that is within the spatial boundaries of the environment, otherwise the mask for the row is set to True and the position of that agent is no longer updated N : read only property, int The current number of agents in the swarm, based on positions.shape[0] pos_history : list of masked arrays all previous position arrays are stored here. to get their corresponding times, check the time_history attribute of the Swarm's Environment. full_pos_history : list of masked arrays same as pos_history, but also includes the positions attribute as the last entry in the list velocities : masked array, shape Nx2 (2D) or Nx3 (3D) velocity of all the agents in the swarm. same masking properties as positions vel_history : list of masked arrays all previous velocity arrays are stored here. to get their corresponding times, check the time_history attribute of the Swarm's Environment. full_vel_history : list of masked arrays same as vel_history, but also includes the velocities attribute as the last entry in the list accelerations : masked array, shape Nx2 (2D) or Nx3 (3D) accelerations of all the agents in the swarm. same masking properties as positions ib_collision_idx : 1D array of int with length equal to the swarm size For each agent, the index of the mesh element that the agent collided with in the most recent time it moved. -1 if no collision. props : pandas DataFrame Pandas dataframe of individual agent properties that vary between agents. This is the method by which individual variation among the agents should be specified. A special column can be specified called 'angle' which, if props_history is being stored, will plot as the agent heading in 2D. props_history : List of past Pandas DataFrames or None If not None, this list records individual agent attributes at all previous points in time corresponding to the time_history attribute of the Swarm's Environment. full_props_history : List of Pandas DataFrames or None props_history plus the current time version of props shared_props : dictionary dictionary of properties shared by all agents as name-value pairs. Must include 'name' and 'color' indicating the name of the swarm and its default color for plotting. 'mu' and 'cov' are required for Brownian motion, and other properties may be required for other physics. rndState : numpy Generator object random number generator for this Swarm, seeded by the "seed" parameter Notes ----- If the agent behavior you are looking for is simply brownian motion with fluid advection, all you need to do is change the 'cov' entry in the shared_props dictionary to a covariance matrix that matches the amount of jitter you are looking for. You can also add fixed directional bias by editing the 'mu' shared_props entry. This default behavior is then accomplished by solving the relevant SDE using Euler steps, where the step size is the dt argument of the move method which you call in a loop, e.g.:: for ii in range(50): swm.move(0.1) In order to accomodate general, user-defined behavior algorithms, all other agent behaviors should be explicitly specified by subclassing this Swarm class and overriding the apply_agent_model method. This is easy, and takes the following form: :: class myagents(planktos.Swarm): def apply_agent_model(self, dt): # # Put any calculations here that are necessary to determine # where the agents should end up after a time step of length # dt assuming they don't run into a boundary of any sort. # Boundary conditions, mesh crossings, etc. will be handled # automatically by Planktos after this function returns. The # new positions you return should be an ndarray of shape NxD # where N is the number of agents in the swarm and D is the # spatial dimension of the system. The params argument is # there in case you want this method to take in any external # info (e.g. time-varying forcing functions, user-controlled # behavior switching, etc.). Note that this method has full # access to all of the Swarm attributes via the "self" # argument. For example, self.positions will return an NxD # masked array of current agent positions. The one thing this # method SHOULD NOT do is set the positions, velocities, or # accelerations attributes of the Swarm. This will be handled # automatically after this method returns, and after boundary # conditions have been checked. return newpositions Then, when you create a Swarm object, create it using:: swrm = myagents() # add Swarm parameters as necessary, as documented above This will create a Swarm object, but with your my_positions method instead of the default one! Examples -------- Create a default Swarm in an Environment with some fluid data loaded and tiled. >>> envir = planktos.Environment() >>> envir.read_IBAMR3d_vtk_data('../tests/IBAMR_test_data', d_start=5, d_finish=None) >>> envir.tile_flow(3,3) >>> swrm = Swarm(envir=envir) ''' def __init__(self, swarm_size=100, envir=None, init='random', ib_condition='sliding', seed=None, shared_props=None, props=None, store_prop_history=False, name='organism', color='darkgreen', pool=None, **kwargs): # use a new, 3D default Environment if one was not given. Or infer # dimension from init if possible. if envir is None: if isinstance(init,str): self.envir = Environment(init_swarms=self, Lz=10) elif isinstance(init,np.ndarray) and len(init.shape) == 2: if init.shape[1] == 2: self.envir = Environment(init_swarms=self) else: self.envir = Environment(init_swarms=self, Lz=10) else: if len(init) == 2: self.envir = Environment(init_swarms=self) else: self.envir = Environment(init_swarms=self, Lz=10) else: try: assert envir.__class__.__name__ == 'Environment' envir.swarms.append(self) self.envir = envir except AssertionError as ae: print("Error: invalid Environment object.") raise ae # initialize random number generator self.rndState = np.random.default_rng(seed=seed) # initialize agent locations if isinstance(init,np.ndarray) and len(init.shape) == 2: swarm_size = init.shape[0] self.positions = ma.zeros((swarm_size, len(self.envir.L))) if isinstance(init,str): if init == 'random': print('Initializing Swarm with uniform random positions...') for ii in range(len(self.envir.L)): self.positions[:,ii] = self.rndState.uniform(0, self.envir.L[ii], self.N) elif init == 'grid': assert 'grid_dim' in kwargs, "Required key word argument grid_dim missing for grid init." x_num = kwargs['grid_dim'][0]; y_num = kwargs['grid_dim'][1] if len(self.envir.L) > 2: z_num = kwargs['grid_dim'][2] else: z_num = None if 'testdir' in kwargs: testdir = kwargs['testdir'] else: testdir = None print('Initializing Swarm with grid positions...') self.positions = self.grid_init(x_num, y_num, z_num, testdir) swarm_size = self.N else: print("Initialization method {} not implemented.".format(init)) print("Exiting...") raise NameError else: if isinstance(init,np.ndarray) and len(init.shape) == 2: assert init.shape[1] == len(self.envir.L),\ "Initial location data must be Nx{} to match number of agents.".format( len(self.envir.L)) self.positions[:,:] = init[:,:] else: for ii in range(len(self.envir.L)): self.positions[:,ii] = init[ii] # Due to overloading of the __setattr__ method below, positions, velocities, # and accelerations should always have a hard mask automatically. # initialize agent velocities if self.envir.flow is not None: self.velocities = ma.array(self.get_fluid_drift(), mask=self.positions.mask.copy()) else: self.velocities = ma.array(np.zeros((swarm_size, len(self.envir.L))), mask=self.positions.mask.copy()) # initialize agent accelerations self.accelerations = ma.array(np.zeros((swarm_size, len(self.envir.L))), mask=self.positions.mask.copy()) # Initialize position and velocity history self.pos_history = [] self.vel_history = [] # Initialize IB collision detection self.ib_collision_idx = np.full(swarm_size, -1) # will be set to mesh index if collision occurs self.ib_condition = ib_condition # Optional worker pool for parallelizing immersed-boundary collision # detection. Any object exposing a .map(func, iterable) method works # (e.g. multiprocessing.Pool, concurrent.futures.ProcessPoolExecutor, # or ThreadPoolExecutor). None (default) runs serially. See # apply_boundary_conditions. Also accessible as self.pool inside # apply_agent_model for user subclasses, though the default agent model # does not use it. self.pool = pool # initialize Dataframe of non-shared properties if props is None: self.props = pd.DataFrame() # with random cov # self.props = pd.DataFrame( # {'start_pos': [tuple(self.positions[ii,:]) for ii in range(swarm_size)], # 'cov': [np.eye(len(self.envir.L))*(0.5+np.random.rand()) for ii in range(swarm_size)]} # ) else: self.props = props if store_prop_history: self.props_history = [] else: self.props_history = None # Dictionary of shared properties if shared_props is None: self.shared_props = {} else: self.shared_props = shared_props # Include necessary default properties if they aren't already set if 'mu' not in self.shared_props and 'mu' not in self.props: self.shared_props['mu'] = np.zeros(len(self.envir.L)) if 'cov' not in self.shared_props and 'cov' not in self.props: self.shared_props['cov'] = np.eye(len(self.envir.L)) if 'name' not in self.shared_props: self.shared_props['name'] = name if 'color' not in self.shared_props: self.shared_props['color'] = color # Record any kwargs as Swarm parameters for name, obj in kwargs.items(): try: if isinstance(obj,np.ndarray) and obj.shape[0] == swarm_size and\ obj.shape[0] != len(self.envir.L): self.props[name] = obj elif isinstance(obj,np.ndarray): self.shared_props[name] = obj elif len(obj) == swarm_size and len(obj) != len(self.envir.L): self.props[name] = obj else: # convert iterable to ndarray first self.shared_props[name] = np.array(obj) except TypeError: # Called len on something that wasn't iterable self.shared_props[name] = obj # Make sure mask is always hardened for positions, velocities, and accelerations def __setattr__(self, name, value): if name in ['positions', 'velocities', 'accelerations']: assert isinstance(value, np.ndarray), name+" must be an array or masked array." if not isinstance(value, ma.masked_array): value = ma.masked_array(value) value.harden_mask() super().__setattr__(name, value)
[docs] def grid_init(self, x_num, y_num, z_num=None, testdir=None): '''Return a flattened array which describes a regular grid of locations, except potentially masking any grid points in the interior of a closed, immersed structure. The full, unmasked grid will be x_num by y_num [by z_num] on the interior and boundaries of the domain. The output of this method is appropriate for finding FTLE, and that is its main purpose. It will automatically be called by the Environment class's calculate_FTLE method, and if you want to initialize a Swarm with a grid this is possible by passing the init='grid' keyword argument when the Swarm is created. So there is probably no reason to use this method directly. Grid list moves in the [Z direction], Y direction, then X direction (due to C order of memory layout). Parameters ---------- x_num, y_num, [z_num] : int number of grid points in each direction testdir : {'x0', 'x1', 'y0', 'y1', ['z0'], ['z1']}, optional to check if a point is an interior to an immersed structure, a line will be drawn from the point to a domain boundary. If the number of immersed boundary intersections is odd, the point will be considered interior and masked. This check will not be run at all if testdir is None. Otherwise, specify a direction with one of the following: 'x0','x1','y0','y1','z0','z1' (the last two for 3D problems only) denoting the dimension (x,y, or z) and the direction (0 for negative, 1 for positive). Notes ----- This algorithm is meant as a huristic only! It is not guaranteed to mask all interior grid points, and will mask non-interior points if there is not a clear line from the point to one of the boundaries of the domain. If this method fails for your geometry and better accuracy is needed, use this method as a starting point and mask/unmask as necessary. ''' # Form initial grid x_pts = np.linspace(0, self.envir.L[0], x_num) y_pts = np.linspace(0, self.envir.L[1], y_num) if z_num is not None: z_pts = np.linspace(0, self.envir.L[2], z_num) X1, X2, X3 = np.meshgrid(x_pts, y_pts, z_pts, indexing='ij') xidx, yidx, zidx = np.meshgrid(np.arange(x_num), np.arange(y_num), np.arange(z_num), indexing='ij') DIM = 3 elif len(self.envir.L) > 2: raise RuntimeError("Must specify z_num for 3D problems.") else: X1, X2 = np.meshgrid(x_pts, y_pts, indexing='ij') xidx, yidx = np.meshgrid(np.arange(x_num), np.arange(y_num), indexing='ij') DIM = 2 if testdir is None: if DIM == 2: return ma.array([X1.flatten(), X2.flatten()]).T else: return ma.array([X1.flatten(), X2.flatten(), X3.flatten]).T elif testdir[0] == 'z' and len(self.envir.L) < 3: raise RuntimeError("z-direction unavailable in 2D problems.") # Translate directional input startdim = [0,0] if testdir[0] == 'x': startdim[0] = 0 if DIM == 2: perp_idx = 1 else: perp_idx = [1,2] elif testdir[0] == 'y': startdim[0] = 1 if DIM == 2: perp_idx = 0 else: perp_idx = [0,2] elif testdir[0] == 'z': startdim[0] = 2 perp_idx = [0,1] else: raise RuntimeError("Unrecognized value in testdir, {}.".format(testdir)) try: startdim[1] = int(testdir[1]) - 1 except ValueError: raise RuntimeError("Unrecognized value in testdir, {}.".format(testdir)) # Idea: start on opposite side of domain as given direction and take a # full grid on the boundary. See if there are intersections. # If none, none of the points along that ray need to be tested further # Otherwise, we are also given the intersection points. Use to # deduce the rest. # startdim gives the dimension and index on which to place a grid # Convert X1, X2, X3 to masked arrays X1 = ma.array(X1) X2 = ma.array(X2) if DIM == 3: X3 = ma.array(X3) grids = [X1, X2, X3] idx_list = [xidx, yidx, zidx] else: grids = [X1, X2] idx_list = [xidx, yidx] # get a list of the gridpoints on correct side of the domain firstpts = [] first_idx_list = [] for X, idx in zip(grids,idx_list): if startdim[0] == 0: firstpts.append(X[startdim[1],...]) first_idx_list.append(idx[startdim[1],...]) elif startdim[0] == 1: firstpts.append(X[:,startdim[1],...]) first_idx_list.append(idx[:,startdim[1],...]) else: firstpts.append(X[:,:,startdim[1]]) first_idx_list.append(idx[:,:,startdim[1]]) firstpts = np.array([X.flatten() for X in firstpts]).T idx_vals = np.array([idx.flatten() for idx in first_idx_list]).T mesh = self.envir.ibmesh meshptlist = mesh.reshape((mesh.shape[0]*mesh.shape[1],mesh.shape[2])) for pt, idx in zip(firstpts,idx_vals): # for each pt in the grid, get a list of eligibible mesh elements as # those who have a point within a cylinder of diameter envir.max_meshpt_dist if DIM == 3: pt_bool = np.linalg.norm(meshptlist[:,perp_idx]-pt[perp_idx], axis=1)<=self.envir.max_meshpt_dist/2 else: pt_bool = np.abs(meshptlist[:,perp_idx]-pt[perp_idx])\ <=self.envir.max_meshpt_dist/2 pt_bool = pt_bool.reshape((mesh.shape[0], mesh.shape[1])) close_mesh = mesh[np.any(pt_bool, axis=1)] endpt = np.array(pt) if startdim[1] == -1: endpt[startdim[0]] = 0 else: endpt[startdim[0]] = self.envir.L[startdim[0]] # Get intersections if DIM == 2: intersections = _geom.seg_intersect_2D(pt, endpt, close_mesh[:,0,:], close_mesh[:,1,:], get_all=True) else: intersections = _geom.seg_intersect_3D_triangles(pt, endpt, close_mesh[:,0,:], close_mesh[:,1,:], close_mesh[:,2,:], get_all=True) # For completeness, we should also worry about edge cases where # intersections are not of mesh facets but of triangle points, but # as a heuristic, we will ignore this. A tweaking of the number of # grid points used could fix this problem in most cases, or it # could be fixed by hand. if intersections is not None: # Sort the intersections by distance away from pt intersections = sorted(intersections, key=lambda x: x[1]) # get list of all x,y, or z values for points along the ray # (where the dimension matches the direction of the ray) if startdim[0] == 0: current_pt_val = pt[0] - 10e-7 if DIM == 3: val_list = X1[:,idx[1],idx[2]] else: val_list = X1[:,idx[1]] elif startdim[0] == 1: current_pt_val = pt[1] - 10e-7 if DIM == 3: val_list = X2[idx[0],:,idx[2]] else: val_list = X2[idx[0],:] else: current_pt_val = pt[2] - 10e-7 val_list = X3[idx[0],idx[1],:] while len(intersections) > 0: n = len(intersections) intersection = intersections.pop(0) if startdim[0] == 0: intersect_val = intersection[0][0] elif startdim[0] == 1: intersect_val = intersection[0][1] else: intersect_val = intersection[0][2] if current_pt_val < intersect_val: pair = [current_pt_val, intersect_val] else: pair = [intersect_val, current_pt_val] # gather all points between current one and intersection # This will always mask points exactly on a mesh boundary bool_list = np.logical_and(pair[0]<=val_list,val_list<=pair[1]) # set mask if number of intersections is odd if n%2 == 1: if startdim[0] == 0: if DIM == 3: X1[bool_list,idx[1],idx[2]] = ma.masked X2[bool_list,idx[1],idx[2]] = ma.masked X3[bool_list,idx[1],idx[2]] = ma.masked else: X1[bool_list,idx[1]] = ma.masked X2[bool_list,idx[1]] = ma.masked elif startdim[0] == 1: if DIM == 3: X1[idx[0],bool_list,idx[2]] = ma.masked X2[idx[0],bool_list,idx[2]] = ma.masked X3[idx[0],bool_list,idx[2]] = ma.masked else: X1[idx[0],bool_list] = ma.masked X2[idx[0],bool_list] = ma.masked else: X1[idx[0],idx[1],bool_list] = ma.masked X2[idx[0],idx[1],bool_list] = ma.masked X3[idx[0],idx[1],bool_list] = ma.masked # Update current_pt_val to latest intersection current_pt_val = intersect_val # all points done. if DIM == 2: return ma.array([X1.flatten(), X2.flatten()]).T else: return ma.array([X1.flatten(), X2.flatten(), X3.flatten()]).T
@property def full_pos_history(self): '''History of self.positions, including present time.''' return [*self.pos_history, self.positions] @property def full_vel_history(self): '''History of self.positions, including present time.''' return [*self.vel_history, self.velocities] @property def full_props_history(self): '''History of self.props, including present time.''' if self.props_history is not None: return [*self.props_history, self.props] else: return None @property def N(self): '''Return the number of agents based on the number of entries in self.positions''' return self.positions.shape[0]
[docs] def save_data(self, path, name, pos_fmt='%.18e'): '''Save the full position history (with mask and time stamps) along with current velocity and acceleration to csv files. Save shared_props to a npz file and save props to json. The output format for the position csv is the same as for the save_pos_to_csv method. shared_props is saved as an npz file since it is likely to contain some mixture of scalars and arrays, but does not vary between the agents so is less likely to be loaded outside of Python. props is saved to json since it is likely to contain a variety of types of data, may need to be loaded outside of Python, and json will be human readable. props_history is not saved. Parameters ---------- path : str directory for storing data name : str prefix name for data files pos_fmt : str format, default='%.18e' format and precision for storing position, vel, and accel data See Also -------- save_pos_to_csv save_pos_to_vtk ''' path = Path(path) if not path.is_dir(): os.makedirs(path) self.save_pos_to_csv(str(path/name), pos_fmt, sv_vel=True, sv_accel=True) props_file = path/(name+'_props.json') self.props.to_json(str(props_file)) shared_props_file = path/(name+'_shared_props.npz') np.savez(str(shared_props_file), **self.shared_props)
[docs] def save_pos_to_csv(self, filename, fmt='%.18e', sv_vel=False, sv_accel=False): '''Save the full position history including present time, with mask and time stamps, to a csv. The output format for the position csv will be as follows: * The first row contains cycle and time information. The cycle is given, and then each time stamp is repeated D times, where D is the spatial dimension of the system. * Each subsequent row corresponds to a different agent in the Swarm. * Reading across the columns of an agent row: first, a boolean is given showing the state of the mask for that time step. Agents are masked when they have exited the domain. Then, the position vector is given as a group of D columns for the x, y, (and z) direction. Each set of 1+D columns then corresponds to a different cycle/time, as labeled by the values in the first row. The result is a csv that is N+1 by (1+D)*T, where N is the number of agents, D is the dimension of the system, and T is the number of times recorded. Parameters ---------- filename : str path/name of the file to save the data to fmt : str format, default='%.18e' fmt argument to be passed to numpy.savetxt for format and precision of numerical data sv_vel : bool, default=False whether or not to save the current time velocity data sv_accel : book, default=False whether or not to save the current time acceleration data See Also -------- save_data save_pos_to_vtk ''' if filename[-4:] != '.csv': filename = filename + '.csv' full_time = [*self.envir.time_history, self.envir.time] time_row = np.concatenate([[ii]+[jj]*self.positions.shape[1] for ii,jj in zip(range(len(full_time)), full_time)]) fmtlist = ['%u'] + [fmt]*self.positions.shape[1] np.savetxt(filename, np.vstack((time_row, np.column_stack([mat for pos in self.full_pos_history for mat in (ma.getmaskarray(pos[:,0]), pos.data)]))), fmt=fmtlist*len(full_time), delimiter=',') if sv_vel: vel_filename = filename[:-4] + '_vel.csv' np.savetxt(vel_filename, np.column_stack((self.velocities[:,0].mask, self.velocities.data)), fmt=fmtlist, delimiter=',') if sv_accel: accel_filename = filename[:-4] + '_accel.csv' np.savetxt(accel_filename, np.column_stack((self.accelerations[:,0].mask, self.accelerations.data)), fmt=fmtlist, delimiter=',')
[docs] def save_pos_to_vtk(self, path, name, all=True): '''Save position data to vtk as point data (PolyData). A different file will be created for each time step in the history, or just one file of the current positions will be created if the all argument is False. Parameters ---------- path : str location to save the data name : str name of dataset all : bool if True, save the entire history including the current time. If false, save only the current time. See Also -------- save_data save_pos_to_csv ''' if len(self.envir.L) == 2: DIM2 = True else: DIM2 = False if not all or len(self.envir.time_history) == 0: if DIM2: data = np.zeros((self.positions[~ma.getmaskarray(self.positions[:,0]),:].shape[0],3)) data[:,:2] = self.positions[~ma.getmaskarray(self.positions[:,0]),:] _dataio.write_vtk_point_data(path, name, data) else: _dataio.write_vtk_point_data(path, name, self.positions[~ma.getmaskarray(self.positions[:,0]),:]) else: for cyc, time in enumerate(self.envir.time_history): # ma.getmaskarray (not .mask) is required: an unmasked history # entry has .mask == nomask (scalar), and ~nomask would index in # a new axis rather than select the unmasked rows. unmasked = self.pos_history[cyc][ ~ma.getmaskarray(self.pos_history[cyc][:,0]),:] if DIM2: data = np.zeros((unmasked.shape[0],3)) data[:,:2] = unmasked _dataio.write_vtk_point_data(path, name, data, cycle=cyc, time=time) else: _dataio.write_vtk_point_data(path, name, unmasked, cycle=cyc, time=time) cyc = len(self.envir.time_history) if DIM2: data = np.zeros((self.positions[~ma.getmaskarray(self.positions[:,0]),:].shape[0],3)) data[:,:2] = self.positions[~ma.getmaskarray(self.positions[:,0]),:] _dataio.write_vtk_point_data(path, name, data, cycle=cyc, time=self.envir.time) else: _dataio.write_vtk_point_data(path, name, self.positions[~ma.getmaskarray(self.positions[:,0]),:], cycle=cyc, time=self.envir.time)
def _change_envir(self, envir): '''Manages a change from one Environment to another''' if self.positions.shape[1] != len(envir.L): if self.positions.shape[1] > len(envir.L): # Project swarm down to 2D self.positions = self.positions[:,:2] self.velocities = self.velocities[:,:2] self.accelerations = self.accelerations[:,:2] # Update known properties if 'mu' in self.shared_props: self.shared_props['mu'] = self.shared_props['mu'][:2] print('mu has been projected to 2D.') if 'mu' in self.props: for n,mu in enumerate(self.props['mu']): self.props['mu'][n] = mu[:2] print('mu has been projected to 2D.') if 'cov' in self.shared_props: self.shared_props['cov'] = self.shared_props['cov'][:2,:2] print('cov has been projected to 2D.') if 'cov' in self.props: for n,cov in enumerate(self.props['cov']): self.props['cov'][n] = cov[:2,:2] print('cov has been projected to 2D.') # warn about others other_props = [x for x in self.props if x not in ['mu', 'cov']] other_props += [x for x in self.shared_props if x not in ['mu', 'cov']] if len(other_props) > 0: print('WARNING: other properties {} were not projected.'.format(other_props)) else: raise RuntimeError("Swarm dimension smaller than new Environment dimension!"+ " Cannot scale up!") self.envir = envir envir.swarms.append(self)
[docs] def calc_re(self, u, diam=None): '''Calculate and return the Reynolds number as experienced by a swarm with characteristic length 'diam' in a fluid moving with velocity u. All other parameters will be pulled from the Environment's attributes. If diam is not specified, this method will look for it in the shared_props dictionary of this Swarm. Parameters ---------- u : float characteristic fluid speed, m/s diam : float, optional characteristic length scale of a single agent, m Returns ------- float Reynolds number ''' if diam is None: diam = self.shared_props['diam'] else: diam = diam if self.envir.rho is not None and self.envir.mu is not None and\ 'diam' in self.shared_props: return self.envir.rho*u*diam/self.envir.mu else: raise RuntimeError("Parameters necessary for Re calculation in Environment are undefined.")
[docs] def move(self, dt=1.0, ib_collisions='default', update_time=True, silent=False): '''Move all organisms in the swarm over one time step of length dt. DO NOT override this method when subclassing; override apply_agent_model instead!!! Performs a lot of utility tasks such as updating the positions and pos_history attributes, checking boundary conditions, and recalculating the current velocities and accelerations attributes. Parameters ---------- dt : float length of time step to move all agents ib_collisions : {None, 'default', 'sliding', 'sticky'} Boundary condition for immersed boundaries. If 'default', use the default found in self.ib_condition. If None, turn off all interaction with immersed boundaries. In sliding collisions, conduct recursive vector projection until the length of the original vector is exhausted. In sticky collisions, just return the point of intersection. update_time : bool, default=True whether or not to update the Environment's time by dt. Probably The only reason to change this to False is if there are multiple Swarm objects in the same Environment - then you want to update each before incrementing the time in the Environment. silent : bool, default=False If True, suppress printing the updated time. See Also -------- apply_agent_model : method that returns (but does not assign) the new positions of the swarm after the time step dt, which Planktos users override in order to specify their own, custom agent behavior. ''' if ib_collisions == 'default': ib_collisions = self.ib_condition # Save current position to put in the history old_positions = self.positions.copy() old_velocities = self.velocities.copy() # Conditionally save props to put in the history too if self.props_history is not None: old_props = self.props.copy() # Check that something is left in the domain to move, and move it. if not np.all(self.positions.mask): # Update positions, preserving mask self.positions[:,:] = self.apply_agent_model(dt) # Update history self.pos_history.append(old_positions) self.vel_history.append(old_velocities) if self.props_history is not None: self.props_history.append(old_props) # Update velocity and acceleration of swarm self.velocities[:,:] = (self.positions - old_positions)/dt self.accelerations[:,:] = (self.velocities - old_velocities)/dt # Apply boundary conditions (if anything was moving) if not np.all(self.positions.mask): self.apply_boundary_conditions(dt, ib_collisions=ib_collisions) self.after_move(dt) # Record new time if update_time: self.envir.time_history.append(self.envir.time) self.envir.time += dt if not silent: print('time = {}'.format(np.round(self.envir.time,11))) # Check for other Swarms in Environment and freeze them warned = False for s in self.envir.swarms: if s is not self and len(s.pos_history) < len(self.pos_history): s.pos_history.append(s.positions.copy()) if not warned: warnings.warn("Other Swarms in the Environment were not"+ " moved during this environmental timestep.\n"+ "If this was not your intent, call"+ " envir.move_swarms instead of this method"+ " to move all the swarms at once.", UserWarning) warned = True
[docs] def apply_agent_model(self, dt): '''Returns the new agent positions after a time step of dt. THIS IS THE METHOD TO OVERRIDE IF YOU WANT DIFFERENT MOVEMENT! Do not change the call signature. This method returns the new positions of all agents following a time step of length dt, whether due to behavior, drift, or anything else. It should not set the self.positions attribute. Similarly, self.velocities and self.accelerations will automatically be updated outside of this method using finite differences. The only attributes it should change is if there are any user-defined, time-varying agent properties that should be different after the time step (whether shared among all agents, and thus in self.shared_props, or individual to each agent, and thus in self.props). These can be altered directly or by using the add_prop method of this class. In this default implementation, movement is a random walk with drift as given by an Euler step solver of the appropriate SDE for this process. Drift is the local fluid velocity plus self.get_prop('mu') ('mu' is a shared_prop attribute), and the stochasticity is determined by the covariance matrix self.get_prop('cov') ('cov' is also a shared_prop attribute). Parameters ---------- dt : float length of time step Returns ------- ndarray : NxD array of new agent positions after a time step of dt given that the agents started at self.positions. N is the number of agents and D is the spatial dimension of the system. Notes ----- When writing code for this method, it can be helpful to make use of the ode generators and solvers in the planktos.motion module. Please see the documentation for the functions of this module for options. To access the current positions of each agent, use self.positions. self.positions is a masked, NxD array of agent positions where the mask refers to whether or not the agent has exited the domain. You do not want to accidently edit self.positions directly, so make sure that you get a value copy of self.positions using self.positions.copy() whenever that copy will be modified. Direct assignment of self.positions is by reference. Similarly,self.velocities and self.accelerations will provide initial velocities and accelerations for the time step for each agent respectively. Use .copy() as necessary and do not directly assign to these variables; they will be automatically updated later in the movement process. The get_fluid_drift method will return the fluid velocity at each agent location using interpolation. Call it once outside of a loop for speed. Similarly, the get_dudt method will return the time derivative of the fluid velocity at the location of each agent. The get_fluid_mag_gradient method will return the gradient of the magnitude of the fluid velocity at the location of each agent. See Also -------- get_prop : given an agent/Swarm property name, return the value(s). When accessing a property in Swarm.props, this can be preferred over accessing the property directly through the because instead of returning a pandas Series object (for a column in the DataFrame), it automatically converts to a numpy array first. add_prop : add a new agent/Swarm property or overwrite an old one get_fluid_drift : return the fluid velocity at each agent location get_dudt : return time derivative of fluid velocity at each agent get_fluid_mag_gradient : return the gradient of the magnitude of the fluid velocity at each agent ''' # default behavior for Euler_brownian_motion is dift due to mu property # plus local fluid velocity and diffusion given by cov property # specifying the covariance matrix. return motion.Euler_brownian_motion(self, dt)
[docs] def after_move(self, dt): '''This method is called after the Swarm's spatial positions have been updated via apply_agent_model, but before the environment time has been updated to the new time (prev time + dt). By default it does nothing, but you can override it in order to update agent properties or other things that should be set based on the state of the system at the end of the time step. For instance, you could use it to color agents that satisfy certain criteria, or have them switch state based upon their ending position. Parameters ---------- dt : float length of time step ''' pass
[docs] def get_prop(self, prop_name): '''Return the property requested as either a scalar (if shared) or a numpy array, ready for use in vectorized operations (left-most index specifies the agent). Parameters ---------- prop_name : str name of the property to return Returns ------- property : float or ndarray ''' if prop_name in self.props: if prop_name in self.shared_props: warnings.warn('Property {} exists '.format(prop_name)+ 'in both props and shared_props. Using the props version.') return np.stack(self.props[prop_name].array, axis=0).squeeze() elif prop_name in self.shared_props: return self.shared_props[prop_name] else: raise KeyError('Property {} not found.'.format(prop_name))
[docs] def add_prop(self, prop_name, value, shared=False): '''Method that will automatically delete any conflicting properties when adding a new one. Parameters ---------- prop_name : str name of the property to add value : any value to set the property at shared : bool if False, set as a property that applies to all agents in the swarm. if True, value should be an ndarray with a number of rows equal to the number of agents in the swarm, and the property will be set as a column in the Swarm.props DataFrame. ''' if shared: self.shared_props[prop_name] = value if prop_name in self.props: del self.props[prop_name] else: self.props[prop_name] = value if prop_name in self.shared_props: del self.shared_props[prop_name]
[docs] def get_fluid_drift(self, time=None, positions=None): '''Return fluid-based drift for all agents via interpolation. Current swarm position is used unless alternative positions are explicitly passed in. Any passed-in positions must be an NxD array where N is the number of points and D is the spatial dimension of the system. In the returned 2D ndarray, each row corresponds to an agent (in the same order as listed in self.positions) and each column is a dimension. Parameters ---------- time : float, optional time at which to return the fluid drift. defaults to the current environment time positions : ndarray, optional positions at which to return the fluid drift. defaults to the locations of the swarm agents, self.positions Returns ------- ndarray with shape NxD, where N is the number of agents and D the spatial dimension ''' # 3D? DIM3 = (len(self.envir.L) == 3) if positions is None: positions = self.positions # Interpolate fluid flow if self.envir.flow is None: return np.zeros(positions.shape) else: if time is None: return self.envir.interpolate_flow(positions, method='linear') else: return self.envir.interpolate_flow(positions, time=time, method='linear')
[docs] def get_dudt(self, time=None, positions=None): '''Return fluid time derivative at given positions via interpolation. Current swarm position is used unless alternative positions are explicitly passed in. In the returned 2D ndarray, each row corresponds to an agent (in the same order as listed in self.positions) and each column is a dimension. Parameters ---------- time : float, optional time at which to return the data. defaults to the current environment time positions : ndarray, optional positions at which to return the data. defaults to the locations of the swarm agents, self.positions Returns ------- ndarray with shape NxD, where N is the number of agents and D the spatial dimension ''' if positions is None: positions = self.positions return self.envir.interpolate_flow(positions, self.envir.dudt(time=time), method='linear')
[docs] def get_fluid_mag_gradient(self, positions=None): '''Return the gradient of the magnitude of the fluid velocity at all agent positions (or at provided positions) via linear interpolation of the gradient. The gradient is linearly interpolated from the fluid grid to the agent locations. The current environment time is always used, interpolated from data if necessary Parameters ---------- positions : ndarray, optional positions at which to return the data. defaults to the locations of the swarm agents, self.positions Returns ------- ndarray with shape NxD, where N is the number of agents and D the spatial dimension ''' if positions is None: positions = self.positions TIME_DEP = len(self.envir.flow[0].shape) != len(self.envir.L) flow_grad = None # If available, use the already calculuated gradient (if it's at the # correct time) if self.envir.mag_grad is not None: if not TIME_DEP: flow_grad = self.envir.mag_grad elif self.envir.mag_grad_time == self.envir.time: flow_grad = self.envir.mag_grad # Otherwise, calculate the gradient if flow_grad is None: self.envir.calculate_mag_gradient() flow_grad = self.envir.mag_grad # Interpolate the gradient at agent positions and return return self.envir.interpolate_flow(positions, flow_grad, method='linear')
[docs] def get_DuDt(self, time=None, positions=None): '''Return the material derivative with respect to time of the fluid velocity at all agent positions (or at provided positions) via linear interpolation of the material gradient. Current swarm position is used unless alternative positions are explicitly passed in. In the returned 2D ndarray, each row corresponds to an agent (in the same order as listed in self.positions) and each column is a dimension. Parameters ---------- time : float, optional time at which to return the data. defaults to the current environment time positions : ndarray, optional positions at which to return the data. defaults to the locations of the swarm agents, self.positions Returns ------- ndarray with shape NxD, where N is the number of agents and D the spatial dimension ''' if positions is None: positions = self.positions if time is None: time = self.envir.time # Calculate if necessary, otherwise use cached copy if self.envir.DuDt is None or self.envir.DuDt_time != time: self.envir.calculate_DuDt(time=time) # Interpolate at agent positions and return return self.envir.interpolate_flow(positions, self.envir.DuDt, method='linear')
[docs] def apply_boundary_conditions(self, dt, ib_collisions='sliding'): '''Apply boundary conditions to self.positions. There is no reason for a user to call this method directly; it is automatically called by self.move after updating agent positions according to the algorithm found in self.apply_agent_model. This method compares current agent positions (self.positions) to the previous agent positions (last entry in self.pos_history) in order to first: determine if the agent collided with any immersed structures and if so, to update self.positions using a sliding collision algorithm based on vector projection and second: assess whether or not any agents exited the domain and if so, update their positions based on the boundary conditions as specified in the enviornment class (self.envir). For noflux boundary conditions such sliding projections are really simple (since the domain is just a box), so we just do them directly/manually instead of folding them into the far more complex, recursive algorithm used for internal mesh structures. Periodic boundary conditions will recursively check for immersed boundary crossings after each crossing of the domain boundary. Parameters ---------- dt : float Length of current time step. Necessary for updating velocity and acceleration as a result of an IB collision. ib_collisions : {None, 'sliding' (default), 'sticky'} Type of interaction with immersed boundaries. If None, turn off all interaction with immersed boundaries. In sliding collisions, conduct recursive vector projection until the length of the original vector is exhausted. In sticky collisions, just return the point of intersection. ''' ##### Immersed mesh boundaries go first ##### if self.envir.ibmesh is not None and ib_collisions is not None: # if all agents are masked (exited the domain), skip all IB checks if np.all(self.positions.mask): return # otherwise, gather the indices of agents still in the domain if np.any(self.positions.mask): active = np.arange(self.N)[~ma.getmaskarray(self.positions[:,0])] else: active = np.arange(self.N) if len(active) > 0: # Precompute the shared mesh data ONCE for this time step (for a # moving mesh this avoids redundant per-agent interpolation). shared = self._precompute_ib_shared(dt, ib_collisions) # Build one small (idx, startpt, endpt) argument per active agent. # Cast to plain ndarrays so masked arrays are not handed to a # worker pool; .copy() matches the previous per-agent semantics. prev_pos = self.pos_history[-1] args = [(int(n), np.asarray(prev_pos[n,:]).copy(), np.asarray(self.positions[n,:]).copy()) for n in active] # Dispatch: a user-supplied pool (any object with .map) # parallelizes this embarrassingly-parallel work; otherwise the # builtin map runs it serially. Both call the same pure worker, # so the results are identical. worker = _ibc.make_ib_worker(shared) if self.pool is None: results = map(worker, args) else: results = self.pool.map(worker, args) # Apply results to swarm state in the parent process, keyed on the # returned idx so correctness does not depend on result order. for idx, result in results: self._apply_ib_result(idx, dt, result) ##### Environment Boundary Conditions ##### self._domain_BC_loop(dt, ib_collisions=ib_collisions)
####################################################################### ##### HELPER ROUTINES FOR BOUNDARY INTERACTIONS ##### ####################################################################### def _IBC_routine(self, idx, dt, startpt, endpt, ib_collisions='sliding'): '''Serial single-agent IB check (used by the domain-boundary recursion). Delegates to the same precompute/worker/apply helpers used by the (optionally parallelized) per-agent loop in apply_boundary_conditions, so this path stays numerically identical to that loop. Parameters ---------- idx : int Agent index dt : float Length of time step startpt : tuple Agent starting point endpt : tuple Agent ending point ib_collisions : {None, 'sliding' (default), 'sticky'} Type of interaction with immersed boundaries. If None, turn off all interaction with immersed boundaries. In sliding collisions, conduct recursive vector projection until the length of the original vector is exhausted. In sticky collisions, just return the point of intersection. ''' shared = self._precompute_ib_shared(dt, ib_collisions) worker = _ibc.make_ib_worker(shared) _, result = worker((idx, startpt, endpt)) self._apply_ib_result(idx, dt, result) def _precompute_ib_shared(self, dt, ib_collisions): '''Compute the immersed-boundary data shared by all agents this step. Returns a tuple consumed by _ibc.make_ib_worker. For a static mesh this is cheap; for a moving mesh it interpolates the mesh at the start and end of the step and computes the max inter-vertex distance and the max vertex displacement once for the whole swarm (previously recomputed per agent). Parameters ---------- dt : float Length of time step ib_collisions : {'sliding', 'sticky'} Type of interaction with immersed boundaries. Returns ------- tuple ('static', mesh, max_meshpt_dist, ib_collisions) or ('moving', start_mesh, end_mesh, max_meshpt_dist, max_mov, ib_collisions) ''' if self.envir.ibmesh.ndim == 3: # static mesh return ('static', self.envir.ibmesh, self.envir.max_meshpt_dist, ib_collisions) else: # moving mesh. first get necessary info about it start_mesh = self.envir.interpolate_temporal_mesh() end_mesh = self.envir.interpolate_temporal_mesh(time=self.envir.time+dt) # The maximum distance between meshpoints will change in time. # Calculate them here and pass it along. DIM = start_mesh.shape[1] max_meshpt_dist_start = np.concatenate(tuple( np.linalg.norm(start_mesh[:,ii,:]-start_mesh[:,(ii+1)%DIM,:], axis=1) for ii in range(DIM))).max() max_meshpt_dist_end = np.concatenate(tuple( np.linalg.norm(end_mesh[:,ii,:]-end_mesh[:,(ii+1)%DIM,:], axis=1) for ii in range(DIM))).max() max_meshpt_dist = np.max((max_meshpt_dist_start, max_meshpt_dist_end)) # Calculate the maximum distance a mesh vertex moved max_mov = np.concatenate(tuple( np.linalg.norm(end_mesh[:,ii,:]-start_mesh[:,ii,:], axis=1) for ii in range(DIM))).max() return ('moving', start_mesh, end_mesh, max_meshpt_dist, max_mov, ib_collisions) def _apply_ib_result(self, idx, dt, result): '''Apply one agent's IB collision result to swarm state. Parameters ---------- idx : int Agent index dt : float Length of time step result : tuple (new_loc, dx, mesh_idx) Output of apply_internal_static_BC / apply_internal_moving_BC. dx is None if the agent did not collide with the mesh during this step. ''' new_loc, dx, mesh_idx = result self.positions[idx] = new_loc if dx is not None: self.accelerations[idx] = (dx/dt - self.velocities[idx])/dt self.velocities[idx] = dx/dt self.ib_collision_idx[idx] = mesh_idx else: self.ib_collision_idx[idx] = -1 def _domain_BC_loop(self, dt, ib_collisions, idx_array=None): '''Loop over domain boundaries enforcing boundary conditions. Only agents in idx_array will be checked, or all unmasked agents if idx_array is not given. dt is necessary for moving immersed boundaries. ''' if idx_array is None: idx_array = np.arange(self.N) status_BC = np.zeros((len(idx_array),len(self.envir.L))) ##### Mark all domain exits! -1 for left, 1 for right ##### # skip masked entries if np.all(self.positions[idx_array].mask): return for dim in range(len(self.envir.L)): leftrow = np.logical_and(self.positions[idx_array,dim] < 0, ~self.positions[idx_array,dim].mask) rightrow = np.logical_and(self.positions[idx_array,dim] > self.envir.L[dim], ~self.positions[idx_array,dim].mask) status_BC[leftrow,dim] = -1 status_BC[rightrow,dim] = 1 ##### In cases where there are multiple exits, find the first ##### BC_mult_bool = np.sum(np.abs(status_BC), axis=1) > 1 if np.any(BC_mult_bool): # mark these for recursion mult_idx = idx_array[BC_mult_bool] # figure out which exit crossing occured first and treat that as the # only one. Use the velocity to parameterize the movement. s_array = np.zeros((len(BC_mult_bool),len(self.envir.L))) for dim in range(len(self.envir.L)): # get multiple crossing entries that have crossing in this dim dim_bool = np.logical_and(BC_mult_bool, status_BC[:,dim] != 0) #full length dim_idx = idx_array[dim_bool] #reduced length right_1 = 1*(status_BC[dim_bool,dim] == 1) #reduced length s_array[dim_bool,dim] = (self.positions[dim_idx,dim]-right_1* self.envir.L[dim])/self.velocities[dim_idx,dim] # for each row in s_array, the dimension with the largest value is # now the one crossed first. first_dim = np.argmax(s_array[BC_mult_bool,:], axis=1) # remove the other crossings from the status_BC array status_vals = status_BC[BC_mult_bool, first_dim] status_BC[BC_mult_bool,:] = 0 status_BC[BC_mult_bool, first_dim] = status_vals BC_bool = np.sum(np.abs(status_BC), axis=1) != 0 BC_bool_check = np.sum(np.abs(status_BC), axis=1) == 1 assert np.all(BC_bool == BC_bool_check), "Some multi crossings left over...?" else: mult_idx = None BC_bool = np.sum(np.abs(status_BC), axis=1) == 1 if not np.any(BC_bool): return ##### Now apply BC to the first/only boundary crossing ##### for dim, bndry in enumerate(self.envir.bndry): # Check for 3D if dim == 2 and len(self.envir.L) == 2: # Ignore last bndry condition; 2D environment. break ### Left boundary ### left_bool = np.logical_and(BC_bool, status_BC[:,dim]<0) left_idx = idx_array[left_bool] if len(left_idx) > 0: if bndry[0] == 'zero': # mask anything that exited on the left. self.positions[left_idx, :] = ma.masked self.velocities[left_idx, :] = ma.masked self.accelerations[left_idx, :] = ma.masked # no further BC checks are made: masked entries are skipped elif bndry[0] == 'noflux': # agent slides along flat boundary. pos/vel/accel in dir of # boundary will be zero. # additional IB crossings are possible, so first find # point of intersection with the boundary to enable this # check. if self.envir.ibmesh is not None and ib_collisions is not None: s_array = (self.positions[left_idx,dim]-0)/ \ self.velocities[left_idx,dim] startpts = self.positions[left_idx,:] - (np.tile(s_array, (self.velocities.shape[1],1)).T* self.velocities[left_idx,:]) # now update pos/vel/accel self.positions[left_idx, dim] = 0 self.velocities[left_idx, dim] = 0 self.accelerations[left_idx, dim] = 0 # now check for IB crossings. However, due to potential # complex interactions with the noflux boundary, enforce # sticky ib collisions in all cases. if self.envir.ibmesh is not None and ib_collisions is not None: for n, idx in enumerate(left_idx): startpt = startpts[n] endpt = self.positions[idx,:].copy() self._IBC_routine(idx, dt, startpt, endpt, 'sticky') # further domain crossings remain possible elif bndry[0] == 'periodic': # wrap everything exiting on the left to the right self.positions[left_idx, dim] =\ np.mod(self.positions[left_idx, dim],self.envir.L[dim]) # check for IB crossings. first, get the point of re-entry # using the velocity. if self.envir.ibmesh is not None and ib_collisions is not None: s_array = (self.positions[left_idx,dim]- self.envir.L[dim])/self.velocities[left_idx,dim] for n, idx in enumerate(left_idx): startpt = self.positions[idx,:] - \ s_array[n]*self.velocities[idx,:] endpt = self.positions[idx,:].copy() self._IBC_routine(idx, dt, startpt, endpt, ib_collisions) # further domain crossings are possible. if this happens, # velocity should be the same as original velocity b/c # immersed boundaries do not intersect with domain bndry, # so agent has slid off with original velocity heading. else: raise NameError ### Right boundary ### right_bool = np.logical_and(BC_bool, status_BC[:,dim]>0) right_idx = idx_array[right_bool] if len(right_idx) > 0: if bndry[1] == 'zero': # mask everything exiting on the right self.positions[right_idx, :] = ma.masked self.velocities[right_idx, :] = ma.masked self.accelerations[right_idx, :] = ma.masked # no further BC checks are made: masked entries are skipped elif bndry[1] == 'noflux': # agent slides along flat boundary. pos/vel/accel in dir of # boundary is zero. # additional IB crossings are possible, so first find # point of intersection with the boundary to enable this # check. if self.envir.ibmesh is not None and ib_collisions is not None: s_array = (self.positions[right_idx,dim]- self.envir.L[dim])/self.velocities[right_idx,dim] startpts = self.positions[right_idx,:] - (np.tile(s_array, (self.velocities.shape[1],1)).T* self.velocities[right_idx,:]) # now update pos/vel/accel self.positions[right_idx, dim] = self.envir.L[dim] self.velocities[right_idx, dim] = 0 self.accelerations[right_idx, dim] = 0 # now check for IB crossings. However, due to potential # complex interactions with the noflux boundary, enforce # sticky ib collisions in all cases. if self.envir.ibmesh is not None and ib_collisions is not None: for n, idx in enumerate(right_idx): startpt = startpts[n] endpt = self.positions[idx,:].copy() self._IBC_routine(idx, dt, startpt, endpt, 'sticky') # further domain crossings remain possible elif bndry[1] == 'periodic': # wrap everything exiting on the right to the left self.positions[right_idx, dim] =\ np.mod(self.positions[right_idx, dim],self.envir.L[dim]) # check for IB crossings. first, get the point of re-entry # using the velocity. if self.envir.ibmesh is not None and ib_collisions is not None: s_array = (self.positions[right_idx,dim]-0)/ \ self.velocities[right_idx,dim] for n, idx in enumerate(right_idx): startpt = self.positions[idx,:] - \ s_array[n]*self.velocities[idx,:] endpt = self.positions[idx,:].copy() self._IBC_routine(idx, dt, startpt, endpt, ib_collisions) # further domain crossings are possible. if this happens, # velocity should be the same as original velocity b/c # immersed boundaries do not intersect with domain bndry, # so agent has slid off with original velocity heading. else: raise NameError ##### All BC applied to first exit. Conduct recursion if necessary ##### if mult_idx is not None: self._domain_BC_loop(dt, ib_collisions, idx_array=mult_idx) ####################################################################### ##### PLOTTING METHODS ##### ####################################################################### def _calc_basic_stats(self, DIM3, t_indx=None): ''' Return basic stats about % agents remaining, fluid velocity, and agent velocity for plot printing. Parameters ---------- DIM3 : bool indicates the dimension of the domain (True for 3D) t_indx : int, optional The time index for pos_history or None for current time Returns ------- perc_left : float percentage of agents left within the domain avg_spd : float average fluid speed max_spd : float maximum fluid speed avg_spd_x : float average x-component of fluid velocity avg_spd_y : float average y-component of fluid velocity avg_spd_z : float, 3D only average z-component of fluid velocity avg_swrm_vel : array average agent velocity ''' # get % of agents left in domain if t_indx is None: num_left = self.positions[:,0].compressed().size else: num_left = self.pos_history[t_indx][:,0].compressed().size if len(self.pos_history) > 0: num_orig = self.pos_history[0][:,0].compressed().size else: num_orig = num_left perc_left = 100*num_left/num_orig # get average swarm velocity if t_indx is None: if np.all(self.velocities.mask): vel_data = np.zeros(self.velocities.shape) elif np.any(self.velocities.mask): vel_data = self.velocities[~self.velocities.mask.any(axis=1)] else: vel_data = self.velocities avg_swrm_vel = vel_data.mean(axis=0) elif t_indx == 0: avg_swrm_vel = np.zeros(len(self.envir.L)) else: vel_data = (self.pos_history[t_indx] - self.pos_history[t_indx-1])/( self.envir.time_history[t_indx]-self.envir.time_history[t_indx-1]) avg_swrm_vel = vel_data.mean(axis=0) if self.envir.flow is None and not DIM3: return perc_left, 0, 0, 0, 0, avg_swrm_vel elif self.envir.flow is None and DIM3: return perc_left, 0, 0, 0, 0, 0, avg_swrm_vel if not DIM3: # 2D flow # get current fluid flow info if len(self.envir.flow[0].shape) == 2: # temporally constant flow flow = self.envir.flow else: # temporally changing flow flow = self.envir.interpolate_temporal_flow(t_index=t_indx) flow_spd = np.sqrt(flow[0]**2 + flow[1]**2) avg_spd_x = flow[0].mean() avg_spd_y = flow[1].mean() avg_spd = flow_spd.mean() max_spd = flow_spd.max() return perc_left, avg_spd, max_spd, avg_spd_x, avg_spd_y, avg_swrm_vel else: # 3D flow if len(self.envir.flow[0].shape) == 3: # temporally constant flow flow = self.envir.flow else: # temporally changing flow flow = self.envir.interpolate_temporal_flow(t_indx) flow_spd = np.sqrt(flow[0]**2 + flow[1]**2 + flow[2]**2) avg_spd_x = flow[0].mean() avg_spd_y = flow[1].mean() avg_spd_z = flow[2].mean() avg_spd = flow_spd.mean() max_spd = flow_spd.max() return perc_left, avg_spd, max_spd, avg_spd_x, avg_spd_y, avg_spd_z, avg_swrm_vel
[docs] def plot(self, t=None, filename=None, blocking=True, dist='density', fluid=None, clip=None, figsize=None, circ_rad=0.25, plot_heading=True, save_kwargs=None, azim=None, elev=None): '''Plot the position of the swarm at time t, or at the current time if no time is supplied. The actual time plotted will depend on the history of movement steps; the closest entry in Environment.time_history will be shown without interpolation. Parameters ---------- t : float, optional time to plot. if None (default), the current time. filename : str, optional file name to save image as. Image will not be shown, only saved. blocking : bool, default True whether the plot should block execution or not dist : {'density' (default), 'cov', float, 'hist'} whether to plot Gaussian kernel density estimation or histogram. Options are: * 'density': plot Gaussian KDE using Scotts Factor from scipy.stats.gaussian_kde * 'cov': use the variance in each direction from self.shared_props['cov'] to plot Gaussian KDE * float: plot Gaussian KDE using the given bandwidth factor to multiply the KDE variance by * 'hist': plot histogram fluid : {'vort', 'quiver'}, optional Plot info on the fluid in the background. 2D only! If None, don't plot anything related to the fluid. Options are: * 'vort': plot vorticity in the background * 'quiver': quiver plot of fluid velocity in the background clip : float, optional if plotting vorticity, specifies the clip value for pseudocolor. this value is used for both negative and positive vorticity. figsize : tuple of length 2, optional figure size in inches, (width, height). default is a heurstic that works... most of the time? circ_rad : float, default=0.25 plotting size of the agent circles (in 2D only) plot_heading : bool, default=True whether or not to plot the direction (heading) of each agent as a small line. save_kwargs : dict of keyword arguments, optional keys must be valid strings that match keyword arguments for the matplotlib savefig function. These arguments will be passed to savefig assuming that a filename has been specified. azim : float, optional In 3D plots, the azimuthal viewing angle. Defaults to -60. elev : float, optional In 3D plots, the elevation viewing angle. Defaults to 30. ''' if t is not None and len(self.envir.time_history) != 0: loc = np.searchsorted(self.envir.time_history, t) if loc == len(self.envir.time_history): if (t-self.envir.time_history[-1]) > (self.envir.time-t): loc = None else: loc = -1 elif t < self.envir.time_history[loc]: if (self.envir.time_history[loc]-t) > (t-self.envir.time_history[loc-1]): loc -= 1 else: loc = None # get time and positions if loc is None: time = self.envir.time positions = self.positions else: time = self.envir.time_history[loc] positions = self.pos_history[loc] if len(self.envir.L) == 2: # 2D plot if figsize is None: aspectratio = self.envir.L[0]/self.envir.L[1] if aspectratio > 1: x_length = np.min((6*aspectratio,12)) y_length = 6 elif aspectratio < 1: x_length = 6 y_length = np.min((6/aspectratio,8)) else: x_length = 6 y_length = 6 fig = plt.figure(figsize=(x_length,y_length)) else: fig = plt.figure(figsize=figsize) ax, mesh_col, axHistx, axHisty = self.envir._plot_setup(fig) if figsize is None: # some final adjustments in a particular case if x_length == 12: ax_pos = ax.get_position().get_points() axHx_pos = np.array(axHistx.get_position().get_points()) axHy_pos = np.array(axHisty.get_position().get_points()) if ax_pos[0,1] > 0.1: extra = 2*(ax_pos[0,1] - 0.1)*y_length fig.set_size_inches(x_length,y_length-extra) prop = (y_length-extra/4)/y_length prop_wdth = (y_length-extra/2)/y_length prop_len = (y_length-extra)/y_length axHistx.set_position([axHx_pos[0,0],axHx_pos[0,1]*prop, axHx_pos[1,0]-axHx_pos[0,0], (axHx_pos[1,1]-axHx_pos[0,1])/prop_wdth]) axHisty.set_position([axHy_pos[0,0],axHy_pos[0,1]*prop_len, axHy_pos[1,0]-axHy_pos[0,0], (axHy_pos[1,1]-axHy_pos[0,1])/prop_len]) # fluid visualization if fluid == 'vort' and self.envir.flow is not None: vort = self.envir.get_2D_vorticity(t_indx=loc) if clip is not None: norm = colors.Normalize(-abs(clip),abs(clip),clip=True) else: norm = None ax.pcolormesh(self.envir.flow_points[0], self.envir.flow_points[1], vort.T, shading='gouraud', cmap='RdBu', norm=norm, alpha=0.9, antialiased=True) elif fluid == 'quiver' and self.envir.flow is not None: # get dimensions of axis to estimate a decent quiver density ax_pos = ax.get_position().get_points() fig_size = fig.get_size_inches() wdth_inch = fig_size[0]*(ax_pos[1,0]-ax_pos[0,0]) height_inch = fig_size[1]*(ax_pos[1,1]-ax_pos[0,1]) # use about 4.15/inch density of arrows x_num = round(4.15*wdth_inch) y_num = round(4.15*height_inch) M = int(round(len(self.envir.flow_points[0])/x_num)) N = int(round(len(self.envir.flow_points[1])/y_num)) # get worse case max velocity vector for scaling max_u = self.envir.flow[0].max(); max_v = self.envir.flow[1].max() max_mag = np.linalg.norm(np.array([max_u,max_v])) if len(self.envir.flow[0].shape) > 2: flow = self.envir.interpolate_temporal_flow(t_index=loc) else: flow = self.envir.flow ax.quiver(self.envir.flow_points[0][::M], self.envir.flow_points[1][::N], flow[0][::M,::N].T, flow[1][::M,::N].T, scale=max_mag*5, alpha=0.2) # ibmesh (if moving and not a current time - otherwise, done already) if mesh_col is not None and self.envir.ibmesh.ndim == 4 and t is not None: ibmesh = self.interpolate_temporal_mesh(time=t) mesh_col.set_segments(ibmesh) # Create marker headings to add to scatter paths = [] circle = mPath.circle(radius=circ_rad) if plot_heading: line_codes = np.array([mPath.MOVETO, mPath.LINETO]) codes = np.concatenate([circle.codes, line_codes]) if 'angle' in self.props: angles = self.props['angle'] else: # this is defined even for (0,0) by convention angles = np.arctan2(self.velocities[:,1], self.velocities[:,0]) for angle in angles: if ma.is_masked(angle): paths.append(circle) else: # make the heading marker stick out by one diameter line_verts = np.array([[0,0],[circ_rad*3*np.cos(angle), circ_rad*3*np.sin(angle)]]) # combine the circle and line vertices verts = np.concatenate([circle.vertices, line_verts]) # append to path list paths.append(mPath(verts, codes)) else: paths.append(circle) # scatter plot if 'color' in self.props: if self.props_history is not None and loc is not None: # Get color from history color = self.props_history[loc]['color'] else: color = self.props['color'] sc = ax.scatter(positions[:,0], positions[:,1], label=self.shared_props['name'], c=color) else: sc = ax.scatter(positions[:,0], positions[:,1], label=self.shared_props['name'], color=self.shared_props['color']) sc.set_paths(paths) # time text ax.text(0.02, 0.95, 'time = {:.2f}'.format(time), transform=ax.transAxes, fontsize=12) # textual info perc_left, avg_spd, max_spd, avg_spd_x, avg_spd_y, avg_swrm_vel = \ self._calc_basic_stats(DIM3=False, t_indx=loc) plt.figtext(0.77, 0.77, '{:.1f}% remain\n'.format(perc_left)+ '\n ------ Info ------\n'+ r'Fluid $v_{max}$'+': {:.1g} {}/s\n'.format(max_spd, self.envir.units)+ r'Fluid $\overline{v}$'+': {:.1g} {}/s\n'.format(avg_spd, self.envir.units)+ r'Agent $\overline{v}$'+': {:.1g} {}/s\n'.format(np.linalg.norm(avg_swrm_vel), self.envir.units), fontsize=10) axHistx.text(0.01, 0.98, r'Fluid $\overline{v}_x$'+': {:.2g} \n'.format(avg_spd_x)+ r'Agent $\overline{v}_x$'+': {:.2g}'.format(avg_swrm_vel[0]), transform=axHistx.transAxes, verticalalignment='top', fontsize=10) axHisty.text(0.02, 0.99, r'Fluid $\overline{v}_y$'+': {:.2g} \n'.format(avg_spd_y)+ r'Agent $\overline{v}_y$'+': {:.2g}'.format(avg_swrm_vel[1]), transform=axHisty.transAxes, verticalalignment='top', fontsize=10) if dist == 'hist': # histograms bins_x = np.linspace(0, self.envir.L[0], 26) bins_y = np.linspace(0, self.envir.L[1], 26) axHistx.hist(positions[:,0].compressed(), bins=bins_x) axHisty.hist(positions[:,1].compressed(), bins=bins_y, orientation='horizontal') else: # Gaussian Kernel Density Estimation if dist == 'cov': fac_x = self.shared_props['cov'][0,0] fac_y = self.shared_props['cov'][1,1] else: try: fac_x = float(dist) fac_y = fac_x except: fac_x = None fac_y = None xmesh = np.linspace(0, self.envir.L[0], 1000) ymesh = np.linspace(0, self.envir.L[1], 1000) # deal with point sources pos_x = positions[:,0].compressed() pos_y = positions[:,1].compressed() try: if len(pos_x) > 1: x_density = stats.gaussian_kde(pos_x, fac_x) x_density = x_density(xmesh) elif len(pos_x) == 1: raise np.linalg.LinAlgError else: x_density = np.zeros_like(xmesh) except np.linalg.LinAlgError: idx = (np.abs(xmesh - pos_x[0])).argmin() x_density = np.zeros_like(xmesh); x_density[idx] = 1 try: if len(pos_y) > 1: y_density = stats.gaussian_kde(pos_y, fac_y) y_density = y_density(ymesh) elif len(pos_y) == 1: raise np.linalg.LinAlgError else: y_density = np.zeros_like(ymesh) except np.linalg.LinAlgError: idy = (np.abs(ymesh - pos_y[0])).argmin() y_density = np.zeros_like(ymesh); y_density[idy] = 1 axHistx.plot(xmesh, x_density) axHisty.plot(y_density, ymesh) axHistx.get_yaxis().set_ticks([]) axHisty.get_xaxis().set_ticks([]) if np.max(x_density) != 0: axHistx.set_ylim(bottom=0, top=np.max(x_density)) else: axHistx.set_ylim(bottom=0) if np.max(y_density) != 0: axHisty.set_xlim(left=0, right=np.max(y_density)) else: axHisty.set_xlim(left=0) else: # 3D plot if figsize is None: fig = plt.figure(figsize=(10,5)) else: fig = plt.figure(figsize=figsize) ax, mesh_col, axHistx, axHisty, axHistz = self.envir._plot_setup(fig) if azim is not None or elev is not None: ax.view_init(elev, azim) # scatter plot and time text if 'color' in self.props: if self.props_history is not None and loc is not None: # Get color from history color = self.props_history[loc]['color'] else: color = self.props['color'] ax.scatter(positions[:,0], positions[:,1], positions[:,2], label=self.shared_props['name'], c=color) else: ax.scatter(positions[:,0], positions[:,1], positions[:,2], label=self.shared_props['name'], color=self.shared_props['color']) ax.text2D(0.02, 1, 'time = {:.2f}'.format(time), transform=ax.transAxes, verticalalignment='top', fontsize=12) # textual info perc_left, avg_spd, max_spd, avg_spd_x, avg_spd_y, avg_spd_z, avg_swrm_vel = \ self._calc_basic_stats(DIM3=True, t_indx=loc) ax.text2D(0.75, 0.9, r'Fluid $v_{max}$'+': {:.2g} {}/s\n'.format(max_spd, self.envir.units)+ r'Fluid $v_{avg}$'+': {:.2g} {}/s\n'.format(avg_spd, self.envir.units)+ r'Agent $v_{avg}$'+': {:.2g} {}/s'.format(np.linalg.norm(avg_swrm_vel), self.envir.units), transform=ax.transAxes, horizontalalignment='left', fontsize=10) ax.text2D(0.02, 0, '{:.1f}% remain\n'.format(perc_left), transform=fig.transFigure, fontsize=10) axHistx.text(0.02, 0.98, r'Fluid $\overline{v}_x$'+': {:.2g} {}/s\n'.format(avg_spd_x, self.envir.units)+ r'Agent $\overline{v}_x$'+': {:.2g} {}/s'.format(avg_swrm_vel[0], self.envir.units), transform=axHistx.transAxes, verticalalignment='top', fontsize=10) axHisty.text(0.02, 0.98, r'Fluid $\overline{v}_y$'+': {:.2g} {}/s\n'.format(avg_spd_y, self.envir.units)+ r'Agent $\overline{v}_y$'+': {:.2g} {}/s'.format(avg_swrm_vel[1], self.envir.units), transform=axHisty.transAxes, verticalalignment='top', fontsize=10) axHistz.text(0.02, 0.98, r'Fluid $\overline{v}_z$'+': {:.2g} {}/s\n'.format(avg_spd_z, self.envir.units)+ r'Agent $\overline{v}_z$'+': {:.2g} {}/s'.format(avg_swrm_vel[2], self.envir.units), transform=axHistz.transAxes, verticalalignment='top', fontsize=10) if dist == 'hist': # histograms bins_x = np.linspace(0, self.envir.L[0], 26) bins_y = np.linspace(0, self.envir.L[1], 26) bins_z = np.linspace(0, self.envir.L[2], 26) axHistx.hist(positions[:,0].compressed(), bins=bins_x, alpha=0.8) axHisty.hist(positions[:,1].compressed(), bins=bins_y, alpha=0.8) axHistz.hist(positions[:,2].compressed(), bins=bins_z, alpha=0.8) else: # Gaussian Kernel Density Estimation if dist == 'cov': fac_x = self.shared_props['cov'][0,0] fac_y = self.shared_props['cov'][1,1] fac_z = self.shared_props['cov'][2,2] else: try: fac_x = float(dist) fac_y = fac_x fac_z = fac_x except: fac_x = None fac_y = None fac_z = None xmesh = np.linspace(0, self.envir.L[0], 1000) ymesh = np.linspace(0, self.envir.L[1], 1000) zmesh = np.linspace(0, self.envir.L[2], 1000) # deal with point sources pos_x = positions[:,0].compressed() pos_y = positions[:,1].compressed() pos_z = positions[:,2].compressed() try: if len(pos_x) > 1: x_density = stats.gaussian_kde(pos_x, fac_x) x_density = x_density(xmesh) elif len(pos_x) == 1: raise np.linalg.LinAlgError else: x_density = np.zeros_like(xmesh) except np.linalg.LinAlgError: idx = (np.abs(xmesh - pos_x[0])).argmin() x_density = np.zeros_like(xmesh); x_density[idx] = 1 try: if len(pos_y) > 1: y_density = stats.gaussian_kde(pos_y, fac_y) y_density = y_density(ymesh) elif len(pos_y) == 1: raise np.linalg.LinAlgError else: y_density = np.zeros_like(ymesh) except np.linalg.LinAlgError: idy = (np.abs(ymesh - pos_y[0])).argmin() y_density = np.zeros_like(ymesh); y_density[idy] = 1 try: if len(pos_z) > 1: z_density = stats.gaussian_kde(pos_z, fac_z) z_density = z_density(zmesh) elif len(pos_z) == 1: raise np.linalg.LinAlgError else: z_density = np.zeros_like(zmesh) except np.linalg.LinAlgError: idz = (np.abs(zmesh - pos_z[0])).argmin() z_density = np.zeros_like(zmesh); z_density[idz] = 1 axHistx.plot(xmesh, x_density) axHisty.plot(ymesh, y_density) axHistz.plot(zmesh, z_density) axHistx.get_yaxis().set_ticks([]) axHisty.get_yaxis().set_ticks([]) axHistz.get_yaxis().set_ticks([]) if np.max(x_density) != 0: axHistx.set_ylim(bottom=0, top=np.max(x_density)) else: axHistx.set_ylim(bottom=0) if np.max(y_density) != 0: axHisty.set_ylim(bottom=0, top=np.max(y_density)) else: axHisty.set_ylim(bottom=0) if np.max(z_density) != 0: axHistz.set_ylim(bottom=0, top=np.max(z_density)) else: axHistz.set_ylim(bottom=0) # show the plot if filename is None: plt.show(block=blocking) else: if save_kwargs is not None: plt.savefig(filename, **save_kwargs) else: plt.savefig(filename)
[docs] def plot_all(self, movie_filename=None, frames=None, downsamp=None, fps=10, dist='density', fluid=None, clip=None, figsize=None, circ_rad=0.25, plot_heading=True, save_kwargs=None, writer_kwargs=None, azim=None, elev=None): ''' Plot the history of the swarm's movement, incl. current time in successively updating plots or saved as a movie file. A movie file is created if movie_filename is specified. Agent colors will be read from the 'color' column of props if it exists; otherwise it will default to the color attribute of the Swarm. Parameters ---------- movie_filename : string, optional file name to save movie as. file extension will determine the type of file saved. frames : iterable of integers, optional. If None, plot the entire history of the swarm's movement including the present time, with each step being a frame in the animation. If an iterable, plot only the time steps of the swarm as indexed by the iterable (note, this is an interable of the time step indices, not the time in seconds at those time steps!). downsamp : iterable of int or int, optional If None, do not downsample the agents - plot them all. If an integer, plot only the first n agents (equivalent to range(downsamp)). If an iterable, plot only the agents specified. In all cases, statistics are reported for the TOTAL population, both shown and unshown. This includes the histograms/KDE plots. fps : int, default=10 Frames per second, only used if saving a movie to file. Make sure this is at least as big as 1/dt, where dt is the time interval between frames! dist : {'density' (default), 'cov', float, 'hist'} whether to plot Gaussian kernel density estimation or histogram. Options are: * 'density': plot Gaussian KDE using Scotts Factor from scipy.stats.gaussian_kde * 'cov': use the variance in each direction from self.shared_props['cov'] to plot Gaussian KDE * float: plot Gaussian KDE using the given bandwidth factor to multiply the KDE variance by * 'hist': plot histogram fluid : {'vort', 'quiver'}, optional Plot info on the fluid in the background. 2D only! If None, don't plot anything related to the fluid. Options are: * 'vort': plot vorticity in the background * 'quiver': quiver plot of fluid velocity in the background clip : float, optional if plotting vorticity, specifies the clip value for pseudocolor. this value is used for both negative and positive vorticity. figsize : tuple of length 2, optional figure size in inches, (width, height). default is a heurstic that works... most of the time? circ_rad : float, default=0.25 plotting size of the agent circles (in 2D only) plot_heading : bool, default=True whether or not to plot the direction (heading) of each agent as a small line. save_kwargs : dict of keyword arguments, optional keys must be valid strings that match keyword arguments for the matplotlib animation.FFMpegWriter object. These arguments will be used in the writer object initiation save assuming that a movie_filename has been specified. Otherwise, defaults are the passed in fps and metadata=dict(artist='Christopher Strickland')). writer_kwargs : dict of keyword arguments, optional keys must be valid strings that match keyword arguments for a matplotlib azim : float, optional In 3D plots, the azimuthal viewing angle. Defaults to -60. elev : float, optional In 3D plots, the elevation viewing angle. Defaults to 30. ''' if len(self.envir.time_history) == 0: print('No position history! Plotting current position...') self.plot() return if movie_filename is not None: print("Creating video... this could take a long time!") DIM3 = (len(self.envir.L) == 3) if frames is None: n0 = 0 else: n0 = frames[0] if isinstance(downsamp, int): downsamp = range(downsamp) if not DIM3: ### 2D setup ### if figsize is None: aspectratio = self.envir.L[0]/self.envir.L[1] if aspectratio > 1: x_length = np.min((6*aspectratio,12)) y_length = 6 elif aspectratio < 1: x_length = 6 y_length = np.min((6/aspectratio,8)) else: x_length = 6 y_length = 6 fig = plt.figure(figsize=(x_length,y_length)) else: fig = plt.figure(figsize=figsize) ax, mesh_col, axHistx, axHisty = self.envir._plot_setup(fig) if figsize is None: # some final adjustments in a particular case if x_length == 12: ax_pos = ax.get_position().get_points() axHx_pos = np.array(axHistx.get_position().get_points()) axHy_pos = np.array(axHisty.get_position().get_points()) if ax_pos[0,1] > 0.1: extra = 2*(ax_pos[0,1] - 0.1)*y_length fig.set_size_inches(x_length,y_length-extra) prop = (y_length-extra/4)/y_length prop_wdth = (y_length-extra/2)/y_length prop_len = (y_length-extra)/y_length axHistx.set_position([axHx_pos[0,0],axHx_pos[0,1]*prop, axHx_pos[1,0]-axHx_pos[0,0], (axHx_pos[1,1]-axHx_pos[0,1])/prop_wdth]) axHisty.set_position([axHy_pos[0,0],axHy_pos[0,1]*prop_len, axHy_pos[1,0]-axHy_pos[0,0], (axHy_pos[1,1]-axHy_pos[0,1])/prop_len]) # fluid visualization if fluid == 'vort' and self.envir.flow is not None: if clip is not None: norm = colors.Normalize(-abs(clip),abs(clip),clip=True) else: norm = None fld = ax.pcolormesh([self.envir.flow_points[0]], self.envir.flow_points[1], np.zeros(self.envir.flow[0].shape[1:]).T, shading='gouraud', cmap='RdBu', norm=norm, alpha=0.9) elif fluid == 'quiver' and self.envir.flow is not None: # get dimensions of axis to estimate a decent quiver density ax_pos = ax.get_position().get_points() fig_size = fig.get_size_inches() wdth_inch = fig_size[0]*(ax_pos[1,0]-ax_pos[0,0]) height_inch = fig_size[1]*(ax_pos[1,1]-ax_pos[0,1]) # use about 4.15/inch density of arrows x_num = round(4.15*wdth_inch) y_num = round(4.15*height_inch) M = round(len(self.envir.flow_points[0])/x_num) N = round(len(self.envir.flow_points[1])/y_num) # get worse case max velocity vector for scaling max_u = self.envir.flow[0].max(); max_v = self.envir.flow[1].max() max_mag = np.linalg.norm(np.array([max_u,max_v])) x_pts = self.envir.flow_points[0][::M] y_pts = self.envir.flow_points[1][::N] fld = ax.quiver(x_pts, y_pts, np.zeros((len(y_pts),len(x_pts))), np.zeros((len(y_pts),len(x_pts))), scale=max_mag*5, alpha=0.2) # scatter plot scat = ax.scatter([], [], label=self.shared_props['name'], c=self.shared_props['color']) # set up marker headings to be added to the scatter plots circle = mPath.circle(radius=circ_rad) line_codes = np.array([mPath.MOVETO, mPath.LINETO]) codes = np.concatenate([circle.codes, line_codes]) # textual info time_text = ax.text(0.02, 0.95, '', transform=ax.transAxes, fontsize=12) perc_left, avg_spd, max_spd, avg_spd_x, avg_spd_y, avg_swrm_vel = \ self._calc_basic_stats(DIM3=False, t_indx=n0) axStats = plt.axes([0.77, 0.77, 0.25, 0.2], frameon=False) axStats.set_axis_off() stats_text = axStats.text(0,1, '{:.1f}% remain\n'.format(perc_left)+ '\n ------ Info ------\n'+ r'Fluid $v_{max}$'+': {:.1g} {}/s\n'.format(max_spd, self.envir.units)+ r'Fluid $\overline{v}$'+': {:.1g} {}/s\n'.format(avg_spd, self.envir.units)+ r'Agent $\overline{v}$'+': {:.1g} {}/s\n'.format(np.linalg.norm(avg_swrm_vel), self.envir.units), fontsize=10, transform=axStats.transAxes, verticalalignment='top') x_text = axHistx.text(0.01, 0.98, r'Fluid $\overline{v}_x$'+': {:.2g} \n'.format(avg_spd_x)+ r'Agent $\overline{v}_x$'+': {:.2g}'.format(avg_swrm_vel[0]), transform=axHistx.transAxes, verticalalignment='top', fontsize=10) y_text = axHisty.text(0.02, 0.99, r'Fluid $\overline{v}_y$'+': {:.2g} \n'.format(avg_spd_y)+ r'Agent $\overline{v}_y$'+': {:.2g}'.format(avg_swrm_vel[1]), transform=axHisty.transAxes, verticalalignment='top', fontsize=10) if dist == 'hist': # histograms data_x = self.pos_history[n0][:,0].compressed() data_y = self.pos_history[n0][:,1].compressed() bins_x = np.linspace(0, self.envir.L[0], 26) bins_y = np.linspace(0, self.envir.L[1], 26) n_x, bins_x, patches_x = axHistx.hist(data_x, bins=bins_x) n_y, bins_y, patches_y = axHisty.hist(data_y, bins=bins_y, orientation='horizontal') else: # Gaussian Kernel Density Estimation if dist == 'cov': fac_x = self.shared_props['cov'][0,0] fac_y = self.shared_props['cov'][1,1] else: try: fac_x = float(dist) fac_y = fac_x except: # estimate covariance from Scotts Factor. HOWEVER: this # estimation breaks if IC is point source. fac_x = None fac_y = None xmesh = np.linspace(0, self.envir.L[0], 1000) ymesh = np.linspace(0, self.envir.L[1], 1000) # deal with point sources pos_x = self.pos_history[n0][:,0].compressed() pos_y = self.pos_history[n0][:,1].compressed() try: if len(pos_x) > 1: x_density = stats.gaussian_kde(pos_x, fac_x) x_density = x_density(xmesh) elif len(pos_x) == 1: raise np.linalg.LinAlgError else: x_density = np.zeros_like(xmesh) except np.linalg.LinAlgError: idx = (np.abs(xmesh - pos_x[0])).argmin() x_density = np.zeros_like(xmesh); x_density[idx] = 1 try: if len(pos_y) > 1: y_density = stats.gaussian_kde(pos_y, fac_y) y_density = y_density(ymesh) elif len(pos_y) == 1: raise np.linalg.LinAlgError else: y_density = np.zeros_like(ymesh) except np.linalg.LinAlgError: idy = (np.abs(ymesh - pos_y[0])).argmin() y_density = np.zeros_like(ymesh); y_density[idy] = 1 xdens_plt, = axHistx.plot(xmesh, x_density) ydens_plt, = axHisty.plot(y_density, ymesh) axHistx.get_yaxis().set_ticks([]) axHisty.get_xaxis().set_ticks([]) if np.max(xdens_plt.get_ydata()) != 0: axHistx.set_ylim(bottom=0, top=np.max(xdens_plt.get_ydata())) else: axHistx.set_ylim(bottom=0) if np.max(ydens_plt.get_xdata()) != 0: axHisty.set_xlim(left=0, right=np.max(ydens_plt.get_xdata())) else: axHisty.set_xlim(left=0) else: ### 3D setup ### if figsize is None: fig = plt.figure(figsize=(10,5)) else: fig = plt.figure(figsize=figsize) ax, mesh_col, axHistx, axHisty, axHistz = self.envir._plot_setup(fig) if azim is not None or elev is not None: ax.view_init(elev, azim) # UNFORTUNATELY, 3D matplotlib plotting is very weird about masked # arrays. The implemenation does not parallel 2D: it wants a color # list that is the same length as the number of points it will be # plotting, and not the length of the masked array in total. So, # we have to check for masking and adjust appropriately. if downsamp is None: if 'color' in self.props: if self.props_history is not None: # Get color from history if ma.is_masked(self.pos_history[n0]): not_msk = ~self.pos_history[n0][:,0].mask color = self.props_history[n0].loc[not_msk, 'color'] else: color = self.props_history[n0]['color'] else: if ma.is_masked(self.pos_history[n0]): not_msk = ~self.pos_history[n0][:,0].mask color = self.props.loc[not_msk, 'color'] else: color = self.props['color'] scat = ax.scatter(self.pos_history[n0][:,0], self.pos_history[n0][:,1], self.pos_history[n0][:,2], label=self.shared_props['name'], c=color, animated=True) else: scat = ax.scatter(self.pos_history[n0][:,0], self.pos_history[n0][:,1], self.pos_history[n0][:,2], label=self.shared_props['name'], color=self.shared_props['color'], animated=True) else: if 'color' in self.props: if self.props_history is not None: # Get color from history if ma.is_masked(self.pos_history[n0][downsamp,0]): not_msk = ~self.pos_history[n0][downsamp,0].mask color = self.props_history[n0].loc[downsamp,'color'][not_msk] else: color = self.props_history[n0].loc[downsamp,'color'] else: if ma.is_masked(self.pos_history[n0][:,0]): not_msk = ~self.pos_history[n0][:,0].mask color = self.props.loc[downsamp,'color'][not_msk] else: color = self.props.loc[downsamp,'color'] scat = ax.scatter(self.pos_history[n0][downsamp,0], self.pos_history[n0][downsamp,1], self.pos_history[n0][downsamp,2], label=self.shared_props['name'], color=color, animated=True) else: scat = ax.scatter(self.pos_history[n0][downsamp,0], self.pos_history[n0][downsamp,1], self.pos_history[n0][downsamp,2], label=self.shared_props['name'], color=self.shared_props['color'], animated=True) # textual info time_text = ax.text2D(0.02, 1, 'time = {:.2f}'.format( self.envir.time_history[n0]), transform=ax.transAxes, animated=True, verticalalignment='top', fontsize=12) perc_left, avg_spd, max_spd, avg_spd_x, avg_spd_y, avg_spd_z, avg_swrm_vel = \ self._calc_basic_stats(DIM3=True, t_indx=n0) flow_text = ax.text2D(0.75, 0.9, r'Fluid $v_{max}$'+': {:.2g} {}/s\n'.format( max_spd, self.envir.units)+ r'Fluid $v_{avg}$'+': {:.2g} {}/s\n'.format( avg_spd, self.envir.units)+ r'Agent $v_{avg}$'+': {:.2g} {}/s'.format( np.linalg.norm(avg_swrm_vel), self.envir.units), transform=ax.transAxes, animated=True, horizontalalignment='left', fontsize=10) perc_text = ax.text2D(0.02, 0, '{:.1f}% remain\n'.format(perc_left), transform=fig.transFigure, animated=True, fontsize=10) x_text = axHistx.text(0.02, 0.98, r'Fluid $\overline{v}_x$'+': {:.2g} {}/s\n'.format( avg_spd_x, self.envir.units)+ r'Agent $\overline{v}_x$'+': {:.2g} {}/s'.format( avg_swrm_vel[0], self.envir.units), transform=axHistx.transAxes, animated=True, verticalalignment='top', fontsize=10) y_text = axHisty.text(0.02, 0.98, r'Fluid $\overline{v}_y$'+': {:.2g} {}/s\n'.format( avg_spd_y, self.envir.units)+ r'Agent $\overline{v}_y$'+': {:.2g} {}/s'.format( avg_swrm_vel[1], self.envir.units), transform=axHisty.transAxes, animated=True, verticalalignment='top', fontsize=10) z_text = axHistz.text(0.02, 0.98, r'Fluid $\overline{v}_z$'+': {:.2g} {}/s\n'.format( avg_spd_z, self.envir.units)+ r'Agent $\overline{v}_z$'+': {:.2g} {}/s'.format( avg_swrm_vel[2], self.envir.units), transform=axHistz.transAxes, animated=True, verticalalignment='top', fontsize=10) if dist == 'hist': # histograms data_x = self.pos_history[n0][:,0].compressed() data_y = self.pos_history[n0][:,1].compressed() data_z = self.pos_history[n0][:,2].compressed() bins_x = np.linspace(0, self.envir.L[0], 26) bins_y = np.linspace(0, self.envir.L[1], 26) bins_z = np.linspace(0, self.envir.L[2], 26) n_x, bins_x, patches_x = axHistx.hist(data_x, bins=bins_x, alpha=0.8) n_y, bins_y, patches_y = axHisty.hist(data_y, bins=bins_y, alpha=0.8) n_z, bins_z, patches_z = axHistz.hist(data_z, bins=bins_z, alpha=0.8) else: # Gaussian Kernel Density Estimation if dist == 'cov': fac_x = self.shared_props['cov'][0,0] fac_y = self.shared_props['cov'][1,1] fac_z = self.shared_props['cov'][2,2] else: try: # see if a float was passed fac_x = float(dist) fac_y = fac_x fac_z = fac_x except: # estimate covariance from Scotts Factor. HOWEVER: this # estimation breaks if IC is point source. fac_x = None fac_y = None fac_z = None xmesh = np.linspace(0, self.envir.L[0], 1000) ymesh = np.linspace(0, self.envir.L[1], 1000) zmesh = np.linspace(0, self.envir.L[2], 1000) # deal with point sources pos_x = self.pos_history[n0][:,0].compressed() pos_y = self.pos_history[n0][:,1].compressed() pos_z = self.pos_history[n0][:,2].compressed() try: if len(pos_x) > 1: x_density = stats.gaussian_kde(pos_x, fac_x) x_density = x_density(xmesh) elif len(pos_x) == 1: raise np.linalg.LinAlgError else: x_density = np.zeros_like(xmesh) except np.linalg.LinAlgError: idx = (np.abs(xmesh - pos_x[0])).argmin() x_density = np.zeros_like(xmesh); x_density[idx] = 1 try: if len(pos_y) > 1: y_density = stats.gaussian_kde(pos_y, fac_y) y_density = y_density(ymesh) elif len(pos_y) == 1: raise np.linalg.LinAlgError else: y_density = np.zeros_like(ymesh) except np.linalg.LinAlgError: idy = (np.abs(ymesh - pos_y[0])).argmin() y_density = np.zeros_like(ymesh); y_density[idy] = 1 try: if len(pos_z) > 1: z_density = stats.gaussian_kde(pos_z, fac_z) z_density = z_density(zmesh) elif len(pos_z) == 1: raise np.linalg.LinAlgError else: z_density = np.zeros_like(zmesh) except np.linalg.LinAlgError: idz = (np.abs(zmesh - pos_z[0])).argmin() z_density = np.zeros_like(zmesh); z_density[idz] = 1 xdens_plt, = axHistx.plot(xmesh, x_density) ydens_plt, = axHisty.plot(ymesh, y_density) zdens_plt, = axHistz.plot(zmesh, z_density) axHistx.get_yaxis().set_ticks([]) axHisty.get_yaxis().set_ticks([]) axHistz.get_yaxis().set_ticks([]) if np.max(xdens_plt.get_ydata()) != 0: axHistx.set_ylim(bottom=0, top=np.max(xdens_plt.get_ydata())) else: axHistx.set_ylim(bottom=0) if np.max(ydens_plt.get_ydata()) != 0: axHisty.set_ylim(bottom=0, top=np.max(ydens_plt.get_ydata())) else: axHisty.set_ylim(bottom=0) if np.max(zdens_plt.get_ydata()) != 0: axHistz.set_ylim(bottom=0, top=np.max(zdens_plt.get_ydata())) else: axHistz.set_ylim(bottom=0) # animation function. Called sequentially angle_props_warned = [False] def animate(n): if n < len(self.pos_history): time_text.set_text('time = {:.2f}'.format(self.envir.time_history[n])) if not DIM3: # 2D perc_left, avg_spd, max_spd, avg_spd_x, avg_spd_y, avg_swrm_vel = \ self._calc_basic_stats(DIM3=False, t_indx=n) stats_text.set_text('{:.1f}% remain\n'.format(perc_left)+ '\n ------ Info ------\n'+ r'Fluid $v_{max}$'+': {:.1g} {}/s\n'.format(max_spd, self.envir.units)+ r'Fluid $\overline{v}$'+': {:.1g} {}/s\n'.format(avg_spd, self.envir.units)+ r'Agent $\overline{v}$'+': {:.1g} {}/s\n'.format(np.linalg.norm(avg_swrm_vel), self.envir.units)) x_text.set_text(r'Fluid $\overline{v}_x$'+': {:.2g} \n'.format(avg_spd_x)+ r'Agent $\overline{v}_x$'+': {:.2g}'.format(avg_swrm_vel[0])) y_text.set_text(r'Fluid $\overline{v}_y$'+': {:.2g} \n'.format(avg_spd_y)+ r'Agent $\overline{v}_y$'+': {:.2g}'.format(avg_swrm_vel[1])) if fluid == 'vort' and self.envir.flow is not None: vort = self.envir.get_2D_vorticity(t_indx=n) fld.set_array(vort.T) fld.changed() fld.autoscale() elif fluid == 'quiver' and self.envir.flow is not None: if self.envir.flow_times is not None: flow = self.envir.interpolate_temporal_flow(t_index=n) fld.set_UVC(flow[0][::M,::N].T, flow[1][::M,::N].T) else: fld.set_UVC(self.envir.flow[0][::M,::N].T, self.envir.flow[1][::M,::N].T) warning_msg = "Using velocity for heading markers "+\ "and not the 'angles' property because "+\ "the property history was not recorded." if mesh_col is not None and self.envir.ibmesh.ndim == 4: ibmesh = self.envir.interpolate_temporal_mesh(time=self.envir.time_history[n]) mesh_col.set_segments(ibmesh) if downsamp is None: scat.set_offsets(self.pos_history[n]) if 'color' in self.props: if self.props_history is not None: scat.set_color(self.props_history[n]['color']) else: scat.set_color(self.props['color']) # Grab angles for heading markers if 'angle' in self.props and plot_heading: if self.props_history is not None: angles = self.props_history[n]['angle'] else: if not angle_props_warned[0]: warnings.warn(warning_msg, stacklevel=9) angle_props_warned[0] = True angles = np.arctan2(self.vel_history[n][:,1], self.vel_history[n][:,0]) elif plot_heading: # this is defined even for (0,0) by convention angles = np.arctan2(self.vel_history[n][:,1], self.vel_history[n][:,0]) else: scat.set_offsets(self.pos_history[n][downsamp,:]) if 'color' in self.props: if self.props_history is not None: scat.set_color(self.props_history[n].loc[downsamp,'color']) else: scat.set_color(self.props.loc[downsamp,'color']) # Grab angles for heading markers if 'angle' in self.props and plot_heading: if self.props_history is not None: angles = self.props.loc[downsamp,'angle'] else: if not angle_props_warned[0]: warnings.warn(warning_msg, stacklevel=9) angle_props_warned[0] = True angles = np.arctan2(self.vel_history[n][downsamp,1], self.vel_history[n][downsamp,0]) elif plot_heading: # this is defined even for (0,0) by convention angles = np.arctan2(self.vel_history[n][downsamp,1], self.vel_history[n][downsamp,0]) # set heading markers if plot_heading: paths = [] for angle in angles: if ma.is_masked(angle): paths.append(circle) else: # make the heading marker stick out by one diameter line_verts = np.array([[0,0],[circ_rad*3*np.cos(angle), circ_rad*3*np.sin(angle)]]) # combine the circle and line vertices verts = np.concatenate([circle.vertices, line_verts]) # append to path list paths.append(mPath(verts, codes)) scat.set_paths(paths) else: scat.set_paths([circle]) if dist == 'hist': n_x, _ = np.histogram(self.pos_history[n][:,0].compressed(), bins_x) n_y, _ = np.histogram(self.pos_history[n][:,1].compressed(), bins_y) for rect, h in zip(patches_x, n_x): rect.set_height(h) for rect, h in zip(patches_y, n_y): rect.set_width(h) if fluid == 'vort' and self.envir.flow is not None: if mesh_col is not None and self.envir.ibmesh.ndim == 4: return [mesh_col, fld, scat, time_text, stats_text, x_text, y_text] + list(patches_x) + list(patches_y) else: return [fld, scat, time_text, stats_text, x_text, y_text] + list(patches_x) + list(patches_y) else: if mesh_col is not None and self.envir.ibmesh.ndim == 4: return [mesh_col, scat, time_text, stats_text, x_text, y_text] + list(patches_x) + list(patches_y) else: return [scat, time_text, stats_text, x_text, y_text] + list(patches_x) + list(patches_y) else: pos_x = self.pos_history[n][:,0].compressed() pos_y = self.pos_history[n][:,1].compressed() try: if len(pos_x) > 1: x_density = stats.gaussian_kde(pos_x, fac_x) x_density = x_density(xmesh) elif len(pos_x) == 1: raise np.linalg.LinAlgError else: x_density = np.zeros_like(xmesh) except np.linalg.LinAlgError: idx = (np.abs(xmesh - pos_x[0])).argmin() x_density = np.zeros_like(xmesh); x_density[idx] = 1 try: if len(pos_y) > 1: y_density = stats.gaussian_kde(pos_y, fac_y) y_density = y_density(ymesh) elif len(pos_y) == 1: raise np.linalg.LinAlgError else: y_density = np.zeros_like(ymesh) except np.linalg.LinAlgError: idy = (np.abs(ymesh - pos_y[0])).argmin() y_density = np.zeros_like(ymesh); y_density[idy] = 1 xdens_plt.set_ydata(x_density) ydens_plt.set_xdata(y_density) if np.max(xdens_plt.get_ydata()) != 0: axHistx.set_ylim(bottom=0, top=np.max(xdens_plt.get_ydata())) else: axHistx.set_ylim(bottom=0) if np.max(ydens_plt.get_xdata()) != 0: axHisty.set_xlim(left=0, right=np.max(ydens_plt.get_xdata())) else: axHisty.set_xlim(left=0) if fluid == 'vort' and self.envir.flow is not None: if mesh_col is not None and self.envir.ibmesh.ndim == 4: return [mesh_col, fld, scat, time_text, stats_text, x_text, y_text, xdens_plt, ydens_plt] else: return [fld, scat, time_text, stats_text, x_text, y_text, xdens_plt, ydens_plt] else: if mesh_col is not None and self.envir.ibmesh.ndim == 4: return [mesh_col, scat, time_text, stats_text, x_text, y_text, xdens_plt, ydens_plt] else: return [scat, time_text, stats_text, x_text, y_text, xdens_plt, ydens_plt] else: # 3D perc_left, avg_spd, max_spd, avg_spd_x, avg_spd_y, avg_spd_z, avg_swrm_vel = \ self._calc_basic_stats(DIM3=True, t_indx=n) # print(n) # print(self.pos_history[n].all() is ma.masked) flow_text.set_text(r'Fluid $v_{max}$'+': {:.2g} {}/s\n'.format( max_spd, self.envir.units)+ r'Fluid $v_{avg}$'+': {:.2g} {}/s\n'.format( avg_spd, self.envir.units)+ r'Agent $v_{avg}$'+': {:.2g} {}/s'.format( np.linalg.norm(avg_swrm_vel), self.envir.units)) perc_text.set_text('{:.1f}% remain\n'.format(perc_left)) x_text.set_text(r'Fluid $\overline{v}_x$'+': {:.2g} {}/s\n'.format( avg_spd_x, self.envir.units)+ r'Agent $\overline{v}_x$'+': {:.2g} {}/s'.format( avg_swrm_vel[0], self.envir.units)) y_text.set_text(r'Fluid $\overline{v}_y$'+': {:.2g} {}/s\n'.format( avg_spd_y, self.envir.units)+ r'Agent $\overline{v}_y$'+': {:.2g} {}/s'.format( avg_swrm_vel[1], self.envir.units)) z_text.set_text(r'Fluid $\overline{v}_z$'+': {:.2g} {}/s\n'.format( avg_spd_z, self.envir.units)+ r'Agent $\overline{v}_z$'+': {:.2g} {}/s'.format( avg_swrm_vel[2], self.envir.units)) # UNFORTUNATELY, 3D matplotlib plotting is very weird about masked # arrays. The implemenation does not parallel 2D: it wants a color # list that is the same length as the number of points it will be # plotting, and not the length of the masked array in total. So, # we have to check for masking and adjust appropriately. if downsamp is None: scat._offsets3d = (np.ma.ravel(self.pos_history[n][:,0].compressed()), np.ma.ravel(self.pos_history[n][:,1].compressed()), np.ma.ravel(self.pos_history[n][:,2].compressed())) if 'color' in self.props: if self.props_history is not None: if ma.is_masked(self.pos_history[n]): not_msk = ~self.pos_history[n][:,0].mask scat.set_color(self.props_history[n].loc[not_msk,'color']) else: scat.set_color(self.props_history[n]['color']) else: if ma.is_masked(self.pos_history[n]): not_msk = ~self.pos_history[n][:,0].mask scat.set_color(self.props.loc[not_msk,'color']) else: scat.set_color(self.props['color']) else: scat._offsets3d = (np.ma.ravel(self.pos_history[n][downsamp,0].compressed()), np.ma.ravel(self.pos_history[n][downsamp,1].compressed()), np.ma.ravel(self.pos_history[n][downsamp,2].compressed())) if 'color' in self.props: if self.props_history is not None: if ma.is_masked(self.pos_history[n][downsamp,0]): not_msk = ~self.pos_history[n][downsamp,0].mask scat.set_color(self.props_history[n].loc[downsamp,'color'][not_msk]) else: scat.set_color(self.props_history[n].loc[downsamp,'color']) else: if ma.is_masked(self.pos_history[n][downsamp,0]): not_msk = ~self.pos_history[n][downsamp,0].mask scat.set_color(self.props.loc[downsamp,'color'][not_msk]) else: scat.set_color(self.props.loc[downsamp,'color']) if dist == 'hist': n_x, _ = np.histogram(self.pos_history[n][:,0].compressed(), bins_x) n_y, _ = np.histogram(self.pos_history[n][:,1].compressed(), bins_y) n_z, _ = np.histogram(self.pos_history[n][:,2].compressed(), bins_z) for rect, h in zip(patches_x, n_x): rect.set_height(h) for rect, h in zip(patches_y, n_y): rect.set_height(h) for rect, h in zip(patches_z, n_z): rect.set_height(h) fig.canvas.draw() return [scat, time_text, flow_text, perc_text, x_text, y_text, z_text] + list(patches_x) + list(patches_y) + list(patches_z) else: pos_x = self.pos_history[n][:,0].compressed() pos_y = self.pos_history[n][:,1].compressed() pos_z = self.pos_history[n][:,2].compressed() try: if len(pos_x) > 1: x_density = stats.gaussian_kde(pos_x, fac_x) x_density = x_density(xmesh) elif len(pos_x) == 1: raise np.linalg.LinAlgError else: x_density = np.zeros_like(xmesh) except np.linalg.LinAlgError: idx = (np.abs(xmesh - pos_x[0])).argmin() x_density = np.zeros_like(xmesh); x_density[idx] = 1 try: if len(pos_y) > 1: y_density = stats.gaussian_kde(pos_y, fac_y) y_density = y_density(ymesh) elif len(pos_y) == 1: raise np.linalg.LinAlgError else: y_density = np.zeros_like(ymesh) except np.linalg.LinAlgError: idy = (np.abs(ymesh - pos_y[0])).argmin() y_density = np.zeros_like(ymesh); y_density[idy] = 1 try: if len(pos_z) > 1: z_density = stats.gaussian_kde(pos_z, fac_z) z_density = z_density(zmesh) elif len(pos_z) == 1: raise np.linalg.LinAlgError else: z_density = np.zeros_like(zmesh) except np.linalg.LinAlgError: idz = (np.abs(zmesh - pos_z[0])).argmin() z_density = np.zeros_like(zmesh); z_density[idz] = 1 xdens_plt.set_ydata(x_density) ydens_plt.set_ydata(y_density) zdens_plt.set_ydata(z_density) if np.max(xdens_plt.get_ydata()) != 0: axHistx.set_ylim(bottom=0, top=np.max(xdens_plt.get_ydata())) else: axHistx.set_ylim(bottom=0) if np.max(ydens_plt.get_ydata()) != 0: axHisty.set_ylim(bottom=0, top=np.max(ydens_plt.get_ydata())) else: axHisty.set_ylim(bottom=0) if np.max(zdens_plt.get_ydata()) != 0: axHistz.set_ylim(bottom=0, top=np.max(zdens_plt.get_ydata())) else: axHistz.set_ylim(bottom=0) fig.canvas.draw() return [scat, time_text, flow_text, perc_text, x_text, y_text, z_text, xdens_plt, ydens_plt, zdens_plt] else: time_text.set_text('time = {:.2f}'.format(self.envir.time)) if not DIM3: # 2D end perc_left, avg_spd, max_spd, avg_spd_x, avg_spd_y, avg_swrm_vel = \ self._calc_basic_stats(DIM3=False, t_indx=None) stats_text.set_text('{:.1f}% remain\n'.format(perc_left)+ '\n ------ Info ------\n'+ r'Fluid $v_{max}$'+': {:.1g} {}/s\n'.format(max_spd, self.envir.units)+ r'Fluid $\overline{v}$'+': {:.1g} {}/s\n'.format(avg_spd, self.envir.units)+ r'Agent $\overline{v}$'+': {:.1g} {}/s\n'.format(np.linalg.norm(avg_swrm_vel), self.envir.units)) x_text.set_text(r'Fluid $\overline{v}_x$'+': {:.2g} \n'.format(avg_spd_x)+ r'Agent $\overline{v}_x$'+': {:.2g}'.format(avg_swrm_vel[0])) y_text.set_text(r'Fluid $\overline{v}_y$'+': {:.2g} \n'.format(avg_spd_y)+ r'Agent $\overline{v}_y$'+': {:.2g}'.format(avg_swrm_vel[1])) if fluid == 'vort' and self.envir.flow is not None: vort = self.envir.get_2D_vorticity() fld.set_array(vort.T) fld.changed() fld.autoscale() elif fluid == 'quiver' and self.envir.flow is not None: if self.envir.flow_times is not None: flow = self.envir.interpolate_temporal_flow() fld.set_UVC(flow[0][::M,::N].T, flow[1][::M,::N].T) else: fld.set_UVC(self.envir.flow[0][::M,::N].T, self.envir.flow[1][::M,::N].T) if mesh_col is not None and self.envir.ibmesh.ndim == 4: ibmesh = self.envir.interpolate_temporal_mesh() mesh_col.set_segments(ibmesh) if downsamp is None: scat.set_offsets(self.positions) if self.props_history is not None and 'color' in self.props: scat.set_color(self.props['color']) # Grab angles for heading markers if 'angle' in self.props and self.props_history is not None: angles = self.props['angle'] else: # this is defined even for (0,0) by convention angles = np.arctan2(self.velocities[:,1], self.velocities[:,0]) else: scat.set_offsets(self.positions[downsamp,:]) if self.props_history is not None and 'color' in self.props: scat.set_color(self.props.loc[downsamp,'color']) # Grab angles for heading markers if 'angle' in self.props and self.props_history is not None: angles = self.props.loc[downsamp,'angle'] else: # this is defined even for (0,0) by convention angles = np.arctan2(self.velocities[downsamp,1], self.velocities[downsamp,0]) # set heading markers if plot_heading: paths = [] for angle in angles: if ma.is_masked(angle): paths.append(circle) else: # make the heading marker stick out by one diameter line_verts = np.array([[0,0],[circ_rad*3*np.cos(angle), circ_rad*3*np.sin(angle)]]) # combine the circle and line vertices verts = np.concatenate([circle.vertices, line_verts]) # append to path list paths.append(mPath(verts, codes)) scat.set_paths(paths) else: scat.set_paths([circle]) if dist == 'hist': n_x, _ = np.histogram(self.positions[:,0].compressed(), bins_x) n_y, _ = np.histogram(self.positions[:,1].compressed(), bins_y) for rect, h in zip(patches_x, n_x): rect.set_height(h) for rect, h in zip(patches_y, n_y): rect.set_width(h) if fluid == 'vort' and self.envir.flow is not None: if mesh_col is not None and self.envir.ibmesh.ndim == 4: return [mesh_col, fld, scat, time_text, stats_text, x_text, y_text] + list(patches_x) + list(patches_y) else: return [fld, scat, time_text, stats_text, x_text, y_text] + list(patches_x) + list(patches_y) else: if mesh_col is not None and self.envir.ibmesh.ndim == 4: return [mesh_col, scat, time_text, stats_text, x_text, y_text] + list(patches_x) + list(patches_y) else: return [scat, time_text, stats_text, x_text, y_text] + list(patches_x) + list(patches_y) else: pos_x = self.positions[:,0].compressed() pos_y = self.positions[:,1].compressed() try: if len(pos_x) > 1: x_density = stats.gaussian_kde(pos_x, fac_x) x_density = x_density(xmesh) elif len(pos_x) == 1: raise np.linalg.LinAlgError else: x_density = np.zeros_like(xmesh) except np.linalg.LinAlgError: idx = (np.abs(xmesh - pos_x[0])).argmin() x_density = np.zeros_like(xmesh); x_density[idx] = 1 try: if len(pos_y) > 1: y_density = stats.gaussian_kde(pos_y, fac_y) y_density = y_density(ymesh) elif len(pos_y) == 1: raise np.linalg.LinAlgError else: y_density = np.zeros_like(ymesh) except np.linalg.LinAlgError: idy = (np.abs(ymesh - pos_y[0])).argmin() y_density = np.zeros_like(ymesh); y_density[idy] = 1 xdens_plt.set_ydata(x_density) ydens_plt.set_xdata(y_density) if np.max(xdens_plt.get_ydata()) != 0: axHistx.set_ylim(bottom=0, top=np.max(xdens_plt.get_ydata())) else: axHistx.set_ylim(bottom=0) if np.max(ydens_plt.get_xdata()) != 0: axHisty.set_xlim(left=0, right=np.max(ydens_plt.get_xdata())) else: axHisty.set_xlim(left=0) if fluid == 'vort' and self.envir.flow is not None: if mesh_col is not None and self.envir.ibmesh.ndim == 4: return [mesh_col, fld, scat, time_text, stats_text, x_text, y_text, xdens_plt, ydens_plt] else: return [fld, scat, time_text, stats_text, x_text, y_text, xdens_plt, ydens_plt] else: if mesh_col is not None and self.envir.ibmesh.ndim == 4: return [mesh_col, scat, time_text, stats_text, x_text, y_text, xdens_plt, ydens_plt] else: return [scat, time_text, stats_text, x_text, y_text, xdens_plt, ydens_plt] else: # 3D end perc_left, avg_spd, max_spd, avg_spd_x, avg_spd_y, avg_spd_z, avg_swrm_vel = \ self._calc_basic_stats(DIM3=True) flow_text.set_text(r'Fluid $v_{max}$'+': {:.2g} {}/s\n'.format( max_spd, self.envir.units)+ r'Fluid $v_{avg}$'+': {:.2g} {}/s\n'.format( avg_spd, self.envir.units)+ r'Agent $v_{avg}$'+': {:.2g} {}/s'.format( np.linalg.norm(avg_swrm_vel), self.envir.units)) perc_text.set_text('{:.1f}% remain\n'.format(perc_left)) x_text.set_text(r'Fluid $\overline{v}_x$'+': {:.2g} {}/s\n'.format( avg_spd_x, self.envir.units)+ r'Agent $\overline{v}_x$'+': {:.2g} {}/s'.format( avg_swrm_vel[0], self.envir.units)) y_text.set_text(r'Fluid $\overline{v}_y$'+': {:.2g} {}/s\n'.format( avg_spd_y, self.envir.units)+ r'Agent $\overline{v}_y$'+': {:.2g} {}/s'.format( avg_swrm_vel[1], self.envir.units)) z_text.set_text(r'Fluid $\overline{v}_z$'+': {:.2g} {}/s\n'.format( avg_spd_z, self.envir.units)+ r'Agent $\overline{v}_z$'+': {:.2g} {}/s'.format( avg_swrm_vel[2], self.envir.units)) # UNFORTUNATELY, 3D matplotlib plotting is very weird about masked # arrays. The implemenation does not parallel 2D: it wants a color # list that is the same length as the number of points it will be # plotting, and not the length of the masked array in total. So, # we have to check for masking and adjust appropriately. if downsamp is None: scat._offsets3d = (np.ma.ravel(self.positions[:,0].compressed()), np.ma.ravel(self.positions[:,1].compressed()), np.ma.ravel(self.positions[:,2].compressed())) if 'color' in self.props: if ma.is_masked(self.positions): not_msk = ~self.positions[:,0].mask scat.set_color(self.props.loc[not_msk,'color']) else: scat.set_color(self.props['color']) else: scat._offsets3d = (np.ma.ravel(self.positions[downsamp,0].compressed()), np.ma.ravel(self.positions[downsamp,1].compressed()), np.ma.ravel(self.positions[downsamp,2].compressed())) if 'color' in self.props: if ma.is_masked(self.positions[downsamp,0]): not_msk = ~self.positions[downsamp,0].mask scat.set_color(self.props.loc[downsamp,'color'][not_msk]) else: scat.set_color(self.props.loc[downsamp,'color']) if dist == 'hist': n_x, _ = np.histogram(self.positions[:,0].compressed(), bins_x) n_y, _ = np.histogram(self.positions[:,1].compressed(), bins_y) n_z, _ = np.histogram(self.positions[:,2].compressed(), bins_z) for rect, h in zip(patches_x, n_x): rect.set_height(h) for rect, h in zip(patches_y, n_y): rect.set_height(h) for rect, h in zip(patches_z, n_z): rect.set_height(h) fig.canvas.draw() return [scat, time_text, flow_text, perc_text, x_text, y_text, z_text] + list(patches_x) + list(patches_y) + list(patches_z) else: pos_x = self.positions[:,0].compressed() pos_y = self.positions[:,1].compressed() pos_z = self.positions[:,2].compressed() try: if len(pos_x) > 1: x_density = stats.gaussian_kde(pos_x, fac_x) x_density = x_density(xmesh) elif len(pos_x) == 1: raise np.linalg.LinAlgError else: x_density = np.zeros_like(xmesh) except np.linalg.LinAlgError: idx = (np.abs(xmesh - pos_x[0])).argmin() x_density = np.zeros_like(xmesh); x_density[idx] = 1 try: if len(pos_y) > 1: y_density = stats.gaussian_kde(pos_y, fac_y) y_density = y_density(ymesh) elif len(pos_y) == 1: raise np.linalg.LinAlgError else: y_density = np.zeros_like(ymesh) except np.linalg.LinAlgError: idy = (np.abs(ymesh - pos_y[0])).argmin() y_density = np.zeros_like(ymesh); y_density[idy] = 1 try: if len(pos_z) > 1: z_density = stats.gaussian_kde(pos_z, fac_z) z_density = z_density(zmesh) elif len(pos_z) == 1: raise np.linalg.LinAlgError else: z_density = np.zeros_like(zmesh) except np.linalg.LinAlgError: idz = (np.abs(zmesh - pos_z[0])).argmin() z_density = np.zeros_like(zmesh); z_density[idz] = 1 xdens_plt.set_ydata(x_density) ydens_plt.set_ydata(y_density) zdens_plt.set_ydata(z_density) if np.max(xdens_plt.get_ydata()) != 0: axHistx.set_ylim(bottom=0, top=np.max(xdens_plt.get_ydata())) else: axHistx.set_ylim(bottom=0) if np.max(ydens_plt.get_ydata()) != 0: axHisty.set_ylim(bottom=0, top=np.max(ydens_plt.get_ydata())) else: axHisty.set_ylim(bottom=0) if np.max(zdens_plt.get_ydata()) != 0: axHistz.set_ylim(bottom=0, top=np.max(zdens_plt.get_ydata())) else: axHistz.set_ylim(bottom=0) fig.canvas.draw() return [scat, time_text, flow_text, perc_text, x_text, y_text, z_text, xdens_plt, ydens_plt, zdens_plt] # infer animation rate from dt between current and last position dt = self.envir.time - self.envir.time_history[-1] if frames is None: frames = range(len(self.pos_history)+1) anim = animation.FuncAnimation(fig, animate, frames=frames, interval=dt*100, repeat=False, blit=True) if movie_filename is not None: try: if writer_kwargs is None: writer = animation.FFMpegWriter(fps=fps, metadata=dict(artist='Christopher Strickland'))#, bitrate=1800) else: writer = animation.FFMpegWriter(**writer_kwargs) if save_kwargs is None: anim.save(movie_filename, writer=writer, dpi=150) else: anim.save(movie_filename, writer=writer, **save_kwargs) plt.close() print('Video saved to {}.'.format(movie_filename)) except: print('Failed to save animation.') print('Check that you have ffmpeg or mencoder installed; these') print("aren't Python packages, but stand-alone applications.") print("An H.264 encoder is needed on the system's path in order") print('to save to that video format.') raise else: plt.show()