Source code for graphscope.learning.graph

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import base64
import collections
import json
from copy import deepcopy

try:
    import graphlearn
    from graphlearn import Graph as GLGraph
except ImportError:
    GLGraph = object

from graphscope.framework.errors import InvalidArgumentError
from graphscope.framework.errors import check_argument
from graphscope.proto import graph_def_pb2


[docs]class Graph(GLGraph):
[docs] def __init__(self, graph, handle, config=None, object_id=None): """Initialize a graph for the learning engine using a handle.""" self.graph = graph self.graphscope_session = self.graph._session handle = self.decode_arg(handle) config = self.decode_arg(config) if config is None: if "config" in handle: config = handle["config"] if config is None: config = collections.defaultdict(lambda: dict) if object_id is None: object_id = handle["vineyard_id"] self.handle = handle self.config = config self.object_id = object_id self.closed = False super(Graph, self).__init__() self.vineyard(handle, config["nodes"], config["edges"]) for label, node_attr in config["node_attributes"].items(): n_ints, n_floats, n_strings = ( node_attr[1][0], node_attr[1][1], node_attr[1][2], ) self.node_attributes(label, node_attr[0], n_ints, n_floats, n_strings) for label, edge_attr in config["edge_attributes"].items(): n_ints, n_floats, n_strings = ( edge_attr[1][0], edge_attr[1][1], edge_attr[1][2], ) self.edge_attributes(label, edge_attr[0], n_ints, n_floats, n_strings) for mask, node_label, nsplit, split_range in config["gen_labels"]: self.node_view(node_label, mask, nsplit=nsplit, split_range=split_range) self.init_vineyard(worker_index=0, worker_count=1)
def decode_arg(self, arg): if arg is None or isinstance(arg, dict): return arg return json.loads( base64.b64decode(arg.encode("utf-8", errors="ignore")).decode( "utf-8", errors="ignore" ) ) def close(self): if self.closed or self.graphscope_session.closed: return self.closed = True super(Graph, self).close() # close client first # close server instance self.graphscope_session._close_learning_instance(self) @staticmethod # noqa: C901 def preprocess_args(handle, nodes, edges, gen_labels): # noqa: C901 handle = json.loads(base64.b64decode(handle).decode("utf-8", errors="ignore")) node_names = [] node_attributes = {} edge_names = [] edge_attributes = {} def selected_property_schema(attr_types, attributes): prop_counts = collections.defaultdict(lambda: 0) for attr in attributes: prop_counts[attr_types[attr]] += 1 return [prop_counts["i"], prop_counts["f"], prop_counts["s"]] if nodes is not None: for node in nodes: if isinstance(node, str): if node in node_names: raise InvalidArgumentError("Duplicate node type: %s" % node) node_names.append(node) elif isinstance(node, tuple): if node[0] in node_names: raise InvalidArgumentError("Duplicate node type: %s" % node[0]) node_names.append(node[0]) attr_types = handle["node_attribute_types"][node[0]] attr_schema = selected_property_schema(attr_types, node[1]) node_attributes[node[0]] = (node[1], attr_schema) else: raise InvalidArgumentError( "The node parameter is in bad format: %s" % node ) else: for node in handle["node_schema"]: node_names.append(node.split(":")[0]) if edges is not None: for edge in edges: if isinstance(edge, str): if len(node_names) > 1: raise InvalidArgumentError( "Cannot inference edge type when multiple kinds of nodes exists" ) edge_names.append((node_names[0], edge, node_names[0])) elif ( isinstance(edge, tuple) and isinstance(edge[0], str) and isinstance(edge[1], str) ): edge_names.append(edge) elif ( isinstance(edge, tuple) and isinstance(edge[0], str) and isinstance(edge[1], list) ): if len(node_names) > 1: raise InvalidArgumentError( "Cannot inference edge type when multiple kinds of nodes exists" ) edge_names.append((node_names[0], edge[0], node_names[0])) attr_types = handle["edge_attribute_types"][edge[0]] attr_schema = selected_property_schema(attr_types, edge[1]) edge_attributes[edge[0]] = (edge[1], attr_schema) elif ( isinstance(edge, tuple) and isinstance(edge[0], (list, tuple)) and isinstance(edge[1], list) ): edge_names.append(edge[0]) attr_types = handle["edge_attribute_types"][edge[0][1]] attr_schema = selected_property_schema(attr_types, edge[1]) edge_attributes[edge[0][1]] = (edge[1], attr_schema) else: raise InvalidArgumentError( "The edge parameter is in bad format: %s" % edge ) split_groups = collections.defaultdict(list) if gen_labels is not None: for label in gen_labels: if len(label) == 3 or len(label) == 4: split_groups[label[1]].append(label) else: raise InvalidArgumentError( "Bad gen_labels arguments: %s" % gen_labels ) split_labels = [] for label, group in split_groups.items(): lengths = [len(split) for split in group] check_argument( lengths[:-1] == lengths[1:], "Invalid gen labels: %s" % group ) if len(group[0]) == 3: length_sum = sum(split[2] for split in group) s, ss = 0, [] for split in group: ss.append((s, s + split[2])) s += split[2] group = [ (split[0], split[1], length_sum, s) for split, s in zip(group, ss) ] for split in group: split_labels.append(split) return { "nodes": node_names if node_names else None, "edges": edge_names if edge_names else None, "node_attributes": node_attributes, "edge_attributes": edge_attributes, "gen_labels": split_labels, }
[docs] def get_handle(self, worker_count=1): """Return a base64-encoded handle for distributed training.""" handle_copy = self.handle.copy() handle_copy["config"] = self.config handle_copy["client_count"] = worker_count return base64.b64encode( json.dumps(handle_copy).encode("utf-8", errors="ignore") ).decode("utf-8", errors="ignore")
[docs] def V( self, t, feed=None, node_from=graphlearn.pywrap.NodeFrom.NODE, mask=graphlearn.python.utils.Mask.NONE, ): """Entry of GSL, starting from VERTEX. Args: t (string): The type of node which is the entry of query or the type of edge when node is from edge source or dst. feed (None| numpy.ndarray | types.GeneratorType | `Nodes`): When `feed` is not `None`, the `type` should be a node type, which means query the attributes of the specified node ids. - None: Default. Sample nodes with the following .shuffle and .batch API. numpy.ndarray: Any shape of ids. Get nodes of the given ids and node_type. - types.Generator: A generator of numpy.ndarray. Get nodes of generated ids and given node_type. - `Nodes`: A `Nodes` object. node_from (NodeFrom): Default is `NodeFrom.NODE`, which means sample or or iterate node from node. `NodeFrom.EDGE_SRC` means sample or iterate node from source node of edge, and `NodeFrom.EDGE_DST` means sample or iterate node from destination node of edge. If node is from edge, the `type` must be an edge type. mask (NONE | TRAIN | TEST | VAL): The given node set is indexed by both the raw node type and mask value. The default mask value is NONE, which plays nothing on the index. """ return super(Graph, self).V(t, feed, node_from, mask)
[docs] def E(self, edge_type, feed=None, reverse=False): """Entry of GSL, starting from EDGE. Args: edge_type (string): The type of edge which is the entry of query. feed (None| (np.ndarray, np.ndarray) | types.GeneratorType | `Edges`): - None: Default. Sample edges with the following .shuffle and .batch API. (np.ndarray, np.ndarray): src_ids, dst_ids. Get edges of the given (src_ids, dst_ids) and given edge_type. src_ids and dst_ids must be the same shape, dtype is int. - types.Generator: A generator of (numpy.ndarray, numpy.ndarray). Get edges of generated (src_ids, dst_ids) and given edge_type. - `Edges`: An `Edges` object. """ return super(Graph, self).E(edge_type, feed, reverse)
def get_gl_handle(schema, vineyard_id, engine_hosts, engine_config, fragments=None): """Dump a handler for GraphLearn for interaction. Fields in :code:`schema` are: + the name of node type or edge type + whether the graph is weighted graph + whether the graph is labeled graph + the number of int attributes + the number of float attributes + the number of string attributes An example of the graph handle: .. code:: python { "server": "127.0.0.1:8888,127.0.0.1:8889", "client_count": 1, "vineyard_socket": "/var/run/vineyard.sock", "vineyard_id": 13278328736, "fragments": [13278328736, ...], # fragment ids "node_schema": [ "user:false:false:10:0:0", "item:true:false:0:0:5" ], "edge_schema": [ "user:click:item:true:false:0:0:0", "user:buy:item:true:true:0:0:0", "item:similar:item:false:false:10:0:0" ], "node_attribute_types": { "person": { "age": "i", "name": "s", }, }, "edge_attribute_types": { "knows": { "weight": "f", }, }, } The handle can be decoded using: .. code:: python base64.b64decode(handle.encode('ascii', errors="ignore")).decode('ascii', errors="ignore") Note that the ports are selected from a range :code:`(8000, 9000)`. Args: schema: The graph schema. vineyard_id: The object id of graph stored in vineyard. engine_hosts: A list of hosts for GraphScope engine workers. engine_config: dict of config for GAE engine. Returns: str: Base64 encoded handle """ def group_property_types(props): weighted, labeled, i, f, s, attr_types = "false", "false", 0, 0, 0, {} for prop in props: if prop.type in [graph_def_pb2.STRING]: s += 1 attr_types[prop.name] = "s" elif prop.type in (graph_def_pb2.FLOAT, graph_def_pb2.DOUBLE): f += 1 attr_types[prop.name] = "f" else: i += 1 attr_types[prop.name] = "i" if prop.name == "weight": weighted = "true" elif prop.name == "label": labeled = "true" return weighted, labeled, i, f, s, attr_types node_schema, node_attribute_types = [], dict() for label in schema.vertex_labels: weighted, labeled, i, f, s, attr_types = group_property_types( schema.get_vertex_properties(label) ) node_schema.append( "{}:{}:{}:{}:{}:{}".format(label, weighted, labeled, i, f, s) ) node_attribute_types[label] = attr_types edge_schema, edge_attribute_types = [], dict() for label in schema.edge_labels: weighted, labeled, i, f, s, attr_types = group_property_types( schema.get_edge_properties(label) ) for rel in schema.get_relationships(label): edge_schema.append( "{}:{}:{}:{}:{}:{}:{}:{}".format( rel[0], label, rel[1], weighted, labeled, i, f, s ) ) edge_attribute_types[label] = attr_types engine_hosts = ",".join(engine_hosts) handle = { "hosts": engine_hosts, "client_count": 1, "vineyard_id": vineyard_id, "vineyard_socket": engine_config["vineyard_socket"], "node_schema": node_schema, "edge_schema": edge_schema, "node_attribute_types": node_attribute_types, "edge_attribute_types": edge_attribute_types, "fragments": fragments, } handle_json_string = json.dumps(handle) return base64.b64encode(handle_json_string.encode("utf-8", errors="ignore")).decode( "utf-8", errors="ignore" )