#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

""" Manage sessions to the GraphScope coordinator.
"""

import atexit
import base64
import contextlib
import copy
import gc
import json
import logging
import os
import signal
import threading
import time
import uuid
import warnings
from typing import Any
from typing import Union

import graphscope
from graphscope.analytical.udf.utils import InMemoryZip
from graphscope.client.archive import OutArchive
from graphscope.client.rpc import GRPCClient
from graphscope.client.utils import CaptureKeyboardInterrupt
from graphscope.client.utils import GSLogger
from graphscope.client.utils import SignalIgnore
from graphscope.client.utils import set_defaults
from graphscope.config import Config
from graphscope.config import gs_config
from graphscope.deploy.hosts.cluster import HostsClusterLauncher
from graphscope.deploy.kubernetes.cluster import KubernetesClusterLauncher
from graphscope.deploy.kubernetes.utils import resolve_api_client
from graphscope.framework.app import App
from graphscope.framework.context import Context
from graphscope.framework.dag import Dag
from graphscope.framework.dag import DAGNode
from graphscope.framework.errors import FatalError
from graphscope.framework.errors import InvalidArgumentError
from graphscope.framework.graph import Graph
from graphscope.framework.graph import GraphDAGNode
from graphscope.framework.operation import Operation
from graphscope.framework.utils import decode_dataframe
from graphscope.framework.utils import decode_numpy
from graphscope.framework.utils import deprecated
from graphscope.framework.utils import random_string
from graphscope.interactive.query import InteractiveQuery
from graphscope.proto import graph_def_pb2
from graphscope.proto import message_pb2
from graphscope.proto import op_def_pb2
from graphscope.proto import types_pb2

try:
    import vineyard
    from kubernetes import client as kube_client
    from kubernetes import config as kube_config
except ImportError:
    kube_client = None
    kube_config = None


# Connected sessions, keyed by the session id returned by the coordinator.
_session_dict = {}

logger = logging.getLogger("graphscope")


class _FetchHandler(object):
    """Translate user-supplied fetches into a runnable sub-DAG and back.

    On construction every fetch is normalised to an :class:`Operation`
    (objects exposing an ``op`` attribute are unwrapped) and the sub-DAG
    required to evaluate them is extracted; :attr:`targets` is what gets
    passed to the low level ``run`` call of the grpc client. After the run,
    :meth:`wrap_results` rebuilds user-facing objects in the same order as
    the fetches, unwrapping the list again when a single fetch was given.
    """

    def __init__(self, dag, fetches):
        # A lone (non list/tuple) fetch is boxed here and unboxed in wrap_results.
        self._unpack = not isinstance(fetches, (list, tuple))
        self._fetches = [fetches] if self._unpack else fetches
        self._ops = list()
        for item in self._fetches:
            candidate = item.op if hasattr(item, "op") else item
            if not isinstance(candidate, Operation):
                raise ValueError("Expect an `Operation` in sess run method.")
            self._ops.append(candidate)
        # Only the part of the DAG the fetches depend on is sent to the server.
        self._sub_dag = dag.extract_subdag_for(self._ops)
        if "GRAPHSCOPE_DEBUG" in os.environ:
            logger.info("sub_dag: %s", self._sub_dag)

    @property
    def targets(self):
        """The extracted sub-DAG to execute."""
        return self._sub_dag

    def _rebuild_graph(self, seq, op_result: op_def_pb2.OpResult):
        """Build the graph object for fetch ``seq`` from ``op_result``."""
        base = self._fetches[seq]
        if isinstance(base, Operation):
            # networkx graphs only need the raw graph_def
            return op_result.graph_def
        graph = Graph(base)
        # fill in the graph fields reported by the engine
        graph.update_from_graph_def(op_result.graph_def)
        return graph

    def _rebuild_app(self, seq, op_result: op_def_pb2.OpResult):
        """Build an :class:`App` for fetch ``seq`` from ``op_result``."""
        return App(
            self._fetches[seq], op_result.result.decode("utf-8", errors="ignore")
        )

    def _rebuild_context(self, seq, op_result: op_def_pb2.OpResult):
        """Build a context object for fetch ``seq`` from the JSON in ``op_result``."""
        dag_node = self._fetches[seq]
        payload = json.loads(op_result.result.decode("utf-8", errors="ignore"))
        if payload["context_type"] == "dynamic_vertex_data":
            # networkx-specific context type
            from graphscope.framework.context import DynamicVertexDataContext

            return DynamicVertexDataContext(dag_node, payload["context_key"])
        return Context(dag_node, payload["context_key"], payload["context_schema"])

    def wrap_results(self, response: message_pb2.RunStepResponse):  # noqa: C901
        """Convert ``response.results`` into objects matching the fetches.

        Ops without a matching result (by key) contribute nothing. Returns a
        single object when one non-list fetch was given, otherwise a list.
        """
        rets = []
        for seq, op in enumerate(self._ops):
            op_result = next(
                (candidate for candidate in response.results if op.key == candidate.key),
                None,
            )
            if op_result is None:
                continue

            output_type = op.output_types
            if output_type == types_pb2.RESULTS:
                if op.type == types_pb2.RUN_APP:
                    rets.append(self._rebuild_context(seq, op_result))
                elif op.type == types_pb2.REPORT_GRAPH:
                    rets.append(OutArchive(op_result.result))
                else:
                    # for nx Graph
                    rets.append(op_result.result.decode("utf-8", errors="ignore"))
            elif output_type == types_pb2.GRAPH:
                rets.append(self._rebuild_graph(seq, op_result))
            elif output_type in (types_pb2.APP, types_pb2.NULL_OUTPUT):
                rets.append(None)
            elif output_type == types_pb2.BOUND_APP:
                rets.append(self._rebuild_app(seq, op_result))
            elif output_type in (
                types_pb2.VINEYARD_TENSOR,
                types_pb2.VINEYARD_DATAFRAME,
            ):
                decoded = json.loads(op_result.result.decode("utf-8", errors="ignore"))
                rets.append(decoded["object_id"])
            elif output_type in (types_pb2.TENSOR, types_pb2.DATAFRAME):
                if op.type in (
                    types_pb2.CONTEXT_TO_DATAFRAME,
                    types_pb2.GRAPH_TO_DATAFRAME,
                ):
                    rets.append(decode_dataframe(op_result.result))
                elif op.type in (
                    types_pb2.CONTEXT_TO_NUMPY,
                    types_pb2.GRAPH_TO_NUMPY,
                ):
                    rets.append(decode_numpy(op_result.result))
        return rets[0] if rets and self._unpack else rets

    def get_dag_for_unload(self):
        """Build a DAG that unloads graph/app/context ops of the sub-DAG that
        were not themselves fetched (so nothing can reference them any more).
        """
        unload_for = {
            types_pb2.CREATE_GRAPH: types_pb2.UNLOAD_GRAPH,
            types_pb2.CREATE_APP: types_pb2.UNLOAD_APP,
            types_pb2.RUN_APP: types_pb2.UNLOAD_CONTEXT,
        }
        fetched_keys = {op.key for op in self._ops}
        unload_dag = op_def_pb2.DagDef()
        for op_def in self._sub_dag.op:
            if op_def.op not in unload_for or op_def.key in fetched_keys:
                continue
            unload_op = op_def_pb2.OpDef(op=unload_for[op_def.op], key=uuid.uuid4().hex)
            unload_op.parents.extend([op_def.key])
            unload_dag.op.extend([unload_op])
        return unload_dag
class Session(object):
    """A class for interacting with GraphScope graph computation service cluster.

    A :class:`Session` object encapsulates the environment in which :class:`Operation`
    objects are executed/evaluated.

    A session may own resources. It is important to release these resources when
    they are no longer required. To do this, invoke the :meth:`close` method
    on the session.

    A Session can register itself as the default session with :meth:`as_default`,
    and all operations after that will use the default session. A session
    deregisters itself as the default session when closed.

    The following example demonstrates its usage:

    .. code:: python

        >>> import graphscope as gs

        >>> # use session object explicitly
        >>> sess = gs.session()
        >>> g = sess.g()
        >>> pg = g.project(vertices={'v': []}, edges={'e': ['dist']})
        >>> r = gs.sssp(pg, 4)
        >>> sess.close()

        >>> # or use a session as default
        >>> sess = gs.session()
        >>> sess.as_default()
        >>> g = gs.g()
        >>> pg = g.project(vertices={'v': []}, edges={'e': ['dist']})
        >>> r = gs.sssp(pg, 4)
        >>> sess.close()

    A service cluster can be set up and an RPC session created in the following ways:

    - GraphScope graph computation service running in a cluster managed by kubernetes.

        >>> s = graphscope.session()

        Also, :class:`Session` provides several keyword params for users to define the cluster.
        You may use the params :code:`k8s_image_registry`, :code:`k8s_image_repository`
        and :code:`k8s_image_tag` to specify the image for all engine pods, and the params
        :code:`k8s_engine_cpu` or :code:`k8s_engine_mem` to specify the resources.
        All params are documented in the :meth:`__init__` method.

        >>> s = graphscope.session(
        ...     k8s_vineyard_cpu=0.1,
        ...     k8s_vineyard_mem="256Mi",
        ...     vineyard_shared_mem="4Gi",
        ...     k8s_engine_cpu=0.1,
        ...     k8s_engine_mem="256Mi")

    - or all params can be provided by a configuration file or configuration object.

        >>> s = graphscope.session(config='/tmp/config.yaml')
        >>> # Or
        >>> s = graphscope.session(config={'k8s_engine_cpu': 5, 'k8s_engine_mem': '5Gi'})
    """
[docs]def__init__(self,config:Union[Config,str]=None,api_client:kube_client.ApiClient=None,**kw,):"""Construct a new GraphScope session. Args: config (dict or str, optional): The configuration dict or file about how to launch the GraphScope instance. For str, it will identify it as a path and read the configuration file to build a session if file exist. If not specified, the global default configuration will be used Note that it will overwrite explicit parameters. Defaults to None. api_client: The kube api client used in kubernetes cluster kw: Configurable keys. For backward compatibility. For more details, see `Config` class in `config.py` addr (str, optional): The endpoint of a pre-launched GraphScope instance with '<ip>:<port>' format. A new session id will be generated for each session connection. mode (str, optional): optional values are eager and lazy. Defaults to eager. Eager execution is a flexible platform for research and experimentation, it provides: An intuitive interface: Quickly test on small data. Easier debugging: Call ops directly to inspect running models and test changes. Lazy execution means GraphScope does not process the data till it has to. It just gathers all the information to a DAG that we feed into it, and processes only when we execute :code:`sess.run(fetches)` cluster_type (str, optional): Deploy GraphScope instance on hosts or k8s cluster. Defaults to k8s. Available options: "k8s" and "hosts". Note that only support deployed on localhost with hosts mode. num_workers (int, optional): The number of workers to launch GraphScope engine. Defaults to 2. preemptive (bool, optional): If True, GraphScope instance will treat resource params (e.g. k8s_coordinator_cpu) as limits and provide the minimum available value as requests, but this will make pod has a `Burstable` QOS, which can be preempted by other pods with high QOS. Otherwise, it will set both requests and limits with the same value. 
k8s_namespace (str, optional): Contains the namespace to create all resource inside. If param missing, it will try to read namespace from kubernetes context, or a random namespace will be created and deleted if namespace not exist. Defaults to None. k8s_service_type (str, optional): Type determines how the GraphScope service is exposed. Valid options are NodePort, and LoadBalancer. Defaults to NodePort. k8s_image_registry (str, optional): The GraphScope image registry. k8s_image_repository (str, optional): The GraphScope image repository. k8s_image_tag (str, optional): The GraphScope image tag. k8s_image_pull_policy (str, optional): Kubernetes image pull policy. Defaults to "IfNotPresent". k8s_image_pull_secrets (List[str], optional): A list of secret name used to authorize pull image. k8s_vineyard_image (str, optional): The image of vineyard. k8s_vineyard_deployment (str, optional): The name of vineyard deployment to use. GraphScope will try to discovery the deployment from kubernetes cluster, then use it if exists, and fallback to launching a bundled vineyard container otherwise. k8s_vineyard_cpu (float, optional): Number of CPU cores request for vineyard container. Defaults to 0.2. k8s_vineyard_mem (str, optional): Number of memory request for vineyard container. Defaults to '256Mi' k8s_engine_cpu (float, optional): Number of CPU cores request for engine container. Defaults to 1. k8s_engine_mem (str, optional): Number of memory request for engine container. Defaults to '4Gi'. k8s_coordinator_cpu (float, optional): Number of CPU cores request for coordinator. Defaults to 0.5. k8s_coordinator_mem (str, optional): Number of memory request for coordinator. Defaults to '512Mi'. etcd_addrs (str, optional): The addr of external etcd cluster, with formats like 'etcd01:port,etcd02:port,etcd03:port' k8s_mars_worker_cpu (float, optional): Minimum number of CPU cores request for Mars worker container. Defaults to 0.2. 
k8s_mars_worker_mem (str, optional): Minimum number of memory request for Mars worker container. Defaults to '4Mi'. k8s_mars_scheduler_cpu (float, optional): Minimum number of CPU cores request for Mars scheduler container. Defaults to 0.2. k8s_mars_scheduler_mem (str, optional): Minimum number of memory request for Mars scheduler container. Defaults to '4Mi'. k8s_coordinator_pod_node_selector (dict, optional): Node selector to the coordinator pod on k8s. Default is None. See also: https://tinyurl.com/3nx6k7ph k8s_engine_pod_node_selector = None Node selector to the engine pod on k8s. Default is None. See also: https://tinyurl.com/3nx6k7ph with_mars (bool, optional): Launch graphscope with Mars. Defaults to False. enabled_engines (str, optional): Select a subset of engines to enable. Only make sense in k8s mode. with_dataset (bool, optional): Create a container and mount aliyun demo dataset bucket to the path `/dataset`. k8s_volumes (dict, optional): A dict of k8s volume which represents a directory containing data, accessible to the containers in a pod. Defaults to {}. For example, you can mount host path with: k8s_volumes = { "my-data": { "type": "hostPath", "field": { "path": "<path>", "type": "Directory" }, "mounts": [ { "mountPath": "<path1>" }, { "mountPath": "<path2>" } ] } } Or you can mount PVC with: k8s_volumes = { "my-data": { "type": "persistentVolumeClaim", "field": { "claimName": "your-pvc-name" }, "mounts": [ { "mountPath": "<path1>" } ] } } Also, you can mount a single volume with: k8s_volumes = { "my-data": { "type": "hostPath", "field": {xxx}, "mounts": { "mountPath": "<path1>" } } } timeout_seconds (int, optional): For waiting service ready (or waiting for delete if k8s_waiting_for_delete is True). dangling_timeout_seconds (int, optional): After seconds of client disconnect, coordinator will kill this graphscope instance. Defaults to 600. Expect this value to be greater than 5 (heartbeat interval). Disable dangling check by setting -1. 
k8s_deploy_mode (str, optional): the deploy mode of engines on the kubernetes cluster. Default to eager. eager: create all engine pods at once lazy: create engine pods when called k8s_waiting_for_delete (bool, optional): Waiting for service delete or not. Defaults to False. k8s_client_config (dict, optional): config_file: Name of the kube-config file. Provide configurable parameters for connecting to remote k8s e.g. "~/.kube/config" reconnect (bool, optional): When connecting to a pre-launched GraphScope cluster with :code:`addr`, the connect request would be rejected with there is still an existing session connected. There are cases where the session still exists and user's client has lost connection with the backend, e.g., in a jupyter notebook. We have a :code:`dangling_timeout_seconds` for it, but a more deterministic behavior would be better. If :code:`reconnect` is True, the existing session will be reused. It is the user's responsibility to ensure there's no such an active client. Defaults to :code:`False`. Raises: TypeError: If the given argument combination is invalid and cannot be used to create a GraphScope session. 
"""# suppress the grpc warnings, see also grpc/grpc#29103os.environ["GRPC_ENABLE_FORK_SUPPORT"]="false"self._accessable_params=("addr","mode","cluster_type","num_workers","preemptive","k8s_namespace","k8s_service_type","k8s_image_registry","k8s_image_repository","k8s_image_tag","k8s_image_pull_policy","k8s_image_pull_secrets","k8s_coordinator_cpu","k8s_coordinator_mem","etcd_addrs","etcd_listening_client_port","etcd_listening_peer_port","k8s_vineyard_image","k8s_vineyard_deployment","k8s_vineyard_cpu","k8s_vineyard_mem","vineyard_shared_mem","k8s_engine_cpu","k8s_engine_mem","k8s_mars_worker_cpu","k8s_mars_worker_mem","k8s_mars_scheduler_cpu","k8s_mars_scheduler_mem","k8s_coordinator_pod_node_selector","k8s_engine_pod_node_selector","enabled_engines","reconnect","k8s_volumes","k8s_waiting_for_delete","k8s_deploy_mode","timeout_seconds","dangling_timeout_seconds","with_mars","with_dataset","hosts",)# parse config, which should be a path to config file, or dict# config has the highest priorityifconfigisnotNone:ifisinstance(config,str):self._config=self._load_config_from_file(config,silent=False)else:self._config=copy.deepcopy(config)else:self._config=copy.deepcopy(gs_config)# default configself._api_client=api_clientforkey,valueinkw.items():self._config.set_option(key,value)self._config.session.instance_id=random_string(6)# initial setting of cluster_typeself._cluster_type=self._parse_cluster_type()# initial dagself._dag=Dag()# the mapping table from old vineyard object id to new vineyard object idself._vineyard_object_mapping_table={}self._log_session_info()self._closed=False# coordinator service endpointself._coordinator_endpoint=Noneself._launcher=Noneself._heartbeat_sending_thread=Noneself._grpc_client:GRPCClient=Noneself._session_id:str=None# unique identifier across sessions# engine config:## {# "experiment": "ON/OFF",# "vineyard_socket": "...",# "vineyard_rpc_endpoint": "..."# }self._engine_config:dict=None# interactive instance related graph 
mapself._interactive_instance_dict={}# learning engine related graph mapself._learning_instance_dict={}self._default_session=Noneatexit.register(self.close)# create and connect sessionwithCaptureKeyboardInterrupt(self.close):self._connect()self._disconnected:bool=False# heartbeatself._heartbeat_interval_seconds:int=5self._heartbeat_sending_thread=threading.Thread(target=self._send_heartbeat,args=())self._heartbeat_sending_thread.daemon=Trueself._heartbeat_sending_thread.start()self._heartbeat_maximum_failures:int=3# networkx moduleself._nx=Noneself._lock=threading.RLock()
def__repr__(self):returnstr(self.info)def__str__(self):returnrepr(self)@propertydefsession_id(self)->str:returnself._session_id@propertydefdag(self)->Dag:returnself._dagdef_log_session_info(self):ifself._config.coordinator.endpointisnotNone:logger.info("Connecting graphscope session with address: %s",self._config.coordinator.endpoint,)else:logger.info("Initializing graphscope session with parameters: %s",self._config.dumps_json(),)def_load_config_from_file(self,path,silent=True):config_path=os.path.expandvars(os.path.expanduser(path))try:returnConfig.load(config_path,drop_extra_fields=False)exceptExceptionasexp:# noqaifnotsilent:raiseexpdef_parse_cluster_type(self):# get the cluster type after connectingifself._config.launcher_type=="hosts":cluster_type=types_pb2.HOSTSelifself._config.launcher_type=="k8s":cluster_type=types_pb2.K8Selse:raiseValueError("Expect 'hosts' or 'k8s' for cluster_type parameter")returncluster_type@propertydefengine_config(self):"""Show the engine configuration associated with session in json format."""returnself._engine_config@propertydefinfo(self):"""Show all resource info associated with session in json 
format."""info={}ifself._closed:info["status"]="closed"elifself._grpc_clientisNoneorself._disconnected:info["status"]="disconnected"else:info["status"]="active"ifself._cluster_type==types_pb2.K8S:info["type"]="k8s"info["engine_hosts"]=",".join(self._pod_name_list)info["namespace"]=self._config.kubernetes_launcher.namespaceelse:info["type"]="hosts"info["engine_hosts"]=self._engine_config["engine_hosts"]info["cluster_type"]=types_pb2.ClusterType.Name(self._cluster_type)info["session_id"]=self.session_idinfo["num_workers"]=self._config.session.num_workersinfo["coordinator_endpoint"]=self._coordinator_endpointinfo["engine_config"]=self._engine_configreturninfo@propertydefclosed(self):returnself._closed@propertydefdisconnected(self):returnself._grpc_clientisNoneorself._disconnecteddefeager(self):returnself._config.session.execution_mode=="eager"def_send_heartbeat(self):# >1: failure, 0: reset when successheartbeat_failure_count=0whilenotself._closed:ifself._grpc_client:try:self._grpc_client.send_heartbeat()exceptExceptionasexc:ifheartbeat_failure_count==0:logger.warning("Failed to send heartbeat message",exc_info=exc)heartbeat_failure_count=heartbeat_failure_count+1ifheartbeat_failure_count>self._heartbeat_maximum_failures:logger.error("The connection between coordinator has lost after %d times ""of heartbeat failure, closing the session ...",heartbeat_failure_count,)self.close()self._disconnected=Trueelse:heartbeat_failure_count=0self._disconnected=Falsetime.sleep(self._heartbeat_interval_seconds)
[docs]defconnected(self)->bool:"""Check if the session is still connected and available. Returns: True or False """returnnotself._disconnected
[docs]defclose(self):"""Closes this session. This method frees all resources associated with the session. Note that closing will ignore SIGINT and SIGTERM signal and recover later. """ifthreading.current_thread()isthreading.main_thread():withSignalIgnore([signal.SIGINT,signal.SIGTERM]):self._close()else:self._close()
def_close(self):# noqa: C901ifself._closed:returnself._closed=Trueself._coordinator_endpoint=Noneself._unregister_default()ifself._heartbeat_sending_thread:try:self._heartbeat_sending_thread.join(timeout=self._heartbeat_interval_seconds)exceptRuntimeError:# ignore the "cannot join current thread" errorpassself._heartbeat_sending_thread=Noneself._disconnected=True# close all interactive instancesforinstanceinself._interactive_instance_dict.values():try:instance.close()exceptException:passself._interactive_instance_dict.clear()# close all learning instancesforinstanceinself._learning_instance_dict.values():try:instance.close()exceptException:passself._learning_instance_dict.clear()ifself._grpc_client:try:self._grpc_client.close()exceptException:passself._grpc_client=None_session_dict.pop(self._session_id,None)# clean upifself._config.coordinator.endpointisNone:try:ifself._launcher:self._launcher.stop()exceptException:passself._pod_name_list=[]def_close_interactive_instance(self,instance):"""Close an interactive instance."""self._grpc_client.close_interactive_instance(instance.object_id)def_close_learning_instance(self,instance):"""Close a learning instance."""self._grpc_client.close_learning_instance(instance.object_id)def__del__(self):# cleanly ignore all exceptionstry:self.close()exceptException:# pylint: disable=broad-exceptpassdef_check_closed(self,msg=None):"""Internal: raise a ValueError if session is closed"""ifself.closed:raiseValueError(msgor"Operation on closed session.")# Context manager
[docs]def__enter__(self):"""Context management protocol. Returns self and register self as default session. """self._check_closed()self.as_default()returnself
[docs]def__exit__(self,exc_type,exc_value,exc_tb):"""Unregister self from the default session, close the session and release the resources, ignore all exceptions in close(). """try:self._unregister_default()self.close()exceptException:pass
[docs]defas_default(self):"""Obtain a context manager that make this object as default session. This method is used when a Session is constructed, which will immediately install self as a default session. Raises: ValueError: If default session exist in current context. Returns: A context manager using this session as the default session. """ifnot_default_session_stack.is_cleared():raiseValueError("A default session is already active. You must explicitly call Session.close().")# session context managerself._default_session=default_session(self)self._default_session.__enter__()
def_unregister_default(self):"""Remove self from the default session stack."""ifself._default_session:self._default_session.__exit__(None,None,None)self._default_session=Nonedef_wrapper(self,dag_node:DAGNode)->Union[DAGNode,App,Context,Graph,Any]:ifself.eager():returnself.run(dag_node)returndag_nodedefrun(self,fetches):"""Run operations of `fetches`. Args: fetches: :class:`Operation` Raises: RuntimeError: Client disconnect to the service. Or run on a closed session. ValueError: If fetch is not an instance of :class:`Operation`. Or the fetch has been evaluated. InvalidArgumentError: Not recognized on output type. Returns: Different values for different output types of :class:`Operation` """# There might be a deadlock without `gc.collect()`:## - thread 1 uses `run()` to issue grpc requests# - during the process, e.g., print traceback, it triggers certain `__del__()`# and that issues a `run_fetches()` again, that further requires the lock# - then a deadlock been introduced.## Thus, we simply choose to call `gc.collect()` to force those `__del__()` been# invoked before actually issuing the grpc request to avoid the deadlock.#gc.collect()withself._lock:returnself.run_fetches(fetches)
[docs]defrun_fetches(self,fetches):"""Run operations of `fetches` without the session lock."""ifself._closed:raiseRuntimeError("Attempted to use a closed Session.")ifnotself._grpc_client:raiseRuntimeError("Session disconnected.")fetch_handler=_FetchHandler(self.dag,fetches)try:response=self._grpc_client.run(fetch_handler.targets)exceptFatalError:self.close()raiseifnotself.eager():# Unload operations that cannot be touched anymoredag_to_unload=fetch_handler.get_dag_for_unload()try:self._grpc_client.run(dag_to_unload)exceptFatalError:self.close()raisereturnfetch_handler.wrap_results(response)
def_connect(self):ifself._config.coordinator.endpointisnotNone:# try to connect to exist coordinatorself._coordinator_endpoint=self._config.coordinator.endpointelifself._cluster_type==types_pb2.K8S:self._launcher=KubernetesClusterLauncher(config=self._config,api_client=self._get_api_client())elifself._cluster_type==types_pb2.HOSTS:# launch coordinator with hostsself._launcher=HostsClusterLauncher(config=self._config)else:raiseRuntimeError(f"Unrecognized cluster type {types_pb2.ClusterType.Name(self._cluster_type)}.")# launching graphscope serviceifself._launcherisnotNone:self._launcher.start()self._coordinator_endpoint=self._launcher.coordinator_endpoint# waiting service readyself._grpc_client=GRPCClient(self._launcher,self._coordinator_endpoint,self._config.session.reconnect)self._grpc_client.waiting_service_ready(timeout_seconds=self._config.session.timeout_seconds,)# connect and fetch logs from rpc servertry:(self._session_id,self._cluster_type,self._config.session.num_workers,self._config.kubernetes_launcher.namespace,self._engine_config,pod_name_list,)=self._grpc_client.connect(cleanup_instance=notbool(self._config.coordinator.endpoint),dangling_timeout_seconds=self._config.session.dangling_timeout_seconds,)self._pod_name_list=list(pod_name_list)# fetch logsifself._config.coordinator.endpointorself._cluster_type==types_pb2.K8S:self._grpc_client.fetch_logs()_session_dict[self._session_id]=self# Launch analytical engine right after session connected.# This may be changed to on demand launching in the futureifnotself._engine_configandnotself._pod_name_list:(self._engine_config,pod_name_list,)=self._grpc_client.create_analytical_instance()self._pod_name_list=list(pod_name_list)exceptException:self.close()raise
[docs]defget_config(self):"""Get configuration of the session."""returnself._config
def_get_api_client(self):ifself._api_clientisnotNone:returnself._api_clienttry:config_file=self._config.kubernetes_launcher.config_fileself._api_client=resolve_api_client(config_file)exceptkube_config.ConfigExceptionase:raiseRuntimeError("Kubernetes environment not found, you may want to"' launch session locally with param cluster_type="hosts"')fromereturnself._api_clientdef_ensure_pvc_exists(self,pvc_name,pvc_namespace):_core_api=kube_client.CoreV1Api(self._get_api_client())try:_core_api.read_namespaced_persistent_volume_claim(name=pvc_name,namespace=pvc_namespace,)exceptkube_client.rest.ApiExceptionase:raiseRuntimeError(f"PVC {pvc_name} not found in namespace {pvc_namespace}")fromedef_ensure_vineyard_deployment_exists(self,vineyard_deployment_name,vineyard_deployment_namespace):_app_api=kube_client.AppsV1Api(self._get_api_client())try:_app_api.read_namespaced_deployment(name=vineyard_deployment_name,namespace=vineyard_deployment_namespace,)exceptkube_client.rest.ApiExceptionase:ife.status==404:raiseRuntimeError(f"Vineyard deployment {vineyard_deployment_name} not found in namespace {vineyard_deployment_namespace}")fromeelse:raisee
[docs]defstore_to_pvc(self,graphIDs,path:str,pvc_name:str):""" Stores the given graph IDs to the given path with the given PVC. Also, if you want to store graphs of different sessions to the same pv, you'd better to create different pvc for different sessions at first. Notice, before calling this function, the KUBECONFIG environment variable should be set to the path of your kubeconfig file. And you should make sure that the pvc is bound to the pv and the pv's capacity is enough to store the graphs. The method uses the vineyardctl to create a kubernetes job to serialize the selected graphs. For more information, see the vineyardctl documentation. https://github.com/v6d-io/v6d/tree/main/k8s/cmd#vineyardctl-deploy-backup-job Args: graph_ids: The list of graph IDs to store. Supported types: - list: list of vineyard.ObjectID or graphscope.Graph path: The path in the pv to which the pvc is bound. pvc_name: The name of the PVC. Raises: RuntimeError: If the cluster type is not Kubernetes. """ifself._cluster_type!=types_pb2.K8S:raiseRuntimeError("Only support kubernetes cluster")object_ids=[]forobjectingraphIDs:ifisinstance(object,Graph):object_ids.append(object.vineyard_id)else:object_ids.append(vineyard.ObjectID(object))object_ids=",".join(repr(id)foridinobject_ids)vineyard_deployment_name=self._config.vineyard.deployment_namenamespace=self._config.kubernetes_launcher.namespaceself._ensure_vineyard_deployment_exists(vineyard_deployment_name,namespace)self._ensure_pvc_exists(pvc_name,namespace)# The next function will create a kubernetes job for backing up# the specific graphIDs to the specific path of the specific pvcvineyard.deploy.vineyardctl.deploy.backup_job(backup_name="vineyard-backup-"+random_string(6),vineyard_deployment_name=vineyard_deployment_name,vineyard_deployment_namespace=namespace,namespace=namespace,path=path,objectids=object_ids,pvc_name=pvc_name,)
[docs]defrestore_from_pvc(self,path:str,pvc_name:str):""" Restores the graphs from the given path in the given PVC. Notice, before calling this function, the KUBECONFIG environment variable should be set to the path of your kubeconfig file. Args: path: The path in the pv to which the pvc is bound. pvc_name: The name of the PVC. Raises: RuntimeError: If the cluster type is not Kubernetes. """ifself._cluster_type!=types_pb2.K8S:raiseRuntimeError("Only support kubernetes cluster")vineyard_deployment_name=self._config.vineyard.deployment_namenamespace=self._config.kubernetes_launcher.namespaceself._ensure_vineyard_deployment_exists(vineyard_deployment_name,namespace)self._ensure_pvc_exists(pvc_name,namespace)random_suffix=random_string(6)vineyard.deploy.vineyardctl.deploy.recover_job(recover_name="vineyard-recover-"+random_suffix,vineyard_deployment_name=vineyard_deployment_name,vineyard_deployment_namespace=namespace,namespace=namespace,recover_path=path,pvc_name=pvc_name,)_core_api=kube_client.CoreV1Api(self._get_api_client())try:config_map=_core_api.read_namespaced_config_map(name="vineyard-recover-"+random_suffix+"-mapping-table",namespace=namespace,)exceptkube_client.rest.ApiExceptionase:ife.status==404:raiseRuntimeError(f"ConfigMap vineyard-recover-{random_suffix}-mapping-table not found in namespace {namespace}")fromeelse:raisee# parse configmap data to the self._vineyard_object_mapping_tableself._vineyard_object_mapping_table=config_map.data
[docs]defget_vineyard_object_mapping_table(self):""" Get the vineyard object mapping table from the old object id to new object id during storing and restoring graph to pvc on the kubernetes cluster. """returnself._vineyard_object_mapping_table
[docs]defg(self,incoming_data=None,oid_type="int64",vid_type="uint64",directed=True,generate_eid=True,retain_oid=True,vertex_map="global",compact_edges=False,use_perfect_hash=False,)->Union[Graph,GraphDAGNode]:"""Construct a GraphScope graph object on the default session. It will launch and set a session to default when there is no default session found. See params detail in :class:`graphscope.framework.graph.GraphDAGNode` Returns: :class:`graphscope.framework.graph.GraphDAGNode`: Evaluated in eager mode. Examples: .. code:: python >>> import graphscope >>> g = graphscope.g() >>> import graphscope >>> sess = graphscope.session() >>> g = sess.g() # creating graph on the session "sess" """if(isinstance(incoming_data,vineyard.ObjectID)andrepr(incoming_data)inself._vineyard_object_mapping_table):graph_vineyard_id=self._vineyard_object_mapping_table[repr(incoming_data)]logger.info("Restore graph from original graph: %s",graph_vineyard_id)incoming_data=vineyard.ObjectID(graph_vineyard_id)returnself._wrapper(GraphDAGNode(self,incoming_data,oid_type,vid_type,directed,generate_eid,retain_oid,vertex_map,compact_edges,use_perfect_hash,))
[docs]defload_from(self,*args,**kwargs):"""Load a graph within the session. See more information in :meth:`graphscope.load_from`. """withdefault_session(self):returngraphscope.load_from(*args,**kwargs)
@deprecated("Please use `sess.interactive` instead.")
def gremlin(self, graph, params=None):
    """Deprecated alias of :meth:`interactive`.

    Use :meth:`interactive` to get an interactive engine handler that
    supports both gremlin and cypher queries.
    """
    handler = self.interactive(graph, params)
    return handler
def interactive(self, graph, params=None, with_cypher=False):
    """Get an interactive engine handler to execute gremlin and cypher queries.

    It will return an instance of
    :class:`graphscope.interactive.query.InteractiveQuery`.

    .. code:: python

        >>> # close and recreate InteractiveQuery.
        >>> interactive_query = sess.interactive(g)
        >>> interactive_query.close()
        >>> interactive_query = sess.interactive(g)

    Args:
        graph (:class:`graphscope.framework.graph.GraphDAGNode`): The graph
            to create the interactive instance on. If it is not yet an
            evaluated :class:`Graph`, it is evaluated with :meth:`run` first.
        params: A dict consisting of configurations of the GIE instance.
        with_cypher (bool, optional): Forwarded to the coordinator when
            creating the interactive instance; presumably enables the
            cypher endpoint. Defaults to False.

    Raises:
        RuntimeError: If :code:`graph` belongs to a different session.
        InvalidArgumentError: If :code:`graph` is not a property graph.

    Returns:
        :class:`graphscope.interactive.query.InteractiveQuery`:
        InteractiveQuery to execute gremlin and cypher queries.
    """
    if self._session_id != graph.session_id:
        raise RuntimeError(
            "Failed to create interactive engine on the graph with different session: {0} vs {1}".format(
                self._session_id, graph.session_id
            )
        )
    if graph.graph_type != graph_def_pb2.ARROW_PROPERTY:
        raise InvalidArgumentError("The graph should be a property graph.")

    if not isinstance(graph, Graph):  # Is a GraphDAGNode
        graph = self.run(graph)

    object_id = graph.vineyard_id
    schema_path = graph.schema_path
    gremlin_endpoint, cypher_endpoint = self._grpc_client.create_interactive_instance(
        object_id, schema_path, params, with_cypher
    )
    interactive_query = InteractiveQuery(graph, gremlin_endpoint, cypher_endpoint)
    # Register the handler on the session (keyed by vineyard id) and link it
    # to the graph object.
    self._interactive_instance_dict[object_id] = interactive_query
    graph._attach_interactive_instance(interactive_query)
    return interactive_query
@deprecated("Please use `graphlearn` instead.")
def learning(self, graph, nodes=None, edges=None, gen_labels=None):
    """Start a graph learning engine (deprecated).

    Kept for backward compatibility; it warns and then delegates to
    :meth:`graphlearn` with the same arguments.
    """
    message = "The method 'learning' has been deprecated, using graphlearn replace."
    warnings.warn(message)
    engine = self.graphlearn(graph, nodes, edges, gen_labels)
    return engine
def graphlearn(self, graph, nodes=None, edges=None, gen_labels=None):
    """Start a graph learning engine.

    Args:
        graph (:class:`graphscope.framework.graph.GraphDAGNode`): The graph
            to create the learning instance on. It must belong to this
            session and be a property graph.
        nodes (list, optional): List of node types that will be used for
            GNN training. Each element can be `"node_label"` or
            `(node_label, features)`. If an element is a tuple containing a
            feature list, only those features are used for training.
            Default is None, which uses all node types for GNN training.
        edges (list, optional): List of edge types that will be used for
            GNN training. We use `(src_label, edge_label, dst_label)` to
            specify one edge type. Default is None, which uses all edge
            types for GNN training.
        gen_labels (list, optional): Alias node and edge labels and extract
            train/validation/test datasets from the original graph for
            supervised GNN training. The detail is explained in the
            examples below.

    Raises:
        RuntimeError: If :code:`graph` belongs to a different session.
        InvalidArgumentError: If :code:`graph` is not a property graph.

    Returns:
        :class:`graphscope.learning.graph.Graph`: The learning graph, also
        registered on the session and attached to :code:`graph`.

    Examples
    --------
    >>> # Assume the input graph contains one node label `paper` and one edge label `links`.
    >>> features = ["weight", "name"]  # use properties "weight" and "name" as features
    >>> lg = sess.graphlearn(
    ...     graph,
    ...     # use "paper" nodes and their features for training
    ...     nodes=[("paper", features)],
    ...     # use the `paper->links->paper` edge type for training
    ...     edges=[("paper", "links", "paper")],
    ...     gen_labels=[
    ...         # split "paper" nodes into 100 pieces, and use random 75 pieces (75%) as training dataset
    ...         ("train", "paper", 100, (0, 75)),
    ...         # split "paper" nodes into 100 pieces, and use random 10 pieces (10%) as validation dataset
    ...         ("val", "paper", 100, (75, 85)),
    ...         # split "paper" nodes into 100 pieces, and use random 15 pieces (15%) as test dataset
    ...         ("test", "paper", 100, (85, 100)),
    ...     ],
    ... )

    Note that the training, validation and test datasets are not overlapping.
    And for unsupervised learning:

    >>> lg = sess.graphlearn(
    ...     graph,
    ...     # use "paper" nodes and their features for training
    ...     nodes=[("paper", features)],
    ...     # use the `paper->links->paper` edge type for training
    ...     edges=[("paper", "links", "paper")],
    ...     gen_labels=[
    ...         # split "paper" nodes into 100 pieces, and use all pieces as training dataset
    ...         ("train", "paper", 100, (0, 100)),
    ...     ],
    ... )
    """
    if self._session_id != graph.session_id:
        raise RuntimeError(
            "Failed to create learning engine on the graph with different session: {0} vs {1}".format(
                self._session_id, graph.session_id
            )
        )
    if graph.graph_type != graph_def_pb2.ARROW_PROPERTY:
        raise InvalidArgumentError("The graph should be a property graph.")

    from graphscope.learning.graph import Graph as LearningGraph
    from graphscope.learning.graph import get_gl_handle

    handle = get_gl_handle(
        graph.schema,
        graph.vineyard_id,
        self._pod_name_list,
        self._engine_config,
        graph.fragments,
    )
    config = LearningGraph.preprocess_args(handle, nodes, edges, gen_labels)
    # The handle and config travel to the coordinator as base64-encoded JSON.
    config = base64.b64encode(
        json.dumps(config).encode("utf-8", errors="ignore")
    ).decode("utf-8", errors="ignore")
    handle, config, endpoints = self._grpc_client.create_learning_instance(
        graph.vineyard_id,
        handle,
        config,
        message_pb2.LearningBackend.GRAPHLEARN,
    )

    handle = json.loads(base64.b64decode(handle).decode("utf-8", errors="ignore"))
    handle["server"] = ",".join(endpoints)
    handle["client_count"] = 1

    # construct learning graph
    g = LearningGraph(graph, handle, config, graph.vineyard_id)
    self._learning_instance_dict[graph.vineyard_id] = g
    graph._attach_learning_instance(g)
    return g
def graphlearn_torch(
    self,
    graph,
    edges,
    edge_weights=None,
    node_features=None,
    edge_features=None,
    node_labels=None,
    edge_dir="out",
    random_node_split=None,
    num_clients=1,
    manifest_path=None,
    client_folder_path="./",
):
    """Start a GraphLearn-for-PyTorch learning engine on ``graph``.

    Args:
        graph (:class:`graphscope.framework.graph.Graph`): The property
            graph to serve. It must belong to this session.
        edges: Edge types used for training. Validated against
            ``graph.schema`` by ``GLTorchGraph.check_params``.
        edge_weights, node_features, edge_features, node_labels, edge_dir,
            random_node_split: Further learning options. They are collected
            with ``edges`` into the engine config, validated by
            ``GLTorchGraph.check_params`` and transformed by
            ``GLTorchGraph.transform_config``.
        num_clients (int): Number of learning clients. The number of
            servers equals the number of graph fragments.
        manifest_path (str, optional): Path of a YAML manifest template. If
            given, it is filled with ``NUM_CLIENT_NODES``,
            ``NUM_SERVER_NODES`` and ``NUM_WORKER_REPLICAS``
            (``num_clients - 1``) and sent along with the handle.
        client_folder_path (str, optional): Folder whose files' content is
            read and sent along with the handle. Defaults to the current
            directory ``"./"``; pass ``None`` to send nothing.

    Raises:
        RuntimeError: If ``graph`` belongs to a different session.
        InvalidArgumentError: If ``graph`` is not a property graph.

    Returns:
        A ``GLTorchGraph`` built from the endpoints returned by the
        coordinator, also registered on the session and attached to
        ``graph``.
    """
    # Same preconditions as ``interactive`` / ``graphlearn``.
    if self._session_id != graph.session_id:
        raise RuntimeError(
            "Failed to create learning engine on the graph with different session: {0} vs {1}".format(
                self._session_id, graph.session_id
            )
        )
    if graph.graph_type != graph_def_pb2.ARROW_PROPERTY:
        raise InvalidArgumentError("The graph should be a property graph.")

    from graphscope.learning.gl_torch_graph import GLTorchGraph
    from graphscope.learning.utils import fill_params_in_yaml
    from graphscope.learning.utils import read_folder_files_content

    handle = {
        "vineyard_socket": self._engine_config["vineyard_socket"],
        "vineyard_id": graph.vineyard_id,
        "fragments": graph.fragments,
        "num_servers": len(graph.fragments),
        "num_clients": num_clients,
    }
    manifest_params = {
        "NUM_CLIENT_NODES": handle["num_clients"],
        "NUM_SERVER_NODES": handle["num_servers"],
        "NUM_WORKER_REPLICAS": handle["num_clients"] - 1,
    }
    if manifest_path is not None:
        handle["manifest"] = fill_params_in_yaml(manifest_path, manifest_params)
    if client_folder_path is not None:
        handle["client_content"] = read_folder_files_content(client_folder_path)

    # The handle and config travel to the coordinator as base64-encoded JSON.
    handle = base64.b64encode(
        json.dumps(handle).encode("utf-8", errors="ignore")
    ).decode("utf-8", errors="ignore")

    config = {
        "edges": edges,
        "edge_weights": edge_weights,
        "node_features": node_features,
        "edge_features": edge_features,
        "node_labels": node_labels,
        "edge_dir": edge_dir,
        "random_node_split": random_node_split,
    }
    GLTorchGraph.check_params(graph.schema, config)
    config = GLTorchGraph.transform_config(config)
    config = base64.b64encode(
        json.dumps(config).encode("utf-8", errors="ignore")
    ).decode("utf-8", errors="ignore")

    # Only the endpoints are needed here; the returned handle/config are unused.
    _, _, endpoints = self._grpc_client.create_learning_instance(
        graph.vineyard_id,
        handle,
        config,
        message_pb2.LearningBackend.GRAPHLEARN_TORCH,
    )

    g = GLTorchGraph(endpoints)
    self._learning_instance_dict[graph.vineyard_id] = g
    graph._attach_learning_instance(g)
    return g


def nx(self):
    """Return a ``graphscope.nx`` module bound to this session.

    Only available in eager mode. The module is built once per session and
    cached in ``self._nx``. Its ``Graph`` and ``DiGraph`` classes are
    replaced by copies whose ``_session`` attribute is this session.

    Raises:
        RuntimeError: If the session is not in eager mode.
    """
    if not self.eager():
        raise RuntimeError(
            "Networkx module need the session to be eager mode. "
            "Current session is lazy mode."
        )
    if self._nx:
        return self._nx
    import importlib.util

    spec = importlib.util.find_spec("graphscope.nx")
    # module_from_spec creates a fresh module object (not the one in
    # sys.modules), so patching it below stays private to this session.
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    graph = type("Graph", (mod.Graph.__base__,), dict(mod.Graph.__dict__))
    digraph = type("DiGraph", (mod.DiGraph.__base__,), dict(mod.DiGraph.__dict__))
    graph._session = self
    digraph._session = self
    mod.Graph = graph
    mod.DiGraph = digraph
    self._nx = mod
    return self._nx
def add_lib(self, resource_name):
    """Add the specified resource file from the client machine to the cluster.

    The file is packed into an in-memory zip ("gar") archive under its base
    name and uploaded through the gRPC client's ``add_lib`` call.

    Args:
        resource_name (str): Path of the file on the client machine.

    Raises:
        RuntimeError: If the file does not exist or is empty.
    """
    logger.info("client: adding lib %s", resource_name)
    if not os.path.isfile(resource_name):
        raise RuntimeError("Resource {} can not be found".format(resource_name))

    # Read with a context manager so the file handle is always closed.
    with open(resource_name, "rb") as resource_reader:
        bytes_ = resource_reader.read()
    if not bytes_:
        raise RuntimeError("Expect a non-empty file.")

    # pack into a gar file; the uploaded file may be placed in the same
    # directory, so only the base name is kept inside the archive
    garfile = InMemoryZip()
    garfile.append(os.path.basename(resource_name), bytes_)
    self._grpc_client.add_lib(garfile.read_bytes().getvalue())
# Lower-case alias of :class:`Session`, so sessions can be created with
# ``graphscope.session(...)``; the default-session stack also calls it.
session = Session
def set_option(**kwargs):
    """Set the value of the specified GraphScope options.

    Find params detail in :class:`graphscope.Session`

    Available options:
        - num_workers
        - log_level
        - show_log
        - vineyard_shared_mem
        - k8s_namespace
        - k8s_service_type
        - k8s_gs_image
        - k8s_etcd_image
        - k8s_image_pull_policy
        - k8s_image_pull_secrets
        - k8s_coordinator_cpu
        - k8s_coordinator_mem
        - k8s_vineyard_deployment
        - k8s_vineyard_cpu
        - k8s_vineyard_mem
        - k8s_engine_cpu
        - k8s_engine_mem
        - k8s_mars_worker_cpu
        - k8s_mars_worker_mem
        - k8s_mars_scheduler_cpu
        - k8s_mars_scheduler_mem
        - enabled_engines
        - with_mars
        - with_dataset
        - k8s_volumes
        - k8s_waiting_for_delete
        - timeout_seconds
        - dataset_download_retries
        - k8s_deploy_mode

    An integer ``log_level`` with a registered ``logging`` name is stored as
    that upper-case name. The logger is refreshed once all options are set.

    Args:
        kwargs: dict
            kv pair of GraphScope config you want to set.

    Raises:
        ValueError: If no such option exists (raised by
            ``gs_config.set_option``, per the original documentation).

    Returns:
        None
    """
    for name, value in kwargs.items():
        gs_config.set_option(name, value)
        # only integer log levels need translating to a level name
        if name != "log_level" or not isinstance(value, int):
            continue
        level_name = logging.getLevelName(value)
        # unregistered numbers come back as "Level <n>"; keep those as given
        if " " in level_name:
            continue
        gs_config.set_option(name, level_name.upper())
    GSLogger.update()
def default_session(session):
    """Return a context manager that installs ``session`` as the default.

    Use it with Python's :code:`with` statement. Code running inside the
    block that asks for a default session gets ``session``; when the block
    exits, ``session`` is removed from the default-session stack again.

    Args:
        session: :class:`Session`
            The session to be installed as the default session.

    Returns:
        A context manager for the default session.
    """
    controller = _default_session_stack.get_controller(session)
    return controller
def has_default_session() -> bool:
    """Return True when the current context has a default session."""
    stack_is_empty = _default_session_stack.empty()
    return not stack_is_empty
def get_default_session() -> Session:
    """Return the default session of the current context.

    If the context has no default session yet, a new one is created and
    installed as the default.

    Returns:
        The default :class:`graphscope.Session`.
    """
    current = _default_session_stack.get_default()
    return current
def get_session_by_id(handle):
    """Return the session registered under ``handle``.

    Raises:
        ValueError: If no session is registered under ``handle``.
    """
    if handle not in _session_dict:
        raise ValueError(f"Session {handle} not exists.")
    return _session_dict.get(handle)


class _DefaultSessionStack(object):
    """A stack of sessions providing the implicit default session.

    The last element of ``stack`` is the current default.
    """

    def __init__(self):
        super().__init__()
        self.stack = []

    def get_default(self) -> "Session":
        """Return the current default session, creating one if none exists.

        When the stack is empty, a ``"hosts"`` session with
        ``gs_config.session.default_local_num_workers`` workers is created and
        ``as_default()`` is called on it, which is expected to push it onto
        this stack.
        """
        if not self.stack:
            logger.info("Creating default session ...")
            sess = session(
                cluster_type="hosts",
                num_workers=gs_config.session.default_local_num_workers,
            )
            sess.as_default()
        return self.stack[-1]

    def empty(self) -> bool:
        """Return True when no default session is installed."""
        return len(self.stack) == 0

    def reset(self):
        """Drop every installed default session."""
        self.stack = []

    def is_cleared(self):
        """Return True when the stack holds no session."""
        return not self.stack

    @contextlib.contextmanager
    def get_controller(self, default):
        """A context manager that pushes ``default`` for the ``with`` block."""
        self.stack.append(default)
        try:
            yield default
        finally:
            self._remove_latest(default)

    def _remove_latest(self, default):
        """Remove the most recent occurrence of ``default`` from the stack, if any."""
        # Search from the top so that nested (even interleaved) uses of the
        # same session unwind in LIFO order. The entry may be absent if
        # reset() was called inside the block.
        for index in range(len(self.stack) - 1, -1, -1):
            if self.stack[index] is default:
                del self.stack[index]
                return


_default_session_stack = _DefaultSessionStack()  # pylint: disable=protected-access
def g(
    incoming_data=None,
    oid_type="int64",
    vid_type="uint64",
    directed=True,
    generate_eid=True,
    retain_oid=True,
    vertex_map="global",
    compact_edges=False,
    use_perfect_hash=False,
):
    """Construct a GraphScope graph object on the default session.

    If no default session exists, one is launched and installed as the
    default first. See params detail in
    :class:`graphscope.framework.graph.GraphDAGNode`.

    Returns:
        :class:`graphscope.framework.graph.GraphDAGNode`: Evaluated in eager mode.

    Examples:

    .. code:: python

        >>> import graphscope
        >>> g = graphscope.g()

        >>> import graphscope
        >>> sess = graphscope.session()
        >>> sess.as_default()
        >>> g = graphscope.g()  # creating graph on the session "sess"
    """
    target_session = get_default_session()
    return target_session.g(
        incoming_data=incoming_data,
        oid_type=oid_type,
        vid_type=vid_type,
        directed=directed,
        generate_eid=generate_eid,
        retain_oid=retain_oid,
        vertex_map=vertex_map,
        compact_edges=compact_edges,
        use_perfect_hash=use_perfect_hash,
    )
@deprecated("Please use `graphscope.interactive` instead.")
def gremlin(graph, params=None):
    """This method is going to be deprecated in the future.

    Use :meth:`graphscope.interactive` instead.
    """
    return interactive(graph, params)


def interactive(graph, params=None, with_cypher=False):
    """Create an interactive engine and get the handler to execute gremlin and cypher queries.

    The engine is created on the session that owns ``graph``. See params
    detail in :meth:`graphscope.Session.interactive`.

    Returns:
        :class:`graphscope.interactive.query.InteractiveQuery`:
        InteractiveQuery to execute gremlin and cypher queries.

    Examples:

    .. code:: python

        >>> import graphscope
        >>> g = graphscope.g()
        >>> interactive_query = graphscope.interactive(g)
    """
    assert graph is not None, "graph cannot be None"
    sess = graph._session  # pylint: disable=protected-access
    assert sess is not None, "The graph object is invalid"
    return sess.interactive(graph, params, with_cypher)
def graphlearn(graph, nodes=None, edges=None, gen_labels=None):
    """Create a graph learning engine on the session that owns ``graph``.

    See params detail in :meth:`graphscope.Session.graphlearn`

    Returns:
        :class:`graphscope.learning.graph.Graph`: An instance of learning
        graph that can be fed to the learning engine.

    Example:

    .. code:: python

        >>> import graphscope
        >>> g = graphscope.g()
        >>> lg = graphscope.graphlearn(g)
    """
    assert graph is not None, "graph cannot be None"
    assert (
        graph._session is not None
    ), "The graph object is invalid"  # pylint: disable=protected-access
    return graph._session.graphlearn(
        graph, nodes, edges, gen_labels
    )  # pylint: disable=protected-access
def graphlearn_torch(
    graph,
    edges,
    edge_weights=None,
    node_features=None,
    edge_features=None,
    node_labels=None,
    edge_dir="out",
    random_node_split=None,
    num_clients=1,
    manifest_path=None,
    client_folder_path="./",
):
    """Create a GraphLearn-for-PyTorch engine on the session owning ``graph``.

    Thin module-level wrapper; all arguments are forwarded positionally to
    :meth:`graphscope.Session.graphlearn_torch`.
    """
    assert graph is not None, "graph cannot be None"
    owner = graph._session  # pylint: disable=protected-access
    assert owner is not None, "The graph object is invalid"
    forwarded = (
        graph,
        edges,
        edge_weights,
        node_features,
        edge_features,
        node_labels,
        edge_dir,
        random_node_split,
        num_clients,
        manifest_path,
        client_folder_path,
    )
    return owner.graphlearn_torch(*forwarded)