Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions nexus_standalone_operations/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
This sample demonstrates how to execute Nexus operations directly from client code,
without wrapping them in a workflow. It shows both synchronous and asynchronous
(workflow-backed) operations, plus listing and counting operations.

## Note: Standalone Nexus operations require a server version that supports this feature.

### Sample directory structure

- [service.py](./service.py) - Nexus service definition with echo (sync) and hello (async) operations
- [handler.py](./handler.py) - Nexus operation handlers and the backing workflow for the async operation
- [worker.py](./worker.py) - Temporal worker that hosts the Nexus service
- [starter.py](./starter.py) - Client that executes standalone Nexus operations


### Instructions

Start a Temporal server. (See the main samples repo [README](../README.md)).

Create the Nexus endpoint:

```
temporal operator nexus endpoint create \
--name nexus-standalone-operations-endpoint \
--target-namespace default \
--target-task-queue nexus-standalone-operations
```

In one terminal, start the worker:
```
uv run nexus_standalone_operations/worker.py
```

In another terminal, run the starter:
```
uv run nexus_standalone_operations/starter.py
```

### Expected output

```
Echo result: hello
Hello result: Hello, World!

Listing Nexus operations:
OperationId: echo-..., Operation: echo, Status: COMPLETED
OperationId: hello-..., Operation: hello, Status: COMPLETED

Total Nexus operations: 2
```

If you run the starter code multiple times, you should see additional operations in the listing results, as more operations are run.
The same goes for the total number of operations.
Empty file.
42 changes: 42 additions & 0 deletions nexus_standalone_operations/handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""Nexus service handler and backing workflow for standalone operations sample."""

from __future__ import annotations

import uuid

import nexusrpc.handler
from temporalio import nexus, workflow

from nexus_standalone_operations.service import (
EchoInput,
EchoOutput,
HelloInput,
HelloOutput,
MyNexusService,
)


@workflow.defn
class HelloWorkflow:
@workflow.run
async def run(self, input: HelloInput) -> HelloOutput:
return HelloOutput(greeting=f"Hello, {input.name}!")


@nexusrpc.handler.service_handler(service=MyNexusService)
class MyNexusServiceHandler:
@nexusrpc.handler.sync_operation
async def echo(
self, _ctx: nexusrpc.handler.StartOperationContext, input: EchoInput
) -> EchoOutput:
return EchoOutput(message=input.message)

@nexus.workflow_run_operation
async def hello(
self, ctx: nexus.WorkflowRunOperationContext, input: HelloInput
) -> nexus.WorkflowHandle[HelloOutput]:
return await ctx.start_workflow(
HelloWorkflow.run,
input,
id=str(uuid.uuid4()),
)
39 changes: 39 additions & 0 deletions nexus_standalone_operations/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Nexus service definition for standalone operations sample.

Defines a Nexus service with two operations:
- echo: a synchronous operation that echoes the input message
- hello: an asynchronous (workflow-backed) operation that returns a greeting

This service definition is used by both the handler (to validate operation
signatures) and the client (to create type-safe nexus clients).
"""

from dataclasses import dataclass

import nexusrpc


@dataclass
class EchoInput:
message: str


@dataclass
class EchoOutput:
message: str


@dataclass
class HelloInput:
name: str


@dataclass
class HelloOutput:
greeting: str


@nexusrpc.service
class MyNexusService:
echo: nexusrpc.Operation[EchoInput, EchoOutput]
hello: nexusrpc.Operation[HelloInput, HelloOutput]
74 changes: 74 additions & 0 deletions nexus_standalone_operations/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Starter that demonstrates standalone Nexus operation execution.

Unlike other Nexus samples that call operations from within a workflow, this
sample executes Nexus operations directly from client code using the standalone
Nexus operation APIs.
"""

import asyncio
import uuid
from datetime import timedelta

from temporalio.client import Client
from temporalio.envconfig import ClientConfig

from nexus_standalone_operations.service import (
EchoInput,
HelloInput,
MyNexusService,
)

ENDPOINT_NAME = "nexus-standalone-operations-endpoint"


async def main() -> None:
config = ClientConfig.load_client_connect_config()
_ = config.setdefault("target_host", "localhost:7233")
client = await Client.connect(**config)

# Create a typed NexusClient bound to the endpoint and service.
# The endpoint must be pre-created on the server (see README).
nexus_client = client.create_nexus_client(
service=MyNexusService, endpoint=ENDPOINT_NAME
)

# Start sync echo operation and await the result immediately.
echo_result = await nexus_client.execute_operation(
MyNexusService.echo,
EchoInput(message="hello"),
id=f"echo-{uuid.uuid4()}",
schedule_to_close_timeout=timedelta(seconds=10),
)
print(f"Echo result: {echo_result.message}")

# Start async (workflow-backed) hello operation and get a NexusOperationHandle.
handle = await nexus_client.start_operation(
MyNexusService.hello,
HelloInput(name="World"),
id=f"hello-{uuid.uuid4()}",
schedule_to_close_timeout=timedelta(seconds=10),
)

print(f"\nStarted `MyNexusService.Hello`. OperationID: {handle.operation_id}")

# Use the NexusOperationHandle to await the result of the operation.
hello_result = await handle.result()
print(f"`MyNexusService.Hello` result: {hello_result.greeting}")

# List nexus operations.
query = f'Endpoint = "{ENDPOINT_NAME}"'
print("\nListing Nexus operations:")
async for op in client.list_nexus_operations(query):
print(
f" OperationId: {op.operation_id},",
f" Operation: {op.operation},",
f" Status: {op.status.name}",
)

# Count nexus operations.
count = await client.count_nexus_operations(query)
print(f"\nTotal Nexus operations: {count.count}")


if __name__ == "__main__":
asyncio.run(main())
41 changes: 41 additions & 0 deletions nexus_standalone_operations/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Worker that hosts the Nexus service for standalone operations sample."""

import asyncio
import logging

from temporalio.client import Client
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker

from nexus_standalone_operations.handler import HelloWorkflow, MyNexusServiceHandler

interrupt_event = asyncio.Event()

TASK_QUEUE = "nexus-standalone-operations"


async def main() -> None:
logging.basicConfig(level=logging.INFO)

config = ClientConfig.load_client_connect_config()
_ = config.setdefault("target_host", "localhost:7233")
client = await Client.connect(**config)

async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[HelloWorkflow],
nexus_service_handlers=[MyNexusServiceHandler()],
):
logging.info("Worker started, ctrl+c to exit")
_ = await interrupt_event.wait()
logging.info("Shutting down")


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,6 @@ ignore_errors = true
[[tool.mypy.overrides]]
module = "opentelemetry.*"
ignore_errors = true

[tool.uv.sources]
temporalio = { git = "https://github.com/temporalio/sdk-python", branch = "amazzeo/sano" }
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import asyncio
import os
import uuid
from datetime import timedelta

import pytest
from temporalio.client import Client, NexusOperationFailureError
from temporalio.service import RPCError
from temporalio.worker import Worker

from nexus_standalone_operations.handler import HelloWorkflow, MyNexusServiceHandler
from nexus_standalone_operations.service import (
EchoInput,
EchoOutput,
HelloInput,
HelloOutput,
MyNexusService,
)
from nexus_standalone_operations.worker import TASK_QUEUE
from tests.helpers.nexus import create_nexus_endpoint, delete_nexus_endpoint


async def test_nexus_standalone_operations(client: Client):
if not os.getenv("ENABLE_STANDALONE_NEXUS_TESTS"):
pytest.skip(
"Standalone Nexus operations not yet supported by default dev server. Set ENABLE_STANDALONE_NEXUS_TESTS=1 to enable."
)

endpoint_name = f"test-nexus-standalone-{uuid.uuid4()}"

create_response = await create_nexus_endpoint(
name=endpoint_name,
task_queue=TASK_QUEUE,
client=client,
)
try:
async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[HelloWorkflow],
nexus_service_handlers=[MyNexusServiceHandler()],
):
nexus_client = client.create_nexus_client(
service=MyNexusService, endpoint=endpoint_name
)

# Test sync echo operation (with retry for endpoint propagation)
echo_result = None
for _ in range(30):
try:
echo_result = await nexus_client.execute_operation(
MyNexusService.echo,
EchoInput(message="test-echo"),
id=str(uuid.uuid4()),
schedule_to_close_timeout=timedelta(seconds=10),
)
break
except (RPCError, NexusOperationFailureError):
await asyncio.sleep(0.5)
assert isinstance(echo_result, EchoOutput)
assert echo_result.message == "test-echo"
Comment on lines +47 to +61
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will remove this retry loop when no longer needed. See this comment from the draft Go SANO sample:

temporalio/samples-go#456 (comment)


# Test async hello operation
hello_result = await nexus_client.execute_operation(
MyNexusService.hello,
HelloInput(name="Test"),
id=str(uuid.uuid4()),
schedule_to_close_timeout=timedelta(seconds=10),
)
assert isinstance(hello_result, HelloOutput)
assert hello_result.greeting == "Hello, Test!"

# Test count operations
count = await client.count_nexus_operations(f'Endpoint = "{endpoint_name}"')
assert count.count >= 0
finally:
_ = await delete_nexus_endpoint(
id=create_response.endpoint.id,
version=create_response.endpoint.version,
client=client,
)
16 changes: 4 additions & 12 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading