Source code for chemistrylab.vessel

from typing import NamedTuple, Tuple, Callable, Optional, List
import numpy as np
import numba
import pandas as pd
from chemistrylab.extract_algorithms import separate#separate_cc as separate

[docs] class Event(NamedTuple): name: str parameter: tuple other_vessel: Optional[object]
#Apparently documenting this isn't trivial :( Event.name.__doc__ = "The registered name of the event function." Event.parameter.__doc__ = "The parameters of the registered event function" Event.other_vessel.__doc__ = "The other vessel needed for this event if requred (ex the target vessel when pouring)." def _rebuild_solute_dict(solvent_dict, solute_dict, solvents): """ Recreates the solute and solvent dict in the case where the solvents have changed. Args: solvent_dict (dict): The old solvent dict of (key,index) pairs solute_dict (dict): The old solute dict of (key,arr) pairs solvents (dict): The new list of solvents Returns: new_solvent_dict (dict): New set of (key,index) pairs new_solute_dict (dict): New set of (key,arr) pairs Note: This can be called less frequently if you allow solvents with zero amount to persist in the materials dict. """ # needs to be called when adding or removing solvents (after you set the # mols dissolved in any removed solvents to 0) new_solvent_dict = {mat:i for i,mat in enumerate(solvents)} new_solute_dict = \ {mat: np.array([solute_dict[mat][solvent_dict[sol]] if sol in solvent_dict else 0 for sol in solvents], dtype=np.float32) for mat in solute_dict} return new_solvent_dict, new_solute_dict @numba.jit(nopython=True) def _validate_solute_amounts(mol_solute, mol_solvent, mol_dissolved): """ Performs a series of consistency checks on the mol_dissolved array. Args: mol_solute (array): The total amount of each solute in the vessel (1D, size N) mol_solvent (array): The total amount of each solvent in the vessel (1D, size M) mol_dissolved (array) The amount of solute dissolved in each solvent (2D, shape [N,M]) Checks: - Make sure the sum of each row in mol_dissolved is equal to the value in mol_solute - Strategies: Increase proportional to how much solvent there is Decrease proportional to how much is already dissolved - Make sure each column of mol_dissolved if the corresponding value in mol_solvent is 0 - Strategies: Set columns to 0 before doing the sum check if there is no solvent """ # First make sure there are solvents tot_solvent=mol_solvent.sum() if tot_solvent<1e-12: mol_dissolved[:]=0 return #get normalized solvent amounts norm_solvent=mol_solvent/tot_solvent #Make sure there are no solutes in empty solvents for j,v_mol in enumerate(mol_solvent): if v_mol<1e-12: mol_dissolved[:,j]=0 #make sure the sums consistent for i,u_mol in enumerate(mol_solute): checksum=mol_dissolved[i].sum() #Set to zero if very close if u_mol<1e-16: mol_dissolved[i]=0 #Decrease amounts proportional to what's in each solvent elif checksum>u_mol: mol_dissolved[i] *= (u_mol/checksum) #But increase amounts proportional to how much solvent there is elif checksum<u_mol: mol_dissolved[i] += (u_mol-checksum)*norm_solvent layer_values=np.linspace(0, 1, 100, endpoint=True, dtype=np.float32)-1.9e-2
[docs] class Vessel: """ The Vessel class serves as any container you might find in a lab, a beaker, a dripper, etc. It simulates actions performed within a lab, such as draining contents, performing reactions, mixing, pouring, etc. The most important method is :meth:`~Vessel.push_event_to_queue` . The rest of the functions are either handeled in the backend or getter methods. These are the default :class:`Event` functions: +------------------+-----------------------------------------------------+-----------------------------------------+ | Event Name | Event Description | Event Parameters | +==================+=====================================================+=========================================+ | pour by volume | Pour from self vessel to target vessel by certain | volume (:class:`python:float`) | | | volume | | +------------------+-----------------------------------------------------+-----------------------------------------+ | pour by percent | Pour a fraction of all contents in one vessel into | fraction (:class:`python:float`) | | | another | | +------------------+-----------------------------------------------------+-----------------------------------------+ | drain by pixel | Drain from self vessel to target vessel by certain | n_pixel (:class:`python:int`) | | | pixel | | +------------------+-----------------------------------------------------+-----------------------------------------+ | mix | Shake the vessel or let it settle | t (:class:`python:float`) | +------------------+-----------------------------------------------------+-----------------------------------------+ | update_layer | Update self vessel's layer representation | | +------------------+-----------------------------------------------------+-----------------------------------------+ | change heat | Add or remove heat from the vessel | dQ (:class:`python:float`) | +------------------+-----------------------------------------------------+-----------------------------------------+ | heat contact | Connect the vessel to a reservoir for heat transfer | Tf (float), ht (float) | +------------------+-----------------------------------------------------+-----------------------------------------+ """ def __init__( self, label: str, temperature: float = 297.0, volume: float = 1.0, ignore_layout: bool = False ): """ Args: label (str): Name for the vessel temperature (float): Temperature of the vessel in Kelvin volume (float): Volume of the vessel in Litres """ #Functions to implement self.label=label self.default_dt=0.01 self.temperature=temperature self.volume=volume self.material_dict=dict() # String keys, Material values self.solute_dict=dict() # String Keys, float array values self.solvent_dict=dict() #String Keys, index values self.solvents=[] self._layers_position = np.zeros(1, dtype=np.float32) self._layers_variance = np.array([self.volume/3.46], dtype=np.float32) self._layer_volumes = np.array([self.volume], dtype=np.float32) self._variance = 1e-5 self._layers = None self.ignore_layout=ignore_layout self._layer_mats=[] def __repr__(self): return self.label
[docs] def validate_solutes(self, checksum: bool = True): """ Turns the solute dict into a 2D array, gets a 1D array of solute amounts, as well as a 1D array of solvent amounts, then performs consistency checks with _validate_solute_amounts. """ if self.ignore_layout:return n_solvents=len(self.solvents) #gather a list of solutes solutes = tuple(a for a in self.material_dict if self.material_dict[a].is_solute()) if n_solvents==0 or len(solutes)==0:return #get mol information for solutes and solvents solute_mols = np.array([self.material_dict[key].mol for key in solutes]) solvent_mols = np.array([self.material_dict[key].mol for key in self.solvents]) #get all of the dissolved amounts null = np.zeros(n_solvents) old_dict=self.solute_dict mol_dissolved=np.stack([old_dict[key] if key in old_dict else null for key in solutes]) #run the validation _validate_solute_amounts(solute_mols, solvent_mols, mol_dissolved) #set the new solute dict self.solute_dict={key:mol_dissolved[i] for i,key in enumerate(solutes)}
[docs] def validate_solvents(self): """ Updates the solute_dict and solvent_dict / solvent array when the solvents have changed """ if self.ignore_layout:return new_solvents = tuple(a for a in self.material_dict if self.material_dict[a].is_solvent()) #union should be the same length as both new_solvents and solvents union = tuple(a for a in new_solvents if a in self.solvent_dict) if len(new_solvents)!=len(self.solvents) or len(new_solvents)!= len(union) : #copy over variances self._layers_variance = np.array([self._layers_variance[self.solvent_dict[sol]] if sol in self.solvent_dict else 0 for sol in new_solvents]+[self._layers_variance[-1]], dtype = np.float32) #copy over amounts self._layer_volumes = np.array([self._layer_volumes[self.solvent_dict[sol]] if sol in self.solvent_dict else 0 for sol in new_solvents]+[self._layer_volumes[-1]], dtype = np.float32) #make a new solvent dict self.solvent_dict,self.solute_dict = _rebuild_solute_dict( self.solvent_dict, self.solute_dict, new_solvents ) n_solvents=len(new_solvents) # If we went from 0 to 1+ solvents we need to dissolve the solutes if len(self.solvents)==0 and n_solvents: for key,arr in self.solute_dict.items(): arr+=self.material_dict[key].mol/n_solvents self.solvents=new_solvents #last entry is for air self._layers_position = np.zeros(n_solvents+1, dtype=np.float32)
def _handle_overflow(self): """ Uses flled_volume() to determine if there is an overflow, if yes, it dumps a proportion of the vessel contents in order to have filled_volume <= volume. """ filled = self.filled_volume() # decrease everything proportionally and return -1 if there is an overflow if filled>self.volume: ratio = self.volume/filled for key,mat in self.material_dict.items(): mat.mol*=ratio for key,arr in self.solute_dict.items(): arr*=ratio return -1 return 0
[docs] def heat_capacity(self): """ Returns: float: The sum of the heat capacities of all materials in the vessel (in J/K) """ C_air = 1.2292875 #Heat capacity of air in J/L*K (near STP) #Adding in an approximate heat capacity for the air return self.volume*C_air+sum(mat.heat_capacity for a,mat in self.material_dict.items())
[docs] def filled_volume(self): """ Returns: float: The volume of all non-gas phase materials in the vessel (in Litres). """ return sum(mat.litres for a,mat in self.material_dict.items())
[docs] def get_material_dataframe(self): """ Returns: :class:`~pandas.DataFrame`: A DataFrame detailing all materials present in the Vessel. """ info_dict = {key:(mat.mol,mat.phase,mat.is_solute(),mat.is_solvent()) for key,mat in self.material_dict.items()} return pd.DataFrame.from_dict(info_dict, orient="index",columns = ["Amount","Phase","Solute","Solvent"])
[docs] def get_solute_dataframe(self): """ Returns: :class:`~pandas.DataFrame`: A [solutes, solvents] DataFrame detailing how much solute is dissolved in each solvent. """ return pd.DataFrame.from_dict(self.solute_dict, orient="index",columns = self.solvents)
[docs] def get_layer_dataframe(self): """ Returns: :class:`~pandas.DataFrame`: A DataFrame containing the layer information of the vessel. """ info_dict = {mat._name:( self._layers_volume[i], self._layers_position[i], self._lvar[i], self._layer_colors[i],) for i,mat in enumerate(self._layer_mats)} info_dict["air"] = ( self._layers_volume[-1], self._layers_position[-1], self._lvar[-1], self._layer_colors[-1],) return pd.DataFrame.from_dict(info_dict, orient="index",columns = ["volume","position","variance","color"])
[docs] def push_event_to_queue( self, events: Tuple[Event] = tuple(), dt: float= 0, update_layers: bool = True, ) -> Tuple[int]: """ This function calls a set of event functions in sequence specified by `events`, then returns a tuple of status codes (one for each event). Args: events (Tuple[Event]): The sequence of events to be executed. dt (float): The amount of time elapsed (defaults to 0). update_layers (bool): Whether or not to update layer information at the end of the queue. Returns: Tuple[int]: A sequence of status codes for each event. At the moment, 0 represents normal execution, and -1 represents an illegal state reached (like a vessel overflow or boiling an empty vessel). """ event_dict = type(self)._event_dict status=[] for event in events: status.append(event_dict[event.name](self, dt, event.other_vessel, *event.parameter)) if (not self.ignore_layout) and update_layers: self._mix(0,None,dt) self._update_layers(0,None) return status
def _heat_contact(self, dt, other_vessel, Tf, ht) -> int: """ Rough Estimate of heat transfer so we can simulate putting something on a bunson burner or in a water bath. Param: - Tf (Optional[float]): The temperature of the heat source - ht (float): heat transfer coeff multiplied by how long you have Note: When T is None, ht will be used as heat Q Steps: 1. Calculate total heat capacity of the vessel 2. Get T estimate from heat capacity equation & Newton's law of heat transfer 3. While T estimate is above the lowest boiling point of your materials i. Subtract d_ht required to get to boiling point from ht ii. Get boiling enthalpy of your material and calculate how much will boil iii. Boil off material and subtract the necessary d_ht used from ht iv. Calculate new T estimate based off of your new ht value 4. set vessel temperature to T """ use_dQ = (Tf is None) if use_dQ: # Adding heat (dQ) is the same as linearly approximating the exponential and log functions and setting # The reservoir temperature to one unit above the current temperature (trust me) exp= lambda x:1+x ln = lambda x:x-1 Tf = self.temperature+1 else: ln=np.log exp=np.exp other_mats=other_vessel.material_dict # Estimated final temperature before taking boiling into account T = Tf+(self.temperature-Tf)*exp(-ht/self.heat_capacity()) mdict=self.material_dict #case for changing the heat of an empty vessel total_mats = sum(mat.mol for key,mat in mdict.items()) if total_mats<1e-12: self.temperature = T # -1 if placing an empty beaker on something hot return 0 if Tf<373 else -1 #get sorted boiling points bp = sorted([(key,mdict[key]._boiling_point) for key in mdict],key = lambda x:x[1]) key,boil=bp[0] i=0 while T>boil and T>self.temperature: material = mdict[key] #amount of transfer-time required to reach boiling temperature ht += ln((Tf-boil)/(Tf-self.temperature))*self.heat_capacity() #i self.temperature = boil #i if use_dQ: Tf = self.temperature+1 boil_enthalpy=material.vapour_enthalpy #ii # ht = dQ/(T_f-T_boil) ht_used=min(ht,boil_enthalpy/(Tf-boil)) #ii ht-=ht_used #iii # Move the boiled material (iii) fraction = (ht_used*(Tf-boil)/boil_enthalpy) if ht_used > 0 else 0 if key in other_mats: d_mol = material.mol*fraction other_mats[key].mol += d_mol material.mol -= d_mol else: other_mats[key] = material.ration(fraction) #Heat capacity is called again since it should be lower now T = Tf+(self.temperature-Tf)*exp(-ht/self.heat_capacity()) i+=1 if i>=len(bp):break key,boil=bp[i] self.temperature=T self.validate_solutes() other_vessel.validate_solvents() other_vessel.validate_solutes() return other_vessel._handle_overflow() def _change_heat(self, dt, other_vessel, dQ) -> int: """ Depricated function, consider using heat contact instead. """ return self._heat_contact(dt, other_vessel, None, dQ) def _pour_by_percent(self, dt, other_vessel, fraction) -> int: """ - Take each material in this vessel and transfer it's amount * fraction to other_vessel - Check for overflow at the end - Dumping in contents should mix other_vessel TODO: Consider making this call a theoretical _add_materials function """ if fraction<1e-16:return 0 fraction = np.clip(fraction,0,1) other_mats=other_vessel.material_dict #Iterate through each material in the vessel for key,mat in self.material_dict.items(): if mat.is_solute() and len(self.solvents)>0: self.solute_dict[key] *= (1-fraction) # Update other vessel's material dict if key in other_mats: other_mats[key].mol+=mat.mol*fraction mat.mol *= (1-fraction) else: # Get new material with the same class as mat using ration other_mats[key] = mat.ration(fraction) # Rebuild the other vessel's solute dict if new solvents have been added other_vessel.validate_solvents() other_vessel.validate_solutes() return other_vessel._handle_overflow() def _pour_by_volume(self, dt, other_vessel, volume) -> int: """ Pour the same as _pour_by_percent but the fraction is determined by how much volume you want to pour vs how much of the vessel is filled. """ filled_volume=self.filled_volume() if filled_volume<1e-12: return 0 fraction = np.clip(volume/filled_volume,0,1) return self._pour_by_percent(dt, other_vessel, fraction) def _drain_by_pixel(self, dt, other_vessel, n_pixel) -> int: """ This uses layer information to drain out the bottom N layers For each solvent: i. Figure out how much solvent is in the bottom N pixels and drain that ii. Figure out how much of EACH solute is dissolved in that bit of solvent and drain those """ if self.ignore_layout:return -2 other_mats=other_vessel.material_dict drained_layers = self._hashed_layers[:n_pixel] tot_pixels=len(self._hashed_layers) for i,key in enumerate(self.solvents): #NOTE mat is the solvent material mat = self.material_dict[key] drained_volume = (drained_layers==i).sum()/tot_pixels if drained_volume<=1e-12:continue #how much solvent is drained (percent wise) fraction=np.clip(drained_volume/self._layers_volume[i],0,1) if mat.litres<1e-12: self.DEBUG=self._hashed_layers*1,self.solvents #drain out the solvent if key in other_mats: d_mol=mat.mol*fraction other_mats[key].mol+=d_mol mat.mol -= d_mol else: other_mats[key] = mat.ration(fraction) #drain the solutes for u_key in self.solute_dict: #NOTE u_mat is the solute material u_mat = self.material_dict[u_key] removed_amount = fraction*self.solute_dict[u_key][i] #update the solute dict properly self.solute_dict[u_key][i] -= removed_amount if removed_amount<=1e-12:continue #drain out the solute if u_key in other_mats: other_mats[u_key].mol+=removed_amount u_mat.mol -= removed_amount else: other_mats[u_key] = u_mat.ration(removed_amount/u_mat.mol) other_vessel.validate_solvents() other_vessel.validate_solutes() return other_vessel._handle_overflow() def _mix(self, dt, other_vessel, t) -> int: """ Realistically just a wrapper for separate.mix This updates the amounts dissolved, layer positions and layer variances """ if self.ignore_layout:return -2 t=np.float32(t) #or replace dt # Make air layer properties d_air = 1.225 #in g/L c_air = 0.65 #chosen color of air #This is just to ensure the order of solutes does not change s_names = tuple(s for s in self.solute_dict) #Grab solute and solvent objects solutes = tuple(self.material_dict[s] for s in s_names) solvents = [self.material_dict[s] for s in self.solvents] #Get solvent volumes solute_flag = sum(mat.mol for mat in solvents)<=1e-12 misc_mats = [mat for key,mat in self.material_dict.items() if (not mat.is_solvent()) and (solute_flag or not mat.is_solute())] layer_mats=solvents+misc_mats layer_volume = [mat.litres for mat in layer_mats] self._layer_mats = layer_mats # Add air to the end ov the volume list s.t it fills the remainder of the vessel layer_volume = np.array(layer_volume+[self.volume - sum(layer_volume)], dtype=np.float32) layer_density = np.array([mat.get_density() for mat in layer_mats]+[d_air], dtype=np.float32) self._layer_colors = np.array([mat._color for mat in layer_mats]+[c_air], dtype=np.float32) #Exclude air since it's not a solvent? solvent_polarity = np.array([mat.polarity for mat in solvents], dtype=np.float32) #Get solute properties solute_polarity = np.array([mat.polarity for mat in solutes], dtype=np.float32) #Hopefully this is the correct 2D array shape if len(s_names)>0: solute_amount = np.stack([self.solute_dict[a] for a in s_names]).astype(np.float32) else: solute_amount=np.zeros([0,len(solvents)],dtype=np.float32) solute_svolume = np.array([mat.litres_per_mol for mat in solutes], dtype=np.float32) self._layers_position, self._layers_volume, self._layers_variance, self._variance, new_solute_amount, self._lvar = separate.mix( layer_volume, self._layer_volumes.astype(np.float32), solute_svolume, self._layers_position.astype(np.float32), self._layers_variance.astype(np.float32), np.float32(self._variance), layer_density, solute_polarity, solvent_polarity, solute_amount, t ) self._layer_volumes = layer_volume for i,s in enumerate(s_names): self.solute_dict[s] = new_solute_amount[i] return 0 def _update_layers(self, dt, other_vessel) -> int: """ This wraps separate.map_to_state It's used to get a layer image as well as layer information TODO: Handle solutes having a volume """ self._layers,self._hashed_layers = separate.map_to_state( self._layers_volume.astype(np.float32), self._layers_position.astype(np.float32), self._lvar.astype(np.float32), self._layer_colors, layer_values )
[docs] def get_layers(self): """ Returns: List[float]: The color of each vessel layer. """ if self._layers is None: self._mix(0,None,0) self._update_layers(0,None) return self._layers
[docs] @classmethod def register(self, func: Callable, name: str): """ The method to register an event function which updates a vessel instance. Args: func (Callable[[Vessel, Tuple, Optional[Vessel]], int]): An event function which acts on one or two vessels. name (str): The name of the event function for registration. """ if name in self._event_dict: raise Exception(f"Cannot register the same Event ({name}) Twice!") self._event_dict[name]=func
#ANY SUBCLASSES SHOULD DEFINE THIS EXPLICITLY!!! _event_dict = { 'pour by volume': _pour_by_volume, 'pour by percent':_pour_by_percent, 'drain by pixel': _drain_by_pixel, 'mix': _mix, 'update layer': _update_layers, 'change heat': _change_heat, 'heat contact': _heat_contact, }