#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import pickle
from typing import Dict
from typing import List
from typing import Tuple
from typing import Union

from graphscope.framework import utils
from graphscope.framework.errors import check_argument
from graphscope.framework.operation import Operation
from graphscope.proto import attr_value_pb2
from graphscope.proto import graph_def_pb2
from graphscope.proto import types_pb2


def create_app(app_assets):
    """Wrapper for creating a `CREATE_APP` Operation with configuration.

    This op does nothing by itself; it only provides the information
    required by `BIND_APP` to build a `BOUND_APP`.
    """
    config = {types_pb2.APP_ALGO: utils.s_to_attr(app_assets.algo)}
    if app_assets.gar is not None:
        config[types_pb2.GAR] = utils.bytes_to_attr(app_assets.gar)
    op = Operation(
        None, types_pb2.CREATE_APP, config=config, output_types=types_pb2.APP
    )
    return op


def bind_app(graph, app_assets):
    """Wrapper for creating a `BIND_APP` Operation with configuration.

    Compiles and loads an application after it is evaluated.

    Args:
        graph (:class:`GraphDAGNode`): A :class:`GraphDAGNode` instance.
        app_assets (:class:`AppAssets`): An :class:`AppAssets` instance.

    Returns:
        An :class:`Operation` with configuration that instructs the
        analytical engine how to build the app.
    """
    inputs = [graph.op, app_assets.op]
    config = {}
    config[types_pb2.APP_ALGO] = utils.s_to_attr(app_assets.algo)
    if hasattr(graph, "_vertex_map"):
        config[types_pb2.VERTEX_MAP_TYPE] = utils.i_to_attr(graph._vertex_map)
    if hasattr(graph, "_compact_edges"):
        config[types_pb2.COMPACT_EDGES] = utils.b_to_attr(graph._compact_edges)
    if hasattr(graph, "_use_perfect_hash"):
        config[types_pb2.USE_PERFECT_HASH] = utils.b_to_attr(graph._use_perfect_hash)
    if app_assets.cmake_extra_options is not None:
        config[types_pb2.CMAKE_EXTRA_OPTIONS] = utils.s_to_attr(
            app_assets.cmake_extra_options
        )
    op = Operation(
        graph.session_id,
        types_pb2.BIND_APP,
        inputs=inputs,
        config=config,
        output_types=types_pb2.BOUND_APP,
    )
    return op


def run_app(app, *args, **kwargs):
    """Run the bound `app` on the graph it was bound to.

    Args:
        app (:class:`AppDAGNode`): An :class:`AppDAGNode` instance which represents a bound app.
        *args: Additional query params that will be used in evaluation.
        **kwargs: Key-value formatted query params, mostly used in Cython apps.

    Returns:
        An op to run the app on the specified graph, with optional query parameters.
    """
    inputs = [app.op]
    config = {}
    output_prefix = kwargs.pop("output_prefix", ".")
    config[types_pb2.OUTPUT_PREFIX] = utils.s_to_attr(output_prefix)
    # optional query arguments.
    params = utils.pack_query_params(*args, **kwargs)
    query_args = types_pb2.QueryArgs()
    query_args.args.extend(params)
    op = Operation(
        app.session_id,
        types_pb2.RUN_APP,
        inputs=inputs,
        config=config,
        output_types=types_pb2.RESULTS,
        query_args=query_args,
    )
    return op
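
# A minimal usage sketch of the app lifecycle built from the three helpers
# above (illustrative only, not executed here). It assumes `g` is an evaluated
# GraphDAGNode, `assets` is an AppAssets instance for some built-in algorithm,
# and `app_node` is the AppDAGNode the framework wraps around the bound-app op:
#
#   create_op = create_app(assets)      # registers the app asset
#   bound_op = bind_app(g, assets)      # compiles the app against `g`
#   run_op = run_app(app_node, src=6)   # `src=6` is packed into QueryArgs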


def create_graph(session_id, graph_type, inputs=None, **kwargs):
    """Create a `CREATE_GRAPH` op and add it to the default DAG.

    Args:
        session_id (str): Refers to the session that the graph will be created in.
        graph_type (:class:`GraphType`): The graph type defined in `graph_def_pb2`.
        inputs: Upstream operations the new graph depends on, if any.
        **kwargs: Additional properties with respect to the specific `graph_type`.

    Returns:
        An op to create a graph on the C++ side with the necessary configurations.
    """
    config = {
        types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(graph_type),
    }
    if graph_type == graph_def_pb2.ARROW_PROPERTY:
        attrs = kwargs.pop("attrs", None)
        if attrs:
            for k, v in attrs.items():
                if isinstance(v, attr_value_pb2.AttrValue):
                    config[k] = v
    elif graph_type == graph_def_pb2.DYNAMIC_PROPERTY:
        config[types_pb2.E_FILE] = utils.s_to_attr(kwargs["efile"])
        config[types_pb2.V_FILE] = utils.s_to_attr(kwargs["vfile"])
        config[types_pb2.DIRECTED] = utils.b_to_attr(kwargs["directed"])
        config[types_pb2.DISTRIBUTED] = utils.b_to_attr(kwargs["distributed"])
    else:
        raise RuntimeError("Not supported graph type {}".format(graph_type))
    op = Operation(
        session_id,
        types_pb2.CREATE_GRAPH,
        inputs=inputs,
        config=config,
        output_types=types_pb2.GRAPH,
    )
    return op
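
# Sketch of building a CREATE_GRAPH op (illustrative only). `sess` is assumed
# to be a live Session, `loader_op` a DATA_SOURCE op, and `attrs` a dict keyed
# by types_pb2 constants with pre-built AttrValue messages, as expected by the
# ARROW_PROPERTY branch above:
#
#   op = create_graph(
#       sess.session_id, graph_def_pb2.ARROW_PROPERTY, inputs=[loader_op], attrs=attrs
#   )
#   # For DYNAMIC_PROPERTY graphs, pass efile/vfile/directed/distributed instead.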


def create_loader(vertex_or_edge_label_list):
    """Create a loader operation.

    Args:
        vertex_or_edge_label_list: List of
            (:class:`graphscope.framework.graph_utils.VertexLabel`) or
            (:class:`graphscope.framework.graph_utils.EdgeLabel`).

    Returns:
        An op to take various data sources as a loader.
    """
    if not isinstance(vertex_or_edge_label_list, list):
        vertex_or_edge_label_list = [vertex_or_edge_label_list]
    large_attr = attr_value_pb2.LargeAttrValue()
    for label in vertex_or_edge_label_list:
        large_attr.chunk_list.items.extend(label.attr())
    op = Operation(
        vertex_or_edge_label_list[0]._session_id,
        types_pb2.DATA_SOURCE,
        config={},
        large_attr=large_attr,
        output_types=types_pb2.NULL_OUTPUT,
    )
    return op


def add_labels_to_graph(graph, loader_op):
    """Add new labels to an existing graph.

    Args:
        graph (:class:`Graph`): A graph instance.
            It may not be fully loaded yet, i.e., it may still be in the
            building procedure.
        loader_op (:class:`graphscope.framework.operation.Operation`):
            Operation of the loader.

    Raises:
        NotImplementedError: When an unsupported graph type is encountered.

    Returns:
        The operation.

    Notes:
        Since we don't want to trigger the loading, we must not use
        any API that can trigger the loading process implicitly.
    """
    from graphscope.framework.graph import GraphDAGNode

    assert isinstance(graph, GraphDAGNode)
    inputs = [graph.op, loader_op]
    # vid_type is fixed
    config = {
        types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(graph._graph_type),
        types_pb2.DIRECTED: utils.b_to_attr(graph._directed),
        types_pb2.EXTEND_LABEL_DATA: utils.i_to_attr(graph._extend_label_data),
        types_pb2.OID_TYPE: utils.s_to_attr(graph._oid_type),
        types_pb2.VID_TYPE: utils.s_to_attr(graph._vid_type),
        types_pb2.GENERATE_EID: utils.b_to_attr(graph._generate_eid),
        types_pb2.RETAIN_OID: utils.b_to_attr(graph._retain_oid),
        types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(graph._vertex_map),
        types_pb2.COMPACT_EDGES: utils.b_to_attr(graph._compact_edges),
        types_pb2.USE_PERFECT_HASH: utils.b_to_attr(graph._use_perfect_hash),
        types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False),
        types_pb2.IS_FROM_GAR: utils.b_to_attr(False),
    }
    # inferred from the context of the dag.
    config.update({types_pb2.GRAPH_NAME: utils.s_to_attr("")})
    if graph._graph_type != graph_def_pb2.ARROW_PROPERTY:
        raise NotImplementedError(
            f"Adding vertices or edges is not supported yet on graph type {graph._graph_type}"
        )
    op = Operation(
        graph._session.session_id,
        types_pb2.ADD_LABELS,
        inputs=inputs,
        config=config,
        output_types=types_pb2.GRAPH,
    )
    return op
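
# Sketch of extending an unevaluated graph with a new label (illustrative
# only). `g` is assumed to be a GraphDAGNode still under construction and
# `v_label` a VertexLabel built by graph_utils:
#
#   loader_op = create_loader([v_label])
#   add_op = add_labels_to_graph(g, loader_op)   # yields a new GRAPH op in the DAG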


def consolidate_columns(
    graph,
    label: str,
    columns: Union[List[str], Tuple[str]],
    result_column: str,
):
    """Consolidate property columns in the graph.

    Args:
        graph (:class:`Graph`): Source graph.
        label (str): The label of the vertex/edge to be consolidated.
        columns: The columns to be consolidated.
        result_column: The column name of the result.

    Returns:
        Operation
    """
    check_argument(graph.graph_type == graph_def_pb2.ARROW_PROPERTY)
    config = {
        types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(graph._graph_type),
        types_pb2.DIRECTED: utils.b_to_attr(graph._directed),
        types_pb2.OID_TYPE: utils.s_to_attr(graph._oid_type),
        types_pb2.VID_TYPE: utils.s_to_attr(graph._vid_type),
        types_pb2.GENERATE_EID: utils.b_to_attr(graph._generate_eid),
        types_pb2.RETAIN_OID: utils.b_to_attr(graph._retain_oid),
        types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(graph._vertex_map),
        types_pb2.COMPACT_EDGES: utils.b_to_attr(graph._compact_edges),
        types_pb2.USE_PERFECT_HASH: utils.b_to_attr(graph._use_perfect_hash),
        types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False),
        types_pb2.IS_FROM_GAR: utils.b_to_attr(False),
        types_pb2.CONSOLIDATE_COLUMNS_LABEL: utils.s_to_attr(label),
        types_pb2.CONSOLIDATE_COLUMNS_COLUMNS: utils.s_to_attr(",".join(columns)),
        types_pb2.CONSOLIDATE_COLUMNS_RESULT_COLUMN: utils.s_to_attr(result_column),
    }
    # The key may be filled in later by the coordinator.
    if hasattr(graph, "key"):
        config[types_pb2.GRAPH_NAME] = utils.s_to_attr(graph.key)
    op = Operation(
        graph.session_id,
        types_pb2.CONSOLIDATE_COLUMNS,
        config=config,
        inputs=[graph.op],
        output_types=types_pb2.GRAPH,
    )
    return op
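
# Sketch: consolidate several vertex property columns into one (illustrative
# only). `g` is assumed to be an evaluated ARROW_PROPERTY graph; the label and
# column names are placeholders:
#
#   op = consolidate_columns(
#       g, label="person", columns=["feat_0", "feat_1"], result_column="feat"
#   )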


def dynamic_to_arrow(graph):
    """Create an op to transform a :class:`nx.Graph` object to :class:`Graph`.

    Args:
        graph (:class:`Graph`): Source graph, whose type should be DYNAMIC_PROPERTY.

    Returns:
        An op that transforms a dynamic graph to an arrow graph with the necessary configurations.
    """
    check_argument(graph.graph_type == graph_def_pb2.DYNAMIC_PROPERTY)
    oid_type = None
    for node in graph:
        if oid_type is None:
            oid_type = type(node)
        elif oid_type != type(node):
            raise RuntimeError(
                "The vertex type is not consistent {} vs {}, can not convert it to arrow graph".format(
                    str(oid_type), str(type(node))
                )
            )
    if oid_type == int or oid_type is None:
        oid_type = utils.data_type_to_cpp(graph_def_pb2.LONG)
    elif oid_type == str:
        oid_type = utils.data_type_to_cpp(graph_def_pb2.STRING)
    else:
        raise RuntimeError("Unsupported oid type: " + str(oid_type))
    vid_type = utils.data_type_to_cpp(graph_def_pb2.ULONG)
    config = {
        types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key),
        types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(graph_def_pb2.ARROW_PROPERTY),
        types_pb2.DST_GRAPH_TYPE: utils.graph_type_to_attr(
            graph_def_pb2.ARROW_PROPERTY
        ),
        types_pb2.OID_TYPE: utils.s_to_attr(oid_type),
        types_pb2.VID_TYPE: utils.s_to_attr(vid_type),
    }
    op = Operation(
        graph.session_id,
        types_pb2.TRANSFORM_GRAPH,
        config=config,
        output_types=types_pb2.GRAPH,
    )
    return op


def arrow_to_dynamic(graph):
    """Transform a :class:`Graph` object to :class:`nx.Graph`.

    Args:
        graph (:class:`Graph`): Source graph, whose type should be ARROW_PROPERTY.

    Returns:
        An op that transforms an arrow graph to a dynamic graph with the necessary configurations.
    """
    check_argument(graph.graph_type == graph_def_pb2.ARROW_PROPERTY)
    config = {
        types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key),
        types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(graph_def_pb2.ARROW_PROPERTY),
        types_pb2.DST_GRAPH_TYPE: utils.graph_type_to_attr(
            graph_def_pb2.DYNAMIC_PROPERTY
        ),
        types_pb2.OID_TYPE: utils.s_to_attr(
            utils.data_type_to_cpp(graph.schema.oid_type)
        ),
        types_pb2.VID_TYPE: utils.s_to_attr(
            utils.data_type_to_cpp(graph.schema.vid_type)
        ),
        types_pb2.DEFAULT_LABEL_ID: utils.i_to_attr(graph._default_label_id),
    }
    op = Operation(
        graph.session_id,
        types_pb2.TRANSFORM_GRAPH,
        config=config,
        output_types=types_pb2.GRAPH,
    )
    return op


def modify_edges(graph, modify_type, edges, attr={}, weight=None):
    """Create a modify-edges operation for an nx graph.

    Args:
        graph (:class:`nx.Graph`): A nx graph.
        modify_type (`types_pb2.(NX_ADD_EDGES | NX_DEL_EDGES | NX_UPDATE_EDGES)`): The modify type.
        edges (list): List of edges to be inserted into or deleted from the graph, based on `modify_type`.
        attr (dict): Common edge properties, serialized as JSON. Defaults to {}.
        weight (str, optional): Stored as the edge key attribute (`EDGE_KEY`) if provided.

    Returns:
        An op to modify edges on the graph.
    """
    check_argument(graph.graph_type == graph_def_pb2.DYNAMIC_PROPERTY)
    config = {}
    config[types_pb2.GRAPH_NAME] = utils.s_to_attr(graph.key)
    config[types_pb2.MODIFY_TYPE] = utils.modify_type_to_attr(modify_type)
    config[types_pb2.PROPERTIES] = utils.s_to_attr(json.dumps(attr))
    if weight:
        config[types_pb2.EDGE_KEY] = utils.s_to_attr(weight)
    op = Operation(
        graph.session_id,
        types_pb2.MODIFY_EDGES,
        config=config,
        large_attr=utils.bytes_to_large_attr(edges),
        output_types=types_pb2.GRAPH,
    )
    return op


def modify_vertices(graph, modify_type, vertices, attr={}):
    """Create a modify-vertices operation for an nx graph.

    Args:
        graph (:class:`nx.Graph`): A nx graph.
        modify_type (`types_pb2.(NX_ADD_NODES | NX_DEL_NODES | NX_UPDATE_NODES)`): The modify type.
        vertices (list): Node list.
        attr (dict): Common node properties, serialized as JSON. Defaults to {}.

    Returns:
        An op to modify vertices on the graph.
    """
    check_argument(graph.graph_type == graph_def_pb2.DYNAMIC_PROPERTY)
    config = {}
    config[types_pb2.GRAPH_NAME] = utils.s_to_attr(graph.key)
    config[types_pb2.MODIFY_TYPE] = utils.modify_type_to_attr(modify_type)
    config[types_pb2.PROPERTIES] = utils.s_to_attr(json.dumps(attr))
    op = Operation(
        graph.session_id,
        types_pb2.MODIFY_VERTICES,
        config=config,
        large_attr=utils.bytes_to_large_attr(vertices),
        output_types=types_pb2.GRAPH,
    )
    return op


def report_graph(
    graph,
    report_type,
    node=None,
    edge=None,
    fid=None,
    lid=None,
    key=None,
    label_id=None,
    gid=None,
):
    """Create a report operation for an nx graph.

    This operation is used to simulate networkx graph reporting methods with a
    variety of report types and the corresponding config parameters.

    Args:
        graph (`nx.Graph`): A nx graph.
        report_type: Report type, can be
            types_pb2.(NODE_NUM,
                       EDGE_NUM,
                       HAS_NODE,
                       HAS_EDGE,
                       NODE_DATA,
                       EDGE_DATA,
                       NEIGHBORS_BY_NODE,
                       SUCCS_BY_NODE,
                       PREDS_BY_NODE,
                       NEIGHBORS_BY_LOC,
                       SUCCS_BY_LOC,
                       PREDS_BY_LOC,
                       DEG_BY_NODE,
                       IN_DEG_BY_NODE,
                       OUT_DEG_BY_NODE,
                       DEG_BY_LOC,
                       IN_DEG_BY_LOC,
                       OUT_DEG_BY_LOC,
                       NODES_BY_LOC)
        node (str): Node id, used with 'NODE' report types. (optional)
        edge (str): An edge, used with 'EDGE' report types. (optional)
        fid (int): Fragment id, used with 'LOC' report types. (optional)
        lid (int): Local id of a node in grape_engine, used with 'LOC' report types. (optional)
        key (str): Edge key for MultiGraph or MultiDiGraph, used with 'EDGE' report types. (optional)
        label_id (int): Vertex label id. (optional)
        gid (int): Global vertex id. (optional)

    Returns:
        An op to do the reporting job.
    """
    config = {
        types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key),
        types_pb2.REPORT_TYPE: utils.report_type_to_attr(report_type),
    }
    if graph.graph_type == graph_def_pb2.ARROW_PROPERTY:
        config[types_pb2.DEFAULT_LABEL_ID] = utils.i_to_attr(graph._default_label_id)
    if node is not None:
        config[types_pb2.NODE] = utils.bytes_to_attr(node)
    if edge is not None:
        config[types_pb2.EDGE] = utils.bytes_to_attr(edge)
    if fid is not None:
        config[types_pb2.FID] = utils.i_to_attr(fid)
    if lid is not None:
        config[types_pb2.LID] = utils.i_to_attr(lid)
    if label_id is not None:
        config[types_pb2.V_LABEL_ID] = utils.i_to_attr(label_id)
    if gid is not None:
        config[types_pb2.GID] = utils.u_to_attr(gid)
    config[types_pb2.EDGE_KEY] = utils.s_to_attr(str(key) if key is not None else "")
    op = Operation(
        graph.session_id,
        types_pb2.REPORT_GRAPH,
        config=config,
        output_types=types_pb2.RESULTS,
    )
    return op


def project_arrow_property_graph(graph, vertex_collections, edge_collections):
    """Project an ARROW_PROPERTY graph onto the given vertex and edge collections."""
    check_argument(graph.graph_type == graph_def_pb2.ARROW_PROPERTY)
    config = {
        types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(graph.graph_type),
        types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(graph._vertex_map),
        types_pb2.COMPACT_EDGES: utils.b_to_attr(graph._compact_edges),
        types_pb2.USE_PERFECT_HASH: utils.b_to_attr(graph._use_perfect_hash),
    }
    config.update(
        {
            types_pb2.VERTEX_COLLECTIONS: utils.s_to_attr(vertex_collections),
            types_pb2.EDGE_COLLECTIONS: utils.s_to_attr(edge_collections),
        }
    )
    op = Operation(
        graph.session_id,
        types_pb2.PROJECT_GRAPH,
        config=config,
        inputs=[graph.op],
        output_types=types_pb2.GRAPH,
    )
    return op


def project_to_simple(
    graph,
    v_prop,
    e_prop,
):
    """Project a property graph to a simple graph.

    Args:
        graph: Source graph, whose type should be a property graph.
        v_prop (str): The node attribute key to project.
        e_prop (str): The edge attribute key to project.

    Returns:
        An op to project a property graph, resulting in a simple graph.
    """
    check_argument(
        graph.graph_type
        in (graph_def_pb2.ARROW_PROPERTY, graph_def_pb2.DYNAMIC_PROPERTY)
    )
    config = {
        types_pb2.V_PROP_KEY: utils.s_to_attr(v_prop),
        types_pb2.E_PROP_KEY: utils.s_to_attr(e_prop),
    }
    if hasattr(graph, "_vertex_map"):
        config[types_pb2.VERTEX_MAP_TYPE] = utils.i_to_attr(graph._vertex_map)
    if hasattr(graph, "_compact_edges"):
        config[types_pb2.COMPACT_EDGES] = utils.b_to_attr(graph._compact_edges)
    if hasattr(graph, "_use_perfect_hash"):
        config[types_pb2.USE_PERFECT_HASH] = utils.b_to_attr(graph._use_perfect_hash)
    op = Operation(
        graph.session_id,
        types_pb2.PROJECT_TO_SIMPLE,
        config=config,
        inputs=[graph.op],
        output_types=types_pb2.GRAPH,
    )
    return op
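
# Sketch: project a property graph down to a simple graph keyed by a single
# vertex and edge property (illustrative only; `g` and the property names are
# placeholders):
#
#   op = project_to_simple(g, v_prop="weight", e_prop="dist")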
[docs]def copy_graph(graph, copy_type="identical"):
"""Create copy operation for nx graph.
Args:
graph (:class:`nx.Graph`): A nx graph.
copy_type (str): 'identical': copy graph to destination graph without any change.
'reverse': copy graph to destination graph with reversing the graph edges
Returns:
Operation
"""
check_argument(
graph.graph_type
in (graph_def_pb2.ARROW_PROPERTY, graph_def_pb2.DYNAMIC_PROPERTY)
)
check_argument(copy_type in ("identical", "reverse"))
config = {
types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key),
types_pb2.COPY_TYPE: utils.s_to_attr(copy_type),
}
op = Operation(
graph.session_id,
types_pb2.COPY_GRAPH,
config=config,
output_types=types_pb2.GRAPH,
)
return op


def to_directed(graph):
    """Create a to_directed operation for the graph.

    Args:
        graph (:class:`nx.Graph`)

    Returns:
        Operation
    """
    check_argument(
        graph.graph_type
        in (graph_def_pb2.DYNAMIC_PROPERTY, graph_def_pb2.ARROW_PROPERTY)
    )
    config = {}
    # The key may be filled in later by the coordinator.
    if hasattr(graph, "key"):
        config[types_pb2.GRAPH_NAME] = utils.s_to_attr(graph.key)
    op = Operation(
        graph.session_id,
        types_pb2.TO_DIRECTED,
        config=config,
        inputs=[graph.op],
        output_types=types_pb2.GRAPH,
    )
    return op


def to_undirected(graph):
    """Create a to_undirected operation for the graph.

    Args:
        graph (:class:`nx.Graph`)

    Returns:
        Operation
    """
    check_argument(
        graph.graph_type
        in (graph_def_pb2.DYNAMIC_PROPERTY, graph_def_pb2.ARROW_PROPERTY)
    )
    config = {}
    # The key may be filled in later by the coordinator.
    if hasattr(graph, "key"):
        config[types_pb2.GRAPH_NAME] = utils.s_to_attr(graph.key)
    op = Operation(
        graph.session_id,
        types_pb2.TO_UNDIRECTED,
        config=config,
        inputs=[graph.op],
        output_types=types_pb2.GRAPH,
    )
    return op


def create_graph_view(graph, view_type):
    """Create a view of an nx graph.

    Args:
        graph (:class:`nx.Graph`): A nx graph.
        view_type (str): 'reversed': get a reverse view of the graph.
            'directed': get a directed view of the graph.
            'undirected': get an undirected view of the graph.

    Returns:
        Operation
    """
    check_argument(graph.graph_type == graph_def_pb2.DYNAMIC_PROPERTY)
    check_argument(view_type in ("reversed", "directed", "undirected"))
    config = {
        types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key),
        types_pb2.VIEW_TYPE: utils.s_to_attr(view_type),
    }
    op = Operation(
        graph.session_id,
        types_pb2.VIEW_GRAPH,
        config=config,
        output_types=types_pb2.GRAPH,
    )
    return op


def clear_graph(graph):
    """Create a clear-graph operation for an nx graph.

    Args:
        graph (:class:`nx.Graph`): A nx graph.

    Returns:
        An op to clear the graph.
    """
    check_argument(graph.graph_type == graph_def_pb2.DYNAMIC_PROPERTY)
    config = {
        types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key),
    }
    op = Operation(
        graph.session_id,
        types_pb2.CLEAR_GRAPH,
        config=config,
        output_types=types_pb2.GRAPH,
    )
    return op


def clear_edges(graph):
    """Create a clear-edges operation for an nx graph.

    Args:
        graph (:class:`nx.Graph`): A nx graph.

    Returns:
        An op to clear the edges of the graph.
    """
    check_argument(graph.graph_type == graph_def_pb2.DYNAMIC_PROPERTY)
    config = {
        types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key),
    }
    op = Operation(
        graph.session_id,
        types_pb2.CLEAR_EDGES,
        config=config,
        output_types=types_pb2.GRAPH,
    )
    return op


def create_subgraph(graph, nodes=None, edges=None):
    """Create a subgraph operation for an nx graph.

    Args:
        graph (:class:`nx.Graph`): A nx graph.
        nodes (list): The nodes to induce a subgraph.
        edges (list): The edges to induce an edge-induced subgraph.

    Returns:
        Operation
    """
    check_argument(graph.graph_type == graph_def_pb2.DYNAMIC_PROPERTY)
    config = {
        types_pb2.GRAPH_NAME: utils.s_to_attr(graph.key),
    }
    if nodes is not None:
        config[types_pb2.NODES] = utils.bytes_to_attr(nodes)
    if edges is not None:
        config[types_pb2.EDGES] = utils.bytes_to_attr(edges)
    op = Operation(
        graph.session_id,
        types_pb2.INDUCE_SUBGRAPH,
        config=config,
        output_types=types_pb2.GRAPH,
    )
    return op


def create_unload_op(session_id, op_type, inputs):
    """Utility method to create an unload `Operation` based on the op type and inputs."""
    op = Operation(
        session_id,
        op_type,
        inputs=inputs,
        output_types=types_pb2.NULL_OUTPUT,
    )
    return op


def unload_app(app):
    """Unload a loaded app.

    Args:
        app (:class:`AppDAGNode`): The app to unload.

    Returns:
        An op to unload the `app`.
    """
    return create_unload_op(app.session_id, types_pb2.UNLOAD_APP, [app.op])


def unload_graph(graph):
    """Unload a graph.

    Args:
        graph (:class:`GraphDAGNode`): The graph to unload.

    Returns:
        An op to unload the `graph`.
    """
    return create_unload_op(graph.session_id, types_pb2.UNLOAD_GRAPH, [graph.op])


def unload_context(context):
    return create_unload_op(context.session_id, types_pb2.UNLOAD_CONTEXT, [context.op])


def context_to_numpy(context, selector=None, vertex_range=None, axis=0):
    """Retrieve results as a numpy ndarray.

    Args:
        context (:class:`Context`): Results returned by the `run_app` operation, storing the query results.
        selector (str): Select the type of data to retrieve.
        vertex_range (str): Specify a range to retrieve.
        axis (int): Axis attribute passed to the engine. Defaults to 0.

    Returns:
        An op to retrieve query results and convert them to a numpy ndarray.
    """
    config = {}
    if selector is not None:
        config[types_pb2.SELECTOR] = utils.s_to_attr(selector)
    if vertex_range is not None:
        config[types_pb2.VERTEX_RANGE] = utils.s_to_attr(vertex_range)
    if axis is not None:
        config[types_pb2.AXIS] = utils.i_to_attr(axis)
    op = Operation(
        context.session_id,
        types_pb2.CONTEXT_TO_NUMPY,
        config=config,
        inputs=[context.op],
        output_types=types_pb2.TENSOR,
    )
    return op


def context_to_dataframe(context, selector=None, vertex_range=None):
    """Retrieve results as a pandas DataFrame.

    Args:
        context (:class:`Context`): Results returned by the `run_app` operation, storing the query results.
        selector (str): Select the type of data to retrieve.
        vertex_range (str): Specify a range to retrieve.

    Returns:
        An op to retrieve query results and convert them to a pandas DataFrame.
    """
    config = {}
    if selector is not None:
        config[types_pb2.SELECTOR] = utils.s_to_attr(selector)
    if vertex_range is not None:
        config[types_pb2.VERTEX_RANGE] = utils.s_to_attr(vertex_range)
    op = Operation(
        context.session_id,
        types_pb2.CONTEXT_TO_DATAFRAME,
        config=config,
        inputs=[context.op],
        output_types=types_pb2.DATAFRAME,
    )
    return op
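
# Sketch: pull app results back to the client (illustrative only). `ctx` is
# assumed to be the context node produced by a RUN_APP op; the selector syntax
# depends on the context type:
#
#   np_op = context_to_numpy(ctx, selector="r", axis=0)
#   df_op = context_to_dataframe(ctx, selector="r", vertex_range=None)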


def to_vineyard_tensor(context, selector=None, vertex_range=None, axis=None):
    """Retrieve results as a vineyard tensor.

    Args:
        context (:class:`Context`): Results returned by the `run_app` operation, storing the query results.
        selector (str): Select the type of data to retrieve.
        vertex_range (str): Specify a range to retrieve.
        axis (int, optional): Axis attribute passed to the engine.

    Returns:
        An op to convert query results into a vineyard tensor.
    """
    config = {}
    if selector is not None:
        config[types_pb2.SELECTOR] = utils.s_to_attr(selector)
    if vertex_range is not None:
        config[types_pb2.VERTEX_RANGE] = utils.s_to_attr(vertex_range)
    if axis is not None:
        config[types_pb2.AXIS] = utils.i_to_attr(axis)
    op = Operation(
        context.session_id,
        types_pb2.TO_VINEYARD_TENSOR,
        config=config,
        inputs=[context.op],
        output_types=types_pb2.VINEYARD_TENSOR,
    )
    return op


def to_vineyard_dataframe(context, selector=None, vertex_range=None):
    """Retrieve results as a vineyard dataframe.

    Args:
        context (:class:`Context`): Results returned by the `run_app` operation, storing the query results.
        selector (str): Select the type of data to retrieve.
        vertex_range (str): Specify a range to retrieve.

    Returns:
        An op to convert query results into a vineyard dataframe.
    """
    config = {}
    if selector is not None:
        config[types_pb2.SELECTOR] = utils.s_to_attr(selector)
    if vertex_range is not None:
        config[types_pb2.VERTEX_RANGE] = utils.s_to_attr(vertex_range)
    op = Operation(
        context.session_id,
        types_pb2.TO_VINEYARD_DATAFRAME,
        config=config,
        inputs=[context.op],
        output_types=types_pb2.VINEYARD_DATAFRAME,
    )
    return op


def to_data_sink(result, fd, storage_options=None, write_options=None, **kwargs):
    """Dump a result to `fd` by the drivers in vineyard.

    Args:
        result (:class:`graphscope.framework.context.ResultDAGNode`):
            A dataframe or numpy result holding the object id of the vineyard dataframe.
        fd (str): Destination, such as `hdfs:///tmp/result_path`.
        storage_options (dict, optional): Options for the underlying storage.
        write_options (dict, optional): Options for the write behavior.
        kwargs (dict, optional): Additional options with respect to the output
            storage type, merged into both option dicts above.

    Returns:
        An op to dump the result to `fd`.
    """
    if storage_options is None:
        storage_options = {}
    storage_options.update(kwargs)
    if write_options is None:
        write_options = {}
    write_options.update(kwargs)
    config = {
        types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(storage_options)),
        types_pb2.WRITE_OPTIONS: utils.s_to_attr(json.dumps(write_options)),
        types_pb2.FD: utils.s_to_attr(str(fd)),
    }
    op = Operation(
        result.session_id,
        types_pb2.DATA_SINK,
        config=config,
        inputs=[result.op],
        output_types=types_pb2.NULL_OUTPUT,
    )
    return op
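
# Sketch: dump a result object to external storage through the vineyard
# drivers (illustrative only). `result` is assumed to be a ResultDAGNode and
# the path and option values are placeholders:
#
#   op = to_data_sink(
#       result,
#       "hdfs:///tmp/result_path",
#       storage_options={"host": "...", "port": "..."},
#   )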


def output(
    context,
    fd,
    selector,
    vertex_range,
    storage_options=None,
    write_options=None,
    **kwargs,
):
    """Output results to `fd`; this will be handled by the registered vineyard C++ adaptor.

    Args:
        context (:class:`Context`): Results returned by the `run_app` operation, storing the query results.
        fd (str): Destination, such as `file:///tmp/result_path`.
        selector (str): Select the type of data to retrieve.
        vertex_range (str): Specify a range to retrieve.
        storage_options (dict, optional): Options for the underlying storage.
        write_options (dict, optional): Options for the write behavior.

    Returns:
        An op to output results to `fd`.
    """
    if storage_options is None:
        storage_options = {}
    storage_options.update(kwargs)
    if write_options is None:
        write_options = {}
    write_options.update(kwargs)
    config = {}
    config[types_pb2.FD] = utils.s_to_attr(fd)
    config[types_pb2.SELECTOR] = utils.s_to_attr(selector)
    config[types_pb2.STORAGE_OPTIONS] = utils.s_to_attr(json.dumps(storage_options))
    config[types_pb2.WRITE_OPTIONS] = utils.s_to_attr(json.dumps(write_options))
    if vertex_range is not None:
        config[types_pb2.VERTEX_RANGE] = utils.s_to_attr(vertex_range)
    op = Operation(
        context.session_id,
        types_pb2.OUTPUT,
        config=config,
        inputs=[context.op],
        output_types=types_pb2.NULL_OUTPUT,
    )
    return op


def get_context_data(results, node):
    """Create an op to retrieve the context data of the given node from the results."""
    config = {
        types_pb2.CONTEXT_KEY: utils.s_to_attr(results.key),
        types_pb2.NODE: utils.s_to_attr(node),
    }
    op = Operation(
        results._session_id,
        types_pb2.GET_CONTEXT_DATA,
        config=config,
        output_types=types_pb2.RESULTS,
    )
    return op


def add_column(graph, results, selector):
    """Add a column to `graph`, producing a new graph.

    Args:
        graph (:class:`Graph`): Source ArrowProperty graph.
        results (:class:`Context`): Results generated by a previous app query.
        selector (str): Used to select a subrange of the result data and add it
            as one column of the graph.

    Returns:
        An op that produces a new graph with the new column added.
    """
    config = {types_pb2.SELECTOR: utils.s_to_attr(selector)}
    op = Operation(
        graph.session_id,
        types_pb2.ADD_COLUMN,
        config=config,
        inputs=[graph.op, results.op],
        output_types=types_pb2.GRAPH,
    )
    return op
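
# Sketch: attach app results back onto the property graph as a new column
# (illustrative only). `g` is an evaluated graph, `ctx` the query results, and
# `selector_str` a placeholder that must follow the framework's selector
# conventions:
#
#   op = add_column(g, ctx, selector=selector_str)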


def graph_to_numpy(graph, selector=None, vertex_range=None):
    """Retrieve graph raw data as a numpy ndarray.

    Args:
        graph (:class:`graphscope.framework.graph.GraphDAGNode`): Source graph.
        selector (str): Select the type of data to retrieve.
        vertex_range (str): Specify a range to retrieve.

    Returns:
        An op to convert a graph's data to a numpy ndarray.
    """
    config = {}
    if selector is not None:
        config[types_pb2.SELECTOR] = utils.s_to_attr(selector)
    if vertex_range is not None:
        config[types_pb2.VERTEX_RANGE] = utils.s_to_attr(vertex_range)
    op = Operation(
        graph.session_id,
        types_pb2.GRAPH_TO_NUMPY,
        config=config,
        inputs=[graph.op],
        output_types=types_pb2.TENSOR,
    )
    return op


def graph_to_dataframe(graph, selector=None, vertex_range=None):
    """Retrieve graph raw data as a pandas DataFrame.

    Args:
        graph (:class:`graphscope.framework.graph.GraphDAGNode`): Source graph.
        selector (str): Select the type of data to retrieve.
        vertex_range (str): Specify a range to retrieve.

    Returns:
        An op to convert a graph's data to a pandas DataFrame.
    """
    config = {}
    if selector is not None:
        config[types_pb2.SELECTOR] = utils.s_to_attr(selector)
    if vertex_range is not None:
        config[types_pb2.VERTEX_RANGE] = utils.s_to_attr(vertex_range)
    op = Operation(
        graph.session_id,
        types_pb2.GRAPH_TO_DATAFRAME,
        config=config,
        inputs=[graph.op],
        output_types=types_pb2.DATAFRAME,
    )
    return op


def gremlin_to_subgraph(
    interactive_query, gremlin_script, request_options=None, oid_type="int64"
):
    """Create a subgraph from the gremlin output.

    Args:
        interactive_query (:class:`graphscope.interactive.query.InteractiveQueryDAGNode`):
            The GIE instance holding the graph that the gremlin query runs on.
        gremlin_script (str):
            The gremlin script to be executed.
        request_options (dict, optional): Gremlin request options. Format:
            {
                "engine": "gae"
            }
        oid_type (str, optional):
            Type of the vertex original id. Defaults to "int64".

    Returns:
        An op to create the subgraph from the gremlin script.
    """
    config = {}
    config[types_pb2.GIE_GREMLIN_QUERY_MESSAGE] = utils.s_to_attr(gremlin_script)
    config[types_pb2.OID_TYPE] = utils.s_to_attr(oid_type)
    config[types_pb2.VINEYARD_ID] = utils.i_to_attr(interactive_query.object_id)
    if request_options:
        config[types_pb2.GIE_GREMLIN_REQUEST_OPTIONS] = utils.s_to_attr(
            json.dumps(request_options)
        )
    op = Operation(
        interactive_query.session_id,
        types_pb2.SUBGRAPH,
        config=config,
        output_types=types_pb2.GRAPH,
    )
    return op
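
# Sketch: materialize the output of a gremlin traversal as a new subgraph
# (illustrative only). `interactive` is assumed to be an interactive query
# instance bound to a graph, and the traversal below is a placeholder:
#
#   op = gremlin_to_subgraph(
#       interactive,
#       gremlin_script="g.V().outE().subgraph('s').cap('s')",
#       oid_type="int64",
#   )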


def save_to_graphar(graph, path: str, **kwargs):
    """Archive a graph in GraphAr (gar) format at the given path.

    Args:
        graph (:class:`graphscope.framework.graph.GraphDAGNode`): Source graph.
        path (str): The path to archive the graph to.
        **kwargs: Extra write options, serialized as JSON.

    Returns:
        An op to archive the graph to the path.
    """
    config = {
        types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(graph._graph_type),
        types_pb2.OID_TYPE: utils.s_to_attr(graph._oid_type),
        types_pb2.VID_TYPE: utils.s_to_attr(graph._vid_type),
        types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(graph._vertex_map),
        types_pb2.COMPACT_EDGES: utils.b_to_attr(graph._compact_edges),
        types_pb2.USE_PERFECT_HASH: utils.b_to_attr(graph._use_perfect_hash),
        types_pb2.WRITE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)),
        types_pb2.GRAPH_INFO_PATH: utils.s_to_attr(path),
    }
    op = Operation(
        graph.session_id,
        types_pb2.ARCHIVE_GRAPH,
        config=config,
        inputs=[graph.op],
        output_types=types_pb2.NULL_OUTPUT,
    )
    return op


def serialize_graph(graph, path: str, **kwargs):
    """Serialize the graph to the specified location.

    The meta and data of the graph are dumped to the specified location,
    and can be restored by `Graph.load_from` in other sessions.

    Each worker will write a `path_{worker_id}.meta` file and
    a `path_{worker_id}` file to the storage.

    Args:
        graph (:class:`graphscope.framework.graph.GraphDAGNode`): Source graph.
        path (str): The path to serialize the graph to, on each worker. Supported
            storages are local, hdfs, oss, and s3.
        **kwargs: Extra storage options, serialized as JSON.

    Returns:
        An op to serialize the graph to the path.
    """
    config = {
        types_pb2.GRAPH_SERIALIZATION_PATH: utils.s_to_attr(path),
        types_pb2.VINEYARD_ID: utils.i_to_attr(graph._vineyard_id),
        types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)),
    }
    op = Operation(
        graph.session_id,
        types_pb2.SERIALIZE_GRAPH,
        config=config,
        inputs=[graph.op],
        output_types=types_pb2.NULL_OUTPUT,
    )
    return op


def deserialize_graph(path: str, sess, **kwargs):
    """Deserialize a graph from the specified location.

    Args:
        path (str): The path containing the serialization files.
        sess (`graphscope.Session`): The target session
            that the graph will be constructed in.
        **kwargs: Extra storage options, serialized as JSON.

    Returns:
        An op to deserialize the graph; evaluating it yields a new graph whose
        schema and data are supposed to be identical to those of the graph on
        which the serialize method was called.
    """
    config = {
        types_pb2.GRAPH_SERIALIZATION_PATH: utils.s_to_attr(path),
        types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)),
    }
    op = Operation(
        sess.session_id,
        types_pb2.DESERIALIZE_GRAPH,
        config=config,
        output_types=types_pb2.GRAPH,
    )
    return op
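
# Sketch: serialize a graph in one session and restore it in another
# (illustrative only). `g`, `sess2` and the path are placeholders:
#
#   ser_op = serialize_graph(g, "/tmp/serialized_graph")
#   de_op = deserialize_graph("/tmp/serialized_graph", sess2)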