应求写一个刷逼站直播在线人数的脚本/A simple script to get lots of viewers of live.bilibili.com

应求,就这样。
什么Queue啊都不用了,投入list的怀抱吧。
gist.github.com/cnbeining/6b2273d7e332f29193d0

#!/usr/bin/env python
#coding:utf-8
# Author:  Beining --<cnbeining#gmail.com>
# Purpose: A simple script to get lots of viewers of Bilibili Live
# Created: 08/11/2015
# Error report: http://www.cnbeining.com/?p=952
# https://github.com/cnbeining  somewhere within my gists
import sys
import time
import getopt
from multiprocessing import Process
from websocket import create_connection, WebSocketConnectionClosedException
#----------------------------------------------------------------------
def fake_connector(cid):
    """"""
    try:
        ws = create_connection('ws://livecmt.bilibili.com:88/{cid}'.format(cid = cid))
        while 1:
            time.sleep(5)
            a = ws.recv()
    except WebSocketConnectionClosedException:
        return -1
    except Exception as e:
        print(e)
        return 0
    finally:
        return
#----------------------------------------------------------------------
def main(cid, thread_number):
    """"""
    #Get a list of threads
    process_list = [Process(target=fake_connector, args=((cid, ))) for i in xrange(int(thread_number))]  #Queue? Your ass
    [i.start() for i in process_list]  #ignite every one
    try:
        while 1:
            alive_list = [i.is_alive() for i in process_list]
            print('Active thread: ' + str(len(alive_list)))
            death_position_list = [i for i, x in enumerate(alive_list) if x == False]
            if len(death_position_list) > 0:  #someone died
                print('Some died, adding {COUNT} new thread'.format(COUNT = len(death_position_list)))
                for i in death_position_list:
                    del process_list[i]
                    process_list.append(Process(target=fake_connector, args=((cid, ))))
                    process_list[-1].start()
            time.sleep(3)
    except Exception as e:
        print(e)
        for i in process_list:
            try:
                i.terminate()
            except:
                pass
        exit()
#----------------------------------------------------------------------
def usage():
    """"""
    print('''Use as:
    -c: cid, room number
    -t: thread number
    Press Ctrl+C to exit.
    ''')
if __name__=='__main__':
    argv_list = sys.argv[1:]
    try:
        opts, args = getopt.getopt(argv_list, "hc:t:",
                                   ['help', "cid=", 'thread_number='])
    except getopt.GetoptError:
        usage()
        exit()
    for o, a in opts:
        if o in ('-h', '--help'):
            usage()
            exit()
        if o in ('-c', '--cid'):
            cid = a
        if o in ('-t', '--thread_number'):
            thread_number = int(a)
    print('Getting room {cid} {thread_number} viewers...'.format(cid = cid, thread_number = thread_number))
    main(cid, thread_number)

 

Leave a Reply

Your email address will not be published. Required fields are marked *