"""Sample represents a set of readings submitted
"""
import sys
import time
from DateTime import DateTime
from AccessControl import ClassSecurityInfo
from Products.CMFCore.WorkflowCore import WorkflowException
from Products.CMFCore import CMFCorePermissions
from Products.CMFCore.utils import getToolByName
from Products.Archetypes.public import *
from Products.Archetypes.references import HoldingReference
from Products.Archetypes.config import REFERENCE_CATALOG
from Products.ATExtensions.ateapi import DateTimeField, DateTimeWidget
from Products.BikaAnalytics.BikaContent import BikaSchema
from Products.BikaAnalytics.config import I18N_DOMAIN
from Products.CMFDynamicViewFTI.browserdefault import \
BrowserDefaultMixin
from Products.BikaAnalytics.config import ManageBika
from Products.BikaAnalytics.utils import sortable_title
from Products.BikaAnalytics.CustomFields import ReadingsField, SamplePriceField, ErrorsField
# Archetypes schema for the Sample content type.  It extends the shared
# BikaSchema with identification fields, the three results (A, B, C) with
# their prices and validities, classification references, workflow dates
# and a set of computed fields that exist only to be indexed.
schema = BikaSchema.copy() + Schema((
    # --- identification -------------------------------------------------
    StringField('SampleID',
        required=1,
        index='FieldIndex:brains',
        searchable=True,
        widget=StringWidget(
            label='Sample ID',
            label_msgid='label_sampleid',
            # The original literal used a doubled single quote, which
            # Python parses as two adjacent strings and so dropped the
            # apostrophe from the rendered help text.
            description="The ID assigned to the client's sample by the lab",
            description_msgid='help_sampleid',
            i18n_domain=I18N_DOMAIN,
            visible={'edit':'hidden'},
        ),
    ),
    ComputedField('BatchID',
        index='FieldIndex',
        searchable=True,
        expression='context.aq_parent.getBatchID()',
        widget=ComputedWidget(
            label='Batch ID',
            label_msgid='label_batchid',
            description='The batch in which this sample was submitted',
            description_msgid='help_batchid',
            i18n_domain=I18N_DOMAIN,
            visible={'edit':'hidden'},
        ),
    ),
    ComputedField('BatchState',
        index='FieldIndex',
        searchable=True,
        expression='context.getParentState()',
        widget=ComputedWidget(
            label='Batch State',
            label_msgid='label_batchstate',
            description='The state of the batch in which this sample was submitted',
            description_msgid='help_batch_state',
            i18n_domain=I18N_DOMAIN,
            visible={'edit':'hidden'},
        ),
    ),
    StringField('ClientSampleID',
        index='FieldIndex',
        searchable=True,
        widget=StringWidget(
            label='Client Sample ID',
            label_msgid='label_clientsampleid',
            description='An optional id that the client can provide to identify the sample',
            description_msgid='help_clientreference',
            i18n_domain=I18N_DOMAIN,
        ),
    ),
    # --- results, prices and per-result validity ------------------------
    StringField('ResultA'),
    StringField('ResultB'),
    StringField('ResultC'),
    SamplePriceField('PriceA'),
    SamplePriceField('PriceB'),
    SamplePriceField('PriceC'),
    StringField('ValidityA'),
    StringField('ValidityB'),
    StringField('ValidityC'),
    # --- references -----------------------------------------------------
    ReferenceField('ReferenceSample',
        allowed_types=('StandardSample',),
        relationship='SampleStandardSample',
        referenceClass=HoldingReference,
        widget=ReferenceWidget(
            label='Reference Sample',
            label_msgid='label_reference_sample',
            i18n_domain=I18N_DOMAIN,
        ),
    ),
    ReferenceField('SampleType',
        vocabulary_display_path_bound=sys.maxint,
        allowed_types=('SampleType',),
        relationship='SampleSampleType',
        referenceClass=HoldingReference,
        widget=ReferenceWidget(
            checkbox_bound=1,
            label='Sample Type',
            label_msgid='label_type',
            i18n_domain=I18N_DOMAIN,
        ),
    ),
    ReferenceField('Variety',
        vocabulary_display_path_bound=sys.maxint,
        allowed_types=('Variety',),
        relationship='SampleVariety',
        referenceClass=HoldingReference,
        widget=ReferenceWidget(
            checkbox_bound=1,
            label='Variety',
            label_msgid='label_variety',
            i18n_domain=I18N_DOMAIN,
        ),
    ),
    ReferenceField('Vintage',
        vocabulary_display_path_bound=sys.maxint,
        allowed_types=('Vintage',),
        relationship='SampleVintage',
        referenceClass=HoldingReference,
        widget=ReferenceWidget(
            checkbox_bound=1,
            label='Vintage',
            label_msgid='label_vintage',
            i18n_domain=I18N_DOMAIN,
        ),
    ),
    ReferenceField('Country',
        vocabulary_display_path_bound=sys.maxint,
        allowed_types=('Country',),
        relationship='SampleCountry',
        referenceClass=HoldingReference,
        widget=ReferenceWidget(
            checkbox_bound=1,
            label='Country',
            label_msgid='label_country',
            i18n_domain=I18N_DOMAIN,
        ),
    ),
    ReferenceField('Region',
        vocabulary_display_path_bound=sys.maxint,
        allowed_types=('Region',),
        relationship='SampleRegion',
        referenceClass=HoldingReference,
        widget=ReferenceWidget(
            checkbox_bound=1,
            label='Region',
            label_msgid='label_region',
            i18n_domain=I18N_DOMAIN,
        ),
    ),
    # --- dates (hidden from the edit form) -------------------------------
    DateTimeField('DateLoaded',
        index='DateIndex',
        widget=DateTimeWidget(
            label='Date loaded',
            label_msgid='label_dateloaded',
            visible={'edit':'hidden'},
        ),
    ),
    DateTimeField('DateProcessed',
        index='DateIndex',
        widget=DateTimeWidget(
            label='Date processed',
            label_msgid='label_dateprocessed',
            visible={'edit':'hidden'},
        ),
    ),
    DateTimeField('DatePublished',
        index='DateIndex',
        widget=DateTimeWidget(
            label='Date published',
            label_msgid='label_datepublished',
            visible={'edit':'hidden'},
        ),
    ),
    TextField('Notes',
        widget=TextAreaWidget(
            label='Notes'
        ),
    ),
    # --- invisible computed fields, present only to be indexed -----------
    ComputedField('ClientUID',
        index='FieldIndex',
        expression='context.aq_parent.aq_parent.UID()',
        widget=ComputedWidget(
            visible=False,
        ),
    ),
    ComputedField('SampleImportUID',
        index='FieldIndex',
        expression='context.aq_parent.UID()',
        widget=ComputedWidget(
            visible=False,
        ),
    ),
    ComputedField('SampleTypeUID',
        index='FieldIndex',
        # Guarded like the sibling *UID fields below: the SampleType
        # reference is not required, so it may be unset.
        expression='context.getSampleType() and context.getSampleType().UID() or None',
        widget=ComputedWidget(
            visible=False,
        ),
    ),
    ComputedField('VintageUID',
        index='FieldIndex',
        expression='context.getVintage() and context.getVintage().UID() or None',
        widget=ComputedWidget(
            visible=False,
        ),
    ),
    ComputedField('VarietyUID',
        index='FieldIndex',
        expression='context.getVariety() and context.getVariety().UID() or None',
        widget=ComputedWidget(
            visible=False,
        ),
    ),
    ComputedField('CountryUID',
        index='FieldIndex',
        expression='context.getCountry() and context.getCountry().UID() or None',
        widget=ComputedWidget(
            visible=False,
        ),
    ),
    ComputedField('RegionUID',
        index='FieldIndex',
        expression='context.getRegion() and context.getRegion().UID() or None',
        widget=ComputedWidget(
            visible=False,
        ),
    ),
    ComputedField('InstrumentUID',
        index='FieldIndex',
        expression='context.aq_parent.getInstrument() and context.aq_parent.getInstrument().UID() or None',
        widget=ComputedWidget(
            visible=False,
        ),
    ),
    ComputedField('ReferenceSampleUID',
        index='FieldIndex',
        expression='context.getReferenceSample() and context.getReferenceSample().UID() or None',
        widget=ComputedWidget(
            visible=False,
        ),
    ),
    ComputedField('ReferenceSampleFlag',
        index='FieldIndex',
        expression='context.getReferenceSample() and True or False',
        widget=ComputedWidget(
            visible=False,
        ),
    ),
    # --- raw instrument data and overall validity ------------------------
    ReadingsField('Readings'),
    ErrorsField('Errors'),
    StringField('Validity',
        index='FieldIndex',
        widget=StringWidget(
            label='Validity',
            label_msgid='label_validity',
            i18n_domain=I18N_DOMAIN,
        ),
    ),
),
)

# The title is derived from the Sample ID (see Sample.Title), so the
# inherited title field is made optional and hidden from edit and view.
IdField = schema['id']
TitleField = schema['title']
TitleField.required = 0
TitleField.widget.visible = {'edit': 'hidden', 'view': 'invisible'}
# Archetypes content type for a single sample.  A Sample lives inside a
# batch (its acquisition parent), carries the raw readings and the three
# results (A, B, C) computed from them together with their validities.
#
# NOTE(review): the indentation of this file had been lost; the nesting
# below was reconstructed from the statement order.  Spots where more than
# one nesting was syntactically possible are marked individually.
class Sample(VariableSchemaSupport, BrowserDefaultMixin, BaseFolder):
    security = ClassSecurityInfo()
    archetype_name = 'Sample'
    schema = schema
    # A Sample is folderish (BaseFolder) but may not contain anything and
    # cannot be added globally.
    allowed_content_types = ()
    content_icon = 'sample.png'
    immediate_view = 'base_view'
    default_view = 'base_view'
    use_folder_tabs = 0
    global_allow = 0
    filter_content_types = 1
    __implements__ = BaseFolder.__implements__ + (
        BrowserDefaultMixin.__implements__, )
    factory_type_information = {
        'title': 'Sample'
    }
    actions = (
        { 'id': 'view',
          'name': 'View',
          'action': 'string:${object_url}/',
          'permissions': (CMFCorePermissions.View,),
        },
        # Editing is only offered while the parent batch is "loaded".
        { 'id': 'edit',
          'name': 'Edit',
          'action': 'string:${object_url}/sample_edit_form',
          'permissions': (CMFCorePermissions.ModifyPortalContent,),
          'condition':'python:here.getParentState() == "loaded"',
        },
        {'id': 'log',
         'name': 'Log',
         'action': 'string:${object_url}/status_log',
         'permissions': (ManageBika,),
        },
    )

    def Title(self):
        """ Return the Sample ID as title """
        return self.getSampleID()

    def getParentState(self):
        """ Return the review state of the parent batch

        Returns '' when the workflow tool has no review state for it.
        """
        batch = self.aq_parent
        wf_tool = self.portal_workflow
        return wf_tool.getInfoFor(batch, 'review_state', '')

    def _sum_price_entries(self, key):
        # Add up entry `key` of the PriceA and PriceB values, counting a
        # missing entry as 0.  PriceC is not included, as in the original
        # per-method implementations.
        # assumes the price values are dict-like (support .get) - the
        # original relied on has_key for the same lookups
        price_a = self.getPriceA().get(key, 0)
        price_b = self.getPriceB().get(key, 0)
        return price_a + price_b

    def getSubtotal(self):
        """ Compute Subtotal: sum of the 'price' entries of PriceA and
        PriceB """
        return self._sum_price_entries('price')

    def getVAT(self):
        """ Compute VAT as total price minus subtotal """
        return self.getTotalPrice() - self.getSubtotal()

    def getTotalPrice(self):
        """ Compute TotalPrice: sum of the 'total' entries of PriceA and
        PriceB """
        return self._sum_price_entries('total')

    getTotal = getTotalPrice

    #
    # workflow methods - here reduced to sample-level methods
    #
    def _range_validity(self, result, key, analyte_min, analyte_max):
        # Return 'oor' (out of range) when `result` lies outside the
        # min/max registered for analyte `key`, otherwise 'ok'.  An analyte
        # without a registered range is always 'ok'.
        if key not in analyte_min:
            return 'ok'
        value = float(result)
        if value < analyte_min[key] or value > analyte_max[key]:
            return 'oor'
        return 'ok'

    def _qc_validity(self, result, qc_result):
        # Grade a reference-sample result against its expected value.
        # `qc_result` is indexed by 'result', 'sd1', 'sd2' and 'sd3'.
        # Returns None when there is nothing to compare, otherwise the
        # widest band the absolute difference exceeds ('sd3', 'sd2',
        # 'sd1') or 'ok' when it exceeds none.
        if not (result and qc_result['result']):
            return None
        diff = abs(float(result) - float(qc_result['result']))
        if diff > float(qc_result['sd3']):
            return 'sd3'
        if diff > float(qc_result['sd2']):
            return 'sd2'
        if diff > float(qc_result['sd1']):
            return 'sd1'
        return 'ok'

    def processSample(self, ref_validity):
        """ process sample

        Computes results A, B and C from the readings and the factors of
        the parent's instrument, grades them, stores results and
        validities on the sample and reindexes it.

        ref_validity -- reference sample validity handed in by the caller.

        Returns the reference sample validity: ref_validity unchanged, or
        'sd3' when this sample is a reference sample and one of its
        results is more than sd3 away from the expected value.
        """
        self.setDateProcessed(DateTime())
        instrument = self.aq_parent.getInstrument()
        readings = self.getReadings()
        validity = 'ok'
        reference_sample_validity = ref_validity

        # Valid range and precision per analyte key, from the catalog.
        analyte_min = {}
        analyte_max = {}
        analyte_prec = {}
        for a_proxy in self.portal_catalog(portal_type='Analyte'):
            analyte = a_proxy.getObject()
            analyte_key = analyte.getAnalyteKey()
            analyte_min[analyte_key] = analyte.getMinValid()
            analyte_max[analyte_key] = analyte.getMaxValid()
            analyte_prec[analyte_key] = analyte.getPrecision()
        factors = instrument.getFactors()

        # NOTE(review): get_precise_result is not defined in this class;
        # presumably it is acquired - confirm.  The analyte_prec lookups
        # raise KeyError when analyte 'A', 'B' or 'C' is not in the catalog.

        # process A result
        aresult = (
            float(factors['aslope']) * (
                (float(factors['a']) * float(readings['r1'])) +
                (float(factors['b']) * float(readings['r2'])) +
                (float(factors['c']) * float(readings['r3'])) +
                (float(factors['d']) * float(readings['r4'])) +
                (float(factors['e']) * float(readings['r5'])) +
                float(factors['y']))
            ) + float(factors['abias'])
        aresult = self.get_precise_result(aresult, analyte_prec['A'])
        # Evaluated once for all three results; the original tested the
        # flag for A but the reference sample UID for B and C.
        is_reference = self.getReferenceSampleFlag()
        avalidity = 'ok'
        if not is_reference:
            avalidity = self._range_validity(
                aresult, 'A', analyte_min, analyte_max)
            if avalidity == 'oor':
                validity = 'oor'

        # process B result
        bresult = (
            float(factors['bslope']) *
            (float(factors['f']) * float(readings['r6']))
            ) + float(factors['bbias'])
        bresult = self.get_precise_result(bresult, analyte_prec['B'])
        bvalidity = 'ok'
        if not is_reference:
            bvalidity = self._range_validity(
                bresult, 'B', analyte_min, analyte_max)
            if bvalidity == 'oor':
                validity = 'oor'

        # process C result
        # NOTE(review): C reuses factor 'f' (also used for B) with reading
        # 'r3' - confirm this is intended and not a copy of the B formula.
        cresult = (
            float(factors['cslope']) *
            (float(factors['f']) * float(readings['r3']))
            ) + float(factors['cbias'])
        cresult = self.get_precise_result(cresult, analyte_prec['C'])
        cvalidity = 'ok'
        if not is_reference:
            cvalidity = self._range_validity(
                cresult, 'C', analyte_min, analyte_max)
            if cvalidity == 'oor':
                validity = 'oor'

        # check range for qc samples
        if is_reference:
            # Prefer a QC spec for this standard sample on this instrument;
            # fall back to the values on the reference sample itself.
            qc_proxies = self.portal_catalog(
                portal_type='QCSpec',
                getStandardSampleUID=self.getReferenceSampleUID(),
                getInstrumentUID=self.getInstrumentUID())
            if qc_proxies:
                qc_source = qc_proxies[0].getObject()
            else:
                qc_source = self.getReferenceSample()
            qc_aresult = qc_source.getAResult()
            qc_bresult = qc_source.getBResult()
            qc_cresult = qc_source.getCResult()

            avalidity = self._qc_validity(aresult, qc_aresult)
            bvalidity = self._qc_validity(bresult, qc_bresult)
            cvalidity = self._qc_validity(cresult, qc_cresult)
            if 'sd3' in (avalidity, bvalidity, cvalidity):
                reference_sample_validity = 'sd3'

            # The overall validity is the worst of the three gradings.
            ranking = [None, 'ok', 'sd1', 'sd2', 'sd3']
            validity = avalidity
            for candidate in (bvalidity, cvalidity):
                if ranking.index(candidate) >= ranking.index(validity):
                    validity = candidate

        # NOTE(review): assumed to apply to every sample, so that a failed
        # reference sample (carried in through ref_validity) also marks
        # the samples processed after it - confirm against the caller.
        if reference_sample_validity == 'sd3':
            validity = 'sd3'

        self.setValidityA(avalidity)
        self.setValidityB(bvalidity)
        self.setValidityC(cvalidity)
        self.setValidity(validity)
        self.setResultA(aresult)
        self.setResultB(bresult)
        self.setResultC(cresult)
        self.reindexObject()
        return reference_sample_validity

    def retractSample(self):
        """ retract sample

        Clears the processing date, the three results and their
        validities, resets the overall validity to 'ok' and reindexes.
        """
        self.setDateProcessed(None)
        self.setResultA(None)
        self.setResultB(None)
        self.setResultC(None)
        self.setValidityA(None)
        self.setValidityB(None)
        self.setValidityC(None)
        self.setValidity('ok')
        self.reindexObject()

    security.declarePublic('current_date')
    def current_date(self):
        """ return current date """
        return DateTime()

registerType(Sample)
def modify_fti(fti):
    """Adjust the visibility of the type's actions and return fti.

    The 'view' action is made visible; the syndication, references,
    metadata and localroles actions are hidden.  All other actions are
    left untouched.  fti is modified in place.
    """
    hidden_ids = ('syndication', 'references', 'metadata', 'localroles')
    for action in fti['actions']:
        action_id = action['id']
        if action_id == 'view':
            action['visible'] = 1
        elif action_id in hidden_ids:
            action['visible'] = 0
    return fti