# IssueTrackerProduct # # www.issuetrackerproduct.com # Peter Bengtsson # License: ZPL # __doc__="""IssueTrackerProduct is the easiest bug/issue tracker system to use for Zope. By Peter Bengtsson Credits: Gregory Wild-Smith, sack, http://twilightuniverse.com issuetracker-development mailinglist community Gavin Kistner for the the tabbed Properties tab Danny W. Adair of Asterisk Ltd for getRolesInContext(self) bug report and patch. """ # python import string, os, re, sys import random import poplib from urlparse import urlparse try: from poplib import POP3, POP3_SSL _has_pop3_ssl = True except ImportError: from poplib import POP3 _has_pop3_ssl = False import cgi import cStringIO import inspect from time import time from socket import error as socket_error from urllib import urlopen try: import transaction except ImportError: # we must be in an older than 2.8 version of Zope transaction = None try: import csv except: csv = None try: from sets import Set except ImportError: # must be old python Set = None from email.MIMEText import MIMEText from email.MIMEMultipart import MIMEMultipart from email.Header import Header from email.Utils import parseaddr, formataddr try: import email.Parser as email_Parser import email.Header as email_Header except ImportError: email_Parser = None try: from stripogram import html2safehtml except ImportError: html2safehtml = None try: from PIL import Image except ImportError: try: import Image except ImportError: Image = None try: from Products.ExternalEditor import ExternalEditor _has_ExternalEditor = True except ImportError: _has_ExternalEditor = False try: from formatflowed import decode as formatflowed_decode _has_formatflowed_ = True except ImportError: _has_formatflowed_ = False # Zope from Products.PageTemplates.PageTemplateFile import PageTemplateFile as PTF from Globals import Persistent, InitializeClass, package_home, DTMLFile from OFS import SimpleItem, Folder, PropertyManager from DocumentTemplate import sequence from AccessControl import ClassSecurityInfo, getSecurityManager from Products.ZCatalog.CatalogAwareness import CatalogAware from Acquisition import aq_inner, aq_parent, aq_base from zLOG import LOG, ERROR, INFO, PROBLEM, WARNING from DateTime import DateTime from App.ImageFile import ImageFile from ZPublisher.HTTPRequest import record from zExceptions import NotFound, Unauthorized # Is CMF installed? try: from Products.CMFCore.utils import getToolByName as CMF_getToolByName except ImportError: CMF_getToolByName = None try: from Products.ZCTextIndex.ParseTree import ParseError _has_ZCTextIndex = 1 except: class ParseError(Exception): # make it up ourselfs pass _has_ZCTextIndex = 0 # Zope >=2.7 has OrderedFolder baked into the core, oldies have to install it manually try: from OFS.OrderedFolder import OrderedFolder as ZopeOrderedFolder except ImportError: try: from Products.OrderedFolder.OrderedFolder import OrderedFolder as ZopeOrderedFolder except ImportError: m = "OrderedFolder not installed. Reports can not be ordered" LOG("IssueTrackerProduct", WARNING, m) del m from OFS.Folder import Folder as ZopeOrderedFolder # Product from I18N import _ from upgrade import VersionController from TemplateAdder import addTemplates2Class, CTP import Notifyables import Utils from Utils import unicodify, asciify from bot_user_agents import is_bot_user_agent from Webservices import IssueTrackerWebservices from CustomField import CustomFieldsIssueTrackerBase from Permissions import * from Constants import * from Errors import * #---------------------------------------------------------------------------- import logging logger = logging.getLogger('IssueTrackerProduct') __version__=open(os.path.join(package_home(globals()), 'version.txt')).read().strip() ## https://bugs.launchpad.net/zope2/+bug/142399 def safe_hasattr(obj, name, _marker=object()): """Make sure we don't mask exceptions like hasattr(). We don't want exceptions other than AttributeError to be masked, since that too often masks other programming errors. Three-argument getattr() doesn't mask those, so we use that to implement our own hasattr() replacement. """ return getattr(obj, name, _marker) is not _marker def base_hasattr(obj, name): """Like safe_hasattr, but also disables acquisition.""" return safe_hasattr(aq_base(obj), name) _first_name_regex = re.compile('^([A-Z][a-z]+)\s') #---------------------------------------------------------------------------- def manage_hasAquirableMailHost(self): """ return if there is a MailHost object in the aqcuisition path """ return len(self.superValues(['Mail Host', 'Secure Mail Host'])) > 0 manage_addIssueTrackerForm = PTF('zpt/addIssueTrackerForm', globals()) def manage_addIssueTracker(dispatcher, id, title='', REQUEST=None): """ add IssueTracker instance via the web """ dest = dispatcher.Destination() issuetracker = IssueTracker(id, title.strip(), sitemaster_name=title) dest._setObject(id, issuetracker) self = dest._getOb(id) self.DeployStandards() self.InitZCatalog() # set that 'IssueTracker Manager' and 'IssueTracker User' should by # default have 'Access IssueTracker' permission if these are defined roles_4_view = [IssueTrackerManagerRole, IssueTrackerUserRole] self.manage_permission('View', roles=roles_4_view, acquire=1) if REQUEST is not None: # whereto next? redirect = REQUEST.RESPONSE.redirect if REQUEST.has_key('addandedit'): url = self.absolute_url() url += '/manage_PropertiesWizard?stage=0&firsttime=1' redirect(url) elif REQUEST.has_key('addandgoto'): redirect(self.absolute_url()+'/manage_workspace') elif REQUEST.has_key('DestinationURL'): redirect(REQUEST.DestinationURL+'/manage_workspace') else: redirect(REQUEST.URL1+'/manage_workspace') #---------------------------------------------------------------------------- class IssueTrackerFolderBase(Folder.Folder, Persistent): """ A base class for the IssueTracker class """ def doDebug(self): """ return True if we're in debug mode """ return DEBUG def getAutosaveInterval(self): """ return the seconds interval of how often the autosaving function should submit. """ return AUTOSAVE_INTERVAL_SECONDS def ValidEmailAddress(self, email): """ wrap script """ script = Utils.ValidEmailAddress return script(email) def html_entity_fixer(self, text, skipchars=[], extra_careful=1): """ wrap script """ return Utils.html_entity_fixer(text, skipchars=skipchars, extra_careful=extra_careful) def newline_to_br(self, text): """ wrap script """ script = Utils.newline_to_br return script(text) def encodeEmailString(self, email, title=None, nolink=0): """ wrap script """ script = Utils.encodeEmailString return script(email, title, nolink=nolink) def sortSequence(self, seq, params): """ this is useful because Python Scripts don't allow sequence.sort """ return sequence.sort(seq, params) def getOrdinalth(self, daynr, html=0): """ what Utils script """ return Utils.ordinalth(daynr, html=html) def timeSince(self, date1, date2, afterword=None, minute_granularity=False, max_no_sections=3): """ wrap Utils.timeSince() """ return Utils.timeSince(date1, date2, afterword=afterword, minute_granularity=minute_granularity, max_no_sections=max_no_sections) def ShowFilesize(self, bytes): """ pass on to utilities module """ return Utils.ShowFilesize(bytes) def LineIndent(self, text, indent): """ wrap script """ return Utils.LineIndent(text, indent) def getFileIconpath(self, filename): """ Try to find a suitable file icon """ default = '/misc_/OFSP/File_icon.gif' extension = filename.lower()[filename.rfind('.')+1:] if extension.endswith('~'): extension = extension[:-1] if ICON_ASSOCIATIONS.has_key(extension): return '/%s/%s'%(ICON_LOCATION,ICON_ASSOCIATIONS[extension]) else: return default def getRandomString(self, length=5, loweronly=0, numbersonly=0): """ return a completely random piece of string """ script = Utils.getRandomString return script(length, loweronly, numbersonly) def lengthLimit(self, string, maxsize=45, append='...'): """ show only the first 'maxsize' characters of the string """ return Utils.AwareLengthLimit(string, maxsize, append) def safe_html_quote(self, text): """ wrap this improvement to Zope's html_quote in Utils """ return Utils.safe_html_quote(text) def ascii_url_quote(self, s): """ return a string url quoted even it's it a unicode string """ if isinstance(s, unicode): return Utils.url_quote(s.encode(UNICODE_ENCODING)) else: return Utils.url_quote(s) def ascii_url_quote_plus(self, s): """ return a string url quoted (with +) even it's it a unicode string """ if isinstance(s, unicode): return Utils.url_quote_plus(s.encode(UNICODE_ENCODING)) else: return Utils.url_quote_plus(s) def tag_quote(self, text): """ wrap Utils """ return Utils.tag_quote(text) def splitTerms(self, term): """ wrap Utils script because it's need in ZPTs """ return Utils.splitTerms(term) def getContentType(self, content_type='text/html', charset=UNICODE_ENCODING): """ return the content type header value """ return '%s; charset=%s' % (content_type, charset) def getAndSetContentType(self, content_type='text/html', charset=UNICODE_ENCODING): """ return the content type header value and set it on self.REQUEST.RESPONSE """ value = self.getContentType(content_type=content_type, charset=charset) self.REQUEST.RESPONSE.setHeader('Content-Type', value) return value def unsafe_unicode_dict_getitem(self, dictionary, item): """ Return the value of this item in a dictionary object. Simply call the __getitem__ of this dictionary to pluck out an item. Why call this unsafe_...() ? If you try to do this in a guarded context (e.g. Script (Python) (or Page Template)) you'll get an Unauthorized error: d = {u'\xa3':1} d[u'\xa3'] # will raise an Unauthorized error # this works however d = {u'\xa3':1, u'asciiable':1} d[u'asciiable'] Why? I don't know. The place where it happens is the parental guardian function guarded_getitem() from ZopeGuards.py By instead calling the __getitem__ from here in unrestricted python we can bypass this. """ return dictionary[item] #---------------------------------------------------------------------------- # Misc stuff ss = lambda s: s.strip().lower() # to save some typing space def ss_remove(list_, element): correct_element = None element = ss(element) for item in list_: if ss(item) == element: correct_element = item break if correct_element is not None: list_.remove(correct_element) signature_patterns = {'url':re.compile('\[url\]', re.I), 'title':re.compile('\[title\]', re.I), 'sitemaster name':re.compile('\[sitemaster name\]', re.I), 'sitemaster email':re.compile('\[sitemaster email\]', re.I), 'date':re.compile('\[date\]', re.I), } def debug(s, tabs=0, steps=(1,), f=False): if DEBUG or f: inspect_dbg = [] if type(steps)==type(1): steps = range(1, steps+1) for i in steps: try: #caller_module = inspect.stack()[i][1] caller_method = inspect.stack()[i][3] caller_method_line = inspect.stack()[i][2] except IndexError: break inspect_dbg.append("%s:%s"%(caller_method, caller_method_line)) out = "\t"*tabs + "%s (%s)"%(s, ", ".join(inspect_dbg)) # XXX this needs attention. Consider implementing a ObserverProxy from # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/413701 print out open('issuetracker-debug.log','a').write(out+"\n") class Empty: pass #---------------------------------------------------------------------------- class IssueTracker(IssueTrackerFolderBase, CatalogAware, Notifyables.Notifyables, IssueTrackerWebservices, CustomFieldsIssueTrackerBase, ): """ IssueTracker class """ meta_type = ISSUETRACKER_METATYPE security = ClassSecurityInfo() security.setPermissionDefault(AddIssuesPermission, (IssueTrackerManagerRole, IssueTrackerUserRole, 'Anonymous', 'Owner', 'Manager')) manage_options = Folder.Folder.manage_options[:2] + \ ({'label':'Properties', 'action':'manage_editIssueTrackerPropertiesForm'}, {'label':'Management', 'action':'manage_ManagementForm'}, \ {'label':'POP3', 'action':'manage_POP3ManagementForm'}) \ + Folder.Folder.manage_options[3:] native_properties = NATIVE_PROPERTIES # used by CheckoutableTemplates to filter templates this_package_home = package_home(globals()) # used for some templates project_homepage = 'http://www.issuetrackerproduct.com' def __init__(self, id, title='', sitemaster_name=DEFAULT_SITEMASTER_NAME, sitemaster_email=DEFAULT_SITEMASTER_EMAIL): """ Init IssueTracker class """ self.id = str(id) self.title = str(title) self.types = list(DEFAULT_TYPES) self.urgencies = list(DEFAULT_URGENCIES) self.sections_options = list(DEFAULT_SECTIONS_OPTIONS) self.defaultsections = list(DEFAULT_SECTIONS) self.when_ignore_word = DEFAULT_WHEN_IGNORE_WORD self.display_date = DEFAULT_DISPLAY_DATE self.always_notify = DEFAULT_ALWAYS_NOTIFY self.sitemaster_name = sitemaster_name self.sitemaster_email = sitemaster_email self.default_type = DEFAULT_TYPE self.default_urgency = DEFAULT_URGENCY self.manager_roles = DEFAULT_MANAGER_ROLES self.default_batch_size = DEFAULT_DEFAULT_BATCH_SIZE self.allow_show_all = DEFAULT_ALLOW_SHOW_ALL self.issueprefix = DEFAULT_ISSUEPREFIX self.no_fileattachments = DEFAULT_NO_FILEATTACHMENTS self.no_followup_fileattachments = DEFAULT_NO_FOLLOWUP_FILEATTACHMENTS self.statuses = list(DEFAULT_STATUSES) self.statuses_verbs = list(DEFAULT_STATUSES_VERBS) self.display_formats = list(DEFAULT_DISPLAY_FORMATS) self.default_display_format = DEFAULT_DEFAULT_DISPLAY_FORMAT self.dispatch_on_submit = DEFAULT_DISPATCH_ON_SUBMIT self.randomid_length = DEFAULT_RANDOMID_LENGTH self.allow_issueattrchange = DEFAULT_ALLOW_ISSUEATTRCHANGE self.stop_cache = DEFAULT_STOP_CACHE self.allow_subscription = DEFAULT_ALLOW_SUBSCRIPTION self.use_tellafriend = DEFAULT_USE_TELLAFRIEND self.use_tellafriend_for_anonymous = DEFAULT_USE_TELLAFRIEND_FOR_ANONYMOUS self.show_dates_cleverly = DEFAULT_SHOW_DATES_CLEVERLY self.private_statistics = DEFAULT_PRIVATE_STATISTICS self.private_reports = DEFAULT_PRIVATE_REPORTS self.save_drafts = DEFAULT_SAVE_DRAFTS self.show_confidential_option = DEFAULT_SHOW_CONFIDENTIAL_OPTION self.show_hideme_option = DEFAULT_SHOW_HIDEME_OPTION self.show_issueurl_option = DEFAULT_SHOW_ISSUEURL_OPTION self.encode_emaildisplay = DEFAULT_ENCODE_EMAILDISPLAY self.show_always_notify_status = DEFAULT_SHOW_ALWAYS_NOTIFY_STATUS self.images_in_menu = DEFAULT_IMAGES_IN_MENU self.use_issue_assignment = DEFAULT_USE_ISSUE_ASSIGNMENT self._assignment_blacklist = [] self.signature_text = DEFAULT_SIGNATURE_TEXT self.default_sortorder = DEFAULT_SORTORDER self.can_add_new_sections = DEFAULT_CAN_ADD_NEW_SECTIONS self.show_id_with_title = DEFAULT_SHOW_ID_WITH_TITLE self.show_use_accesskeys_option = DEFAULT_SHOW_USE_ACCESSKEYS_OPTION self.show_remember_savedfilter_persistently_option = DEFAULT_SHOW_REMEMBER_SAVEDFILTER_PERSISTENTLY_OPTION self.outlook_batch_size = DEFAULT_OUTLOOK_BATCH_SIZE self.use_autosave = DEFAULT_USE_AUTOSAVE self.disallow_duplicate_issue_subjects = DEFAULT_DISALLOW_DUPLICATE_ISSUE_SUBJECTS self.use_estimated_time = DEFAULT_USE_ESTIMATED_TIME self.use_actual_time = DEFAULT_USE_ACTUAL_TIME self.include_description_in_notifications = DEFAULT_INCLUDE_DESCRIPTION_IN_NOTIFICATIONS self.spam_keywords = DEFAULT_SPAM_KEYWORDS self.show_spambot_prevention = DEFAULT_SHOW_SPAMBOT_PREVENTION self.acl_cookienames = {} self.acl_cookieemails = {} self.acl_cookiedisplayformats = {} self.menu_items = DEFAULT_MENU_ITEMS self.btreefolder_storage = False self.brother_issuetracker_paths = [] self.plugin_paths = [] ## Getting basic attributes def getId(self): """ return id """ return self.id def getTitle(self): """ return title """ return self.title security.declareProtected('View', 'getModifyTimestamp') def getModifyTimestamp(self): """ return the modify date of the issuetracker as a whole as an integer timestamp. The latest modify date is the issue with the latest modify date. """ issues = self.getIssueObjects() issues.sort(lambda x,y: cmp(y.getModifyDate(), x.getModifyDate())) if issues: return issues[0].getModifyTimestamp() return int(self.bobobase_modification_time()) def relative_url(self, url=None): """ shorter than absolute_url """ if url: return url.replace(self.REQUEST.BASE0, '') path = self.absolute_url_path() if path == '/': # urls should always be return not ending in a slash # so that you can be garanteed this in the templates return '' else: return path def XXXglobal_relative_url(self, object_or_url): """ return a simpler url of any object """ if isinstance(object_or_url, basestring): url = object_or_url else: url = object_or_url.absolute_url() return url.replace(self.REQUEST.BASE0, '') def getStatusesVerbs(self): """ return statuses_verbs """ return getattr(self, 'statuses_verbs', DEFAULT_STATUSES_VERBS) def getStatuses(self): """ return statuses """ return self.statuses def getStatusesMerged(self, aslist=0, asdict=0, verb_first=0, cleaned=False): """ return statuses and statuses_verbs next to each other So it looks like this ['taken, take', 'rejected, reject', ...] If the 'cleaned' property is set to true, we clean up all the values carefully. This is off by default so that the cleaning only happens on rare occasions such as when you're on the Properties tab. """ statuses = self.getStatuses() verbs = self.getStatusesVerbs() if cleaned: statuses = [unicodify(x.strip()) for x in statuses if x.strip()] verbs = [unicodify(x.strip()) for x in verbs if x.strip()] _big_warning = False if len(statuses) > len(verbs): _big_warning = True _add_to_verbs = [] for i in range(len(statuses)-len(verbs)): _add_to_verbs.append(statuses[len(verbs)+i]) verbs.extend(_add_to_verbs) elif len(verbs) > len(statuses): _big_warning = True _add_to_statuses = [] for i in range(len(verbs)-len(statuses)): _add_to_statuses.append(verbs[len(statuses)+i]) statuses.extend(_add_to_statuses) if _big_warning: msg = "The status list (statuses and verbs) is out of sync and "\ "has had to be temporarily merged to work. Please revisit "\ "the Properties tab." logger.warn(msg) self.statuses = statuses self.statuses_verbs = verbs nl=[] nldict = {} delimiter = ', ' for i in range(len(statuses)): if verb_first: nldict[verbs[i].strip()] = statuses[i].strip() else: nldict[statuses[i].strip()] = verbs[i].strip() if aslist: nl.append([statuses[i], verbs[i]]) else: nl.append(statuses[i]+delimiter+verbs[i]) if asdict: return nldict else: return nl def splitStatusesAndVerbs(self, statuses_and_verbs): """ list might be ['open, open', 'taken, take', ...] then split this up into two lists. Raise a ValueError if no delimeter is found or if any value is empty. """ statuses = [] verbs = [] for each in [x.strip() for x in statuses_and_verbs if x.strip()]: found_delim = max(each.find(','), each.find(';'), each.find('|')) if found_delim > -1: splitted = [each[:found_delim], each[found_delim+1:]] if not splitted[0].strip(): raise ValueError, "Status item entered blank (%r)" % each if not splitted[1].strip(): raise ValueError, "Verb item entered blank (%r)" % each statuses.append(splitted[0].strip()) verbs.append(splitted[1].strip()) elif each.strip() != '': raise ValueError, "Line contains no delimeter (%r)" % each return statuses, verbs def getSectionOptions(self): """ return section options """ return self.sections_options def getTypeOptions(self): """ return types """ return self.types def getUrgencyOptions(self): """ return urgencies """ return self.urgencies def getDefaultSections(self): """ return default sections """ return self.defaultsections def getDefaultType(self): """ return default type """ return self.default_type def getDefaultUrgency(self): """ return default urgency """ return self.default_urgency def getDefaultDisplayFormat(self): """ return default_display_format """ return getattr(self, 'default_display_format', DEFAULT_DEFAULT_DISPLAY_FORMAT) def AllowIssueAttributeChange(self): """ Determine if the allow_issueattrchange is True """ return getattr(self, 'allow_issueattrchange', DEFAULT_ALLOW_ISSUEATTRCHANGE) def AllowIssueSubscription(self): """ Determine if the allow_subscription is True """ return getattr(self, 'allow_subscription', DEFAULT_ALLOW_SUBSCRIPTION) def UseTellAFriend(self): """ Determine if we're going to use the tell-a-friend feature on the issue view """ return getattr(self, 'use_tellafriend', DEFAULT_USE_TELLAFRIEND) def UseTellAFriendForAnonymous(self): """ Determine if we're going to use the tell-a-friend feature on the issue view even for anonymous users """ return getattr(self, 'use_tellafriend_for_anonymous', DEFAULT_USE_TELLAFRIEND_FOR_ANONYMOUS) def ShowDatesCleverly(self): """ Determine if we're going to show dates differently depending on when the date is. What happens is that dates that are today are shown as 'Today 11:25' and really old dates are shown without the time part. """ return getattr(self, 'show_dates_cleverly', DEFAULT_SHOW_DATES_CLEVERLY) def PrivateStatistics(self): """ Determine if private_statistics is False """ default = DEFAULT_PRIVATE_STATISTICS return getattr(self, 'private_statistics', default) def PrivateReports(self): """ Determine if private_reports is False """ default = DEFAULT_PRIVATE_REPORTS return getattr(self, 'private_reports', default) def SaveDrafts(self): """ Return if we allow for saving drafts """ default = DEFAULT_SAVE_DRAFTS return getattr(self, 'save_drafts', default) def UseAutoSave(self): """ return if we're going to use autosave """ default = DEFAULT_USE_AUTOSAVE return getattr(self, 'use_autosave', default) def DisallowDuplicateIssueSubjects(self): """ return disallow_duplicate_issue_subjects """ default = DEFAULT_DISALLOW_DUPLICATE_ISSUE_SUBJECTS return getattr(self, 'disallow_duplicate_issue_subjects', default) def UseEstimatedTime(self): """ return use_estimated_time """ default = DEFAULT_USE_ESTIMATED_TIME return getattr(self, 'use_estimated_time', default) def AllowShowAll(self): """ return allow_show_all """ default = DEFAULT_ALLOW_SHOW_ALL return getattr(self, 'allow_show_all', default) def UseActualTime(self): """ return use_actual_time """ default = DEFAULT_USE_ACTUAL_TIME return getattr(self, 'use_actual_time', default) def _setUseActualTime(self, toggle_to=True): """ set use_actual_time """ self.use_actual_time = bool(toggle_to) def IncludeDescriptionInNotifications(self): """ return include_description_in_notifications """ default = DEFAULT_INCLUDE_DESCRIPTION_IN_NOTIFICATIONS return getattr(self, 'include_description_in_notifications', default) def getSpamKeywords(self): """ return spam_keywords if possible """ return getattr(self, 'spam_keywords', DEFAULT_SPAM_KEYWORDS) def getSpamKeywordsExpanded(self): """ the property 'spam_keywords' is a list that contains potentially sublists like this: ['foo', 'bar', ['kung', 'fu'], ] Then, return it like this: ['foo', 'bar', '\tkung', '\tfu', ] """ padding_template = ' %s' L = self.getSpamKeywords()[:] listtest = lambda x: isinstance(x, list) for item in L: if listtest(item): i = L.index(item) L.pop(i) item.reverse() for subitem in item: L.insert(i, padding_template % subitem) return L def ShowConfidentialOption(self): """ return show_confidential_option """ default = DEFAULT_SHOW_CONFIDENTIAL_OPTION return getattr(self, 'show_confidential_option', default) def ShowHideMeOption(self): """ return show_hideme_option """ default = DEFAULT_SHOW_HIDEME_OPTION return getattr(self, 'show_hideme_option', default) def ShowIssueURLOption(self): """ return show_issueurl_option """ # the default is probably False but because we don't want to surprise people # with existing issuetracker instance we resolve to True if it # hasn't been set. if hasattr(self, 'show_issueurl_option'): return self.show_issueurl_option else: #default = DEFAULT_SHOW_ISSUEURL_OPTION default = True return default def ShowDownloadButton(self): """ return show_download_button """ import warnings m = "Download button is deprecated." warnings.warn(m, DeprecationWarning) return False #default = DEFAULT_SHOW_DOWNLOAD_BUTTON #return getattr(self, 'show_download_button', default) def EncodeEmailDisplay(self): """ return encode_emaildisplay """ default = DEFAULT_ENCODE_EMAILDISPLAY return getattr(self, 'encode_emaildisplay', default) def getNoFileattachments(self): """ return no_fileattachments or default """ return getattr(self, 'no_fileattachments', DEFAULT_NO_FILEATTACHMENTS) def getNoFollowupFileattachments(self): """ return no_followup_fileattachments or default """ return getattr(self, 'no_followup_fileattachments', DEFAULT_NO_FOLLOWUP_FILEATTACHMENTS) def doDispatchOnSubmit(self): """ Check if we shall dispatch emails out """ return getattr(self, 'dispatch_on_submit', DEFAULT_DISPATCH_ON_SUBMIT) def doStopCache(self): """ return the stop_cache property """ return getattr(self, 'stop_cache', DEFAULT_STOP_CACHE) def doShowAlwaysNotifyStatus(self): """ return show_always_notify_status """ return getattr(self, 'show_always_notify_status', DEFAULT_SHOW_ALWAYS_NOTIFY_STATUS) def imagesInMenu(self): """ return if the images_in_menu attribute is True """ return getattr(self, 'images_in_menu', DEFAULT_IMAGES_IN_MENU) def CanAddNewSections(self): """ return if can_add_new_sections is True """ return getattr(self, 'can_add_new_sections', DEFAULT_CAN_ADD_NEW_SECTIONS) def ShowIdWithTitle(self): """ return show_id_with_title """ return getattr(self, 'show_id_with_title', DEFAULT_SHOW_ID_WITH_TITLE) def ShowCSVExportLink(self): """ return show_csvexport_link """ return getattr(self, 'show_csvexport_link', DEFAULT_SHOW_CVSEXPORT_LINK) def ShowAccessKeysOption(self): """ return show_use_accesskeys_option """ default=DEFAULT_SHOW_USE_ACCESSKEYS_OPTION return getattr(self, 'show_use_accesskeys_option', default) def ShowRememberSavedfilterPersistentlyOption(self): """ return show_remember_savedfilter_persistently_option """ default=DEFAULT_SHOW_REMEMBER_SAVEDFILTER_PERSISTENTLY_OPTION return getattr(self, 'show_remember_savedfilter_persistently_option', default) def getOutlookBatchSize(self): """ return outlook_batch_size (used in zpt/index_html.zpt) """ default = DEFAULT_OUTLOOK_BATCH_SIZE return getattr(self, 'outlook_batch_size', default) def ShowSpambotPrevention(self): """ return show_spambot_prevention """ default = DEFAULT_SHOW_SPAMBOT_PREVENTION return getattr(self, 'show_spambot_prevention', default) def getSitemasterEmail(self): """ return sitemaster_email """ return self.sitemaster_email def getSitemasterName(self): """ return sitemaster_name """ return self.sitemaster_name def getSitemasterFromField(self): """ return a combination of sitemaster_name and sitemaster_email """ name = self.getSitemasterName() email = self.getSitemasterEmail() assert email.strip(), "Must have email for sitemaster" if name.strip(): return "%s <%s>" % (name, email) else: return email def UseIssueAssignment(self): """ return use_issue_assignment """ return getattr(self, 'use_issue_assignment', DEFAULT_USE_ISSUE_ASSIGNMENT) def UseExtendedOptions(self): """ return if we should allow for extended options to an issue """ #### XXXXXXX more work needed here return 0 def getIssueAssignmentBlacklist(self, check_each=False): """ return _assignment_blacklist """ list = getattr(self.getRoot(), '_assignment_blacklist',[]) if check_each: checked = [] for each in list: acl_path, username = each.split(',') try: userfolder = self.unrestrictedTraverse(acl_path) except: continue if userfolder.data.has_key(username): checked.append(each) return checked else: return list def ShowDescription(self, text, display_format=''): """ pass on to utilities module """ script = Utils.ShowDescription if self.EncodeEmailDisplay(): return script(text, display_format, emaillinkfunction=self.encodeEmailString) else: return script(text, display_format) def getSignature(self): """ return signature_text """ return getattr(self, 'signature_text', DEFAULT_SIGNATURE_TEXT) def showSignature(self): """ return getSignature() with the variables replaced with real stuff """ text = self.getSignature() patterns = signature_patterns if patterns['url'].findall(text): text = re.sub(patterns['url'], self.getRootURL(), text) if patterns['title'].findall(text): text = re.sub(patterns['title'], self.getRoot().getTitle(), text) if patterns['date'].findall(text): date = DateTime().strftime(self.display_date) text = re.sub(patterns['date'], date, text) if patterns['sitemaster name'].findall(text): _v = self.getSitemasterName() text = re.sub(patterns['sitemaster name'], _v, text) if patterns['sitemaster email'].findall(text): _v = self.getSitemasterName() text = re.sub(patterns['sitemaster email'], _v, text) return text def showDate(self, date, today=None): """ return the date formatted nicely """ if self.ShowDatesCleverly(): # The whole reason why today is a parameter is because # if this function is called 20 times in one page # eg. richList.zpt then it'd be a shame to create a new # DateTime object every time. By creating it once and # passing it every time to this function we save some # CPU and memory default_fmt = self.display_date def abbr(label, date): fmt = default_fmt.replace('%H:%M','').strip() return '%s' % (date.strftime(fmt), label) if today is None: today = DateTime() if date.strftime('%Y%m%d') == today.strftime('%Y%m%d'): return abbr(_("Today"), date) + date.strftime(" %H:%M") elif (date+1).strftime('%Y%m%d') == today.strftime('%Y%m%d'): return abbr(_("Yesterday"), date) + date.strftime(" %H:%M") elif date.strftime('%Y%W') == today.strftime('%Y%W'): return abbr(date.strftime('%A'), date) + date.strftime(' %H:%M') elif (date+7).strftime('%Y%W') == today.strftime('%Y%W'): return abbr(_("Last week") + date.strftime(' %A'), date) + date.strftime(' %H:%M') #elif date.strftime('%Y%m') == today.strftime('%Y%m'): # return date.strftime(default_fmt) else: # skip the hour part fmt = default_fmt.replace('%H:%M','').strip() return date.strftime(fmt) # default thing return date.strftime(self.display_date) def getDefaultSortorder(self): """ return the default sort order """ return getattr(self, 'default_sortorder', DEFAULT_SORTORDER) # new def doShowThreads(self): """ return if threads should be shown after the issue(s) """ default = True try: return Utils.niceboolean(self.REQUEST.get('show-threads', default)) except: return default def getForcedStylesheet(self): """ return which if any forced stylesheet to use """ v = self.REQUEST.get('forced-stylesheet') if not v: return None else: if v.startswith('/') or v.startswith('http'): return v else: return "%s/%s" % (self.getRootURL(), v) def getPluginPaths(self): """ return plugin_paths """ return getattr(self, 'plugin_paths', []) def getPluginObjects(self): """ return a list of Zope objects which are plugins to the issuetracker instance like the MoreStatistics or FileArchive """ objects = [] for path in self.getPluginPaths(): if path: try: object = self.restrictedTraverse(path) objects.append(object) except: pass return objects ## ## Getting the issue objects ## def _getIssueContainer(self): root = self.getRoot() if root._isUsingBTreeFolder(): return getattr(root, BTREEFOLDER2_ID) else: return root def getBrotherPaths(self): """ return the paths of the brother issuetrackers we have """ return getattr(self, 'brother_issuetracker_paths',[]) def _getBrothers(self): """ return a list of Issue Tracker instance objects that we have defined as brothers """ paths = self.getBrotherPaths() trackers = [self.restrictedTraverse(x) for x in paths] trackers = [x for x in trackers if x.meta_type == ISSUETRACKER_METATYPE] return trackers def isFromBrother(self, issue): """ return true if the passed issue doesn't belong to this issuetracker """ return not issue.absolute_url_path().startswith(self.getRoot().absolute_url_path()) def getBrotherFromIssue(self, issue): """ return the issuetracker instance this issue belongs to """ parent = aq_parent(aq_inner(issue)) if parent.meta_type == 'BTreeFolder2': parent = aq_parent(aq_inner(parent)) return parent def getIssueObjects(self): """ return what objectValues does but with varying container """ container = self._getIssueContainer() all = list(container.objectValues(ISSUE_METATYPE)) try: brothers = self._getBrothers() if brothers: for brother in brothers: all.extend(brother.getIssueObjects()) except KeyError, msg: tmpl = 'Reference to join-in issue trackers (%s) is broken in %s' paths = ', '.join(self.getBrotherPaths()) logger.warn(tmpl % (paths, self.absolute_url_path())) return all def getIssueItems(self): """ return what objectItems does but with varying container """ container = self._getIssueContainer() brothers = self._getBrothers() if brothers: all = list(container.objectValues(ISSUE_METATYPE)) for brother in brothers: all.extend(list(brother.getIssueItems())) return all else: return container.objectItems(ISSUE_METATYPE) def getIssueIds(self): """ return what objectIds does but with varying container """ container = self._getIssueContainer() brothers = self._getBrothers() if brothers: all = list(container.objectIds(ISSUE_METATYPE)) for brother in brothers: all.extend(list(brother.getIssueIds())) return all else: return container.objectIds(ISSUE_METATYPE) def countIssueObjects(self): """ return what objectValues does """ return len(self.getIssueObjects()) def hasAnyIssues(self): """ return if there are any issues in the root at all """ return self.countIssueObjects() > 0 def ageOfOldestIssue(self): """ return the datetime object of the oldest issue """ oldest = DateTime() for issue in self.getIssueObjects(): if issue.getIssueDate() < oldest: oldest = issue.getIssueDate() return oldest def hasIssue(self, issueid): """ see if this issue exists """ return hasattr(self._getIssueContainer(), issueid) def getIssueObject(self, issueid): """ because a plain getattr() wasn't enough """ return getattr(self._getIssueContainer(), issueid) def _isUsingBTreeFolder(self): """ return if we're using a BTreeFolder2 for storing all issues """ if not hasattr(self, 'btreefolder_storage'): root = self.getRoot() self.btreefolder_storage = BTREEFOLDER2_ID in root.objectIds('BTreeFolder2') return self.btreefolder_storage ## Editing the IssueTracker def getDisplayDateFormatOptions(self): """ return a list of a different formats """ return ['%d/%m %Y', '%d/%m %Y %H:%M', '%m/%d %Y', '%m/%d %Y %H:%M', # US style '%d %b %Y', '%d %b %Y %H:%M', '%d %B %Y', '%d %B %Y %H:%M', '%d-%m-%Y', '%d-%m-%Y %H:%M', '%m-%d-%Y', '%m-%d-%Y %H:%M', # US style '%d-%b %Y', '%d-%b %Y %H:%M', '%d-%B %Y', '%d-%B %Y %H:%M', '%Y/%m/%d', '%Y/%m/%d %H:%M', '%d/%m/%Y', '%d/%m/%Y - %H:%M', '%m/%d/%Y', '%m/%d/%Y - %H:%M', ] def getDefaultSortorderOptions(self): """ return which default sort orders we can have """ return SORTORDER_ALTERNATIVES def translateSortorderOption(self, variable): """ return a nice representation of the variable for the Properties tab. """ if variable == 'modifydate': return _(u"Modification date") elif variable == 'issuedate': return _(u"Creation date") else: return variable.capitalize() security.declareProtected(VMS, 'manage_findPotentialBrothers') def manage_findPotentialBrothers(self): """ return a list of all issue tracker instances that can be found in the proximity """ all = [] root = self.getRoot() root_parent = aq_parent(aq_inner(root)) all = self._getPotentialBrothers(root_parent, skip_id=root.getId()) all.sort(lambda x,y: cmp(x.getTitle(), y.getTitle())) return all def _getPotentialBrothers(self, inobject, skip_id=None): """ recursively return all issuetracker instances """ found = [] for obj in inobject.objectValues(): # Check that the found object is something sane try: obj.meta_type except: continue try: obj.isPrincipiaFolderish except: continue if obj.meta_type==ISSUETRACKER_METATYPE: if skip_id and skip_id == obj.getId(): continue found.append(obj) elif obj.isPrincipiaFolderish: found.extend(self._getPotentialBrothers(obj, skip_id=skip_id)) return found def _savePluginPaths(self, paths): """ filter and save the paths list """ if isinstance(paths, basestring): paths = [paths] paths = [x.strip() for x in paths if x.strip()] ok = [] for each in paths: try: obj = self.restrictedTraverse(each) except: continue if each not in ok: ok.append(each) self.plugin_paths = ok security.declareProtected(VMS, 'manage_savePluginPath') def manage_savePluginPath(self, path): """ add one plugin path to this instance """ assert path, "Path can't be empty" all_paths = self.getPluginPaths() + [path] self._savePluginPaths(all_paths) security.declareProtected(VMS, 'manage_editIssueTrackerProperties') def manage_editIssueTrackerProperties(self, carefulbooleans=False, REQUEST=None): """ save all IssueTracker related issues Since booleans are controlled from checkboxes where non-existance is the same as False. This is not good because sometimes you don't even ask for these checkboxes like in the PropertiesWizard. When carefulbooleans=True, non-existant booleans are not set to False. """ hk = self.REQUEST.has_key get = self.REQUEST.get strings = ['display_date', 'sitemaster_email', 'issueprefix', 'default_display_format', 'default_sortorder', ] unicodes = ['title','sitemaster_name', 'default_type','default_urgency', 'signature_text'] lists = ['types','urgencies','sections_options','defaultsections', 'statuses','statuses_verbs','display_formats', 'manager_roles',] ints = ['default_batch_size','randomid_length','no_fileattachments', 'no_followup_fileattachments', 'outlook_batch_size'] booleans = ['dispatch_on_submit','allow_issueattrchange','stop_cache', 'allow_show_all', 'allow_subscription', 'use_tellafriend', 'use_tellafriend_for_anonymous', 'private_statistics', 'private_reports', 'show_confidential_option','show_hideme_option', 'show_issueurl_option', 'encode_emaildisplay', 'show_always_notify_status', 'images_in_menu', 'use_issue_assignment', 'save_drafts', 'can_add_new_sections', 'show_id_with_title', 'show_use_accesskeys_option', 'show_remember_savedfilter_persistently_option', 'use_autosave', 'show_csvexport_link', 'disallow_duplicate_issue_subjects', 'use_estimated_time', 'use_actual_time', 'include_description_in_notifications', 'show_dates_cleverly', 'show_spambot_prevention', ] dict = self.__dict__ for each in strings: if hk(each) and isinstance(get(each), basestring): dict[each] = get(each).strip() for each in unicodes: if hk(each) and isinstance(get(each), basestring): dict[each] = unicodify(get(each).strip()) for each in ints: if hk(each): if isinstance(get(each), int): dict[each] = get(each) else: logger.warn('%s not integer' % get(each)) for each in lists: if hk(each) and isinstance(get(each), list): dict[each] = Utils.uniqify(get(each)) for each in booleans: if hk(each) and get(each): dict[each] = True elif not carefulbooleans: dict[each] = False # now for a special one if hk('statuses-and-verbs'): if isinstance(get('statuses-and-verbs'), list): L1, L2 = self.splitStatusesAndVerbs(get('statuses-and-verbs')) self.statuses = L1 self.statuses_verbs = L2 else: logger.warn("Statuses and verbs not list type") # another special one if hk('always_notify'): # Every item must be recognized properly always_notify = get('always_notify') # clean upp the variable a bit always_notify = Utils.uniqify(always_notify) always_notify = [x.strip() for x in always_notify if x.strip()] checked = [] for each in always_notify: valid, better_spelling = self._checkAlwaysNotify(each) if valid: checked.append(better_spelling) self.always_notify = checked # another special one if get('brother_issuetracker_paths'): # every item must be recognized properly as an issuetracker instance paths = get('brother_issuetracker_paths') paths = [x.strip() for x in paths if x.strip()] # this will raise an error if it can't be reached trackers = [self.restrictedTraverse(x) for x in paths] # this will assert the meta_type trackers = [y for y in trackers if y.meta_type == ISSUETRACKER_METATYPE] self.brother_issuetracker_paths = paths else: self.brother_issuetracker_paths = [] # another special one self._savePluginPaths(get('plugin_paths',[])) # for the custom properties if REQUEST is not None: self.manage_editProperties(REQUEST) return self.manage_editIssueTrackerPropertiesForm(self.REQUEST, manage_tabs_message='IssueTracker properties updated.') def _checkAlwaysNotify(self, item, format='show'): """ return a tuple of (validity, spelling). An item is valid if it is a valid email address, an exising notifyable or an exisitng notifyable group. 'format' can either be 'show' or list (e.g. [name, email])""" item_lower = ss(item) # check the acl_users for iuf in self.superValues(ISSUEUSERFOLDER_METATYPE): for username, userdata in iuf.data.items(): showname = "%s, %s"%(userdata.getFullname(), username) if format == 'list': display = [userdata.getFullname(), userdata.getEmail()] else: display = showname if ss(showname) == item_lower: return True, display elif ss(username) == item_lower: return True, display elif ss(userdata.getFullname()) == item_lower: return True, display elif ss(userdata.getEmail()) == item_lower: return True, display elif item_lower.find(ss("(%s)"%username)) > -1: # fragmented possibly because fullname has changed return True, display elif not not re.search("\w\s*,\s*%s$"%username, item_lower, re.I): return True, display # check the notifyables all_notifyables = self.getNotifyables() for notifyable in all_notifyables: if notifyable.getName(): showname = "%s, %s"%(notifyable.getName(), notifyable.getEmail()) if format == 'list': display = [notifyable.getName(), notifyable.getEmail()] else: display = showname else: showname = notifyable.getEmail() if format == 'list': display = ['', notifyable.getEmail()] else: display = showname if item_lower == ss(showname): return True, display elif notifyable.getName().lower()==item_lower or \ notifyable.getEmail().lower()==item_lower: return True, display # check all groups if item.startswith('group: '): item_lower = item_lower[len('group:'):].strip() all_groups = self.getNotifyableGroups() for group in all_groups: if group.getId().lower() == item_lower or \ group.getTitle().lower() == item_lower: if format == 'list': return True, ["group: %s"%group.getTitle(), ""] else: return True, "group: %s"%group.getTitle() # check if it's a plain email address if Utils.ValidEmailAddress(item): if format == 'list': return True, ["", item] else: return True, item # default is to deny if format == 'list': return False, [] else: return False, item security.declareProtected(VMS, 'manage_editMenuItems') def manage_editMenuItems(self, hrefs, inurls, labels, reset_to_default=False, REQUEST=None): """ wrap up the values and save it to _setMenuItems(). _setMenuItems() accepts a list of dicts. Each inurl can be either a string or a tuple, consider it a token. """ if reset_to_default: menu_items = DEFAULT_MENU_ITEMS else: menu_items = [] assert len(hrefs)==len(inurls)==len(labels), \ "Missmatch of no. of hrefs, inurls, labels" for i in range(len(hrefs)): href = hrefs[i].strip() inurl = inurls[i].strip() label = labels[i].strip() if href+inurl+label == "": continue elif not label and href: label = href.split('/')[-1] elif not href and label: href = "/" + label if len(inurl.split()) > 1: inurl = tuple(inurl.split()) menu_items.append( dict(href=href, inurl=inurl, label=label)) # nothing can really go wrong, # load it in! self._setMenuItems(menu_items) # for the custom properties if REQUEST is not None: return self.manage_configureMenuForm(self.REQUEST, manage_tabs_message='Menu changed.') security.declareProtected(VMS, 'manage_addOtherProperty') def manage_addOtherProperty(self, id, value, type): """ Add arbitrary property """ self.manage_addProperty(id, value, type) page = self.manage_editIssueTrackerPropertiesForm return page(self.REQUEST, manage_tabs_message='Other property added.', activetab='custom' # used by the CSS magic on the Properties tab ) security.declareProtected(VMS, 'manage_delOtherProperties') def manage_delOtherProperties(self, ids): """ remove arbitrary properties """ self.manage_delProperties(ids) page = self.manage_editIssueTrackerPropertiesForm return page(self.REQUEST, manage_tabs_message='Property deleted', activetab='custom' # See comment about this parameter above ) ## General IssueTracker maintenance security.declareProtected(VMS, 'manage_canUseBTreeFolder') def manage_canUseBTreeFolder(self): """ return True if the BTreeFolder2 product is installed """ if self.filtered_meta_types(): all = self.filtered_meta_types() for each in all: if each.get('product')=='BTreeFolder2': return True return False security.declareProtected(VMS, 'manage_isUsingBTreeFolder') def manage_isUsingBTreeFolder(self): """ just a wrapping """ return self._isUsingBTreeFolder() security.declareProtected(VMS, 'manage_convert2BTreeFolder') def manage_convert2BTreeFolder(self, REQUEST=None): """ change where we store issues, before they were stored in the issue tracker root (i.e. self.getRoot()) but now we want to store them inside a container of kind BTreeFolder2. """ # 1. Do some basic tests assert self.manage_canUseBTreeFolder(), "BTreeFolder2 not installed" assert not self.manage_isUsingBTreeFolder(), "BTreeFolder already in use" # 1. Set up the container root = self.getRoot() _adder = root.manage_addProduct['BTreeFolder2'].manage_addBTreeFolder _adder(id=BTREEFOLDER2_ID) container = getattr(self, BTREEFOLDER2_ID) # 2. Transfer all issues cut = root.manage_cutObjects(ids=root.objectIds(ISSUE_METATYPE)) container.manage_pasteObjects(cut) # 3. Persistently remember this so that we don't have to look # for a BTreeFolder2 instance every time to deduce if we're # storing the issues in a BTree root.btreefolder_storage = True # 4. Copy the internal ID counter dest_key = '_nextid_%s' % ss(container.meta_type).replace(' ','') source_key = '_nextid_%s' % ss(root.meta_type).replace(' ','') if hasattr(root, source_key) and getattr(root, source_key) >= getattr(container, dest_key, 0): # do the copy! container.__dict__[dest_key] = getattr(root, source_key) # 5. Update the ZCatalog and everything else self.UpdateEverything() msg = "Converted to storing issues in BTreeFolder" if REQUEST is None: return msg else: url = root.absolute_url()+'/manage_ManagementForm' url = Utils.AddParam2URL(url, {'manage_tabs_message':msg}) REQUEST.RESPONSE.redirect(url) security.declareProtected(VMS, 'manage_convertFromBTreeFolder') def manage_convertFromBTreeFolder(self, REQUEST=None): """ change back to storing the issues right inside the issue tracker itself""" # 1. Do some basic tests assert self.manage_canUseBTreeFolder(), "BTreeFolder2 not installed" assert self.manage_isUsingBTreeFolder(), "BTreeFolder already in use" # 2. Transfer all issues root = self.getRoot() container = getattr(root, BTREEFOLDER2_ID) cut = container.manage_cutObjects(ids=container.objectIds(ISSUE_METATYPE)) root.manage_pasteObjects(cut) # 3. Persistently remember this so that we don't have to look # for a BTreeFolder2 instance every time to deduce if we're # storing the issues in a BTree root.btreefolder_storage = False # 4. Copy the internal ID counter dest_key = '_nextid_%s' % ss(root.meta_type).replace(' ','') source_key = '_nextid_%s' % ss(container.meta_type).replace(' ','') if hasattr(container, source_key) and getattr(container, source_key) >= getattr(root, dest_key, 0): # do the copy! root.__dict__[dest_key] = getattr(container, source_key) # 5. Remove the Btreefolder if possible if len(container.objectValues()) == 0: root.manage_delObjects([BTREEFOLDER2_ID]) # 6. Update the ZCatalog and everything else root.UpdateEverything() msg = "Converted back to store issues in Issue Tracker instead of BTreeFolder" if REQUEST is None: return msg else: url = root.absolute_url()+'/manage_ManagementForm' url = Utils.AddParam2URL(url, {'manage_tabs_message':msg}) REQUEST.RESPONSE.redirect(url) security.declareProtected(VMS, 'ReplaceEmail') def ReplaceEmail(self, old, new, caseinsensitive=1, REQUEST=None): """ Method that lets you change an occurance of an email address to another. Useful if a frequence user has changed email accout or something. """ if caseinsensitive: old = old.lower() root = self.getRoot() nochanges_issues = 0 nochanges_threads = 0 for issue in root.getIssueObjects(): iemail = issue.email if caseinsensitive: iemail = iemail.lower() if iemail == old: issue.email = new nochanges_issues = nochanges_issues + 1 for thread in issue.objectValues(ISSUETHREAD_METATYPE): temail = thread.email if caseinsensitive: temail = temail.lower() if temail == old: thread.email = new nochanges_threads = nochanges_threads + 1 msg = "Changed %s issues and %s threads"%\ (nochanges_issues, nochanges_threads) if REQUEST is None: return msg else: method = Utils.AddParam2URL desturl = root.absolute_url()+"/manage_ManagementForm" url = method(desturl,{'manage_tabs_message':msg}) self.REQUEST.RESPONSE.redirect(url) security.declareProtected(VMS, 'ManagementTabs') def ManagementTabs(self, whichon='main'): """ return a HTML chunk with tabs """ tabs = (('manage_ManagementForm','Main'), ('manage_ManagementNotifyables','Notifyables'), ('manage_ManagementUsers','Users'), ('manage_ManagementUpgrade','Upgrade'), ('manage_ManagementSpamProtection','Spam protection'), ) tabdicts = [] for tab in tabs: item = {} url, name = tab item['href'] = url item['name'] = name item['current'] = name.lower()==whichon.lower() tabdicts.append(item) page = self.management_tabs return page(self, self.REQUEST, tabdicts=tabdicts) def manage_beforeDelete(self, item, container): """ we're about to be deleted! """ self._old_instance_physicalpath = self.getPhysicalPath() def _postCopy(self, container, op=0): """ Called after the copy is finished to accomodate special cases. The op var is 0 for a copy, 1 for a move. """ if hasattr(self, '_old_instance_physicalpath'): old_path = self._old_instance_physicalpath new_path = self.getPhysicalPath() self._renameOldPaths(old_path, new_path) self.UpdateCatalog() def _renameOldPaths(self, old_path, new_path): """ this issuetracker has changed path from 'old_path' to 'new_path'. Change all the references where this appears. For example, there might be assignments withing issues that point to users who are defined as acl users within this issue tracker. """ old_path_joined = '/'.join(old_path) new_path_joined = '/'.join(new_path) count = {} for issue in self.getIssueObjects(): acl_adder = issue.getACLAdder() if acl_adder.find(old_path_joined) > -1: new_acl_adder = acl_adder.replace(old_path_joined, new_path_joined) issue._setACLAdder(new_acl_adder) count['issues'] = count.get('issues',0) + 1 for thread in issue.getThreadObjects(): acl_adder = thread.getACLAdder() if acl_adder.find(old_path_joined) > -1: new_acl_adder = acl_adder.replace(old_path_joined, new_path_joined) thread._setACLAdder(new_acl_adder) count['threads'] = count.get('threads',0) + 1 for assignment in issue.getAssignments(sort=False): acl_adder = assignment.getACLAdder() if acl_adder.find(old_path_joined) > -1: new_acl_adder = acl_adder.replace(old_path_joined, new_path_joined) assignment._setACLAdder(new_acl_adder) count['assignments'] = count.get('assignments',0) + 1 acl_assignee = assignment.getACLAssignee() if acl_assignee.find(old_path_joined) > -1: new_acl_assignee = acl_assignee.replace(old_path_joined, new_path_joined) assignment._setACLAssignee(new_acl_assignee) count['assignees'] = count.get('assignees',0) + 1 msg = '' if count: for k, v in count.items(): msg += "postcopy fix %s %s\n" %(v, k) if msg: LOG(self.__class__.__name__, INFO, "Post copy fixup: %s" % msg) security.declareProtected(VMS, 'UpdateEverything') def UpdateEverything(self, DestinationURL=None): """ do a DeployStandards(), AssertAllProperties() and UpdateCatalog() """ msgs = [] msgs.append(self.DeployStandards()) msgs.append(self.AssertAllProperties()) msgs.append(self.UpdateCatalog()) msgs.append(self.PrerenderDescriptionsAndComments()) msgs.append(self._cleanTempFolder(implode_if_possible=True)) msgs.append(self.CleanOldSavedFilters(user_excess_clean=True, implode_if_possible=True, clean_keyed_only_filtervaluers=True)) if base_hasattr(self, FILTERVALUEFOLDER_ID): if self.getFilterValuerCatalog() is None: self._setupFilterValuerCatalog() msgs.append('Created ZCatalog for saved filters') msgs.append(self.UpdateFilterValuerCatalog()) msg = '\n'.join([x for x in msgs if x]) if DestinationURL: method = Utils.AddParam2URL params = {'manage_tabs_message':"Everything updated\n\n%s"%msg, } try: pingurl = "http://www.issuetrackerproduct.com/UserStories/ping" pingable = urlopen(pingurl) if pingable: if hasattr(self, 'userstory_plea'): no_previous_pleas = int(getattr(self, 'userstory_plea')) else: no_previous_pleas = 0 if no_previous_pleas < 3: params['userstory'] = 'plea' self.userstory_plea = no_previous_pleas + 1 except: pass url = method(DestinationURL, params) self.REQUEST.RESPONSE.redirect(url) else: return msg security.declarePrivate('_cleanTempFolder') def _cleanTempFolder(self, hours=CLEAN_TEMPFOLDER_INTERVAL_HOURS, implode_if_possible=False): """ remove all relativly old files in the temporary directory """ tempfolder = self._getTempFolder(clean_if_necessary=False) folders2del = [] now = DateTime() for folder in tempfolder.objectValues('Folder'): if now - folder.bobobase_modification_time() > hours/24.0: folders2del.append(folder.getId()) if folders2del: # need to use 'folders2del' here (before the action) # because manage_delObjects() # will reset the list after execution if len(folders2del) < 5: del_info = ', '.join(folders2del) else: del_info = "%s folders in total"%len(folders2del) tempfolder.manage_delObjects(folders2del) msg = "Deleted temp files: " + del_info else: msg = "" if implode_if_possible: # maybe the temp-folder is now totally empty, if so, # delete it if not len(tempfolder.objectValues()): parent = tempfolder.aq_parent folderid = tempfolder.getId() parent.manage_delObjects([folderid]) msg += "\nDeleted temp folder because it was empty" msg = msg.strip() return msg def _getTempFolder(self, clean_if_necessary=True): """ make sure there's a folder called `TEMPFOLDER_ID` in the root """ id = TEMPFOLDER_ID root = self.getRoot() if id not in root.objectIds(['Folder','BTreeFolder2']): title = 'Used for temporary file uploads' if self.manage_canUseBTreeFolder(): _adder = root.manage_addProduct['BTreeFolder2'].manage_addBTreeFolder else: _adder = root.manage_addFolder _adder(id, title) elif clean_if_necessary: # clean it up from old junk self._cleanTempFolder() return getattr(root, id) security.declareProtected(VMS, 'PrerenderDescriptionsAndComments') def PrerenderDescriptionsAndComments(self, REQUEST=None): """ invoke the _prerender_* function on all issues and threads """ count_issues = 0 count_threads = 0 root = self.getRoot() for issue in root.getIssueObjects(): # fix a few possible legacy issues with the issue if isinstance(issue.getTitle(), str): issue._unicode_title() if isinstance(issue.getDescription(), str): issue._unicode_description() if isinstance(issue.fromname, str): issue.fromname = unicodify(issue.fromname) # check if the email contains non-ascii issue.email = asciify(issue.email) d_before = issue._getFormattedDescription() issue._prerender_description() d_after = issue._getFormattedDescription() if d_before != d_after: count_issues += 1 for thread in issue.getThreadObjects(): # fix a few possible legacy issues with the issue if isinstance(thread.getComment(), str): thread._unicode_comment() if isinstance(thread.fromname, str): thread.fromname = unicodify(thread.fromname) # check if the email contains non-ascii thread.email = asciify(thread.email) c_before = thread._getFormattedComment() thread._prerender_comment() c_after = thread._getFormattedComment() if d_before != d_after: count_threads += 1 if count_issues and count_threads: if count_issues == 1: msg = "1 issue and " else: msg = "%s issues and " % count_issues if count_threads == 1: msg += "1 followup " else: msg += "%s followups " % count_threads msg += "prerendered" elif not count_threads: if count_issues == 1: msg = "1 issue " else: msg = "%s issues " % count_issues msg += "prerendered" elif not count_issues: if count_threads == 1: msg = "1 followup " else: msg = "%s followups " % count_threads msg += "prerendered" else: msg = "" if REQUEST is None: return msg else: root = self.getRoot() desturl = root.absolute_url() + "/manage_ManagementForm" url = Utils.AddParam2URL(desturl, {'manage_tabs_message':msg}) REQUEST.RESPONSE.redirect(url) security.declareProtected(VMS, 'CleanOldSavedFilters') def CleanOldSavedFilters(self, user_excess_clean=False, implode_if_possible=False, clean_keyed_only_filtervaluers=False, REQUEST=None): """ remove all saved filters that are X days old. If you pass user_excess_clean=True then it goes through how many saved filters each user has. If a user has more than X saved filters, all the >X oldest ones are deleted.""" del_ids = [] treshold = FILTERVALUER_EXPIRATION_DAYS today = DateTime() container = self._getFilterValueContainer() for filtervaluer in container.objectValues(FILTEROPTION_METATYPE): try: age = today - filtervaluer.getModificationDate() except AttributeError: # if the filter valuer doesn't have a mod_date it must be very old # ie. a legacy object that we still need to support age = today - filtervaluer.bobobase_modification_time() if filtervaluer.acl_adder: # If the filtervaluer is done by some posh person who has a Zope # acl user access account, then we give them more breathing space # by increasing the treshold limit quite a lot used_treshold = treshold * 3 elif clean_keyed_only_filtervaluers and filtervaluer.getKey(): # This is quite special, filtervaluers that have a "key" have # that because they don't have an acl_adder, # adder_fromname or adder_email. Ie. users who haven't bothered # to identify themselfs at all. This kind of people glog up the # saved-filters folder with stuff that they might not reuse # because either they don't use the issuetracker more than once # or they don't support cookies (eg. Googlebot). # If this is the case, take out the filtervaluers that are # half-expired (see elif statement above) thus being less # lenient against these kind of objects. treshold = treshold / 2 if age > treshold: del_ids.append(filtervaluer.getId()) filtervaluer.unindex_object() if del_ids: msg = "Deleted %s old saved filters" % len(del_ids) else: msg = "No old saved filters to delete" container.manage_delObjects(del_ids) if not user_excess_clean: if implode_if_possible: if self._implodeFilterValueContainerIfPossible(): msg += "\nDeleted saved filters folder because it was empty" catalog = self.getFilterValuerCatalog() if catalog is not None: catalog.manage_catalogClear() if REQUEST is None: return msg else: root = self.getRoot() desturl = root.absolute_url() + "/manage_ManagementForm" url = Utils.AddParam2URL(desturl, {'manage_tabs_message':msg}) REQUEST.RESPONSE.redirect(url) # Now for an even more anal cleaning. For every user, # we only want them to have a max of FILTERVALUEFOLDER_MAX_PER_USER # filtervaluers in their name. There is actually nothing # stopping a user having more but that's only because we # don't want to annoy them with this restriction when they're # using saved filters. It is only here in the cleanup function # that we care. max_per_user = FILTERVALUER_MAX_PER_USER user_valuers = {} filtervaluers = container.objectValues(FILTEROPTION_METATYPE) sorted_filtervaluers = self.sortSequence(filtervaluers, (('mod_date',),)) # reversing puts the youngest first in the list sorted_filtervaluers.reverse() del_ids = [] for filtervaluer in sorted_filtervaluers: k = [] if filtervaluer.acl_adder: k.append(filtervaluer.acl_adder) if filtervaluer.adder_fromname: k.append(filtervaluer.adder_fromname) if filtervaluer.adder_email: k.append(filtervaluer.adder_email) if filtervaluer.getKey(): k.append(filtervaluer.getKey()) k = ','.join(k) # k is now the user key. Notice that it doesn't matter # how we identified this as long as it's unique. # But these in buckets now if k: if not user_valuers.has_key(k): user_valuers[k] = [filtervaluer.getId()] elif len(user_valuers) > max_per_user: # this one goes into the bin del_ids.append(filtervaluer.getId()) else: user_valuers[k].append(filtervaluer.getId()) # and we're done, let's see what we caught if del_ids: msg += "\nDeleted %s user excessive saved filters" % len(del_ids) container.manage_delObjects(del_ids) if implode_if_possible: if self._implodeFilterValueContainerIfPossible(): msg += "\nDeleted saved filters folder because it was empty" if REQUEST is None: return msg else: root = self.getRoot() desturl = root.absolute_url() + "/manage_ManagementForm" url = Utils.AddParam2URL(desturl, {'manage_tabs_message':msg}) REQUEST.RESPONSE.redirect(url) security.declareProtected(VMS, 'AssertAllProperties') def AssertAllProperties(self, REQUEST=None): """ invoke the assertAllProperties() on all objects """ count = 0 count += self._assertAllProperties() root = self.getRoot() for issue in root.getIssueObjects(): count += issue.assertAllProperties() for thread in issue.objectValues(ISSUETHREAD_METATYPE): count += thread.assertAllProperties() if count: msg = "Made sure %s objects have all properties."%count else: msg = "No objects needed assurance on new properties." if REQUEST is None: return msg else: root = self.getRoot() method = Utils.AddParam2URL desturl = root.absolute_url()+"/manage_ManagementForm" url = method(desturl,{'manage_tabs_message':msg}) self.REQUEST.RESPONSE.redirect(url) security.declarePrivate('_assertAllProperties') def _assertAllProperties(self): # sorry about the ugly name """ Return how many properties we made sure we have. Make sure the the root has the correct properties. """ self = self.getRoot() # be certain that we're in the root object count = 0 checks = {'menu_items':DEFAULT_MENU_ITEMS, 'show_id_with_title':DEFAULT_SHOW_ID_WITH_TITLE, 'show_use_accesskeys_option':DEFAULT_SHOW_USE_ACCESSKEYS_OPTION, 'can_add_new_sections':DEFAULT_CAN_ADD_NEW_SECTIONS, 'images_in_menu':DEFAULT_IMAGES_IN_MENU, 'use_estimated_time':DEFAULT_USE_ESTIMATED_TIME, 'use_actual_time':DEFAULT_USE_ACTUAL_TIME, 'include_description_in_notifications':DEFAULT_INCLUDE_DESCRIPTION_IN_NOTIFICATIONS, 'use_tellafriend':DEFAULT_USE_TELLAFRIEND, 'brother_issuetracker_paths':[], 'plugin_paths':[], } for key, default in checks.items(): if not hasattr(self, key): self.__dict__[key] = default count += 1 return count security.declareProtected(VMS, 'DeployStandards') def DeployStandards(self, remove_oldstuff=0, DestinationURL=None, initzcatalog=1): """ copy images and other documents into the instance unless they are already there """ t={} if initzcatalog: t = self.InitZCatalog(t=t) # create folders root = self.getRoot() #for f in ['notifyables', 'www', 'tinymce']: for f in ['notifyables', 'www']: if not f in root.objectIds('Folder'): root.manage_addFolder(f) t[f]='Folder' osj = os.path.join standards_home = osj(package_home(globals()),'standards') self._deployImages(root, standards_home, t=t, remove_oldstuff=remove_oldstuff, skipfolders=('mainbuttons','actionbuttons','.svn','CVS')) www_home = osj(standards_home,'www') self._deployImages(root.www, www_home, t=t, remove_oldstuff=remove_oldstuff, skipfolders=('.svn','CVS')) ##home = osj(standards_home, 'tinymce') ##self._deployImages(root.tinymce, home, ## t=t, remove_oldstuff=remove_oldstuff, ## check_updates=True) ##self._deployDocuments(root.tinymce, home, ## t=t, remove_oldstuff=remove_oldstuff, ## check_updates=True) # perhaps TinyMCE is now installed but 'html' is not a recognized # display format option if self.hasWYSIWYGEditor() and 'html' not in self.display_formats: df = list(self.display_formats) df.append('html') self.display_formats = df msg = "Standard objects deployed\n" if t: for k,v in t.items(): msg += "(%s)\n%s" % (k, v) else: msg = "No standard objects deployed." if DestinationURL: method = Utils.AddParam2URL url = method(DestinationURL,{'manage_tabs_message':msg}) self.REQUEST.RESPONSE.redirect(url) else: return msg def _deployImages(self, destination, directory, extensions=['.gif','.ico','.jpg','.png'], t={}, remove_oldstuff=False, check_updates=False, skipfolders=[]): """ do the actual deployment of images in a dir """ # expect 'skipfolders' to be a list of tuple if skipfolders is None: skipfolders = [] elif not isinstance(skipfolders, (tuple, list)): skipfolders = [skipfolders] osj = os.path.join base= getattr(destination,'aq_base',destination) for filestr in os.listdir(directory): if os.path.isdir(osj(directory, filestr)): if filestr in skipfolders: continue if hasattr(base, filestr) and remove_oldstuff: destination.manage_delObjects([filestr]) if not hasattr(base, filestr): destination.manage_addFolder(filestr) t[filestr] = "Folder" new_destination = getattr(destination, filestr) self._deployImages(new_destination, osj(directory, filestr), extensions=extensions, t=t, remove_oldstuff=remove_oldstuff, check_updates=check_updates, skipfolders=skipfolders) elif self._file_has_extensions(filestr, extensions): # take the image id, title = Utils.cookIdAndTitle(filestr) if hasattr(base, id) and remove_oldstuff: destination.manage_delObjects([id]) if hasattr(base, id) and check_updates: # if the new file is different, delete the existing current one this_image = getattr(destination, id) this_length = len(this_image.data) that_image = open(osj(directory, filestr),'rb').read() that_length = len(that_image) if this_length != that_length: destination.manage_delObjects([id]) if not hasattr(base, id): destination.manage_addImage(id, title=title, \ file=open(osj(directory, filestr),'rb').read()) t[id]="Image" def _file_has_extensions(self, filestr, extensions): """ check if a filestr has any of the give extensions """ for extension in extensions: if filestr.find(extension) > -1: return True return False def _deployDocuments(self, destination, directory, extensions=('.js','.css','.html','.htm'), t={}, remove_oldstuff=False, check_updates=False): """ do the actual deployment of images in a dir """ osj = os.path.join base= getattr(destination,'aq_base',destination) for filestr in os.listdir(directory): if os.path.isdir(osj(directory, filestr)): if hasattr(base, filestr) and remove_oldstuff: destination.manage_delObjects([filestr]) if not hasattr(base, filestr): destination.manage_addFolder(filestr) t[filestr] = "Folder" new_destination = getattr(destination, filestr) self._deployDocuments(new_destination, osj(directory, filestr), extensions=extensions, t=t, remove_oldstuff=remove_oldstuff, check_updates=check_updates) elif self._file_has_extensions(filestr, extensions): # take the image id, title = Utils.cookIdAndTitle(filestr) if hasattr(base, id) and remove_oldstuff: destination.manage_delObjects([id]) if hasattr(base, id) and check_updates: this_content = open(osj(directory, filestr)).read() this_content = self._massageDTMLDocumentContent(filestr, this_content) that_content = getattr(destination, id).document_src() if this_content != that_content: destination.manage_delObjects([id]) if not hasattr(base, id): content = open(osj(directory, filestr)).read() content = self._massageDTMLDocumentContent(filestr, content) destination.manage_addDTMLDocument(id, title, file=content) #destination.manage_addImage(id, title=title, \ # file=open(osj(directory, filestr),'rb').read()) t[id]="Document" def _massageDTMLDocumentContent(self, filename, content): """ return the content slightly modified. The purpose of this method is to improve and prepare the document for the usage. If the filename ends in '.js' put some caching header and some DTML code that sets the correct Content-Type. """ if content.lower().find("setHeader('Content-Type')".lower()) == -1: if filename.endswith('.js'): add = '' elif filename.endswith('.css'): add = '' else: add = None if add: content = add + content.strip() if content.find('doCache(') == -1: content = '' + content.strip() return content ## Properties wizard security.declareProtected(VMS, 'manage_PropertiesWizard') def manage_PropertiesWizard(self, REQUEST, *args, **kw): """ Overridden template """ try: firsttime = int(REQUEST.get('firsttime',0)) except: firsttime = 0 stage, msg, error = self._saveFromPropertiesWizard(REQUEST) if msg: kw['manage_tabs_message'] = msg.strip()+'\n' if error: kw['error'] = error kw['stage'] = stage kw['firsttime'] = firsttime file = 'dtml/PropertiesWizard' name = 'PropertiesWizard' return apply(DTMLFile(file, globals(), __name__=name ).__of__(self), (), kw) def _saveFromPropertiesWizard(self, request): """ return message a dict of submission error """ try: submit = int(request.get('submit',1)) except: submit = 1 try: stage = int(request.get('stage',0)) except: stage = 0 try: firsttime = int(request.get('firsttime',0)) except: firsttime = 0 msg = None error = {} if not submit: return stage, msg, error if stage == 1 and firsttime: msg = [] # attempt to save properties from stage 1 whatuse = ss(request.get('whatuse','softwaredevelopment')) if whatuse == 'helpdesk_external': sections = ['General','Front office','Back office','Other'] self.sections_options = sections msg.append("Set section options to: " + ', '.join(sections)) types = ['general', 'announcement', 'idea', 'content', 'feature request','question','other'] self.types = types msg.append("Set type options to: " +', '.join(types)) if not self.allow_subscription: self.allow_subscription = True msg.append("Allowed issue subscription") if not self.show_confidential_option: self.show_confidential_option = True msg.append("Allowed for confidential issues") if not self.show_hideme_option: self.show_hideme_option = True msg.append("Allowed for \"hide me\" option") elif whatuse == 'helpdesk_internal': sections = ['General','Back office','Other'] self.sections_options = sections msg.append("Set section options to: " + ', '.join(sections)) types = ['general', 'announcement', 'idea', 'content', 'feature request','question','other'] self.types = types msg.append("Set type options to: " +', '.join(types)) if self.isViewPermissionOn(): self.manage_ViewPermissionToggle() msg.append("Switched off Anonymous access") if not self.UseIssueAssignment(): self.manage_UseIssueAssignmentToggle() msg.append("Switched on Issue Assignment") if not self.private_statistics: self.private_statistics = True msg.append("Allow statistics") if self.encode_emaildisplay: self.encode_emaildisplay = False msg.append("Email addresses not encoded") if not self.show_always_notify_status: self.show_always_notify_status = True msg.append("Always show who was notified") if not self.CanAddNewSections(): self.can_add_new_sections = True msg.append("Can add new sections with each issue") else: # first time typical sections sections = ['General','Database','Interface','Support', 'Documentation','Other'] self.sections_options = sections msg.append("Set section options to: " + ', '.join(sections)) types = ['general','announcement','bug report', 'feature request','content request', 'usability','other'] self.types = types msg.append("Set type options to: " +', '.join(types)) if not self.UseIssueAssignment(): self.manage_UseIssueAssignmentToggle() msg.append("Switched on Issue Assignment") if not self.show_always_notify_status: self.show_always_notify_status = True msg.append("Always show who was notified") if self.no_followup_fileattachments == 0: self.no_followup_fileattachments = 1 _m = "Allowed for at least one file " _m += "attachment on follow up" msg.append(_m) msg = '\n'.join(msg) # can now move on to stage 2 stage += 1 elif stage == 2: msg = [] sections_options = request.get('sections_options',[]) # clean them a bit sections_options = [x.strip() for x in sections_options if x.strip()] sections_options = Utils.uniqify(sections_options) if not sections_options: error['sections_options'] = "No sections entered" else: self.sections_options = sections_options msg = "Set section options to: " + ', '.join(sections_options) stage += 1 elif stage == 3: defaultsections = request.get('defaultsections',[]) if not defaultsections: request.set('defaultsections', [self.sections_options[0]]) m = "None selected, try %s?"%self.sections_options[0] error['defaultsections'] = m else: # filter out unrecognized ones checked = [] for each in defaultsections: if each in self.sections_options: checked.append(each) if not checked: m = "None of selected was recognized" error['defaultsections'] = m else: self.defaultsections = checked if len(checked) > 1: msg = "Set default sections to: " else: msg = "Set default section to: " msg += ', '.join(checked) stage += 1 elif stage == 4: types = request.get('types',[]) urgencies = request.get('urgencies',[]) # clean them a bit types = [x.strip() for x in types] urgencies = [x.strip() for x in urgencies] while '' in types: types.remove('') while '' in urgencies: urgencies.remove('') types = Utils.uniqify(types) urgencies = Utils.uniqify(urgencies) if not types: error['types'] = "None entered" if not urgencies: error['urgencies'] = "None entered" if types and urgencies: self.types = types self.urgencies = urgencies msg = "Set types to: " + ', '.join(types) + '\n' msg += "Set urgencies to: " + ', '.join(urgencies) stage += 1 elif stage == 5: default_type = request.get('default_type','').strip() ok = True if default_type not in self.types: error['default_type'] = "Unrecognized" ok = False default_urgency = request.get('default_urgency','').strip() if default_urgency not in self.urgencies: error['default_urgency'] = "Unrecognized" ok = False if ok: self.default_type = default_type self.default_urgency = default_urgency msg = "Default type set to: " + default_type + '\n' msg += "Default urgency set to: " + default_urgency stage += 1 elif stage == 6: _default = self.getDefaultSortorder() default_sortorder = request.get('default_sortorder', _default) if default_sortorder not in self.getDefaultSortorderOptions(): error['default_sortorder'] = "Unrecognized option" ok = False else: self.default_sortorder = default_sortorder _translated = self.translateSortorderOption(default_sortorder) msg = "Default sort order set to %s"%_translated stage += 1 elif stage == 8: always_notify = request.get('always_notify',[]) always_notify = [x.strip() for x in always_notify] while '' in always_notify: always_notify.remove('') # Check that each is either a notifyable or a valid # email address. notifyables = self.getNotifyables() notifyables_names = [x.getName() for x in notifyables] email_checker = Utils.ValidEmailAddress checked = [] invalids = [] for each in always_notify: if each in notifyables_names: checked.append(each) elif Utils.ValidEmailAddress(each): checked.append(each) else: invalids.append(each) self.always_notify = checked if invalids: m = "Invalid entries: "+ ', '.join(invalids) error['always_notify'] = m else: msg = "Set to always be notified: " msg += ', '.join(checked) stage += 1 elif stage == 9: sitemaster_name = request.get('sitemaster_name','').strip() sitemaster_email = request.get('sitemaster_email','').strip() ok = True if not sitemaster_name: error['sitemaster_name'] = "Empty" ok = False if sitemaster_email != DEFAULT_SITEMASTER_EMAIL and \ not Utils.ValidEmailAddress(sitemaster_email): error['sitemaster_email'] = "Invalid" ok = False if ok: self.sitemaster_name = sitemaster_name self.sitemaster_email = sitemaster_email msg = "Site name set to: %s\n"%sitemaster_name msg +="Site email set to: %s"%sitemaster_email stage += 1 elif stage==10: no_fileattachments = request.get('no_fileattachments',1) no_followup_fileattachments = request.get('no_followup_fileattachments',1) display_date = request.get('display_date','').strip() show_dates_cleverly = bool(request.get('show_dates_cleverly',0)) ok = True try: no_fileattachments = int(no_fileattachments) except ValueError: error['no_fileattachments'] = "Not a number" ok = False try: no_followup_fileattachments = int(no_followup_fileattachments) except ValueError: error['no_followup_fileattachments'] = "Not a number" ok = False if not display_date: error['display_date'] = "No display date format" ok = False # nothing to test on the show_dates_cleverly if ok: self.no_fileattachments = no_fileattachments self.no_followup_fileattachments = no_followup_fileattachments self.display_date = display_date self.show_dates_cleverly = show_dates_cleverly msg = "" if no_fileattachments == 0: msg += "No file attachments to issues.\n" elif no_fileattachments == 1: msg += "One file attachment to issues.\n" else: msg += "%s file attachments to issues.\n"%no_fileattachments if no_followup_fileattachments == 0: msg += "No file attachments to follow ups.\n" elif no_followup_fileattachments == 1: msg += "One file attachment to follow ups.\n" else: msg += "%s file attachments to follow ups.\n"%no_followup_fileattachments msg += "Displays date in this format:" msg += DateTime().strftime(display_date) if show_dates_cleverly: msg += " (and dates are shown differently depending on how far from today)" msg = msg.strip() stage += 1 elif stage == 11: bool_keys = ('allow_issueattrchange', 'allow_subscription', 'use_tellafriend', 'private_statistics', 'encode_emaildisplay', 'show_always_notify_status', 'show_confidential_option', 'show_hideme_option', 'show_issueurl_option', 'can_add_new_sections', 'images_in_menu', ) for key in bool_keys: try: value = bool(int(request.get(key, getattr(self, key)))) except: continue self.__dict__[key] = value msg = "Yes/No questions set." stage = 12 else: stage += 1 #pass #raise "WhatNow", "What do we do now?" if stage == 1 and not firsttime: stage = 2 if msg == []: msg = None return stage, msg, error def ShowError(self, error, id, htmlwrap=1): """ show the error (used only by PropertiesWizard.dtml """ if error and error.has_key(id): s = error.get(id) if htmlwrap: s = '%s
'%s return s else: return s else: return '' ## Users part of Management related def getAllIssueUserFolders(self): """ return all objects that are IssueUserFolders """ return self.superValues(ISSUEUSERFOLDER_METATYPE) def getAllIssueUsers(self, userfolders=None, filter=1, exclude_assignee=None): """ return all the acl users as identifiers """ if userfolders is None: userfolders = self.getAllIssueUserFolders() elif not isinstance(userfolders, list): userfolders = [userfolders] users = [] if filter: blacklist = self.getIssueAssignmentBlacklist() else: blacklist = [] for userfolder in userfolders: userfolderpath = userfolder.getIssueUserFolderPath() for username, user in userfolder.data.items(): username = userfolderpath+','+username if username not in blacklist: # skip if exclude_assignee and username == exclude_assignee: continue users.append({'userfolder':userfolder, 'user':user, 'identifier':username}) return users security.declareProtected(VMS, 'manage_UseIssueAssignmentToggle') def manage_UseIssueAssignmentToggle(self, DestinationURL=None): """ inverse the value of self.use_issue_assignment """ self.use_issue_assignment = not self.UseIssueAssignment() if self.UseIssueAssignment(): msg = "Issue Assignment switched on" else: msg = "Issue Assignment switched off" if DestinationURL: method = Utils.AddParam2URL url = method(DestinationURL,{'manage_tabs_message':msg}) self.REQUEST.RESPONSE.redirect(url) else: return msg security.declareProtected(VMS, 'manage_AddToBlacklist') def manage_AddToBlacklist(self, add_identifiers, DestinationURL=None): """ add some identifiers to the blacklist """ before = self.getIssueAssignmentBlacklist(check_each=True) blacklist = before + add_identifiers checked = [] for identifier in blacklist: if identifier not in checked: checked.append(identifier) self._assignment_blacklist = checked if len(add_identifiers) == 1: msg = "User blacklisted" else: msg = "Users blacklisted" if DestinationURL: method = Utils.AddParam2URL url = method(DestinationURL,{'manage_tabs_message':msg}) self.REQUEST.RESPONSE.redirect(url) else: return msg security.declareProtected(VMS, 'manage_RemoveFromBlacklist') def manage_RemoveFromBlacklist(self, remove_identifiers, DestinationURL=None): """ remove some identifiers from the blacklist """ before = self.getIssueAssignmentBlacklist() checked = [] for identifier in before: if identifier not in remove_identifiers: checked.append(identifier) self._assignment_blacklist = checked if len(remove_identifiers) == 1: msg = "User blacklisted" else: msg = "Users blacklisted" if DestinationURL: method = Utils.AddParam2URL url = method(DestinationURL,{'manage_tabs_message':msg}) self.REQUEST.RESPONSE.redirect(url) else: return msg def isAnonymous(self): """ return true if the user is not logged into zope in any way. """ username = getSecurityManager().getUser().getUserName() return username.lower().replace(' ','') == 'anonymoususer' security.declareProtected(VMS, 'isViewPermissionOn') def isViewPermissionOn(self): """ return True if View permission is on for Anonymous """ return not not self.acquiredRolesAreUsedBy('View') security.declareProtected(VMS, 'manage_ViewPermissionToggle') def manage_ViewPermissionToggle(self, DestinationURL=None): """ Change the Aquire attribute for the View permission """ viewpermission_on = self.isViewPermissionOn() roles_4_view = ['Manager', IssueTrackerManagerRole, IssueTrackerUserRole] self.manage_permission('View', roles=roles_4_view, acquire=not viewpermission_on) if viewpermission_on: msg = "View permission disabled for Anonymous" else: msg = "View permission enabled for Anonymous" if DestinationURL: method = Utils.AddParam2URL url = method(DestinationURL,{'manage_tabs_message':msg}) self.REQUEST.RESPONSE.redirect(url) else: return msg ## Useful root instance methods def getRoot(self): """ Get the root instance object """ mtype = ISSUETRACKER_METATYPE r = self while r.meta_type != mtype: r = aq_parent(aq_inner(r)) return r def titleTag(self): """ return suitable content for tag """ root_title = self.getRoot().title_or_id() title = root_title if self.meta_type == ISSUE_METATYPE: prefix = "" if Utils.niceboolean(self.REQUEST.get('autorefresh')): prefix = _("(auto refreshed)") if self.ShowIdWithTitle(): title = "%s %s - #%s %s" title = title % (prefix, root_title, self.getIssueId(), self.getTitle()) else: title = "%s %s - %s" % (prefix, root_title, self.getTitle()) else: page = self.REQUEST.URL.split('/')[-1] _rtdict = {'root_title':root_title} if page == 'ListIssues': title = _('%(root_title)s - List Issues') % _rtdict elif page == 'CompleteList': title = _('%(root_title)s - Complete List') % _rtdict elif page == 'AddIssue': if self.REQUEST.form.has_key('previewissue'): title = _('Preview before adding issue - %(root_title)s') % _rtdict else: title = _('%(root_title)s - Add Issue') % _rtdict elif page == 'QuickAddIssue': title = _('%(root_title)s - Quick Add Issue') % _rtdict elif page == 'User': title = '%(root_title)s - User' % _rtdict elif page == 'About.html': title = _('About the IssueTrackerProduct version %s') title = title % self.getIssueTrackerVersion() elif page == 'SubmitIssue': if self.REQUEST.get('HTTP_REFERER').find('QuickAddIssue'): title = _('%(root_title)s - Quick Add Issue') % _rtdict else: title = _('%(root_title)s - Add Issue') % _rtdict elif page == 'What-is-WYSIWYG': title = "WYSIWYG = What You See Is What You Get" elif page == 'What-is-StructuredText': title = "About Structured Text" if isinstance(title, basestring): # legacy return Utils.html_entity_fixer(title) else: # new way return title def hasWYSIWYGEditor(self): """ return true if we have a WYSIWYG editor available """ return self.getWYSIWYGEditor() is not None def getWYSIWYGEditor(self): """ return the ztinymce configuration with the expected name """ ztinymce_conf_id = 'tinymce-issuetracker.conf' if hasattr(self.getRoot(), ztinymce_conf_id): return getattr(self.getRoot(), ztinymce_conf_id) return None def getCookiekey(self, which): """ return the cookiekey constants depending on key """ which_orig = which match_decorate = lambda x: x.lower().strip().replace('_','').replace('-','') which = match_decorate(which) keys = {'name': NAME_COOKIEKEY, 'fullname': NAME_COOKIEKEY, 'email': EMAIL_COOKIEKEY, 'displayformat': DISPLAY_FORMAT_COOKIEKEY, 'sortorder': SORTORDER_COOKIEKEY, 'sortorderreverse': SORTORDER_REVERSE_COOKIEKEY, 'draftissueids': DRAFT_ISSUE_IDS_COOKIEKEY, 'draftthreadids': DRAFT_THREAD_IDS_COOKIEKEY, 'autologin': AUTOLOGIN_COOKIEKEY, 'useaccesskeys': USE_ACCESSKEYS_COOKIEKEY, 'saved-filters': SAVED_FILTERS_COOKIEKEY, 'remember_savedfilter_persistently': REMEMBER_SAVEDFILTER_PERSISTENTLY_COOKIEKEY, 'draft_followup_ids': DRAFT_THREAD_IDS_COOKIEKEY, 'show_nextactions': SHOW_NEXTACTIONS_COOKIEKEY, } for k, v in keys.items(): if match_decorate(k) == which: return v if self.doDebug(): debug("Unable to find cookiekey for %s" % which_orig, steps=4) def __before_publishing_traverse__(self, object, request=None): """ sort things out before publising object """ self.get_environ() def get_environ(self): """ Populate REQUEST as appropriate """ request = self.REQUEST stack = request['TraversalRequestNameStack'] popped = [] _special = 'REQUEST' # things to pop out queryitems = ({'key':'start', 'mkey':'start', 'type':'int'}, {'key':'sortorder', 'mkey':'sortorder', 'type':'string'}, {'key':'reverse', 'mkey':'reverse', 'type':'boolean'}, {'key':'show', 'mkey':'show', 'type':'string'}, {'key':'report', 'mkey':'report', 'type':'string'}, ) splitter = '-' if stack: stack_copy = stack[:] found_item = 1 for each in range(len(stack_copy)): found_item = 0 stack_item = stack_copy[each] for each in queryitems: key, value = each['key'], each.get('mkey') if value is None and stack_item==key: # this is a valueless item found_item = 1 request.set(key, 1) elif stack_item.startswith("%s%s"%(key,splitter)) \ and value==_special: found_item = 1 first_key = stack_item.replace("%s%s"%(key,splitter),'') try: key, value = first_key.split(splitter,1) value = int(value) request.set(key, value) except ValueError: try: key, value = first_key.split(splitter,1) request.set(key, value) except: pass elif stack_item.startswith("%s%s"%(key,splitter)): found_item = 1 replace_what = "%s%s"%(key,splitter) if each['type']=='boolean': key = stack_item.replace(replace_what,'') key = Utils.niceboolean(key) elif each['type']=='int': key = int(stack_item.replace(replace_what,'')) else: key = stack_item.replace(replace_what,'') request.set(value, key) if found_item: stack.remove(stack_item) popped.append(stack_item) request.set('popped',popped) ## General for file attachments to issues def getFileattachmentInput(self, index, initsize=40): """ return either a file input field or a keep option """ request = self.REQUEST input_field = '<input size="%s" name="fileattachment:list" ' input_field += 'type="file" />' icon_html = '<img hspace="2" src="%s" alt="File" '\ 'title="File size %s" border="0" />' if request.has_key(TEMPFOLDER_REQUEST_KEY): upload_folder = request[TEMPFOLDER_REQUEST_KEY] # Maybe the actual folder doesn't exist any more tempfolder = self._getTempFolder() if upload_folder is None or not safe_hasattr(tempfolder, upload_folder): return input_field % initsize files = tempfolder[upload_folder].objectValues('File') try: file = files[index] file_src = self.getFileIconpath(file.getId()) file_size = self.ShowFilesize(file.getSize()) icon = icon_html%(file_src, file_size) confirm_title = _("Tick if you want to keep this file attachment") confirm = '<input type="checkbox" checked="checked" ' confirm += 'name="confirm_fileattachment:list" ' confirm += 'value="%s" title="%s" />'%(file.getId(), confirm_title) icon = '%s<a href="%s" title="File size %s">%s%s (%s)</a>'%\ (confirm, file.absolute_url(), file_size, icon, file.getId(), file_size) return icon except: return input_field % initsize else: return input_field % initsize def _uploadTempFiles(self): """ Attempt to upload fileattachments to temp-folder and stick some information in the REQUEST """ request = self.REQUEST temp_folder_id = None rkey = TEMPFOLDER_REQUEST_KEY # first, delete all unconfirmed files self._removeUnConfirmedFiles() if request.get(rkey, None) not in [None,'']: temp_folder_id = request.get(rkey) if request.has_key('fileattachment'): files = request.get('fileattachment') if not isinstance(files, (tuple, list)): files = [files] # fileattachment is a list, deal with each item for file in files: if self._isFile(file): if temp_folder_id is None: temp_folder_id = self._generateTempFolder() temp_folder = self._getTempFolder()[temp_folder_id] filename = getattr(file, 'filename') id=filename[max(filename.rfind('/'), filename.rfind('\\'), filename.rfind(':'), )+1:] if id.startswith('_'): id=id[1:] id = Utils.badIdFilter(id) temp_folder.manage_addFile(id, file=file) fileobject = getattr(temp_folder, id) if self._canCreateThumbnail(fileobject): try: self._createThumbnail(fileobject) except IOError: # we failed to create thumbnail not good. # A log message will already have been # sent. pass # This tests whether any files were uploaded if temp_folder_id is not None: request.set(rkey, temp_folder_id) return temp_folder_id security.declarePublic('_removeUnConfirmedFiles') def _removeUnConfirmedFiles(self): """ if we have a tempfolder with files that don't have a matching confirm, then delete them """ request = self.REQUEST rkey = TEMPFOLDER_REQUEST_KEY if request.get(rkey, None) not in [None,'']: temp_folder = self._getTempFolder()[request.get(rkey)] confirms = self._getConfirmFileattachments() un_upload_ids = [] for fileid in temp_folder.objectIds('File'): if not fileid in confirms: un_upload_ids.append(fileid) self._deleteTempFiles(temp_folder, un_upload_ids) # Anything left now? if len(temp_folder.objectIds('File'))==0: request.set(rkey, None) self._getTempFolder().manage_delObjects([temp_folder.getId()]) def _deleteTempFiles(self, source, ids): """ simply delete some files """ source.manage_delObjects(ids) def _isFile(self, file): """ Check if Publisher file is a real file """ if hasattr(file, 'filename'): if getattr(file, 'filename').strip() != '': # read 1 byte if file.read(1) == "": m = _(u"Filename provided (%s) but not file content") m = m % getattr(file, 'filename') raise NotAFileError, m else: file.seek(0) #rewind file return True else: return False else: return False security.declarePublic('_generateTempFolder') def _generateTempFolder(self): """ Create a folder in temp_folder with randomish id and return its id """ root = self._getTempFolder() timestamp = str(int(self.ZopeTime())) randstr = self.getRandomString(length=3, numbersonly=1) rand_id_start = "uploadtmp-it-%s"%timestamp rand_id = "%s-%s"%(rand_id_start, randstr) while hasattr(root, rand_id): new_rand_str = self.getRandomString(length=3, numbersonly=1) rand_id = "%s-%s"%(rand_id_start, new_rand_str) try: root.manage_addFolder(rand_id) tempfolder = getattr(root, rand_id) except "Unauthorized": LOG(self.__class__, PROBLEM, "Could not create temporary folder") return rand_id def getFileattachmentContainer(self, only_temporary=0): """ if TEMPFOLDER_REQUEST_KEY is set in REQUEST return folder object, otherwise return self. """ request = self.REQUEST rkey = TEMPFOLDER_REQUEST_KEY if request.has_key(rkey) and request.get(rkey) is not None: return getattr(self._getTempFolder(), request[rkey]) elif only_temporary: return None else: return self def showFileattachments(self, container=None, only_temporary=0): """ return HTML with the file attachments """ if container is None and only_temporary: container = self.getFileattachmentContainer(only_temporary=1) if not container: return '' elif container is None: # find then manually if self.meta_type == ISSUE_METATYPE: container = self if not container: return '' files = container.objectValues('File') if not files: return '' html = [] for file in files: url = file.absolute_url() url = self.relative_url(url) size = self.ShowFilesize(file.getSize()) alt = "File size: %s"%size href = '<a href="%s" rel="nofollow" title="%s">'%(url, alt) _html = '%s<img src="%s" alt="%s" title="%s" border="0" ' _html += 'class="fileatt" />' thumbid = 'thumbnail--%s'%file.getId() if hasattr(container, thumbid) and \ getattr(container, thumbid).meta_type == 'Image': src = getattr(container, thumbid).absolute_url_path() else: src = self.getFileIconpath(file.getId()) _html = _html%(href, src, alt, alt) _html += '</a>\n' file_id = file.getId() if len(file_id) > 50: file_id = file_id[:25]+'...'+file_id[-25:] _html += '%s%s</a>'%(href, self.HighlightQ(file_id, highlight_digits=True)) _html += ' <span class="shade"> (%s)</span>\n'%size html.append(_html) return '<br clear="left" />\n'.join(html)+'<br clear="left"/>' def nullifyTempfolderREQUEST(self): """ if request has tempfolder, make it None """ request = self.REQUEST rkey = TEMPFOLDER_REQUEST_KEY if request.has_key(rkey): request.set(rkey, None) ## Using ACL objects def getACLCookieNames(self): """ return acl_cookienames dict property """ return getattr(self, 'acl_cookienames', {}) def getACLCookieEmails(self): """ return acl_cookieemails dict property """ return getattr(self, 'acl_cookieemails', {}) def getACLCookieDisplayformats(self): """ return acl_cookiedisplayformats dict property """ return getattr(self, 'acl_cookiedisplayformats', {}) def setACLCookieName(self, fromname): """ append to acl_cookienames """ acluser = self._getACLUserName() if acluser: prev = self.getACLCookieNames() prev[acluser] = fromname self.acl_cookienames = prev def setACLCookieEmail(self, email): """ append to acl_cookieemails """ acluser = self._getACLUserName() if acluser: prev = self.getACLCookieEmails() prev[acluser] = email self.acl_cookieemails = prev def setACLCookieDisplayformat(self, displayformat): """ append to acl_cookiedisplayformats """ assert displayformat in self.display_formats, \ "Invalid displayformat value %r" % displayformat acluser = self._getACLUserName() if acluser: prev = self.getACLCookieDisplayformats() prev[acluser] = displayformat self.acl_cookiedisplayformats = prev def _getACLUserName(self): """ return ACL username or None """ usr = getSecurityManager().getUser().getUserName() if usr.lower().replace(' ','')=='anonymoususer': return None else: return usr ## Adding an Issue def fixSectionsSubmission(self): """ here's a special script that converts 'section' into ['section'] if present and 'sections' if not present. """ request = self.REQUEST if not request.has_key('sections') and request.get('section'): request.set('sections', [request.get('section')]) return True return False security.declareProtected(AddIssuesPermission, 'SubmitIssue') def SubmitIssue(self, REQUEST): """ This is the method to create an Issue Tracker Issue. It relies only on the REQUEST object. 1) Check data 2) Try to create issue 2a) If success, RESPONSE.redirect to issue plus Thank you message 2b) If failure, print failed data and urge to submit again """ request = self.REQUEST SubmitError = {} has_cookie = self.has_cookie get_cookie = self.get_cookie set_cookie = self.set_cookie # # Tune the data a bit # # strip whitespace for property in ['title','fromname','email', 'url2issue','display_format']: value = request.get(property, '').strip() if property in ('email', 'display_format'): value = asciify(value) request[property] = value # Special treatment needed in case STX is used upon display request['description'] = request.get('description','').strip()+' ' email_cookiekey = self.getCookiekey('email') name_cookiekey = self.getCookiekey('name') display_format_cookiekey = self.getCookiekey('display_format') # use cookie if not else specified # assume that it is not a ACL user who adds the issue acl_adder = None issueuser = self.getIssueUser() cmfuser = self.getCM