I’m new to programming in Python, and I have a task to create a continuously running queue with delay support. I haven’t added threading or shared the queue’s data across threads yet, but I still need the functionality for managing the delay.
From the examples I’ve seen that use the queue class directly, it seems I should be able to write a much more compact class than what I have, so I wanted to see if someone could help pare this down to a leaner class that is easier to manage. What I have here works, but I’m hoping to make it a bit more Pythonic.
I’d really love it if someone could point out areas where I could be more efficient.
Here’s what I wrote so far:
import copy
import heapq
import itertools
import multiprocessing
import pprint
import time
from datetime import datetime, timedelta
from enum import Enum
from multiprocessing import Pool
from queue import Queue

# Default number of seconds get() waits for a server before giving up.
REQUEST_TIMEOUT_SECONDS = 10


class QueueState(Enum):
    """What a DelayQueue is currently doing."""

    Inactive = 0
    Sleeping = 1
    Processing = 2


class DelayQueue:
    """Run callables after a per-task delay, earliest due time first.

    ``queue`` is a list of Task objects kept in heap order by ``heapq``, so
    the next task to run is always ``queue[0]``.
    """

    def __init__(self):
        self.state = QueueState.Inactive
        self.queue = []

    def run(self):
        """Process tasks until the queue is empty.

        Each pass runs every task that is already due, then sleeps until the
        next due time. A task whose ``process()`` returns a falsy value stays
        queued and is retried on the next pass without sleeping. If
        ``process()`` raises, the exception propagates and the unfinished
        tasks remain in the queue.
        """
        while True:
            ready = self._pop_ready()
            print(len(ready), 'tasks ready for processing')

            retry = self._process(ready)
            print('Processing current tasks complete')

            # Mirrors the original log: printed whenever nothing is left over
            # from this pass and later tasks are still waiting.
            if not retry and self.queue:
                print("No work ready to process")

            if not self.queue:
                print('Queue empty closing queue')
                self.state = QueueState.Inactive
                break
            if not retry:
                self._sleep_until_next()

    def add(self, func, delay=0, **kwargs):
        """Schedule ``func(**kwargs)`` to run ``delay`` milliseconds from now."""
        due = datetime.now() + timedelta(milliseconds=delay)
        heapq.heappush(self.queue, Task(func, due, **kwargs))
        #if self.state == QueueState.Inactive:
        #self.run()

    def remove(self):
        """Placeholder for removing a scheduled task; not implemented yet."""
        raise NotImplementedError

    def _pop_ready(self):
        """Remove and return every task that is due now, earliest first."""
        now = datetime.now()
        ready = []
        while self.queue and self.queue[0].time <= now:
            ready.append(heapq.heappop(self.queue))
        return ready

    def _process(self, ready):
        """Run each task in ``ready``; return the ones that must be retried."""
        retry = []
        resume_from = 0
        try:
            for resume_from, task in enumerate(ready):
                if resume_from == 0:
                    self.state = QueueState.Processing
                    print('Processing currentwork')
                print('Processing item', resume_from)
                if not task.process():
                    retry.append(task)
            resume_from = len(ready)
        finally:
            # Put back whatever did not finish. If process() raised, that
            # includes the failing task and everything queued behind it.
            for task in retry + ready[resume_from:]:
                heapq.heappush(self.queue, task)
        return retry

    def _sleep_until_next(self):
        """Sleep until the earliest queued task is due."""
        nexttask = self.queue[0]
        print("Getting next task", nexttask)
        seconds = (nexttask.time - datetime.now()).total_seconds()
        print('Sleeping', seconds, 'seconds')
        self.state = QueueState.Sleeping
        # A negative value means the task is already overdue; time.sleep()
        # rejects negative durations, so skip the sleep entirely.
        if seconds > 0:
            time.sleep(seconds)


class Task:
    """A callable and its keyword arguments, scheduled for a ``datetime``."""

    # Increasing numbers so tasks due at the same instant keep the order in
    # which they were created.
    _sequence = itertools.count()

    def __init__(self, func, time, **kwargs):
        print("New task", (self._label(func), time, kwargs))
        self.func = func
        self.time = time
        self.args = kwargs
        self._order = next(Task._sequence)

    def __lt__(self, other):
        """Order by due time, then creation order, so heapq can rank tasks."""
        return (self.time, self._order) < (other.time, other._order)

    def process(self):
        """Call the stored function with its arguments; return True when done."""
        self.func(**self.args)
        return True

    def __str__(self):
        return str((self._label(self.func), self.time, self.args))

    @staticmethod
    def _label(func):
        """Return a printable name, even for callables without ``__name__``."""
        return getattr(func, '__name__', repr(func))


def get(**kwargs):
    """Fetch ``kwargs['url']`` over HTTP and print the response status code.

    An optional ``timeout`` keyword (seconds) overrides
    REQUEST_TIMEOUT_SECONDS. Without a timeout, a stalled server would block
    the whole queue indefinitely.
    """
    # Third-party dependency, imported here because this demo helper is its
    # only user; the queue classes stay importable without it.
    import requests

    url = kwargs['url']
    print('Requesting', url)
    r = requests.get(url=url, timeout=kwargs.get('timeout', REQUEST_TIMEOUT_SECONDS))
    print("Status Code", r.status_code)


def main():
    """Demo: schedule a handful of delayed HTTP requests and run them."""
    queue = DelayQueue()
    queue.add(get, 1500, url='http://www.google.com')
    queue.add(get, 4300, url='http://www.yahoo.com')
    queue.add(get, 3900, url='http://www.att.com')
    queue.add(get, 2500, url='http://www.nbc.com')
    queue.add(get, 2450, url='http://www.msn.com')
    queue.add(get, 2450, url='http://www.gmail.com')
    queue.run()


if __name__ == '__main__':
    main()
Output:
New task ('get', datetime.datetime(2018, 1, 19, 17, 18, 51, 843225), {'url': 'http://www.google.com'}) New task ('get', datetime.datetime(2018, 1, 19, 17, 18, 54, 643616), {'url': 'http://www.yahoo.com'}) New task ('get', datetime.datetime(2018, 1, 19, 17, 18, 54, 243917), {'url': 'http://www.att.com'}) New task ('get', datetime.datetime(2018, 1, 19, 17, 18, 52, 844260), {'url': 'http://www.nbc.com'}) New task ('get', datetime.datetime(2018, 1, 19, 17, 18, 52, 794472), {'url': 'http://www.msn.com'}) New task ('get', datetime.datetime(2018, 1, 19, 17, 18, 52, 794630), {'url': 'http://www.gmail.com'}) 0 tasks ready for processing Processing current tasks complete No work ready to process Getting next task ('get', datetime.datetime(2018, 1, 19, 17, 18, 51, 843225), {'url': 'http://www.google.com'}) Sleeping 1.498037 seconds 1 tasks ready for processing Processing currentwork Processing item 0 Requesting http://www.google.com Status Code 200 Processing current tasks complete No work ready to process Getting next task ('get', datetime.datetime(2018, 1, 19, 17, 18, 52, 794472), {'url': 'http://www.msn.com'}) Sleeping 0.820134 seconds 2 tasks ready for processing Processing currentwork Processing item 0 Requesting http://www.msn.com Status Code 200 Processing item 1 Requesting http://www.gmail.com Status Code 200 Processing current tasks complete No work ready to process Getting next task ('get', datetime.datetime(2018, 1, 19, 17, 18, 52, 844260), {'url': 'http://www.nbc.com'}) Sleeping -0.540612 seconds 1 tasks ready for processing Processing currentwork Processing item 0 Requesting http://www.nbc.com Status Code 200 Processing current tasks complete No work ready to process Getting next task ('get', datetime.datetime(2018, 1, 19, 17, 18, 54, 243917), {'url': 'http://www.att.com'}) Sleeping 0.708722 seconds 1 tasks ready for processing Processing currentwork Processing item 0 Requesting http://www.att.com Status Code 200 Processing current tasks complete No work ready to 
process Getting next task ('get', datetime.datetime(2018, 1, 19, 17, 18, 54, 643616), {'url': 'http://www.yahoo.com'}) Sleeping 0.143595 seconds 1 tasks ready for processing Processing currentwork Processing item 0 Requesting http://www.yahoo.com Status Code 200 Processing current tasks complete Queue empty closing queue