Manage Apache Pulsar Functions for stream processing. Pulsar Functions are lightweight compute processes that can consume messages from one or more Pulsar topics, apply user-defined processing logic, and produce results to another topic. Functions support Java, Python, and Go runtimes, enabling complex event processing, data transformations, filtering, and integration with external systems.
This tool provides a comprehensive set of operations to manage the entire function lifecycle:
-
list: List all functions in a namespace
tenant(string, optional): The tenant name (default:public)namespace(string, optional): The namespace name (default:default)
-
get: Get function configuration
fqfn(string, optional): Fully qualified function nametenant/namespace/nametenant(string, optional): The tenant name (default:public)namespace(string, optional): The namespace name (default:default)name(string, required): The function name (unlessfqfnis provided)
-
status: Get runtime status of a function (instances, metrics)
fqfn(string, optional): Fully qualified function nametenant/namespace/nametenant(string, optional): The tenant name (default:public)namespace(string, optional): The namespace name (default:default)name(string, required): The function name (unlessfqfnis provided)instanceId(string, optional): Instance ID for per-instance status
-
stats: Get detailed statistics of a function (throughput, processing latency)
fqfn(string, optional): Fully qualified function nametenant/namespace/nametenant(string, optional): The tenant name (default:public)namespace(string, optional): The namespace name (default:default)name(string, required): The function name (unlessfqfnis provided)instanceId(string, optional): Instance ID for per-instance stats
-
create: Deploy a new function with specified parameters
fqfn(string, optional): Fully qualified function nametenant/namespace/nametenant(string, optional): The tenant name (default:public)namespace(string, optional): The namespace name (default:default)name(string, optional): The function name (can be inferred fromclassname)classname(string, optional): The fully qualified class name implementing the functionfunctionType(string, optional): Built-in function type (translated tobuiltin://)inputs(array, optional): The input topics for the functiontopicsPattern(string, optional): Topic pattern to consume frominputSpecs(object, optional): Map of input topics to consumer configoutput(string, optional): The output topic for function resultsjar(string, optional): Path to the JAR file for Java functionspy(string, optional): Path to the Python file for Python functionsgo(string, optional): Path to the Go binary for Go functionsparallelism(number, optional): The parallelism factor of the function (default: 1)cpu(number, optional): CPU cores per instanceram(number, optional): RAM bytes per instancedisk(number, optional): Disk bytes per instanceuserConfig(object, optional): User-defined config key/valuesproducerConfig(object, optional): Custom producer configurationlogTopic(string, optional): Topic for function logsschemaType(string, optional): Output schema type or classoutputSerdeClassName(string, optional): Output SerDe classcustomSerdeInputs(object, optional): Map of input topics to SerDe classcustomSchemaInputs(object, optional): Map of input topics to Schema classcustomSchemaOutputs(object, optional): Map of output topics to schema propertiesinputTypeClassName(string, optional): Input type class nameoutputTypeClassName(string, optional): Output type class nameprocessingGuarantees(string, optional): Delivery semanticsretainOrdering(boolean, optional): Process messages in orderretainKeyOrdering(boolean, optional): Process messages in key orderbatchBuilder(string, optional): Batch builder typeforwardSourceMessageProperty(boolean, optional): Forward properties to outputautoAck(boolean, optional): Automatically acknowledge messagessubsName(string, optional): Subscription name for inputssubsPosition(string, optional): Subscription positionskipToLatest(boolean, optional): Skip to latest on restarttimeoutMs(number, optional): Message timeout in msmaxMessageRetries(number, optional): Max retriesdeadLetterTopic(string, optional): Dead letter topiccustomRuntimeOptions(string, optional): Custom runtime optionssecrets(object, optional): Secrets mapcleanupSubscription(boolean, optional): Clean up subscription on deletewindowLengthCount(number, optional): Window length countwindowLengthDurationMs(number, optional): Window length duration in msslidingIntervalCount(number, optional): Sliding interval countslidingIntervalDurationMs(number, optional): Sliding interval duration in msfunctionConfigFile(string, optional): YAML config file path
-
update: Update an existing function
fqfn(string, optional): Fully qualified function nametenant/namespace/nametenant(string, optional): The tenant name (default:public)namespace(string, optional): The namespace name (default:default)name(string, required): The function name (unlessfqfnis provided)- Parameters similar to
createoperation updateAuthData(boolean, optional): Whether to update auth data
-
delete: Delete a function
fqfn(string, optional): Fully qualified function nametenant/namespace/nametenant(string, optional): The tenant name (default:public)namespace(string, optional): The namespace name (default:default)name(string, required): The function name (unlessfqfnis provided)
-
download: Download function package data from Pulsar to local storage
destinationFile(string, required): Local file path where the downloaded content should be writtenpath(string, optional): Direct Pulsar package storage path to download fromfqfn(string, optional): Fully qualified function nametenant/namespace/namewhen downloading by function identitytenant(string, optional): The tenant name (default:public) when downloading by function identitynamespace(string, optional): The namespace name (default:default) when downloading by function identityname(string, required unlesspathorfqfnis provided): The function name when downloading by function identity
-
start: Start a stopped function
fqfn(string, optional): Fully qualified function nametenant/namespace/nametenant(string, optional): The tenant name (default:public)namespace(string, optional): The namespace name (default:default)name(string, required): The function name (unlessfqfnis provided)
-
stop: Stop a running function
fqfn(string, optional): Fully qualified function nametenant/namespace/nametenant(string, optional): The tenant name (default:public)namespace(string, optional): The namespace name (default:default)name(string, required): The function name (unlessfqfnis provided)
-
restart: Restart a function
fqfn(string, optional): Fully qualified function nametenant/namespace/nametenant(string, optional): The tenant name (default:public)namespace(string, optional): The namespace name (default:default)name(string, required): The function name (unlessfqfnis provided)
-
querystate: Query state stored by a stateful function for a specific key
fqfn(string, optional): Fully qualified function nametenant/namespace/nametenant(string, optional): The tenant name (default:public)namespace(string, optional): The namespace name (default:default)name(string, required): The function name (unlessfqfnis provided)key(string, required): The state key to query
-
putstate: Store state in a function's state store
fqfn(string, optional): Fully qualified function nametenant/namespace/nametenant(string, optional): The tenant name (default:public)namespace(string, optional): The namespace name (default:default)name(string, required): The function name (unlessfqfnis provided)key(string, required): The state keyvalue(string, required): The state value
-
trigger: Manually trigger a function with a specific value
fqfn(string, optional): Fully qualified function nametenant/namespace/nametenant(string, optional): The tenant name (default:public)namespace(string, optional): The namespace name (default:default)name(string, required): The function name (unlessfqfnis provided)topic(string, optional): The specific topic to trigger ontriggerValue(string, optional): The value to trigger the function withtriggerFile(string, optional): File path containing the trigger value
-
upload: Upload a local file into Pulsar function package storage
sourceFile(string, required): Local file path whose content should be uploadedpath(string, required): Pulsar package storage path where the file should be stored