如何使用 Agenda 和 MongoDB 在 Node.js 中設置作業隊列
如何使用 Agenda 和 MongoDB 設置作業隊列,以及如何通過 Express 管理的 API 端點將作業發送到該隊列。
開始使用
對於本教程,我們將使用 CheatCode Node.js Boilerplate 作為我們工作的起點。首先,讓我們克隆一個副本:
終端
git clone https://github.com/cheatcode/nodejs-server-boilerplate
接下來,安裝樣板的依賴:
終端
cd nodejs-server-boilerplate && npm install
之後,安裝我們將用於創建作業服務器 agenda
的依賴項 :
終端
npm i agenda
安裝好所有依賴後,啟動開發服務器:
終端
npm run dev
有了這個,我們就可以開始了。
添加用於定義作業的 API 端點
首先,我們需要通過樣板文件中現有的 Express.js 服務器添加 API 端點。這將允許我們將作業遠程發送到我們的服務器。
/api/jobs/index.js
export default (app) => {
// We'll define our API endpoint via Express here.
};
為了組織起見,首先,我們將為所有與工作相關的路線創建一個單獨的文件(從技術上講,我們只有一個,但這將使我們的代碼保持靈活,以防您以後想擴展它) .
按照樣板文件中的現有模式,在這裡,我們定義一個函數並將其作為我們文件的默認導出。這個函數需要一個現有的 Express app
實例作為其唯一參數傳入。在我們實現我們的路由之前,讓我們看看現有的 app
是設置以及如何調用這個函數。
/index.js
import express from "express";
import startup from "./lib/startup";
import api from "./api/index";
import jobs from "./api/jobs";
import middleware from "./middleware/index";
import logger from "./lib/logger";
startup()
.then(() => {
const app = express();
const port = process.env.PORT || 5001;
middleware(app);
api(app);
jobs(app);
app.listen(port, () => { ... });
process.on("message", (message) => { ... });
})
.catch((error) => { ... });
在這裡,我們打開了主要的 index.js
樣板文件。這是我們設置 Express 服務器並“設置”我們的應用程序的地方。這裡我們要注意的部分就在中間,我們在這裡調用我們剛剛從另一個文件導出並作為 jobs
導入到這個文件中的函數 .在這個函數的調用之上,我們正在創建我們的 Express app
實例。
調用我們的函數,我們傳入 app
實例,以便我們可以使用它來添加我們將為隊列接受新作業的路由。
/api/jobs/index.js
export default (app) => {
app.use("/jobs", (req, res) => {
res.send("Job added to queue!");
});
};
回到我們定義函數的文件,現在,使用 app
我們傳入的實例,我們在 /jobs
添加到我們的 Express 服務器的路由 .這將作為 API 端點,我們將在其中接收調度新作業的請求(在本地運行時,可在 http://localhost:5001/jobs
訪問 )。
在我們的路由的回調中,我們添加了一個通過 res.send()
對路由的請求的響應 功能。讓我們完成我們現在正在響應的消息並設置我們將添加作業的隊列。
使用議程設置作業隊列
為了簡單起見,在同一個文件中,讓我們導入 agenda
我們在教程開始時安裝的依賴項並為我們的作業創建隊列:
/api/jobs/index.js
import Agenda from "agenda";
import settings from "../../lib/settings";
const jobQueue = new Agenda({
db: {
address: settings.databases.mongodb.uri,
collection: "jobs",
},
});
jobQueue.start();
export default (app) => {
app.use("/jobs", (req, res) => {
res.send("Job added to queue!");
});
};
在頂部,我們導入 Agenda
來自 agenda
我們之前安裝的包(我們使用大寫的 A
對於導入的值,因為我們希望它是一個 JavaScript 類構造函數——使用大寫字母是該語言的常見做法。
我們還導入 settings
對於樣板。這是樣板中內置的功能,允許我們為我們的應用程序存儲配置數據。在那個 /lib/settings
裡面 文件,我們有代碼將嘗試加載與當前環境名稱匹配的設置文件。現在,我們在 development
中運行 環境,因此它會嘗試加載 settings-development.json
文件(來自應用程序的根目錄)。如果我們在 production
環境,它會嘗試加載 settings-production.json
從應用的根目錄。
在開發中,一個 settings-development.json
文件已經為我們提供。此外,對於我們的下一步,它還包括運行 MongoDB 數據庫的 URL。真的很快,如果我們打開那個文件,我們可以看到結構:
/settings-development.json
{
...
"databases": {
"mongodb": {
"uri": "mongodb://127.0.0.1/app"
}
},
...
}
在開發中,我們只是指向在 localhost 上啟動的 MongoDB 副本(這裡記為 127.0.0.1
,localhost
的IP地址版本 ) 在我們啟動樣板時自動為我們服務。
/api/jobs/index.js
import Agenda from "agenda";
import settings from "../../lib/settings";
const jobQueue = new Agenda({
db: {
address: settings.databases.mongodb.uri,
collection: "jobs",
},
});
jobQueue.start();
export default (app) => {
app.use("/jobs", (req, res) => {
res.send("Job added to queue!");
});
};
回顧我們的代碼,使用 Agenda
導入後,我們使用 new Agenda()
創建它的新實例 傳入我們隊列的設置。為了 agenda
為了工作,我們需要告訴它要在哪個 MongoDB 數據庫中存儲我們的作業,以及可選的,該數據庫中將存儲我們的作業的集合的名稱(每個作業都有一個對象,其中包含有關它應該何時運行的信息,任何與之相關的數據等)。
在這裡,我們傳遞 settings.databases.mongodb.uri
我們剛剛在 /settings-development.json
中看到的值 作為 db.address
值並設置 collection
jobs
的名稱 (您可以將其更改為您想要的任何內容)。需要注意的是,我們存儲調用new Agenda()
的結果 在變量 jobQueue
中 .這現在包含我們將用於添加和管理作業的隊列實例。
最後,就在我們對 const jobQueue
的定義之下 我們確保調用 jobQueue.start()
.這可以確保議程實際處理我們交給它的工作。在這裡,只要我們正在處理的文件加載到服務器上(即在服務器啟動時),我們就啟動它。在您自己的應用中,您可能希望在更有條件的基礎上啟動它。
接下來,我們需要為我們的作業設置處理函數。我們將定義兩個函數:一個用於在作業添加到隊列後立即演示正在運行的作業,另一個用於在延遲後演示正在運行的作業。
/api/jobs/index.js
import Agenda from "agenda";
import dayjs from "dayjs";
import settings from "../../lib/settings";
const jobQueue = new Agenda({
db: {
address: settings.databases.mongodb.uri,
collection: "jobs",
},
});
jobQueue.define("instantJob", async (job) => {
const data = job?.attrs?.data;
console.log(
"This job is running as soon as it was received. This is the data that was sent:"
);
console.log(data);
});
jobQueue.define("delayedJob", async (job) => {
const data = job?.attrs?.data;
console.log(
"This job is running after a 5 second delay. This is the data that was sent:"
);
console.log(data);
});
jobQueue.start();
export default (app) => {
app.use("/jobs", (req, res) => {
res.send("Job added to queue!");
});
};
在我們的 jobQueue
之間 定義和對 jobQueue.start()
的調用 ,我們添加了兩個對 jobQueue.define()
的調用 .這是我們用來告訴 Agenda 在隊列中接下來要運行給定類型的作業時要做什麼的函數。在這裡,我們定義了兩種類型的作業instantJob
和 delayedJob
(我們將這些名稱作為第一個參數傳遞給 jobQueue.define()
)。
在每個作業類型的回調函數內部,我們提取我們期望傳遞給作業的數據(對於我們的示例,這將只是虛擬數據,但對於您自己的應用程序,這將在運行您的作業時提供額外的上下文 - 用戶 ID, job.attrs.data
中的一些要存儲的數據等) job
的值 通過 Agenda 傳遞給我們,並包含一個描述我們正在嘗試運行的當前作業的對象。我們傳遞的自定義數據存儲在該對象的 attrs.data
中 價值。
有了這些數據,接下來,我們會在兩個作業中註銷一條消息,告訴我們正在運行什麼類型的作業,然後是 data
的日誌 我們過去了。在您自己的應用程序中,您可以在此處為您的工作運行代碼。
現在,這可能看起來令人困惑——我們已經定義了兩種幾乎相同的工作。接下來,我們將學習如何通過我們的 API 端點接收作業,以及如何區分上面定義的兩種類型。
通過 API 端點調度作業
為了更容易理解,我們現在將添加所有剩餘的代碼並單步執行。
/api/jobs/index.js
import Agenda from "agenda";
import dayjs from "dayjs";
import settings from "../../lib/settings";
const jobQueue = new Agenda({ ... });
jobQueue.define("instantJob", async (job) => { ... });
jobQueue.define("delayedJob", async (job) => { ... });
jobQueue.start();
export default (app) => {
app.use("/jobs", (req, res) => {
const jobType = req?.query?.jobType;
const allowedJobs = Object.keys(jobQueue._definitions);
if (!jobType) {
return res.send("Must pass a jobType in the query params.");
}
if (!allowedJobs.includes(jobType)) {
return res.send(
`${jobType} is not supported. Must pass one of ${allowedJobs.join(
", or "
)} as jobType in the query params.`
);
}
if (jobType === "instantJob") {
jobQueue.now(req?.query?.jobType, req.body);
}
if (jobType === "delayedJob") {
jobQueue.schedule(
dayjs().add(5, "seconds").format(),
req?.query?.jobType,
req.body
);
}
res.send("Job added to queue!");
});
};
著眼於我們路由的回調函數,我們在這裡添加的代碼解決了三個問題:確定要運行哪個作業(instantJob
或 delayedJob
),驗證該作業是否是我們定義的作業,如果是,則將該作業添加到隊列中。
為了識別哪個 作業運行,我們查看query
傳入 req
的對象 對像傳遞給我們的路由回調。這裡,query
表示調用路由時URL中包含的查詢參數,如?jobType=instantJob
.這裡的想法是,當我們運行我們的工作時,我們將使用查詢參數 jobType
告訴我們我們的工作將走向何方。
在此下方,我們使用內置的 JavaScript 方法 Object.keys()
獲取允許的作業列表 取回我們定義了 jobQueue
的作業數組 (這些存儲在 _definitions
我們的議程實例上的對象)。
接下來,首先,我們確保 jobType
已在我們的查詢參數中傳遞。如果沒有,我們會以警告消息響應請求。
如果我們做 有一個 jobType
,接下來,我們驗證它是我們的 allowedJobs
之一 .假設 jobType
中存儲的值 在該數組中,我們繼續將作業添加到隊列中。如果不是,我們會發出另一個警告,讓用戶知道傳遞的 jobType
無效並提供可能的可用作業類型列表(詳細信息!)。
繼續對我們的作業進行排隊,回想一下我們的目標是 a.) 將我們的作業添加到隊列中以立即運行,或者 b.) 安排該作業在將來運行。在這裡,對於 instantJob
類型的作業 ,我們調用.now()
jobQueue
上的方法 ,傳入我們想要運行的作業類型——從我們的查詢參數——和數據(我們從 job.attrs.data
在作業回調內部)我們要傳入,在本例中是 body
req
對象(提示:我們假設我們的作業作為 HTTP POST
傳遞給我們的路由 請求)。
接下來,對於我們的 delayedJob
類型,我們調用 jobQueue.schedule()
,傳遞一個我們希望我們的工作與我們的 jobType
一起運行的日期 和 req.body
,就像我們為 instantJob
所做的那樣 .為了生成日期,我們在這裡使用 dayjs
我們正在使用的樣板文件中包含的庫。 dayjs().add(5, "seconds").format()
行 這裡說“獲取當前日期和時間,加上五秒,然後將其格式化為 ISO-8601 字符串(簡稱“iso 字符串”,一種標準化的日期格式,如 2021-07-29T23:00:00Z
)。”
而已!如果我們測試不同的作業,我們會看到我們之前添加的日誌顯示在我們的服務器控制台中:
總結
在本教程中,我們學習瞭如何使用 Agenda 庫和 MongoDB 來實現作業隊列。我們學習瞭如何通過 Express 設置接收作業的路由,如何使用 Agenda 設置隊列,如何在該隊列上定義作業類型,最後如何將通過我們的 API 路由接收到的作業添加到該隊列。