#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import functools
import hashlib
import json
import logging
import os
import zipfile
from copy import deepcopy
from io import BytesIO

import yaml

from graphscope.framework.context import create_context_node
from graphscope.framework.dag import DAGNode
from graphscope.framework.dag_utils import bind_app
from graphscope.framework.dag_utils import create_app
from graphscope.framework.dag_utils import unload_app
from graphscope.framework.errors import InvalidArgumentError
from graphscope.framework.errors import check_argument
from graphscope.framework.utils import graph_type_to_cpp_class
from graphscope.proto import graph_def_pb2

logger = logging.getLogger("graphscope")

# Name of the app-description file expected inside every gar archive.
DEFAULT_GS_CONFIG_FILE = ".gs_conf.yaml"


def project_to_simple(func):
    """Decorator to project a property graph to the simple graph.

    When the first positional argument is an ``ARROW_PROPERTY`` graph, it is
    replaced by a simple projection before ``func`` runs:

    - if the caller passed ``weight=...``, that value is used as the edge
      property (``e_prop``);
    - otherwise, if the caller passed ``attribute=...``, that value is used as
      the vertex property (``v_prop``);
    - otherwise the graph is projected with no explicit property.

    Only keyword arguments actually passed by the caller are inspected; a
    default value declared in ``func``'s signature is not seen here.
    Graphs of any other type are passed through unchanged.

    Raises:
        InvalidArgumentError: If the first argument has no ``graph_type``.

    Examples:
        >>> @project_to_simple
        >>> def sssp(G, src, weight="dist"):
        >>>     pass
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        graph = args[0]
        if not hasattr(graph, "graph_type"):
            raise InvalidArgumentError("Missing graph_type attribute in graph object.")
        if graph.graph_type == graph_def_pb2.ARROW_PROPERTY:
            if "weight" in kwargs:
                # func has 'weight' argument
                projected = graph._project_to_simple(e_prop=kwargs["weight"])
            elif "attribute" in kwargs:
                # func has 'attribute' argument
                projected = graph._project_to_simple(v_prop=kwargs["attribute"])
            else:
                projected = graph._project_to_simple()
            # keep a handle to the original property graph on the projection
            projected._base_graph = graph
        else:
            projected = graph
        return func(projected, *args[1:], **kwargs)

    return wrapper


def not_compatible_for(*graph_types):
    """Decorator to mark builtin algorithms as not compatible with graph.

    Args:
        graph_types: list of string
            Entries must be one of 'arrow_property', 'dynamic_property',
            'arrow_projected', 'dynamic_projected', 'arrow_flattened',
            'directed', 'undirected'.

    Returns:
        A decorator that wraps the algorithm function.

    Raises:
        InvalidArgumentError: When the graph (first positional argument) has
            no ``graph_type`` attribute, when an unknown tag is reached while
            checking ``graph_types``, or when the graph matches one of the
            given tags (i.e. it is not compatible).

    Notes:
        Multiple types or use multiple @not_compatible_for() lines
        are joined logically with "or".

    Examples:
        >>> @not_compatible_for('arrow_property', 'dynamic_property')
        >>> def sssp(G, src, weight="dist"):
        >>>     pass
    """

    def _not_compatible_for(not_compatible_for_func):
        @functools.wraps(not_compatible_for_func)
        def wrapper(*args, **kwargs):
            graph = args[0]
            if not hasattr(graph, "graph_type"):
                raise InvalidArgumentError("Missing graph_type attribute in graph object.")

            terms = {
                "arrow_property": graph.graph_type == graph_def_pb2.ARROW_PROPERTY,
                "dynamic_property": graph.graph_type == graph_def_pb2.DYNAMIC_PROPERTY,
                "arrow_projected": graph.graph_type == graph_def_pb2.ARROW_PROJECTED,
                "dynamic_projected": graph.graph_type == graph_def_pb2.DYNAMIC_PROJECTED,
                "arrow_flattened": graph.graph_type == graph_def_pb2.ARROW_FLATTENED,
                "directed": graph.is_directed(),
                "undirected": not graph.is_directed(),
            }
            matched, tag = False, ""
            try:
                for t in graph_types:
                    if terms[t]:
                        matched, tag = True, t
                        break
            except KeyError:
                # An unknown tag: report the accepted vocabulary instead of
                # leaking the internal KeyError.
                raise InvalidArgumentError(
                    "Use one or more of arrow_property,dynamic_property,"
                    "arrow_projected,dynamic_projected,arrow_flattened,directed,undirected",
                ) from None
            if matched:
                raise InvalidArgumentError(
                    "Algorithm '%s' isn't compatible for '%s' graphs"
                    % (not_compatible_for_func.__name__, tag)
                )
            return not_compatible_for_func(*args, **kwargs)

        return wrapper

    return _not_compatible_for
class AppAssets(DAGNode):
    """A class represents an app asset node in a DAG that holds the bytes of the gar resource.

    Assets includes an algorithm name, and gar (for user defined algorithm),
    a context type (one of 'tensor', 'vertex_data', 'vertex_property',
    'labeled_vertex_data', 'dynamic_vertex_data', 'labeled_vertex_property'),
    and its type (one of `cpp_pie`, `cpp_pregel`, `cython_pie`, `cython_pregel`,
    `java_pie`).

    The instance of this class can be passed to init :class:`graphscope.framework.app.AppDAGNode`
    """

    _support_context_type = [
        "tensor",
        "vertex_data",
        "vertex_property",
        "labeled_vertex_data",
        "dynamic_vertex_data",
        "labeled_vertex_property",
    ]

    def __init__(self, algo=None, context=None, gar=None, cmake_extra_options=None):
        """Init assets of the algorithm.

        Args:
            algo (str): Represent specific algo inside resource. An algo name
                containing ``giraph:`` or ``java_pie:`` marks a Java app.
            context (str): Type of context that hold the calculation results.
                It will get from gar if param is None. Defaults to None.
            gar (bytes or BytesIO, optional): The bytes that encodes the application's source code.
                Any other type is ignored and the app is treated as builtin.
                Defaults to None.
            cmake_extra_options (optional): Stored as-is and exposed through
                the :attr:`cmake_extra_options` property. Defaults to None.

        Raises:
            InvalidArgumentError: If the gar metadata is invalid (see
                :meth:`_extract_meta_info`) or the resulting context type is not
                one of the supported context types.
        """
        self._algo = algo
        self._context_type = context
        if isinstance(self._algo, str) and (
            "giraph:" in self._algo or "java_pie:" in self._algo
        ):
            self._type = "java_pie"
        else:
            self._type = "cpp_pie"  # default is builtin app with `built_in` type
        self._meta = {}

        # used for gar resource
        if gar is not None and isinstance(gar, (BytesIO, bytes)):
            self._gar = gar if isinstance(gar, bytes) else gar.getvalue()
            # may override algo, context type and app type from the gar config
            self._extract_meta_info()
        else:
            # built_in apps has no gar resource.
            self._gar = None
        self._cmake_extra_options = cmake_extra_options

        if self._context_type not in self._support_context_type:
            raise InvalidArgumentError(
                "Unsupport context type: {0}".format(self._context_type)
            )

        self._op = create_app(self)

    def __repr__(self) -> str:
        return f"graphscope.framework.app.AppAssets <type: {self._type}, algo: {self._algo}, context: {self._context_type}>"

    def _extract_meta_info(self):
        """Extract app meta info from gar resource.

        Reads :data:`DEFAULT_GS_CONFIG_FILE` from the gar archive and selects
        the entry of its ``app`` list whose ``algo`` equals ``self._algo``.
        When no algo was given and the archive declares exactly one app, that
        app is used.

        Raises:
            InvalidArgumentError:
                - :code:`gs_conf.yaml` not exist in gar resource.
                - App not found in gar resource.
            ScannerError:
                - Yaml file format is incorrect (raised by ``yaml.safe_load``).
        """
        # `with` guarantees the archive handle is released even on errors.
        with zipfile.ZipFile(BytesIO(self._gar), "r") as archive:
            try:
                raw_config = archive.read(DEFAULT_GS_CONFIG_FILE)
            except KeyError:
                raise InvalidArgumentError(
                    "{} not found in gar resource.".format(DEFAULT_GS_CONFIG_FILE)
                ) from None
        config = yaml.safe_load(raw_config)
        apps = config["app"]

        # default app will used if there is only one app in it
        if self._algo is None and len(apps) == 1:
            self._algo = apps[0]["algo"]
            logger.info("Default app %s will be used.", self._algo)

        for meta in apps:
            if self._algo == meta["algo"]:
                if "context_type" in meta:
                    self._context_type = meta["context_type"]
                self._type = meta["type"]
                self._meta = meta
                return
        raise InvalidArgumentError("App not found in gar: {}".format(self._algo))

    @property
    def algo(self):
        """Algorithm name, e.g. sssp, pagerank.

        Returns:
            str: Algorithm name of this asset.
        """
        return self._algo

    @property
    def context_type(self):
        """Context type, e.g. vertex_property, labeled_vertex_data.

        Returns:
            str: Type of the app context.
        """
        return self._context_type

    @property
    def type(self):
        """Algorithm type, one of `cpp_pie`, `cpp_pregel`, `cython_pie`,
        `java_pie` or `cython_pregel`.

        Returns:
            str: Algorithm type of this asset.
        """
        return self._type

    @property
    def gar(self):
        """Gar resource.

        Returns:
            bytes: gar resource of this asset.
        """
        return self._gar

    def to_gar(self, path):
        """Write the gar resource of this asset to ``path``.

        This used to be a classmethod reading ``cls._gar``, which does not
        exist on the class, so it could never succeed. It is now an instance
        method; ``assets.to_gar(path)`` keeps working for callers.

        Raises:
            RuntimeError: If ``path`` already exists.
            InvalidArgumentError: If this asset has no gar resource (builtin app).
        """
        if os.path.exists(path):
            raise RuntimeError("Path exist: {}.".format(path))
        if self._gar is None:
            # Avoid leaving an empty file behind for builtin apps.
            raise InvalidArgumentError(
                "No gar resource for builtin app: {}".format(self._algo)
            )
        with open(path, "wb") as f:
            f.write(self._gar)

    def bytes(self):
        """Return the raw gar bytes of this asset (``None`` for builtin apps).

        Formerly a classmethod that read the non-existent ``cls._gar``.
        """
        return self._gar

    @property
    def cmake_extra_options(self):
        """Extra options passed at construction time (may be None)."""
        return self._cmake_extra_options

    @property
    def signature(self):
        """Generate a signature of the app assets by its algo name (and gar resources).

        Used to uniquely identify a app assets.

        Returns:
            str: signature of this assets
        """
        s = hashlib.sha256()
        s.update(self._algo.encode("utf-8", errors="ignore"))
        if self._gar:
            s.update(self._gar)
        return s.hexdigest()

    def is_compatible(self, graph):
        """Determine if this algorithm can run on this type of graph.

        Builtin apps (no gar resource) are always considered compatible. For
        gar apps, the C++ class name of ``graph.graph_type`` must appear in
        the ``compatible_graph`` list of the app's gar metadata.

        Args:
            graph (:class:`GraphDAGNode`): A graph instance.

        Returns:
            bool: True when compatible (both for builtin and gar apps).

        Raises:
            InvalidArgumentError:
                - App is not compatible with graph
        """
        # builtin app
        if self._gar is None:
            return True
        # check yaml file
        graph_type = graph_type_to_cpp_class(graph.graph_type)
        if graph_type not in self._meta["compatible_graph"]:
            raise InvalidArgumentError(
                "App is uncompatible with graph {}".format(graph_type)
            )
        return True

    def __call__(self, graph, *args, **kwargs):
        """Instantiate an App and do queries over it."""
        app_ = graph.session._wrapper(AppDAGNode(graph, self))
        return app_(*args, **kwargs)
class AppDAGNode(DAGNode):
    """DAG node for an application bound to a concrete graph node.

    Queries issued through this node are executed against the bound graph.
    """

    def __init__(self, graph, app_assets: AppAssets):
        """Bind ``app_assets`` to ``graph`` and register the ops in the session DAG.

        Args:
            graph (:class:`GraphDAGNode`): The graph the app will run on.
            app_assets: The :class:`AppAssets` describing the algorithm.

        Raises:
            InvalidArgumentError: Propagated from ``app_assets.is_compatible``
                when the app cannot run on this graph.
        """
        self._graph = graph
        self._app_assets = app_assets
        self._session = graph.session
        self._app_assets.is_compatible(self._graph)

        self._op = bind_app(graph, self._app_assets)
        # The assets op is registered only once, even if several app nodes
        # are built from the same assets.
        assets_op = self._app_assets.op
        if not self._session.dag.exists(assets_op):
            self._session.dag.add_op(assets_op)
        self._session.dag.add_op(self._op)
        # statically create the unload op to prevent a possible segmentation fault
        # inside the protobuf library.
        self._unload_op = unload_app(self)

    def __repr__(self):
        head = f"graphscope.App <type: {self._app_assets.type}, algorithm: {self._app_assets.algo} "
        tail = f"bounded_graph: {str(self._graph)}>"
        return head + tail

    @property
    def algo(self):
        """Algorithm name of the bound assets, e.g. sssp, pagerank.

        Returns:
            str: Algorithm name of this asset.
        """
        return self._app_assets.algo

    @property
    def gar(self):
        """Gar resource of the bound assets.

        Returns:
            bytes: gar resource of this asset.
        """
        return self._app_assets.gar

    def __call__(self, *args, **kwargs):
        """Validate the call against the app type and create a query context node.

        Apps of type `cpp_pregel`, `cython_pie`, `cython_pregel` and
        `java_pie` accept keyword arguments only; these are serialized to a
        JSON string. Any other type forwards ``*args``/``**kwargs`` as-is.

        Raises:
            InvalidArgumentError: If app_type is None, or positional argument
                found for a keyword-only app type.
            RuntimeError: If the bound graph is not a DAG node and is not loaded.

        Returns:
            :class:`Context`: Query context, include running results of the app.
        """
        app_type = self._app_assets.type
        check_argument(app_type is not None)
        context_type = self._app_assets.context_type

        graph_ready = isinstance(self._graph, DAGNode) or self._graph.loaded()
        if not graph_ready:
            raise RuntimeError("The graph is not loaded")

        keyword_only_types = ["cpp_pregel", "cython_pie", "cython_pregel", "java_pie"]
        if app_type not in keyword_only_types:
            return create_context_node(context_type, self, self._graph, *args, **kwargs)

        # cython app support kwargs only
        check_argument(not args, "Only support using keyword arguments in cython app.")
        serialized = json.dumps(kwargs)
        return create_context_node(context_type, self, self._graph, serialized)

    def __del__(self):
        # Best-effort unload during finalization; errors are intentionally ignored.
        try:
            self.session.run(self._unload())
        except Exception:  # pylint: disable=broad-except
            pass

    def _unload(self):
        """Unload this app from graphscope engine.

        Returns:
            :class:`graphscope.framework.app.UnloadedApp`: Evaluated in eager mode.
        """
        unloaded = UnloadedApp(self._session, self._unload_op)
        return unloaded
class App(object):
    """An application that can run on graphs and produce results.

    Analytical engine will build the app dynamic library when instantiate a app instance.
    And the dynamic library will be reused if subsequent app's signature matches one of previous ones.

    Attribute lookups not found on the App itself are delegated to the wrapped
    app node.
    """

    def __init__(self, app_node, key):
        self._app_node = app_node
        self._session = self._app_node.session
        self._key = key
        # copy and set op evaluated
        self._app_node.op = deepcopy(self._app_node.op)
        self._app_node.evaluated = True
        self._app_node._unload_op = unload_app(self._app_node)
        self._session.dag.add_op(self._app_node.op)
        self._saved_signature = self.signature

    def __getattr__(self, name):
        # Read `_app_node` straight from the instance dict: going through
        # normal attribute access would re-enter __getattr__ and recurse
        # forever on an instance whose __init__ has not set it yet (e.g. when
        # __del__ runs after a failed __init__, or copy/pickle probes hooks).
        state = vars(self)
        if "_app_node" in state and hasattr(state["_app_node"], name):
            return getattr(state["_app_node"], name)
        raise AttributeError("{0} not found.".format(name))

    @property
    def key(self):
        """A unique identifier of App."""
        return self._key

    @property
    def signature(self):
        """Signature is computed by all critical components of the App."""
        return hashlib.sha256(
            "{}.{}".format(
                self._app_assets.signature, self._graph.template_str
            ).encode("utf-8", errors="ignore")
        ).hexdigest()

    def _unload(self):
        return self._session._wrapper(self._app_node._unload())

    def __del__(self):
        """Unload app. Both on engine side and python side. Set the key to None."""
        # Best-effort cleanup during finalization; errors are intentionally ignored.
        try:
            self.session.run(self._unload())
        except Exception:  # pylint: disable=broad-except
            pass
class UnloadedApp(DAGNode):
    """DAG node representing an app that has been unloaded.

    Constructing it registers the given unload op in the session's DAG.
    """

    def __init__(self, session, op):
        self._session = session
        self._op = op
        # register the unload op so it is part of the session DAG
        session.dag.add_op(op)
def load_app(gar=None, algo=None, context=None, **kwargs):
    """Load an app from gar.

    Args:
        gar: bytes or BytesIO or str or os.PathLike
            A str / path-like object is the path of the gar file; bytes or
            BytesIO hold the gar content directly.
            For java apps, gar can be none to indicate we should find the app in
            previously added libs.
        algo: str
            Algo name inside resource. None will extract name from gar resource
            if there is only one app in it.
        context: str, optional
            Context type of the app; when None it is read from the gar config.
            Java apps (algo starting with ``giraph:`` or ``java_pie:``) always
            use ``vertex_data``.
        **kwargs: Forwarded to :class:`AppAssets` (e.g. ``cmake_extra_options``).

    Returns:
        Instance of <graphscope.framework.app.AppAssets>

    Raises:
        FileNotFoundError: File not exist.
        PermissionError: Permission denied of path.
        InvalidArgumentError: The file is not a zip file, a gar was given for a
            java app, or ``gar`` has an unsupported type.

    Examples:
        >>> sssp = load_app(gar='./resource.gar', algo='sssp')
        >>> sssp(src=4)

        which will have following `.gs_conf.yaml` in resource.gar:
          app:
            - algo: sssp
              type: cpp_pie
              class_name: grape:SSSP
              context_type: vertex_data
              src: sssp/sssp.h
              compatible_graph:
                - gs::ArrowProjectedFragment
    """
    if isinstance(gar, (BytesIO, bytes)):
        return AppAssets(algo, context, gar, **kwargs)
    if isinstance(gar, (str, os.PathLike)):
        with open(gar, "rb") as f:
            content = f.read()
        # Inspect the bytes already read instead of re-opening the path.
        if not zipfile.is_zipfile(BytesIO(content)):
            raise InvalidArgumentError("{} is not a zip file.".format(gar))
        return AppAssets(algo, context, content, **kwargs)
    if isinstance(algo, str) and algo.startswith(("giraph:", "java_pie:")):
        if gar is not None:
            raise InvalidArgumentError("Running java app expect no gar resource")
        return AppAssets(algo, "vertex_data", None, **kwargs)
    raise InvalidArgumentError("Wrong type with {}".format(gar))