Source code for hgflask.server

__all__ = ['start']

# -*- coding: utf-8 -*-
import cytoolz as toolz
import functools as ft
import json
import requests
import slugid
import multiprocessing as mp
import sh

import hgtiles.bed2ddb as hgb2
import hgtiles.cooler as hgco
import hgtiles.hitile as hghi
import hgtiles.bigwig as hgbi
import hgtiles.files as hgfi

import os
import os.path as op
import sys
import multiprocessing as mp
import time

from flask import Flask
from flask import request, jsonify
#from flask_restful import reqparse, abort, Api, Resource
from flask_cors import CORS

from fuse import FUSE


def create_app(tilesets):
    app = Flask(__name__)
    CORS(app)

    TILESETS = tilesets

    #############
    ### VIEWS ###
    #############


    @app.route('/api/v1/')
    def hello():
        return("Hello World!")


    '''
    CHROMSIZES = {
        'hg19': {
            "chr1": {"size": 249250621}, 
            "chr2": {"size": 243199373}, 
            "chr3": {"size": 198022430}, 
            "chr4": {"size": 191154276}, 
            "chr5": {"size": 180915260}, 
            "chr6": {"size": 171115067}, 
            "chr7": {"size": 159138663}, 
            "chr8": {"size": 146364022}, 
            "chr9": {"size": 141213431}, 
            "chr10": {"size": 135534747}, 
            "chr11": {"size": 135006516}, 
            "chr12": {"size": 133851895}, 
            "chr13": {"size": 115169878}, 
            "chr14": {"size": 107349540}, 
            "chr15": {"size": 102531392}, 
            "chr16": {"size": 90354753}, 
            "chr17": {"size": 81195210}, 
            "chr18": {"size": 78077248}, 
            "chr19": {"size": 59128983}, 
            "chr20": {"size": 63025520}, 
            "chr21": {"size": 48129895}, 
            "chr22": {"size": 51304566}, 
            "chrX": {"size": 155270560}, 
            "chrY": {"size": 59373566}, 
            "chrM": {"size": 16571},
        },
    }

    @app.route('/api/v1/available-chrom-sizes/', methods=['GET'])
    def available_chrom_sizes():
        return jsonify({
            "count": len(CHROMSIZES), 
            "results": {i: CHROMSIZES[i] for i in range(len(CHROMSIZES))}
        })
    '''


    @app.route('/api/v1/chrom-sizes/', methods=['GET'])
    def chrom_sizes():
        uuid = request.args.get('id', None)
        res_type = request.args.get('type', 'tsv')
        incl_cum = request.args.get('cum', False)

        ts = next((ts for ts in TILESETS if ts.uuid == uuid), None)

        if ts is None:
            return jsonify({"error": "Not found"}), 404
        
        data = ts.chromsizes()
        
        new_data = []
        
        if incl_cum:
            cum = 0
            for (chrom, size) in data:
                new_data += [(chrom, size, cum)]
                cum += size 

            for chrom in data.keys():   # dictionaries in py3.6+ are ordered!
                data[chrom]['offset'] = cum
                cum += data[chrom]['size']

        if res_type == 'json':
            # should return
            # { uuid: {
            #   'chr1': {'size': 2343, 'offset': 0},
            #
            if incl_cum:
                j = {
                        ts.uuid: dict([(chrom, { 'size': size, 'offset': offset }) for 
                            (chrom, size, offset) in data])
                    }
            else:
                j = {
                        ts.uuid: dict([(chrom, { 'size': size }) for 
                            (chrom, size) in data])
                    }
            return jsonify(j)

        elif res_type == 'tsv':
            if incl_cum:
               return '\n'.join('{}\t{}\t{}'.format(chrom, size, cum)
                       for chrom, size, cum in data)
            else:
                return '\n'.join('{}\t{}'.format(chrom, size)
                    for chrom, size in data)

        else:
            return jsonify({"error": "Unknown response type"}), 500


    @app.route('/api/v1/uids_by_filename/', methods=['GET'])
    def uids_by_filename():
        return jsonify({
            "count": len(TILESETS), 
            "results": {i: TILESETS[i] for i in range(len(TILESETS))}
        })


    @app.route('/api/v1/tilesets/', methods=['GET'])
    def tilesets():
        return jsonify({
            "count": len(TILESETS),
            "next": None,
            "previous": None,
            "results": TILESETS,
        })

    def get_filepath(tileset_def):
        '''
        Get the filepath from a tileset definition

        Parameters
        ----------
        tileset_def: { 'filepath': ..., 'uid': ..., 'filetype': ...}
            The tileset definition     
        returns: string
            The filepath, either as specified in the tileset_def or
            None
        '''
        if 'filepath' in tileset_def:
            filepath = tileset_def['filepath']
            
            if filepath[:7] == 'http://':
                filepath = http_directory + '//' + filepath[5:] + ".."
            if filepath[:8] == 'https://':
                filepath = https_directory + '//' + filepath[6:] + ".."
            
            return filepath

        return None

    def get_filetype(tileset_def):
        '''
        Get the filetype for the given dataset.

        Parameters
        ----------
        tileset_def: { 'filepath': ..., 'uid': ..., 'filetype': ...}
            The tileset definition     
        returns: string
            The filetype, either as specified in the tileset_def or
            inferred
        '''
        if 'filetype' in tileset_def:
            return tileset_def['filetype']

        if 'filepath' not in tileset_def:
            return None

        return hgfi.infer_filetype(tileset_def['filepath'])

    @app.route('/api/v1/tileset_info/', methods=['GET'])
    def tileset_info():
        uuids = request.args.getlist("d")

        info = {}
        for uuid in uuids:
            ts = next((ts for ts in TILESETS if ts.uuid == uuid), None)
            
            if ts is not None:
                info[uuid] = ts.tileset_info()
            else:
                info[uuid] = {
                    'error': 'No such tileset with uid: {}'.format(uuid)
                }

        return jsonify(info)


    @app.route('/api/v1/tiles/', methods=['GET'])
    def tiles():
        tids_requested = set(request.args.getlist("d"))
        
        if not tids_requested:
            return jsonify({'error': 'No tiles requested'}), 400
        
        extract_uuid = lambda tid: tid.split('.')[0]
        uuids_to_tids = toolz.groupby(extract_uuid, tids_requested)
        
        tiles = []
        for uuid, tids in uuids_to_tids.items():
            ts = next((ts for ts in TILESETS if ts.uuid == uuid), None)
            tiles.extend(ts.tiles(tids))
        data = {tid: tval for tid, tval in tiles}
        return jsonify(data)

    # if __name__ == '__main__':
    #     app.run(debug=True, port=5000)

        # import threading
        # from functools import partial
        # t = threading.Thread(target=partial(app.run, debug=True, port=5000))

    return app

def get_open_port():
        import socket
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind(("",0))
        s.listen(1)
        port = s.getsockname()[1]
        s.close()
        return port

class RunningServer():
    def __init__(self, port, process, host='localhost'):
        '''
        Maintain a reference to a running higlass server

        Parameters:
        ----------
        port: int
            The port that this server is running on
        process: Popen.process
            The process running the server
        '''
        self.host = 'localhost'
        self.port = port
        self.process = process

    def tileset_info(self, uid):
        '''
        Return the tileset info for the given tileset
        '''
        url = 'http://localhost:{port}/api/v1/tileset_info/?d={uid}'.format(
                port=self.port, uid=uid)

        req = requests.get(url)
        if req.status_code != 200:
            raise Exception('Error fetching tileset_info:', req.content)

        content = json.loads(req.content)
        return content[uid]

    def tiles(self, uid, z, x, y=None):
        '''
        Return tiles from the specified dataset (uid) at
        the given position (z,x,[u])
        '''
        tile_id ='{uid}.{z}.{x}'.format(uid=uid, z=z, x=x)
        if y is not None:
            tile_id += '.{y}'.format(y=y)
        url = 'http://localhost:{port}/api/v1/tiles/?d={tile_id}'.format(
                port=self.port, tile_id=tile_id)

        req = requests.get(url)
        if req.status_code != 200:
            raise Exception('Error fetching tile:', req.content)

        content = json.loads(req.content)
        return content[tile_id]

    def chromsizes(self, uid):
        '''
        Return the chromosome sizes from the given filename
        '''
        url = 'http://localhost:{port}/api/v1/chrom-sizes/?id={uid}'.format(
                port=self.port,
                uid=uid)

        req = requests.get(url)
        if req.status_code != 200:
            raise Exception('Error fetching chromsizes:', req.content)

        return req.content

    def stop(self):
        '''
        Stop this server so that the calling process can exit
        '''
        # unsetup_fuse()

        self.process.terminate()

    @property
    def api_address(self):
        return 'http://{}:{}/api/v1'.format(self.host, self.port)

'''
Keep track of the server processes that have been started.
So that when someone says 'start', the old ones are terminated
'''
processes = {}

http_directory = '/tmp/hgflask/http'
https_directory = '/tmp/hgflask/https'
diskcache_directory = '/tmp/hgflask/dc'

def unsetup_fuse():
    global http_directory 
    global https_directory

    try:
        sh.umount(http_directory)
    except Exception as ex:
        pass

    try:
        sh.umount(https_directory)
    except Exception as ex:
        pass


def setup_fuse(tmp_dir):
    '''
    Set up filesystem in user space for http and https 
    so that we can retrieve tiles from remote sources.
    
    Parameters 
    ----------
    tmp_dir: string 
        The temporary directory where to create the 
        http and https directories 
    '''
    import hgflask.httpfs as hht

    global processes
    global http_directory 
    global https_directory

    http_directory = op.join(tmp_dir, 'http')
    https_directory = op.join(tmp_dir, 'https')
    diskcache_directory = op.join(tmp_dir, 'dc')

    if not op.exists(http_directory):
        os.makedirs(http_directory)
    if not op.exists(https_directory):
        os.makedirs(https_directory)
    if not op.exists(diskcache_directory):
        os.makedirs(diskcache_directory)

    try:
        sh.umount(http_directory)
    except Exception as ex:
        pass

    try:
        sh.umount(https_directory)
    except Exception as ex:
        pass

    disk_cache_size = 2**25
    disk_cache_dir = diskcache_directory
    lru_capacity = 400
    print("diskcache_directory", diskcache_directory, op.exists(diskcache_directory))

    def start_fuse(directory):
        print("starting fuse")
        fuse = FUSE(
            hht.HttpFs('http',
                   disk_cache_size=disk_cache_size,
                   disk_cache_dir=diskcache_directory,
                   lru_capacity=lru_capacity,
                ), 
            directory,
            foreground=False
        )

    thread = mp.Process(target = start_fuse, args=[http_directory])
    thread.start()
    thread.join()
    # start_fuse(http_directory)

    '''
    fuse = FUSE(
        HttpFs('https',
               disk_cache_size=disk_cache_size,
               disk_cache_dir=disk_cache_dir,
               lru_capacity=lru_capacity,
            ), 
        https_directory,
        foreground=False
    )
    '''


[docs]def start(tilesets, port=None, tmp_dir='/tmp/hgflask'): ''' Start the hgflask server. If a port is not specified, an open port will be automatically selected. Parameters ---------- tilesets: object The list of tilesets to serve. For example: .. code-block:: python import hgflask.tilesets as hfti tilesets = [ hfti.cooler('mycooler.cool', hfti.bigwig('mybigwig.bigWig', ] port: int The port to start this server on. If it is None, a port will automatically be assigned. tmp_dir: string A temporary directory to be used for mounting http files (experimental) ''' # fuse = setup_fuse(tmp_dir) to_delete = [] # simmple integrity check """ for tileset in tilesets: try: if ('filepath' not in tileset and ('filetype' in tileset and tileset['filetype'] not in filetype_handlers) and 'handlers' not in tileset): print("WARNING: tileset missing filepath or filetype handler", tileset) except TypeError: print("ERROR: Are you sure the list of tilesets is a list?") raise if 'filepath' in tileset: tileset['filepath'] = op.expanduser(tileset['filepath']) """ for puid in processes: print("terminating:", puid) processes[puid].terminate() to_delete += [puid] for puid in to_delete: del processes[puid] # we're going to assign a uuid to each server process so that if anything # goes wrong, the variable referencing the process doesn't get lost app = create_app(tilesets=tilesets)# we're going to assign a uuid to each server process so that if anything # goes wrong, the variable referencing the process doesn't get lost app = create_app(tilesets=tilesets) port=get_open_port() if port is None else port uuid = slugid.nice().decode('utf8') processes[uuid] = mp.Process( target=ft.partial(app.run, threaded=True, debug=True, port=port, host='0.0.0.0', use_reloader=False)) processes[uuid].start() connected = False while not connected: try: ret = requests.get('http://localhost:{}/api/v1/tileset_info/?d=a'.format(port)) print('ret:', ret.status_code, ret.content) connected = True except Exception as err: print('sleeping') time.sleep(.2) pass print("returning") return RunningServer(port, processes[uuid])