Module hug.output_format
hug/output_format.py
Defines Hug's built-in output formatting methods
Copyright (C) 2016 Timothy Edmund Crosley
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
View Source
"""hug/output_format.py Defines Hug's built-in output formatting methods Copyright (C) 2016 Timothy Edmund Crosley Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from __future__ import absolute_import import base64 import mimetypes import os import re import tempfile from datetime import date, datetime, timedelta from decimal import Decimal from functools import wraps from io import BytesIO from operator import itemgetter from uuid import UUID import falcon from falcon import HTTP_NOT_FOUND from hug import introspect from hug.format import camelcase, content_type from hug.json_module import json as json_converter try: import numpy except ImportError: numpy = False IMAGE_TYPES = ( "png", "jpg", "bmp", "eps", "gif", "im", "jpeg", "msp", "pcx", "ppm", "spider", "tiff", "webp", "xbm", "cur", "dcx", "fli", "flc", "gbr", "gd", "ico", "icns", "imt", "iptc", "naa", "mcidas", "mpo", "pcd", "psd", "sgi", "tga", "wal", "xpm", "svg", "svg+xml", ) VIDEO_TYPES = ( ("flv", "video/x-flv"), ("mp4", "video/mp4"), ("m3u8", "application/x-mpegURL"), ("ts", "video/MP2T"), ("3gp", "video/3gpp"), ("mov", "video/quicktime"), ("avi", "video/x-msvideo"), ("wmv", "video/x-ms-wmv"), ) RE_ACCEPT_QUALITY = re.compile("q=(?P<quality>[^;]+)") json_converters = {} stream = tempfile.NamedTemporaryFile if "UWSGI_ORIGINAL_PROC_NAME" in os.environ else BytesIO def _json_converter(item): if hasattr(item, "__native_types__"): return item.__native_types__() for kind, transformer in json_converters.items(): if isinstance(item, kind): return transformer(item) if isinstance(item, (date, datetime)): return item.isoformat() elif isinstance(item, bytes): try: return item.decode("utf8") except UnicodeDecodeError: return base64.b64encode(item) elif hasattr(item, "__iter__"): return list(item) elif isinstance(item, (Decimal, UUID)): return str(item) elif isinstance(item, timedelta): return item.total_seconds() raise TypeError("Type not serializable") def json_convert(*kinds): """Registers the wrapped method as a JSON converter for the provided types. NOTE: custom converters are always globally applied """ def register_json_converter(function): for kind in kinds: json_converters[kind] = function return function return register_json_converter if numpy: @json_convert(numpy.ndarray) def numpy_listable(item): return item.tolist() @json_convert(str, numpy.unicode_) def numpy_stringable(item): return str(item) @json_convert(numpy.bytes_) def numpy_byte_decodeable(item): return item.decode() @json_convert(numpy.bool_) def numpy_boolable(item): return bool(item) @json_convert(numpy.integer) def numpy_integerable(item): return int(item) @json_convert(float, numpy.floating) def numpy_floatable(item): return float(item) @content_type("application/json; charset=utf-8") def json(content, request=None, response=None, ensure_ascii=False, **kwargs): """JSON (Javascript Serialized Object Notation)""" if hasattr(content, "read"): return content if isinstance(content, tuple) and getattr(content, "_fields", None): content = {field: getattr(content, field) for field in content._fields} return json_converter.dumps( content, default=_json_converter, ensure_ascii=ensure_ascii, **kwargs ).encode("utf8") def on_valid(valid_content_type, on_invalid=json): """Renders as the specified content type only if no errors are found in the provided data object""" invalid_kwargs = introspect.generate_accepted_kwargs(on_invalid, "request", "response") invalid_takes_response = introspect.takes_all_arguments(on_invalid, "response") def wrapper(function): valid_kwargs = introspect.generate_accepted_kwargs(function, "request", "response") valid_takes_response = introspect.takes_all_arguments(function, "response") @content_type(valid_content_type) @wraps(function) def output_content(content, response, **kwargs): if type(content) == dict and "errors" in content: response.content_type = on_invalid.content_type if invalid_takes_response: kwargs["response"] = response return on_invalid(content, **invalid_kwargs(kwargs)) if valid_takes_response: kwargs["response"] = response return function(content, **valid_kwargs(kwargs)) return output_content return wrapper @content_type("text/plain; charset=utf-8") def text(content, **kwargs): """Free form UTF-8 text""" if hasattr(content, "read"): return content return str(content).encode("utf8") @content_type("text/html; charset=utf-8") def html(content, **kwargs): """HTML (Hypertext Markup Language)""" if hasattr(content, "read"): return content elif hasattr(content, "render"): return content.render().encode("utf8") return str(content).encode("utf8") def _camelcase(content): if isinstance(content, dict): new_dictionary = {} for key, value in content.items(): if isinstance(key, str): key = camelcase(key) new_dictionary[key] = _camelcase(value) return new_dictionary elif isinstance(content, list): new_list = [] for element in content: new_list.append(_camelcase(element)) return new_list else: return content @content_type("application/json; charset=utf-8") def json_camelcase(content, **kwargs): """JSON (Javascript Serialized Object Notation) with all keys camelCased""" return json(_camelcase(content), **kwargs) @content_type("application/json; charset=utf-8") def pretty_json(content, **kwargs): """JSON (Javascript Serialized Object Notion) pretty printed and indented""" return json(content, indent=4, separators=(",", ": "), **kwargs) def image(image_format, doc=None): """Dynamically creates an image type handler for the specified image type""" @on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb") image_handler.__doc__ = doc or "{0} formatted image".format(image_format) return image_handler for image_type in IMAGE_TYPES: globals()["{0}_image".format(image_type.replace("+", "_"))] = image(image_type) def video(video_type, video_mime, doc=None): """Dynamically creates a video type handler for the specified video type""" @on_valid(video_mime) def video_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() data.save(output, format=video_type.upper()) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb") video_handler.__doc__ = doc or "{0} formatted video".format(video_type) return video_handler for (video_type, video_mime) in VIDEO_TYPES: globals()["{0}_video".format(video_type)] = video(video_type, video_mime) @on_valid("file/dynamic") def file(data, response, **kwargs): """A dynamically retrieved file""" if not data: response.content_type = "text/plain" return "" if hasattr(data, "read"): name, data = getattr(data, "name", ""), data elif os.path.isfile(data): name, data = data, open(data, "rb") else: response.content_type = "text/plain" response.status = HTTP_NOT_FOUND return "File not found!" response.content_type = mimetypes.guess_type(name, None)[0] or "application/octet-stream" return data def on_content_type( handlers, default=None, error="The requested content type does not match any of those allowed" ): """Returns a content in a different format based on the clients provided content type, should pass in a dict with the following format: {'[content-type]': action, ... } """ def output_type(data, request, response): handler = handlers.get(request.content_type.split(";")[0], default) if not handler: raise falcon.HTTPNotAcceptable(error) response.content_type = handler.content_type return handler(data, request=request, response=response) output_type.__doc__ = "Supports any of the following formats: {0}".format( ", ".join(function.__doc__ or function.__name__ for function in handlers.values()) ) output_type.content_type = ", ".join(handlers.keys()) return output_type def accept_quality(accept, default=1): """Separates out the quality score from the accepted content_type""" quality = default if accept and ";" in accept: accept, rest = accept.split(";", 1) accept_quality = RE_ACCEPT_QUALITY.search(rest) if accept_quality: quality = float(accept_quality.groupdict().get("quality", quality).strip()) return (quality, accept.strip()) def accept( handlers, default=None, error="The requested content type does not match any of those allowed" ): """Returns a content in a different format based on the clients defined accepted content type, should pass in a dict with the following format: {'[content-type]': action, ... } """ def output_type(data, request, response): accept = request.accept if accept in ("", "*", "/"): handler = default or handlers and next(iter(handlers.values())) else: handler = default accepted = [accept_quality(accept_type) for accept_type in accept.split(",")] accepted.sort(key=itemgetter(0)) for _quality, accepted_content_type in reversed(accepted): if accepted_content_type in handlers: handler = handlers[accepted_content_type] break if not handler: raise falcon.HTTPNotAcceptable(error) response.content_type = handler.content_type return handler(data, request=request, response=response) output_type.__doc__ = "Supports any of the following formats: {0}".format( ", ".join(function.__doc__ for function in handlers.values()) ) output_type.content_type = ", ".join(handlers.keys()) return output_type def suffix( handlers, default=None, error="The requested suffix does not match any of those allowed" ): """Returns a content in a different format based on the suffix placed at the end of the URL route should pass in a dict with the following format: {'[suffix]': action, ... } """ def output_type(data, request, response): path = request.path handler = default for suffix_test, suffix_handler in handlers.items(): if path.endswith(suffix_test): handler = suffix_handler break if not handler: raise falcon.HTTPNotAcceptable(error) response.content_type = handler.content_type return handler(data, request=request, response=response) output_type.__doc__ = "Supports any of the following formats: {0}".format( ", ".join(function.__doc__ for function in handlers.values()) ) output_type.content_type = ", ".join(handlers.keys()) return output_type def prefix( handlers, default=None, error="The requested prefix does not match any of those allowed" ): """Returns a content in a different format based on the prefix placed at the end of the URL route should pass in a dict with the following format: {'[prefix]': action, ... } """ def output_type(data, request, response): path = request.path handler = default for prefix_test, prefix_handler in handlers.items(): if path.startswith(prefix_test): handler = prefix_handler break if not handler: raise falcon.HTTPNotAcceptable(error) response.content_type = handler.content_type return handler(data, request=request, response=response) output_type.__doc__ = "Supports any of the following formats: {0}".format( ", ".join(function.__doc__ for function in handlers.values()) ) output_type.content_type = ", ".join(handlers.keys()) return output_type
Variables
HTTP_NOT_FOUND
IMAGE_TYPES
RE_ACCEPT_QUALITY
VIDEO_TYPES
image_type
json_converters
video_mime
video_type
Functions
3gp_video
def 3gp_video( data, **kwargs )
3gp formatted video
View Source
@on_valid(video_mime) def video_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() data.save(output, format=video_type.upper()) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
accept
def accept( handlers, default=None, error='The requested content type does not match any of those allowed' )
Returns a content in a different format based on the clients defined accepted content type, should pass in a dict with the following format:
{'[content-type]': action, ... }
View Source
def accept( handlers, default=None, error="The requested content type does not match any of those allowed" ): """Returns a content in a different format based on the clients defined accepted content type, should pass in a dict with the following format: {'[content-type]': action, ... } """ def output_type(data, request, response): accept = request.accept if accept in ("", "*", "/"): handler = default or handlers and next(iter(handlers.values())) else: handler = default accepted = [accept_quality(accept_type) for accept_type in accept.split(",")] accepted.sort(key=itemgetter(0)) for _quality, accepted_content_type in reversed(accepted): if accepted_content_type in handlers: handler = handlers[accepted_content_type] break if not handler: raise falcon.HTTPNotAcceptable(error) response.content_type = handler.content_type return handler(data, request=request, response=response) output_type.__doc__ = "Supports any of the following formats: {0}".format( ", ".join(function.__doc__ for function in handlers.values()) ) output_type.content_type = ", ".join(handlers.keys()) return output_type
accept_quality
def accept_quality( accept, default=1 )
Separates out the quality score from the accepted content_type
View Source
def accept_quality(accept, default=1): """Separates out the quality score from the accepted content_type""" quality = default if accept and ";" in accept: accept, rest = accept.split(";", 1) accept_quality = RE_ACCEPT_QUALITY.search(rest) if accept_quality: quality = float(accept_quality.groupdict().get("quality", quality).strip()) return (quality, accept.strip())
avi_video
def avi_video( data, **kwargs )
avi formatted video
View Source
@on_valid(video_mime) def video_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() data.save(output, format=video_type.upper()) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
bmp_image
def bmp_image( data, **kwargs )
bmp formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
cur_image
def cur_image( data, **kwargs )
cur formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
dcx_image
def dcx_image( data, **kwargs )
dcx formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
eps_image
def eps_image( data, **kwargs )
eps formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
file
def file( data, response, **kwargs )
A dynamically retrieved file
View Source
@on_valid("file/dynamic") def file(data, response, **kwargs): """A dynamically retrieved file""" if not data: response.content_type = "text/plain" return "" if hasattr(data, "read"): name, data = getattr(data, "name", ""), data elif os.path.isfile(data): name, data = data, open(data, "rb") else: response.content_type = "text/plain" response.status = HTTP_NOT_FOUND return "File not found!" response.content_type = mimetypes.guess_type(name, None)[0] or "application/octet-stream" return data
flc_image
def flc_image( data, **kwargs )
flc formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
fli_image
def fli_image( data, **kwargs )
fli formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
flv_video
def flv_video( data, **kwargs )
flv formatted video
View Source
@on_valid(video_mime) def video_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() data.save(output, format=video_type.upper()) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
gbr_image
def gbr_image( data, **kwargs )
gbr formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
gd_image
def gd_image( data, **kwargs )
gd formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
gif_image
def gif_image( data, **kwargs )
gif formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
html
def html( content, **kwargs )
HTML (Hypertext Markup Language)
View Source
@content_type("text/html; charset=utf-8") def html(content, **kwargs): """HTML (Hypertext Markup Language)""" if hasattr(content, "read"): return content elif hasattr(content, "render"): return content.render().encode("utf8") return str(content).encode("utf8")
icns_image
def icns_image( data, **kwargs )
icns formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
ico_image
def ico_image( data, **kwargs )
ico formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
im_image
def im_image( data, **kwargs )
im formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
image
def image( image_format, doc=None )
Dynamically creates an image type handler for the specified image type
View Source
def image(image_format, doc=None): """Dynamically creates an image type handler for the specified image type""" @on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb") image_handler.__doc__ = doc or "{0} formatted image".format(image_format) return image_handler
imt_image
def imt_image( data, **kwargs )
imt formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
iptc_image
def iptc_image( data, **kwargs )
iptc formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
jpeg_image
def jpeg_image( data, **kwargs )
jpeg formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
jpg_image
def jpg_image( data, **kwargs )
jpg formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
json
def json( content, request=None, response=None, ensure_ascii=False, **kwargs )
JSON (Javascript Serialized Object Notation)
View Source
@content_type("application/json; charset=utf-8") def json(content, request=None, response=None, ensure_ascii=False, **kwargs): """JSON (Javascript Serialized Object Notation)""" if hasattr(content, "read"): return content if isinstance(content, tuple) and getattr(content, "_fields", None): content = {field: getattr(content, field) for field in content._fields} return json_converter.dumps( content, default=_json_converter, ensure_ascii=ensure_ascii, **kwargs ).encode("utf8")
json_camelcase
def json_camelcase( content, **kwargs )
JSON (Javascript Serialized Object Notation) with all keys camelCased
View Source
@content_type("application/json; charset=utf-8") def json_camelcase(content, **kwargs): """JSON (Javascript Serialized Object Notation) with all keys camelCased""" return json(_camelcase(content), **kwargs)
json_convert
def json_convert( *kinds )
Registers the wrapped method as a JSON converter for the provided types.
NOTE: custom converters are always globally applied
View Source
def json_convert(*kinds): """Registers the wrapped method as a JSON converter for the provided types. NOTE: custom converters are always globally applied """ def register_json_converter(function): for kind in kinds: json_converters[kind] = function return function return register_json_converter
m3u8_video
def m3u8_video( data, **kwargs )
m3u8 formatted video
View Source
@on_valid(video_mime) def video_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() data.save(output, format=video_type.upper()) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
mcidas_image
def mcidas_image( data, **kwargs )
mcidas formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
mov_video
def mov_video( data, **kwargs )
mov formatted video
View Source
@on_valid(video_mime) def video_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() data.save(output, format=video_type.upper()) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
mp4_video
def mp4_video( data, **kwargs )
mp4 formatted video
View Source
@on_valid(video_mime) def video_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() data.save(output, format=video_type.upper()) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
mpo_image
def mpo_image( data, **kwargs )
mpo formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
msp_image
def msp_image( data, **kwargs )
msp formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
naa_image
def naa_image( data, **kwargs )
naa formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
numpy_boolable
def numpy_boolable( item )
View Source
@json_convert(numpy.bool_) def numpy_boolable(item): return bool(item)
numpy_byte_decodeable
def numpy_byte_decodeable( item )
View Source
@json_convert(numpy.bytes_) def numpy_byte_decodeable(item): return item.decode()
numpy_floatable
def numpy_floatable( item )
View Source
@json_convert(float, numpy.floating) def numpy_floatable(item): return float(item)
numpy_integerable
def numpy_integerable( item )
View Source
@json_convert(numpy.integer) def numpy_integerable(item): return int(item)
numpy_listable
def numpy_listable( item )
View Source
@json_convert(numpy.ndarray) def numpy_listable(item): return item.tolist()
numpy_stringable
def numpy_stringable( item )
View Source
@json_convert(str, numpy.unicode_) def numpy_stringable(item): return str(item)
on_content_type
def on_content_type( handlers, default=None, error='The requested content type does not match any of those allowed' )
Returns a content in a different format based on the clients provided content type, should pass in a dict with the following format:
{'[content-type]': action, ... }
View Source
def on_content_type( handlers, default=None, error="The requested content type does not match any of those allowed" ): """Returns a content in a different format based on the clients provided content type, should pass in a dict with the following format: {'[content-type]': action, ... } """ def output_type(data, request, response): handler = handlers.get(request.content_type.split(";")[0], default) if not handler: raise falcon.HTTPNotAcceptable(error) response.content_type = handler.content_type return handler(data, request=request, response=response) output_type.__doc__ = "Supports any of the following formats: {0}".format( ", ".join(function.__doc__ or function.__name__ for function in handlers.values()) ) output_type.content_type = ", ".join(handlers.keys()) return output_type
on_valid
def on_valid( valid_content_type, on_invalid=<function json at 0x7f7ee3199840> )
Renders as the specified content type only if no errors are found in the provided data object
View Source
def on_valid(valid_content_type, on_invalid=json): """Renders as the specified content type only if no errors are found in the provided data object""" invalid_kwargs = introspect.generate_accepted_kwargs(on_invalid, "request", "response") invalid_takes_response = introspect.takes_all_arguments(on_invalid, "response") def wrapper(function): valid_kwargs = introspect.generate_accepted_kwargs(function, "request", "response") valid_takes_response = introspect.takes_all_arguments(function, "response") @content_type(valid_content_type) @wraps(function) def output_content(content, response, **kwargs): if type(content) == dict and "errors" in content: response.content_type = on_invalid.content_type if invalid_takes_response: kwargs["response"] = response return on_invalid(content, **invalid_kwargs(kwargs)) if valid_takes_response: kwargs["response"] = response return function(content, **valid_kwargs(kwargs)) return output_content return wrapper
pcd_image
def pcd_image( data, **kwargs )
pcd formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
pcx_image
def pcx_image( data, **kwargs )
pcx formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
png_image
def png_image( data, **kwargs )
png formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
ppm_image
def ppm_image( data, **kwargs )
ppm formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
prefix
def prefix( handlers, default=None, error='The requested prefix does not match any of those allowed' )
Returns a content in a different format based on the prefix placed at the end of the URL route should pass in a dict with the following format:
{'[prefix]': action, ... }
View Source
def prefix( handlers, default=None, error="The requested prefix does not match any of those allowed" ): """Returns a content in a different format based on the prefix placed at the end of the URL route should pass in a dict with the following format: {'[prefix]': action, ... } """ def output_type(data, request, response): path = request.path handler = default for prefix_test, prefix_handler in handlers.items(): if path.startswith(prefix_test): handler = prefix_handler break if not handler: raise falcon.HTTPNotAcceptable(error) response.content_type = handler.content_type return handler(data, request=request, response=response) output_type.__doc__ = "Supports any of the following formats: {0}".format( ", ".join(function.__doc__ for function in handlers.values()) ) output_type.content_type = ", ".join(handlers.keys()) return output_type
pretty_json
def pretty_json( content, **kwargs )
JSON (Javascript Serialized Object Notion) pretty printed and indented
View Source
@content_type("application/json; charset=utf-8") def pretty_json(content, **kwargs): """JSON (Javascript Serialized Object Notion) pretty printed and indented""" return json(content, indent=4, separators=(",", ": "), **kwargs)
psd_image
def psd_image( data, **kwargs )
psd formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
sgi_image
def sgi_image( data, **kwargs )
sgi formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
spider_image
def spider_image( data, **kwargs )
spider formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
suffix
def suffix( handlers, default=None, error='The requested suffix does not match any of those allowed' )
Returns a content in a different format based on the suffix placed at the end of the URL route should pass in a dict with the following format:
{'[suffix]': action, ... }
View Source
def suffix( handlers, default=None, error="The requested suffix does not match any of those allowed" ): """Returns a content in a different format based on the suffix placed at the end of the URL route should pass in a dict with the following format: {'[suffix]': action, ... } """ def output_type(data, request, response): path = request.path handler = default for suffix_test, suffix_handler in handlers.items(): if path.endswith(suffix_test): handler = suffix_handler break if not handler: raise falcon.HTTPNotAcceptable(error) response.content_type = handler.content_type return handler(data, request=request, response=response) output_type.__doc__ = "Supports any of the following formats: {0}".format( ", ".join(function.__doc__ for function in handlers.values()) ) output_type.content_type = ", ".join(handlers.keys()) return output_type
svg_image
def svg_image( data, **kwargs )
svg formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
svg_xml_image
def svg_xml_image( data, **kwargs )
svg+xml formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
text
def text( content, **kwargs )
Free form UTF-8 text
View Source
@content_type("text/plain; charset=utf-8") def text(content, **kwargs): """Free form UTF-8 text""" if hasattr(content, "read"): return content return str(content).encode("utf8")
tga_image
def tga_image( data, **kwargs )
tga formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
tiff_image
def tiff_image( data, **kwargs )
tiff formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
ts_video
def ts_video( data, **kwargs )
ts formatted video
View Source
@on_valid(video_mime) def video_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() data.save(output, format=video_type.upper()) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
video
def video( video_type, video_mime, doc=None )
Dynamically creates a video type handler for the specified video type
View Source
def video(video_type, video_mime, doc=None): """Dynamically creates a video type handler for the specified video type""" @on_valid(video_mime) def video_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() data.save(output, format=video_type.upper()) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb") video_handler.__doc__ = doc or "{0} formatted video".format(video_type) return video_handler
wal_image
def wal_image( data, **kwargs )
wal formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
webp_image
def webp_image( data, **kwargs )
webp formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
wmv_video
def wmv_video( data, **kwargs )
wmv formatted video
View Source
@on_valid(video_mime) def video_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() data.save(output, format=video_type.upper()) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
xbm_image
def xbm_image( data, **kwargs )
xbm formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
xpm_image
def xpm_image( data, **kwargs )
xpm formatted image
View Source
@on_valid("image/{0}".format(image_format)) def image_handler(data, **kwargs): if hasattr(data, "read"): return data elif hasattr(data, "save"): output = stream() if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs( data.save ): data.save(output, format=image_format.upper()) else: data.save(output) output.seek(0) return output elif hasattr(data, "render"): return data.render() elif os.path.isfile(data): return open(data, "rb")
Classes
stream
class stream( /, *args, **kwargs )
Buffered I/O implementation using an in-memory bytes buffer.
Ancestors (in MRO)
- _io._BufferedIOBase
- _io._IOBase
Class variables
closed
Methods
close
def close( self, / )
Disable all I/O operations.
detach
def detach( self, / )
Disconnect this buffer from its underlying raw stream and return it.
After the raw stream has been detached, the buffer is in an unusable state.
fileno
def fileno( self, / )
Returns underlying file descriptor if one exists.
OSError is raised if the IO object does not use a file descriptor.
flush
def flush( self, / )
Does nothing.
getbuffer
def getbuffer( self, / )
Get a read-write view over the contents of the BytesIO object.
getvalue
def getvalue( self, / )
Retrieve the entire contents of the BytesIO object.
isatty
def isatty( self, / )
Always returns False.
BytesIO objects are not connected to a TTY-like device.
read
def read( self, size=-1, / )
Read at most size bytes, returned as a bytes object.
If the size argument is negative, read until EOF is reached. Return an empty bytes object at EOF.
read1
def read1( self, size=-1, / )
Read at most size bytes, returned as a bytes object.
If the size argument is negative or omitted, read until EOF is reached. Return an empty bytes object at EOF.
readable
def readable( self, / )
Returns True if the IO object can be read.
readinto
def readinto( self, buffer, / )
Read bytes into buffer.
Returns number of bytes read (0 for EOF), or None if the object is set not to block and has no data to read.
readinto1
def readinto1( self, buffer, / )
readline
def readline( self, size=-1, / )
Next line from the file, as a bytes object.
Retain newline. A non-negative size argument limits the maximum number of bytes to return (an incomplete line may be returned then). Return an empty bytes object at EOF.
readlines
def readlines( self, size=None, / )
List of bytes objects, each a line from the file.
Call readline() repeatedly and return a list of the lines so read. The optional size argument, if given, is an approximate bound on the total number of bytes in the lines returned.
seek
def seek( self, pos, whence=0, / )
Change stream position.
Seek to byte offset pos relative to position indicated by whence: 0 Start of stream (the default). pos should be >= 0; 1 Current position - pos may be negative; 2 End of stream - pos usually negative. Returns the new absolute position.
seekable
def seekable( self, / )
Returns True if the IO object can be seeked.
tell
def tell( self, / )
Current file position, an integer.
truncate
def truncate( self, size=None, / )
Truncate the file to at most size bytes.
Size defaults to the current file position, as returned by tell(). The current file position is unchanged. Returns the new size.
writable
def writable( self, / )
Returns True if the IO object can be written.
write
def write( self, b, / )
Write bytes to file.
Return the number of bytes written.
writelines
def writelines( self, lines, / )
Write lines to the file.
Note that newlines are not added. lines can be any iterable object producing bytes-like objects. This is equivalent to calling write() for each element.