JavaScript >> Javascript 文檔 >  >> Node.js

如何使用 Agenda 和 MongoDB 在 Node.js 中設置作業隊列

如何使用 Agenda 和 MongoDB 設置作業隊列,以及如何通過 Express 管理的 API 端點將作業發送到該隊列。

開始使用

對於本教程,我們將使用 CheatCode Node.js Boilerplate 作為我們工作的起點。首先,讓我們克隆一個副本:

終端

git clone https://github.com/cheatcode/nodejs-server-boilerplate

接下來,安裝樣板的依賴:

終端

cd nodejs-server-boilerplate && npm install

之後,安裝我們將用於創建作業服務器 agenda 的依賴項 :

終端

npm i agenda

安裝好所有依賴後,啟動開發服務器:

終端

npm run dev

有了這個,我們就可以開始了。

添加用於定義作業的 API 端點

首先,我們需要通過樣板文件中現有的 Express.js 服務器添加 API 端點。這將允許我們將作業遠程發送到我們的服務器。

/api/jobs/index.js

export default (app) => {
  // We'll define our API endpoint via Express here.
};

為了組織起見,首先,我們將為所有與工作相關的路線創建一個單獨的文件(從技術上講,我們只有一個,但這將使我們的代碼保持靈活,以防您以後想擴展它) .

按照樣板文件中的現有模式,在這裡,我們定義一個函數並將其作為我們文件的默認導出。這個函數需要一個現有的 Express app 實例作為其唯一參數傳入。在我們實現我們的路由之前,讓我們看看現有的 app 是設置以及如何調用這個函數。

/index.js

import express from "express";
import startup from "./lib/startup";
import api from "./api/index";
import jobs from "./api/jobs";
import middleware from "./middleware/index";
import logger from "./lib/logger";

startup()
  .then(() => {
    const app = express();
    const port = process.env.PORT || 5001;

    middleware(app);
    api(app);
    jobs(app);

    app.listen(port, () => { ... });

    process.on("message", (message) => { ... });
  })
  .catch((error) => { ... });

在這裡,我們打開了主要的 index.js 樣板文件。這是我們設置 Express 服務器並“設置”我們的應用程序的地方。這裡我們要注意的部分就在中間,我們在這裡調用我們剛剛從另一個文件導出並作為 jobs 導入到這個文件中的函數 .在這個函數的調用之上,我們正​​在創建我們的 Express app 實例。

調用我們的函數,我們傳入 app 實例,以便我們可以使用它來添加我們將為隊列接受新作業的路由。

/api/jobs/index.js

export default (app) => {
  app.use("/jobs", (req, res) => {
    res.send("Job added to queue!");
  });
};

回到我們定義函數的文件,現在,使用 app 我們傳入的實例,我們在 /jobs 添加到我們的 Express 服務器的路由 .這將作為 API 端點,我們將在其中接收調度新作業的請求(在本地運行時,可在 http://localhost:5001/jobs 訪問 )。

在我們的路由的回調中,我們添加了一個通過 res.send() 對路由的請求的響應 功能。讓我們完成我們現在正在響應的消息並設置我們將添加作業的隊列。

使用議程設置作業隊列

為了簡單起見,在同一個文件中,讓我們導入 agenda 我們在教程開始時安裝的依賴項並為我們的作業創建隊列:

/api/jobs/index.js

import Agenda from "agenda";
import settings from "../../lib/settings";

const jobQueue = new Agenda({
  db: {
    address: settings.databases.mongodb.uri,
    collection: "jobs",
  },
});

jobQueue.start();

export default (app) => {
  app.use("/jobs", (req, res) => {
    res.send("Job added to queue!");
  });
};

在頂部,我們導入 Agenda 來自 agenda 我們之前安裝的包(我們使用大寫的 A 對於導入的值,因為我們希望它是一個 JavaScript 類構造函數——使用大寫字母是該語言的常見做法。

我們還導入 settings 對於樣板。這是樣板中內置的功能,允許我們為我們的應用程序存儲配置數據。在那個 /lib/settings 裡面 文件,我們有代碼將嘗試加載與當前環境名稱匹配的設置文件。現在,我們在 development 中運行 環境,因此它會嘗試加載 settings-development.json 文件(來自應用程序的根目錄)。如果我們在 production 環境,它會嘗試加載 settings-production.json 從應用的根目錄。

在開發中,一個 settings-development.json 文件已經為我們提供。此外,對於我們的下一步,它還包括運行 MongoDB 數據庫的 URL。真的很快,如果我們打開那個文件,我們可以看到結構:

/settings-development.json

{
  ...
  "databases": {
    "mongodb": {
      "uri": "mongodb://127.0.0.1/app"
    }
  },
  ...
}

在開發中,我們只是指向在 localhost 上啟動的 MongoDB 副本(這裡記為 127.0.0.1localhost的IP地址版本 ) 在我們啟動樣板時自動為我們服務。

/api/jobs/index.js

import Agenda from "agenda";
import settings from "../../lib/settings";

const jobQueue = new Agenda({
  db: {
    address: settings.databases.mongodb.uri,
    collection: "jobs",
  },
});

jobQueue.start();

export default (app) => {
  app.use("/jobs", (req, res) => {
    res.send("Job added to queue!");
  });
};

回顧我們的代碼,使用 Agenda 導入後,我們使用 new Agenda() 創建它的新實例 傳入我們隊列的設置。為了 agenda 為了工作,我們需要告訴它要在哪個 MongoDB 數據庫中存儲我們的作業,以及可選的,該數據庫中將存儲我們的作業的集合的名稱(每個作業都有一個對象,其中包含有關它應該何時運行的信息,任何與之相關的數據等)。

在這裡,我們傳遞 settings.databases.mongodb.uri 我們剛剛在 /settings-development.json 中看到的值 作為 db.address 值並設置 collection jobs 的名稱 (您可以將其更改為您想要的任何內容)。需要注意的是,我們存儲調用new Agenda()的結果 在變量 jobQueue 中 .這現在包含我們將用於添加和管理作業的隊列實例。

最後,就在我們對 const jobQueue 的定義之下 我們確保調用 jobQueue.start() .這可以確保議程實際處理我們交給它的工作。在這裡,只要我們正在處理的文件加載到服務器上(即在服務器啟動時),我們就啟動它。在您自己的應用中,您可能希望在更有條件的基礎上啟動它。

接下來,我們需要為我們的作業設置處理函數。我們將定義兩個函數:一個用於在作業添加到隊列後立即演示正在運行的作業,另一個用於在延遲後演示正在運行的作業。

/api/jobs/index.js

import Agenda from "agenda";
import dayjs from "dayjs";
import settings from "../../lib/settings";

const jobQueue = new Agenda({
  db: {
    address: settings.databases.mongodb.uri,
    collection: "jobs",
  },
});

jobQueue.define("instantJob", async (job) => {
  const data = job?.attrs?.data;
  console.log(
    "This job is running as soon as it was received. This is the data that was sent:"
  );
  console.log(data);
});

jobQueue.define("delayedJob", async (job) => {
  const data = job?.attrs?.data;
  console.log(
    "This job is running after a 5 second delay. This is the data that was sent:"
  );
  console.log(data);
});

jobQueue.start();

export default (app) => {
  app.use("/jobs", (req, res) => {
    res.send("Job added to queue!");
  });
};

在我們的 jobQueue 之間 定義和對 jobQueue.start() 的調用 ,我們添加了兩個對 jobQueue.define() 的調用 .這是我們用來告訴 Agenda 在隊列中接下來要運行給定類型的作業時要做什麼的函數。在這裡,我們定義了兩種類型的作業instantJobdelayedJob (我們將這些名稱作為第一個參數傳遞給 jobQueue.define() )。

在每個作業類型的回調函數內部,我們提取我們期望傳遞給作業的數據(對於我們的示例,這將只是虛擬數據,但對於您自己的應用程序,這將在運行您的作業時提供額外的上下文 - 用戶 ID, job.attrs.data 中的一些要存儲的數據等) job 的值 通過 Agenda 傳遞給我們,並包含一個描述我們正在嘗試運行的當前作業的對象。我們傳遞的自定義數據存儲在該對象的 attrs.data 中 價值。

有了這些數據,接下來,我們會在兩個作業中註銷一條消息,告訴我們正在運行什麼類型的作業,然後是 data 的日誌 我們過去了。在您自己的應用程序中,您可以在此處為您的工作運行代碼。

現在,這可能看起來令人困惑——我們已經定義了兩種幾乎相同的工作。接下來,我們將學習如何通過我們的 API 端點接收作業,以及如何區分上面定義的兩種類型。

通過 API 端點調度作業

為了更容易理解,我們現在將添加所有剩餘的代碼並單步執行。

/api/jobs/index.js

import Agenda from "agenda";
import dayjs from "dayjs";
import settings from "../../lib/settings";

const jobQueue = new Agenda({ ... });

jobQueue.define("instantJob", async (job) => { ... });

jobQueue.define("delayedJob", async (job) => { ... });

jobQueue.start();

export default (app) => {
  app.use("/jobs", (req, res) => {
    const jobType = req?.query?.jobType;
    const allowedJobs = Object.keys(jobQueue._definitions);

    if (!jobType) {
      return res.send("Must pass a jobType in the query params.");
    }

    if (!allowedJobs.includes(jobType)) {
      return res.send(
        `${jobType} is not supported. Must pass one of ${allowedJobs.join(
          ", or "
        )} as jobType in the query params.`
      );
    }

    if (jobType === "instantJob") {
      jobQueue.now(req?.query?.jobType, req.body);
    }

    if (jobType === "delayedJob") {
      jobQueue.schedule(
        dayjs().add(5, "seconds").format(),
        req?.query?.jobType,
        req.body
      );
    }

    res.send("Job added to queue!");
  });
};

著眼於我們路由的回調函數,我們在這裡添加的代碼解決了三個問題:確定要運行哪個作業(instantJobdelayedJob ),驗證該作業是否是我們定義的作業,如果是,則將該作業添加到隊列中。

為了識別哪個 作業運行,我們查看query 傳入 req 的對象 對像傳遞給我們的路由回調。這裡,query 表示調用路由時URL中包含的查詢參數,如?jobType=instantJob .這裡的想法是,當我們運行我們的工作時,我們將使用查詢參數 jobType 告訴我們我們的工作將走向何方。

在此下方,我們使用內置的 JavaScript 方法 Object.keys() 獲取允許的作業列表 取回我們定義了 jobQueue 的作業數組 (這些存儲在 _definitions 我們的議程實例上的對象)。

接下來,首先,我們確保 jobType 已在我們的查詢參數中傳遞。如果沒有,我們會以警告消息響應請求。

如果我們 有一個 jobType ,接下來,我們驗證它是我們的 allowedJobs 之一 .假設 jobType 中存儲的值 在該數組中,我們繼續將作業添加到隊列中。如果不是,我們會發出另一個警告,讓用戶知道傳遞的 jobType 無效並提供可能的可用作業類型列表(詳細信息!)。

繼續對我們的作業進行排隊,回想一下我們的目標是 a.) 將我們的作業添加到隊列中以立即運行,或者 b.) 安排該作業在將來運行。在這裡,對於 instantJob 類型的作業 ,我們調用.now() jobQueue 上的方法 ,傳入我們想要運行的作業類型——從我們的查詢參數——和數據(我們從 job.attrs.data 在作業回調內部)我們要傳入,在本例中是 body req 對象(提示:我們假設我們的作業作為 HTTP POST 傳遞給我們的路由 請求)。

接下來,對於我們的 delayedJob 類型,我們調用 jobQueue.schedule() ,傳遞一個我們希望我們的工作與我們的 jobType 一起運行的日期 和 req.body ,就像我們為 instantJob 所做的那樣 .為了生成日期,我們在這裡使用 dayjs 我們正在使用的樣板文件中包含的庫。 dayjs().add(5, "seconds").format() 行 這裡說“獲取當前日期和時間,加上五秒,然後將其格式化為 ISO-8601 字符串(簡稱“iso 字符串”,一種標準化的日期格式,如 2021-07-29T23:00:00Z )。”

而已!如果我們測試不同的作業,我們會看到我們之前添加的日誌顯示在我們的服務器控制台中:

總結

在本教程中,我們學習瞭如何使用 Agenda 庫和 MongoDB 來實現作業隊列。我們學習瞭如何通過 Express 設置接收作業的路由,如何使用 Agenda 設置隊列,如何在該隊列上定義作業類型,最後如何將通過我們的 API 路由接收到的作業添加到該隊列。


Tutorial JavaScript 教程
  1. window[] 和 eval() 之間的區別 - Javascript

  2. Javascript 數組方法:重新實現 Slice 🍕

  3. 使用 jQuery DataTables 時禁用最後一列的排序

  4. 你能在 JavaScript 中設置 console.log() 的樣式嗎

  5. 用 JavaScript 計算昨天的日期

  6. JavaScript const 關鍵字 |基本

  7. 使用 Docker 將 Flutter Web 應用程序容器化

  1. 我如何在 next.js 中製作復活節彩蛋功能

  2. JavaScript 代碼每日挑戰 #11

  3. 如何更改 Alpine.js 中的共享狀態?

  4. 使用 React 構建 RPG 風格的庫存(第 1 部分)

  5. MongoDB - 聚合框架 - 系列 #02

  6. JavaScript 截斷數字 | trunc() 方法示例

  7. 使用 Node JS 在 Twitter 中點贊、轉發和關注

  1. 重新實現 Facebook 的反應動畫

  2. 如何使用 TypeScript 輕鬆修改 Minecraft

  3. 前端渲染:SSG vs ISG vs SSR vs CSR - 什麼時候使用?

  4. 我如何將我的開發環境遷移到 Raspberry Pi