Getting Twitch Live Chat Messages Into a Vector Database
Joseph Damiba
We are in a golden age of vector search: providers such as Qdrant and Pinecone have made it easier than ever to extract insights from the high-dimensional data that embedding models produce. Before you can perform vector search, however, you first need to create your embeddings and load them into a vector database.
In this blog post, we will build a Node.js server that listens to live chat messages on a Twitch stream and then gets those messages into a Qdrant vector database. In another post, we will explore building a frontend that allows us to perform vector search on this data.
The first thing we need to do is initialize our Node.js project:
npm init -y
Then, we can install the dependencies we will need for this project:
npm install dotenv redis tmi.js @qdrant/js-client-rest openai uuid
Next, we can create a .env file to store our environment variables:
touch .env
We need four environment variables: QDRANT_URL, which points to the URL of your Qdrant cluster if you are using Qdrant Cloud, or to the local address if you are running Qdrant locally; QDRANT_API_KEY, which can be found in your Qdrant account; OPENAI_API_KEY, so we can use OpenAI's text embedding model; and finally TWITCH_CHANNEL, the channel whose chat you want to listen to.
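As a reference, the finished .env file has this shape (every value below is a placeholder, not a real credential or URL):

```
# .env — placeholder values only
QDRANT_URL=https://your-cluster-id.region.cloud.qdrant.io:6333
QDRANT_API_KEY=your-qdrant-api-key
OPENAI_API_KEY=your-openai-api-key
TWITCH_CHANNEL=somechannel
```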
Now, we can create a file called index.js, which will be our entry point for the Node.js script:
touch index.js
First things first, we need to import the necessary dependencies:
//index.js
require("dotenv").config();
const tmi = require("tmi.js");
const Redis = require("redis");
const { QdrantClient } = require("@qdrant/js-client-rest");
const OpenAI = require("openai");
const { v4: uuidv4 } = require("uuid");
Next, we need to initialize our client libraries for Qdrant, OpenAI, Redis, and Twitch:
//index.js

// Initialize OpenAI client
const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

// Initialize Qdrant client
const qdrantClient = new QdrantClient({
  url: process.env.QDRANT_URL,
  apiKey: process.env.QDRANT_API_KEY,
});

// Initialize Redis client
const redisClient = Redis.createClient({
  url: "redis://localhost:6379", // Default local Redis server URL
});

// Initialize Twitch client
const twitchClient = new tmi.Client({
  channels: [process.env.TWITCH_CHANNEL],
});
With our client libraries initialized, we can connect to Redis and Twitch and ensure that we have a collection set up in Qdrant to store our embeddings.
//index.js

async function main() {
  // Connect to Redis
  await redisClient.connect();

  // Check if the collection exists
  try {
    const collections = await qdrantClient.getCollections();
    const collectionExists = collections.collections.some(
      (collection) => collection.name === "kai_cenat_twitch_messages"
    );

    if (!collectionExists) {
      // Create the Qdrant collection if it doesn't exist
      await qdrantClient.createCollection("kai_cenat_twitch_messages", {
        vectors: {
          size: 1536, // OpenAI text-embedding-3-small dimension size
          distance: "Cosine",
        },
      });
      console.log("Created new collection with correct vector size");
    } else {
      console.log("Collection already exists, skipping creation");
    }
  } catch (error) {
    console.error("Error checking/creating collection:", error.message);
  }
}

main().catch(console.error);

// Connect to Twitch
twitchClient.connect();
Now we are ready to receive Twitch messages. When one arrives from Twitch's servers, we want to store it in Redis. We can do this with the on method of our Twitch client.
//index.js

// Listen for chat messages
twitchClient.on("message", async (channel, tags, message, self) => {
  if (self) return; // Ignore messages from the bot itself

  const chatMessage = {
    channel,
    username: tags.username,
    message,
    timestamp: new Date().toISOString(),
  };

  // Save message to Redis
  await redisClient.lPush("chat_messages", JSON.stringify(chatMessage));
});
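For reference, each entry pushed onto the chat_messages list is just a JSON string of this shape (the values below are made up for illustration):

```javascript
// Example of the chat message object the handler builds (hypothetical values)
const chatMessage = {
  channel: "#examplechannel",
  username: "example_user",
  message: "Hello chat!",
  timestamp: new Date().toISOString(),
};

// Redis stores strings, so the object is serialized before lPush...
const serialized = JSON.stringify(chatMessage);

// ...and parsed back later when we read it with lRange
const restored = JSON.parse(serialized);
console.log(restored.username); // "example_user"
```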
Next, we need a function that reads the queued messages out of Redis, creates an embedding for each one, and upserts it into our Qdrant cluster.
//index.js

// Function to process Redis messages and store in Qdrant
async function processMessagesAndStore() {
  try {
    // Check if the Redis client is connected
    if (!redisClient.isOpen) {
      console.error("Redis client is not connected. Attempting to reconnect...");
      await redisClient.connect();
    }

    const messages = await redisClient.lRange("chat_messages", 0, -1);

    if (messages.length === 0) {
      console.log("No messages to process.");
      return;
    }

    for (const messageJson of messages) {
      const message = JSON.parse(messageJson);

      try {
        // Get embeddings for the message
        const embedding = await getEmbeddings(message.message);

        // Store in Qdrant with a UUID as the point ID
        await qdrantClient.upsert("kai_cenat_twitch_messages", {
          points: [
            {
              id: uuidv4(),
              vector: embedding,
              payload: {
                channel: message.channel,
                username: message.username,
                message: message.message,
                timestamp: message.timestamp,
              },
            },
          ],
        });

        console.log(`Stored embedding for message from ${message.username}`);
      } catch (error) {
        console.error("Error processing message:", error);
      }
    }

    // Clear Redis after successful processing
    await redisClient.del("chat_messages");
    console.log(`Processed ${messages.length} messages`);
  } catch (error) {
    console.error("Error in processMessagesAndStore:", error);
  }
}
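One detail the function above leaves open is the getEmbeddings helper it calls, which isn't defined anywhere in the snippets so far. Here is a minimal sketch, assuming Node 18+ (for the built-in fetch) and the same OPENAI_API_KEY variable; the openai client we installed earlier exposes the equivalent call via openai.embeddings.create, so treat this as one possible implementation rather than the post's exact code:

```javascript
// Hypothetical helper: fetch an embedding vector for a piece of text using
// OpenAI's REST embeddings endpoint via Node's built-in fetch.
async function getEmbeddings(text) {
  const response = await fetch("https://api.openai.com/v1/embeddings", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
    },
    body: JSON.stringify({
      model: "text-embedding-3-small", // 1536 dimensions, matching the collection
      input: text,
    }),
  });

  if (!response.ok) {
    throw new Error(`OpenAI embeddings request failed: ${response.status}`);
  }

  const data = await response.json();
  return data.data[0].embedding; // array of 1536 floats
}
```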
Finally, we can use JavaScript's setInterval() function to run our function every five seconds.
//index.js

// Schedule periodic processing (every 5 seconds)
setInterval(processMessagesAndStore, 5000);

console.log(
  "Twitch chat logger started with OpenAI embeddings and Qdrant storage."
);

// Ensure proper cleanup when the server is shutting down
process.on("SIGINT", async () => {
  console.log("Shutting down...");
  await redisClient.quit();
  process.exit(0);
});
And that's it! We now have a functioning Node.js script that listens to Twitch chat and processes messages into our Qdrant cluster.
Stay tuned for the next post in this series, where we will build a frontend that uses this data.