Переглянути джерело

Implement periodic ping functionality (#3246)

* Implement periodic ping functionality

allows periodically sending a POST request to one or more URL
specified by COMPANION_PERIODIC_PING_URLS
also add nock for easier testing

* Update packages/@uppy/companion/src/companion.js

Co-authored-by: Merlijn Vos <merlijn@soverin.net>

* Update packages/@uppy/companion/src/server/jobs.js

* allow posting a JSON body

COMPANION_PERIODIC_PING_STATIC_JSON_PAYLOAD

* add todo

* remove package-lock

accidentally added during merge

* Add version and processId

by default to periodic ping when enabled

Co-authored-by: Merlijn Vos <merlijn@soverin.net>
Mikael Finstad 3 роки тому
батько
коміт
4e827ee3f8

+ 21 - 1
packages/@uppy/companion/src/companion.js

@@ -6,6 +6,8 @@ const Grant = require('grant').express()
 const merge = require('lodash.merge')
 const cookieParser = require('cookie-parser')
 const interceptor = require('express-interceptor')
+const { isURL } = require('validator')
+const uuid = require('uuid')
 
 const grantConfig = require('./config/grant')()
 const providerManager = require('./server/provider')
@@ -21,6 +23,8 @@ const logger = require('./server/logger')
 const middlewares = require('./server/middlewares')
 const { ProviderApiError, ProviderAuthError } = require('./server/provider/error')
 const { getCredentialsOverrideMiddleware } = require('./server/provider/credentials')
+// @ts-ignore
+const { version } = require('../package.json')
 
 const defaultOptions = {
   server: {
@@ -39,6 +43,7 @@ const defaultOptions = {
   },
   debug: true,
   logClientVersion: true,
+  periodicPingUrls: [],
   streamingUpload: false,
 }
 
@@ -123,6 +128,17 @@ module.exports.app = (options = {}) => {
     jobs.startCleanUpJob(options.filePath)
   }
 
+  const processId = uuid.v4()
+
+  jobs.startPeriodicPingJob({
+    urls: options.periodicPingUrls,
+    interval: options.periodicPingInterval,
+    count: options.periodicPingCount,
+    staticPayload: options.periodicPingStaticPayload,
+    version,
+    processId,
+  })
+
   return app
 }
 
@@ -241,7 +257,8 @@ const validateConfig = (companionOptions) => {
     )
   }
 
-  const { providerOptions } = companionOptions
+  const { providerOptions, periodicPingUrls } = companionOptions
+
   if (providerOptions) {
     const deprecatedOptions = { microsoft: 'onedrive', google: 'drive' }
     Object.keys(deprecatedOptions).forEach((deprected) => {
@@ -254,4 +271,7 @@ const validateConfig = (companionOptions) => {
   if (companionOptions.uploadUrls == null || companionOptions.uploadUrls.length === 0) {
     logger.warn('Running without uploadUrls specified is a security risk if running in production', 'startup.uploadUrls')
   }
+
+  const periodicPingUrlsValid = Array.isArray(periodicPingUrls) && periodicPingUrls.every((url2) => isURL(url2, { protocols: ['http', 'https'], require_protocol: true, require_tld: false }))
+  if (!periodicPingUrlsValid) throw new TypeError('Invalid periodicPingUrls')
 }

+ 63 - 0
packages/@uppy/companion/src/server/jobs.js

@@ -1,9 +1,15 @@
 const schedule = require('node-schedule')
 const fs = require('fs')
 const path = require('path')
+const { promisify } = require('util')
+const request = require('request')
+
 const { FILE_NAME_PREFIX } = require('./Uploader')
 const logger = require('./logger')
 
+// TODO rewrite to use require('timers/promises').setTimeout when we support newer node versions
+const sleep = promisify(setTimeout)
+
 /**
  * Runs a function every 24 hours, to clean up stale, upload related files.
  *
@@ -54,3 +60,60 @@ const cleanUpFinishedUploads = (dirPath) => {
     })
   })
 }
+
+async function runPeriodicPing ({ urls, payload, requestTimeout }) {
+  // Run requests in parallel
+  await Promise.all(urls.map(async (url) => {
+    try {
+      // TODO rewrite to use a non-deprecated request library
+      const opts = { url, timeout: requestTimeout }
+      opts.body = payload
+      opts.json = true
+      const response = await promisify(request.post)(opts)
+      if (response.statusCode !== 200) throw new Error(`Status code was ${response.statusCode}`)
+    } catch (err) {
+      logger.warn(err, 'jobs.periodic.ping')
+    }
+  }))
+}
+
+// This function is used to start a periodic POST request against a user-defined URL
+// or set of URLs, for example as a watch dog health check.
+exports.startPeriodicPingJob = async ({ urls, interval = 60000, count, staticPayload = {}, version, processId }) => {
+  if (urls.length === 0) return
+
+  logger.info('Starting periodic ping job', 'jobs.periodic.ping.start')
+
+  let requesting = false
+
+  const requestTimeout = interval / 2
+
+  // Offset by a random value, so that we don't flood recipient if running many instances in a cluster
+  const delayBySec = Math.random() * interval
+  logger.info(`Delaying periodic ping by ${delayBySec}ms`, 'jobs.periodic.ping.start')
+  await sleep(delayBySec)
+
+  let i = 0
+  const intervalRef = setInterval(async () => {
+    // Used for testing:
+    // TODO implement a stop method instead, but this needs to be propagated all the way out, so it's a big rewrite
+    if (count != null && i >= count) {
+      clearInterval(intervalRef)
+      return
+    }
+    i++
+
+    if (requesting) {
+      logger.warn('Periodic ping request already in progress', 'jobs.periodic.ping')
+      return
+    }
+
+    try {
+      requesting = true
+      const payload = { version, processId, ...staticPayload }
+      await runPeriodicPing({ urls, payload, requestTimeout })
+    } finally {
+      requesting = false
+    }
+  }, interval)
+}

+ 7 - 0
packages/@uppy/companion/src/standalone/helper.js

@@ -89,6 +89,13 @@ const getConfigFromEnv = () => {
       oauthDomain: process.env.COMPANION_OAUTH_DOMAIN,
       validHosts,
     },
+    periodicPingUrls: process.env.COMPANION_PERIODIC_PING_URLS ? process.env.COMPANION_PERIODIC_PING_URLS.split(',') : [],
+    periodicPingInterval: process.env.COMPANION_PERIODIC_PING_INTERVAL
+      ? parseInt(process.env.COMPANION_PERIODIC_PING_INTERVAL, 10) : undefined,
+    periodicPingStaticPayload: process.env.COMPANION_PERIODIC_PING_STATIC_JSON_PAYLOAD
+      ? JSON.parse(process.env.COMPANION_PERIODIC_PING_STATIC_JSON_PAYLOAD) : undefined,
+    periodicPingCount: process.env.COMPANION_PERIODIC_PING_COUNT
+      ? parseInt(process.env.COMPANION_PERIODIC_PING_COUNT, 10) : undefined,
     filePath: process.env.COMPANION_DATADIR,
     redisUrl: process.env.COMPANION_REDIS_URL,
     // adding redisOptions to keep all companion options easily visible

+ 22 - 0
packages/@uppy/companion/test/__tests__/companion.js

@@ -1,6 +1,7 @@
 /* global jest:false, test:false, expect:false, describe:false */
 
 const mockOauthState = require('../mockoauthstate')()
+const { version } = require('../../package.json')
 
 jest.mock('tus-js-client')
 jest.mock('purest')
@@ -9,6 +10,7 @@ jest.mock('../../src/server/helpers/oauth-state', () => ({
   ...mockOauthState,
 }))
 
+const nock = require('nock')
 const request = require('supertest')
 const tokenService = require('../../src/server/helpers/jwt')
 const { getServer } = require('../mockserver')
@@ -153,3 +155,23 @@ describe('handle main oauth redirect', () => {
       .expect(400)
   })
 })
+
+it('periodically pings', (done) => {
+  nock('http://localhost').post('/ping', (body) => (
+    body.some === 'value'
+    && body.version === version
+    && typeof body.processId === 'string'
+  )).reply(200, () => done())
+
+  getServer({
+    COMPANION_PERIODIC_PING_URLS: 'http://localhost/ping',
+    COMPANION_PERIODIC_PING_STATIC_JSON_PAYLOAD: '{"some": "value"}',
+    COMPANION_PERIODIC_PING_INTERVAL: '10',
+    COMPANION_PERIODIC_PING_COUNT: '1',
+  })
+}, 1000)
+
+afterAll(() => {
+  nock.cleanAll()
+  nock.restore()
+})