Python 定时查询 StarRocks 数据库并将结果保存到 Excel

发布时间:2026-01-07 16:51

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

import os

import shutil

import pandas as pd

import pymysql

import openpyxl

import datetime

class StarRocksExporter(object):
    """Run one SQL query over a pymysql connection and save the result as .xlsx.

    Typical use is ``StarRocksExporter(...).execute()``, which connects,
    writes the query result to ``<filename>.xlsx`` and then moves every
    ``.xlsx`` file found in ``srcdir`` into ``destdir``.

    NOTE(review): the workbook is written to ``filename + '.xlsx'`` relative
    to the current working directory, not to ``srcdir``; the two only agree
    when ``srcdir`` is the working directory (as in this file's script
    section) -- confirm this is intended.
    """

    def __init__(self, host, port, database, user, password, query,
                 srcdir, destdir, filename):
        """Store the settings and open the Excel writer.

        Args:
            host: Database host passed to ``pymysql.connect``.
            port: Database port passed to ``pymysql.connect``.
            database: Database name passed to ``pymysql.connect``.
            user: Login user passed to ``pymysql.connect``.
            password: Login password passed to ``pymysql.connect``.
            query: SQL text to execute.
            srcdir: Directory scanned for ``.xlsx`` files by ``move_to_dest``.
            destdir: Directory the ``.xlsx`` files are moved into.
            filename: Workbook name without extension; also used as the
                sheet name.
        """
        self.host = host
        self.port = port
        self.database = database
        self.user = user
        self.password = password
        self.query = query
        self.srcdir = srcdir
        self.destdir = destdir
        self.filename = filename
        # Set by execute(); export_to_excel() and query_star_rock() read it.
        self.engine = None
        self.writer = pd.ExcelWriter(filename + '.xlsx')

    def export_to_excel(self):
        """Read ``self.query`` through ``self.engine`` and write the workbook."""
        df = pd.read_sql(self.query, self.engine)
        df.to_excel(self.writer, sheet_name=self.filename, index=False)
        # ExcelWriter.save() was removed in pandas 2.0; close() saves the
        # workbook and releases the file handle.
        self.writer.close()

    def move_to_dest(self):
        """Move every ``.xlsx`` entry of ``srcdir`` into ``destdir``.

        ``destdir`` is created when missing. Entries without any suffix are
        reported on stdout and skipped; a failed move is reported and the
        remaining files are still processed.
        """
        if not os.path.isdir(self.destdir):
            # os.mkdir()/os.makedirs() return None, so the result must not be
            # assigned back to self.destdir.
            os.makedirs(self.destdir, exist_ok=True)

        for file in os.listdir(self.srcdir):
            # splitext looks at the last dot only, so "a.b.xlsx" is matched
            # and "a.xlsx.bak" is not.
            suffix = os.path.splitext(file)[1]
            if not suffix:
                print("没有后缀的文件:", file)
                continue
            if suffix != '.xlsx':
                continue
            try:
                shutil.move(os.path.join(self.srcdir, file),
                            os.path.join(self.destdir, file))
            except OSError as exc:
                # Best effort: report the real failure and carry on.
                print("移动文件失败:", file, exc)

    def execute(self):
        """Connect, export the query result to Excel, then move the output."""
        conn = pymysql.connect(host=self.host, port=self.port,
                               database=self.database, user=self.user,
                               password=self.password)
        # Explicit close instead of ``with``: older PyMySQL releases yield a
        # cursor (not the connection) from the context manager.
        try:
            self.engine = conn
            # The query is executed once, inside export_to_excel(); running it
            # through query_star_rock() first only duplicated the work.
            self.export_to_excel()
        finally:
            conn.close()
        self.move_to_dest()

    def query_star_rock(self, query):
        """Execute ``query`` on ``self.engine`` and return all fetched rows."""
        # The context manager closes the cursor once the rows are fetched.
        with self.engine.cursor() as cursor:
            cursor.execute(query)
            results = cursor.fetchall()
        return results

    def get_user(self, user):
        """Placeholder; not implemented, returns ``None``."""
        pass

    def get_password(self, password):
        """Placeholder; not implemented, returns ``None``."""
        pass

if __name__ == '__main__':
    # Export one workbook per SQL file found in folder_path.
    Destdir = './../outputdir/'
    Srcdir = './'
    folder_path = './../SQLfileDir/'

    # sorted() gives a stable processing order; os.listdir() order is arbitrary.
    for sqlfile in sorted(os.listdir(folder_path)):
        file_path = os.path.join(folder_path, sqlfile)
        if not os.path.isfile(file_path):
            # Sub-directories cannot be opened as SQL text.
            continue

        with open(file_path, "r", encoding='utf-8') as f:
            sql = f.read()

        # Strip only the final extension so "a.b.sql" -> "a.b" and does not
        # collide with "a.c.sql".
        srfilename = os.path.splitext(sqlfile)[0]

        # NOTE(review): connection settings and credentials are hard-coded in
        # source; consider loading them from configuration or the environment.
        exporter = StarRocksExporter('192.168.10.11', 19030, 'manager', 'sys_ro', 'sdagfsdg!@#saf134',
                                     sql, Srcdir, Destdir, srfilename)
        exporter.execute()

网址:Python 定时查询 StarRocks 数据库并将结果保存到 Excel https://mxgxt.com/news/view/1927400

相关内容

StarRocks 数据库实时分析查询优化技术详解
StarRocks 数据库高性能查询实现技术详解
StarRocks 数据库高性能查询优化技术详解
理想汽车 x StarRocks:为 Hive 数据查询插上极速之翼!
StarRocks数据连接
如何利用 StarRocks 加速 Iceberg 数据湖的查询效率
StarRocks跨表查询
StarRocks 【新一代MPP数据库】(2)
StarRocks数据立方体
StarRocks数据流处理

随便看看