Kong Plugins + cluster events?

My question boils down to this: how, within the Kong plugin architecture, can I work with cluster-related events? Example:

I have the key “secret123” in a local cache, and I want to invalidate that cached key->value pair on other Kong nodes in a specific mlcache.

My basic understanding is that kong.cluster_events is one easy way to do that: it works through the core cluster_events DB table and uses a pub/sub model. Where I am slightly confused is how that fits into the plugin architecture.

Since plugins execute on a per service/route match (or globally) for which they are configured, or in the init phases of nginx/Kong startup, how can anything plugin-related run the background polling that a pub/sub model requires?

Essentially I want a way to broadcast to other nodes that I want this key revoked from their caches if they have it, which I imagine is something like:

kong.cluster_events:broadcast("my_custom_invalidation_channel", secret, nbf)

Where I give a custom channel, my secret string as the key, and an nbf to ensure all nodes invalidate at roughly the same time (5-10 seconds into the future, in sync or whatever). And this could live in the access phase of the plugin itself whenever there is an event I want to broadcast on, like a /revoke call made against a route with this plugin on it. Pretty easy, I think, to get the event out there. Correct me if this is inaccurate, but it seems to be the way it works.

I think where I am confused is where the bit would live that subscribes to the events on my custom channel and then does the :delete() on its own node's local mlcache entry, or something of the sort. Can the subscription live in the plugin too, in init, with a function that does the cache delete? Something like this in the plugin's init phase, maybe?

  local ok, err = kong.cluster_events:subscribe("my_custom_invalidation_channel", function(secret)
    custom_mlcache:delete(secret)
  end)
  if not ok then
    return nil, "failed to subscribe to my_custom_invalidation_channel cluster events " ..
                "channel: " .. err
  end

Then does this snippet of code just magically fire up a background timer that runs the delete logic whenever an invalidation event is found, based on Kong's global polling settings (every 5 seconds or whatever)? Seems like it here:

Where the 3rd argument, if set to true, starts an ngx.timer that does the polling in the background for me :slight_smile: ? Would it really be that easy? This is my first look into how Kong does clustering, and it seems pretty nifty!
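
If I am reading that right (my assumption, not something I have confirmed in the docs), the third argument is a start_polling flag that defaults to true, so spelling it out would look something like:

-- assumption: subscribe(channel, callback, start_polling), where start_polling
-- defaulting to true means the polling ngx.timer gets started for me
local ok, err = kong.cluster_events:subscribe("my_custom_invalidation_channel", function(secret)
  custom_mlcache:delete(secret) -- evict this node's copy of the revoked secret
end, true)
if not ok then
  kong.log.err("failed to subscribe to my_custom_invalidation_channel: ", err)
end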

Update from my own testing:

This works in the access phase to add a value to the cluster_events table:

local singletons = require "kong.singletons"

local ngx_now = ngx.now

function _M.handleTokenRevoke(conf)
  ngx.req.read_body()
  local args, err = ngx.req.get_post_args()
  local nbf = ngx_now() + 5 -- nbf a few seconds out so nodes invalidate around the same time

  if err then
    ngx.log(ngx.ERR, err)
    kong.response.exit(400, { message = "Unable to parse request body" })
    return nil
  end

  local token = args["token"]
  if token == nil then
    kong.response.exit(400, { message = "Missing 'token' Parameter" })
    return nil
  end

  local status, err = singletons.cluster_events:broadcast("oidc_cache_invalidations", token, nbf)
  ngx.log(ngx.ERR, "Status is: ", tostring(status), ", NBF is: ", tostring(nbf))

  if err then
    ngx.log(ngx.ERR, err)
    kong.response.exit(500, err)
    return nil
  end
end

This fails though; a bare cluster_events reference is nil in the plugin's init_worker phase, so it can't be used like this to kick off a background polling + delete job:

function TokenRevokeHandler:init_worker()
  TokenRevokeHandler.super.init_worker(self)
  --Subscribe to oidc token deletion events and evict from local cache
  local ok, err = cluster_events:subscribe("oidc_cache_invalidations", function(secret)
    kong.oidc_cache:delete(secret)
    kong.log.err("Tried to delete secret: " .. secret)
  end)
  
  if not ok then
    return nil, "failed to subscribe to oidc_cache_invalidations cluster events " ..
                "channel: " .. err
  end
  
end
2019/12/13 22:00:13 [error] 28#0: init_worker_by_lua error: .../5.1/kong/plugins/stargate-oidc-token-revoke/handler.lua:13: attempt to index upvalue 'cluster_events' (a nil value)

Looks like the only way to do it would be to inject my cluster event workload somewhere here:

Will give that a go, but I'm not in love with it; it would have been nice if my own plugin could set this up in its own init_worker and run it in the background, all compartmentalized.

Then I thought to use singletons in init_worker… no error!

local singletons = require "kong.singletons"

function TokenRevokeHandler:init_worker()
  TokenRevokeHandler.super.init_worker(self)
  --Subscribe to oidc token deletion events and evict from local cache
  local ok, err = singletons.cluster_events:subscribe("oidc_cache_invalidations", function(secret)
    kong.oidc_cache:delete(secret)
    kong.log.err("Tried to delete secret: " .. secret)
  end)
  
  if not ok then
    return nil, "failed to subscribe to oidc_cache_invalidations cluster events " ..
                "channel: " .. err
  end
  
end

Then from one datacenter I kicked off the revoke with a POST, and in the other datacenter I do see Kong print something out, so I know it made it into the subscribe callback somehow, but it throws an error:

2019/12/13 22:59:25 [error] 25#0: *24316 lua entry thread aborted: runtime error: .../5.1/kong/plugins/stargate-oidc-token-revoke/handler.lua:14: no ipc to propagate deletion, specify opts.ipc_shm or opts.ipc
stack traceback:
coroutine 0:
	[C]: in function 'error'
	/usr/local/kong/luarocks/share/lua/5.1/resty/mlcache.lua:1284: in function 'delete'
	.../5.1/kong/plugins/stargate-oidc-token-revoke/handler.lua:14: in function 'cb'
	...kong/luarocks/share/lua/5.1/kong/cluster_events/init.lua:42: in function <...kong/luarocks/share/lua/5.1/kong/cluster_events/init.lua:37>, context: ngx.timer

Which looks to be more related to how my mlcache is currently configured… Getting closer :slight_smile: !

  -- note: no ipc_shm / ipc option here, which is what the :delete() above complains about
  local oidccache, err = mlcache.new("kong_oidc_cache", "kong_oidc_cache", {
    shm_miss         = "kong_oidc_cache_miss",
    shm_locks        = "kong_oidc_cache_locks",
    shm_set_retries  = 3,
    lru_size         = 1000,
    ttl              = 43200,
    neg_ttl          = 30,
    resty_lock_opts  = {exptime = 10, timeout = 5,},
  })

https://github.com/thibaultcha/lua-resty-mlcache#new I guess I can make a shared dictionary for this ipc stuff. Looks like the Kong singletons cache uses the custom ipc of Kong's worker events, but why do I need worker events if my mlcache's shm is already cross-worker shared memory? The catch seems to be that mlcache also keeps a per-worker LRU cache on top of the shm, so a :delete() needs ipc to tell the other workers to drop their LRU copies too. Seems like I can get away with just making a new shm and passing it as ipc_shm, and that will do the trick for when I do a :delete().
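
In mlcache terms I think that would look something like this (just a sketch; the ipc dict name below is my own invention and needs its own lua_shared_dict declared in the nginx config / custom template):

  -- assumes a `lua_shared_dict kong_oidc_cache_ipc 1m;` has been injected into
  -- the nginx config; the dict name is just what I picked for this sketch
  local oidccache, err = mlcache.new("kong_oidc_cache", "kong_oidc_cache", {
    shm_miss         = "kong_oidc_cache_miss",
    shm_locks        = "kong_oidc_cache_locks",
    ipc_shm          = "kong_oidc_cache_ipc", -- lets :delete()/:update() propagate across workers
    shm_set_retries  = 3,
    lru_size         = 1000,
    ttl              = 43200,
    neg_ttl          = 30,
    resty_lock_opts  = {exptime = 10, timeout = 5,},
  })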

Edit: yep, added a new shm just for the ipc events and then threw an :update() into the access phase as a pre-processing check to make sure each worker picks up the pending invalidation events. Seems pretty straightforward. I saw Kong's singletons cache uses a custom ipc for inter-worker invalidations/updates, but I was too scared to get into using that and writing my invalidation events around it :laughing: . Self solved!
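
For completeness, the access-phase piece looks roughly like this (a sketch of my own handler, not anything Kong-prescribed):

function TokenRevokeHandler:access(conf)
  TokenRevokeHandler.super.access(self)

  -- drain any pending mlcache ipc events (e.g. deletions broadcast from other
  -- workers) before this worker does its own cache lookups
  local ok, err = kong.oidc_cache:update()
  if not ok then
    kong.log.err("failed to poll mlcache ipc events: ", err)
  end

  -- ... normal token validation / cache lookup logic continues here
end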