Skip to content

Stream conversations, messages, and preference updates

Stream new group chat and DM conversations

Listens to the network for new group chats and DMs. Whenever a new conversation starts, it triggers the provided callback function with a ConversationContainer object. This allows the client to immediately respond to any new group chats or DMs initiated by other users.

Browser
const stream = await client.conversations.stream();
 
try {
  for await (const conversation of stream) {
    // Received a conversation
    console.log("New conversation:", conversation);
  }
} catch (error) {
  // Log any stream errors
  console.error(error);
}

Stream new group chat and DM messages and preference updates

This function listens to the network for new messages within all active group chats and DMs, as well as preference updates.

Whenever a new message is sent to any of these conversations, the callback is triggered with a DecodedMessage object. This keeps the inbox up to date by streaming in messages as they arrive.

By default, streamAll streams only conversations with a consent state of allowed or unknown.

We recommend streaming messages for allowed conversations only. This ensures that spammy conversations with a consent state of unknown don't take up networking resources. This also ensures that unwanted spam messages aren't stored in the user's local database.

To stream all conversations regardless of consent state, pass [Allowed, Unknown, Denied].

Browser
const stream = await client.conversations.streamAllMessages(["allowed"]);
 
try {
  for await (const message of stream) {
    // Received a message
    console.log("New message:", message);
  }
} catch (error) {
  // Log any stream errors
  console.error(error);
}

Handle stream failures

All streaming methods accept a callback as the last argument that will be called when the stream closes. Use this callback to restart the stream.

Node
const MAX_RETRIES = 5;
// wait 5 seconds before each retry
const RETRY_INTERVAL = 5000;
 
let retries = MAX_RETRIES;
 
const retry = () => {
  console.log(
    `Retrying in ${RETRY_INTERVAL / 1000}s, ${retries} retries left`,
  );
  if (retries > 0) {
    retries--;
    setTimeout(() => {
      handleStream(client);
    }, RETRY_INTERVAL);
  } else {
    console.log("Max retries reached, ending process");
    process.exit(1);
  }
};
 
const onFail = () => {
  console.log("Stream failed");
  retry();
};
 
const handleStream = async (client) => {
  console.log("Syncing conversations...");
  await client.conversations.sync();
 
  const stream = await client.conversations.streamAllMessages(
    onMessage,
    undefined,
    undefined,
    onFail,
  );
 
  console.log("Waiting for messages...");
 
  for await (const message of stream) {
    // process streammessage
  }
};
 
await handleStream(client);