# /*##########################################################################
#
# Copyright (c) 2020-2026 Timo Fuchs
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
__author__ = "Timo Fuchs"
__credits__ = ['Finn Schroeter']
__copyright__ = "Copyright 2020-2026 Timo Fuchs"
__license__ = "MIT License"
from .. import __version__
__maintainer__ = "Timo Fuchs"
__email__ = "tfuchs@cornell.edu"
import logging
from .. import logger_utils
logger = logging.getLogger(__name__) # noqa
import gc
import copy
import sys
import os
import re
from silx.gui import qt
import warnings
from io import StringIO
import concurrent.futures
import threading
#from IPython import embed
import silx.gui.plot
from silx.gui.plot import items
from silx.gui.colors import Colormap
import fabio
#from silx import sx
import silx
from silx.utils.weakref import WeakMethodProxy
from silx.gui.plot.Profile import ProfileToolBar
from silx.gui.plot.tools.roi import RegionOfInterestManager
from silx.gui.plot.actions import control as control_actions
try:
from silx.gui import console
except:
console = False
import traceback
from . import qutils, ROIutils
from .QScanSelector import QScanSelector
from .QReflectionSelector import QReflectionSelector, QReflectionAnglesDialog
from .QUBCalculator import QUBCalculator
from .peak1Dintegr import RockingPeakIntegrator
from .ArrayTableDialog import ArrayTableDialog
from .bgroi import RectangleBgROI
from .database import DataBase, FILTERS
from ..backend.scans import SimulationScan
from ..backend import backends
from ..backend import universalScanLoader
from ..backend import interlacedScanLoader
from .. import resources
try:
from . import _roi_sum_accel
HAS_ACCEL = True
except:
print(traceback.format_exc())
HAS_ACCEL = False
import numpy as np
from ..datautils.xrayutils import HKLVlieg, CTRcalc
from ..datautils.xrayutils import ReciprocalNavigation as rn
#legacy import:
from ..backend.beamline.id31_tools import Fastscan
QTVERSION = qt.qVersion()
DEBUG = 0
MAX_ROIS_DISPLAY = 100
MAX_MEMORY = 6000 # MB,
try:
import psutil
memory_info = psutil.virtual_memory()
avail_memory = memory_info.total / (1024**2)
MAX_MEMORY = avail_memory*0.7 # cap memory at 70%
except Exception:
print('Cannot retrieve available memory size. Cap usage to 6 GB.')
silx.config.DEFAULT_PLOT_SYMBOL = '.'
[docs]
class orGUI(qt.QMainWindow):
def __init__(self,configfile,parent=None):
"""Initialize the main orGUI window.
.. note::
CLI-capable. CLI mode still creates this Qt main-window object, but
it must not show blocking dialogs during construction.
"""
qt.QMainWindow.__init__(self, parent)
#self.setWindowIcon(resources.getQicon("orguiicon"))
self.h5database = None # must be a h5py file-like, by default not opened to avoid reading issues at beamtimes!
self.images_loaded = False
self.resetZoom = True
self.background_image = None
#icon = resources.getQicon("sum_image.svg")
self.fscan = None
self.activescanname = "scan"
self.numberthreads = int(min(os.cpu_count(), 16)) if os.cpu_count() is not None else 1
self.maxMemory = MAX_MEMORY
self.maxROIs = MAX_ROIS_DISPLAY
self.filedialogdir = os.getcwd()
self.excludedImagesDialog = ArrayTableDialog(True, 1)
self.excludedImagesDialog.setArrayData(np.array([-1]),editable=True, header= ['image no'])
self.centralPlot = Plot2DHKL(self.newXyHKLConverter(),parent=self)
self.centralPlot.setDefaultColormap(Colormap(name='inferno',normalization='log'))
self.centralPlot.setCallback(self._graphCallback)
toolbar = qt.QToolBar()
toolbar.addAction(control_actions.OpenGLAction(parent=toolbar, plot=self.centralPlot))
self.centralPlot.addToolBar(toolbar)
self.currentImageLabel = None
self.currentAddImageLabel = None
selectorDock = qt.QDockWidget("Scan data")
selectorDock.setAllowedAreas(qt.Qt.LeftDockWidgetArea | qt.Qt.RightDockWidgetArea)
self.scanSelector = QScanSelector(self)
selectorDock.setWidget(self.scanSelector)
self.addDockWidget(qt.Qt.LeftDockWidgetArea,selectorDock)
self.imagepath = ''
self.imageno = 0
#ubWidget = qt.QSplitter(qt.Qt.Vertical)
#ubWidget.setChildrenCollapsible(False)
ubLayout = qt.QVBoxLayout()
ubWidget = qt.QWidget()
self.ubcalc = QUBCalculator(None, self)
self.ubcalc.sigNewReflection.connect(self._onNewReflection)
self.resetIntegrPlotCurves = qt.QAction("Reset plot",self)
self.resetIntegrPlotCurves.triggered.connect(self._removeAllIntegrPlotCurves)
self.resetIntegrPlotCurveSet = qt.QAction("Remove curves",self)
self.resetIntegrPlotCurveSet.triggered.connect(self._removeIntegrPlotCurveSet)
maincentralwidget = qt.QTabWidget()
self.integrdataPlot = silx.gui.plot.Plot1D(self)
legendwidget = self.integrdataPlot.getLegendsDockWidget()
plotRemovalBar = qt.QToolBar()
plotRemovalBar.addAction(self.resetIntegrPlotCurves)
plotRemovalBar.addAction(self.resetIntegrPlotCurveSet)
self.integrdataPlot.addToolBar(plotRemovalBar)
toolbar = qt.QToolBar()
toolbar.addAction(control_actions.OpenGLAction(parent=toolbar, plot=self.integrdataPlot))
self.integrdataPlot.addToolBar(toolbar)
self.integrdataPlot.addDockWidget(qt.Qt.RightDockWidgetArea,legendwidget)
legendwidget.show()
self.database = DataBase(self.integrdataPlot)
dbdockwidget = qt.QDockWidget("Integrated data")
dbdockwidget.setWidget(self.database)
self.integrdataPlot.addDockWidget(qt.Qt.RightDockWidgetArea,dbdockwidget)
self.roPkIntegrTab = RockingPeakIntegrator(self.database)
self.scanSelector.sigImageNoChanged.connect(self._onSliderValueChanged)
self.scanSelector.sigImagePathChanged.connect(self._onImagePathChanged)
self.scanSelector.sigScanChanged.connect(self._onScanChanged)
self.scanSelector.showMaxAct.toggled.connect(self._onMaxToggled)
self.scanSelector.showSumAct.toggled.connect(self._onSumToggled)
self.scanSelector.sigROIChanged.connect(self.updateROI)
self.scanSelector.sigROIintegrate.connect(self.integrateROI)
self.scanSelector.sigSearchHKL.connect(self.onSearchHKLforStaticROI)
self.scanSelector.excludeImageAct.toggled.connect(self._onToggleExcludeImage)
toolbar = self.scanSelector.getScanToolbar()
self.centralPlot.addToolBar(qt.Qt.BottomToolBarArea,toolbar)
maincentralwidget.addTab(self.centralPlot,"Scan Image browser")
maincentralwidget.addTab(self.integrdataPlot,"ROI integrated data")
maincentralwidget.addTab(self.roPkIntegrTab, "Rocking scan integrate")
self.setCentralWidget(maincentralwidget)
# Create the object controlling the ROIs and set it up
self.roiManager = RegionOfInterestManager(self.centralPlot)
self.roiManager.setColor('pink') # Set the color of ROI
#self.roiTable = RegionOfInterestTableWidget()
#self.roiTable.setRegionOfInterestManager(self.roiManager)
self.rocking_rois = []
self.roiS1 = RectangleBgROI()
self.roiS1.setLineWidth(2)
self.roiS1.setLineStyle('-')
self.roiS1.setColor('red')
self.roiS1.setBgStyle('pink', '-', 2.)
self.roiS1.setVisible(True)
self.roiS1.setGeometry(origin=(0, 0), size=(0, 0))
# S1 is also fixed roi, editing enabled when static ROI:
self.roiS1.sigEditingFinished.connect(self._onStaticROIedited)
self.roiS2 = RectangleBgROI()
self.roiS2.setLineWidth(2)
self.roiS2.setLineStyle('-')
self.roiS2.setColor('red')
self.roiS2.setBgStyle('pink', '-', 2.)
self.roiS2.setVisible(True)
self.roiS2.setGeometry(origin=(0, 0), size=(0, 0))
self.roiManager.addRoi(self.roiS1,useManagerColor=False)
self.roiManager.addRoi(self.roiS2,useManagerColor=False)
#self.reflTable.view._model.dataChanged.connect(printmodel)
#self.reflTable.setArrayData(np.array([0,0,0,0,10,10],dtype=np.float64))
ubDock = qt.QDockWidget("Reciprocal space navigation")
ubDock.setAllowedAreas(qt.Qt.LeftDockWidgetArea | qt.Qt.RightDockWidgetArea | qt.Qt.BottomDockWidgetArea)
self.reflectionSel = QReflectionSelector(self.centralPlot, self.ubcalc, self)
self.reflectionSel.sigQueryImageChange.connect(self._onChangeImage)
self.reflectionSel.sigQueryCenterPlot.connect(self._onCenterGraph)
self.ubcalc.setReflectionHandler(self.getReflections)
self.ubcalc.sigPlottableMachineParamsChanged.connect(self._onPlotMachineParams)
self.ubcalc.sigReplotRequest.connect(self.updatePlotItems)
self.allimgsum = None
self.allimgmax = None
self.reflectionSel.setSizePolicy(qt.QSizePolicy.Preferred, qt.QSizePolicy.Expanding)
ubLayout.addWidget(self.reflectionSel)
self.ubcalc.setSizePolicy(qt.QSizePolicy.Preferred, qt.QSizePolicy.Maximum)
ubLayout.addWidget(self.ubcalc)
ubWidget.setLayout(ubLayout)
ubDock.setWidget(ubWidget)
self.centralPlot.addDockWidget(qt.Qt.RightDockWidgetArea,ubDock)
menu_bar = qt.QMenuBar()
file = menu_bar.addMenu("&File")
file.addAction(self.scanSelector.openFileAction)
file.addAction(self.scanSelector.refreshFileAction)
file.addAction(self.scanSelector.closeFileAction)
file.addSeparator()
self.folderToScan = file.addAction("Generate scan from images")
self.folderToScan.triggered.connect(self._onLoadScanFromImages)
self.interlacedScanMenu = file.addAction("Load interlaced/segmented scan")
self.interlacedScanMenu.triggered.connect(self._onLoadInterlacedScan)
file.addSeparator()
self.loadImagesAct = file.addAction("reload images")
self.loadImagesAct.triggered.connect(self._onLoadAll)
config_menu = menu_bar.addMenu("&Config")
loadConfigAct = qt.QAction("Load config",self) # connected with UBCalculator below
#loadXtalAct = qt.QAction("Load Crystal file",self)
machineParamsAct = qt.QAction("Machine parameters",self)
machineParamsAct.setCheckable(True)
xtalParamsAct = qt.QAction("Crystal parameters",self)
xtalParamsAct.setCheckable(True)
cpucountAct = qt.QAction("Set CPU count",self)
roicountAct = qt.QAction("Set ROI count",self)
loadConfigAct.triggered.connect(self.ubcalc._onLoadConfig)
machineParamsAct.toggled.connect(lambda checked: self.ubcalc.machineDialog.setVisible(checked))
self.ubcalc.machineDialog.sigHide.connect(lambda : machineParamsAct.setChecked(False))
xtalParamsAct.toggled.connect(lambda checked: self.ubcalc.xtalDialog.setVisible(checked))
self.ubcalc.xtalDialog.sigHide.connect(lambda : xtalParamsAct.setChecked(False))
cpucountAct.triggered.connect(self._onSelectCPUcount)
roicountAct.triggered.connect(self._onSelectROIcount)
self.autoLoadAct = qt.QAction("Auto load scans",self)
self.autoLoadAct.setCheckable(True)
self.autoLoadAct.setChecked(True)
self.showExcludedImagesAct = qt.QAction("Excluded images",self)
self.showExcludedImagesAct.setCheckable(True)
self.showExcludedImagesAct.toggled.connect(lambda visible : self.excludedImagesDialog.setVisible(visible))
self.backgroundImageAct = qt.QAction("Subtract/select background image",self)
self.backgroundImageAct.toggled.connect(self._onSetBackgroundImage)
self.backgroundImageAct.setCheckable(True)
self.backgroundImageAct.setChecked(False)
self.dbCompressionAct = qt.QAction("Database compression",self)
self.dbCompressionAct.triggered.connect(self._onChangeDBCompression)
config_menu.addAction(loadConfigAct)
#config_menu.addAction(loadXtalAct)
config_menu.addSeparator()
config_menu.addAction(machineParamsAct)
config_menu.addAction(xtalParamsAct)
config_menu.addSeparator()
config_menu.addAction(cpucountAct)
config_menu.addAction(self.dbCompressionAct)
config_menu.addAction(self.autoLoadAct)
config_menu.addSeparator()
config_menu.addAction(self.showExcludedImagesAct)
config_menu.addAction(self.backgroundImageAct)
view_menu = menu_bar.addMenu("&View")
showRefReflectionsAct = view_menu.addAction("reference reflections")
showRefReflectionsAct.setCheckable(True)
showRefReflectionsAct.setChecked(True)
showRefReflectionsAct.toggled.connect(lambda checked: self.reflectionSel.setReferenceReflectionsVisible(checked))
showBraggAct = view_menu.addAction("allowed Bragg reflections")
showBraggAct.setCheckable(True)
showBraggAct.setChecked(False)
showBraggAct.toggled.connect(self.onShowBragg)
showROIAct = view_menu.addAction("show ROI")
showROIAct.setCheckable(True)
showROIAct.setChecked(False)
showROIAct.toggled.connect(self.onShowROI)
self.roivisible = False
#self.scanSelector.showROICheckBox.addAction(showROIAct)
self.showCTRreflAct = view_menu.addAction("CTR reflections")
self.showCTRreflAct.setCheckable(True)
self.showCTRreflAct.setChecked(False)
self.showCTRreflAct.toggled.connect(self.onShowCTRreflections)
self.reflectionsVisible = False
self.showMachineParamsAct = view_menu.addAction("machine parameters")
self.showMachineParamsAct.setCheckable(True)
self.showMachineParamsAct.setChecked(False)
self.showMachineParamsAct.toggled.connect(self._onPlotMachineParams)
view_menu.addSeparator()
view_menu.addAction(roicountAct)
view_menu.addSeparator()
saveBraggAct = view_menu.addAction("save allowed Bragg reflections")
saveBraggAct.setCheckable(False)
saveBraggAct.triggered.connect(self.saveBraggRefl)
if console:
view_menu.addSeparator()
custom_banner = f"""orGUI v. {__version__} console
Available variables:
orgui : top level gui
ub : gui for UB matrix and angle calculations
"""
self.console_dockwidget = console.IPythonDockWidget(self, {'orgui': self, 'ub': self.ubcalc, 'ROIutils' : ROIutils}, custom_banner, "orGUI console")
self.console_dockwidget.setAllowedAreas(qt.Qt.LeftDockWidgetArea | qt.Qt.RightDockWidgetArea | qt.Qt.BottomDockWidgetArea)
self.tabifyDockWidget(selectorDock,self.console_dockwidget)
#self.addDockWidget(qt.Qt.LeftDockWidgetArea,self.console_dockwidget)
self.console_dockwidget.setVisible(False)
consoleViewAct = self.console_dockwidget.toggleViewAction()
view_menu.addAction(consoleViewAct)
##############################
editUAct = qt.QAction("Edit orientation matrix",self)
editUAct.setCheckable(True)
editUAct.toggled.connect(lambda checked: self.ubcalc.ueditDialog.setVisible(checked))
self.ubcalc.ueditDialog.sigHide.connect(lambda : editUAct.setChecked(False))
calcCTRsAvailableAct = qt.QAction("Calculate available CTRs",self)
calcCTRsAvailableAct.triggered.connect(self._onCalcAvailableCTR)
rs = menu_bar.addMenu("&Reciprocal space")
rs.addAction(calcCTRsAvailableAct)
rs.addAction(editUAct)
simul = menu_bar.addMenu("&Simulation")
createScanAct = simul.addAction("Create dummy scan")
createScanAct.triggered.connect(self._onCreateScan)
helpmenu = menu_bar.addMenu("&Help")
diffractAct = helpmenu.addAction("Diffraction geometry")
diffractAct.triggered.connect(self._onShowDiffractionGeometry)
helpmenu.addSeparator()
aboutAct = helpmenu.addAction("About")
aboutAct.triggered.connect(self._onShowAbout)
aboutQtAct = helpmenu.addAction("About Qt")
aboutQtAct.triggered.connect(lambda : qt.QMessageBox.aboutQt(self))
self.setMenuBar(menu_bar)
if configfile is not None:
self.ubcalc.readConfig(configfile)
def _removeAllIntegrPlotCurves(self):
"""CLI-capable: clear all integration curves from the plot widget."""
# remove plotted curves
#curveList = self.integrdataPlot.getAllCurves(just_legend=True)
#for i in curveList:
# self.integrdataPlot.removeCurve(i)
# remove plotted curves and curves that have been hidden
itemList = self.integrdataPlot.getAllCurves(withhidden=True)
for it in itemList:
self.integrdataPlot.removeCurve(it)
def _removeIntegrPlotCurveSet(self):
"""GUI-only: open a dialog to choose integration curves to hide/delete."""
curveList = self.integrdataPlot.getAllCurves(just_legend=True,withhidden=True)
# find out if curves are already hidden to later check boxes in QT GUI window
hidden = np.zeros_like(curveList,dtype='?')
for nr,i in enumerate(curveList):
if self.integrdataPlot.getCurve(i).getVisibleBounds() == None:
hidden[nr] = True
d = QPlotDeleteWindow(curveList,hidden)
if d.exec() == qt.QDialog.Accepted:
if d.action == 'delete':
try:
for i,j in enumerate(d.boxes):
if j.isChecked() == True:
self.integrdataPlot.removeCurve(curveList[i])
self.integrdataPlot.getLegendsDockWidget().updateLegends()
except MemoryError:
logger.exception("Can not delete selected plots.",
extra={'title' : "Can not delete selected plots.",
'description' : "Can not delete selected plots.",
'show_dialog' : True,
"dialog_level" : logging.WARNING,
'parent' : self})
elif d.action == 'hide':
try:
for i,j in enumerate(d.boxes):
if j.isChecked() == True:
self.integrdataPlot.getCurve(curveList[i]).setVisible(False)
self.integrdataPlot.getLegendsDockWidget().updateLegends()
except MemoryError:
logger.exception("Can not delete selected plots.",
extra={'title' : "Can not delete selected plots.",
'description' : "Can not delete selected plots.",
'show_dialog' : True,
"dialog_level" : logging.WARNING,
'parent' : self})
[docs]
def get_rocking_coordinates(self, H_0=None, H_1=None, maxValue=None,step_width=None, **kwargs):
"""Calculate detector coordinates along a reciprocal-space line.
The trajectory is the line :math:`\\vec{H}_0 + s\\vec{H}_1` in
reciprocal space, where :math:`\\vec{H}_0` is a reciprocal-space origin
vector, :math:`\\vec{H}_1` is a direction vector, and :math:`s` is a
scalar trajectory coordinate.
:param numpy.ndarray H_0:
Starting reciprocal-space vector in r.l.u. Shape ``(3,)``.
:param numpy.ndarray H_1:
Reciprocal-space direction vector in r.l.u. Shape ``(3,)``.
:param float maxValue:
Maximum scalar :math:`s` value sampled along
:math:`\\vec{H}_0 + s\\vec{H}_1`.
:param float step_width:
Step width in scalar ``s`` units.
:returns:
Reflection dictionary with detector pixel coordinates, masks, and
trajectory values.
:rtype: dict
.. note::
CLI-capable. Missing arguments are read from GUI widgets.
"""
# going back to the more universal integration along H_0 + s*H_1 coordinates
if H_0 is None:
H_0 = self.scanSelector.ro_H_0_dialog.get_hkl()
if H_1 is None:
H_1 = self.scanSelector.ro_H_1_dialog.get_hkl() # default to CTR integration
if step_width is None:
step_width = self.scanSelector.roscanDeltaS.value()
if maxValue is None:
maxValue = self.scanSelector.roscanMaxS.value()
dc = self.ubcalc.detectorCal
xoffset, yoffset = self.scanSelector.roioptions.get_offsets()
xoffset = kwargs.get('xoffset', xoffset)
yoffset = kwargs.get('yoffset', yoffset)
step_nr = round(maxValue/step_width) + 1
s_points = np.linspace(0,maxValue,step_nr)
hkl_desired = np.outer(H_1, s_points).T + H_0 # F contiguous is faster in anglesToHKL
refldict = self.ubcalc.calcReflection(hkl_desired) # F contiguous is faster
ymask1 = np.logical_and(refldict['xy_1'][...,1] >= 0, refldict['xy_1'][...,1] < dc.detector.shape[0])
xmask1 = np.logical_and(refldict['xy_1'][...,0] >= 0, refldict['xy_1'][...,0] < dc.detector.shape[1])
yxmask1 = np.logical_and(xmask1,ymask1)
ymask2 = np.logical_and(refldict['xy_2'][...,1] >= 0, refldict['xy_2'][...,1] < dc.detector.shape[0])
xmask2 = np.logical_and(refldict['xy_2'][...,0] >= 0, refldict['xy_2'][...,0] < dc.detector.shape[1])
yxmask2 = np.logical_and(xmask2,ymask2)
refldict['mask_1'] = yxmask1
refldict['mask_2'] = yxmask2
refldict['s'] = s_points
if xoffset != 0. or yoffset != 0.:
#warnings.warn("Nonzero pixel offset selected. Experimental feature! Angles and hkl are incorrect!!!")
refldict['xy_1'][..., 0] += xoffset
refldict['xy_2'][..., 0] += xoffset
refldict['xy_1'][..., 1] += yoffset
refldict['xy_2'][..., 1] += yoffset
refldict['H_0'] = H_0
refldict['H_1'] = H_1
return refldict
[docs]
def get_Bragg_rocking_coordinates(self, strainVec=None, **kwargs):
"""Calculate detector coordinates for Bragg rocking reflections.
:param numpy.ndarray strainVec:
Optional fractional strain applied component-wise to the direct
lattice constants ``a`` before calculating Bragg coordinates. This
scales each lattice constant as ``a_strained = a * (1 + strainVec)``
and does not represent a full strain tensor. If omitted, GUI spin
boxes are read and interpreted as percent strain.
:returns:
Reflection dictionary with hkl values in r.l.u., detector pixel
coordinates, angles in rad, and masks.
:rtype: dict
.. note::
CLI-capable for ``th`` scans when state is already configured.
"""
if strainVec is None:
strainVec = np.array([h.value() for h in self.scanSelector.strain_Bragg])/100.0
xoffset, yoffset = self.scanSelector.roioptions.get_offsets()
xoffset = kwargs.get('xoffset', xoffset)
yoffset = kwargs.get('yoffset', yoffset)
if self.fscan is not None:
if self.fscan.axisname != 'th':
raise NotImplementedError("Calculation of available Bragg reflections is not implemented for %s - scans" % self.fscan.axisname)
xtal = self.ubcalc.crystal
ommin = np.deg2rad(np.amin(self.fscan.omega))
ommax = np.deg2rad(np.amax(self.fscan.omega))
dc = self.ubcalc.detectorCal
mu = self.ubcalc.mu
ub = self.ubcalc.ubCal
chi = self.ubcalc.chi
phi = self.ubcalc.phi
xtal.setEnergy(ub.getEnergy()*1e3)
# apply strain:
ub_strained = copy.deepcopy(ub)
xtal_cp = copy.deepcopy(xtal)
xtal_cp.a = xtal_cp.a * (1. + strainVec)
ub_strained.setLattice(xtal_cp)
if self.scanSelector.bragg_multiple_enable.isChecked():
hkl_factor = np.array([h.value() for h in self.scanSelector.bragg_multiple])
a = xtal_cp.a
a /= hkl_factor
xtal_singleatom = CTRcalc.UnitCell(a, np.rad2deg(xtal_cp.alpha))
xtal_singleatom.addAtom('Pt',[0.,0.,0.],0.1,0.1,1.)
xtal_singleatom.setEnergy(ub.getEnergy()*1e3)
ub_strained.setLattice(xtal_singleatom)
try:
hkls, yx, angles = rn.thscanBragg(xtal_singleatom,ub_strained,mu,dc,(ommin,ommax), chi=chi, phi=phi)
hkls = hkls.astype(np.float64)
hkls *= hkl_factor.T
except Exception as e:
raise Exception("Cannot calculate Bragg reflections") from e
else:
try:
hkls, yx, angles = rn.thscanBragg(xtal_cp,ub_strained,mu,dc,(ommin,ommax), chi=chi, phi=phi)
hkls = hkls.astype(np.float64)
except Exception as e:
raise Exception("Cannot calculate Bragg reflections") from e
if hkls.size > 0:
s_points = np.arange(hkls.shape[0])
mask = np.ones(hkls.shape[0], dtype=bool)
else:
s_points = np.array([])
mask = np.array([])
refldict = {
'hkl' : hkls,
'xy_1' : yx[:,::-1],
'angles_1' : angles,
's' : s_points,
'mask_1' : mask
}
if xoffset != 0.0 or yoffset != 0.0:
#warnings.warn("Nonzero pixel offset selected. Experimental feature! Angles and hkl are incorrect!!!")
refldict['xy_1'][..., 0] += xoffset
refldict['xy_1'][..., 1] += yoffset
return refldict
[docs]
def intkeys_rocking(self, refldict, **kwargs):
"""Build center ROI slice coordinates for rocking-scan integration.
This converts detector coordinates from ``refldict`` into rectangular
center-ROI slice tuples. The slice tuple order is detector
``(x, y)``/``(horizontal, vertical)``. NumPy images are indexed as
``image[y_slice, x_slice]``, so integration code applies these slices
as ``key[::-1]``. Coordinates and ROI extents are clipped to the
detector bounds.
:param dict refldict:
Reflection dictionary produced by a rocking coordinate helper. For
CTR-style rocking scans, it represents points sampled along the
reciprocal-space line :math:`\\vec{H}_0 + s\\vec{H}_1`. The selected
intersection must provide ``xy_<intersect>`` detector coordinates
in pixels, and normally ``mask_<intersect>`` detector-valid flags.
:param int vsize:
Optional vertical base ROI size in pixels. Defaults to the GUI
value and, by default, may be adjusted by advanced ROI options such
as detector inclination and projected sample size.
:param int hsize:
Optional horizontal base ROI size in pixels. Defaults to the GUI
value and, by default, may be adjusted by advanced ROI options such
as detector inclination and projected sample size.
:param numpy.ndarray size_exact:
Optional per-point ROI sizes in pixels with columns
``(hsize, vsize)``. If provided, advanced ROI size calculation is
skipped and these exact per-point sizes are used.
:param bool mask:
Whether to apply ``mask_<intersect>`` before creating slices.
Defaults to ``True``.
:param bool autovsize:
Whether to derive vertical ROI size from the median vertical
spacing between neighboring detector coordinates. Defaults to the
GUI setting.
:param bool autohsize:
Whether to derive horizontal ROI size from the median horizontal
spacing between neighboring detector coordinates. Defaults to the
GUI setting.
:param int intersect:
Detector-intersection index to use, usually ``1`` or ``2``.
Defaults to the GUI-selected intersection.
:returns:
Dictionary containing ``center`` as a list of ``(x_slice, y_slice)``
ROI coordinates, effective ``vsize`` and ``hsize`` in pixels, and
``size_exact`` when corrected per-point sizes were used.
:rtype: dict
.. note::
CLI-capable. Defaults are read from GUI ROI controls.
"""
vsize = kwargs.get('vsize' ,int(self.scanSelector.vsize.value()))
hsize = kwargs.get('hsize' ,int(self.scanSelector.hsize.value()))
size_exact = kwargs.get('size_exact', None)
apply_mask = kwargs.get('mask' ,True)
autoROIVsize = kwargs.get('autovsize', self.scanSelector.autoROIVsize.isChecked())
autoROIHsize = kwargs.get('autohsize', self.scanSelector.autoROIHsize.isChecked())
if self.scanSelector.intersS1Act.isChecked():
intersect = 1
elif self.scanSelector.intersS2Act.isChecked():
intersect = 2
else:
intersect = 1 # default
intersect = kwargs.get('intersect', intersect)
xy = refldict['xy_%s' % int(intersect)]
if size_exact is None:
roioptions = self.scanSelector.roioptions.get_parameters()
if roioptions['DetectorInclination'] or roioptions['ProjectSampleSize']:
if roioptions['ProjectSampleSize']:
size_exact = ROIutils.calc_corrections(xy,
self.ubcalc.detectorCal,
np.array([hsize, vsize]),
roioptions,
roioptions['DetectorInclination'],
roioptions['factor'])
else:
size_exact = ROIutils.calc_corrections(xy,
self.ubcalc.detectorCal,
np.array([hsize, vsize]),
None,
roioptions['DetectorInclination'],
roioptions['factor'])
if apply_mask:
xy = xy[refldict['mask_%s' % int(intersect)]]
if size_exact is not None:
size_exact = size_exact[refldict['mask_%s' % int(intersect)]]
step_nr = xy.shape[0]
if step_nr == 0:
return {'center' : [], 'vsize' : vsize, 'hsize': hsize}
if step_nr > 1:
if autoROIVsize:
#dist_in_pixels = np.abs(xy[0][1] - xy[-1][1])
dist_in_pixels = np.median(np.abs(np.diff(xy[:,1])))
#roi_vlength = np.ceil(dist_in_pixels/step_nr)
vsize = int(np.ceil(dist_in_pixels))
if size_exact is not None:
size_exact[:, 1] = vsize
if autoROIHsize:
#dist_in_pixels = np.abs(xy[0][1] - xy[-1][1])
dist_in_pixels = np.median(np.abs(np.diff(xy[:,0])))
#roi_vlength = np.ceil(dist_in_pixels/step_nr)
hsize = int(np.ceil(dist_in_pixels))
if size_exact is not None:
size_exact[:, 0] = hsize
detvsize, dethsize = self.ubcalc.detectorCal.detector.shape
coord_restr = np.clip( xy, [0,0], [dethsize, detvsize])
if size_exact is not None:
vhalfsize = size_exact[:, 1] // 2
hhalfsize = size_exact[:, 0] // 2
fromcoords = np.round(coord_restr - np.vstack([hhalfsize, vhalfsize]).T)
tocoords = np.round(coord_restr + np.vstack([hhalfsize, vhalfsize]).T)
mask_hsize = (size_exact[:, 0] % 2).astype(bool)
remainder_mask = coord_restr[:,0] % 1 < 0.5
tocoords[mask_hsize & remainder_mask, 0] += 1
fromcoords[mask_hsize & ~remainder_mask, 0] -= 1
mask_vsize = (size_exact[:, 1] % 2).astype(bool)
remainder_mask = coord_restr[:,1] % 1 < 0.5
tocoords[mask_vsize & remainder_mask, 1] += 1
fromcoords[mask_vsize & ~remainder_mask, 1] -= 1
else:
vhalfsize = vsize // 2
hhalfsize = hsize // 2
fromcoords = np.round(coord_restr - np.array([hhalfsize, vhalfsize]))
tocoords = np.round(coord_restr + np.array([hhalfsize, vhalfsize]))
if hsize % 2:
remainder_mask = coord_restr[:,0] % 1 < 0.5
tocoords[remainder_mask, 0] += 1
fromcoords[~remainder_mask, 0] -= 1
if vsize % 2:
remainder_mask = coord_restr[:,1] % 1 < 0.5
tocoords[remainder_mask, 1] += 1
fromcoords[~remainder_mask, 1] -= 1
fromcoords = np.clip( fromcoords, [0,0], [dethsize, detvsize])
tocoords = np.clip( tocoords, [0,0], [dethsize, detvsize])
locations = []
for roifrom, toroi in zip(fromcoords, tocoords): # any way to do this with ndarray operations?
locations.append(tuple(slice(int(fromcoord), int(tocoord)) for fromcoord, tocoord in zip(roifrom,toroi)))
roi_dict = {'center' : locations, 'vsize' : vsize, 'hsize': hsize}
if size_exact is not None:
roi_dict['size_exact'] = size_exact
return roi_dict
[docs]
def intbkgkeys_rocking(self, refldict, **kwargs):
"""Build center and background ROI slice coordinates.
This extends :meth:`intkeys_rocking` by adding left, right, top, and
bottom background ROIs around each center ROI. Slice tuple order stays
in detector ``(x, y)``/``(horizontal, vertical)`` coordinates. NumPy
images are indexed as ``image[y_slice, x_slice]``, so integration code
applies these slices as ``key[::-1]``.
:param dict refldict:
Reflection dictionary produced by a rocking coordinate helper. For
CTR-style rocking scans, it represents points sampled along the
reciprocal-space line :math:`\\vec{H}_0 + s\\vec{H}_1`. The selected
intersection must provide ``xy_<intersect>`` detector coordinates
in pixels, and normally ``mask_<intersect>`` detector-valid flags.
:param int left:
Optional left-background width in detector pixels. Defaults to the
GUI value.
:param int right:
Optional right-background width in detector pixels. Defaults to the
GUI value.
:param int top:
Optional top-background height in detector pixels. Defaults to the
GUI value.
:param int bottom:
Optional bottom-background height in detector pixels. Defaults to
the GUI value.
:param kwargs:
Center-ROI options forwarded to :meth:`intkeys_rocking`, including
``vsize``, ``hsize``, ``size_exact``, ``mask``, ``autovsize``,
``autohsize``, and ``intersect``.
:returns:
Dictionary containing ``center``, ``left``, ``right``, ``top``, and
``bottom`` lists of ``(x_slice, y_slice)`` ROI coordinates,
effective center-ROI ``vsize`` and ``hsize`` in pixels, and
``size_exact`` when corrected per-point sizes were used.
:rtype: dict
.. note::
CLI-capable. Defaults are read from GUI ROI controls.
"""
left = kwargs.get( 'left' ,int(self.scanSelector.left.value()))
right = kwargs.get( 'right' ,int(self.scanSelector.right.value()))
top = kwargs.get( 'top' ,int(self.scanSelector.top.value()))
bottom = kwargs.get( 'bottom' ,int(self.scanSelector.bottom.value()))
detvsize, dethsize = self.ubcalc.detectorCal.detector.shape
roi_dict = self.intkeys_rocking(refldict, **kwargs)
crois = roi_dict['center']
leftrois = []
rightrois = []
toprois = []
bottomrois = []
for croi in crois:
leftrois.append((slice(int(np.clip(croi[0].start - left, 0, dethsize)), croi[0].start), croi[1]))
rightrois.append((slice(croi[0].stop,int(np.clip(croi[0].stop + right, 0, dethsize))), croi[1]))
toprois.append((croi[0], slice(int(np.clip(croi[1].start - top, 0, detvsize)), croi[1].start)))
bottomrois.append((croi[0], slice(croi[1].stop, int(np.clip(croi[1].stop + bottom,0,detvsize)) )))
roi_dict['left'] = leftrois
roi_dict['right'] = rightrois
roi_dict['top'] = toprois
roi_dict['bottom'] = bottomrois
return roi_dict
[docs]
def rocking_integrate(self,xylist, rois, hkl_del_gam, refldict, name):
"""Integrate rocking-scan images over prepared ROIs.
For CTR-style rocking scans, ``refldict`` may contain
:math:`\\vec{H}_0` and :math:`\\vec{H}_1` reciprocal-space vectors in
r.l.u.; together they define the line
:math:`\\vec{H}_0 + s\\vec{H}_1` that is recorded in the output
trajectory.
:param numpy.ndarray xylist:
ROI center coordinates in detector pixels.
:param dict rois:
Center and background ROI slice definitions.
:param numpy.ndarray hkl_del_gam:
Per-point hkl, detector-angle, and pixel metadata.
:param dict refldict:
Reflection metadata used to describe the saved trajectory.
:param str name:
Dataset name stem for saved integrated data.
:returns:
Status dictionary describing success, cancellation, or error.
:rtype: dict
.. note::
CLI-capable. Progress reporting is routed through ``logger_utils``.
"""
logger.info("Start rocking integration of scan %s" % name)
try:
image = self.fscan.get_raw_img(0)
except Exception as e:
logger.exception("No image found in current scan.",
extra={'title' : 'No image found in current scan',
'description' : 'Cannot integrate scan: No image found in current scan.',
'show_dialog' : True,
"dialog_level" : logging.WARNING,
'parent' : self})
return {'status': 'error', 'message' : 'No image found in current scan', 'traceback' : traceback.format_exc()}
if self.database.nxfile is None:
logger.exception("Cannot integrate scan: No database available.",
extra={'title' : 'Cannot integrate scan',
'description' : 'Cannot integrate scan: No database available.',
'show_dialog' : True,
"dialog_level" : logging.WARNING,
'parent' : self})
return {'status': 'error', 'message' : 'No database available'}
dc = self.ubcalc.detectorCal
imgmask = None
if self.scanSelector.useMaskBox.isChecked():
if self.centralPlot.getMaskToolsDockWidget().getSelectionMask() is None:
if logger_utils.get_logging_context() == 'gui':
btn = qt.QMessageBox.question(self,"No mask available","""No mask was selected with the masking tool.
Do you want to continue without mask?""")
if btn != qt.QMessageBox.Yes:
return {'status': 'cancelled', 'message' : 'Reason: no mask selected'}
logger.warn("No mask was selected with the masking tool.")
else:
imgmask = self.centralPlot.getMaskToolsDockWidget().getSelectionMask() > 0.
corr = self.scanSelector.useSolidAngleBox.isChecked() or\
self.scanSelector.usePolarizationBox.isChecked()
C_arr = np.ones(dc.detector.shape,dtype=np.float64)
if self.scanSelector.useSolidAngleBox.isChecked():
C_arr /= dc.solidAngleArray()
if self.scanSelector.usePolarizationBox.isChecked():
C_arr /= dc.polarization(factor=dc._polFactor,axis_offset=dc._polAxis)
def fill_counters(image,pixelavail, key, bkgkey):
"""CLI-safe: sum one center ROI and its background ROIs."""
cimg = image[key[::-1]]
# !!!!!!!!!! add mask here !!!!!!!!!
croi = np.nansum(cimg)
cpixel = np.nansum(pixelavail[key[::-1]])
bgroi = 0.
bgpixel = 0.
for bg in bkgkey:
bgimg = image[bg[::-1]]
bgroi += np.nansum(bgimg)
bgpixel += np.nansum(pixelavail[bg[::-1]])
return (croi, cpixel, bgroi, bgpixel)
hkl_del_gam_1 = hkl_del_gam[0] # needed to initialize integration
# initialize 1d np arrays for storing roi integration counters for all images
croi1_a = np.zeros_like(hkl_del_gam_1.shape[0],dtype=np.float64)
cpixel1_a = np.zeros_like(hkl_del_gam_1.shape[0],dtype=np.float64)
bgroi1_a = np.zeros_like(hkl_del_gam_1.shape[0],dtype=np.float64)
bgpixel1_a = np.zeros_like(hkl_del_gam_1.shape[0],dtype=np.float64)
# initialize 2d np array to store roi integration counters together for all images/ROIs
croi1_all = np.zeros((hkl_del_gam_1.shape[0],) + (xylist.shape[0],),dtype=np.float64)
cpixel1_all = np.zeros((hkl_del_gam_1.shape[0],) + (xylist.shape[0],),dtype=np.float64)
bgroi1_all = np.zeros((hkl_del_gam_1.shape[0],) + (xylist.shape[0],),dtype=np.float64)
bgpixel1_all = np.zeros((hkl_del_gam_1.shape[0],) + (xylist.shape[0],),dtype=np.float64)
Corr_croi1_all = np.zeros((hkl_del_gam_1.shape[0],) + (xylist.shape[0],),dtype=np.float64)
Corr_cpixel1_all = np.zeros((hkl_del_gam_1.shape[0],) + (xylist.shape[0],),dtype=np.float64)
Corr_bgroi1_all = np.zeros((hkl_del_gam_1.shape[0],) + (xylist.shape[0],),dtype=np.float64)
Corr_bgpixel1_all = np.zeros((hkl_del_gam_1.shape[0],) + (xylist.shape[0],),dtype=np.float64)
bgimg_croi1_all = np.zeros((hkl_del_gam_1.shape[0],) + (xylist.shape[0],),dtype=np.float64)
bgimg_cpixel1_all = np.zeros((hkl_del_gam_1.shape[0],) + (xylist.shape[0],),dtype=np.float64)
bgimg_bgroi1_all = np.zeros((hkl_del_gam_1.shape[0],) + (xylist.shape[0],),dtype=np.float64)
bgimg_bgpixel1_all = np.zeros((hkl_del_gam_1.shape[0],) + (xylist.shape[0],),dtype=np.float64)
progress = logger_utils.create_progress_logger(self, len(self.fscan), "Integrating images")
background_image = self.background_image
has_bg_img = False
if HAS_ACCEL:
if imgmask is not None:
mask = np.ascontiguousarray(imgmask, dtype=bool)
else:
mask = np.zeros(image.img.shape, dtype=bool)
if corr:
C_arr = np.ascontiguousarray(C_arr, dtype=np.float64)
else:
C_arr = np.ones(image.img.shape, dtype=np.float64)
C_arr[mask] = np.nan
roi_lists_numba = []
for roiname in ['center', 'left', 'right', 'top', 'bottom']:
roi_list = []
for r in rois[roiname]:
roi_list.append(np.array([[r[0].start , r[0].stop], [r[1].start , r[1].stop]]))
roi_list = np.ascontiguousarray(np.stack(roi_list), dtype=np.int64)
roi_lists_numba.append(roi_list)
if background_image is not None and background_image.shape == image.img.shape:
bg = background_image.astype(np.float64, order='C', copy=True)
bg[mask] = np.nan
has_bg_img = True
def sumImage(i):
"""CLI-safe worker: read and integrate one image with background."""
image = self.fscan.get_raw_img(i).img.astype(np.float64, order='C', copy=True) # unlocks gil during file read
all_counters = np.zeros((roi_lists_numba[0].shape[0],) + (4,), dtype=np.float64) # need gil for python object creation
Carr_counters = np.zeros((roi_lists_numba[0].shape[0],) + (4,), dtype=np.float64) # need gil for python object creation
BgImg_counters = np.zeros((roi_lists_numba[0].shape[0],) + (4,), dtype=np.float64) # need gil for python object creation
_roi_sum_accel.processImage_bg_Carr(image, bg, mask, C_arr, *roi_lists_numba, all_counters, Carr_counters, BgImg_counters) # numba nopython and nogil mode
return all_counters, Carr_counters, BgImg_counters
else:
def sumImage(i):
"""CLI-safe worker: read and integrate one image."""
image = self.fscan.get_raw_img(i).img.astype(np.float64, order='C', copy=True) # unlocks gil during file read
Carr_counters = np.zeros((roi_lists_numba[0].shape[0],) + (4,), dtype=np.float64) # need gil for python object creation
all_counters = np.zeros((roi_lists_numba[0].shape[0],) + (4,), dtype=np.float64) # need gil for python object creation
_roi_sum_accel.processImage_Carr(image, mask, C_arr, *roi_lists_numba, all_counters, Carr_counters) # numba nopython and nogil mode
return all_counters, Carr_counters
else:
def sumImage(i):
"""CLI-safe worker: read and integrate one image without acceleration."""
image = self.fscan.get_raw_img(i).img.astype(np.float64, order='C', copy=True)
if background_image is not None and background_image.shape == image.shape:
np.subtract(image, background_image, out=image)
if imgmask is not None:
image[imgmask] = np.nan
pixelavail = (~imgmask).astype(np.float64)
else:
pixelavail = np.ones_like(image)
if corr:
image *= C_arr
all_counters1 = np.zeros((xylist.shape[0],) + (4,))
for crnr in range(xylist.shape[0]):
# set ROI (moved to rocking-function)
# get roi
key = rois['center'][crnr]
bgkey = [rois['left'][crnr], rois['right'][crnr], rois['top'][crnr], rois['bottom'][crnr]]
# fill counters
counters1 = fill_counters(image,pixelavail,key, bgkey)
all_counters1[crnr] = counters1
return all_counters1
cancelled = False
with concurrent.futures.ThreadPoolExecutor(max_workers=self.numberthreads) as executor:
futures = {}
for i in range(len(self.fscan)):
futures[executor.submit(sumImage, i)] = i
status = "no error"
for f in concurrent.futures.as_completed(futures): # iteration over jobs
try:
i = futures[f]
if has_bg_img:
img_counters, Carr_counters, BgImg_counters = f.result()
bgimg_croi1_all[i] = BgImg_counters.T[0]
bgimg_cpixel1_all[i] = BgImg_counters.T[1]
bgimg_bgroi1_all[i] = BgImg_counters.T[2]
bgimg_bgpixel1_all[i] = BgImg_counters.T[3]
else:
img_counters, Carr_counters = f.result()
croi1_all[i] = img_counters.T[0]
cpixel1_all[i] = img_counters.T[1]
bgroi1_all[i] = img_counters.T[2]
bgpixel1_all[i] = img_counters.T[3]
Corr_croi1_all[i] = Carr_counters.T[0]
Corr_cpixel1_all[i] = Carr_counters.T[1]
Corr_bgroi1_all[i] = Carr_counters.T[2]
Corr_bgpixel1_all[i] = Carr_counters.T[3]
#for j in range(len(img_counters)): # iteration over ROIs
# (croi1, cpixel1, bgroi1, bgpixel1) = img_counters[j]
#
# croi1_all[i][j] = croi1
# cpixel1_all[i][j] = cpixel1
# bgroi1_all[i][j] = bgroi1
# bgpixel1_all[i][j] = bgpixel1
progress.update(futures[f])
del f
except concurrent.futures.CancelledError:
del f
except Exception as e:
logger.warn("Cannot read image, cancel to avoid memory leak:\n%s" % traceback.format_exc())
[f.cancel() for f in futures]
cancelled = True
status = 'error'
exc_info = sys.exc_info()
del f
if progress.wasCanceled():
cancelled = True
[f.cancel() for f in futures]
break
progress.finish()
if cancelled:
if status == 'error':
trace = "".join(traceback.format_exception(*exc_info))
logger.error('Error during integration',
exc_info=exc_info,
extra={'title' : 'Error during integration',
'description' : 'Error during integration. Integration was aborted.',
'show_dialog' : True,
"dialog_level" : logging.ERROR,
'parent' : self})
return {'status': 'error', 'message' : 'Error during integration', 'traceback' : trace}
else:
return {'status': 'cancelled', 'message' : 'Reason: Cancelled during integration'}
currentPlotCount = len(self.integrdataPlot.getAllCurves())
numberOfNewPlots = xylist.shape[0]
maxAmountOfPlots = 30
plotOnlyNth = (numberOfNewPlots // max((maxAmountOfPlots-currentPlotCount),1)) + 1
#print('Number of integration curves: ' + str(numberOfPlots))
#print('We can plot every ' + str(plotOnlyNth) + '-th curve.' )
suffix = ''
i = 0
while(self.activescanname + "/measurement/" + name + suffix in self.database.nxfile):
suffix = "_%s" % i
i += 1
name = name + suffix
auxcounters = {"@NX_class": "NXcollection"}
for auxname in self.fscan.auxillary_counters:
if hasattr(self.fscan, auxname):
cntr = getattr(self.fscan, auxname)
if cntr is not None:
auxcounters[auxname] = cntr
if hasattr(self.fscan, "title"):
title = str(self.fscan.title)
else:
title = "%s-scan" % self.fscan.axisname
mu, om = self.getMuOm()
if len(np.asarray(om).shape) == 0:
om = np.full_like(mu,om)
if len(np.asarray(mu).shape) == 0:
mu = np.full_like(om,mu)
data = {self.activescanname:{ # legacy, to be removed!
"instrument": {
"@NX_class": "NXinstrument",
"positioners": {
"@NX_class": "NXcollection",
self.fscan.axisname: self.fscan.axis
}
},
"auxillary" : auxcounters,
"measurement": {
"@NX_class": "NXentry",
"@default": name ,
name : {
"@NX_class": "NXentry",
"@default": "rois",
"@orgui_meta": "rocking",
"rois" : {
"@NX_class": "NXcollection",
"@default": None,
"@orgui_meta": "roi rocking",
}
}
},
"title":"%s" % title,
"@NX_class": "NXentry",
"@default": "measurement/%s" % name,
"@orgui_meta": "scan"
}
}
croibg1_bgimg_a = None
croibg1_bgimg_err_a = None
#plot and save data in database
for d in range(croi1_all.shape[1]):
roi_d = rois['center'][d]
roi_size = (roi_d[0].stop - roi_d[0].start) * (roi_d[1].stop - roi_d[1].start)
hkl_del_gam_1 = hkl_del_gam[d]
croi1_a = croi1_all[...,d]
cpixel1_a = cpixel1_all[...,d]
bgroi1_a = bgroi1_all[...,d]
bgpixel1_a = bgpixel1_all[...,d]
Corr_croi1_a = Corr_croi1_all[...,d]
Corr_cpixel1_a = Corr_cpixel1_all[...,d]
Corr_bgroi1_a = Corr_bgroi1_all[...,d]
Corr_bgpixel1_a = Corr_bgpixel1_all[...,d]
bgimg_croi1_a = bgimg_croi1_all[...,d]
bgimg_cpixel1_a = bgimg_cpixel1_all[...,d]
bgimg_bgroi1_a = bgimg_bgroi1_all[...,d]
bgimg_bgpixel1_a = bgimg_bgpixel1_all[...,d]
Corr1 = Corr_croi1_a * ( roi_size / Corr_cpixel1_a) # normalize to number of pixels of center roi (croi)
if np.any(bgimg_cpixel1_a): # assume the background image has no errors (would need a separate error image for that)
bgimg_croi1_norm = bgimg_croi1_a * ( cpixel1_a / bgimg_cpixel1_a)
if np.any(bgpixel1_a):
bgimg_bgroi1_norm = bgimg_bgroi1_a * ( bgpixel1_a / bgimg_bgpixel1_a)
# method 1: simply subtract bg image from data and then subtract the remaining background
croibg1_a = ( (croi1_a - bgimg_croi1_norm) - (cpixel1_a/bgpixel1_a) * (bgroi1_a - bgimg_bgroi1_norm) ) * ( roi_size / cpixel1_a)
croibg1_err_a = np.sqrt(croi1_a + ((cpixel1_a/bgpixel1_a)**2) * bgroi1_a) * ( roi_size / cpixel1_a)
# method 2: scale bg image croi and subtract scaled bg image croi. Use ratio of bgroi of image and bg image as scale factor.
factor = bgroi1_a / bgimg_bgroi1_norm
croibg1_bgimg_a = ( croi1_a - factor * bgimg_croi1_norm ) * ( roi_size / cpixel1_a)
croibg1_bgimg_err_a = np.sqrt(croi1_a + ((cpixel1_a/bgpixel1_a)**2) * bgroi1_a) * ( roi_size / cpixel1_a)
else: # not possible if no bgroi is set.
croibg1_a = (croi1_a - bgimg_croi1_norm) * ( roi_size / cpixel1_a)
croibg1_err_a = np.sqrt(croi1_a) * ( roi_size / cpixel1_a)
else: # no background image
if np.any(bgpixel1_a):
croibg1_a = ( croi1_a - (cpixel1_a/bgpixel1_a) * bgroi1_a ) * ( roi_size / cpixel1_a)
croibg1_err_a = np.sqrt(croi1_a + ((cpixel1_a/bgpixel1_a)**2) * bgroi1_a) * ( roi_size / cpixel1_a)
else:
croibg1_a = croi1_a * ( roi_size / cpixel1_a)
croibg1_err_a = np.sqrt(croi1_a) * ( roi_size / cpixel1_a)
if corr:
croibg1_a *= Corr1
croibg1_err_a *= Corr1
if croibg1_bgimg_a is not None:
croibg1_bgimg_a *= Corr1
croibg1_bgimg_err_a *= Corr1
rod_mask1 = np.isfinite(croibg1_a)
axis_masked = hkl_del_gam_1[:,5][rod_mask1]
croibg1_a_masked = croibg1_a[rod_mask1]
croibg1_err_a_masked = croibg1_err_a[rod_mask1]
# save data
x, y = xylist[d]
name1 = "rocking_%s" % (d)
if 'angles' in refldict:
alpha1, delta1, gamma1, omega1, chi1, phi1 = refldict['angles'][d]
sixc_angles_hkl = {
"@NX_class": "NXpositioner",
"alpha" : np.rad2deg(alpha1),
"omega" : np.rad2deg(omega1),
"theta" : np.rad2deg(-1*omega1),
"delta" : np.rad2deg(delta1),
"gamma" : np.rad2deg(gamma1),
"chi" : np.rad2deg(chi1),
"phi" : np.rad2deg(phi1),
"@unit" : "deg"
}
traj1 = {
#"@direction" : u"Rocking scan at fixed pixel location along H_1*s + H_0 in reciprocal space",
"@NX_class": "NXcollection",
"axis" : hkl_del_gam_1[:,5],
"HKL_sixc_angles" : sixc_angles_hkl
}
# determine the type of rocking scan:
if 'H_1' in refldict: # H_1 * s H_0 -like rocking scan (CTR scan)
traj1["s"] = refldict['s_masked'][d]
traj1["H_1"] = refldict['H_1']
traj1["H_0"] = refldict['H_0']
# equal refldict['hkl_masked']?
traj1["HKL_pk"] = refldict['H_1']*refldict['s_masked'][d] + refldict['H_0']
elif 's_masked' in refldict:
traj1["s"] = refldict['s_masked'][d]
traj1["HKL_pk"] = refldict['hkl_masked'][d]
else:
traj1 = {
#"@direction" : u"Rocking scan at fixed pixel location along H_1*s + H_0 in reciprocal space",
"@NX_class": "NXcollection",
"axis" : hkl_del_gam_1[:,5]
}
suffix = ''
i = 0
while(self.activescanname + "/measurement/" + name + "/" + name1 + suffix in self.database.nxfile):
suffix = "_%s" % i
i += 1
availname1 = name1 + suffix
x, y = xylist[d] #
# x_coord1_a = xylist[:,0]
# y_coord1_a = xylist[:,1]
datas1 = {
"@NX_class": "NXdata",
"sixc_angles": {
"@NX_class": "NXpositioner",
"alpha" : np.rad2deg(mu),
"omega" : np.rad2deg(om),
"theta" : np.rad2deg(-1*om),
"delta" : np.rad2deg(hkl_del_gam_1[:,3]),
"gamma" : np.rad2deg(hkl_del_gam_1[:,4]),
"chi" : np.rad2deg(self.ubcalc.chi),
"phi" : np.rad2deg(self.ubcalc.phi),
"@unit" : "deg"
},
"hkl": {
"@NX_class": "NXcollection",
"h" : hkl_del_gam_1[:,0],
"k" : hkl_del_gam_1[:,1],
"l" : hkl_del_gam_1[:,2]
},
"counters":{
"@NX_class": "NXdetector",
"croibg" : croibg1_a,
"croibg_errors" : croibg1_err_a,
'croibg_bgimg': croibg1_bgimg_a, # when None, will not create data set
'croibg_bgimg_errors': croibg1_bgimg_err_a, # when None, will not create data set
"croi" : croi1_a,
"bgroi" : bgroi1_a,
"croi_pix" : cpixel1_a,
"bgroi_pix" : bgpixel1_a,
"Cfactors_croi" : Corr_croi1_a,
"Cfactors_bgroi" : Corr_bgroi1_a,
"bgimg_croi" : bgimg_croi1_a,
"bgimg_bgroi" : bgimg_bgroi1_a
},
"pixelcoord": {
"@NX_class": "NXdetector",
"x" : x,
"y" : y,
'vsize' : (roi_d[1].stop - roi_d[1].start),
'hsize' : (roi_d[0].stop - roi_d[0].start)
},
"trajectory" : traj1,
"@signal" : "counters/croibg",
"@axes": "trajectory/axis",
"@title": self.activescanname + "_" + availname1,
"@orgui_meta": "roi rocking"
}
data[self.activescanname]["measurement"][name]["rois"]["@default"] = availname1
if np.any(cpixel1_a > 0.):
data[self.activescanname]["measurement"][name]["rois"][availname1] = datas1
if d % plotOnlyNth == 0 and not min(croibg1_a_masked)==max(croibg1_a_masked):
self.integrdataPlot.addCurve(axis_masked,croibg1_a_masked,legend=self.activescanname + "_" + availname1,
xlabel="trajectory/%s" % self.fscan.axisname, ylabel="counters/croibg", yerror=croibg1_err_a_masked)
# lets keep legacy data structure for now
data_2d_structured = {self.activescanname:{
"instrument": {
"@NX_class": "NXinstrument",
"positioners": {
"@NX_class": "NXcollection",
self.fscan.axisname: self.fscan.axis
}
},
"auxillary" : auxcounters,
"measurement": {
"@NX_class": "NXentry",
"@default": name ,
name : {
"@NX_class": "NXentry",
"@default": "rois",
"@orgui_meta": "rocking"
}
},
"title":"%s" % title,
"@NX_class": "NXentry",
"@default": "measurement/%s" % name,
"@orgui_meta": "scan"
}
}
alpha = []; theta = []; delta = []; gamma = []; chi = []; phi = []; omega = []
alpha_pk = []; theta_pk = []; delta_pk = []; gamma_pk = []; chi_pk = []; phi_pk = []; omega_pk = []
x = []; y = []; h = []; k = []; l = []
croibg = []; croibg_errors = []; croi = []; bgroi = []; croi_pix = []; bgroi_pix = []
croibg_bgimg = []; croibg_bgimg_errors = []; Cfactors_croi = []; Cfactors_bgroi = []; bgimg_croi = []; bgimg_bgroi = []
axis = []; s = []; H_0 = []; H_1 = []; HKL_pk = []
vsize = []; hsize = []
#from IPython import embed; embed()
optional_labels = {
's' : s, 'H_1' : H_1, 'H_0' : H_0, 'HKL_pk' : HKL_pk
}
for sc in data[self.activescanname]["measurement"][name]["rois"]:
if sc.startswith('@'):
continue
try:
dsc = data[self.activescanname]["measurement"][name]["rois"][sc]
# 2D arrays
alpha.append(dsc["sixc_angles"]["alpha"])
theta.append(dsc["sixc_angles"]["theta"])
delta.append(dsc["sixc_angles"]["delta"])
gamma.append(dsc["sixc_angles"]["gamma"])
chi.append(dsc["sixc_angles"]["chi"])
phi.append(dsc["sixc_angles"]["phi"])
omega.append(dsc["sixc_angles"]["omega"])
# 2D arrays
h.append(dsc["hkl"]["h"])
k.append(dsc["hkl"]["k"])
l.append(dsc["hkl"]["l"])
# 2D arrays
croibg.append(dsc["counters"]["croibg"])
croibg_errors.append(dsc["counters"]["croibg_errors"])
croi.append(dsc["counters"]["croi"])
bgroi.append(dsc["counters"]["bgroi"])
croi_pix.append(dsc["counters"]["croi_pix"])
bgroi_pix.append(dsc["counters"]["bgroi_pix"])
if dsc["counters"]["croibg_bgimg"] is not None:
croibg_bgimg.append(dsc["counters"]["croibg_bgimg"])
croibg_bgimg_errors.append(dsc["counters"]["croibg_bgimg_errors"])
Cfactors_croi.append(dsc["counters"]["Cfactors_croi"])
Cfactors_bgroi.append(dsc["counters"]["Cfactors_bgroi"])
bgimg_croi.append(dsc["counters"]["bgimg_croi"])
bgimg_bgroi.append(dsc["counters"]["bgimg_bgroi"])
# 1D arrays
x.append(dsc["pixelcoord"]["x"])
y.append(dsc["pixelcoord"]["y"])
# 1D arrays
vsize.append(dsc["pixelcoord"]["vsize"])
hsize.append(dsc["pixelcoord"]["hsize"])
axis.append(dsc["trajectory"]["axis"])
for lbl in optional_labels:
if lbl in dsc["trajectory"]:
optional_labels[lbl].append(dsc["trajectory"][lbl])
# 1d Array
if "HKL_sixc_angles" in dsc["trajectory"]:
alpha_pk.append(dsc["trajectory"]["HKL_sixc_angles"]["alpha"])
theta_pk.append(dsc["trajectory"]["HKL_sixc_angles"]["theta"])
delta_pk.append(dsc["trajectory"]["HKL_sixc_angles"]["delta"])
gamma_pk.append(dsc["trajectory"]["HKL_sixc_angles"]["gamma"])
chi_pk.append(dsc["trajectory"]["HKL_sixc_angles"]["chi"])
phi_pk.append(dsc["trajectory"]["HKL_sixc_angles"]["phi"])
omega_pk.append(dsc["trajectory"]["HKL_sixc_angles"]["omega"])
except Exception as e:
logger.exception("Unexpected exception while creating data sets to save")
# from IPython import embed; embed()
# sys.exit(0)
rois = {
"@NX_class": "NXcollection",
"@default": "croibg",
"@orgui_meta": "roi rocking",
"alpha" : np.vstack(alpha),
"theta" : np.vstack(theta),
"delta" : np.vstack(delta),
"gamma" : np.vstack(gamma),
"chi" : np.vstack(chi),
"phi" : np.vstack(phi),
"omega" : np.vstack(omega),
"h" : np.vstack(h),
"k" : np.vstack(k),
"l" : np.vstack(l),
"croibg" : np.vstack(croibg),
"croibg_errors" : np.vstack(croibg_errors),
"croi" : np.vstack(croi),
"bgroi" : np.vstack(bgroi),
"croi_pix" : np.vstack(croi_pix),
"bgroi_pix" : np.vstack(bgroi_pix),
"Cfactors_croi" : np.vstack(Cfactors_croi),
"Cfactors_bgroi" : np.vstack(Cfactors_bgroi),
"bgimg_croi" : np.vstack(bgimg_croi),
"bgimg_bgroi" : np.vstack(bgimg_bgroi),
"x" : np.array(x),
"y" : np.array(y),
"vsize" : np.array(vsize),
"hsize" : np.array(hsize),
"axis" : np.vstack(axis),
}
if alpha_pk:
rois["alpha_pk"] = np.array(alpha_pk)
rois["theta_pk"] = np.array(theta_pk)
rois["delta_pk"] = np.array(delta_pk)
rois["gamma_pk"] = np.array(gamma_pk)
rois["chi_pk"] = np.array(chi_pk)
rois["phi_pk"] = np.array(phi_pk)
rois["omega_pk"] = np.array(omega_pk)
if croibg_bgimg:
rois["croibg_bgimg"] = np.vstack(croibg_bgimg)
rois["croibg_bgimg_errors"] = np.vstack(croibg_bgimg_errors)
for lbl in optional_labels:
if optional_labels[lbl]:
rois[lbl] = np.squeeze(np.vstack(optional_labels[lbl]))
scsize = rois['axis'].shape[0]
for t in rois:
if t.startswith('@'):
continue
if rois[t].shape[0] != scsize:
logger.error(
"Error during ro integration: roi %s does not match scan size %s. "
"This is likely a coding error",
t,
scsize,
)
return {'status': 'error', 'message' : "Error during ro integration: size mismatch", 'traceback' : ""}
data_2d_structured[self.activescanname]["measurement"][name]["rois"] = rois
self.database.add_nxdict(data_2d_structured)
logger.info(f"Rocking integration succeeded and data saved with name %s" % name)
return {'status': 'success'}
[docs]
def updatePlotItems(self, recalculate=True):
"""Refresh displayed ROI and reflection overlays.
:param bool recalculate:
Recalculate CTR coordinates before updating reflection markers.
.. note::
CLI-capable, but it mutates Qt plot widgets.
"""
if self.roivisible:
try:
self.updateROI()
except Exception:
pass
if self.reflectionsVisible:
if recalculate:
try:
hkm = self.calculateAvailableCTR()
hk = np.unique(hkm[:,:2],axis=0)
H_0 = np.hstack((hk, np.zeros((hk.shape[0],1))))
H_1 = np.array([0,0,1])
self.reflectionsToDisplay = H_0, H_1
except Exception:
self.showCTRreflAct.setChecked(False)
self.reflectionsVisible = False
self.updateReflections()
if self.reflectionSel.showBraggReflections:
try:
self.calcBraggRefl()
except:
pass
def _onSetBackgroundImage(self, checked):
"""GUI-only: select or clear a background image via a file dialog."""
if not checked:
self.background_image = None
self.plotImage(self.imageno) # will not raise Exception,
return
extensions = {}
for description, ext in silx.io.supported_extensions().items():
extensions[description] = " ".join(sorted(list(ext)))
extensions["NeXus layout from EDF files"] = "*.edf"
extensions["NeXus layout from TIFF image files"] = "*.tif *.tiff"
extensions["NeXus layout from CBF files"] = "*.cbf"
extensions["NeXus layout from MarCCD image files"] = "*.mccd"
all_supported_extensions = set()
for name, exts in extensions.items():
exts = exts.split(" ")
all_supported_extensions.update(exts)
all_supported_extensions = sorted(list(all_supported_extensions))
filters = []
filters.append("All supported files (%s)" % " ".join(all_supported_extensions))
for name, extension in extensions.items():
filters.append("%s (%s)" % (name, extension))
filters.append("All files (*)")
fileTypeFilter = ""
for f in filters:
fileTypeFilter += f + ";;"
# call dialog
filename,_ = qt.QFileDialog.getOpenFileName(self,"Open background image",'',fileTypeFilter[:-2])
if filename == '':
self.backgroundImageAct.setChecked(False)
self.plotImage(self.imageno) # will not raise Exception,
return
try:
with fabio.open(filename) as fabf:
self.background_image = fabf.data.astype(np.float64, order='C', copy=True)
self.plotImage(self.imageno) # will not raise Exception,
except Exception:
traceback.print_exc()
self.backgroundImageAct.setChecked(False)
self.plotImage(self.imageno) # will not raise Exception,
return
#dialog = ImageFileDialog.ImageFileDialog(self)
#result = dialog.exec()
#if result:
# self.background_image = dialog.selectedImage().astype(np.float64)
[docs]
def onShowBragg(self,visible):
"""GUI/CLI hint: toggle Bragg reflection overlays on the plot."""
try:
self.reflectionSel.setBraggReflectionsVisible(visible)
if visible:
self.calcBraggRefl()
except Exception:
logger.exception('Cannot show Bragg reflections',
extra={'title' : 'Cannot show Bragg reflections',
'description' : 'Cannot show Bragg reflections',
'show_dialog' : True,
"dialog_level" : logging.WARNING,
'parent' : self})
# qutils.warning_detailed_message(self, "Cannot show show Bragg reflections", "Cannot show Bragg reflections", traceback.format_exc())
#qt.QMessageBox.critical(self,"Cannot show show Bragg reflections", "Cannot Cannot show Bragg reflections:\n%s" % traceback.format_exc())
[docs]
def onShowROI(self,visible):
"""GUI/CLI hint: toggle ROI display and refresh ROI graphics."""
self.roivisible = visible
try:
self.updateROI()
except Exception:
logger.exception('Cannot show ROI',
extra={'title' : 'Cannot show ROI',
'description' : 'Cannot show ROI',
'show_dialog' : True,
"dialog_level" : logging.WARNING,
'parent' : self})
# qutils.warning_detailed_message(self, "Cannot show ROI", "Cannot show ROI", traceback.format_exc())
#qt.QMessageBox.critical(self,"Cannot show ROI", "Cannot Cannot show ROI:\n%s" % traceback.format_exc())
[docs]
def onShowCTRreflections(self,visible):
"""GUI/CLI hint: toggle CTR reflection overlays on the plot."""
self.reflectionsVisible = visible
if self.reflectionsVisible:
try:
hkm = self.calculateAvailableCTR()
except Exception:
self.showCTRreflAct.setChecked(False)
self.reflectionsVisible = False
logger.exception('Cannot calculate CTR locations',
extra={'title' : 'Cannot calculate CTR locations',
'description' : 'Cannot calculate CTR locations',
'show_dialog' : True,
"dialog_level" : logging.WARNING,
'parent' : self})
# qutils.warning_detailed_message(self, "Cannot calculate CTR locations", "Cannot calculate CTR locatons", traceback.format_exc())
#qt.QMessageBox.critical(self,"Cannot calculate CTR locatons", "Cannot calculate CTR locatons:\n%s" % traceback.format_exc())
return
hk = np.unique(hkm[:,:2],axis=0)
H_0 = np.hstack((hk, np.zeros((hk.shape[0],1))))
H_1 = np.array([0,0,1])
self.reflectionsToDisplay = H_0, H_1
self.updateReflections()
def _onShowAbout(self):
"""GUI-only: show the modal About dialog."""
dial = AboutDialog(self, __version__)
dial.exec()
def _onShowDiffractionGeometry(self):
"""GUI-only: show or create the diffraction-geometry dialog."""
if hasattr(self, 'diffractometerdialog'):
self.diffractometerdialog.show()
else:
self.diffractometerdialog = QDiffractometerImageDialog()
self.diffractometerdialog.show()
def _onToggleExcludeImage(self, exclude):
"""CLI-capable: update the excluded-image list for the active image."""
currentimgno = self.scanSelector.slider.value()
data = self.excludedImagesDialog.getData()
imgno_in_excudearray = currentimgno in data
if imgno_in_excudearray and exclude:
return
if not imgno_in_excudearray and exclude:
data = np.hstack([data, currentimgno])
self.excludedImagesDialog.updateArrayData(data)
else:
data = data[data != currentimgno]
self.excludedImagesDialog.updateArrayData(data)
def _onSelectCPUcount(self):
"""GUI-only: ask the user for the worker-thread count."""
maxavail = os.cpu_count() if os.cpu_count() is not None else 1
if 'SLURM_CPUS_ON_NODE' in os.environ:
maxavail = int(os.environ['SLURM_CPUS_ON_NODE'])
cpus, success = qt.QInputDialog.getInt(self,"CPU count",
"CPU count (detected: %s)" % maxavail,
self.numberthreads,1)
if success:
self.numberthreads = cpus
def _onSelectROIcount(self):
"""GUI-only: ask the user for the maximum displayed ROI count."""
rois, success = qt.QInputDialog.getInt(self,"ROI count",
"Max ROI count to display (current: %s)" % self.maxROIs,
self.maxROIs,1)
if success:
self.maxROIs = rois
def _onChangeDBCompression(self):
"""GUI-only: ask the user for the database compression filter."""
filter_names = list(FILTERS.keys())
currentCompression = self.database.compression
for fn in filter_names:
if currentCompression == FILTERS[fn]:
break
idx = filter_names.index(fn)
selection, success = qt.QInputDialog.getItem(self,"Data compression settings",
"Available data base compression methods:\n(See discussion under https://github.com/tifuchs/orGUI/issues/16)\nRecommended: Blosc-lz4-Shuffle-5",
filter_names, idx, False)
if success:
self.database.compression = FILTERS[selection]
[docs]
def calcBraggRefl(self):
"""Calculate and display available Bragg reflections for the scan.
.. note::
CLI-capable for ``th`` scans, but it updates GUI reflection state.
"""
if self.fscan is not None:
if self.fscan.axisname != 'th':
raise NotImplementedError("Calculation of available Bragg reflections is not implemented for %s - scans" % self.fscan.axisname)
try:
xtal = self.ubcalc.crystal
ommin = np.deg2rad(np.amin(self.fscan.omega))
ommax = np.deg2rad(np.amax(self.fscan.omega))
dc = self.ubcalc.detectorCal
mu = self.ubcalc.mu
ub = self.ubcalc.ubCal
chi = self.ubcalc.chi
phi = self.ubcalc.phi
xtal.setEnergy(ub.getEnergy()*1e3)
hkls, yx, angles = rn.thscanBragg(xtal,ub,mu,dc,(ommin,ommax), chi=chi, phi=phi)
self.reflectionSel.setBraggReflections(hkls, yx, angles)
except Exception:
logger.exception('Cannot calculate Bragg reflections',
extra={'title' : 'Cannot calculate Bragg reflections',
'description' : 'Cannot calculate Bragg reflections',
'show_dialog' : True,
"dialog_level" : logging.WARNING,
'parent' : self})
# qutils.warning_detailed_message(self, "Cannot calculate Bragg reflections", "Cannot calculate Bragg reflections", traceback.format_exc())
#qt.QMessageBox.critical(self,"Cannot calculate Bragg reflections", "Cannot calculate Bragg reflections:\n%s" % traceback.format_exc())
[docs]
def saveBraggRefl(self):
"""Save calculated Bragg reflection coordinates selected in the GUI.
.. note::
GUI-only. This path opens confirmation and file-save dialogs.
"""
try:
hkls, yx, angles = self.reflectionSel.getBraggReflections()
except ValueError:
if self.fscan is not None:
try:
xtal = self.ubcalc.crystal
ommin = np.deg2rad(np.amin(self.fscan.omega))
ommax = np.deg2rad(np.amax(self.fscan.omega))
dc = self.ubcalc.detectorCal
mu = self.ubcalc.mu
ub = self.ubcalc.ubCal
chi = self.ubcalc.chi
phi = self.ubcalc.phi
xtal.setEnergy(ub.getEnergy()*1e3)
hkls, yx, angles = rn.thscanBragg(xtal,ub,mu,dc,(ommin,ommax), chi=chi, phi=phi)
#self.reflectionSel.setBraggReflections(hkls, yx, angles)
except Exception:
logger.exception('Cannot calculate Bragg reflections',
extra={'title' : 'Cannot calculate Bragg reflections',
'description' : 'Cannot calculate Bragg reflections',
'show_dialog' : True,
"dialog_level" : logging.WARNING,
'parent' : self})
# qutils.warning_detailed_message(self, "Cannot calculate Bragg reflections", "Cannot calculate Bragg reflections", traceback.format_exc())
#qt.QMessageBox.critical(self,"Cannot calculate Bragg reflections", "Cannot calculate Bragg reflections:\n%s" % traceback.format_exc())
return
else:
logger.exception('Cannot calculate Bragg reflections: No scan loaded.',
extra={'title' : 'Cannot calculate Bragg reflections',
'description' : 'Cannot calculate Bragg reflections: No scan loaded.',
'show_dialog' : True,
"dialog_level" : logging.WARNING,
'parent' : self})
# qt.QMessageBox.critical(self,"Cannot calculate Bragg reflections", "Cannot calculate Bragg reflections:\nNo scan loaded.")
return
hkm = np.concatenate((hkls, yx[:,::-1], np.rad2deg(angles)), axis=1)
sio = StringIO()
np.savetxt(sio,hkm,fmt="%.3f", delimiter='\t',header="H K L x y alpha delta gamma omega chi phi")
#Question dialog for saving the possible CTR locations
msgbox = qt.QMessageBox(qt.QMessageBox.Question,'Saving Bragg reflection ...',
'Found possible Bragg reflections. Do you want to save the following positions?',
qt.QMessageBox.Yes | qt.QMessageBox.No, self)
msgbox.setDetailedText(sio.getvalue())
clickedbutton = msgbox.exec()
#Question dialog for saving the possible CTR locations
#clickedbutton=qt.QMessageBox.question(self, 'Saving CTR locations...', 'Do you want to save the following positions: \n' + hkstring +"?");
if clickedbutton==qt.QMessageBox.Yes:
#File saving
fileTypeDict = {'dat Files (*.dat)': '.dat', 'txt Files (*.txt)': '.txt', 'All files (*)': '', }
fileTypeFilter = ""
for f in fileTypeDict:
fileTypeFilter += f + ";;"
filename, filetype = qt.QFileDialog.getSaveFileName(self,"Save reflections",
self.filedialogdir,
fileTypeFilter[:-2])
if filename == '':
return
self.filedialogdir = os.path.splitext(filename)[0]
filename += fileTypeDict[filetype]
np.savetxt(filename,hkm,fmt="%.3f",header="H K L x y alpha delta gamma omega chi phi")
[docs]
def calculateAvailableCTR(self):
"""Calculate CTR coordinates available to the current ``th`` scan.
:returns:
Array columns containing H, K, and detector-side information.
:rtype: numpy.ndarray
.. note::
CLI-safe when a compatible scan and UB state are loaded.
"""
if self.fscan is None:
raise Exception("No scan selected!")
if self.fscan.axisname != 'th':
raise NotImplementedError("Calculation of available CTRs is not implemented for %s - scans" % self.fscan.axisname)
xtal = self.ubcalc.crystal
ommin = np.deg2rad(np.amin(self.fscan.omega))
ommax = np.deg2rad(np.amax(self.fscan.omega))
dc = self.ubcalc.detectorCal
mu = self.ubcalc.mu
chi = self.ubcalc.chi
phi = self.ubcalc.phi
ub = self.ubcalc.ubCal
xtal.setEnergy(ub.getEnergy()*1e3)
hk, xmirror = rn.thscanCTRs(xtal,ub,mu,dc,(ommin,ommax), chi=chi, phi=phi)
xmirror = np.array(xmirror).astype(np.float64)
#making the hk list of arrays into a reasonable string
hkm = np.concatenate((np.array(hk), xmirror.reshape((1,xmirror.size)).T), axis=1)
return hkm
def _onCalcAvailableCTR(self):
"""GUI-only: calculate CTRs and optionally save them via dialogs."""
try:
hkm = self.calculateAvailableCTR()
except Exception:
qutils.warning_detailed_message(self, "Cannot calculate CTR locatons", "Cannot calculate CTR locatons", traceback.format_exc())
#qt.QMessageBox.critical(self,"Cannot calculate CTR locatons", "Cannot calculate CTR locatons:\n%s" % traceback.format_exc())
return
sio = StringIO()
np.savetxt(sio,hkm,fmt="%.3f", delimiter='\t',header="H K detectorRight")
#Question dialog for saving the possible CTR locations
msgbox = qt.QMessageBox(qt.QMessageBox.Question,'Saving CTR locations...',
'Found CTRs. Do you want to save the following positions?',
qt.QMessageBox.Yes | qt.QMessageBox.No, self)
msgbox.setDetailedText(sio.getvalue())
clickedbutton = msgbox.exec()
#Question dialog for saving the possible CTR locations
#clickedbutton=qt.QMessageBox.question(self, 'Saving CTR locations...', 'Do you want to save the following positions: \n' + hkstring +"?");
if clickedbutton==qt.QMessageBox.Yes:
#File saving
fileTypeDict = {'dat Files (*.dat)': '.dat', 'txt Files (*.txt)': '.txt', 'All files (*)': '', }
fileTypeFilter = ""
for f in fileTypeDict:
fileTypeFilter += f + ";;"
filename, filetype = qt.QFileDialog.getSaveFileName(self,"Save reflections",
self.filedialogdir,
fileTypeFilter[:-2])
if filename == '':
return
self.filedialogdir = os.path.splitext(filename)[0]
filename += fileTypeDict[filetype]
np.savetxt(filename,hkm,fmt="%.3f",header="H K mirror")
[docs]
def getReflections(self):
"""Return selected reference reflections for UB calculation.
The reference reflections are taken from the reflection selector. For
each selected reflection, the detector pixel coordinate is converted to
surface detector angles ``delta`` and ``gamma`` in rad, and the image
number is converted to ``omega`` in rad. The returned angle rows use
the Vlieg six-circle order ``[alpha, delta, gamma, omega, chi, phi]``.
In this GUI workflow, ``alpha`` is the current fixed incidence angle
stored as ``self.ubcalc.mu``.
:returns:
Tuple ``(hkls, angles)``. ``hkls`` has columns ``[h, k, l]`` in
r.l.u. ``angles`` has columns
``[alpha, delta, gamma, omega, chi, phi]`` in rad.
:rtype: tuple[numpy.ndarray, numpy.ndarray]
.. note::
CLI-safe when reference-selection state is already populated.
"""
hkls = []
angles = []
for refl in self.reflectionSel.reflections:
#print(refl.xy)
gamma, delta = self.ubcalc.detectorCal.surfaceAnglesPoint(np.array([refl.xy[1]]),np.array([refl.xy[0]]),self.ubcalc.mu)
delta = float(delta[0]); gamma = float(gamma[0])
try:
pos = np.array([self.ubcalc.mu,delta,gamma,self.imageNoToOmega(refl.imageno),self.ubcalc.chi,self.ubcalc.phi])
except:
# from IPython import embed; embed()
raise
#print(pos)
hkls.append(refl.hkl)
angles.append(pos)
return np.array(hkls), np.array(angles)
def _onPlotMachineParams(self, enable=None):
"""GUI/CLI hint: toggle detector center and azimuth plot markers."""
#[cp,azimxy,polax] = paramslist
if enable is None:
enable = self.showMachineParamsAct.isChecked()
if enable:
fit2DCal = self.ubcalc.detectorCal.getFit2D()
cp = fit2DCal['centerX'], fit2DCal['centerY']
gam_p,_ = self.ubcalc.detectorCal.rangegamdel_p
azimy,azimx = self.ubcalc.detectorCal.pixelsPrimeBeam(gam_p[1]/5, 0 )[0]
self.centralPlot.addMarker(cp[0],cp[1],legend="CentralPixel",text="CP",color='yellow',symbol='+')
self.centralPlot.addMarker(azimx,azimy,legend="azimuth",text="Azim",color='yellow',symbol='+')
else:
self.centralPlot.removeMarker("CentralPixel")
self.centralPlot.removeMarker("azimuth")
[docs]
def searchPixelCoordHKL(self, hkl):
"""Find detector coordinates and image numbers for a reflection.
This calculates the two possible diffractometer solutions for ``hkl``
with :meth:`QUBCalculator.calcReflection`, which uses
:meth:`HKLVlieg.VliegAngles.anglesZmode`. The angle calculation follows
the six-circle geometry described by Lohmeier & Vlieg, J. Appl. Cryst.
26, 706-716 (1993), https://doi.org/10.1107/S0021889893004868.
Scan-axis mapping:
* ``mu`` scans: the image number is selected from the solution
``alpha`` angle.
* ``th`` scans: the image number is selected from ``-omega`` because
the scan axis stores theta with the opposite sign convention used
internally for omega.
:param numpy.ndarray hkl:
Reciprocal-space coordinate in r.l.u. Shape ``(3,)``.
:returns:
Reflection dictionary from ``calcReflection`` with added
``imageno_1``/``imageno_2`` entries when the candidate angle falls
within the active scan range, and ``selectable_1``/``selectable_2``
flags indicating whether ``xy_1``/``xy_2`` lie on the detector.
The ``angles_1`` and ``angles_2`` rows use
``[alpha, delta, gamma, omega, chi, phi]`` in rad; ``xy_1`` and
``xy_2`` are detector pixel coordinates as ``(x, y)``.
:rtype: dict
.. note::
CLI-capable for active ``mu`` and ``th`` scans. Unsupported scan
axes are reported through logging and return ``None``.
"""
refldict = self.ubcalc.calcReflection(hkl)
axisname = self.fscan.axisname
dc = self.ubcalc.detectorCal
if self.fscan.axisname == 'mu':
angle_idx = 0
sign = 1.
elif self.fscan.axisname == 'th':
angle_idx = 3
sign = -1.
else:
logger.error("Cannot calculate reflection. %s is no supported scan axis." % self.fscan.axisname,
extra={'title' : 'Cannot calculate reflection.',
'description' : "Cannot calculate reflection. %s is no supported scan axis." % self.fscan.axisname,
'show_dialog' : True,
"dialog_level" : logging.WARNING,
'parent' : self})
return
try:
imageno1 = self.axisToImageNo(np.rad2deg(refldict['angles_1'][angle_idx]) * sign)
refldict['imageno_1'] = imageno1
except:
imageno1 = None
xy = refldict['xy_1']
onDetector = (xy[0] >= 0 and xy[0] < dc.detector.shape[1]) and \
(xy[1] >= 0 and xy[1] < dc.detector.shape[0])
if onDetector:
refldict['selectable_1'] = True
else:
refldict['selectable_1'] = False
try:
imageno2 = self.axisToImageNo(np.rad2deg(refldict['angles_2'][angle_idx]) * sign)
refldict['imageno_2'] = imageno2
except:
imageno2 = None
xy = refldict['xy_2']
onDetector = (xy[0] >= 0 and xy[0] < dc.detector.shape[1]) and \
(xy[1] >= 0 and xy[1] < dc.detector.shape[0])
if onDetector:
refldict['selectable_2'] = True
else:
refldict['selectable_2'] = False
return refldict
[docs]
def onSearchHKLforStaticROI(self, hkl):
"""GUI/CLI hint: search an hkl and prompt for a static ROI location."""
try:
refldict = self.searchPixelCoordHKL(hkl)
except Exception as e:
qutils.warning_detailed_message(self, "Cannot calculate location of reflection", "Cannot calculate position of reflection:\n%s" % e, traceback.format_exc())
return
refl_dialog = QReflectionAnglesDialog(refldict,"Select reflection location", self)
if qt.QDialog.Accepted == refl_dialog.exec():
for i, cb in enumerate(refl_dialog.checkboxes,1):
if cb.isChecked():
xy = refldict['xy_%s' % i]
self.scanSelector.set_xy_static_loc(xy[0], xy[1])
return
def _onStaticROIedited(self):
"""GUI/CLI hint: synchronize ROI widget edits back to scan controls."""
xy = self.roiS1.getCenter()
hsize, vsize = np.round(self.roiS1.getSize())
self.scanSelector.hsize.blockSignals(True)
self.scanSelector.vsize.blockSignals(True)
self.scanSelector.hsize.setValue(hsize)
self.scanSelector.vsize.setValue(vsize)
self.scanSelector.hsize.blockSignals(False)
self.scanSelector.vsize.blockSignals(False)
self.scanSelector.set_xy_static_loc(xy[0], xy[1])
def _onNewReflection(self,refldict):
"""GUI-only: prompt the user to add calculated reflection candidates."""
axisname = self.fscan.axisname
dc = self.ubcalc.detectorCal
if self.fscan.axisname == 'mu':
angle_idx = 0
sign = 1.
elif self.fscan.axisname == 'th':
angle_idx = 3
sign = -1.
else:
qt.QMessageBox.warning(self,"Cannot calculate reflection","Cannot calculate reflection.\n%s is no supported scan axis." % self.fscan.axisname)
return
try:
imageno1 = self.axisToImageNo(np.rad2deg(refldict['angles_1'][angle_idx]) * sign)
refldict['imageno_1'] = imageno1
xy = refldict['xy_1']
onDetector = (xy[0] >= 0 and xy[0] < dc.detector.shape[1]) and \
(xy[1] >= 0 and xy[1] < dc.detector.shape[0])
if onDetector:
refldict['selectable_1'] = True
else:
refldict['selectable_1'] = False
except:
imageno1 = None
refldict['selectable_1'] = False
try:
imageno2 = self.axisToImageNo(np.rad2deg(refldict['angles_2'][angle_idx]) * sign)
refldict['imageno_2'] = imageno2
xy = refldict['xy_2']
onDetector = (xy[0] >= 0 and xy[0] < dc.detector.shape[1]) and \
(xy[1] >= 0 and xy[1] < dc.detector.shape[0])
if onDetector:
refldict['selectable_2'] = True
else:
refldict['selectable_2'] = False
except:
imageno2 = None
refldict['selectable_2'] = False
refl_dialog = QReflectionAnglesDialog(refldict,"Select reflections to add into list of reference reflections", self)
if qt.QDialog.Accepted == refl_dialog.exec():
for i, cb in enumerate(refl_dialog.checkboxes,1):
if cb.isChecked():
xy = refldict['xy_%s' % i]
eventdict = {'x' : xy[0], 'y': xy[1]}
self.reflectionSel.addReflection(eventdict,refldict['imageno_%s' % i],refldict['hkl'])
[docs]
def newXyHKLConverter(self):
"""Create a pixel-to-hkl converter bound to the active image state.
:returns:
Callable accepting ``x`` and ``y`` detector pixel coordinates.
:rtype: callable
.. note::
CLI-safe. The returned callable reads current scan and UB state.
"""
def xyToHKL(x,y):
"""CLI-safe: convert detector pixels to hkl and detector angles."""
#print("xytoHKL:")
#print("x,y = %s, %s" % (x,y))
if self.fscan is None:
return np.array([np.nan,np.nan,np.nan, np.nan, np.nan])
mu, om = self.getMuOm(self.imageno)
gamma, delta = self.ubcalc.detectorCal.surfaceAnglesPoint(np.array([y]),np.array([x]), mu)
#print(self.ubcalc.detectorCal)
#print(x,y)
#print(self.ubcalc.detectorCal.tth(np.array([y]),np.array([x])))
pos = [mu,delta[0],gamma[0],om,self.ubcalc.chi,self.ubcalc.phi]
pos = HKLVlieg.crystalAngles(pos,self.ubcalc.n)
hkl = np.concatenate((np.array(self.ubcalc.angles.anglesToHkl(*pos)),np.rad2deg([delta[0],gamma[0]])))
return hkl
return xyToHKL
[docs]
def getMuOm(self, imageno=None):
"""Return mu and omega for an image or the whole active scan.
:param imageno:
Optional image index. If omitted, arrays for the full scan are
returned where available.
:returns:
``mu`` and ``omega`` angles in rad.
:rtype: tuple
.. note::
CLI-safe when a scan is loaded.
"""
if imageno is not None:
if self.fscan.axisname == 'th':
mu = self.ubcalc.mu
axisValue = self.imageNoToAxis(imageno)
if axisValue is not None:
om = -1 * np.deg2rad(self.imageNoToAxis(imageno))
else:
om = 0
elif self.fscan.axisname == 'mu':
axisValue = self.imageNoToAxis(imageno)
if axisValue is not None:
mu = np.deg2rad(self.imageNoToAxis(imageno))
else:
mu = 0
om = -1 * np.deg2rad(self.fscan.th)
if len(np.asarray(om).shape) > 0:
om = om[0]
else:
mu = self.ubcalc.mu
om = -1 * np.deg2rad(self.fscan.th)
if len(np.asarray(om).shape) > 0:
om = om[0]
return mu, om
else:
if self.fscan.axisname == 'th':
mu = self.ubcalc.mu
om = -1 * np.deg2rad(self.fscan.axis)
elif self.fscan.axisname == 'mu':
mu = np.deg2rad(self.fscan.axis)
om = -1 * np.deg2rad(self.fscan.th)
else:
mu = self.ubcalc.mu
om = -1 * np.deg2rad(self.fscan.th)
return mu, om
[docs]
def omegaToImageNo(self,omega):
"""Map an omega angle to the nearest image index.
:param float omega:
Omega angle in rad.
:returns:
Nearest image index.
:rtype: int
.. note::
CLI-safe when a scan is loaded.
"""
if self.fscan is not None:
omrad = np.deg2rad(self.fscan.omega)
ommax = np.amax(omrad)
ommin = np.amin(omrad)
#print(ommin,omega,ommax)
if omega < ommin or omega > ommax:
omdeg = np.rad2deg([ommin,omega,ommax])
raise Exception("omega not in range: %s < %s < %s" % tuple(omdeg))
return np.argmin(np.abs(omrad -omega))
else:
raise Exception("No Scan selected")
[docs]
def imageNoToOmega(self,imageno):
"""Return omega for an image index.
:param int imageno:
Image index in the active scan.
:returns:
Omega angle in rad, ``0.0`` when no scan is loaded or None if the image number is higher than the length of the scan.
.. note::
CLI-safe.
"""
try:
if self.fscan is not None:
return np.deg2rad(self.fscan.omega[imageno])
else:
return 0.
except IndexError as ieE:
print(ieE)
return None
[docs]
def imageNoToAxis(self,imageno):
"""Return the scan-axis value for an image index.
:param int imageno:
Image index in the active scan.
:returns:
Scan-axis value in the scan's stored units, usually deg.
.. note::
CLI-safe.
"""
try:
if self.fscan is not None:
return self.fscan.axis[imageno]
else:
return 0.
except IndexError as ieE:
print(ieE)
return None
[docs]
def axisToImageNo(self,axisval):
"""Map a scan-axis value to the nearest image index.
:param float axisval:
Scan-axis value in the scan's stored units, usually deg.
:returns:
Nearest image index.
:rtype: int
.. note::
CLI-safe when a scan is loaded.
"""
if self.fscan is not None:
#axis = np.deg2rad(self.fscan.axis)
axismax = np.amax(self.fscan.axis)
axismin = np.amin(self.fscan.axis)
#print(ommin,omega,ommax)
if axisval < axismin or axisval > axismax:
axisrange = [axismin,axisval,axismax]
raise Exception("Value of scan axis \"%s\" not in range: %s < %s < %s" % tuple([self.fscan.axisname]+axisrange))
return np.argmin(np.abs(self.fscan.axis - axisval))
else:
raise Exception("No Scan loaded")
def _onCreateScan(self):
"""GUI-only: create a simulation scan from dialog-entered angles."""
try:
mu, om = self.getMuOm(self.imageno)
except:
mu = self.ubcalc.mu
om = 0.
th = om*-1.
muTh = np.rad2deg([mu,th]) #defaults if fixed
diag = QScanCreator(muTh)
if diag.exec() == qt.QDialog.Accepted:
shape = self.ubcalc.detectorCal.detector.shape
try:
axis = diag.scanaxis.currentText()
if axis == 'theta':
axis = 'th'
elif axis == 'mu':
pass
fscan = SimulationScan(shape, diag.omstart.value(),
diag.omend.value(),
diag.no.value(),
axis, diag.fixedAngle.value())
self._onScanChanged(fscan)
except MemoryError:
qutils.warning_detailed_message(self, "Can not create simulation scan","Can not create simualtion scan. Memory is insufficient for the scan size. See details for further information.", traceback.format_exc())
def _onLoadInterlacedScan(self):
"""GUI-only: build an interlaced scan from selected HDF5 scans."""
# GUI function to concatenate multiple scans into one
# a user can select which scans to combine in a GUI dialog
# uses new class interlacedScan
# grab the selected h5 file / node from the tree in the scan selector tab
model = self.scanSelector.hdfTreeView.model()
selection = self.scanSelector.hdfTreeView.selectionModel()
indexes = selection.selectedIndexes()
if indexes == []:
qutils.warning_detailed_message(self, "Can not create interlaced scan","Can not create interlaced scan. Select a node in the tree view first!", traceback.format_exc())
return
rootI = indexes.pop(0)
# address the root node to correctly get the scan names
if rootI.parent().isValid():
h5file = model.data(model.parent(rootI), role=silx.gui.hdf5.Hdf5TreeModel.H5PY_OBJECT_ROLE)
else:
h5file = model.data(rootI, role=silx.gui.hdf5.Hdf5TreeModel.H5PY_OBJECT_ROLE)
isID31 = self.scanSelector.btid.currentText() in ['ch5523','ch5700','ch5918','ch6392','ch7131','ch7149','ch7856','ch8153','id31_default']
kl_full = list(h5file.keys())
kl = np.empty(0,dtype=int)
for i in kl_full:
if isID31:
pattern = r'\.\d'
result = re.findall(pattern, i)[0][1:]
if result == '1':
# select only scan names which are ending on suffix '.1' (fast counters of id31 hdf5 format)
kl = np.append(kl,i)
else:
kl = np.append(kl,i)
# separate scan nr and delete duplicates suffixes
if isID31:
# try to get the scan nr and '/title' from the hdf5 file
nr = np.empty(0,dtype=int)
name = np.empty(0,dtype=str)
for i in kl:
pattern = r'\d+\.'
result = re.findall(pattern, i)
name = np.append(name,h5file[i +'/title'])
nr = np.append(nr,int(result[0][:-1]))
lsort = np.argsort(nr)[::1]
nr = nr[lsort]
name = name[lsort]
else:
nr = np.empty(0,dtype=int)
name = np.empty(0,dtype=str)
for nth,i in enumerate(kl):
name = np.append(name,i)
nr = np.append(nr,nth+1) # create scan nr list with ascending integers, starting with 1
# This will later be used to address the subscans, so check if your scans are handled like this!!!
# open GUI dialog to select which scans to combine
interlacedSelectDialog = qt.QDialog()
llayout = qt.QGridLayout()
llayout.addWidget(qt.QLabel("Available scans:"),0,0)
a = qt.QScrollArea()
b = qt.QFormLayout()
box = qt.QGroupBox()
scanBoxes = []
#labels = []
for i,item in enumerate(nr):
ithScanBox = qt.QCheckBox()
scanBoxes.append(ithScanBox)
#labels.append(qt.QLabel('Scan '+str(item)+':'+str(name[i][2:-1])))
if isID31:
b.addRow(qt.QLabel('Scan '+str(item)+': '+name[i].decode()),ithScanBox)
else:
b.addRow(qt.QLabel('Scan '+str(item)+': '+name[i]),ithScanBox)
box.setLayout(b)
a.setWidget(box)
a.setWidgetResizable(True)
llayout.addWidget(a,1,0,1,-1)
llayout.addWidget(qt.QLabel("sort the scans by axis values?"),2,0)
noScans = qt.QCheckBox()
llayout.addWidget(noScans,2,1)
llayout.addWidget(qt.QLabel("Backend:"),3,0)
IS_btid = qt.QComboBox()
[IS_btid.addItem(bt) for bt in backends.fscans]
IS_btid.setCurrentText(self.scanSelector.btid.currentText())
llayout.addWidget(IS_btid,3,1)
llayout.addWidget(qt.QLabel("scan axis:"),4,0)
axisbox = qt.QComboBox()
[axisbox.addItem(a) for a in ['th','mu']]
axisbox.setCurrentText('th')
llayout.addWidget(axisbox,4,1)
buttons = qt.QDialogButtonBox(qt.QDialogButtonBox.Ok | qt.QDialogButtonBox.Cancel)
buttons.button(qt.QDialogButtonBox.Ok).clicked.connect(interlacedSelectDialog.accept)
buttons.button(qt.QDialogButtonBox.Cancel).clicked.connect(interlacedSelectDialog.reject)
llayout.addWidget(buttons,5,0,-1,-1)
interlacedSelectDialog.setLayout(llayout)
interlacedSelectDialog.setWindowTitle("Segmented scan loader")
if not interlacedSelectDialog.exec() == qt.QDialog.Accepted:
return
# generate scan objects for selected scans
selectedScans = []
for i,j in enumerate(scanBoxes):
if j.isChecked():
selectedScans.append(nr[i])
#selectedScans.append(name[i])
nodes = list(self.scanSelector.hdfTreeView.selectedH5Nodes())
obj = nodes[0]
scansegments = []
for i in selectedScans:
ddict = dict()
ddict['scanno'] = int(i)
ddict['file'] = obj.local_filename
#ddict['node'] = kl[i]
ddict['beamtime'] = IS_btid.currentText()
try:
scansegments.append(backends.openScan(IS_btid.currentText(),ddict))
except Exception as e:
msg = qt.QMessageBox()
msg.setIcon(qt.QMessageBox.Warning)
msg.setWindowTitle("Cannot open scan")
msg.setText("Cannot open scan:\n%s\nDo you want to continue?" % e)
msg.setDetailedText(traceback.format_exc())
msg.setStandardButtons(qt.QMessageBox.Yes | qt.QMessageBox.No)
msg.setDefaultButton(qt.QMessageBox.Yes)
result = msg.exec()
if result != qt.QMessageBox.Yes:
return
if not scansegments: # no scans loaded - abort.
qt.QMessageBox.critical(self, 'No scans loaded', 'No scans were loaded.')
return
# create interlaced scan object
self.scanno = 1
self.fscan = interlacedScanLoader.InterlacedScan(scansegments,noScans.isChecked(),axisbox.currentText())
self.imageno = 0
self.plotImage()
self.scanSelector.setAxis(self.fscan.axis, self.fscan.axisname)
self.activescanname = "%s-segmentedScan %s %s-%s" % (self.fscan.axisname, ','.join(str(itemNr) for itemNr in selectedScans), np.amin(self.fscan.axis),np.amax(self.fscan.axis))
# generate sum and max image
self.images_loaded = False
if self.fscan is not None and self.autoLoadAct.isChecked():
self.loadAll()
self.scanSelector.showMaxAct.setChecked(False)
self.scanSelector.showMaxAct.setChecked(True)
def _onLoadScanFromImages(self):
"""GUI-only: import detector images through file and setup dialogs."""
# generates a scan from a selected folder containing raw detector images
# generate file source selection GUI
# create filter of scan image formats (following code is copied from silx view)
extensions = {}
for description, ext in silx.io.supported_extensions().items():
extensions[description] = " ".join(sorted(list(ext)))
extensions["NeXus layout from EDF files"] = "*.edf"
extensions["NeXus layout from TIFF image files"] = "*.tif *.tiff"
extensions["NeXus layout from CBF files"] = "*.cbf"
extensions["NeXus layout from MarCCD image files"] = "*.mccd"
all_supported_extensions = set()
for name, exts in extensions.items():
exts = exts.split(" ")
all_supported_extensions.update(exts)
all_supported_extensions = sorted(list(all_supported_extensions))
filters = []
filters.append("All supported files (%s)" % " ".join(all_supported_extensions))
for name, extension in extensions.items():
filters.append("%s (%s)" % (name, extension))
filters.append("All files (*)")
fileTypeFilter = ""
for f in filters:
fileTypeFilter += f + ";;"
# call dialog
filename,_ = qt.QFileDialog.getOpenFileName(self,"Open data source",'',fileTypeFilter[:-2])
# Qt dialog returns '' if cancelled
if filename == '':
qt.QMessageBox.warning(self,"Error - Open data source","No data source selected")
return
# search files using ImportImagesScan backend
importedscan = universalScanLoader.ImportImagesScan(filename)
if importedscan.inpath == None:
qt.QMessageBox.critical(self,
"Images could not be imported",
"The selected data source is not suitable\n"\
"It is necessary to select a file containing raw detector image(s)!")
return
if importedscan.shape != self.ubcalc.detectorCal.detector.shape:
qt.QMessageBox.critical(self,
"Detector data mismatch",
"The selected image data shape does not match to the detector data shape:\n"\
"Detector Size %sx%s\n"\
"Data size %sx%s\n"\
"Please first adjust the detector configuration to load this data" %
(*self.ubcalc.detectorCal.detector.shape, *importedscan.shape))
return
[imagePrefix, found_scanfiles] = importedscan.inpath
# generate dialog with list of files and frames
nrofFilesfound = len(found_scanfiles)
messageStr = 'Found ' + str(nrofFilesfound) + ' files in selected directory'
if importedscan.FramesPerFile > 1:
if nrofFilesfound == 0:
messageStr = 'No images found!!!'
fullStr = messageStr
elif 0 < nrofFilesfound < 4:
messageStr += ':\n'
for i in range(nrofFilesfound-1):
messageStr += imagePrefix + found_scanfiles[i] + ': ' + str(importedscan.FramesPerFile) + ' frames' ### mark expected nr when file not actually loaded
if i > 0:
messageStr += ' (expected)'
messageStr += '\n'
messageStr += imagePrefix + found_scanfiles[nrofFilesfound-1] + ': ' + str(importedscan.FramesLastFile) + ' frames\n' + str(importedscan.nopoints) + ' frames in total.'
fullStr = messageStr
else:
messageStr += ':\n'
for i in range(0,3):
messageStr += imagePrefix + found_scanfiles[i] + ': ' + str(importedscan.FramesPerFile) + ' frames'
if i > 0:
messageStr += ' (expected)'
messageStr += '\n'
fullStr = messageStr
for i in range(3,nrofFilesfound-1):
fullStr += imagePrefix + found_scanfiles[i] + ': ' + str(importedscan.FramesPerFile) + ' frames (expected) \n'
messageStr += '...' + '\n' + imagePrefix + found_scanfiles[nrofFilesfound-1] + ': ' + str(importedscan.FramesLastFile) + ' frames\n' + str(importedscan.nopoints) + ' frames in total.'
fullStr += imagePrefix + found_scanfiles[nrofFilesfound-1] + ': ' + str(importedscan.FramesLastFile) + ' frames\n' + str(importedscan.nopoints) + ' frames in total.'
else:
if nrofFilesfound == 0:
messageStr = 'No images found!!!'
fullStr = messageStr
elif 0 < nrofFilesfound < 4:
messageStr += ':\n'
for i in range(nrofFilesfound-1):
messageStr += imagePrefix + found_scanfiles[i] + '\n'
messageStr += imagePrefix + found_scanfiles[nrofFilesfound-1]
fullStr = messageStr
else:
messageStr += ':\n'
for i in range(0,3):
messageStr += imagePrefix + found_scanfiles[i] + '\n'
fullStr = messageStr
for i in range(3,nrofFilesfound):
fullStr += imagePrefix + found_scanfiles[i] + '\n'
messageStr += '...' + '\n' + imagePrefix + found_scanfiles[nrofFilesfound-1]
fullStr += '\n' + str(importedscan.nopoints) + ' frames in total.'
msg0 = qt.QMessageBox(self)
msg0.setWindowTitle("Manual scan import")
msg0.setText(messageStr)
msg0.setDetailedText(fullStr)
msg0.exec()
# angle conversions
try:
mu, om = self.getMuOm(self.imageno)
except:
mu = self.ubcalc.mu
om = 0.
th = om*-1.
muTh = np.rad2deg([mu,th]) #defaults if fixed
# open scan creator GUI to let the user insert missing scan angles
diag = QImportScanCreator(muTh)
# detector pixel nr and frame nr is adapted from opened image file
diag.no.setValue(importedscan.nopoints)
if diag.exec() == qt.QDialog.Accepted:
try:
axis = diag.scanaxis.currentText()
if axis == 'theta':
axis = 'th'
elif axis == 'mu':
pass
# pass inserted angles to scan object
importedscan.set_axis(diag.omstart.value(),diag.omend.value(),axis,diag.fixedAngle.value())
self._onScanChanged(importedscan)
except MemoryError:
qutils.warning_detailed_message(self, "Can not create scan","Can not create scan. Memory is insufficient for the scan size. See details for further information.", traceback.format_exc())
def _onScanChanged(self,sel_list):
"""CLI-capable: load or activate the selected scan object/list."""
self.resetZoom = True
#print(sel_list)
self.activescanname = "scan"
if isinstance(sel_list,list):
self.sel_list = sel_list
if len(sel_list):
self.specfile = sel_list[0]['SourceName']
try:
self.scanno = int(float(sel_list[0]['Key']))-1
self.fscan = Fastscan(self.specfile,self.scanno)
self.imageno = 0
except Exception:
self.scanno = 0
self.fscan = FioFastsweep(self.specfile)
self.imageno = 0
self.reflectionSel.setImage(self.imageno)
if self.imagepath != '':
self.fscan.set_image_folder(self.imagepath)
self.plotImage()
self.scanSelector.setAxis(self.fscan.axis, self.fscan.axisname)
else:
self.scanSelector.setRange(0,0)
self.imageno = 0
self.reflectionSel.setImage(self.imageno)
#print(self.centralPlot._callback)
elif isinstance(sel_list,universalScanLoader.ImportImagesScan):
self.scanno = 1
self.fscan = sel_list
self.imageno = 0
self.plotImage()
self.scanSelector.setAxis(self.fscan.axis, self.fscan.axisname)
self.activescanname = "%s-rawImport %s-%s" % (self.fscan.axisname, np.amin(self.fscan.axis),np.amax(self.fscan.axis))
self.images_loaded = False
if self.fscan is not None and self.autoLoadAct.isChecked():
self.loadAll()
self.scanSelector.showMaxAct.setChecked(False)
self.scanSelector.showMaxAct.setChecked(True)
elif isinstance(sel_list,SimulationScan):
self.scanno = 1
self.fscan = sel_list
self.imageno = 0
self.plotImage()
self.scanSelector.setAxis(self.fscan.axis, self.fscan.axisname)
self.activescanname = "%s-sim %s-%s" % (self.fscan.axisname, np.amin(self.fscan.axis),np.amax(self.fscan.axis))
else:
if 'name' in sel_list:
self.activescanname = sel_list['name']
else:
self.activescanname = "scan"
self.hdffile = sel_list['file']
#self.scanname = sel_list['name'].strip("/")
try:
logger.info('Loading scan...')
if logger_utils.get_logging_context() == 'gui':
msg = qt.QMessageBox(self)
msg.setWindowTitle("Loading Scan")
msg.setText("Loading Scan. This might take a while...")
msg.setStandardButtons(qt.QMessageBox.Cancel)
msg.setModal(True)
msg.show()
if 'beamtime' in sel_list:
self.fscan = backends.openScan(sel_list['beamtime'], sel_list)
else:
self.fscan = backends.openScan(self.scanSelector.btid.currentText(), sel_list)
self.plotImage()
self.scanSelector.setAxis(self.fscan.axis, self.fscan.axisname)
if logger_utils.get_logging_context() == 'gui':
msg.hide()
self.images_loaded = False
if self.fscan is not None and self.autoLoadAct.isChecked():
self.loadAll()
self.scanSelector.showMaxAct.setChecked(False)
self.scanSelector.showMaxAct.setChecked(True)
except Exception as exc:
if logger_utils.get_logging_context() == 'gui':
msg.hide()
exc_msg = str(exc) or exc.__class__.__name__
logger.exception("Cannot open scan",
extra={'title' : 'Cannot open scan',
'description' : 'Cannot open scan:\n%s' % exc_msg,
'show_dialog' : True,
'parent' : self})
# qutils.warning_detailed_message(self, "Cannot open scan", "Cannot open scan" , traceback.format_exc())
#qt.QMessageBox.critical(self,"Cannot open scan", "Cannot open scan:\n%s" % traceback.format_exc())
if hasattr(self.fscan, 'name'):
self.activescanname = self.fscan.name
def _onImagePathChanged(self,path):
"""CLI-capable: update the image folder for the active scan."""
#print("newpath %s" % path)
self.imagepath = path
if self.fscan is not None:
self.fscan.set_image_folder(self.imagepath)
self.plotImage()
self.scanSelector.setAxis(self.fscan.axis, self.fscan.axisname)
#self.scanSelector.slider.setMinimum(0)
#self.scanSelector.slider.setMaximum(self.fscan.nopoints-1)
else:
self.scanSelector.setRange(0,0)
self.imageno = 0
self.reflectionSel.setImage(self.imageno)
#print(self.centralPlot._callback)
def _onChangeImage(self,imageno):
"""GUI/CLI hint: switch the active image and replot it."""
if self.fscan is not None:
self.scanSelector.slider.setValue(imageno)
self.plotImage(self.scanSelector.slider.value())
def _onSliderValueChanged(self,value):
"""GUI/CLI hint: replot the image selected by the scan slider."""
if self.fscan is not None:
self.plotImage(value)
#print(self.centralPlot._callback)
def _onLoadAll(self):
"""GUI/CLI hint: reload all images and refresh max-image display."""
self.images_loaded = False
if self.fscan is not None:
self.loadAll()
self.scanSelector.showMaxAct.setChecked(False)
self.scanSelector.showMaxAct.setChecked(True)
[docs]
def loadAll(self):
"""Load all scan images and compute summed and maximum images.
:returns:
``None``. Results are stored on ``self.allimgsum`` and
``self.allimgmax``.
.. note::
CLI-capable. Progress reporting is routed through ``logger_utils``.
"""
try:
image = self.fscan.get_raw_img(0)
except Exception as e:
logger.exception('No image found in scan. (image 0 is missing)',
extra={'title' : 'No image found in scan.',
'description' : 'No image found in scan. (image 0 is missing)',
'show_dialog' : False,
"dialog_level" : logging.ERROR,
'parent' : self})
return
self.allimgsum = np.zeros_like(image.img,dtype=np.float64)
self.allimgmax = np.zeros_like(image.img,dtype=np.float64)
progress = logger_utils.create_progress_logger(self, len(self.fscan), "Reading images")
img_size = self.allimgsum.nbytes / (1024**2)
chunk_size = min(np.floor((self.maxMemory - 2000) / (img_size * self.numberthreads)), 5)
image_numbers = np.arange(len(self.fscan))
#self.excludedImagesDialog.getData()
lock = threading.Lock()
self.images_loaded = True
with concurrent.futures.ThreadPoolExecutor(max_workers=self.numberthreads) as executor: # speedup only for the file reads
futures = {}
excl = self.excludedImagesDialog.getData()
bg = self.background_image
def readfile_max(imgno):
"""CLI-safe worker: read one image into sum and max buffers."""
if imgno in excl: # skip if excluded
return imgno
image = self.fscan.get_raw_img(imgno).img.astype(np.float64, order='C', copy=True)
if bg is not None and bg.shape == image.shape:
if HAS_ACCEL:
_roi_sum_accel.calcBgSub(image, bg)
else:
np.subtract(image, bg, out=image)
with lock:
self.allimgsum += image
np.maximum(self.allimgmax,image, out=self.allimgmax)
return imgno
for i in range(len(self.fscan)):
futures[executor.submit(readfile_max, i)] = i
for f in concurrent.futures.as_completed(futures):
try:
imgno = f.result()
progress.update(imgno)
except concurrent.futures.CancelledError:
pass
except Exception as e:
logger.warn("Cannot read image:\n%s" % traceback.format_exc())
# print("Cannot read image:\n%s" % traceback.format_exc())
if progress.wasCanceled():
[f.cancel() for f in futures]
self.images_loaded = False
logger.warn("Loading of images cancelled. Max and Sum images are incomplete.")
break
progress.finish()
def _onMaxToggled(self,value):
"""GUI/CLI hint: toggle display of the precomputed maximum image."""
if self.scanSelector.showSumAct.isChecked():
self.scanSelector.showSumAct.setChecked(False)
if value:
if not self.images_loaded and self.fscan is not None:
if logger_utils.get_logging_context() == 'gui':
btn = qt.QMessageBox.question(self,"Incomplete sum / max image", "Sum/Max image was not loaded completely. Displayed maximum image will be incomplete! Do you want to load all images?",qt.QMessageBox.Yes | qt.QMessageBox.No | qt.QMessageBox.Cancel)
if btn == qt.QMessageBox.Yes:
self.loadAll()
elif btn == qt.QMessageBox.Cancel:
self.scanSelector.showMaxAct.setChecked(False)
return
else:
logger.warning("Maximum image requested before all images were loaded; continuing with incomplete data.")
if self.allimgmax is not None:
self.currentAddImageLabel = self.centralPlot.addImage(self.allimgmax,legend="special",
replace=False,resetzoom=False,copy=True,z=1)
self.centralPlot.setActiveImage(self.currentAddImageLabel)
self.scanSelector.alphaslider.setLegend(self.currentAddImageLabel)
else:
self.scanSelector.showMaxAct.setChecked(False)
else:
if self.currentAddImageLabel is not None:
self.centralPlot.setActiveImage(self.currentImageLabel)
self.centralPlot.removeImage(self.currentAddImageLabel)
self.currentAddImageLabel = None
def _onSumToggled(self,value):
"""GUI/CLI hint: toggle display of the precomputed summed image."""
if self.scanSelector.showMaxAct.isChecked():
self.scanSelector.showMaxAct.setChecked(False)
if value:
if not self.images_loaded and self.fscan is not None:
if logger_utils.get_logging_context() == 'gui':
btn = qt.QMessageBox.question(self,"Incomplete sum / max image", "Sum/Max image was not loaded completely. Displayed sum image will be incomplete! Do you want to load all images?",qt.QMessageBox.Yes | qt.QMessageBox.No | qt.QMessageBox.Cancel)
if btn == qt.QMessageBox.Yes:
self.loadAll()
elif btn == qt.QMessageBox.Cancel:
self.scanSelector.showSumAct.setChecked(False)
return
else:
logger.warning("Summed image requested before all images were loaded; continuing with incomplete data.")
if self.allimgsum is not None:
self.currentAddImageLabel = self.centralPlot.addImage(self.allimgsum,legend="special",
replace=False,resetzoom=False,copy=True,z=1)
self.centralPlot.setActiveImage(self.currentAddImageLabel)
self.scanSelector.alphaslider.setLegend(self.currentAddImageLabel)
else:
self.scanSelector.showSumAct.setChecked(False)
else:
if self.currentAddImageLabel is not None:
self.centralPlot.setActiveImage(self.currentImageLabel)
self.centralPlot.removeImage(self.currentAddImageLabel)
self.currentAddImageLabel = None
[docs]
def plotImage(self,key=0):
"""Plot one raw image from the active scan.
:param int key:
Image index in the active scan.
.. note::
CLI-capable, but it mutates Qt plot and reflection widgets.
"""
try:
image = self.fscan.get_raw_img(key).img.astype(np.float64, order='C', copy=True)
bg = self.background_image
if bg is not None and bg.shape == image.shape:
if HAS_ACCEL:
_roi_sum_accel.calcBgSub(image, bg)
else:
np.subtract(image, bg, out=image)
#if self.currentImageLabel is not None:
# self.centralPlot.removeImage(self.currentImageLabel)
self.currentImageLabel = self.centralPlot.addImage(image,legend="scan_image",
replace=False,resetzoom=self.resetZoom,copy=True)
if self.currentAddImageLabel is None:
self.centralPlot.setActiveImage(self.currentImageLabel)
self.resetZoom = False
self.imageno = key
self.reflectionSel.setImage(self.imageno)
self.updateROI(image_changed=True)
self.updateReflections()
mu, om = self.getMuOm(self.imageno)
self.ubcalc.uedit.setAngles(mu, self.ubcalc.chi, self.ubcalc.phi, om)
self.scanSelector.excludeImageAct.blockSignals(True)
self.scanSelector.excludeImageAct.setChecked(key in self.excludedImagesDialog.getData())
self.scanSelector.excludeImageAct.blockSignals(False)
except Exception as e:
logger.exception('Cannot plot image',
extra={'title' : 'Cannot plot image',
'description' : 'Cannot plot image',
'show_dialog' : False,
"dialog_level" : logging.ERROR,
'parent' : self})
[docs]
def updateReflections(self):
"""Update CTR reflection markers for the current image.
.. note::
CLI-capable, but it mutates Qt plot widgets.
"""
if not self.reflectionsVisible:
self.centralPlot.removeCurve('all_image_reflections')
return
H_0, H_1 = self.reflectionsToDisplay
#H_0 = np.array([[1,0,0], [1,1,0]])
#H_1 = np.array([[0,0,1], [0,0,1]])
hkl_del_gam_1, hkl_del_gam_2 = self.getROIloc(self.imageno, H_0, H_1, intersect=True)
mask1 = hkl_del_gam_1[:,-1].nonzero()
mask2 = hkl_del_gam_2[:,-1].nonzero()
masked_hkl_del_gam = np.vstack((hkl_del_gam_1[mask1],hkl_del_gam_2[mask2]))
self.centralPlot.addCurve(masked_hkl_del_gam[:,-3],masked_hkl_del_gam[:,-2],legend='all_image_reflections',
linestyle=' ', symbol='.', color='y',resetzoom=False)
[docs]
def updateROI(self, **kwargs):
"""Update visible ROI graphics for the current scan mode.
.. note::
CLI-capable, but it mutates Qt ROI widgets and reads GUI controls.
"""
if not self.roivisible:
#for roi in self.rois:
self.roiS1.setVisible(False)
self.roiS2.setVisible(False)
if self.rocking_rois:
for roi in self.rocking_rois:
roi.setVisible(False)
roi.setEditable(False)
self.roiManager._roisUpdated()
return
#self.centralPlot.removeMarker('main_croi_loc')
current_mode = self.scanSelector.scanstab.currentIndex()
if (current_mode == 0 or current_mode == 1):
if self.rocking_rois:
for roi in self.rocking_rois:
roi.setVisible(False)
roi.setEditable(False)
try:
hkl_del_gam_1, hkl_del_gam_2 = self.getROIloc(self.imageno)
except:
# print(traceback.format_exc())
#for roi in self.rois:
# roi.setVisible(False)
self.roiS1.setVisible(False)
self.roiS2.setVisible(False)
#self.centralPlot.removeMarker('main_croi_loc')
return
if hkl_del_gam_1[0,-1]:
if self.roivisible:
self.plotROI(hkl_del_gam_1[0,6:8], self.roiS1)
for i, spinbox in enumerate(self.scanSelector.hkl_static):
spinbox.setValue(hkl_del_gam_1[0,i])
else:
self.roiS1.setVisible(False)
if hkl_del_gam_2[0,-1]:
if self.roivisible:
self.plotROI(hkl_del_gam_2[0,6:8], self.roiS2)
for i, spinbox in enumerate(self.scanSelector.hkl_static):
spinbox.setValue(hkl_del_gam_1[0,i])
else:
self.roiS2.setVisible(False)
if current_mode == 1:
self.roiS1.setEditable(True)
else:
self.roiS1.setEditable(False)
elif (current_mode == 2 and not kwargs.get('image_changed', False)):
self.roiS1.setVisible(False)
self.roiS2.setVisible(False)
try:
refldict = self.get_rocking_coordinates()
except:
#print(traceback.format_exc())
if self.rocking_rois:
for roi in self.rocking_rois:
roi.setVisible(False)
roi.setEditable(False)
return
roi_keys = self.intbkgkeys_rocking(refldict)
self.scanSelector.autoSize_label.setText("%s x %s" % (roi_keys['hsize'], roi_keys['vsize']))
number_rois = len(roi_keys['center'])
divider = 1
if number_rois > self.maxROIs:
divider = np.ceil(number_rois / self.maxROIs)
no_rois_to_display = int(np.floor(number_rois / divider))
# lazy create ROIs
if len(self.rocking_rois) < no_rois_to_display:
for i in range(no_rois_to_display - len(self.rocking_rois)):
roi = RectangleBgROI()
roi.setLineWidth(1)
roi.setLineStyle('-')
roi.setColor('red')
roi.setBgStyle('pink', '-', 1.)
roi.setVisible(False)
roi.setGeometry(origin=(0, 0), size=(0, 0))
self.rocking_rois.append(roi)
self.roiManager.addRoi(roi,useManagerColor=False)
for roino, i in enumerate(np.arange(no_rois_to_display)*divider):
ckey = roi_keys['center'][int(i)]
leftkey = roi_keys['left'][int(i)]
rightkey = roi_keys['right'][int(i)]
topkey = roi_keys['top'][int(i)]
bottomkey = roi_keys['bottom'][int(i)]
origin =(ckey[0].start, ckey[1].start)
size = (ckey[0].stop - ckey[0].start, ckey[1].stop - ckey[1].start)
#leftkey, rightkey, topkey, bottomkey
left = leftkey[0].stop - leftkey[0].start
right = rightkey[0].stop - rightkey[0].start
top = topkey[1].stop - topkey[1].start
bottom = bottomkey[1].stop - bottomkey[1].start
self.rocking_rois[roino].setGeometry(origin=origin, size=size, left=left, right=right, top=top, bottom=bottom)
self.rocking_rois[roino].setVisible(True)
self.rocking_rois[roino].setEditable(False)
for roi in self.rocking_rois[no_rois_to_display:]:
roi.setVisible(False)
roi.setEditable(False)
elif (current_mode == 3 and not kwargs.get('image_changed', False)):
self.roiS1.setVisible(False)
self.roiS2.setVisible(False)
try:
refldict = self.get_Bragg_rocking_coordinates()
if len(refldict['xy_1']) <= 0:
raise Exception('No reflections found.')
except:
# print(traceback.format_exc())
if self.rocking_rois:
for roi in self.rocking_rois:
roi.setVisible(False)
roi.setEditable(False)
logger.exception('Cannot calculate Bragg rocking coordinates',
extra={'title' : 'Cannot calculate Bragg rocking coordinates',
'description' : 'Cannot calculate Bragg rocking coordinates',
'show_dialog' : False,
"dialog_level" : logging.ERROR,
'parent' : self})
return
roi_keys = self.intbkgkeys_rocking(refldict, autovsize=False, autohsize=False, intersect=1)
number_rois = len(roi_keys['center'])
divider = 1
if number_rois > self.maxROIs:
divider = np.ceil(number_rois / self.maxROIs)
no_rois_to_display = int(np.floor(number_rois / divider))
# lazy create ROIs
if len(self.rocking_rois) < no_rois_to_display:
for i in range(no_rois_to_display - len(self.rocking_rois)):
roi = RectangleBgROI()
roi.setLineWidth(1)
roi.setLineStyle('-')
roi.setColor('red')
roi.setBgStyle('pink', '-', 1.)
roi.setVisible(False)
roi.setGeometry(origin=(0, 0), size=(0, 0))
self.rocking_rois.append(roi)
self.roiManager.addRoi(roi,useManagerColor=False)
for roino, i in enumerate(np.arange(no_rois_to_display)*divider):
ckey = roi_keys['center'][int(i)]
leftkey = roi_keys['left'][int(i)]
rightkey = roi_keys['right'][int(i)]
topkey = roi_keys['top'][int(i)]
bottomkey = roi_keys['bottom'][int(i)]
origin =(ckey[0].start, ckey[1].start)
size = (ckey[0].stop - ckey[0].start, ckey[1].stop - ckey[1].start)
#leftkey, rightkey, topkey, bottomkey
left = leftkey[0].stop - leftkey[0].start
right = rightkey[0].stop - rightkey[0].start
top = topkey[1].stop - topkey[1].start
bottom = bottomkey[1].stop - bottomkey[1].start
self.rocking_rois[roino].setGeometry(origin=origin, size=size, left=left, right=right, top=top, bottom=bottom)
self.rocking_rois[roino].setVisible(True)
self.rocking_rois[roino].setEditable(True)
for roi in self.rocking_rois[no_rois_to_display:]:
roi.setVisible(False)
roi.setEditable(False)
self.roiManager._roisUpdated()
#self.centralPlot.removeMarker('main_croi_loc')
[docs]
def getStaticROIparams(self, xy, **kwargs):
"""Calculate hkl, detector angles, and pixel metadata for fixed ROIs.
:param xy:
ROI center coordinates in detector pixels.
:returns:
Array containing hkl in r.l.u., detector angles in rad, trajectory
coordinate, pixel coordinates, and detector mask flag.
:rtype: numpy.ndarray
.. note::
CLI-safe when scan and UB state are loaded.
"""
if self.fscan is None:
raise Exception("No scan loaded!")
mu, om = self.getMuOm()
#mu_cryst = HKLVlieg.crystalAngles_singleArray(mu, self.ubcalc.n)
dc = self.ubcalc.detectorCal
angles = self.ubcalc.angles
if 'mask' in kwargs:
mask = kwargs['mask']
xy = xy[mask]
if len(np.asarray(om).shape) == 0:
om = np.full(len(self.fscan),om)
hkl_del_gam = np.empty((xy.shape[0],len(self.fscan), 6), dtype=np.float64)
for i, xy_i in enumerate(xy):
x = np.full(len(self.fscan),xy_i[0])
y = np.full(len(self.fscan),xy_i[1])
gamma, delta, alpha = self.ubcalc.detectorCal.crystalAnglesPoint(y, x, mu, self.ubcalc.n)
if len(np.asarray(alpha).shape) == 0:
alpha = np.full(len(self.fscan),alpha)
hkl = self.ubcalc.angles.anglesToHkl(alpha, delta, gamma, om, self.ubcalc.chi, self.ubcalc.phi)
#for i in range(len(self.fscan)):
hkl_del_gam[i,:,:3] = np.array(hkl).T
hkl_del_gam[i,:, 3] = delta
hkl_del_gam[i,:, 4] = gamma
hkl_del_gam[i,:, 5] = self.fscan.axis
return hkl_del_gam
[docs]
def getROIloc(self, imageno=None, H_0=None, H_1=None, **kwargs):
"""Calculate detector ROI locations for a reciprocal-space line.
When :math:`\\vec{H}_0` and :math:`\\vec{H}_1` are provided, they
define the line :math:`\\vec{H}_0 + s\\vec{H}_1` in reciprocal space.
The returned arrays describe the two possible detector intersections
of that line with the Ewald sphere.
:param int imageno:
Optional image index. If omitted, locations are calculated over the
scan where applicable.
:param numpy.ndarray H_0:
Starting reciprocal-space vector in r.l.u. Shape ``(3,)`` or
``(N, 3)``.
:param numpy.ndarray H_1:
Reciprocal-space direction vector in r.l.u. Shape ``(3,)`` or
``(N, 3)``.
:returns:
Two arrays for the two possible detector intersections.
:rtype: tuple[numpy.ndarray, numpy.ndarray]
.. note::
CLI-capable. Missing coordinates are read from GUI controls.
"""
if self.fscan is None:
raise Exception("No scan loaded!")
mu, om = self.getMuOm(imageno)
mu_cryst = HKLVlieg.crystalAngles_singleArray(mu, self.ubcalc.n)
dc = self.ubcalc.detectorCal
#mu = self.ubcalc.mu
angles = self.ubcalc.angles
if (self.scanSelector.scanstab.currentIndex() == 1 and not kwargs.get('intersect', False)):
if imageno is None:
hkl_del_gam_1 = np.ones((len(self.fscan),6),dtype=np.float64)
x = np.full(len(self.fscan),self.scanSelector.xy_static[0].value())
y = np.full(len(self.fscan),self.scanSelector.xy_static[1].value())
gamma, delta, alpha = self.ubcalc.detectorCal.crystalAnglesPoint(y, x, mu, self.ubcalc.n)
s = np.arange(len(self.fscan))
if len(np.asarray(om).shape) == 0:
om = np.full(len(self.fscan),om)
if len(np.asarray(alpha).shape) == 0:
alpha = np.full(len(self.fscan),alpha)
yx1 = np.vstack((y,x)).T
yx2 = np.full_like(yx1, np.inf)
for i in range(len(self.fscan)):
pos = [alpha[i],delta[i],gamma[i],
om[i],
self.ubcalc.chi,
self.ubcalc.phi]
#pos = np.vstack(pos).T
hkl = np.array(self.ubcalc.angles.anglesToHkl(*pos))
hkl_del_gam_1[i, :3] = hkl
hkl_del_gam_1[:, 3] = delta
hkl_del_gam_1[:, 4] = gamma
hkl_del_gam_1[:, 5] = self.fscan.axis
hkl_del_gam_2 = np.full_like(hkl_del_gam_1, -1)
else:
hkl_del_gam_1 = np.ones(6,dtype=np.float64)
x = self.scanSelector.xy_static[0].value()
y = self.scanSelector.xy_static[1].value()
yx1 = np.zeros((1,2))
yx1[0][0] = y
yx1[0][1] = x
yx2 = np.full_like(yx1, np.inf)
if len(np.asarray(om).shape) > 0:
om = om[imageno]
if len(np.asarray(mu).shape) > 0:
mu = mu[imageno]
gamma, delta, alpha = self.ubcalc.detectorCal.crystalAnglesPoint(np.array([y]),np.array([x]), mu, self.ubcalc.n)
gamma, delta, alpha = gamma[0], delta[0], alpha # crystalAnglesPoint retains shape, even for 0d array
pos = [alpha,delta,gamma,om,self.ubcalc.chi,self.ubcalc.phi]
hkl_del_gam_1[:3] = np.array(self.ubcalc.angles.anglesToHkl(*pos))
hkl_del_gam_1[3] = delta
hkl_del_gam_1[4] = gamma
hkl_del_gam_1[5] = self.fscan.axis[imageno]
hkl_del_gam_2 = np.full_like(hkl_del_gam_1, -1)
else:
if H_0 is None or H_1 is None:
H_1 = np.array([h.value() for h in self.scanSelector.H_1])
H_0 = np.array([h.value() for h in self.scanSelector.H_0])
hkl_del_gam_1, hkl_del_gam_2, Qa_1, Qa_2 = angles.anglesIntersectLineEwald(H_0, H_1, mu_cryst, om, self.ubcalc.phi,self.ubcalc.chi, Qalpha=True)
# H, K, L ,delta_1, gamma_1, HKL_Q1[-1]=s
delta1 = hkl_del_gam_1[...,3]
delta2 = hkl_del_gam_2[...,3]
gam1 = hkl_del_gam_1[...,4]
gam2 = hkl_del_gam_2[...,4]
Qmin, Qmax = dc.Qrange
Qa_1_n = np.linalg.norm(Qa_1, axis=-1)
Qa_2_n = np.linalg.norm(Qa_2, axis=-1)
mask1 = np.logical_and(Qmin <= Qa_1_n , Qmax >= Qa_1_n)
mask2 = np.logical_and(Qmin <= Qa_2_n , Qmax >= Qa_2_n)
yx1 = dc.pixelsCrystalAngles(gam1, delta1, mu, self.ubcalc.n)
yx2 = dc.pixelsCrystalAngles(gam2, delta2, mu, self.ubcalc.n)
yx1[~mask1] = np.inf
yx2[~mask2] = np.inf
ymask1 = np.logical_and(yx1[...,0] >= 0, yx1[...,0] < dc.detector.shape[0])
xmask1 = np.logical_and(yx1[...,1] >= 0, yx1[...,1] < dc.detector.shape[1])
yxmask1 = np.logical_and(xmask1,ymask1)
ymask2 = np.logical_and(yx2[...,0] >= 0, yx2[...,0] < dc.detector.shape[0])
xmask2 = np.logical_and(yx2[...,1] >= 0, yx2[...,1] < dc.detector.shape[1])
yxmask2 = np.logical_and(xmask2,ymask2)
xy1 = yx1[...,::-1]
xy2 = yx2[...,::-1]
if not kwargs.get('intersect', False) and self.scanSelector.scanstab.currentIndex() != 1:
xoffset, yoffset = self.scanSelector.roioptions.get_offsets()
if xoffset != 0. or yoffset != 0.:
logger.warn("Nonzero pixel offset selected. Experimental feature! Angles and hkl are incorrect!!!")
xy1[..., 0] += xoffset
xy2[..., 0] += xoffset
xy1[..., 1] += yoffset
xy2[..., 1] += yoffset
return np.concatenate((np.atleast_2d(hkl_del_gam_1), xy1, yxmask1[...,np.newaxis]),axis=-1),\
np.concatenate((np.atleast_2d(hkl_del_gam_2), xy2, yxmask2[...,np.newaxis]),axis=-1)
[docs]
def plotROI(self, loc, roi):
"""Set a visible ROI object around a detector-pixel location.
:param loc:
ROI center in detector pixels.
:param roi:
ROI widget to update.
.. note::
CLI-capable, but it mutates Qt ROI widgets.
"""
key = self.intkey(loc)
leftkey, rightkey, topkey, bottomkey = self.bkgkeys(loc)
#print([(roi, roi.isEditable()) for roi in self.rois])
#croi:
origin =(key[0].start, key[1].start)
size = (key[0].stop - key[0].start, key[1].stop - key[1].start)
#leftkey, rightkey, topkey, bottomkey
left = leftkey[0].stop - leftkey[0].start
right = rightkey[0].stop - rightkey[0].start
top = topkey[1].stop - topkey[1].start
bottom = bottomkey[1].stop - bottomkey[1].start
roi.setGeometry(origin=origin, size=size, left=left, right=right, top=top, bottom=bottom)
roi.setVisible(True)
#self.roiManager._roisUpdated()
[docs]
def integrateROI(self):
"""Integrate the active ROI workflow and save data to the database.
The active tab in ``self.scanSelector.scanstab`` selects the
integration workflow:
* ``hklscan`` (tab id ``0``): integrate stationary-scan ROIs whose
detector
coordinates are calculated from the reciprocal-space line
:math:`\\vec{H}_0 + s\\vec{H}_1`. :math:`\\vec{H}_0` and
:math:`\\vec{H}_1` are numpy vector values in r.l.u. read from the
ROI controls, and the two Ewald-sphere intersections are integrated
as separate S1/S2 trajectories.
* ``fixed`` (tab id ``1``): integrate a stationary detector-pixel ROI
from the ``xy_static`` controls. The same pixel coordinates are used
through the scan, while the corresponding reciprocal-space
coordinates and diffractometer angles are recorded for each image.
* ``rocking hklscan`` (tab id ``2``): delegate to
:meth:`rocking_extraction`, which integrates multiple rocking-scan
ROIs whose coordinates are sampled along
:math:`\\vec{H}_0 + s\\vec{H}_1`.
* ``rocking Bragg`` (tab id ``3``): delegate to
:meth:`rocking_Bragg_extraction`, which calculates Bragg peak
coordinates from the current crystal, detector, UB, strain, and scan
state before integrating valid rocking-scan ROIs.
All modes use the current UI/database state for scan selection, ROI
sizes, masks, background settings, and correction factors. The
resulting intensities and metadata are written to the active Nexus
database file.
:returns:
Status dictionary describing success, cancellation, or error.
:rtype: dict
.. note::
CLI-capable when scan, database, and ROI state are preconfigured.
"""
if self.scanSelector.scanstab.currentIndex() == 2:
return self.rocking_extraction()
elif self.scanSelector.scanstab.currentIndex() == 3:
return self.rocking_Bragg_extraction()
try:
image = self.fscan.get_raw_img(0)
except Exception as e:
logger.exception('Cannot perform stationary scan integration: no images found.',
extra={'title' : 'Cannot integrate scan',
'description' : 'Cannot perform stationary scan integration: no images found.',
'show_dialog' : False,
"dialog_level" : logging.WARNING,
'parent' : self})
# print("no images found! %s" % e)
return {'status': 'error', 'message' : 'No image found in current scan', 'traceback' : traceback.format_exc()}
if self.database.nxfile is None:
logger.exception('Cannot perform stationary scan integration: no database available.',
extra={'title' : 'Cannot integrate scan',
'description' : 'Cannot perform stationary scan integration: no database available.',
'show_dialog' : False,
"dialog_level" : logging.WARNING,
'parent' : self})
# print("No database available")
return {'status': 'error', 'message' : 'No database available'}
logger.info("Start integration of stationary scan")
dc = self.ubcalc.detectorCal
#mu = self.ubcalc.mu
angles = self.ubcalc.angles
H_1 = np.array([h.value() for h in self.scanSelector.H_1])
H_0 = np.array([h.value() for h in self.scanSelector.H_0])
vsize = int(self.scanSelector.vsize.value())
hsize = int(self.scanSelector.hsize.value())
roi_size = vsize*hsize # as set in GUI, no corrections
imgmask = None
if self.scanSelector.useMaskBox.isChecked():
if self.centralPlot.getMaskToolsDockWidget().getSelectionMask() is None:
if logger_utils.get_logging_context() == 'gui':
btn = qt.QMessageBox.question(self,"No mask available","""No mask was selected with the masking tool.
Do you want to continue without mask?""")
if btn != qt.QMessageBox.Yes:
return {'status': 'cancelled', 'message' : 'Reason: no mask selected'}
logger.warn("No mask was selected with the masking tool. Continue without mask.")
else:
imgmask = self.centralPlot.getMaskToolsDockWidget().getSelectionMask() > 0.
corr = self.scanSelector.useSolidAngleBox.isChecked() or\
self.scanSelector.usePolarizationBox.isChecked()
C_arr = np.ones(dc.detector.shape,dtype=np.float64)
if self.scanSelector.useSolidAngleBox.isChecked():
C_arr /= dc.solidAngleArray()
if self.scanSelector.usePolarizationBox.isChecked():
C_arr /= dc.polarization(factor=dc._polFactor,axis_offset=dc._polAxis)
hkl_del_gam_s1, hkl_del_gam_s2 = self.getROIloc()
nodatapoints = len(self.fscan)
#print(hkl_del_gam_1s.shape)
if hkl_del_gam_s1.shape[0] == 1:
hkl_del_gam_1 = np.zeros((nodatapoints,hkl_del_gam_s1.shape[1]), dtype=np.float64)
hkl_del_gam_2 = np.zeros((nodatapoints,hkl_del_gam_s1.shape[1]), dtype=np.float64)
hkl_del_gam_1[:] = hkl_del_gam_s1[0]
hkl_del_gam_2[:] = hkl_del_gam_s2[0]
else:
hkl_del_gam_1, hkl_del_gam_2 = hkl_del_gam_s1, hkl_del_gam_s2
dataavail = np.logical_or(hkl_del_gam_1[:,-1],hkl_del_gam_2[:,-1])
croi1_a = np.zeros_like(dataavail,dtype=np.float64)
cpixel1_a = np.zeros_like(dataavail,dtype=np.float64)
bgroi1_a = np.zeros_like(dataavail,dtype=np.float64)
bgpixel1_a = np.zeros_like(dataavail,dtype=np.float64)
x_coord1_a = hkl_del_gam_1[:,6]
y_coord1_a = hkl_del_gam_1[:,7]
roi_hsize1_a = np.full_like(dataavail, hsize, dtype=int)
roi_vsize1_a = np.full_like(dataavail, vsize, dtype=int)
croi2_a = np.zeros_like(dataavail,dtype=np.float64)
cpixel2_a = np.zeros_like(dataavail,dtype=np.float64)
bgroi2_a = np.zeros_like(dataavail,dtype=np.float64)
bgpixel2_a = np.zeros_like(dataavail,dtype=np.float64)
bgimg_croi1_a = np.zeros_like(dataavail,dtype=np.float64)
bgimg_cpixel1_a = np.zeros_like(dataavail,dtype=np.float64)
bgimg_bgroi1_a = np.zeros_like(dataavail,dtype=np.float64)
bgimg_bgpixel1_a = np.zeros_like(dataavail,dtype=np.float64)
Corr_croi1_a = np.zeros_like(dataavail,dtype=np.float64)
Corr_cpixel1_a = np.zeros_like(dataavail,dtype=np.float64)
Corr_bgroi1_a = np.zeros_like(dataavail,dtype=np.float64)
Corr_bgpixel1_a = np.zeros_like(dataavail,dtype=np.float64)
bgimg_croi2_a = np.zeros_like(dataavail,dtype=np.float64)
bgimg_cpixel2_a = np.zeros_like(dataavail,dtype=np.float64)
bgimg_bgroi2_a = np.zeros_like(dataavail,dtype=np.float64)
bgimg_bgpixel2_a = np.zeros_like(dataavail,dtype=np.float64)
Corr_croi2_a = np.zeros_like(dataavail,dtype=np.float64)
Corr_cpixel2_a = np.zeros_like(dataavail,dtype=np.float64)
Corr_bgroi2_a = np.zeros_like(dataavail,dtype=np.float64)
Corr_bgpixel2_a = np.zeros_like(dataavail,dtype=np.float64)
x_coord2_a = hkl_del_gam_2[:,6]
y_coord2_a = hkl_del_gam_2[:,7]
roi_hsize2_a = np.full_like(dataavail, hsize, dtype=int)
roi_vsize2_a = np.full_like(dataavail, vsize, dtype=int)
progress = logger_utils.create_progress_logger(self, len(self.fscan), "Integrating stationary scan")
has_bg_img = False
if imgmask is not None:
mask = np.ascontiguousarray(imgmask, dtype=bool)
else:
mask = np.zeros(image.img.shape, dtype=bool)
if corr:
C_arr = np.ascontiguousarray(C_arr, dtype=np.float64)
C_arr[mask] = 0.0
else:
C_arr = np.ones(image.img.shape, dtype=np.float64)
C_arr[mask] = 0.0
j = 0
for i in range(len(self.fscan)):
key = self.intkey(hkl_del_gam_1[i,6:8])
croi_key = np.array([[key[0].start , key[0].stop], [key[1].start , key[1].stop]])
roi_hsize1_a[i] = int(np.abs(np.diff(croi_key[0])[0]))
roi_vsize1_a[i] = int(np.abs(np.diff(croi_key[1])[0]))
key = self.intkey(hkl_del_gam_2[i,6:8])
croi_key = np.array([[key[0].start , key[0].stop], [key[1].start , key[1].stop]])
roi_hsize2_a[i] = int(np.abs(np.diff(croi_key[0])[0]))
roi_vsize2_a[i] = int(np.abs(np.diff(croi_key[1])[0]))
if HAS_ACCEL:
roi_lists_numba = []
for i in range(len(self.fscan)):
roi_lists = [[], [], [], [], []]
if hkl_del_gam_1[i,-1]:
key = self.intkey(hkl_del_gam_1[i,6:8])
croi_key = np.array([[key[0].start , key[0].stop], [key[1].start , key[1].stop]])
roi_lists[0].append(croi_key) # center
bkgkey = self.bkgkeys(hkl_del_gam_1[i,6:8])
for r, l in zip(bkgkey, roi_lists[1:]):
l.append(np.array([[r[0].start , r[0].stop], [r[1].start , r[1].stop]]))
else:
[l.append(np.array([[0 , 0], [0 , 0]])) for l in roi_lists[1:]] # will result in zeros, convert to np.nan later
roi_lists[0].append(np.array([[0 , 0], [0 , 0]]))
if hkl_del_gam_2[i,-1]:
key = self.intkey(hkl_del_gam_2[i,6:8])
croi_key = np.array([[key[0].start , key[0].stop], [key[1].start , key[1].stop]])
roi_lists[0].append(croi_key) # center
bkgkey = self.bkgkeys(hkl_del_gam_2[i,6:8])
for r, l in zip(bkgkey, roi_lists[1:]):
l.append(np.array([[r[0].start , r[0].stop], [r[1].start , r[1].stop]]))
else:
[l.append(np.array([[0 , 0], [0 , 0]])) for l in roi_lists[1:]] # will result in zeros, convert to np.nan later
roi_lists[0].append(np.array([[0 , 0], [0 , 0]]))
roi_lists = [np.ascontiguousarray(np.stack(l), dtype=np.int64) for l in roi_lists]
roi_lists_numba.append(roi_lists)
if self.background_image is not None and self.background_image.shape == image.img.shape:
has_bg_img = True
background_image = self.background_image.astype(np.float64, order='C', copy=True)
background_image[mask] = 0.
def sumImage(i):
"""CLI-safe worker: integrate one stationary image with background."""
all_counters = np.zeros((roi_lists_numba[i][0].shape[0],) + (4,), dtype=np.float64) # need gil for python object creation
Carr_counters = np.zeros((roi_lists_numba[i][0].shape[0],) + (4,), dtype=np.float64) # need gil for python object creation
BgImg_counters = np.zeros((roi_lists_numba[i][0].shape[0],) + (4,), dtype=np.float64) # need gil for python object creation
if not dataavail[i]:
return all_counters, Carr_counters, BgImg_counters
image = self.fscan.get_raw_img(i).img.astype(np.float64, order='C', copy=True) # unlocks gil during file read
_roi_sum_accel.processImage_bg_Carr(image, background_image, mask, C_arr, *roi_lists_numba[i], all_counters, Carr_counters, BgImg_counters) # numba nopython and nogil mode
return all_counters, Carr_counters, BgImg_counters
else:
def sumImage(i):
"""CLI-safe worker: integrate one stationary image."""
all_counters = np.zeros((roi_lists_numba[i][0].shape[0],) + (4,), dtype=np.float64) # need gil for python object creation
Carr_counters = np.zeros((roi_lists_numba[i][0].shape[0],) + (4,), dtype=np.float64) # need gil for python object creation
if not dataavail[i]:
return all_counters, Carr_counters
image = self.fscan.get_raw_img(i).img.astype(np.float64, order='C', copy=True) # unlocks gil during file read
_roi_sum_accel.processImage_Carr(image, mask, C_arr, *roi_lists_numba[i], all_counters, Carr_counters) # numba nopython and nogil mode
return all_counters, Carr_counters
else: # not HAS_ACCEL
if self.background_image is not None and self.background_image.shape == image.img.shape:
has_bg_img = True
background_image = self.background_image.astype(np.float64, order='C', copy=True)
background_image[mask] = 0.
def sumImage(i):
"""CLI-safe worker: integrate one image with background."""
all_counters = np.zeros((2,) + (4,), dtype=np.float64) # need gil for python object creation
Carr_counters = np.zeros((2,) + (4,), dtype=np.float64) # need gil for python object creation
BgImg_counters = np.zeros((2,) + (4,), dtype=np.float64) # need gil for python object creation
if not dataavail[i]:
return all_counters, Carr_counters, BgImg_counters
else:
image = self.fscan.get_raw_img(i).img.astype(np.float64, order='C', copy=True)
if imgmask is not None:
image[imgmask] = np.nan
pixelavail = (~imgmask).astype(np.float64)
else:
pixelavail = np.ones_like(image)
for intersect, hkl_del_gam_current in zip(range(2), [hkl_del_gam_1, hkl_del_gam_2]):
if hkl_del_gam_current[i,-1]:
key = self.intkey(hkl_del_gam_current[i,6:8])
bkgkey = self.bkgkeys(hkl_del_gam_current[i,6:8])
all_counters[intersect,0] = np.nansum(image[key[::-1]])
Carr_counters[intersect,0] = np.nansum(C_arr[key[::-1]])
BgImg_counters[intersect,0] = np.nansum(background_image[key[::-1]])
cpixel1 = np.nansum(pixelavail[key[::-1]])
all_counters[intersect,1] = cpixel1
Carr_counters[intersect,1] = cpixel1
BgImg_counters[intersect,1] = cpixel1
bgpixel1 = 0.0
for bg in bkgkey:
bgimg = image[bg[::-1]]
all_counters[intersect,2] += np.nansum(image[bg[::-1]])
Carr_counters[intersect,2] += np.nansum(C_arr[bg[::-1]])
BgImg_counters[intersect,2] += np.nansum(background_image[bg[::-1]])
bgpixel1 += np.nansum(pixelavail[bg[::-1]])
all_counters[intersect,3] = bgpixel1
Carr_counters[intersect,3] = bgpixel1
BgImg_counters[intersect,3] = bgpixel1
return all_counters, Carr_counters, BgImg_counters
else:
def sumImage(i):
"""CLI-safe worker: integrate one image without acceleration."""
all_counters = np.zeros((2,) + (4,), dtype=np.float64) # need gil for python object creation
Carr_counters = np.zeros((2,) + (4,), dtype=np.float64) # need gil for python object creation
if not dataavail[i]:
return all_counters, Carr_counters
else:
image = self.fscan.get_raw_img(i).img.astype(np.float64, order='C', copy=True)
if imgmask is not None:
image[imgmask] = np.nan
pixelavail = (~imgmask).astype(np.float64)
else:
pixelavail = np.ones_like(image)
for intersect, hkl_del_gam_current in zip(range(2), [hkl_del_gam_1, hkl_del_gam_2]):
if hkl_del_gam_current[i,-1]:
key = self.intkey(hkl_del_gam_current[i,6:8])
bkgkey = self.bkgkeys(hkl_del_gam_current[i,6:8])
all_counters[intersect,0] = np.nansum(image[key[::-1]])
Carr_counters[intersect,0] = np.nansum(C_arr[key[::-1]])
cpixel1 = np.nansum(pixelavail[key[::-1]])
all_counters[intersect,1] = cpixel1
Carr_counters[intersect,1] = cpixel1
bgpixel1 = 0.0
for bg in bkgkey:
bgimg = image[bg[::-1]]
all_counters[intersect,2] += np.nansum(image[bg[::-1]])
Carr_counters[intersect,2] += np.nansum(C_arr[bg[::-1]])
bgpixel1 += np.nansum(pixelavail[bg[::-1]])
all_counters[intersect,3] = bgpixel1
Carr_counters[intersect,3] = bgpixel1
return all_counters, Carr_counters
cancelled = False
with concurrent.futures.ThreadPoolExecutor(max_workers=self.numberthreads) as executor: # speedup only for the file reads
futures = {}
for i in range(len(self.fscan)):
futures[executor.submit(sumImage, i)] = i
for f in concurrent.futures.as_completed(futures):
try:
i = futures[f]
#(croi1, cpixel1, bgroi1, bgpixel1), (croi2, cpixel2, bgroi2, bgpixel2) = f.result()
if has_bg_img:
all_counters, Carr_counters, BgImg_counters = f.result()
bgimg_croi1_a[i] = BgImg_counters[0, 0]
bgimg_cpixel1_a[i] = BgImg_counters[0, 1]
bgimg_bgroi1_a[i] = BgImg_counters[0, 2]
bgimg_bgpixel1_a[i] = BgImg_counters[0, 3]
bgimg_croi2_a[i] = BgImg_counters[1, 0]
bgimg_cpixel2_a[i] = BgImg_counters[1, 1]
bgimg_bgroi2_a[i] = BgImg_counters[1, 2]
bgimg_bgpixel2_a[i] = BgImg_counters[1, 3]
else:
all_counters, Carr_counters = f.result()
bgimg_croi1_a[i] = 0.0
bgimg_cpixel1_a[i] = 0.0
bgimg_bgroi1_a[i] = 0.0
bgimg_bgpixel1_a[i] = 0.0
bgimg_croi2_a[i] = 0.0
bgimg_cpixel2_a[i] = 0.0
bgimg_bgroi2_a[i] = 0.0
bgimg_bgpixel2_a[i] = 0.0
croi1_a[i] = all_counters[0, 0]
cpixel1_a[i] = all_counters[0, 1]
bgroi1_a[i] = all_counters[0, 2]
bgpixel1_a[i] = all_counters[0, 3]
croi2_a[i] = all_counters[1, 0]
cpixel2_a[i] = all_counters[1, 1]
bgroi2_a[i] = all_counters[1, 2]
bgpixel2_a[i] = all_counters[1, 3]
Corr_croi1_a[i] = Carr_counters[0, 0]
Corr_cpixel1_a[i] = Carr_counters[0, 1]
Corr_bgroi1_a[i] = Carr_counters[0, 2]
Corr_bgpixel1_a[i] = Carr_counters[0, 3]
Corr_croi2_a[i] = Carr_counters[1, 0]
Corr_cpixel2_a[i] = Carr_counters[1, 1]
Corr_bgroi2_a[i] = Carr_counters[1, 2]
Corr_bgpixel2_a[i] = Carr_counters[1, 3]
progress.update(futures[f])
except concurrent.futures.CancelledError:
pass
except Exception as e:
logger.warn("Cannot read image:\n%s" % traceback.format_exc())
if progress.wasCanceled():
[f.cancel() for f in futures]
cancelled = True
break
progress.finish()
if cancelled:
return {'status': 'cancelled', 'message' : 'Reason: Cancelled during integration'}
roi_size1 = roi_hsize1_a * roi_vsize1_a
roi_size2 = roi_hsize2_a * roi_vsize2_a
Corr1 = Corr_croi1_a * ( roi_size1 / Corr_cpixel1_a) # normalize to number of pixels of center roi (croi)
Corr2 = Corr_croi2_a * ( roi_size2 / Corr_cpixel2_a)
croibg1_bgimg_a = None
croibg1_bgimg_err_a = None
if np.any(bgimg_cpixel1_a): # assume the background image has no errors (would need a separate error image for that)
bgimg_croi1_norm = bgimg_croi1_a * ( cpixel1_a / bgimg_cpixel1_a)
if np.any(bgpixel1_a):
bgimg_bgroi1_norm = bgimg_bgroi1_a * ( bgpixel1_a / bgimg_bgpixel1_a)
# method 1: simply subtract bg image from data and then subtract the remaining background
croibg1_a = ( (croi1_a - bgimg_croi1_norm) - (cpixel1_a/bgpixel1_a) * (bgroi1_a - bgimg_bgroi1_norm) ) * ( roi_size1 / cpixel1_a)
croibg1_err_a = np.sqrt(croi1_a + ((cpixel1_a/bgpixel1_a)**2) * bgroi1_a) * ( roi_size1 / cpixel1_a)
# method 2: scale bg image croi and subtract scaled bg image croi. Use ratio of bgroi of image and bg image as scale factor.
factor = bgroi1_a / bgimg_bgroi1_norm
croibg1_bgimg_a = ( croi1_a - factor * bgimg_croi1_norm ) * ( roi_size1 / cpixel1_a)
croibg1_bgimg_err_a = np.sqrt(croi1_a + ((cpixel1_a/bgpixel1_a)**2) * bgroi1_a) * ( roi_size1 / cpixel1_a)
else: # not possible if no bgroi is set.
croibg1_a = (croi1_a - bgimg_croi1_norm) * ( roi_size1 / cpixel1_a)
croibg1_err_a = np.sqrt(croi1_a) * ( roi_size1 / cpixel1_a)
else: # no background image
if np.any(bgpixel1_a):
croibg1_a = ( croi1_a - (cpixel1_a/bgpixel1_a) * bgroi1_a ) * ( roi_size1 / cpixel1_a)
croibg1_err_a = np.sqrt(croi1_a + ((cpixel1_a/bgpixel1_a)**2) * bgroi1_a) * ( roi_size1 / cpixel1_a)
else:
croibg1_a = croi1_a * ( roi_size1 / cpixel1_a)
croibg1_err_a = np.sqrt(croi1_a) * ( roi_size1 / cpixel1_a)
croibg2_bgimg_a = None
croibg2_bgimg_err_a = None
if np.any(bgimg_cpixel2_a): # assume the background image has no errors (would need a separate error image for that)
bgimg_croi2_norm = bgimg_croi2_a * ( cpixel2_a / bgimg_cpixel2_a)
if np.any(bgpixel2_a):
bgimg_bgroi2_norm = bgimg_bgroi2_a * ( bgpixel2_a / bgimg_bgpixel2_a)
# method 1: simply subtract bg image from data and then subtract the remaining background
croibg2_a = ( (croi2_a - bgimg_croi2_norm) - (cpixel2_a/bgpixel2_a) * (bgroi2_a - bgimg_bgroi2_norm) ) * ( roi_size2 / cpixel2_a)
croibg2_err_a = np.sqrt(croi2_a + ((cpixel2_a/bgpixel2_a)**2) * bgroi2_a) * ( roi_size2 / cpixel2_a)
# method 2: scale bg image croi and subtract scaled bg image croi. Use ratio of bgroi of image and bg image as scale factor.
factor = bgroi2_a / bgimg_bgroi2_norm
croibg2_bgimg_a = ( croi2_a - factor * bgimg_croi2_norm ) * ( roi_size2 / cpixel2_a)
croibg2_bgimg_err_a = np.sqrt(croi2_a + ((cpixel2_a/bgpixel2_a)**2) * bgroi2_a) * ( roi_size2 / cpixel2_a)
else: # not possible if no bgroi is set.
croibg2_a = (croi2_a - bgimg_croi2_norm) * ( roi_size2 / cpixel2_a)
croibg2_err_a = np.sqrt(croi2_a) * ( roi_size2 / cpixel2_a)
else: # no background image
if np.any(bgpixel2_a):
croibg2_a = ( croi2_a - (cpixel2_a/bgpixel2_a) * bgroi2_a ) * ( roi_size2 / cpixel2_a)
croibg2_err_a = np.sqrt(croi2_a + ((cpixel2_a/bgpixel2_a)**2) * bgroi2_a) * ( roi_size2 / cpixel2_a)
else:
croibg2_a = croi2_a * ( roi_size2 / cpixel2_a)
croibg2_err_a = np.sqrt(croi2_a) * ( roi_size2 / cpixel2_a)
if corr:
croibg1_a *= Corr1
croibg1_err_a *= Corr1
croibg2_a *= Corr2
croibg2_err_a *= Corr2
if croibg1_bgimg_a is not None:
croibg1_bgimg_a *= Corr1
croibg1_bgimg_err_a *= Corr1
if croibg2_bgimg_a is not None:
croibg2_bgimg_a *= Corr2
croibg2_bgimg_err_a *= Corr2
rod_mask1 = np.isfinite(croibg1_a)
rod_mask2 = np.isfinite(croibg2_a)
s1_masked = hkl_del_gam_1[:,5][rod_mask1]
s2_masked = hkl_del_gam_2[:,5][rod_mask2]
croibg1_a_masked = croibg1_a[rod_mask1]
croibg2_a_masked = croibg2_a[rod_mask2]
croibg1_err_a_masked = croibg1_err_a[rod_mask1]
croibg2_err_a_masked = croibg2_err_a[rod_mask2]
#name = str(H_1) + "*s+" + str(H_0)
if self.scanSelector.scanstab.currentIndex() == 1:
x = self.scanSelector.xy_static[0].value()
y = self.scanSelector.xy_static[1].value()
name1 = "pixloc[%.2f %.2f]" % (x,y)
name2 = "pixloc[%.2f %.2f]_2" % (x,y) # does not exist, Just for compatibility
traj1 = {
"@NX_class": "NXcollection",
"@direction" : "Fixed pixel coordinates",
"s" : hkl_del_gam_1[:,5]
}
traj2 = {
"@NX_class": "NXcollection",
"@direction" : "Fixed pixel coordinates",
"s" : hkl_del_gam_2[:,5]
}
else:
name1 = str(H_1) + "*s1+" + str(H_0)
name2 = str(H_1) + "*s2+" + str(H_0)
traj1 = {
"@NX_class": "NXcollection",
"@direction" : "Intergrated along H_1*s + H_0 in reciprocal space",
"H_1" : H_1,
"H_0" : H_0,
"s" : hkl_del_gam_1[:,5]
}
traj2 = {
"@NX_class": "NXcollection",
"@direction" : "Intergrated along H_1*s + H_0 in reciprocal space",
"H_1" : H_1,
"H_0" : H_0,
"s" : hkl_del_gam_2[:,5]
}
defaultS1 = croibg1_a_masked.size > croibg2_a_masked.size
if hasattr(self.fscan, "title"):
title = str(self.fscan.title)
else:
title = "%s-scan" % self.fscan.axisname
mu, om = self.getMuOm()
if len(np.asarray(om).shape) == 0:
om = np.full_like(mu,om)
if len(np.asarray(mu).shape) == 0:
mu = np.full_like(om,mu)
suffix = ''
i = 0
while(self.activescanname + "/measurement/" + name1 + suffix in self.database.nxfile):
suffix = "_%s" % i
i += 1
availname1 = name1 + suffix
suffix = ''
i = 0
while(self.activescanname + "/measurement/" + name2 + suffix in self.database.nxfile):
suffix = "_%s" % i
i += 1
availname2 = name2 + suffix
auxcounters = {"@NX_class": "NXcollection"}
for auxname in self.fscan.auxillary_counters:
if hasattr(self.fscan, auxname):
cntr = getattr(self.fscan, auxname)
if cntr is not None:
auxcounters[auxname] = cntr
datas1 = {
"@NX_class": "NXdata",
"sixc_angles": {
"@NX_class": "NXpositioner",
"alpha" : np.rad2deg(mu),
"omega" : np.rad2deg(om),
"theta" : np.rad2deg(-1*om),
"delta" : np.rad2deg(hkl_del_gam_1[:,3]),
"gamma" : np.rad2deg(hkl_del_gam_1[:,4]),
"chi" : np.rad2deg(self.ubcalc.chi),
"phi" : np.rad2deg(self.ubcalc.phi),
"@unit" : "deg"
},
"hkl": {
"@NX_class": "NXcollection",
"h" : hkl_del_gam_1[:,0],
"k" : hkl_del_gam_1[:,1],
"l" : hkl_del_gam_1[:,2]
},
"counters":{
"@NX_class": "NXdetector",
"croibg" : croibg1_a,
"croibg_errors" : croibg1_err_a,
'croibg_bgimg': croibg1_bgimg_a, # when None, will not create data set
'croibg_bgimg_errors': croibg1_bgimg_err_a, # when None, will not create data set
"croi" : croi1_a,
"bgroi" : bgroi1_a,
"croi_pix" : cpixel1_a,
"bgroi_pix" : bgpixel1_a,
"Cfactors_croi" : Corr_croi1_a,
"Cfactors_bgroi" : Corr_bgroi1_a,
"bgimg_croi" : bgimg_croi1_a,
"bgimg_bgroi" : bgimg_bgroi1_a
},
"pixelcoord": {
"@NX_class": "NXdetector",
"x" : x_coord1_a,
"y" : y_coord1_a,
'vsize' : vsize,
'hsize' : hsize,
'vsize_corr' : roi_vsize1_a,
'hsize_corr' : roi_hsize1_a
},
"trajectory" : traj1,
"@signal" : "counters/croibg",
"@axes": "trajectory/s",
"@title": self.activescanname + "_" + availname1,
"@orgui_meta": "roi"
}
datas2 = {
"@NX_class": "NXdata",
"sixc_angles": {
"@NX_class": "NXpositioner",
"alpha" : np.rad2deg(mu),
"omega" : np.rad2deg(om),
"theta" : np.rad2deg(-1*om),
"delta" : np.rad2deg(hkl_del_gam_2[:,3]),
"gamma" : np.rad2deg(hkl_del_gam_2[:,4]),
"chi" : np.rad2deg(self.ubcalc.chi),
"phi" : np.rad2deg(self.ubcalc.phi),
"@unit" : "deg"
},
"hkl": {
"@NX_class": "NXcollection",
"h" : hkl_del_gam_2[:,0],
"k" : hkl_del_gam_2[:,1],
"l" : hkl_del_gam_2[:,2]
},
"counters":{
"@NX_class": "NXdetector",
"croibg" : croibg2_a,
"croibg_errors" : croibg2_err_a,
'croibg_bgimg': croibg2_bgimg_a,
'croibg_bgimg_errors': croibg2_bgimg_err_a,
"croi" : croi2_a,
"bgroi" : bgroi2_a,
"croi_pix" : cpixel2_a,
"bgroi_pix" : bgpixel2_a,
"Cfactors_croi" : Corr_croi2_a,
"Cfactors_bgroi" : Corr_bgroi2_a,
"bgimg_croi" : bgimg_croi2_a,
"bgimg_bgroi" : bgimg_bgroi2_a
},
"pixelcoord": {
"@NX_class": "NXdetector",
"x" : x_coord2_a,
"y" : y_coord2_a,
'vsize' : vsize,
'hsize' : hsize,
'vsize_corr' : roi_vsize2_a,
'hsize_corr' : roi_hsize2_a
},
"trajectory" : traj2,
"@signal" : "counters/croibg",
"@axes": "trajectory/s",
"@title": self.activescanname + "_" + availname2,
"@orgui_meta": "roi"
}
data = {self.activescanname:{
"instrument": {
"@NX_class": "NXinstrument",
"positioners": {
"@NX_class": "NXcollection",
self.fscan.axisname: self.fscan.axis
}
},
"auxillary" : auxcounters,
"measurement": {
"@NX_class": "NXentry",
"@default": availname1 if defaultS1 else availname2,
},
"title":"%s" % title,
"@NX_class": "NXentry",
"@default": "measurement/%s" % (availname1 if defaultS1 else availname2),
"@orgui_meta": "scan"
}
}
names_to_log = ""
if np.any(cpixel1_a > 0.):
self.integrdataPlot.addCurve(s1_masked,croibg1_a_masked,legend=self.activescanname + "_" + availname1,
xlabel="trajectory/s", ylabel="counters/croibg", yerror=croibg1_err_a_masked)
data[self.activescanname]["measurement"][availname1] = datas1
names_to_log += availname1
if np.any(cpixel2_a > 0.):
self.integrdataPlot.addCurve(s2_masked,croibg2_a_masked,legend=self.activescanname + "_" + availname2,
xlabel="trajectory/s", ylabel="counters/croibg", yerror=croibg2_err_a_masked)
data[self.activescanname]["measurement"][availname2] = datas2
names_to_log += availname2
self.database.add_nxdict(data)
logger.info("stationary scan integrated and saved with name(s) %s" % names_to_log)
return {'status': 'success'}
def _graphCallback(self,eventdict):
"""GUI-only: handle plot mouse and marker events."""
#print(eventdict)
if eventdict['event'] == 'mouseDoubleClicked':
#newReflection = np.array([1,1,1,self.imageno,eventdict['x'],eventdict['y']])
if self.scanSelector.select_roi_action.isChecked():
self.scanSelector.set_xy_static_loc(eventdict['x'], eventdict['y'])
self.scanSelector.select_roi_action.setChecked(False)
else:
hkl = self.centralPlot.xyHKLConverter(eventdict['x'],eventdict['y'])[:3]
self.reflectionSel.addReflection(eventdict,self.imageno,hkl)
if eventdict['event'] == 'markerMoved':
if eventdict['label'].startswith('__'):
return
self.reflectionSel.moveReflection(eventdict['label'],[eventdict['x'],eventdict['y']])
self.reflectionSel.setReflectionActive(eventdict['label'])
if eventdict['event'] == 'markerClicked':
if eventdict['label'].startswith('__'):
return
self.reflectionSel.setReflectionActive(eventdict['label'])
[docs]
def intkey(self, coords):
"""Create a center ROI key from detector pixel coordinates.
The returned key represents the clipped horizontal and vertical bounds
of the center ROI around ``coords``.
ROI sizes are read from the current ``hsize`` and ``vsize`` controls.
In non-fixed scan modes, advanced ROI options may replace those nominal
sizes with detector-inclination or projected-sample-size corrected
bounds before clipping to the detector.
:param numpy.ndarray coords:
ROI center coordinates ``(x, y)`` in detector pixels.
:returns:
ROI key ``(x_bounds, y_bounds)`` clipped to the detector extent.
:rtype: tuple[slice, slice]
.. note::
CLI-capable. ROI dimensions are read from GUI controls.
"""
vsize = int(self.scanSelector.vsize.value())
hsize = int(self.scanSelector.hsize.value())
detvsize, dethsize = self.ubcalc.detectorCal.detector.shape
coord_restr = np.clip( np.asarray(coords), [0,0], [dethsize, detvsize])
roioptions = self.scanSelector.roioptions.get_parameters()
current_mode = self.scanSelector.scanstab.currentIndex()
if (roioptions['DetectorInclination'] or roioptions['ProjectSampleSize']) and current_mode != 1:
if roioptions['ProjectSampleSize']:
size_exact = ROIutils.calc_corrections(coord_restr,
self.ubcalc.detectorCal,
np.array([hsize, vsize]),
roioptions,
roioptions['DetectorInclination'],
roioptions['factor'])
else:
size_exact = ROIutils.calc_corrections(coord_restr,
self.ubcalc.detectorCal,
np.array([hsize, vsize]),
None,
roioptions['DetectorInclination'],
roioptions['factor'])
hsize = size_exact[0][0]
vsize = size_exact[0][1]
vhalfsize = vsize // 2
hhalfsize = hsize // 2
fromcoords = np.round(np.asarray(coord_restr) - np.array([hhalfsize, vhalfsize]))
tocoords = np.round(np.asarray(coord_restr) + np.array([hhalfsize, vhalfsize]))
if hsize % 2:
if coord_restr[0] % 1 < 0.5:
tocoords[0] += 1
else:
fromcoords[0] -= 1
if vsize % 2:
if coord_restr[1] % 1 < 0.5:
tocoords[1] += 1
else:
fromcoords[1] -= 1
fromcoords = np.clip( np.asarray(fromcoords), [0,0], [dethsize, detvsize])
tocoords = np.clip( np.asarray(tocoords), [0,0], [dethsize, detvsize])
loc = tuple(slice(int(fromcoord), int(tocoord)) for fromcoord, tocoord in zip(fromcoords,tocoords))
#from IPython import embed; embed()
return loc
[docs]
def bkgkeys(self, coords):
"""Create background ROI keys around a center ROI.
Background keys are derived from the center ROI key returned by
:meth:`intkey`. The left and right background ROIs extend horizontally
beside the center ROI and keep the same vertical bounds. The top and
bottom background ROIs extend vertically above and below the center ROI
and keep the same horizontal bounds.
Background widths and heights are read from the current ``left``,
``right``, ``top``, and ``bottom`` controls. All bounds are clipped to
the detector extent.
:param numpy.ndarray coords:
Center ROI coordinates ``(x, y)`` in detector pixels.
:returns:
Left, right, top, and bottom background ROI keys.
:rtype: tuple[tuple[slice, slice], tuple[slice, slice], tuple[slice, slice], tuple[slice, slice]]
.. note::
CLI-capable. Background sizes are read from GUI controls.
"""
left = int(self.scanSelector.left.value())
right = int(self.scanSelector.right.value())
top = int(self.scanSelector.top.value())
bottom = int(self.scanSelector.bottom.value())
detvsize, dethsize = self.ubcalc.detectorCal.detector.shape
croi = self.intkey(coords)
hcroikey = croi[0]
vcroikey = croi[1]
leftkey = (slice(int(np.clip(croi[0].start - left, 0, dethsize)), croi[0].start), croi[1])
rightkey = (slice(croi[0].stop,int(np.clip(croi[0].stop + right, 0, dethsize))), croi[1])
topkey = (croi[0], slice(int(np.clip(croi[1].start - top, 0, detvsize)), croi[1].start))
bottomkey = (croi[0], slice(croi[1].stop, int(np.clip(croi[1].stop + bottom,0,detvsize)) ))
return leftkey, rightkey, topkey, bottomkey
def _onCenterGraph(self, xy):
"""GUI/CLI hint: recenter the plot axes on detector pixel coordinates."""
#img = self.centralPlot.getImage()
#shape = img.shape if img is not None else (100,100)
x1, x2 = self.centralPlot.getXAxis().getLimits()
x_center = (x1+x2)/2
x1_new = x1 - x_center + xy[0]
x2_new = x2 - x_center + xy[0]
self.centralPlot.getXAxis().setLimits(x1_new, x2_new)
y1, y2 = self.centralPlot.getYAxis().getLimits()
y_center = (y1+y2)/2
y1_new = y1 - y_center + xy[1]
y2_new = y2 - y_center + xy[1]
self.centralPlot.getYAxis().setLimits(y1_new, y2_new)
[docs]
def closeEvent(self,event):
"""GUI/CLI hint: close the database before the main window closes."""
self.database.close()
super().closeEvent(event)
[docs]
class Plot2DHKL(silx.gui.plot.PlotWindow):
sigKeyPressDelete = qt.pyqtSignal()
def __init__(self,xyHKLConverter,parent=None,backend=None):
"""GUI-only: initialize the image plot with hkl position readout."""
self.xyHKLConverter = xyHKLConverter
posInfo = [
('X', lambda x, y: x),
('Y', lambda x, y: y),
('H', lambda x, y: self.xyHKLConverter(x,y)[0]),
('K', lambda x, y: self.xyHKLConverter(x,y)[1]),
('L', lambda x, y: self.xyHKLConverter(x,y)[2]),
('del', lambda x, y: self.xyHKLConverter(x,y)[3]),
('gam', lambda x, y: self.xyHKLConverter(x,y)[4]),
('Data', WeakMethodProxy(self._getImageValue))]
super().__init__(parent=parent, backend=backend,
resetzoom=True, autoScale=False,
logScale=False, grid=False,
curveStyle=False, colormap=True,
aspectRatio=True, yInverted=True,
copy=True, save=True, print_=True,
control=True, position=posInfo,
roi=False, mask=True)
if parent is None:
self.setWindowTitle('Plot2D')
self.getXAxis().setLabel('Columns')
self.getYAxis().setLabel('Rows')
#if silx.config.DEFAULT_PLOT_IMAGE_Y_AXIS_ORIENTATION == 'downward':
self.getYAxis().setInverted(True)
self.profile = ProfileToolBar(plot=self)
self.addToolBar(self.profile)
self.colorbarAction.setVisible(True)
self.getColorBarWidget().setVisible(True)
# Put colorbar action after colormap action
actions = self.toolBar().actions()
for action in actions:
if action is self.getColormapAction():
break
self.sigActiveImageChanged.connect(self.__activeImageChanged)
[docs]
def keyPressEvent(self, event):
"""GUI-only: emit delete-key signal for plot interactions."""
key = event.key()
if key == qt.Qt.Key_Delete and not event.isAutoRepeat():
self.sigKeyPressDelete.emit()
super().keyPressEvent(event)
[docs]
def setXyHKLconverter(self,xyHKLConverter):
"""Set the pixel-to-hkl converter used by the plot readout.
.. note::
CLI-capable, but normally used by GUI plot setup.
"""
self.xyHKLConverter = xyHKLConverter
def __activeImageChanged(self, previous, legend):
"""Handle change of active image
:param Union[str,None] previous: Legend of previous active image
:param Union[str,None] legend: Legend of current active image
"""
if previous is not None:
item = self.getImage(previous)
if item is not None:
item.sigItemChanged.disconnect(self.__imageChanged)
if legend is not None:
item = self.getImage(legend)
item.sigItemChanged.connect(self.__imageChanged)
positionInfo = self.getPositionInfoWidget()
if positionInfo is not None:
positionInfo.updateInfo()
def __imageChanged(self, event):
"""Handle update of active image item
:param event: Type of changed event
"""
if event == items.ItemChangedType.DATA:
positionInfo = self.getPositionInfoWidget()
if positionInfo is not None:
positionInfo.updateInfo()
def _getImageValue(self, x, y):
"""Get status bar value of top most image at position (x, y)
:param float x: X position in plot coordinates
:param float y: Y position in plot coordinates
:return: The value at that point or '-'
"""
value = '-'
valueZ = -float('inf')
mask = 0
maskZ = -float('inf')
for image in self.getAllImages():
data = image.getData(copy=False)
isMask = isinstance(image, items.MaskImageData)
if isMask:
zIndex = maskZ
else:
zIndex = valueZ
if image.getZValue() >= zIndex:
# This image is over the previous one
ox, oy = image.getOrigin()
sx, sy = image.getScale()
row, col = (y - oy) / sy, (x - ox) / sx
if row >= 0 and col >= 0:
# Test positive before cast otherwise issue with int(-0.5) = 0
row, col = int(row), int(col)
if (row < data.shape[0] and col < data.shape[1]):
v, z = data[row, col], image.getZValue()
if not isMask:
value = v
valueZ = z
else:
mask = v
maskZ = z
if maskZ > valueZ and mask > 0:
return value, "Masked"
return value
def _getImageDims(self, *args):
"""GUI/CLI hint: return active-image dimensions for plot status text."""
activeImage = self.getActiveImage()
if (activeImage is not None and
activeImage.getData(copy=False) is not None):
dims = activeImage.getData(copy=False).shape[1::-1]
return 'x'.join(str(dim) for dim in dims)
else:
return '-'
#@deprecated(replacement="getProfilePlot", since_version="0.5.0")
[docs]
def getProfileWindow(self):
"""Return the profile plot window.
.. note::
CLI-capable, but normally used by GUI profile tools.
"""
return self.getProfilePlot()
[docs]
def getProfilePlot(self):
"""Return plot window used to display profile curve.
:return: :class:`Plot1D`
"""
return self.profile.getProfilePlot()
[docs]
class QImportScanCreator(qt.QDialog):
def __init__(self,defaultMuTh, parent=None):
"""GUI-only: initialize the raw-image import scan setup dialog."""
qt.QDialog.__init__(self, parent)
self.defaultMuTh = defaultMuTh
layout = qt.QGridLayout()
layout.addWidget(qt.QLabel("scan axis:"),0,0)
layout.addWidget(qt.QLabel("axis start:"),1,0)
layout.addWidget(qt.QLabel("axis end:"),2,0)
self.fixed_label = qt.QLabel("mu (fixed):")
layout.addWidget(self.fixed_label,3,0)
layout.addWidget(qt.QLabel("no frames:"),5,0)
self.scanaxis = qt.QComboBox()
self.scanaxis.addItem("theta")
self.scanaxis.addItem("mu")
self.scanaxis.setCurrentIndex(0)
self.scanaxis.currentIndexChanged.connect(self.onScanAxisChanged)
self.omstart = qt.QDoubleSpinBox()
self.omstart.setRange(-180,180)
self.omstart.setDecimals(4)
self.omstart.setSuffix(" °")
self.omstart.setValue(-90)
self.omend = qt.QDoubleSpinBox()
self.omend.setRange(-180,180)
self.omend.setDecimals(4)
self.omend.setSuffix(" °")
self.omend.setValue(90)
self.no = qt.QSpinBox()
self.no.setReadOnly(True)
self.no.setRange(1,1000000000)
self.no.setValue(180)
self.fixedAngle = qt.QDoubleSpinBox()
self.fixedAngle.setRange(-180,180)
self.fixedAngle.setValue(self.defaultMuTh[0])
layout.addWidget(self.scanaxis,0,1)
layout.addWidget(self.omstart,1,1)
layout.addWidget(self.omend,2,1)
layout.addWidget(self.no,5,1)
layout.addWidget(self.fixedAngle,3,1)
buttons = qt.QDialogButtonBox(qt.QDialogButtonBox.Ok | qt.QDialogButtonBox.Cancel)
layout.addWidget(buttons,6,0,-1,-1)
test = qt.QLabel("Parameters determined from loaded scan:")
layout.addWidget(test,4,0,1,2)
buttons.button(qt.QDialogButtonBox.Ok).clicked.connect(self.accept)
buttons.button(qt.QDialogButtonBox.Cancel).clicked.connect(self.reject)
self.setLayout(layout)
[docs]
def onScanAxisChanged(self, index):
"""GUI-only: update fixed-angle fields for the selected scan axis."""
if index == 0:
self.omstart.setValue(-90.)
self.omend.setValue(90.)
self.fixed_label.setText("mu (fixed):")
self.fixedAngle.setValue(self.defaultMuTh[0])
elif index == 1:
self.omstart.setValue(0.)
self.omend.setValue(15.)
self.fixed_label.setText("theta (fixed):")
self.fixedAngle.setValue(self.defaultMuTh[1])
[docs]
class QPlotDeleteWindow(qt.QDialog):
def __init__(self,curveList,hidden,parent=None):
"""GUI-only: initialize a dialog for hiding or deleting curves."""
qt.QDialog.__init__(self, parent)
self.curves = curveList
self.action = None
layout = qt.QGridLayout()
# create 'select all' button
self.selectAllPlotsCheckbox = qt.QCheckBox()
layout.addWidget(qt.QLabel('select all'),0,0,1,1)
layout.addWidget(self.selectAllPlotsCheckbox,0,1,1,1)
self.selectAllPlotsCheckbox.stateChanged.connect(self.checkOrUncheckAll)
# create an entry (checkbox + name) for each plot curve
self.boxes = []
for i,j in enumerate(self.curves):
newbox = qt.QCheckBox()
if hidden[i] == True:
newbox.setChecked(True)
self.boxes.append(newbox)
layout.addWidget(qt.QLabel(j),i+1,0,1,1)
layout.addWidget(self.boxes[i],i+1,1,1,1)
self.buttons = qt.QDialogButtonBox()
self.buttons.addButton('Delete curves',self.buttons.ActionRole)
self.buttons.addButton('Hide curves',self.buttons.ActionRole)
self.buttons.addButton(qt.QDialogButtonBox.Cancel)
if curveList == []:
layout.addWidget(self.buttons,0,0,-1,-1)
else:
layout.addWidget(self.buttons,i+2,0,-1,-1)
self.buttons.buttons()[1].clicked.connect(self.deleteClicked)
self.buttons.buttons()[2].clicked.connect(self.hideClicked)
self.buttons.button(qt.QDialogButtonBox.Cancel).clicked.connect(self.reject)
self.setLayout(layout)
[docs]
def deleteClicked(self):
"""GUI-only: accept the dialog with a delete-curves action."""
self.action = 'delete'
self.accept()
[docs]
def hideClicked(self):
"""GUI-only: accept the dialog with a hide-curves action."""
self.action = 'hide'
self.accept()
[docs]
def checkOrUncheckAll(self):
"""GUI-only: mirror the select-all checkbox to all curve checkboxes."""
if self.selectAllPlotsCheckbox.isChecked() == True:
for i in self.boxes:
i.blockSignals(True)
i.setChecked(True)
i.blockSignals(False)
elif self.selectAllPlotsCheckbox.isChecked() == False:
for i in self.boxes:
i.blockSignals(True)
i.setChecked(False)
i.blockSignals(False)
[docs]
class QScanCreator(qt.QDialog):
def __init__(self,defaultMuTh, parent=None):
"""GUI-only: initialize the simulation scan setup dialog."""
qt.QDialog.__init__(self, parent)
self.defaultMuTh = defaultMuTh
layout = qt.QGridLayout()
layout.addWidget(qt.QLabel("scan axis:"),0,0)
layout.addWidget(qt.QLabel("axis start:"),1,0)
layout.addWidget(qt.QLabel("axis end:"),2,0)
layout.addWidget(qt.QLabel("no points:"),3,0)
self.fixed_label = qt.QLabel("mu (fixed):")
layout.addWidget(self.fixed_label,4,0)
self.scanaxis = qt.QComboBox()
self.scanaxis.addItem("theta")
self.scanaxis.addItem("mu")
self.scanaxis.setCurrentIndex(0)
self.scanaxis.currentIndexChanged.connect(self.onScanAxisChanged)
self.omstart = qt.QDoubleSpinBox()
self.omstart.setRange(-180,180)
self.omstart.setDecimals(4)
self.omstart.setSuffix(" °")
self.omstart.setValue(-90)
self.omend = qt.QDoubleSpinBox()
self.omend.setRange(-180,180)
self.omend.setDecimals(4)
self.omend.setSuffix(" °")
self.omend.setValue(90)
self.no = qt.QSpinBox()
self.no.setRange(1,1000000000)
self.no.setValue(180)
self.fixedAngle = qt.QDoubleSpinBox()
self.fixedAngle.setRange(-180,180)
self.fixedAngle.setValue(self.defaultMuTh[0])
layout.addWidget(self.scanaxis,0,1)
layout.addWidget(self.omstart,1,1)
layout.addWidget(self.omend,2,1)
layout.addWidget(self.no,3,1)
layout.addWidget(self.fixedAngle,4,1)
buttons = qt.QDialogButtonBox(qt.QDialogButtonBox.Ok | qt.QDialogButtonBox.Cancel)
layout.addWidget(buttons,5,0,-1,-1)
buttons.button(qt.QDialogButtonBox.Ok).clicked.connect(self.accept)
buttons.button(qt.QDialogButtonBox.Cancel).clicked.connect(self.reject)
self.setLayout(layout)
[docs]
def onScanAxisChanged(self, index):
"""GUI-only: update fixed-angle fields for the selected scan axis."""
if index == 0:
self.omstart.setValue(-90.)
self.omend.setValue(90.)
self.fixed_label.setText("mu (fixed):")
self.fixedAngle.setValue(self.defaultMuTh[0])
elif index == 1:
self.omstart.setValue(0.)
self.omend.setValue(15.)
self.fixed_label.setText("theta (fixed):")
self.fixedAngle.setValue(self.defaultMuTh[1])
[docs]
class QDiffractometerImageDialog(qt.QDialog):
def __init__(self, parent=None):
"""GUI-only: initialize the diffraction-geometry image dialog."""
qt.QDialog.__init__(self, parent)
verticalLayout = qt.QVBoxLayout(self)
verticalLayout.setContentsMargins(0, 0, 0, 0)
img = qutils.AspectRatioPixmapLabel(self)
pixmp = qt.QPixmap(resources.getDiffractometerPath())
#img.setScaledContents(False)
img.setPixmap(pixmp)
verticalLayout.addWidget(img)
#reader = qt.QImageReader()
#img = reader.read()
#view = qt.QGraphicsView(self)
#svg = qt.QSvgWidget(self)
#svg.load(resources.getDiffractometerPath())
#verticalLayout.addWidget(svg)
self.setLayout(verticalLayout)
[docs]
class AboutDialog(qt.QDialog):
def __init__(self,version, msg='' ,parent=None):
"""GUI-only: initialize the About dialog content."""
qt.QDialog.__init__(self, parent)
layout = qt.QVBoxLayout()
self.setWindowTitle("About orGUI")
pixmap = resources.getSplashScreen(str(version))
self.logo = qt.QLabel()
app = qt.QApplication.instance()
screenGeometry = app.primaryScreen().availableGeometry()
splashpm = pixmap.scaledToHeight(int(screenGeometry.height()/5), qt.Qt.SmoothTransformation)
self.logo.setPixmap(splashpm)
messageStr = "orGUI version %s" % version
messageStr += msg
messageStr += """<br> <br>
Copyright (c) 2020-2026 Timo Fuchs, published under MIT License
<br> <br>
orGUI: Orientation and Integration with 2D detectors.<br>
Zenodo. <a href=\"https://doi.org/10.5281/zenodo.12592485\">https://doi.org/10.5281/zenodo.12592485</a> <br> <br>
New software updates will be published under <a href=\"https://doi.org/10.5281/zenodo.12592485\">Zenodo</a>.
<br> <br>
Help requests can be send via Email to Timo Fuchs.
<br> <br>
"orGUI" was developed during the PhD work of Timo Fuchs,<br>
within the group of Olaf Magnussen.
"""
self.label = qt.QLabel()
self.label.setText(messageStr)
self.label.setTextInteractionFlags(qt.Qt.TextBrowserInteraction)
self.label.setTextFormat(qt.Qt.RichText)
buttons = qt.QDialogButtonBox(qt.QDialogButtonBox.Ok)
buttons.button(qt.QDialogButtonBox.Ok).clicked.connect(self.accept)
layout.addWidget(self.logo)
layout.addWidget(self.label)
layout.addWidget(buttons)
self.setLayout(layout)
[docs]
class UncaughtHook(qt.QObject):
#_exception_caught = qt.Signal(object)
def __init__(self, *args, **kwargs):
"""GUI-only: install the Qt-aware uncaught exception hook."""
super().__init__(*args, **kwargs)
# this registers the exception_hook() function as hook with the Python interpreter
sys.excepthook = self.exception_hook
self.orgui = None
# connect signal to execute the message box function always on main thread
#self._exception_caught.connect(show_exception_box)
[docs]
def set_orgui(self,orgui):
"""Attach the main window used for fatal-error recovery.
.. note::
GUI-only. The recovery path can show modal dialogs.
"""
self.orgui = orgui
[docs]
def exception_hook(self, exc_type, exc_value, exc_traceback):
"""Handle uncaught exceptions through the GUI fatal-error path.
.. note::
GUI-only. This path may show modal dialogs and terminate the process.
"""
if issubclass(exc_type, KeyboardInterrupt):
# ignore keyboard interrupt to support console applications
sys.__excepthook__(exc_type, exc_value, exc_traceback)
else:
exc_info = (exc_type, exc_value, exc_traceback)
log_msg = '\n'.join([''.join(traceback.format_tb(exc_traceback)),
f'{exc_type.__name__}: {exc_value}'])
print(f"Uncaught exception:\n {log_msg}")# exc_info=exc_info)
# trigger message box show
#self._exception_caught.emit(log_msg)
if qt.QApplication.instance() is not None:
if self.orgui is None:
errorbox = qt.QMessageBox(qt.QMessageBox.Critical,
"Uncaught Exception",
f"An unexpected error occured. The program will terminate now:\n{log_msg}",
qt.QMessageBox.Ok)
errorbox.exec()
sys.exit(1)
else:
resBtn = qutils.critical_detailed_message(self.orgui, "Uncaught Exception", "An unexpected error has occured.\norGUI will terminate now.\nDo you want to try to save the database before terminating?" ,log_msg, qt.QMessageBox.Save | qt.QMessageBox.Discard)
#errorbox = qt.QMessageBox(qt.QMessageBox.Critical,
# "Uncaught Exception",
# "An unexpected error occured:\n{0}\nDo you want to try to save the data before terminating?".format(log_msg),
# qt.QMessageBox.Save | qt.QMessageBox.Discard)
#resBtn = errorbox.exec()
if resBtn == qt.QMessageBox.Save:
try:
self.orgui.database.onSaveDBFile()
except Exception:
print("Fatal error: Cannot save database:\n%s" % traceback.format_exc())
qutils.critical_detailed_message(self.orgui, "Fatal error", "Cannot save database." ,traceback.format_exc())
self.orgui.database.close()
else:
print("No QApplication instance available.")
sys.exit(1)
[docs]
def main(configfile):
"""Start a standalone GUI application for a config file.
:param configfile:
Path to the orGUI configuration file.
.. note::
GUI-only. CLI startup is handled by ``orgui.main``.
"""
a = qt.QApplication(['orGUI'])
qt_exception_hook = UncaughtHook()
mainWindow = orGUI(configfile)
qt_exception_hook.set_orgui(mainWindow)
mainWindow.show()
#a.lastWindowClosed.connect(a.quit)
return a.exec_()
if __name__ == '__main__':
main("./config")