ThreadsafeFunction

Threadsafe Function is a complex concept in Node.js. JavaScript in Node.js runs on a single thread, so you can't access napi_env, napi_value, or napi_ref from another thread.

💡

napi_env, napi_value, and napi_ref are low-level concepts in Node-API, which the #[napi] macro of NAPI-RS is built on top of. NAPI-RS also provides a low-level API to access the original Node-API.

Node-API provides Threadsafe Function APIs for calling JavaScript functions from other threads. These APIs are complex, and many developers find them hard to use correctly. NAPI-RS provides a limited version of the Threadsafe Function APIs that is easier to use:

lib.rs
use std::{sync::Arc, thread};
 
use napi::{
  bindgen_prelude::*,
  threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode},
};
use napi_derive::napi;
 
#[napi]
pub fn call_threadsafe_function(callback: ThreadsafeFunction<u32>) -> Result<()> {
  let tsfn = Arc::new(callback);
  for n in 0..100 {
    let tsfn = tsfn.clone();
    thread::spawn(move || {
      tsfn.call(Ok(n), ThreadsafeFunctionCallMode::Blocking);
    });
  }
  Ok(())
}

⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️

index.d.ts
export function callThreadsafeFunction(
  callback: (err: null | Error, result: number) => void,
): void
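
One way to picture the example above is as a queue: worker threads only enqueue values, and the thread that owns the callback drains the queue and invokes it. The sketch below models that idea with a standard library channel. It is an analogy under that assumption, not how NAPI-RS or Node-API is implemented, and `drain_on_owner_thread` is a hypothetical name.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `workers` threads that each enqueue one value, then drains the
/// queue on the calling thread (standing in for the JavaScript thread) and
/// invokes `callback` once per value. Returns the number of calls made.
pub fn drain_on_owner_thread<F: FnMut(u32)>(workers: u32, mut callback: F) -> usize {
  let (tx, rx) = mpsc::channel::<u32>();
  let mut handles = Vec::new();
  for n in 0..workers {
    let tx = tx.clone();
    handles.push(thread::spawn(move || {
      // Worker threads never touch the callback; they only enqueue data.
      tx.send(n).expect("receiver is alive");
    }));
  }
  // Drop the original sender so the loop below ends once every worker is done.
  drop(tx);
  let mut calls = 0;
  for value in rx {
    callback(value);
    calls += 1;
  }
  for handle in handles {
    handle.join().expect("worker panicked");
  }
  calls
}
```

Because the callback runs only on the calling thread, it does not need to be `Send`, which mirrors why the JavaScript callback itself never leaves the JavaScript thread.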

Return type

The return type of the ThreadsafeFunction is the same as the return type of the JavaScript callback. You can define it in the second generic parameter of ThreadsafeFunction:

lib.rs
use std::thread;
 
use napi::threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode};
use napi_derive::napi;
 
#[napi]
pub fn call_threadsafe_function(callback: ThreadsafeFunction<u32, u32>) {
  thread::spawn(move || {
    callback.call_with_return_value(Ok(1), ThreadsafeFunctionCallMode::Blocking, |ret, _| {
      println!("ret: {:?}", ret); // ret carries the value returned by the JavaScript callback (1 + 100 = 101)
      Ok(())
    });
  });
}

index.ts
import { callThreadsafeFunction } from './index.js'
 
callThreadsafeFunction((err, result) => {
  return result + 100
})
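
To make the round trip concrete, here is a std-only sketch of the same idea: a worker thread sends an argument together with a reply channel, the owning thread runs the callback, and the callback's return value travels back to the worker. This is an illustrative model only; `call_with_reply` is a hypothetical helper, not part of NAPI-RS.

```rust
use std::sync::mpsc;
use std::thread;

/// Sends `arg` from a worker thread to the calling thread, runs `callback`
/// there, and hands the callback's return value back to the worker.
/// Returns the value the worker received.
pub fn call_with_reply<F: FnOnce(u32) -> u32>(arg: u32, callback: F) -> u32 {
  // Each request carries the argument plus a sender for the reply.
  let (req_tx, req_rx) = mpsc::channel::<(u32, mpsc::Sender<u32>)>();
  let worker = thread::spawn(move || {
    let (reply_tx, reply_rx) = mpsc::channel();
    req_tx.send((arg, reply_tx)).expect("owner thread is alive");
    // Wait for the callback's return value to come back.
    reply_rx.recv().expect("owner thread replied")
  });
  // Owner thread: run the callback and send its return value back.
  let (value, reply_tx) = req_rx.recv().expect("worker sent a request");
  reply_tx.send(callback(value)).expect("worker is waiting");
  worker.join().expect("worker panicked")
}
```

Calling `call_with_reply(1, |n| n + 100)` mirrors the example above: the worker passes 1 and receives the callback's result.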

CallJsBackArgs

Sometimes the args passed to the ThreadsafeFunction are not the same as the args passed to the JavaScript callback. You can build the ThreadsafeFunction from Function with the CallJsBackArgs to achieve this:

lib.rs
use std::thread;
 
use napi::{
  bindgen_prelude::*,
  threadsafe_function::{ThreadsafeCallContext, ThreadsafeFunctionCallMode},
};
use napi_derive::napi;
 
struct Data {
  name: String,
}
 
#[napi]
pub fn call_threadsafe_function(callback: Function<String, ()>) -> Result<()> {
  let tsfn = callback
    .build_threadsafe_function()
    .build_callback(|ctx: ThreadsafeCallContext<Data>| Ok(format!("Hello {}", ctx.value.name)))?;
  thread::spawn(move || {
    tsfn.call(
      Data {
        name: "John".to_string(),
      },
      ThreadsafeFunctionCallMode::NonBlocking,
    );
  });
  Ok(())
}

⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️

index.ts
import { callThreadsafeFunction } from './index.js'
 
callThreadsafeFunction((data) => {
  console.log(data) // Hello John
})

Error Status

The fourth generic parameter of ThreadsafeFunction sets the error status type of the errors passed to the JavaScript callback. It defaults to Status. A custom type must implement AsRef<str> and From<Status>, as CustomErrorStatus does here:

lib.rs
use std::{sync::Arc, thread};
 
use napi::{
  bindgen_prelude::*,
  threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode},
};
use napi_derive::napi;
 
pub struct CustomErrorStatus(String);
 
impl AsRef<str> for CustomErrorStatus {
  fn as_ref(&self) -> &str {
    &self.0
  }
}
 
impl From<Status> for CustomErrorStatus {
  fn from(value: Status) -> Self {
    CustomErrorStatus(value.to_string())
  }
}
 
#[napi]
pub fn call_threadsafe_function(
  tsfn: Arc<ThreadsafeFunction<u32, u32, u32, CustomErrorStatus>>,
) -> Result<()> {
  for n in 0..100 {
    let tsfn = tsfn.clone();
    thread::spawn(move || {
      tsfn.call(
        Err(Error::new(
          CustomErrorStatus("Custom".to_owned()),
          format!("Custom error: {}", n),
        )),
        ThreadsafeFunctionCallMode::Blocking,
      );
    });
  }
  Ok(())
}

ErrorStrategy

There are two different error-handling strategies for Threadsafe Function. The strategy can be defined in the fifth generic parameter of ThreadsafeFunction:

lib.rs
let tsfn: ThreadsafeFunction<u32, u32, u32, Status, false> = ...

CalleeHandled: true (default behavior)

An Err from Rust code is passed as the first argument of the JavaScript callback. This behavior follows the async callback conventions from Node.js: https://nodejs.org/en/learn/asynchronous-work/javascript-asynchronous-programming-and-callbacks#handling-errors-in-callbacks. Many async APIs in Node.js are designed this way, such as fs.read.

With CalleeHandled: true, you must call the ThreadsafeFunction with the Result type, so that the Error will be handled and passed back to the JavaScript callback:

lib.rs
use std::{sync::Arc, thread};
 
use napi::{
  bindgen_prelude::*,
  threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode},
};
use napi_derive::napi;
 
#[napi]
pub fn call_threadsafe_function(
  tsfn: Arc<ThreadsafeFunction<u32, u32, u32, Status, true>>,
) -> Result<()> {
  for n in 0..100 {
    let tsfn = tsfn.clone();
    thread::spawn(move || {
      tsfn.call(
        Err(Error::new(
          Status::GenericFailure,
          format!("Error with: {n}"),
        )),
        ThreadsafeFunctionCallMode::Blocking,
      );
    });
  }
  Ok(())
}

⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️

index.ts
import { callThreadsafeFunction } from './index.js'
 
callThreadsafeFunction((err, result) => {
  if (err) {
    console.error(err) // [Error: Error with: 0] { code: 'GenericFailure' }
  }
  console.log(result)
})
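
The mapping from a Rust Result to the arguments of an error-first callback can be sketched in plain Rust. The helper below is hypothetical and only illustrates the convention: on the JavaScript side the empty slot in the Ok case is null, while the exact value of the result slot in the Err case is not modeled here.

```rust
/// Splits a `Result` into the `(err, value)` pair that an error-first
/// callback receives: exactly one of the two slots is filled.
pub fn to_error_first<T, E>(result: Result<T, E>) -> (Option<E>, Option<T>) {
  match result {
    // Success: no error, the value goes in the second slot.
    Ok(value) => (None, Some(value)),
    // Failure: the error goes in the first slot, no value.
    Err(err) => (Some(err), None),
  }
}
```

This is why the JavaScript callback checks `err` first before using `result`.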

CalleeHandled: false

No Error will be passed back to the JavaScript side. You can use this strategy to avoid the Ok wrapping on the Rust side if your code will never return Err.

With this strategy, ThreadsafeFunction doesn't need to be called with Result<T>, and the first argument of the JavaScript callback is the value from Rust, not Error | null.

⚠️

With the CalleeHandled: false strategy, the ThreadsafeFunction cannot handle errors that occur in the Rust threads, so you can't pass an Error back to the JavaScript side.

It's only recommended if you are sure the threads where the ThreadsafeFunction is called will not return Err or panic.

lib.rs
use std::{sync::Arc, thread};
 
use napi::{
  bindgen_prelude::*,
  threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode},
};
use napi_derive::napi;
 
#[napi]
pub fn call_threadsafe_function(
  tsfn: Arc<ThreadsafeFunction<u32, (), u32, Status, false>>,
) -> Result<()> {
  for n in 0..100 {
    let tsfn = tsfn.clone();
    thread::spawn(move || {
      tsfn.call(n, ThreadsafeFunctionCallMode::Blocking);
    });
  }
  Ok(())
}

⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️

index.d.ts
export declare function callThreadsafeFunction(
  tsfn: (arg: number) => void,
): void

Weak ThreadsafeFunction

By default, the ThreadsafeFunction will cause the event loop on the thread on which it is created to remain alive until the ThreadsafeFunction is destroyed. See Deciding whether to keep the process running.

If you don't want to keep the Node.js process/event loop alive, you can set the Weak parameter of ThreadsafeFunction to true:

lib.rs
use std::{sync::Arc, thread};
 
use napi::{
  bindgen_prelude::*,
  threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode},
};
use napi_derive::napi;
 
#[napi]
pub fn call_threadsafe_function(
  tsfn: Arc<ThreadsafeFunction<u32, u32, u32, Status, false, true>>,
) -> Result<()> {
  for n in 0..100 {
    let tsfn = tsfn.clone();
    thread::spawn(move || {
      tsfn.call(n, ThreadsafeFunctionCallMode::Blocking);
    });
  }
  Ok(())
}

If you call this function like this:

index.ts
import { callThreadsafeFunction } from './index.js'
 
// logs nothing because the event loop exits immediately
// CalleeHandled is false here, so the callback receives the value directly
callThreadsafeFunction((n) => {
  console.log(n)
})

There won't be any logs in the console, because the event loop and Node.js process exit immediately.

MaxQueueSize

You can set the MaxQueueSize parameter of ThreadsafeFunction to limit the number of messages in the queue.

The two call modes react differently when the queue is full. In Blocking mode, MaxQueueSize never causes a call to fail: the call blocks the calling thread until the queue has room. In NonBlocking mode, the call returns immediately with Status::QueueFull. See napi_call_threadsafe_function for more details.

lib.rs
use std::{sync::Arc, thread};
 
use napi::{
  bindgen_prelude::*,
  threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode},
};
use napi_derive::napi;
 
#[napi]
pub fn call_threadsafe_function(
  tsfn: Arc<ThreadsafeFunction<u32, (), u32, Status, false, false, 1>>,
) -> Result<()> {
  thread::spawn(move || {
    for n in 0..100 {
      // NonBlocking returns right away, with Status::QueueFull if the queue is full
      let status = tsfn.call(n, ThreadsafeFunctionCallMode::NonBlocking);
      println!("{}", status);
    }
  });
  Ok(())
}

When you call this function with heavy work in the callback, you will see the QueueFull status returned from tsfn.call:

index.ts
import { callThreadsafeFunction } from './index.js'
 
function fib(n) {
  if (n <= 1) return n
  return fib(n - 1) + fib(n - 2)
}
 
callThreadsafeFunction(() => {
  fib(40)
})

This produces output like the following:

Ok
Ok
QueueFull
QueueFull
QueueFull
QueueFull
QueueFull
QueueFull
QueueFull
QueueFull
...
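
The non-blocking behavior on a full queue can be reproduced with a bounded channel from the standard library. The sketch below is only an analogy: `non_blocking_statuses` is a hypothetical helper, the queue is never drained, and a capacity of 0 means something different for `sync_channel` (a rendezvous channel), so the sketch assumes a capacity of at least 1.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries to enqueue `attempts` values into a queue bounded at
/// `max_queue_size` without ever draining it, and records a status
/// string for each attempt.
pub fn non_blocking_statuses(max_queue_size: usize, attempts: u32) -> Vec<&'static str> {
  // Keep the receiver alive so the channel stays connected.
  let (tx, _rx) = sync_channel::<u32>(max_queue_size);
  (0..attempts)
    .map(|n| match tx.try_send(n) {
      // There was room in the queue.
      Ok(()) => "Ok",
      // The queue is full: return immediately instead of waiting.
      Err(TrySendError::Full(_)) => "QueueFull",
      // The receiving side is gone (cannot happen here, `_rx` is alive).
      Err(TrySendError::Disconnected(_)) => "Disconnected",
    })
    .collect()
}
```

Replacing `try_send` with `send` would give the blocking counterpart: the call would wait for room instead of reporting a full queue, and in this sketch it would wait forever because nothing drains the channel.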