/**
 * Creates the function described by this command and returns no rows.
 *
 * - Temporary function (`isTemp`): the resources are loaded first and the function is
 *   then registered in the session's function registry, overriding an existing entry
 *   only when `replace` is set.
 * - Permanent function: when `replace` is set and the function already exists, the
 *   existing definition is altered; otherwise the function is created in the catalog,
 *   with `ignoreIfExists` passed through to the catalog call.
 *
 * @param sparkSession session whose `sessionState.catalog` receives the function
 * @return always an empty sequence of rows
 */
override def run(sparkSession: SparkSession): Seq[Row] = {
  val catalog = sparkSession.sessionState.catalog
  val func = CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources)
  if (isTemp) {
    // We first load resources and then put the builder in the function registry.
    catalog.loadFunctionResources(resources)
    catalog.registerFunction(func, overrideIfExists = replace)
  } else {
    // Handles `CREATE OR REPLACE FUNCTION AS ... USING ...`
    if (replace && catalog.functionExists(func.identifier)) {
      // Alter the function in the metastore.
      catalog.alterFunction(func)
    } else {
      // For a permanent function, we store the metadata into the underlying external
      // catalog. The function will be loaded into the FunctionRegistry when a query
      // uses it; we do not load it into the FunctionRegistry right now.
      catalog.createFunction(func, ignoreIfExists)
    }
  }
  Seq.empty[Row]
}
} // closes the enclosing definition, whose header lies outside this excerpt
/**
 * Resolves `name` to an [[Expression]] applied to `children`.
 *
 * Lookup order:
 *  1. If `name` has no database and the function registry already knows it, use that
 *     entry directly.
 *  2. Otherwise qualify the name with its own database, or the current database when
 *     none was given, and look the qualified name up in the function registry.
 *  3. Otherwise fetch the function from the external catalog, load its resources,
 *     register it under the user-supplied qualified name, and look it up again.
 *
 * The whole lookup runs inside `synchronized`.
 *
 * @param name     possibly unqualified function identifier
 * @param children argument expressions passed to the function builder
 * @return the expression produced by the function registry
 *
 * Fails via `failFunctionLookup(name)` when the external catalog raises
 * `AnalysisException` or `NoSuchPermanentFunctionException`.
 */
def lookupFunction(
    name: FunctionIdentifier,
    children: Seq[Expression]): Expression = synchronized {
  // Note: the implementation of this function is a little bit convoluted.
  // We probably shouldn't use a single FunctionRegistry to register all three kinds of
  // functions (built-in, temp, and external).
  if (name.database.isEmpty && functionRegistry.functionExists(name)) {
    // This function has been already loaded into the function registry.
    functionRegistry.lookupFunction(name, children)
  } else {
    // If the name itself is not qualified, add the current database to it.
    val database = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
    val qualifiedName = name.copy(database = Some(database))

    if (functionRegistry.functionExists(qualifiedName)) {
      // This function has been already loaded into the function registry.
      // Unlike the above block, we find this function by using the qualified name.
      functionRegistry.lookupFunction(qualifiedName, children)
    } else {
      // The function has not been loaded to the function registry, which means
      // that the function is a permanent function (if it actually has been registered
      // in the metastore). We need to first put the function in the FunctionRegistry.
      // TODO: why not just check whether the function exists first?
      val catalogFunction = try {
        externalCatalog.getFunction(database, name.funcName)
      } catch {
        case _: AnalysisException | _: NoSuchPermanentFunctionException =>
          failFunctionLookup(name)
      }
      loadFunctionResources(catalogFunction.resources)
      // Please note that qualifiedName is provided by the user. However,
      // catalogFunction.identifier.unquotedString is returned by the underlying
      // catalog. So, it is possible that qualifiedName is not exactly the same as
      // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
      // Here, we preserve the input from the user.
      registerFunction(catalogFunction.copy(identifier = qualifiedName), overrideIfExists = false)
      // Now, we need to create the Expression.
      functionRegistry.lookupFunction(qualifiedName, children)
    }
  }
}
-- MyAddUDF implements simple addition.
create function testadd as 'com.test.udf.MyAddUDF'
using jar 'hdfs://localhost:9000/udf/MyTestUDF-1.0.jar',
      jar 'hdfs://localhost:9000/udf/UDFDep-1.0.jar'; -- dependency of MyTestUDF
// should be called after session registry is checked private FunctionInfo registerToSessionRegistry(String qualifiedName, FunctionInfo function) { FunctionInforet=null; // 或许当前Session的ClassLoader ClassLoaderprev= Utilities.getSessionSpecifiedClassLoader(); try { // Found UDF in metastore - now add it to the function registry // At this point we should add any relevant jars that would be needed for the UDf. FunctionResource[] resources = function.getResources(); try { FunctionTask.addFunctionResources(resources); } catch (Exception e) { LOG.error("Unable to load resources for " + qualifiedName + ":" + e, e); returnnull; } ClassLoaderloader= Utilities.getSessionSpecifiedClassLoader(); Class<?> udfClass = Class.forName(function.getClassName(), true, loader);
// Make sure the FunctionInfo is listed as PERSISTENT (rather than TEMPORARY) // when it is registered to the system registry. ret = SessionState.getRegistryForWrite().registerFunction( qualifiedName, FunctionType.PERSISTENT, udfClass, resources); if (ret == null) { LOG.error(function.getClassName() + " is not a valid UDF class and was not registered."); } if (SessionState.get().isHiveServerQuery()) { SessionState.getRegistryForWrite().addToUDFLoaders(loader); } } catch (ClassNotFoundException e) { // Lookup of UDf class failed LOG.error("Unable to load UDF class: " + e); Utilities.restoreSessionSpecifiedClassLoader(prev); } function.shareStateWith(ret); return ret; }