source: tmcsimulator/branches/FEPSimulator/FEPSim.cpp @ 84

Revision 84, 6.1 KB checked in by jtorres, 9 years ago (diff)

FEPSimulator.cpp: FEPSimulator socket server is now receiving full messages, no more partial messages over the TCP socket. General housekeeping on FEPSim.cpp, NetworkReader?.cpp, ATMSDriver.java, NetworkLoader?.java.

Line 
1#include "FEPSim.h"
2
3/**
4 * Constructor
5 *
6 * @param host The host rpc server ip address
7 * @param networkFile the xml network file
8 */
9FEPSim::FEPSim(char * ATMShost) {
10    this->ATMSHost = ATMShost;
11}
12
13/**
14 * Destructor
15 */
16FEPSim::~FEPSim() {
17    cout << "Destroying client..." << endl;
18    clnt_destroy(clnt);
19}
20
21/**
22 * Handler for the ATMS RPC Response (to the client RPC Call)
23 * @param response pointer to fep_reply struct
24 */
25void FEPSim::handleCallResponse(void *response) {
26    // Failed RPC Call
27    if (response == NULL) {
28        clnt_perror(clnt, "RPC call failed");
29    }
30    // Successful RPC Call
31    else {
32        cout << "Successful RPC call to ATMS..." << endl;
33    }
34}
35
36void FEPSim::sendReplys(char * xml)
37{
38    NetworkReader networkReader = NetworkReader(xml);
39    vector<FEP_LINE*> lines = networkReader.getLines();
40    vector<STATION*> ldsMap = networkReader.getStations();
41   
42    // Send one reply for every FEPLine
43    for (int i = 0; i < lines.size(); i++) {
44        fep_reply fepReply;
45        cout << "Sending fepReply for line #" << lines.at(i)->lineNum << endl;
46        // populate reply
47        fepReply.reply = SHORTPOLL;
48        fepReply.schedule = lines.at(i)->schedule;
49        fepReply.lineinfo = lines.at(i)->lineInfo;
50        fepReply.kind = (enum polltype) 0;
51        fepReply.flag = (enum replykind) 0;
52
53        /***********************************
54         * We might need to handle this on the java side if its an
55         * issue
56        lines.at(i).schedleSeq += 1;
57        lines.at(i).globalSeq += 51;
58         */
59
60        fepReply.schedule_sequence = lines.at(i)->schedleSeq;
61        fepReply.global_sequence = lines.at(i)->globalSeq;
62
63        // using current unix time, may need to look at later
64        fepReply.schedule_time = time(NULL);
65
66        fepReply.user_info1 = lines.at(i)->lineNum;
67        fepReply.user_info2 = lines.at(i)->lineNum;
68        fepReply.system_key = lines.at(i)->systemKey;
69
70        fepReply.answers.size = 1;
71        fepReply.answers.fep_answer_list_u.shortp.count = 1;
72
73        // construct a shortanswer for each station in line
74        for (int j = 0; j < lines.at(i)->lds.size(); j++) {
75            fep_shortanswer fsa;
76            int index = lines.at(i)->ldsIndex.at(j);
77            cout << "LDS index: " << index << endl;
78            // msg: oa, od, ldsMap.at(index).dataPack, od, ff
79            fsa.msg.message_len = ldsMap.at(index)->length + 2;
80            fsa.msg.message[0] = 0x0d;
81            fsa.msg.message[1] = 0x0a;
82            for (int k = 0; k < ldsMap.at(index)->length; k++)
83                fsa.msg.message[2 + k] = ldsMap.at(index)->dataPack[k];
84            int aa = ldsMap.at(index)->length;
85            fsa.msg.message[2 + aa] = 0x0d;
86            fsa.msg.message[3 + aa] = 0xff;
87
88            // info
89            fsa.info.poll_error_count = 0;
90            fsa.info.poll_user_info1 = ldsMap.at(index)->drop; // drop number
91            fsa.info.poll_user_info2 = 1; //always 1
92            fsa.info.retries = 0;
93            fsa.info.status = (enum replystatus) 1;
94
95            fepReply.answers.fep_answer_list_u.shortp.answers[0] = fsa;
96
97            // send out data
98            printf("Transferring line=%d, lds_drop_no=%d...\n", lines.at(i)->lineNum, ldsMap.at(index)->drop);
99            // Make RPC Call and handle response
100            printf("Handling ATMS response...\n");
101            handleCallResponse(fep_reply_xfer_32(&fepReply, clnt));
102        }
103    }
104}
105
106/**
107 * Sends an fep_reply for every line in the FEP.
108 */
109void FEPSim::updateATMS(char * xml) {
110    if(createClient(this->ATMSHost))
111    {
112        sendReplys(xml);
113    }
114}
115
116/**
117 * Creates the RPC Client. If not successful, exit with status 1.
118 * @param host rpc server ip
119 */
120bool FEPSim::createClient(char * host) {
121    /* Create RPC Client to communicate with ATMS */
122    bool success = true;
123    cout << "Creating RPC Client" << endl;
124    clnt = clnt_create(host, /*100090,*/ 103121, 32, "tcp");
125    /* Check if client creation failed */
126    if (clnt == (CLIENT *) NULL) {
127        cerr << "Can't create client to " << host << endl;
128        success = false;
129    }
130    else {
131        cout << "Client created" << endl;
132    }
133    return success;
134}
135
136/**
137 * Main driver for ATMSCommunicator. Creates an RPC Client from command line args.
138 * @param argc
139 * @param argv
140 * @return
141 */
142int main(int argc, char *argv[]) {
143    if(argc != 3)
144    {
145        cerr << "Usage: FEPSim <ATMS_Host> <FEP_ATMSDriver_PortNo" << endl;
146        exit(1);
147    }
148    char *FEPSimHost = argv[1];   
149    FEPSim fep = FEPSim(FEPSimHost);
150   
151    int sockfd, newsockfd, portno, clilen;
152    char buffer[BUFF_SIZE];
153    struct sockaddr_in serv_addr, cli_addr;
154    int n;
155    portno = atoi(argv[2]);
156
157    /* First call to socket() function */
158    sockfd = socket(AF_INET, SOCK_STREAM, 0);
159
160    if (sockfd < 0) {
161        perror("ERROR opening socket");
162        exit(1);
163    }
164
165    /* Initialize socket structure */
166    bzero((char *) &serv_addr, sizeof (serv_addr));
167
168    serv_addr.sin_family = AF_INET;
169    serv_addr.sin_addr.s_addr = INADDR_ANY;
170    serv_addr.sin_port = htons(portno);
171
172    /* Now bind the host address using bind() call.*/
173    if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof (serv_addr)) < 0) {
174        perror("ERROR on binding");
175        exit(1);
176    }
177
178    /* Now start listening for the clients, here process will
179     * go in sleep mode and will wait for the incoming connection
180     */
181
182    listen(sockfd, 5);
183    clilen = sizeof (cli_addr);
184
185    /* Accept actual connections from the client */
186    while(1) {
187        newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, (socklen_t *)&clilen);
188
189        if (newsockfd < 0) {
190            perror("ERROR on accept");
191            exit(1);
192        }
193
194        /* If connection is established then start communicating */
195        // zero out buffer
196        bzero(buffer, BUFF_SIZE);
197       
198        // read XML from socket
199        int totBytes = 0;
200        while ((n = recv(newsockfd, &buffer[totBytes], sizeof(buffer), 0)) > 0) {
201            totBytes += n;
202        }
203       
204        if (n < 0) {
205            perror("ERROR reading from socket");
206            exit(1);
207        }
208       
209        // send data to atms
210        fep.updateATMS(buffer);
211    }
212
213    return 0;
214}
Note: See TracBrowser for help on using the repository browser.