APScheduler

定时任务框架APScheduler学习详解 APScheduler简介 APScheduler基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型 的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个python定时任务系统。 APScheduler有四种组成部分: 触发器(trigger)包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了他们自己初始配置意外,触发器完全是无状态的。 作业存储(job store)存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。一个作业的数据讲在保 存在持久化作业存储时被序列化,并在加载时被反序列化。调度器不能分享同一个作业存储。 执行器(executor)处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。 调度器(scheduler)是其他的组成部分。你通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了 处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。 简单应用: import time from apscheduler.schedulers.blocking import BlockingScheduler def my_job(): print time.strftime(‘%Y-%m-%d %H:%M:%S’, time.localtime(time.time())) sched = BlockingScheduler() sched.add_job(my_job, ‘interval’, seconds=5) sched.start() 上面的例子表示每隔5s执行一次my_job函数,输出当前时间信息 操作作业 1. 添加作业 上面是通过add_job()来添加作业,另外还有一种方式是通过scheduled_job()修饰器来修饰函数 import time from apscheduler.schedulers.blocking import BlockingScheduler sched = BlockingScheduler() @sched.scheduled_job(‘interval’, seconds=5) def my_job(): print time.strftime(‘%Y-%m-%d %H:%M:%S’, time.localtime(time.time())) sched.start() 2. 移除作业 job = scheduler.add_job(myfunc, ‘interval’, minutes=2) job.remove() ###如果有多个任务序列的话可以给每个任务设置ID号,可以根据ID号选择清除对象,且remove放到start前才有效 sched.add_job(myfunc, ‘interval’, minutes=2, id=‘my_job_id’) sched.remove_job(‘my_job_id’) 3. [Read More]

python elasticsearch

from elasticsearch import Elasticsearch
from elasticsearch import helpers

class ESutill():
    def __init__(self,hosts,index,doc_type,body):
        self.index = index
        self.body = body
        self.doc_type = doc_type
        self.hosts = hosts
        self.es = Elasticsearch(hosts=self.hosts)
        if not self.es.indices.exists(index=self.index):
            self.es.indices.create(index=self.index)
            self.es.indices.put_mapping(index=self.index,doc_type=self.doc_type,body=self.body)

pymysql 模板

import pymysql # MySQL 建立连接 class MySQLPipeline(object): def __init__(self, host, port, user, password, db): self.mysql_host = host self.mysql_port = port self.mysql_user = user self.mysql_password = password self.mysql_db = db # 创建MYSQL数据库链接对象 self.conn = pymysql.connect(host=self.mysql_host, user=self.mysql_user, password=self.mysql_password, db=self.mysql_db, charset="utf8") # 查询数据 def searching(self, sql): try: with self.conn as cur: cur.execute(sql) logger.info("sql查询成功") return cur except Exception as e: print(e) logger.error(e) return None #增删改 def processing(self, sql): try: with self.conn as cur: cur.execute(sql) logger. [Read More]

python redis utils

import redis


class RedisPipline():

    def __init__(self):
        self.pool = redis.ConnectionPool(host=redis_host, port=redis_port, password=redis_password, db=redis_db)
        self.redis_conn = redis.Redis(connection_pool=self.pool)

    # set
    def set_data(self, key, data):
        self.redis_conn.set(key, data)

    # get
    def get_data(self, key):
        return self.redis_conn.get(key)

    # rpush
    def rpush_data(self, key, data):
        self.redis_conn.rpush(key, json.dumps(data))

    # lpop
    def lpop_data(self, key):
        self.redis_conn.lpop(key)

    # 删除key
    def delete_key(self, key):
        if self.redis_conn.exists(key):
            self.redis_conn.delete(key)

python retrying 重试装饰器

python retrying chache def login_required(view_func): """检验用户的登录状态""" @wraps(view_func) # 不改变原有函数的签名文档 def wrapper(*args, **kwargs): user_id = session.get("user_id") if user_id is not None: # 表示用户已经登录 # 使用g对象保存user_id,在视图函数中可以直接使用 g.user_id = user_id return view_func(*args, **kwargs) else: # 用户未登录 resp = { "errno": RET.SESSIONERR, "errmsg": "用户未登录" } return jsonify(resp) return wrapper # 构建装饰器 def wraps(func): @functools.wraps(func) def decorator( *args, **kwargs): print("开始验证") print("%scalled" % func.__name__) result = func(*args, **kwargs) print("验证完毕") print("%send" % func.__name__) return result return decorator @wraps def connnet_db(): print("searching") def singleton(fun): instances = {} def decorator(*args,**kwargs): if fun not in instances: print(111,fun) instances[fun] = fun(*args,**kwargs) return instances[fun] else: print("已存在") return decorator def singleton(fun): cache = {} def decorator(*args,**kwargs): if fun not in cache: print(111,fun) cache[fun] = fun(*args,**kwargs) return cache[fun] else: print("已存在") return decorator @singleton def fib(i): if i < 2: return 1 return fib(i-1) + fib(i-2) if __name__ == '__main__': for i in range(10): test() import requests from tenacity import * ## 爬虫专用 def scraping(url): r = requests. [Read More]