This URL has Read-Only access.

Statistics
| Branch: | Tag: | Revision:

root / py / scenic / streamer.py @ eb9e75ef

History | View | Annotate | Download (13.8 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
# 
4
# Scenic
5
# Copyright (C) 2008 Société des arts technologiques (SAT)
6
# http://www.sat.qc.ca
7
# All rights reserved.
8
#
9
# This file is free software: you can redistribute it and/or modify
10
# it under the terms of the GNU General Public License as published by
11
# the Free Software Foundation, either version 2 of the License, or
12
# (at your option) any later version.
13
#
14
# Scenic is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
# GNU General Public License for more details.
18
#
19
# You should have received a copy of the GNU General Public License
20
# along with Scenic. If not, see <http://www.gnu.org/licenses/>.
21

    
22
"""
23
Manages local streamer processes.
24
"""
25

    
26
from scenic import process
27
from scenic import sig
28

    
29
class StreamerManager(object):
30
    """
31
    Manages local streamer processes.
32
    """
33
    def __init__(self, app):
34
        self.app = app
35
        # commands
36
        self.milhouse_recv_cmd = None
37
        self.milhouse_send_cmd = None
38
        self.sender = None
39
        self.receiver = None
40
        self.state = process.STATE_STOPPED
41
        self.state_changed_signal = sig.Signal()
42
        # for stats
43
        #TODO: add ports?
44
        self.session_details = None # either None or a big dict
45
        self.receiving_audio_from_peer = False
46
        self.receiving_video_from_peer = False
47
        self.sending_audio_to_peer = False
48
        self.sending_video_to_peer = False
49

    
50
    def _gather_config_to_stream(self, addr):
51
        """
52
        Gathers all settings in a big dict.
53
        
54
        Useful for feedback to the user.
55
        """
56
        contact_name = addr
57
        contact = self.app._get_contact_by_addr(addr)
58
        if contact is not None:
59
            contact_name = contact["name"]
60
        
61
        remote_config = self.app.remote_config
62
        send_width, send_height = self.app.config.video_capture_size.split("x")
63
        receive_width, receive_height = remote_config["video"]["capture_size"].split("x")
64
        self.session_details = {
65
            "peer": {
66
                "address": addr,
67
                "name": contact_name,
68
                },
69
            # ----------------- send ---------------
70
            "send": {
71
                "video": {
72
                    "source": self.app.config.video_source,
73
                    "device": self.app.config.video_device,
74
                    "bitrate": self.app.config.video_bitrate,
75
                    "codec": remote_config["video"]["codec"],
76
                    "width": int(send_width), # int
77
                    "height": int(send_height), # int
78
                    "aspect-ratio": self.app.config.video_aspect_ratio,
79
                    "port": self.app.remote_config["video"]["port"], 
80
                },
81
                "audio": {
82
                    "source": self.app.config.audio_source,
83
                    "numchannels": self.app.config.audio_channels,
84
                    "codec": self.app.config.audio_codec,
85
                    "port": self.app.remote_config["audio"]["port"], 
86
                }
87
            },
88
            # -------------------- recv ------------
89
            "receive": {
90
                "video": {
91
                    "sink": self.app.config.video_sink,
92
                    "codec": self.app.config.video_codec,
93
                    "width": int(receive_width), # int
94
                    "height": int(receive_height), # int
95
                    "deinterlace": self.app.config.video_deinterlace, # bool
96
                    "aspect-ratio": remote_config["video"]["aspect_ratio"],
97
                    "port": str(self.app.recv_video_port), #decided by the app
98
                    "window-title": "\"From %s\"" % (contact_name), #TODO: i18n
99
                    "jitterbuffer": self.app.config.video_jitterbuffer, 
100
                    "fullscreen": self.app.config.video_fullscreen, # bool
101
                    "bitrate": remote_config["video"]["bitrate"], # float
102
                },
103
                "audio": {
104
                    "numchannels": self.app.config.audio_channels, # int
105
                    "codec": self.app.config.audio_codec, 
106
                    "port": self.app.recv_audio_port,
107
                    "sink": self.app.config.audio_sink
108
                }
109
            }
110
        }
111
        if self.session_details["send"]["video"]["source"] != "v4l2src":
112
            self.session_details["send"]["video"]["device"] = None
113
        if self.session_details["send"]["video"]["codec"] == "theora":
114
            self.session_details["send"]["video"]["bitrate"] = None
115
        print(str(self.session_details))
116
        
117
    def start(self, host):
118
        """
119
        Starts the sender and receiver processes.
120
        
121
        @param host: str ip addr
122
        Raises a RuntimeError if a sesison is already in progress.
123
        """
124
        if self.state != process.STATE_STOPPED:
125
            raise RuntimeError("Cannot start streamers since they are %s." % (self.state)) # the programmer has done something wrong if we're here.
126
        
127
        self._gather_config_to_stream(host)
128
        details = self.session_details
129

    
130
        # ------------------ send ---------------
131
        self.milhouse_send_cmd = [
132
            "milhouse", 
133
            '--sender', 
134
            '--address', details["peer"]["address"],
135
            '--videosource', details["send"]["video"]["source"],
136
            '--videocodec', details["send"]["video"]["codec"],
137
            '--videoport', str(details["send"]["video"]["port"]),
138
            '--width', str(details["send"]["video"]["width"]),
139
            '--height', str(details["send"]["video"]["height"]),
140
            '--aspect-ratio', str(details["send"]["video"]["aspect-ratio"]),
141
            '--audiosource', details["send"]["audio"]["source"],
142
            '--numchannels', str(details["send"]["audio"]["numchannels"]),
143
            '--audiocodec', details["send"]["audio"]["codec"],
144
            '--audioport', str(details["send"]["audio"]["port"]),
145
            ]
146
        if details["send"]["video"]["source"] == "v4l2src":
147
            self.milhouse_send_cmd.extend(["--videodevice", details["send"]["video"]["device"]])
148
        if details["send"]["video"]["codec"] != "theora":
149
            self.milhouse_send_cmd.extend(['--videobitrate', str(int(details["send"]["video"]["bitrate"] * 1000000))])
150

    
151
        # ------------------- recv ----------------
152
        self.milhouse_recv_cmd = [
153
            "milhouse",
154
            '--receiver', 
155
            '--address', details["peer"]["address"],
156
            '--videosink', details["receive"]["video"]["sink"],
157
            '--videocodec', details["receive"]["video"]["codec"],
158
            '--videoport', str(details["receive"]["video"]["port"]),
159
            '--jitterbuffer', str(details["receive"]["video"]["jitterbuffer"]),
160
            '--width', str(details["receive"]["video"]["width"]),
161
            '--height', str(details["receive"]["video"]["height"]),
162
            '--aspect-ratio', details["receive"]["video"]["aspect-ratio"],
163
            '--audiosink', details["receive"]["audio"]["sink"],
164
            '--numchannels', str(details["receive"]["audio"]["numchannels"]),
165
            '--audiocodec', details["receive"]["audio"]["codec"],
166
            '--audioport', str(details["receive"]["audio"]["port"]),
167
            '--window-title', details["receive"]["video"]["window-title"],
168
            ]
169
        if details["receive"]["video"]["fullscreen"]:
170
            self.milhouse_recv_cmd.append('--fullscreen')
171
        if details["receive"]["video"]["deinterlace"]:
172
            self.milhouse_recv_cmd.append('--deinterlace')
173

    
174
        # setting up
175
        self.rtcp_stats = {
176
            "send": {
177
                "video": {
178
                    "packets-lost": 0,
179
                    "jitter": 0,
180
                    "bitrate": 0,
181
                    "connected": False
182
                },
183
                "audio": {
184
                    "packets-lost": 0,
185
                    "jitter": 0,
186
                    "bitrate": 0,
187
                    "connected": False
188
                }
189
            },
190
            "receive": {
191
                "video": {
192
                    "connected": False
193
                },
194
                "audio": {
195
                    "connected": False
196
                }
197
            }
198
        }
199

    
200
        recv_cmd = " ".join(self.milhouse_recv_cmd)
201
        self.receiver = process.ProcessManager(command=recv_cmd, identifier="receiver")
202
        self.receiver.state_changed_signal.connect(self.on_process_state_changed)
203
        self.receiver.stdout_line_signal.connect(self.on_receiver_stdout_line)
204
        self.receiver.stderr_line_signal.connect(self.on_receiver_stderr_line)
205
        send_cmd = " ".join(self.milhouse_send_cmd)
206
        self.sender = process.ProcessManager(command=send_cmd, identifier="sender")
207
        self.sender.state_changed_signal.connect(self.on_process_state_changed)
208
        self.sender.stdout_line_signal.connect(self.on_sender_stdout_line)
209
        self.sender.stderr_line_signal.connect(self.on_sender_stderr_line)
210
        # starting
211
        self._set_state(process.STATE_STARTING)
212
        print "$", send_cmd
213
        self.sender.start()
214
        print "$", recv_cmd
215
        self.receiver.start()
216

    
217
    def on_receiver_stdout_line(self, line):
218
        """
219
        Handles a new line from our receiver process' stdout
220
        """
221
        if "stream connected" in line:
222
            if "audio" in line:
223
                self.rtcp_stats["receive"]["audio"]["connected"] = True
224
            elif "video" in line:
225
                self.rtcp_stats["receive"]["video"]["connected"] = True
226
        else:
227
            print "%9s stdout: %s" % (self.receiver.identifier, line)
228

    
229
    def on_receiver_stderr_line(self, line):
230
        """
231
        Handles a new line from our receiver process' stderr
232
        """
233
        print "%9s stdout: %s" % (self.receiver.identifier, line)
234
    
235
    def on_sender_stdout_line(self, line):
236
        """
237
        Handles a new line from our receiver process' stdout
238
        """
239
        print "%9s stdout: %s" % (self.sender.identifier, line)
240
        if "PACKETS-LOST" in line:
241
            if "mpeg4" in line or "theora" in line or "h263" in line or "h264" in line:
242
                self.rtcp_stats["send"]["video"]["packets-lost"] = int(line.split(":")[-1])
243
            elif "raw" in line or "mp3" in line or "vorbis" in line:
244
                self.rtcp_stats["send"]["audio"]["packets-lost"] = int(line.split(":")[-1])
245
        elif "JITTER" in line:
246
            if "mpeg4" in line or "theora" in line or "h263" in line or "h264" in line:
247
                self.rtcp_stats["send"]["video"]["jitter"] = int(line.split(":")[-1])
248
            elif "raw" in line or "mp3" in line or "vorbis" in line:
249
                self.rtcp_stats["send"]["audio"]["jitter"] = int(line.split(":")[-1])
250
        elif "connected" in line:
251
            if "mpeg4" in line or "theora" in line or "h263" in line or "h264" in line:
252
                self.rtcp_stats["send"]["video"]["connected"] = True
253
            elif "raw" in line or "mp3" in line or "vorbis" in line:
254
                self.rtcp_stats["send"]["audio"]["connected"] = True
255

    
256
    def on_sender_stderr_line(self, line):
257
        """
258
        Handles a new line from our receiver process' stderr
259
        """
260
        print "%9s stderr: %s" % (self.sender.identifier, line)
261

    
262
    def is_busy(self):
263
        """
264
        Retuns True if a streaming session is in progress.
265
        """
266
        return self.state != process.STATE_STOPPED
267

    
268
    def on_process_state_changed(self, process_manager, process_state):
269
        """
270
        Slot for the ProcessManager.state_changed_signal
271
        Calls stop() if one of the processes crashed.
272
        """
273
        print process_manager, process_state
274
        if process_state == process.STATE_RUNNING:
275
            # As soon as one is running, set our state to running
276
            if self.state == process.STATE_STARTING:
277
                self._set_state(process.STATE_RUNNING)
278
        elif process_state == process.STATE_STOPPING:
279
            pass
280
        elif process_state == process.STATE_STARTING:
281
            pass
282
        elif process_state == process.STATE_STOPPED:
283
            # As soon as one crashes or is not able to start, stop all streamer processes.
284
            if self.state in [process.STATE_RUNNING, process.STATE_STARTING]:
285
                print("A streamer process died. Stopping the local streamer manager.")
286
                self.stop() # sets self.state to STOPPING
287
            # Next, if all streamers are dead, we can say this manager is stopped
288
            if self.state == process.STATE_STOPPING:
289
                one_is_left = False
290
                for proc in [self.sender, self.receiver]:
291
                    if process_manager is not proc and proc.state != process.STATE_STOPPED:
292
                        print("Streamer process %s is not dead, so we are not done stopping. Its state is %s." % (proc, proc.state))
293
                        one_is_left = True
294
                if not one_is_left:
295
                    print "Setting streamers manager to STOPPED"
296
                    self._set_state(process.STATE_STOPPED)
297
    
298
    def _set_state(self, new_state):
299
        """
300
        Handles state changes.
301
        """
302
        if self.state != new_state:
303
            self.state_changed_signal(self, new_state)
304
            self.state = new_state
305
        else:
306
            raise RuntimeError("Setting state to %s, which is already the current state." % (self.state))
307
            
308
    def stop(self):
309
        """
310
        Stops the sender and receiver processes.
311
        Does not send any message to the remote peer ! This must be done somewhere else.
312
        """
313
        #TODO: return a deferred
314
        # stopping
315
        if self.state in [process.STATE_RUNNING, process.STATE_STARTING]:
316
            self._set_state(process.STATE_STOPPING)
317
            for proc in [self.sender, self.receiver]:
318
                if proc is not None:
319
                    if proc.state != process.STATE_STOPPED and proc.state != process.STATE_STOPPING:
320
                        proc.stop()