Module hug.output_format

hug/output_format.py

Defines Hug's built-in output formatting methods

Copyright (C) 2016 Timothy Edmund Crosley

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View Source
"""hug/output_format.py

Defines Hug's built-in output formatting methods

Copyright (C) 2016  Timothy Edmund Crosley

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated

documentation files (the "Software"), to deal in the Software without restriction, including without limitation

the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and

to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or

substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED

TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL

THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF

CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR

OTHER DEALINGS IN THE SOFTWARE.

"""

from __future__ import absolute_import

import base64

import mimetypes

import os

import re

import tempfile

from datetime import date, datetime, timedelta

from decimal import Decimal

from functools import wraps

from io import BytesIO

from operator import itemgetter

from uuid import UUID

import falcon

from falcon import HTTP_NOT_FOUND

from hug import introspect

from hug.format import camelcase, content_type

from hug.json_module import json as json_converter

try:

    import numpy

except ImportError:

    numpy = False

IMAGE_TYPES = (

    "png",

    "jpg",

    "bmp",

    "eps",

    "gif",

    "im",

    "jpeg",

    "msp",

    "pcx",

    "ppm",

    "spider",

    "tiff",

    "webp",

    "xbm",

    "cur",

    "dcx",

    "fli",

    "flc",

    "gbr",

    "gd",

    "ico",

    "icns",

    "imt",

    "iptc",

    "naa",

    "mcidas",

    "mpo",

    "pcd",

    "psd",

    "sgi",

    "tga",

    "wal",

    "xpm",

    "svg",

    "svg+xml",

)

VIDEO_TYPES = (

    ("flv", "video/x-flv"),

    ("mp4", "video/mp4"),

    ("m3u8", "application/x-mpegURL"),

    ("ts", "video/MP2T"),

    ("3gp", "video/3gpp"),

    ("mov", "video/quicktime"),

    ("avi", "video/x-msvideo"),

    ("wmv", "video/x-ms-wmv"),

)

RE_ACCEPT_QUALITY = re.compile("q=(?P<quality>[^;]+)")

json_converters = {}

stream = tempfile.NamedTemporaryFile if "UWSGI_ORIGINAL_PROC_NAME" in os.environ else BytesIO

def _json_converter(item):

    if hasattr(item, "__native_types__"):

        return item.__native_types__()

    for kind, transformer in json_converters.items():

        if isinstance(item, kind):

            return transformer(item)

    if isinstance(item, (date, datetime)):

        return item.isoformat()

    elif isinstance(item, bytes):

        try:

            return item.decode("utf8")

        except UnicodeDecodeError:

            return base64.b64encode(item)

    elif hasattr(item, "__iter__"):

        return list(item)

    elif isinstance(item, (Decimal, UUID)):

        return str(item)

    elif isinstance(item, timedelta):

        return item.total_seconds()

    raise TypeError("Type not serializable")

def json_convert(*kinds):

    """Registers the wrapped method as a JSON converter for the provided types.

    NOTE: custom converters are always globally applied

    """

    def register_json_converter(function):

        for kind in kinds:

            json_converters[kind] = function

        return function

    return register_json_converter

if numpy:

    @json_convert(numpy.ndarray)

    def numpy_listable(item):

        return item.tolist()

    @json_convert(str, numpy.unicode_)

    def numpy_stringable(item):

        return str(item)

    @json_convert(numpy.bytes_)

    def numpy_byte_decodeable(item):

        return item.decode()

    @json_convert(numpy.bool_)

    def numpy_boolable(item):

        return bool(item)

    @json_convert(numpy.integer)

    def numpy_integerable(item):

        return int(item)

    @json_convert(float, numpy.floating)

    def numpy_floatable(item):

        return float(item)

@content_type("application/json; charset=utf-8")

def json(content, request=None, response=None, ensure_ascii=False, **kwargs):

    """JSON (Javascript Serialized Object Notation)"""

    if hasattr(content, "read"):

        return content

    if isinstance(content, tuple) and getattr(content, "_fields", None):

        content = {field: getattr(content, field) for field in content._fields}

    return json_converter.dumps(

        content, default=_json_converter, ensure_ascii=ensure_ascii, **kwargs

    ).encode("utf8")

def on_valid(valid_content_type, on_invalid=json):

    """Renders as the specified content type only if no errors are found in the provided data object"""

    invalid_kwargs = introspect.generate_accepted_kwargs(on_invalid, "request", "response")

    invalid_takes_response = introspect.takes_all_arguments(on_invalid, "response")

    def wrapper(function):

        valid_kwargs = introspect.generate_accepted_kwargs(function, "request", "response")

        valid_takes_response = introspect.takes_all_arguments(function, "response")

        @content_type(valid_content_type)

        @wraps(function)

        def output_content(content, response, **kwargs):

            if type(content) == dict and "errors" in content:

                response.content_type = on_invalid.content_type

                if invalid_takes_response:

                    kwargs["response"] = response

                return on_invalid(content, **invalid_kwargs(kwargs))

            if valid_takes_response:

                kwargs["response"] = response

            return function(content, **valid_kwargs(kwargs))

        return output_content

    return wrapper

@content_type("text/plain; charset=utf-8")

def text(content, **kwargs):

    """Free form UTF-8 text"""

    if hasattr(content, "read"):

        return content

    return str(content).encode("utf8")

@content_type("text/html; charset=utf-8")

def html(content, **kwargs):

    """HTML (Hypertext Markup Language)"""

    if hasattr(content, "read"):

        return content

    elif hasattr(content, "render"):

        return content.render().encode("utf8")

    return str(content).encode("utf8")

def _camelcase(content):

    if isinstance(content, dict):

        new_dictionary = {}

        for key, value in content.items():

            if isinstance(key, str):

                key = camelcase(key)

            new_dictionary[key] = _camelcase(value)

        return new_dictionary

    elif isinstance(content, list):

        new_list = []

        for element in content:

            new_list.append(_camelcase(element))

        return new_list

    else:

        return content

@content_type("application/json; charset=utf-8")

def json_camelcase(content, **kwargs):

    """JSON (Javascript Serialized Object Notation) with all keys camelCased"""

    return json(_camelcase(content), **kwargs)

@content_type("application/json; charset=utf-8")

def pretty_json(content, **kwargs):

    """JSON (Javascript Serialized Object Notion) pretty printed and indented"""

    return json(content, indent=4, separators=(",", ": "), **kwargs)

def image(image_format, doc=None):

    """Dynamically creates an image type handler for the specified image type"""

    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

    image_handler.__doc__ = doc or "{0} formatted image".format(image_format)

    return image_handler

for image_type in IMAGE_TYPES:

    globals()["{0}_image".format(image_type.replace("+", "_"))] = image(image_type)

def video(video_type, video_mime, doc=None):

    """Dynamically creates a video type handler for the specified video type"""

    @on_valid(video_mime)

    def video_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            data.save(output, format=video_type.upper())

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

    video_handler.__doc__ = doc or "{0} formatted video".format(video_type)

    return video_handler

for (video_type, video_mime) in VIDEO_TYPES:

    globals()["{0}_video".format(video_type)] = video(video_type, video_mime)

@on_valid("file/dynamic")

def file(data, response, **kwargs):

    """A dynamically retrieved file"""

    if not data:

        response.content_type = "text/plain"

        return ""

    if hasattr(data, "read"):

        name, data = getattr(data, "name", ""), data

    elif os.path.isfile(data):

        name, data = data, open(data, "rb")

    else:

        response.content_type = "text/plain"

        response.status = HTTP_NOT_FOUND

        return "File not found!"

    response.content_type = mimetypes.guess_type(name, None)[0] or "application/octet-stream"

    return data

def on_content_type(

    handlers, default=None, error="The requested content type does not match any of those allowed"

):

    """Returns a content in a different format based on the clients provided content type,

       should pass in a dict with the following format:

            {'[content-type]': action,

             ...

            }

    """

    def output_type(data, request, response):

        handler = handlers.get(request.content_type.split(";")[0], default)

        if not handler:

            raise falcon.HTTPNotAcceptable(error)

        response.content_type = handler.content_type

        return handler(data, request=request, response=response)

    output_type.__doc__ = "Supports any of the following formats: {0}".format(

        ", ".join(function.__doc__ or function.__name__ for function in handlers.values())

    )

    output_type.content_type = ", ".join(handlers.keys())

    return output_type

def accept_quality(accept, default=1):

    """Separates out the quality score from the accepted content_type"""

    quality = default

    if accept and ";" in accept:

        accept, rest = accept.split(";", 1)

        accept_quality = RE_ACCEPT_QUALITY.search(rest)

        if accept_quality:

            quality = float(accept_quality.groupdict().get("quality", quality).strip())

    return (quality, accept.strip())

def accept(

    handlers, default=None, error="The requested content type does not match any of those allowed"

):

    """Returns a content in a different format based on the clients defined accepted content type,

       should pass in a dict with the following format:

            {'[content-type]': action,

             ...

            }

    """

    def output_type(data, request, response):

        accept = request.accept

        if accept in ("", "*", "/"):

            handler = default or handlers and next(iter(handlers.values()))

        else:

            handler = default

            accepted = [accept_quality(accept_type) for accept_type in accept.split(",")]

            accepted.sort(key=itemgetter(0))

            for _quality, accepted_content_type in reversed(accepted):

                if accepted_content_type in handlers:

                    handler = handlers[accepted_content_type]

                    break

        if not handler:

            raise falcon.HTTPNotAcceptable(error)

        response.content_type = handler.content_type

        return handler(data, request=request, response=response)

    output_type.__doc__ = "Supports any of the following formats: {0}".format(

        ", ".join(function.__doc__ for function in handlers.values())

    )

    output_type.content_type = ", ".join(handlers.keys())

    return output_type

def suffix(

    handlers, default=None, error="The requested suffix does not match any of those allowed"

):

    """Returns a content in a different format based on the suffix placed at the end of the URL route

       should pass in a dict with the following format:

            {'[suffix]': action,

             ...

            }

    """

    def output_type(data, request, response):

        path = request.path

        handler = default

        for suffix_test, suffix_handler in handlers.items():

            if path.endswith(suffix_test):

                handler = suffix_handler

                break

        if not handler:

            raise falcon.HTTPNotAcceptable(error)

        response.content_type = handler.content_type

        return handler(data, request=request, response=response)

    output_type.__doc__ = "Supports any of the following formats: {0}".format(

        ", ".join(function.__doc__ for function in handlers.values())

    )

    output_type.content_type = ", ".join(handlers.keys())

    return output_type

def prefix(

    handlers, default=None, error="The requested prefix does not match any of those allowed"

):

    """Returns a content in a different format based on the prefix placed at the end of the URL route

       should pass in a dict with the following format:

            {'[prefix]': action,

             ...

            }

    """

    def output_type(data, request, response):

        path = request.path

        handler = default

        for prefix_test, prefix_handler in handlers.items():

            if path.startswith(prefix_test):

                handler = prefix_handler

                break

        if not handler:

            raise falcon.HTTPNotAcceptable(error)

        response.content_type = handler.content_type

        return handler(data, request=request, response=response)

    output_type.__doc__ = "Supports any of the following formats: {0}".format(

        ", ".join(function.__doc__ for function in handlers.values())

    )

    output_type.content_type = ", ".join(handlers.keys())

    return output_type

Variables

HTTP_NOT_FOUND
IMAGE_TYPES
RE_ACCEPT_QUALITY
VIDEO_TYPES
image_type
json_converters
video_mime
video_type

Functions

3gp_video

def 3gp_video(
    data,
    **kwargs
)

3gp formatted video

View Source
    @on_valid(video_mime)

    def video_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            data.save(output, format=video_type.upper())

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

accept

def accept(
    handlers,
    default=None,
    error='The requested content type does not match any of those allowed'
)

Returns a content in a different format based on the clients defined accepted content type, should pass in a dict with the following format:

 {'[content-type]': action,
  ...
 }
View Source
def accept(

    handlers, default=None, error="The requested content type does not match any of those allowed"

):

    """Returns a content in a different format based on the clients defined accepted content type,

       should pass in a dict with the following format:

            {'[content-type]': action,

             ...

            }

    """

    def output_type(data, request, response):

        accept = request.accept

        if accept in ("", "*", "/"):

            handler = default or handlers and next(iter(handlers.values()))

        else:

            handler = default

            accepted = [accept_quality(accept_type) for accept_type in accept.split(",")]

            accepted.sort(key=itemgetter(0))

            for _quality, accepted_content_type in reversed(accepted):

                if accepted_content_type in handlers:

                    handler = handlers[accepted_content_type]

                    break

        if not handler:

            raise falcon.HTTPNotAcceptable(error)

        response.content_type = handler.content_type

        return handler(data, request=request, response=response)

    output_type.__doc__ = "Supports any of the following formats: {0}".format(

        ", ".join(function.__doc__ for function in handlers.values())

    )

    output_type.content_type = ", ".join(handlers.keys())

    return output_type

accept_quality

def accept_quality(
    accept,
    default=1
)

Separates out the quality score from the accepted content_type

View Source
def accept_quality(accept, default=1):

    """Separates out the quality score from the accepted content_type"""

    quality = default

    if accept and ";" in accept:

        accept, rest = accept.split(";", 1)

        accept_quality = RE_ACCEPT_QUALITY.search(rest)

        if accept_quality:

            quality = float(accept_quality.groupdict().get("quality", quality).strip())

    return (quality, accept.strip())

avi_video

def avi_video(
    data,
    **kwargs
)

avi formatted video

View Source
    @on_valid(video_mime)

    def video_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            data.save(output, format=video_type.upper())

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

bmp_image

def bmp_image(
    data,
    **kwargs
)

bmp formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

cur_image

def cur_image(
    data,
    **kwargs
)

cur formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

dcx_image

def dcx_image(
    data,
    **kwargs
)

dcx formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

eps_image

def eps_image(
    data,
    **kwargs
)

eps formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

file

def file(
    data,
    response,
    **kwargs
)

A dynamically retrieved file

View Source
@on_valid("file/dynamic")

def file(data, response, **kwargs):

    """A dynamically retrieved file"""

    if not data:

        response.content_type = "text/plain"

        return ""

    if hasattr(data, "read"):

        name, data = getattr(data, "name", ""), data

    elif os.path.isfile(data):

        name, data = data, open(data, "rb")

    else:

        response.content_type = "text/plain"

        response.status = HTTP_NOT_FOUND

        return "File not found!"

    response.content_type = mimetypes.guess_type(name, None)[0] or "application/octet-stream"

    return data

flc_image

def flc_image(
    data,
    **kwargs
)

flc formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

fli_image

def fli_image(
    data,
    **kwargs
)

fli formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

flv_video

def flv_video(
    data,
    **kwargs
)

flv formatted video

View Source
    @on_valid(video_mime)

    def video_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            data.save(output, format=video_type.upper())

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

gbr_image

def gbr_image(
    data,
    **kwargs
)

gbr formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

gd_image

def gd_image(
    data,
    **kwargs
)

gd formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

gif_image

def gif_image(
    data,
    **kwargs
)

gif formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

html

def html(
    content,
    **kwargs
)

HTML (Hypertext Markup Language)

View Source
@content_type("text/html; charset=utf-8")

def html(content, **kwargs):

    """HTML (Hypertext Markup Language)"""

    if hasattr(content, "read"):

        return content

    elif hasattr(content, "render"):

        return content.render().encode("utf8")

    return str(content).encode("utf8")

icns_image

def icns_image(
    data,
    **kwargs
)

icns formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

ico_image

def ico_image(
    data,
    **kwargs
)

ico formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

im_image

def im_image(
    data,
    **kwargs
)

im formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

image

def image(
    image_format,
    doc=None
)

Dynamically creates an image type handler for the specified image type

View Source
def image(image_format, doc=None):

    """Dynamically creates an image type handler for the specified image type"""

    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

    image_handler.__doc__ = doc or "{0} formatted image".format(image_format)

    return image_handler

imt_image

def imt_image(
    data,
    **kwargs
)

imt formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

iptc_image

def iptc_image(
    data,
    **kwargs
)

iptc formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

jpeg_image

def jpeg_image(
    data,
    **kwargs
)

jpeg formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

jpg_image

def jpg_image(
    data,
    **kwargs
)

jpg formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

json

def json(
    content,
    request=None,
    response=None,
    ensure_ascii=False,
    **kwargs
)

JSON (Javascript Serialized Object Notation)

View Source
@content_type("application/json; charset=utf-8")

def json(content, request=None, response=None, ensure_ascii=False, **kwargs):

    """JSON (Javascript Serialized Object Notation)"""

    if hasattr(content, "read"):

        return content

    if isinstance(content, tuple) and getattr(content, "_fields", None):

        content = {field: getattr(content, field) for field in content._fields}

    return json_converter.dumps(

        content, default=_json_converter, ensure_ascii=ensure_ascii, **kwargs

    ).encode("utf8")

json_camelcase

def json_camelcase(
    content,
    **kwargs
)

JSON (Javascript Serialized Object Notation) with all keys camelCased

View Source
@content_type("application/json; charset=utf-8")

def json_camelcase(content, **kwargs):

    """JSON (Javascript Serialized Object Notation) with all keys camelCased"""

    return json(_camelcase(content), **kwargs)

json_convert

def json_convert(
    *kinds
)

Registers the wrapped method as a JSON converter for the provided types.

NOTE: custom converters are always globally applied

View Source
def json_convert(*kinds):

    """Registers the wrapped method as a JSON converter for the provided types.

    NOTE: custom converters are always globally applied

    """

    def register_json_converter(function):

        for kind in kinds:

            json_converters[kind] = function

        return function

    return register_json_converter

m3u8_video

def m3u8_video(
    data,
    **kwargs
)

m3u8 formatted video

View Source
    @on_valid(video_mime)

    def video_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            data.save(output, format=video_type.upper())

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

mcidas_image

def mcidas_image(
    data,
    **kwargs
)

mcidas formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

mov_video

def mov_video(
    data,
    **kwargs
)

mov formatted video

View Source
    @on_valid(video_mime)

    def video_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            data.save(output, format=video_type.upper())

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

mp4_video

def mp4_video(
    data,
    **kwargs
)

mp4 formatted video

View Source
    @on_valid(video_mime)

    def video_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            data.save(output, format=video_type.upper())

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

mpo_image

def mpo_image(
    data,
    **kwargs
)

mpo formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

msp_image

def msp_image(
    data,
    **kwargs
)

msp formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

naa_image

def naa_image(
    data,
    **kwargs
)

naa formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

numpy_boolable

def numpy_boolable(
    item
)
View Source
    @json_convert(numpy.bool_)

    def numpy_boolable(item):

        return bool(item)

numpy_byte_decodeable

def numpy_byte_decodeable(
    item
)
View Source
    @json_convert(numpy.bytes_)

    def numpy_byte_decodeable(item):

        return item.decode()

numpy_floatable

def numpy_floatable(
    item
)
View Source
    @json_convert(float, numpy.floating)

    def numpy_floatable(item):

        return float(item)

numpy_integerable

def numpy_integerable(
    item
)
View Source
    @json_convert(numpy.integer)

    def numpy_integerable(item):

        return int(item)

numpy_listable

def numpy_listable(
    item
)
View Source
    @json_convert(numpy.ndarray)

    def numpy_listable(item):

        return item.tolist()

numpy_stringable

def numpy_stringable(
    item
)
View Source
    @json_convert(str, numpy.unicode_)

    def numpy_stringable(item):

        return str(item)

on_content_type

def on_content_type(
    handlers,
    default=None,
    error='The requested content type does not match any of those allowed'
)

Returns a content in a different format based on the clients provided content type, should pass in a dict with the following format:

 {'[content-type]': action,
  ...
 }
View Source
def on_content_type(

    handlers, default=None, error="The requested content type does not match any of those allowed"

):

    """Returns a content in a different format based on the clients provided content type,

       should pass in a dict with the following format:

            {'[content-type]': action,

             ...

            }

    """

    def output_type(data, request, response):

        handler = handlers.get(request.content_type.split(";")[0], default)

        if not handler:

            raise falcon.HTTPNotAcceptable(error)

        response.content_type = handler.content_type

        return handler(data, request=request, response=response)

    output_type.__doc__ = "Supports any of the following formats: {0}".format(

        ", ".join(function.__doc__ or function.__name__ for function in handlers.values())

    )

    output_type.content_type = ", ".join(handlers.keys())

    return output_type

on_valid

def on_valid(
    valid_content_type,
    on_invalid=<function json at 0x7f7ee3199840>
)

Renders as the specified content type only if no errors are found in the provided data object

View Source
def on_valid(valid_content_type, on_invalid=json):

    """Renders as the specified content type only if no errors are found in the provided data object"""

    invalid_kwargs = introspect.generate_accepted_kwargs(on_invalid, "request", "response")

    invalid_takes_response = introspect.takes_all_arguments(on_invalid, "response")

    def wrapper(function):

        valid_kwargs = introspect.generate_accepted_kwargs(function, "request", "response")

        valid_takes_response = introspect.takes_all_arguments(function, "response")

        @content_type(valid_content_type)

        @wraps(function)

        def output_content(content, response, **kwargs):

            if type(content) == dict and "errors" in content:

                response.content_type = on_invalid.content_type

                if invalid_takes_response:

                    kwargs["response"] = response

                return on_invalid(content, **invalid_kwargs(kwargs))

            if valid_takes_response:

                kwargs["response"] = response

            return function(content, **valid_kwargs(kwargs))

        return output_content

    return wrapper

pcd_image

def pcd_image(
    data,
    **kwargs
)

pcd formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

pcx_image

def pcx_image(
    data,
    **kwargs
)

pcx formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

png_image

def png_image(
    data,
    **kwargs
)

png formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

ppm_image

def ppm_image(
    data,
    **kwargs
)

ppm formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

prefix

def prefix(
    handlers,
    default=None,
    error='The requested prefix does not match any of those allowed'
)

Returns a content in a different format based on the prefix placed at the end of the URL route should pass in a dict with the following format:

 {'[prefix]': action,
  ...
 }
View Source
def prefix(

    handlers, default=None, error="The requested prefix does not match any of those allowed"

):

    """Returns a content in a different format based on the prefix placed at the end of the URL route

       should pass in a dict with the following format:

            {'[prefix]': action,

             ...

            }

    """

    def output_type(data, request, response):

        path = request.path

        handler = default

        for prefix_test, prefix_handler in handlers.items():

            if path.startswith(prefix_test):

                handler = prefix_handler

                break

        if not handler:

            raise falcon.HTTPNotAcceptable(error)

        response.content_type = handler.content_type

        return handler(data, request=request, response=response)

    output_type.__doc__ = "Supports any of the following formats: {0}".format(

        ", ".join(function.__doc__ for function in handlers.values())

    )

    output_type.content_type = ", ".join(handlers.keys())

    return output_type

pretty_json

def pretty_json(
    content,
    **kwargs
)

JSON (Javascript Serialized Object Notion) pretty printed and indented

View Source
@content_type("application/json; charset=utf-8")

def pretty_json(content, **kwargs):

    """JSON (Javascript Serialized Object Notion) pretty printed and indented"""

    return json(content, indent=4, separators=(",", ": "), **kwargs)

psd_image

def psd_image(
    data,
    **kwargs
)

psd formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

sgi_image

def sgi_image(
    data,
    **kwargs
)

sgi formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

spider_image

def spider_image(
    data,
    **kwargs
)

spider formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

suffix

def suffix(
    handlers,
    default=None,
    error='The requested suffix does not match any of those allowed'
)

Returns a content in a different format based on the suffix placed at the end of the URL route should pass in a dict with the following format:

 {'[suffix]': action,
  ...
 }
View Source
def suffix(

    handlers, default=None, error="The requested suffix does not match any of those allowed"

):

    """Returns a content in a different format based on the suffix placed at the end of the URL route

       should pass in a dict with the following format:

            {'[suffix]': action,

             ...

            }

    """

    def output_type(data, request, response):

        path = request.path

        handler = default

        for suffix_test, suffix_handler in handlers.items():

            if path.endswith(suffix_test):

                handler = suffix_handler

                break

        if not handler:

            raise falcon.HTTPNotAcceptable(error)

        response.content_type = handler.content_type

        return handler(data, request=request, response=response)

    output_type.__doc__ = "Supports any of the following formats: {0}".format(

        ", ".join(function.__doc__ for function in handlers.values())

    )

    output_type.content_type = ", ".join(handlers.keys())

    return output_type

svg_image

def svg_image(
    data,
    **kwargs
)

svg formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

svg_xml_image

def svg_xml_image(
    data,
    **kwargs
)

svg+xml formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

text

def text(
    content,
    **kwargs
)

Free form UTF-8 text

View Source
@content_type("text/plain; charset=utf-8")

def text(content, **kwargs):

    """Free form UTF-8 text"""

    if hasattr(content, "read"):

        return content

    return str(content).encode("utf8")

tga_image

def tga_image(
    data,
    **kwargs
)

tga formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

tiff_image

def tiff_image(
    data,
    **kwargs
)

tiff formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

ts_video

def ts_video(
    data,
    **kwargs
)

ts formatted video

View Source
    @on_valid(video_mime)

    def video_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            data.save(output, format=video_type.upper())

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

video

def video(
    video_type,
    video_mime,
    doc=None
)

Dynamically creates a video type handler for the specified video type

View Source
def video(video_type, video_mime, doc=None):

    """Dynamically creates a video type handler for the specified video type"""

    @on_valid(video_mime)

    def video_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            data.save(output, format=video_type.upper())

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

    video_handler.__doc__ = doc or "{0} formatted video".format(video_type)

    return video_handler

wal_image

def wal_image(
    data,
    **kwargs
)

wal formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

webp_image

def webp_image(
    data,
    **kwargs
)

webp formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

wmv_video

def wmv_video(
    data,
    **kwargs
)

wmv formatted video

View Source
    @on_valid(video_mime)

    def video_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            data.save(output, format=video_type.upper())

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

xbm_image

def xbm_image(
    data,
    **kwargs
)

xbm formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

xpm_image

def xpm_image(
    data,
    **kwargs
)

xpm formatted image

View Source
    @on_valid("image/{0}".format(image_format))

    def image_handler(data, **kwargs):

        if hasattr(data, "read"):

            return data

        elif hasattr(data, "save"):

            output = stream()

            if introspect.takes_all_arguments(data.save, "format") or introspect.takes_kwargs(

                data.save

            ):

                data.save(output, format=image_format.upper())

            else:

                data.save(output)

            output.seek(0)

            return output

        elif hasattr(data, "render"):

            return data.render()

        elif os.path.isfile(data):

            return open(data, "rb")

Classes

stream

class stream(
    /,
    *args,
    **kwargs
)

Buffered I/O implementation using an in-memory bytes buffer.

Ancestors (in MRO)

  • _io._BufferedIOBase
  • _io._IOBase

Class variables

closed

Methods

close
def close(
    self,
    /
)

Disable all I/O operations.

detach
def detach(
    self,
    /
)

Disconnect this buffer from its underlying raw stream and return it.

After the raw stream has been detached, the buffer is in an unusable state.

fileno
def fileno(
    self,
    /
)

Returns underlying file descriptor if one exists.

OSError is raised if the IO object does not use a file descriptor.

flush
def flush(
    self,
    /
)

Does nothing.

getbuffer
def getbuffer(
    self,
    /
)

Get a read-write view over the contents of the BytesIO object.

getvalue
def getvalue(
    self,
    /
)

Retrieve the entire contents of the BytesIO object.

isatty
def isatty(
    self,
    /
)

Always returns False.

BytesIO objects are not connected to a TTY-like device.

read
def read(
    self,
    size=-1,
    /
)

Read at most size bytes, returned as a bytes object.

If the size argument is negative, read until EOF is reached. Return an empty bytes object at EOF.

read1
def read1(
    self,
    size=-1,
    /
)

Read at most size bytes, returned as a bytes object.

If the size argument is negative or omitted, read until EOF is reached. Return an empty bytes object at EOF.

readable
def readable(
    self,
    /
)

Returns True if the IO object can be read.

readinto
def readinto(
    self,
    buffer,
    /
)

Read bytes into buffer.

Returns number of bytes read (0 for EOF), or None if the object is set not to block and has no data to read.

readinto1
def readinto1(
    self,
    buffer,
    /
)
readline
def readline(
    self,
    size=-1,
    /
)

Next line from the file, as a bytes object.

Retain newline. A non-negative size argument limits the maximum number of bytes to return (an incomplete line may be returned then). Return an empty bytes object at EOF.

readlines
def readlines(
    self,
    size=None,
    /
)

List of bytes objects, each a line from the file.

Call readline() repeatedly and return a list of the lines so read. The optional size argument, if given, is an approximate bound on the total number of bytes in the lines returned.

seek
def seek(
    self,
    pos,
    whence=0,
    /
)

Change stream position.

Seek to byte offset pos relative to position indicated by whence: 0 Start of stream (the default). pos should be >= 0; 1 Current position - pos may be negative; 2 End of stream - pos usually negative. Returns the new absolute position.

seekable
def seekable(
    self,
    /
)

Returns True if the IO object can be seeked.

tell
def tell(
    self,
    /
)

Current file position, an integer.

truncate
def truncate(
    self,
    size=None,
    /
)

Truncate the file to at most size bytes.

Size defaults to the current file position, as returned by tell(). The current file position is unchanged. Returns the new size.

writable
def writable(
    self,
    /
)

Returns True if the IO object can be written.

write
def write(
    self,
    b,
    /
)

Write bytes to file.

Return the number of bytes written.

writelines
def writelines(
    self,
    lines,
    /
)

Write lines to the file.

Note that newlines are not added. lines can be any iterable object producing bytes-like objects. This is equivalent to calling write() for each element.