"""views module.""" from collections import Counter from types import SimpleNamespace from urllib.parse import urlparse import itertools import logging from buku import BukuDb from flask import flash, redirect, request, url_for from flask_admin.babel import gettext from flask_admin.base import AdminIndexView, BaseView, expose from flask_admin.model import BaseModelView from flask_wtf import FlaskForm from jinja2 import Markup import arrow import wtforms try: from . import forms, filters as bs_filters from .filters import BookmarkField, FilterType except ImportError: from bukuserver import forms, filters as bs_filters from bukuserver.filters import BookmarkField, FilterType STATISTIC_DATA = None DEFAULT_URL_RENDER_MODE = 'full' DEFAULT_PER_PAGE = 10 log = logging.getLogger("bukuserver.views") class CustomAdminIndexView(AdminIndexView): @expose('/') def index(self): return self.render('bukuserver/home.html', form=forms.HomeForm()) @expose('/', methods=['POST',]) def search(self): "redirect to bookmark search" form = forms.HomeForm() bbm_filter = bs_filters.BookmarkBukuFilter( all_keywords=False, deep=form.deep.data, regex=form.regex.data) op_text = bbm_filter.operation() values_combi = sorted(itertools.product([True, False], repeat=3)) for idx, (all_keywords, deep, regex) in enumerate(values_combi): if deep == form.deep.data and regex == form.regex.data and not all_keywords: choosen_idx = idx url_op_text = op_text.replace(', ', '_').replace(' ', ' ').replace(' ', '_') key = ''.join(['flt', str(choosen_idx), '_buku_', url_op_text]) kwargs = {key: form.keyword.data} url = url_for('bookmark.index_view', **kwargs) return redirect(url) class CustomBukuDbModel: # pylint: disable=too-few-public-methods def __init__(self, bukudb_inst, name): self.bukudb = bukudb_inst self.name = name @property def __name__(self): return self.name class BookmarkModelView(BaseModelView): def _apply_filters(self, models, filters): for idx, flt_name, value in filters: flt = self._filters[idx] clean_value = flt.clean(value) models = list(flt.apply(models, clean_value)) return models def _create_ajax_loader(self, name, options): pass def _list_entry(self, context, model, name): parsed_url = urlparse(model.url) netloc, scheme = parsed_url.netloc, parsed_url.scheme is_scheme_valid = scheme in ('http', 'https') tag_text = [] tag_tmpl = '{0}' for tag in model.tags.split(','): if tag: tag_text.append(tag_tmpl.format(tag, url_for( 'bookmark.index_view', flt2_tags_contain=tag))) if not netloc: return Markup("""\ {0.title}
{2}
{1}{0.description} """.format( model, ''.join(tag_text), Markup.escape(model.url) )) netloc_tmpl = ' ' res = netloc_tmpl.format( 'http://www.google.com/s2/favicons?domain=', netloc) title = model.title if model.title else '<EMPTY TITLE>' if is_scheme_valid: res += '{1}'.format(model, title) else: res += title if self.url_render_mode == 'netloc': res += ' ({0})'.format( netloc, url_for('bookmark.index_view', flt2_url_netloc_match=netloc) ) res += '
' if not is_scheme_valid: res += model.url elif self.url_render_mode is None or self.url_render_mode == 'full': res += '{0.url}'.format(model) res += '
' if self.url_render_mode != 'netloc': res += tag_tmpl.format( 'netloc:{}'.format(netloc), url_for('bookmark.index_view', flt2_url_netloc_match=netloc) ) res += ''.join(tag_text) description = model.description if description: res += '
' res += description.replace('\n', '
') return Markup(res) can_set_page_size = True can_view_details = True column_filters = ['buku', 'id', 'url', 'title', 'tags'] column_formatters = {'Entry': _list_entry,} column_list = ['Entry'] create_modal = True create_modal_template = 'bukuserver/bookmark_create_modal.html' create_template = 'bukuserver/bookmark_create.html' details_modal = True edit_modal = True edit_modal_template = 'bukuserver/bookmark_edit_modal.html' edit_template = 'bukuserver/bookmark_edit.html' named_filter_urls = True def __init__(self, *args, **kwargs): self.bukudb = args[0] custom_model = CustomBukuDbModel(args[0], 'bookmark') args = [custom_model, ] + list(args[1:]) self.page_size = kwargs.pop('page_size', DEFAULT_PER_PAGE) self.url_render_mode = kwargs.pop('url_render_mode', DEFAULT_URL_RENDER_MODE) super().__init__(*args, **kwargs) def create_model(self, form): try: model = SimpleNamespace(id=None, url=None, title=None, tags=None, description=None) form.populate_obj(model) vars(model).pop('id') self._on_model_change(form, model, True) tags_in = model.tags if not tags_in.startswith(','): tags_in = ',{}'.format(tags_in) if not tags_in.endswith(','): tags_in = '{},'.format(tags_in) self.model.bukudb.add_rec( url=model.url, title_in=model.title, tags_in=tags_in, desc=model.description) except Exception as ex: if not self.handle_view_exception(ex): flash(gettext('Failed to create record. %(error)s', error=str(ex)), 'error') log.exception('Failed to create record.') return False else: self.after_model_change(form, model, True) return model def delete_model(self, model): try: self.on_model_delete(model) res = self.bukudb.delete_rec(model.id) except Exception as ex: if not self.handle_view_exception(ex): flash(gettext('Failed to delete record. %(error)s', error=str(ex)), 'error') log.exception('Failed to delete record.') return False else: self.after_model_delete(model) return res def get_list(self, page, sort_field, sort_desc, search, filters, page_size=None): bukudb = self.bukudb contain_buku_search = any(x[1] == 'buku' for x in filters) if contain_buku_search: mode_id = [x[0] for x in filters] if len(list(set(mode_id))) > 1: flash(gettext('Invalid search mode combination'), 'error') return 0, [] keywords = [x[2] for x in filters] for idx, flt_name, value in filters: if flt_name == 'buku': flt = self._filters[idx] bookmarks = bukudb.searchdb( keywords, all_keywords=flt.all_keywords, deep=flt.deep, regex=flt.regex) else: bookmarks = bukudb.get_rec_all() bookmarks = self._apply_filters(bookmarks, filters) if sort_field: key_idx = [x.value for x in BookmarkField if x.name.lower() == sort_field][0] bookmarks = sorted(bookmarks, key=lambda x:x[key_idx], reverse=sort_desc) count = len(bookmarks) if page_size and bookmarks: bookmarks = list(chunks(bookmarks, page_size))[page] data = [] for bookmark in bookmarks: bm_sns = SimpleNamespace(id=None, url=None, title=None, tags=None, description=None) for field in list(BookmarkField): if field == BookmarkField.TAGS: value = bookmark[field.value] if value.startswith(','): value = value[1:] if value.endswith(','): value = value[:-1] setattr(bm_sns, field.name.lower(), value) else: setattr(bm_sns, field.name.lower(), bookmark[field.value]) data.append(bm_sns) return count, data def get_one(self, id): bookmark = self.model.bukudb.get_rec_by_id(id) bm_sns = SimpleNamespace(id=None, url=None, title=None, tags=None, description=None) for field in list(BookmarkField): if field == BookmarkField.TAGS and bookmark[field.value].startswith(','): value = bookmark[field.value] if value.startswith(','): value = value[1:] if value.endswith(','): value = value[:-1] setattr(bm_sns, field.name.lower(), value) else: setattr(bm_sns, field.name.lower(), bookmark[field.value]) return bm_sns def get_pk_value(self, model): return model.id def scaffold_list_columns(self): return [x.name.lower() for x in BookmarkField] def scaffold_list_form(self, widget=None, validators=None): pass def scaffold_sortable_columns(self): return {x:x for x in self.scaffold_list_columns()} def scaffold_filters(self, name): res = [] if name == 'buku': values_combi = sorted(itertools.product([True, False], repeat=3)) for all_keywords, deep, regex in values_combi: res.append( bs_filters.BookmarkBukuFilter(all_keywords=all_keywords, deep=deep, regex=regex) ) elif name == BookmarkField.ID.name.lower(): res.extend([ bs_filters.BookmarkBaseFilter(name, filter_type=FilterType.EQUAL), bs_filters.BookmarkBaseFilter(name, filter_type=FilterType.NOT_EQUAL), bs_filters.BookmarkBaseFilter(name, filter_type=FilterType.IN_LIST), bs_filters.BookmarkBaseFilter(name, filter_type=FilterType.NOT_IN_LIST), bs_filters.BookmarkBaseFilter(name, filter_type=FilterType.GREATER), bs_filters.BookmarkBaseFilter(name, filter_type=FilterType.SMALLER), bs_filters.BookmarkBaseFilter(name, filter_type=FilterType.TOP_X), bs_filters.BookmarkBaseFilter(name, filter_type=FilterType.BOTTOM_X), ]) elif name == BookmarkField.URL.name.lower(): def netloc_match_func(query, value, index): return filter(lambda x: urlparse(x[index]).netloc == value, query) res.extend([ bs_filters.BookmarkBaseFilter(name, 'netloc match', netloc_match_func), bs_filters.BookmarkBaseFilter(name, filter_type=FilterType.EQUAL), bs_filters.BookmarkBaseFilter(name, filter_type=FilterType.NOT_EQUAL), bs_filters.BookmarkBaseFilter(name, filter_type=FilterType.IN_LIST), bs_filters.BookmarkBaseFilter(name, filter_type=FilterType.NOT_IN_LIST), ]) elif name == BookmarkField.TITLE.name.lower(): res.extend([ bs_filters.BookmarkBaseFilter(name, filter_type=FilterType.EQUAL), bs_filters.BookmarkBaseFilter(name, filter_type=FilterType.NOT_EQUAL), bs_filters.BookmarkBaseFilter(name, filter_type=FilterType.IN_LIST), bs_filters.BookmarkBaseFilter(name, filter_type=FilterType.NOT_IN_LIST), ]) elif name == BookmarkField.TAGS.name.lower(): def tags_contain_func(query, value, index): for item in query: for tag in item[index].split(','): if tag and tag == value: yield item def tags_not_contain_func(query, value, index): for item in query: for tag in item[index].split(','): if tag and tag == value: yield item res.extend([ bs_filters.BookmarkBaseFilter(name, 'contain', tags_contain_func), bs_filters.BookmarkBaseFilter(name, 'not contain', tags_not_contain_func), bs_filters.BookmarkTagNumberEqualFilter(name, 'number equal'), bs_filters.BookmarkTagNumberNotEqualFilter(name, 'number not equal'), bs_filters.BookmarkTagNumberGreaterFilter(name, 'number greater than'), bs_filters.BookmarkTagNumberSmallerFilter(name, 'number smaller than'), ]) elif name in self.scaffold_list_columns(): pass else: return super().scaffold_filters(name) return res def scaffold_form(self): cls = forms.BookmarkForm return cls def update_model(self, form, model): res = False try: original_tags = model.tags form.populate_obj(model) self._on_model_change(form, model, False) self.bukudb.delete_tag_at_index(model.id, original_tags) tags_in = model.tags if not tags_in.startswith(','): tags_in = ',{}'.format(tags_in) if not tags_in.endswith(','): tags_in = '{},'.format(tags_in) res = self.bukudb.update_rec( model.id, url=model.url, title_in=model.title, tags_in=tags_in, desc=model.description) except Exception as ex: if not self.handle_view_exception(ex): flash(gettext('Failed to update record. %(error)s', error=str(ex)), 'error') log.exception('Failed to update record.') return False else: self.after_model_change(form, model, False) return res class TagModelView(BaseModelView): def _create_ajax_loader(self, name, options): pass def _apply_filters(self, models, filters): for idx, flt_name, value in filters: flt = self._filters[idx] clean_value = flt.clean(value) models = list(flt.apply(models, clean_value)) return models def _name_formatter(self, context, model, name): data = getattr(model, name) return Markup('{}'.format( url_for('bookmark.index_view', flt1_tags_contain=data), data if data else '<EMPTY TAG>' )) can_create = False can_set_page_size = True column_filters = ['name', 'usage_count'] column_formatters = {'name': _name_formatter,} def __init__(self, *args, **kwargs): self.bukudb = args[0] custom_model = CustomBukuDbModel(args[0], 'tag') args = [custom_model, ] + list(args[1:]) self.page_size = kwargs.pop('page_size', DEFAULT_PER_PAGE) super().__init__(*args, **kwargs) def scaffold_list_columns(self): return ['name', 'usage_count'] def scaffold_sortable_columns(self): return {x:x for x in self.scaffold_list_columns()} def scaffold_form(self): class CustomForm(FlaskForm): # pylint: disable=too-few-public-methods name = wtforms.StringField(validators=[wtforms.validators.required()]) return CustomForm def scaffold_list_form(self, widget=None, validators=None): pass def get_list(self, page, sort_field, sort_desc, search, filters, page_size=None): bukudb = self.bukudb tags = bukudb.get_tag_all()[1] tags = [(x, y) for x, y in tags.items()] tags = self._apply_filters(tags, filters) if sort_field == 'usage_count': tags = sorted(tags, key=lambda x: x[1], reverse=sort_desc) elif sort_field == 'name': tags = sorted(tags, key=lambda x: x[0], reverse=sort_desc) tags = list(tags) count = len(tags) if page_size and tags: tags = list(chunks(tags, page_size))[page] data = [] for name, usage_count in tags: tag_sns = SimpleNamespace(name=None, usage_count=None) tag_sns.name, tag_sns.usage_count = name, usage_count data.append(tag_sns) return count, data def get_pk_value(self, model): return model.name def get_one(self, id): tags = self.bukudb.get_tag_all()[1] tag_sns = SimpleNamespace(name=id, usage_count=tags[id]) return tag_sns def scaffold_filters(self, name): res = [] def top_most_common_func(query, value, index): counter = Counter(x[index] for x in query) most_common = counter.most_common(value) most_common_item = [x[0] for x in most_common] return filter(lambda x: x[index] in most_common_item, query) res.extend([ bs_filters.TagBaseFilter(name, filter_type=FilterType.EQUAL), bs_filters.TagBaseFilter(name, filter_type=FilterType.NOT_EQUAL), bs_filters.TagBaseFilter(name, filter_type=FilterType.IN_LIST), bs_filters.TagBaseFilter(name, filter_type=FilterType.NOT_IN_LIST), ]) if name == 'usage_count': res.extend([ bs_filters.TagBaseFilter(name, filter_type=FilterType.GREATER), bs_filters.TagBaseFilter(name, filter_type=FilterType.SMALLER), bs_filters.TagBaseFilter(name, filter_type=FilterType.TOP_X), bs_filters.TagBaseFilter(name, filter_type=FilterType.BOTTOM_X), bs_filters.TagBaseFilter(name, 'top most common', top_most_common_func), ]) elif name == 'name': pass else: return super().scaffold_filters(name) return res def delete_model(self, model): res = None try: self.on_model_delete(model) res = self.bukudb.delete_tag_at_index(0, model.name, chatty=False) except Exception as ex: if not self.handle_view_exception(ex): flash(gettext('Failed to delete record. %(error)s', error=str(ex)), 'error') log.exception('Failed to delete record.') return False else: self.after_model_delete(model) return res def update_model(self, form, model): res = None try: original_name = model.name form.populate_obj(model) self._on_model_change(form, model, False) res = self.bukudb.replace_tag(original_name, [model.name]) except Exception as ex: if not self.handle_view_exception(ex): flash(gettext('Failed to update record. %(error)s', error=str(ex)), 'error') log.exception('Failed to update record.') return False else: self.after_model_change(form, model, False) return res def create_model(self, form): pass class StatisticView(BaseView): # pylint: disable=too-few-public-methods @expose('/', methods=('GET', 'POST')) def index(self): bukudb = BukuDb() global STATISTIC_DATA statistic_data = STATISTIC_DATA if not statistic_data or request.method == 'POST': all_bookmarks = bukudb.get_rec_all() netloc = [urlparse(x[1]).netloc for x in all_bookmarks] tag_set = [x[3] for x in all_bookmarks] tag_items = [] for tags in tag_set: tag_items.extend([x.strip() for x in tags.split(',') if x.strip()]) tag_counter = Counter(tag_items) title_items = [x[2] for x in all_bookmarks] title_counter = Counter(title_items) statistic_datetime = arrow.now() STATISTIC_DATA = { 'datetime': statistic_datetime, 'netloc': netloc, 'tag_counter': tag_counter, 'title_counter': title_counter, } else: netloc = statistic_data['netloc'] statistic_datetime = statistic_data['datetime'] tag_counter = statistic_data['tag_counter'] title_counter = statistic_data['title_counter'] netloc_counter = Counter(netloc) unique_netloc_len = len(set(netloc)) colors = [ "#F7464A", "#46BFBD", "#FDB45C", "#FEDCBA", "#ABCDEF", "#DDDDDD", "#ABCABC", "#4169E1", "#C71585", "#FF4500", "#FEDCBA", "#46BFBD"] show_netloc_table = False if unique_netloc_len > len(colors): max_netloc_item = len(colors) netloc_colors = colors show_netloc_table = True else: netloc_colors = colors[:unique_netloc_len] max_netloc_item = unique_netloc_len most_common_netlocs = netloc_counter.most_common(max_netloc_item) most_common_netlocs = [ [val[0], val[1], netloc_colors[idx]] for idx, val in enumerate(most_common_netlocs)] unique_tag_len = len(tag_counter) show_tag_rank_table = False if unique_tag_len > len(colors): max_tag_item = len(colors) tag_colors = colors show_tag_rank_table = True else: tag_colors = colors[:unique_tag_len] max_tag_item = unique_tag_len most_common_tags = tag_counter.most_common(max_tag_item) most_common_tags = [ [val[0], val[1], tag_colors[idx]] for idx, val in enumerate(most_common_tags)] unique_title_len = len(title_counter) show_title_rank_table = False if unique_title_len > len(colors): max_title_item = len(colors) title_colors = colors show_title_rank_table = True else: title_colors = colors[:unique_title_len] max_title_item = unique_title_len most_common_titles = title_counter.most_common(max_title_item) most_common_titles = [ [val[0], val[1], title_colors[idx]] for idx, val in enumerate(most_common_titles)] return self.render( 'bukuserver/statistic.html', most_common_netlocs=most_common_netlocs, netloc_counter=netloc_counter, show_netloc_table=show_netloc_table, most_common_tags=most_common_tags, tag_counter=tag_counter, show_tag_rank_table=show_tag_rank_table, most_common_titles=most_common_titles, title_counter=title_counter, show_title_rank_table=show_title_rank_table, datetime=statistic_datetime, datetime_text=statistic_datetime.humanize(arrow.now(), granularity='second'), ) def chunks(l, n): n = max(1, n) return (l[i:i+n] for i in range(0, len(l), n))