tornado-SSE example demo

d37eirin / 2024-11-05 / 原文

html不需要库,直接F12看(GET请求也可以不用html直接访问接口:tornado示例为 http://localhost:8888/events,express示例为 http://localhost:3000/events)
tornado:
1.注意同步异步,其他接口不要卡死 tornado.gen.sleep(1)
2.跨域Access-Control要注意 我之前忘了那三个语句卡住了
还是不熟悉基础知识,要多补习

tornado:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
# @Author  : d7L
# @Date    : 2024/10/21 10:46
# @File    : sse_tornado.py
# @Description: 
"""
import tornado.ioloop
import tornado.web
import datetime

class SSEHandler(tornado.web.RequestHandler):
    """Server-Sent Events endpoint that pushes the current time once per second.

    The GET handler keeps the response open and writes one ``data:`` event
    every second until the client disconnects.
    """

    def set_default_headers(self):
        """Attach the CORS headers to every response of this handler.

        Tornado calls this hook at the start of each request and again whenever
        the response is reset during error handling, so the headers are kept on
        error responses as well. Headers set in ``initialize`` are lost on such
        a reset.
        """
        self.set_header('Access-Control-Allow-Origin', "*")
        self.set_header('Access-Control-Allow-Headers', "Content-Type")
        self.set_header('Access-Control-Allow-Methods', "GET, POST, OPTIONS")

    async def get(self):
        """Stream one time-stamp event per second until the client goes away."""
        # Stream-specific headers are set here rather than as defaults so that
        # an error page is not labelled as an event stream.
        self.set_header('Content-Type', 'text/event-stream')
        self.set_header('Cache-Control', 'no-cache')
        self.set_header('Connection', 'keep-alive')

        try:
            while True:
                # An SSE message is a "data:" line terminated by a blank line.
                data = f'data: Current time is {datetime.datetime.now().strftime("%H:%M:%S")}\n\n'
                self.write(data)
                # flush() pushes the chunk to the client now instead of
                # buffering it until the request finishes.
                await self.flush()
                # Non-blocking sleep: the IOLoop keeps serving other requests.
                await tornado.gen.sleep(1)
        except tornado.iostream.StreamClosedError:
            # The client closed the connection; ending the loop is the
            # expected way for this stream to finish.
            return

def make_app():
    """Build the Tornado application with the SSE handler mounted at /events."""
    routes = [(r"/events", SSEHandler)]
    return tornado.web.Application(routes)

if __name__ == "__main__":
    # Print a start-up marker, bind the application to port 8888 and run the
    # IOLoop until the process is stopped.
    print("go...")
    app = make_app()
    app.listen(port=8888)
    io_loop = tornado.ioloop.IOLoop.current()
    io_loop.start()

html:

<!DOCTYPE html>
<html>

<head>
    <meta charset="UTF-8">
</head>

<body>
    <script>
        // Open the SSE connection. Port 8888 matches the Tornado server's
        // app.listen(8888); switch to port 3000 when testing against the
        // Express mock server instead.
        const eventSource = new EventSource('http://127.0.0.1:8888/events');

        // Handle each incoming message: log it and show it on the page.
        eventSource.onmessage = function (event) {
            console.log(event.data);
            // textContent inserts the payload as plain text, so markup in the
            // server data is never parsed as HTML.
            document.getElementById('result').textContent = event.data;
        };

        // Handle errors by logging them to the console.
        eventSource.onerror = function (error) {
            console.error('SSE Error:', error);
        };
    </script>
    <div id="result"></div>
</body>

</html>

js(没有tornado才用,测后端不需要,前端自己调的):

const express = require('express');
const cors = require('cors');
const app = express();

// CORS policy applied to every route: any origin, the listed methods, and the
// Content-Type request header.
const corsOptions = {
    origin: '*',
    methods: ['GET', 'POST', 'OPTIONS'],
    allowedHeaders: ['Content-Type'],
};
app.use(cors(corsOptions));

// SSE endpoint: keeps the response open and writes the current time once per
// second until the request is closed.
app.get('/events', (req, res) => {
    const sseHeaders = {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
    };
    res.writeHead(200, sseHeaders);

    // Each SSE message is a "data:" line terminated by a blank line.
    const pushTime = () => {
        res.write(`data: Current time is ${new Date().toLocaleTimeString()}\n\n`);
    };
    const timer = setInterval(pushTime, 1000);

    // Stop the timer once the request closes so it does not keep firing.
    req.on('close', () => {
        clearInterval(timer);
    });
});

// Port the mock server listens on.
const port = 3000;

// Logged once the server has started listening.
const announceReady = () => {
    console.log(`Server running at port ${port}`);
};

app.listen(port, announceReady);