# - Facebook
# - Twitter
# - Reddit
# - StumbleUpon
# - Digg
# - email

##############################################################################
# VoiceCode, a programming-by-voice environment
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
# (C)2002, National Research Council of Canada
#
##############################################################################
 
"""Classes for threads to monitor TCP/IP connections, used by tcp_server and 
tcp_client.
"""
 
import vc_globals
 
import os, re, select, socket
import string, sys, threading, time
 
import messaging
import Object
 
import debug
import messaging, Object
import util
import Queue
 
# from thread_communication_win32 import *
 
 
##############################################################################
# Classes for new version of the server with abstract interfaces for
# sending messages between threads.
#
# calls to/from natlink are still handled by the main thread, like in
# ServerSingleThread.
##############################################################################
 
 
class ListenAndQueueMsgsThread(threading.Thread, Object.Object):
    """class for a thread which listens for messages using a given
    Messenger and puts completed messages on a Queue.

    **INSTANCE ATTRIBUTES**

    [Messenger] *underlying* -- underlying messenger (usually
    [MessengerBasic]) used to receive and unpack the messages.  It is
    reset to None once the *run* loop has been left.

    Queue.Queue *completed_msgs* -- Queue on which to deposit the
    completed messages.

    SocketHasDataEvent *event* -- object used to notify the main thread
    that a socket has data

    Event *connection_ending* -- threading.Event object which will be set
    to true if the connection has been terminated and the thread should die

    (STR, {STR: STR}) *conn_broken_msg* -- message to put onto the Queue to
    indicate that the connection was broken unexpectedly.  Note that
    because of thread timing issues, this can occur even if the
    mediator previously received an editor_disconnecting message, or
    even if the editor disconnected in response to a
    mediator_closing or terminating message.  However, in the former case,
    AppStateMessaging should process the editor_disconnecting message
    first, and will therefore ignore any subsequent messages in the
    queue (including the conn_broken one).  In the latter case, the mediator
    should already have left the message loop, so, again, the
    conn_broken message should not be processed.

    **CLASS ATTRIBUTES**

    *none* --

    .. [Messenger] file:///./messenger.Messenger.html
    .. [MessengerBasic] file:///./messenger.MessengerBasic.html"""
    def __init__(self, underlying, completed_msgs, event,
            connection_ending, conn_broken_msg, **args_super):
        self.deep_construct(ListenAndQueueMsgsThread,
                            {'underlying': underlying,
                             'completed_msgs': completed_msgs,
                             'event': event,
                             'connection_ending': connection_ending,
                             'conn_broken_msg': conn_broken_msg
                            },
                            args_super,
                            exclude_bases={'threading.Thread': 1})
        # threading.Thread is excluded from deep_construct above, so it
        # is initialised explicitly here.
        # For thread debug messages use instead:
        #     threading.Thread.__init__(self, verbose = 1)
        threading.Thread.__init__(self)

    def message_queue(self):
        """returns a reference to the message queue in which the thread
        puts completed messages

        **INPUTS**

        *none*

        **OUTPUTS**

        *Queue.Queue* -- the message queue
        """
        debug.trace('ListenAndQueueMsgsThread.message_queue', '** invoked, call stack is:')
#        debug.trace_call_stack('ListenAndQueueMsgsThread.message_queue', '**')
        return self.completed_msgs

    def get_mess(self):
        """Gets a message from the external editor by delegating to the
        underlying messenger.

        **INPUTS**

        *none*

        **OUTPUTS**

        (STR, {STR: STR}) name_argvals_mess -- The message retrieved
         from external editor in *(mess_name, {arg:val})* format, or
         None if no message is available."""

        debug.trace('ListenAndQueueMsgsThread.get_mess', 'invoked')
        return self.underlying.get_mess()

    def notify_main(self):
        """notify the main thread that there is a new message waiting in
        the Queue, and return asynchronously.

        **INPUTS**

        *none*

        **OUTPUTS**

        *none*
        """
        debug.trace('ListenAndQueueMsgsThread.notify_main', 'self.event=%s' % self.event)
#        debug.trace_call_stack('ListenAndQueueMsgsThread.notify_main', '**')
        self.event.notify()

    def run(self):
        """Start listening for data.

        Repeatedly fetches a message, queues every non-empty one on
        *completed_msgs* and notifies the main thread.  The loop ends
        when *connection_ending* is set, when the messenger raises
        messaging.WokenUp, or when it raises messaging.SocketError (in
        which case *conn_broken_msg* is queued first unless
        *connection_ending* was already set).

        **INPUTS**

        *none* --

        **OUTPUTS**

        *none* --
        """
        debug.trace('ListenAndQueueMsgsThread.run',
            'thread %s starting' % threading.currentThread().getName())
        while 1:
            try:
                debug.trace('ListenAndQueueMsgsThread.run', '** getting a message')
                data = self.get_mess()
            except messaging.SocketError:
                if self.connection_ending.isSet():
                    # the connection was being closed anyway, so the
                    # error is not reported to the main thread
                    break
                # connection broken unexpectedly (unless we just didn't
                # get the connection_ending event in time)
                self.completed_msgs.put(self.conn_broken_msg)
                self.notify_main()
                break
            except messaging.WokenUp:
                # NOTE(review): presumably raised when the blocking read
                # is interrupted on purpose -- confirm in messaging
                break

            if debug.tracing('ListenAndQueueMsgsThread.run'):
                # guarded so that repr(data) is only computed when tracing
                debug.trace('ListenAndQueueMsgsThread.run',
                    '** data=%s' % repr(data))
            if data:
                debug.trace('ListenAndQueueMsgsThread.run', '** sending notification message that data was received.')
                self.completed_msgs.put(data)
                self.notify_main()
            # waits for the timeout, or until connection_ending is set
            # (when debugging, a longer timeout such as 1.0 slows the
            # loop down)
            self.connection_ending.wait(0.01)
            if self.connection_ending.isSet():
                break
        # drop the reference to the messenger now that the loop is over
        self.underlying = None
 
 
class ListenNewConnThread(threading.Thread, Object.Object):
    """Abstract base class which listens for new socket connections on
    a port number and uses an InterThreadEvent object to
    notify the main thread about the new connection.

    Concrete subclasses will define the log_new_conn method to add each
    new socket to a data structure containing uninitialised connections.

    **INSTANCE ATTRIBUTES**

    INT *port* -- Port on which to listen for connections.

    InterThreadEvent *event* -- object used to send an event to the main
    thread

    **CLASS ATTRIBUTES**

    *none* --
    """

    def __init__(self, port, event, **args_super):
        self.deep_construct(ListenNewConnThread,
                            {'port': port,
                             'event': event},
                            args_super,
                            exclude_bases={'threading.Thread': 1})
        # threading.Thread is excluded from deep_construct above, so it
        # is initialised explicitly here.
        threading.Thread.__init__(self)

    def notify_main(self):
        """notify the main thread that there is a new connection waiting
        for a handshake, and return asynchronously.

        **INPUTS**

        *none*

        **OUTPUTS**

        *none*
        """
#        debug.trace_call_stack('ListenNewConnThread.notify_main', '**')
        self.event.notify()

    def run(self):
        """Start listening for new connections.

        Binds a TCP socket to (local host name, *port*) and accepts
        connections in an endless loop; each accepted socket is passed
        to *log_new_conn* and the main thread is then notified.  The
        loop has no exit condition, so this method only returns by
        raising.  The listening socket is closed on the way out.

        **INPUTS**

        *none* --


        **OUTPUTS**

        *none* --
        """

        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            server_socket.bind((socket.gethostname(), self.port))
            server_socket.listen(5)

            while 1:

                # Accept a new connection
                (client_socket, address) = server_socket.accept()

                debug.trace('ListenNewConnThread.run', 'got new connection on port=%s' % self.port)

                #
                # Log it, then notify the main event loop that
                # it should shake hands with it
                #
                self.log_new_conn(client_socket)
                self.notify_main()

                #
                # When debugging, increase this if you want to see things
                # happen in slow motion
                #
                time.sleep(0.05)
        finally:
            # do not leak the listening socket if binding, accepting or
            # logging a connection fails
            server_socket.close()


    def log_new_conn(self, client_socket):

        """Logs a newly received socket connection, so that main event
        loop can later shake hands with it.

        Virtual method: concrete subclasses must override it.

        **INPUTS**

        *socket client_socket* -- Newly received socket connection

        **OUTPUTS**

        *none* --
        """
        debug.virtual('ListenNewConnThread.log_new_conn')
 
 
class NewConnListThread(ListenNewConnThread):
    """Listens for new socket connections on a port number
    and uses an InterThreadEvent object to
    notify the main thread about the new connection.

    Adds each new socket to a list of uninitialised connections.

    This version is used for new talker sockets, because they may not
    come in in the same order as the previously connected new listener
    sockets, so you have to look through the whole list anyway.

    **INSTANCE ATTRIBUTES**

    [socket] *new_socks* -- List on which to add any new connection.
    Each entry is a tuple *(socket, [None, None, None, None])*.

    [lock] *new_socks_lock* -- Lock on the new connection list.

    **CLASS ATTRIBUTES**

    *none* --
    """

    def __init__(self, new_socks, new_socks_lock, **args_super):
        self.deep_construct(NewConnListThread,
                            {'new_socks': new_socks,
                             'new_socks_lock': new_socks_lock},
                            args_super)

    def log_new_conn(self, client_socket):

        """Logs a newly received socket connection, so that main event
        loop can later shake hands with it.

        The socket is appended to *new_socks*, paired with a list of
        four None placeholders, while *new_socks_lock* is held.

        **INPUTS**

        *socket client_socket* -- Newly received socket connection

        **OUTPUTS**

        *none* --
        """

        self.new_socks_lock.acquire()
        try:
            self.new_socks.append((client_socket, [None, None, None, None]))
        finally:
            # always release, so a failed append cannot leave the list
            # locked forever
            self.new_socks_lock.release()
 
class ListenNewEditorsThread(ListenNewConnThread):
    """Thread accepting new editor socket connections on a port number;
    the main thread is told about each of them through an
    InterThreadEvent object.

    Every accepted socket is deposited on a Queue of uninitialised
    connections.

    This variant serves new listener sockets, which should be
    processed in the order they come in.

    **INSTANCE ATTRIBUTES**

    [Queue] *new_socks* -- Queue receiving each new connection.

    **CLASS ATTRIBUTES**

    *none* --
    """

    def __init__(self, new_socks, **args_super):
        own_attrs = {'new_socks': new_socks}
        self.deep_construct(ListenNewEditorsThread, own_attrs, args_super)

    def log_new_conn(self, client_socket):

        """Records a freshly accepted socket connection so that the main
        event loop can shake hands with it later.

        **INPUTS**

        *socket client_socket* -- the socket connection just received

        **OUTPUTS**

        *none* --
        """

        pending = self.new_socks
        pending.put(client_socket)