Stream conversations, messages, and preference updates
Stream new group chat and DM conversations
Listens to the network for new group chats and DMs. Whenever a new conversation starts, the stream delivers a ConversationContainer object, either to the provided callback function or to a for await loop over the stream. This lets the client respond immediately to any new group chats or DMs initiated by other users.
const stream = await client.conversations.stream();
try {
  for await (const conversation of stream) {
    // Received a conversation
    console.log("New conversation:", conversation);
  }
} catch (error) {
  // Log any stream errors
  console.error(error);
}
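The loop above ends as soon as anything inside it throws, including your own handling code. The following is a minimal sketch of one way to keep consuming the stream when a single handler call fails. It does not use the SDK: `mockConversationStream` is a hypothetical stand-in for `client.conversations.stream()`, and `consumeConversations` and the `{ id }` object shape are illustrative names, not part of any library.

```javascript
// Hypothetical stand-in for client.conversations.stream(): an async generator
// that yields plain objects instead of real conversations.
async function* mockConversationStream() {
  yield { id: "conv-1" };
  yield { id: "conv-2" };
  yield { id: "conv-3" };
}

// Runs `handler` for each streamed conversation. A failure in one handler call
// is recorded and the loop moves on to the next conversation. Errors raised by
// the stream itself are not caught here and still reach the caller.
async function consumeConversations(stream, handler) {
  const failures = [];
  for await (const conversation of stream) {
    try {
      await handler(conversation);
    } catch (error) {
      failures.push({ id: conversation.id, error });
    }
  }
  return failures;
}

// Usage: the handler fails for one conversation, the other two are processed.
consumeConversations(mockConversationStream(), async (conversation) => {
  if (conversation.id === "conv-2") {
    throw new Error("handler failed");
  }
  console.log("Handled", conversation.id);
}).then((failures) => {
  console.log("Failed handlers:", failures.map((f) => f.id));
});
```

Separating handler errors from stream errors this way means the outer try/catch only has to deal with failures of the stream itself.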
Stream new group chat and DM messages and preference updates
Listens to the network for new messages in all active group chats and DMs, as well as for preference updates. Whenever a new message is sent to any of these conversations, the callback is triggered with a DecodedMessage object. This keeps the inbox up to date by streaming in messages as they arrive.
By default, streamAll streams only conversations with a consent state of allowed or unknown.
We recommend streaming messages for allowed conversations only. This ensures that spammy conversations with a consent state of unknown don't take up networking resources. This also ensures that unwanted spam messages aren't stored in the user's local database.
To stream all conversations regardless of consent state, pass [Allowed, Unknown, Denied].
const stream = await client.conversations.streamAllMessages(["allowed"]);
try {
  for await (const message of stream) {
    // Received a message
    console.log("New message:", message);
  }
} catch (error) {
  // Log any stream errors
  console.error(error);
}
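To make the effect of the consent filter concrete, here is a small illustrative sketch of the selection rule described above. It is not how the SDK implements filtering. The `conversationConsent` field, the `shouldDeliver` helper, and the lowercase state strings are assumptions made for this example only.

```javascript
// Hypothetical consent-state labels mirroring the states named in the text.
const ALLOWED = "allowed";
const UNKNOWN = "unknown";
const DENIED = "denied";

// The default described above: allowed and unknown conversations.
const DEFAULT_CONSENT_STATES = [ALLOWED, UNKNOWN];

// Returns true when the message's conversation has one of the requested
// consent states. `conversationConsent` is a made-up field for illustration.
function shouldDeliver(message, consentStates = DEFAULT_CONSENT_STATES) {
  return consentStates.includes(message.conversationConsent);
}

const messages = [
  { id: "m1", conversationConsent: ALLOWED },
  { id: "m2", conversationConsent: UNKNOWN },
  { id: "m3", conversationConsent: DENIED },
];

const ids = (list) => list.map((m) => m.id).join(", ");

// Default filter: allowed and unknown
console.log(ids(messages.filter((m) => shouldDeliver(m)))); // m1, m2
// Recommended filter: allowed only
console.log(ids(messages.filter((m) => shouldDeliver(m, [ALLOWED])))); // m1
// Every consent state
console.log(
  ids(messages.filter((m) => shouldDeliver(m, [ALLOWED, UNKNOWN, DENIED]))),
); // m1, m2, m3
```

With the allowed-only filter, messages from unknown and denied conversations never reach the handler, which is the behavior the recommendation above is aiming for.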
Handle stream failures
All streaming methods accept a callback as the last argument that will be called when the stream closes. Use this callback to restart the stream.
const MAX_RETRIES = 5;
// Wait 5 seconds before each retry
const RETRY_INTERVAL = 5000;
let retries = MAX_RETRIES;
// Placeholder message callback; replace it with your own handler.
// The exact arguments it receives depend on the SDK version.
const onMessage = (...args) => {
  console.log("Message callback invoked:", ...args);
};
// Schedule another attempt, or exit once the retry budget is used up
const retry = () => {
  if (retries > 0) {
    console.log(
      `Retrying in ${RETRY_INTERVAL / 1000}s, ${retries} retries left`,
    );
    retries--;
    setTimeout(() => {
      handleStream(client);
    }, RETRY_INTERVAL);
  } else {
    console.log("Max retries reached, ending process");
    process.exit(1);
  }
};
// Passed as the last argument; called when the stream closes
const onFail = () => {
  console.log("Stream failed");
  retry();
};
const handleStream = async (client) => {
  console.log("Syncing conversations...");
  await client.conversations.sync();
  const stream = await client.conversations.streamAllMessages(
    onMessage,
    undefined,
    undefined,
    onFail,
  );
  console.log("Waiting for messages...");
  for await (const message of stream) {
    // Process the streamed message
  }
};
await handleStream(client);
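The example above retries at a fixed interval and never restores its retry budget. As a variation, the sketch below uses exponential backoff and resets the failure count once an item arrives. This is a swapped-in technique, not something the SDK provides. It is written against a generic async iterable rather than the SDK: `openStream` is a hypothetical factory you would implement yourself (for example by calling a streaming method), and the sketch assumes a failure surfaces as an error thrown while opening or iterating the stream, which differs from the callback-based wiring shown above.

```javascript
// Resolves after `ms` milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Delay before retry number `attempt` (1-based): base, 2x base, 4x base, ...
// capped at `maxDelayMs`.
function backoffDelay(attempt, baseDelayMs, maxDelayMs) {
  return Math.min(baseDelayMs * 2 ** (attempt - 1), maxDelayMs);
}

// Opens a stream with `openStream` (hypothetical factory returning an async
// iterable) and passes each item to `onItem`. When opening or iterating
// throws, waits with exponential backoff and opens a new stream. Errors
// thrown by `onItem` are treated as failures too. Gives up by rethrowing the
// last error after more than `maxRetries` consecutive failures.
async function streamWithRetries(openStream, onItem, options = {}) {
  const { maxRetries = 5, baseDelayMs = 1000, maxDelayMs = 30000 } = options;
  let failures = 0;
  while (true) {
    try {
      const stream = await openStream();
      for await (const item of stream) {
        // The stream is delivering again, so restore the full retry budget.
        failures = 0;
        await onItem(item);
      }
      return; // The stream ended without an error.
    } catch (error) {
      failures++;
      if (failures > maxRetries) {
        throw error;
      }
      await sleep(backoffDelay(failures, baseDelayMs, maxDelayMs));
    }
  }
}
```

Resetting the count after a delivered item means a long-lived stream that drops occasionally is not penalized for failures that happened hours earlier, while a stream that fails repeatedly in a row still stops after `maxRetries` attempts.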