Source code for wcraas_control.wcraas_control
# -*- coding: utf-8 -*-
"""The WCraaS Control module is responsible for the orchestration of tasks in the platform."""
import asyncio
import logging
import traceback
from collections import deque
from enum import unique, IntEnum

import aio_pika
from aio_pika.patterns import RPC
from aioredis import create_redis_pool

from wcraas_common import AMQPConfig, WcraasWorker
from wcraas_control.config import RedisConfig

__all__ = ("RedisLockState", "ControlWorker")


@unique
class RedisLockState(IntEnum):
    FREE = 0
    LOCK = 1
    DONE = 2
    FAIL = 3
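
# A minimal sketch, not part of the original module: the lock states mark a
# URL's lifecycle in Redis (LOCK while a crawl is in flight, then DONE or
# FAIL). The values are plain ints, so they round-trip through Redis cleanly:
#
#     >>> RedisLockState.LOCK.value
#     1
#     >>> RedisLockState(int(b"2"))  # e.g. a value read back from Redis
#     <RedisLockState.DONE: 2>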


class ControlWorker(WcraasWorker):
    """
    Control Worker for the WCraaS platform, responsible for the orchestration of tasks.

    >>> from wcraas_control.config import Config
    >>> cn = ControlWorker(*Config.fromenv())
    """

    def __init__(
        self,
        amqp: AMQPConfig,
        redis: RedisConfig,
        interval: int,
        loglevel: int,
        *args,
        **kwargs,
    ):
        super().__init__(amqp, loglevel)
        self.redis = redis
        self.poll_interval = interval
        self.logger = logging.getLogger("wcraas.control")
        self.logger.setLevel(loglevel)

    async def crawl(self, url: str):
        """
        Given a URL, orchestrate crawling of the target.

        :param url: Entrypoint URL for crawling the target.
        :type url: string
        """
        redis_pool = await create_redis_pool(self.redis)
        urls = deque((url,))
        async with self._amqp_pool.acquire() as channel:  # type: aio_pika.Channel
            rpc = await RPC.create(channel)
            while True:
                # If there are no URLs left to crawl, exit.
                if not urls:
                    break
                _url = urls.pop()
                self.logger.info(f"Processing {_url} ...")
                # Acquire the lock for the URL.
                await redis_pool.set(_url, RedisLockState.LOCK.value)
                # Delegate discovery through RPC.
                try:
                    resp = await rpc.proxy.discover(url=_url)
                    self.logger.info(f"RPC for {_url} completed successfully!")
                    self.logger.debug(f"RPC response for {_url}:")
                    self.logger.debug(resp)
                except Exception as err:
                    self.logger.error(traceback.format_exc())
                    self.logger.error(err)
                    await redis_pool.set(_url, RedisLockState.FAIL.value)
                    continue
                await redis_pool.set(_url, RedisLockState.DONE.value)
                # For every inbound URL (i.e. same protocol, host and port)
                # check whether the URL already has an entry in Redis; if so
                # skip it, otherwise add it to the deque of URLs to crawl.
                for inbound_url in resp["data"]["inbound"]:
                    self.logger.info(f"Checking {inbound_url} against cache ...")
                    if await redis_pool.get(inbound_url):
                        self.logger.info(f"Skipping {inbound_url} due to cache hit!")
                        continue
                    urls.append(inbound_url)
                    self.logger.info(f"Adding {inbound_url} due to cache miss!")
                await asyncio.sleep(self.poll_interval)
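
    # A sketch of the discover RPC response shape consumed by crawl() above
    # (illustrative assumption only; the authoritative schema belongs to the
    # discovery worker answering the RPC):
    #
    #     resp = {
    #         "data": {
    #             "inbound": ["https://host/a", "https://host/b"],  # same origin
    #         },
    #     }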

    async def start(self):
        pass

    async def list_collections(self):
        """
        List the collections available at the storage node.
        """
        async with self._amqp_pool.acquire() as channel:
            rpc = await RPC.create(channel)
            resp = await rpc.proxy.list_collections()
            return resp
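
    # Minimal usage sketch for the RPC helper above (not part of the original
    # module; assumes a reachable AMQP broker and a storage worker serving the
    # "list_collections" RPC):
    #
    #     >>> worker = ControlWorker(*Config.fromenv())
    #     >>> asyncio.run(worker.list_collections())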

    def __repr__(self):
        return f"{self.__class__.__name__}(amqp={self.amqp}, redis={self.redis}, interval={self.poll_interval}, loglevel={self.loglevel})"
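

# A minimal end-to-end sketch, not part of the original module: build a worker
# from the environment (as in the class docstring) and crawl a target URL.
# The example URL is an illustrative placeholder.
if __name__ == "__main__":
    from wcraas_control.config import Config

    worker = ControlWorker(*Config.fromenv())
    asyncio.run(worker.crawl("https://example.com"))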