Merge pull request #296 from pypeclub/feature/modify_cloud_mongo

Feature/modify cloud mongo
This commit is contained in:
Milan Kolar 2020-06-23 11:26:49 +02:00 committed by GitHub
commit dda6ece20c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 120 additions and 132 deletions

View file

@ -6,6 +6,12 @@ from pypeapp import (
execute
)
from pypeapp.lib.mongo import (
decompose_url,
compose_url,
get_default_components
)
from .plugin import (
Extractor,
@ -32,10 +38,7 @@ from .lib import (
get_version_from_path,
get_last_version_from_path,
modified_environ,
add_tool_to_environment,
decompose_url,
compose_url,
get_default_components
add_tool_to_environment
)
# Special naming case for subprocess since its a built-in method.

View file

@ -17,10 +17,6 @@ import six
import avalon.api
from .api import config
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
log = logging.getLogger(__name__)
@ -1391,73 +1387,3 @@ def ffprobe_streams(path_to_file):
popen_output = popen.communicate()[0]
log.debug("FFprobe output: {}".format(popen_output))
return json.loads(popen_output)["streams"]
def decompose_url(url):
components = {
"scheme": None,
"host": None,
"port": None,
"username": None,
"password": None,
"query": None
}
result = urlparse(url)
components["scheme"] = result.scheme
components["host"] = result.hostname
try:
components["port"] = result.port
except ValueError:
raise RuntimeError("invalid port specified")
components["username"] = result.username
components["password"] = result.password
components["query"] = result.query
return components
def compose_url(scheme=None,
host=None,
username=None,
password=None,
database=None,
collection=None,
port=None,
query=None):
url = "{scheme}://"
if username and password:
url += "{username}:{password}@"
url += "{host}"
if database:
url += "/{database}"
if database and collection:
url += "/{collection}"
if port:
url += ":{port}"
if query:
url += "?{}".format(query)
return url.format(**{
"scheme": scheme,
"host": host,
"username": username,
"password": password,
"database": database,
"collection": collection,
"port": port,
"query": query
})
def get_default_components():
return decompose_url(os.environ["MONGO_URL"])

View file

@ -16,7 +16,7 @@ import contextlib
from avalon import schema
from avalon.vendor import requests
from pype.api import get_default_components, compose_url
from avalon.io import extract_port_from_url
# Third-party dependencies
import pymongo
@ -73,15 +73,17 @@ class DbConnector(object):
self.Session.update(self._from_environment())
timeout = int(self.Session["AVALON_TIMEOUT"])
mongo_url = self.Session["AVALON_MONGO"]
kwargs = {
"host": mongo_url,
"serverSelectionTimeoutMS": timeout
}
components = get_default_components()
port = components.pop("port")
host = compose_url(**components)
self._mongo_client = pymongo.MongoClient(
host=host,
port=port,
serverSelectionTimeoutMS=timeout
)
port = extract_port_from_url(mongo_url)
if port is not None:
kwargs["port"] = int(port)
self._mongo_client = pymongo.MongoClient(**kwargs)
for retry in range(3):
try:
@ -389,6 +391,10 @@ class DbConnector(object):
if document is None:
break
if document.get("type") == "master_version":
_document = self.find_one({"_id": document["version_id"]})
document["data"] = _document["data"]
parents.append(document)
return parents

View file

@ -4,14 +4,14 @@ import json
import bson
import bson.json_util
from pype.modules.rest_api import RestApi, abort, CallbackResult
from pype.modules.ftrack.lib.custom_db_connector import DbConnector
from pype.modules.ftrack.lib.io_nonsingleton import DbConnector
class AvalonRestApi(RestApi):
dbcon = DbConnector(os.environ["AVALON_DB"])
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.dbcon = DbConnector()
self.dbcon.install()
@RestApi.route("/projects/<project_name>", url_prefix="/avalon", methods="GET")

View file

@ -15,8 +15,10 @@ import uuid
import ftrack_api
import pymongo
from pype.modules.ftrack.lib import credentials
from pype.modules.ftrack.ftrack_server.lib import check_ftrack_url
from pype.api import get_default_components, compose_url
from pype.modules.ftrack.ftrack_server.lib import (
check_ftrack_url, get_ftrack_event_mongo_info
)
import socket_thread
@ -187,9 +189,10 @@ def main_loop(ftrack_url):
os.environ["FTRACK_EVENT_SUB_ID"] = str(uuid.uuid1())
# Get mongo hostname and port for testing mongo connection
components = get_default_components()
mongo_port = components.pop("port")
mongo_hostname = compose_url(**components)
mongo_uri, mongo_port, database_name, collection_name = (
get_ftrack_event_mongo_info()
)
# Current file
file_path = os.path.dirname(os.path.realpath(__file__))
@ -267,13 +270,12 @@ def main_loop(ftrack_url):
ftrack_accessible = check_ftrack_url(ftrack_url)
if not mongo_accessible:
mongo_accessible = check_mongo_url(mongo_hostname, mongo_port)
mongo_accessible = check_mongo_url(mongo_uri, mongo_port)
# Run threads only if Ftrack is accessible
if not ftrack_accessible or not mongo_accessible:
if not mongo_accessible and not printed_mongo_error:
mongo_url = "{}:{}".format(mongo_hostname, mongo_port)
print("Can't access Mongo {}".format(mongo_url))
print("Can't access Mongo {}".format(mongo_uri))
if not ftrack_accessible and not printed_ftrack_error:
print("Can't access Ftrack {}".format(ftrack_url))

View file

@ -19,7 +19,12 @@ import ftrack_api._centralized_storage_scenario
import ftrack_api.event
from ftrack_api.logging import LazyLogMessage as L
from pype.api import Logger, compose_url, get_default_components
from pype.api import (
Logger,
get_default_components,
decompose_url,
compose_url
)
from pype.modules.ftrack.lib.custom_db_connector import DbConnector
@ -29,11 +34,28 @@ TOPIC_STATUS_SERVER_RESULT = "pype.event.server.status.result"
def get_ftrack_event_mongo_info():
url = compose_url(get_default_components())
database = os.environ.get("FTRACK_EVENTS_MONGO_DB") or "pype"
collection = os.environ.get("FTRACK_EVENTS_MONGO_COL") or "ftrack_events"
database_name = (
os.environ.get("FTRACK_EVENTS_MONGO_DB") or "pype"
)
collection_name = (
os.environ.get("FTRACK_EVENTS_MONGO_COL") or "ftrack_events"
)
return url, database, collection
mongo_url = os.environ.get("FTRACK_EVENTS_MONGO_URL")
if mongo_url is not None:
components = decompose_url(mongo_url)
_used_ftrack_url = True
else:
components = get_default_components()
_used_ftrack_url = False
if not _used_ftrack_url or components["database"] is None:
components["database"] = database_name
components["collection"] = collection_name
uri = compose_url(components)
return uri, components["port"], database_name, collection_name
def check_ftrack_url(url, log_errors=True):
@ -137,15 +159,17 @@ class StorerEventHub(SocketBaseEventHub):
class ProcessEventHub(SocketBaseEventHub):
hearbeat_msg = b"processor"
url, database, table_name = get_ftrack_event_mongo_info()
uri, port, database, table_name = get_ftrack_event_mongo_info()
is_table_created = False
pypelog = Logger().get_logger("Session Processor")
def __init__(self, *args, **kwargs):
self.dbcon = DbConnector(
database_name=self.database,
table_name=self.table_name
self.uri,
self.port,
self.database,
self.table_name
)
super(ProcessEventHub, self).__init__(*args, **kwargs)

View file

@ -23,11 +23,8 @@ class SessionFactory:
session = None
url, database, table_name = get_ftrack_event_mongo_info()
dbcon = DbConnector(
database_name=database,
table_name=table_name
)
uri, port, database, table_name = get_ftrack_event_mongo_info()
dbcon = DbConnector(uri, port, database, table_name)
# ignore_topics = ["ftrack.meta.connected"]
ignore_topics = []

View file

@ -12,8 +12,7 @@ import atexit
# Third-party dependencies
import pymongo
from pype.api import get_default_components, compose_url
from pype.api import decompose_url
class NotActiveTable(Exception):
@ -65,12 +64,29 @@ class DbConnector:
log = logging.getLogger(__name__)
timeout = 1000
def __init__(self, database_name, table_name=None):
def __init__(
self, uri, port=None, database_name=None, table_name=None
):
self._mongo_client = None
self._sentry_client = None
self._sentry_logging_handler = None
self._database = None
self._is_installed = False
self._uri = uri
components = decompose_url(uri)
if port is None:
port = components.get("port")
if database_name is None:
database_name = components.get("database")
if database_name is None:
raise ValueError(
"Database is not defined for connection. {}".format(uri)
)
self._port = port
self._database_name = database_name
self.active_table = table_name
@ -96,14 +112,16 @@ class DbConnector:
atexit.register(self.uninstall)
logging.basicConfig()
components = get_default_components()
port = components.pop("port")
host = compose_url(**components)
self._mongo_client = pymongo.MongoClient(
host=host,
port=port,
serverSelectionTimeoutMS=self.timeout
)
kwargs = {
"host": self._uri,
"serverSelectionTimeoutMS": self.timeout
}
if self._port is not None:
kwargs["port"] = self._port
self._mongo_client = pymongo.MongoClient(**kwargs)
if self._port is None:
self._port = self._mongo_client.PORT
for retry in range(3):
try:
@ -118,11 +136,11 @@ class DbConnector:
else:
raise IOError(
"ERROR: Couldn't connect to %s in "
"less than %.3f ms" % (host, self.timeout)
"less than %.3f ms" % (self._uri, self.timeout)
)
self.log.info("Connected to %s, delay %.3f s" % (
host, time.time() - t1
self._uri, time.time() - t1
))
self._database = self._mongo_client[self._database_name]

View file

@ -16,6 +16,7 @@ import contextlib
from avalon import schema
from avalon.vendor import requests
from avalon.io import extract_port_from_url
# Third-party dependencies
import pymongo
@ -72,11 +73,17 @@ class DbConnector(object):
self.Session.update(self._from_environment())
timeout = int(self.Session["AVALON_TIMEOUT"])
self._mongo_client = pymongo.MongoClient(
host=os.environ["AVALON_MONGO_HOST"],
port=int(os.environ["AVALON_MONGO_PORT"]),
serverSelectionTimeoutMS=timeout
)
mongo_url = self.Session["AVALON_MONGO"]
kwargs = {
"host": mongo_url,
"serverSelectionTimeoutMS": timeout
}
port = extract_port_from_url(mongo_url)
if port is not None:
kwargs["port"] = int(port)
self._mongo_client = pymongo.MongoClient(**kwargs)
for retry in range(3):
try:
@ -384,6 +391,10 @@ class DbConnector(object):
if document is None:
break
if document.get("type") == "master_version":
_document = self.find_one({"_id": document["version_id"]})
document["data"] = _document["data"]
parents.append(document)
return parents

View file

@ -1,7 +1,7 @@
import collections
from Qt import QtCore
from pype.api import Logger
from pypeapp.lib.log import _bootstrap_mongo_log, COLLECTION
from pypeapp.lib.log import _bootstrap_mongo_log, LOG_COLLECTION_NAME
log = Logger().get_logger("LogModel", "LoggingModule")
@ -40,10 +40,11 @@ class LogModel(QtCore.QAbstractItemModel):
super(LogModel, self).__init__(parent)
self._root_node = Node()
database = _bootstrap_mongo_log()
self.dbcon = None
if COLLECTION in database.list_collection_names():
self.dbcon = database[COLLECTION]
# Crash if connection is not possible to skip this module
database = _bootstrap_mongo_log()
if LOG_COLLECTION_NAME in database.list_collection_names():
self.dbcon = database[LOG_COLLECTION_NAME]
def add_log(self, log):
node = Node(log)