/*
 * 	              	  	Exemple "Qsort.c" PVM versio 3.3
 *			 Susana Maria Bajo Sanchez (ei05646@salleURL.edu)
 *			 Josep Maria Garrell i Guiu (josepmg@salleURL.edu)
 *					Software Paral·lel
 *		           	  Departament d'Informatica (DI)
 *	               		Enginyeria i Arquitectura La Salle
 *		           	  Universitat Ramon Llull (URL)
 * 			                 Curs 1999/2000
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "pvm3.h"

int info;

//-----------------------------------------------------------------------
int *setup_spmd(int argc,char *argv[],int NPROC,int *ME)
{
int mytid = pvm_mytid();
int parent = pvm_parent();
int *taskid = (int*)malloc(NPROC*sizeof(*taskid));
int i,msglabel = 0;

	if (parent<0)	// Es la tarea padre
		{
		// Se salta todos los '/' y coje la ultima ocurrenca, si no la encuentra retorna NULL.
		char *a_out=strrchr(argv[0],'/');	
		
		taskid[0] = mytid;
	
		if (NPROC>1)
			{
			// Crea las tareas hijas
			info = pvm_spawn((a_out)? a_out+1 : argv[0], argv+1, PvmTaskDefault,"*",NPROC-1,&taskid[1]); 
			if (info<0) pvm_perror("QSORT: <ERROR> Después pvm_spawn()\n");
			}
		
		// Envía las tablas de taskid a los otros procesadores
		info = pvm_initsend(PvmDataDefault);
		if (info<0) pvm_perror("QSORT: <ERROR> Después pvm_initsend()\n");

		info = pvm_pkint(taskid,NPROC,1);
		if (info<0) pvm_perror("QSORT: <ERROR> Después pvm_pkint()\n");

		info = pvm_mcast(&taskid[1],NPROC-1,msglabel);
		if (info<0) pvm_perror("QSORT: <ERROR> Después pvm_mcast()\n");
		}
		else	// Es una tarea hija
			{
			// Recibe los taskids del proceso #0
			info = pvm_recv(parent,msglabel);
			if (info<0) pvm_perror("QSORT: <ERROR> Después pvm_recv()\n");

			info = pvm_upkint(taskid,NPROC,1);
			if (info<0) pvm_perror("QSORT: <ERROR> Después pvm_upkint()\n");
			}
		
	// Encuentra el índice del proceso en la tabla de taskid
	for (i=0;i<NPROC;i++)
		{
		if (taskid[i] == mytid)		// Soy yo
			{
			*ME=i;
			return  taskid;
			} 
		}

	*ME=-1;

return NULL;
}

//-----------------------------------------------------------------------
void verify_data(char *note,int ME,int NPROC,int *taskid,int *data,int Nlocal)
{
static int msglabel = 1000; 
int i, N=Nlocal*NPROC;

	if (ME>0)	// Tarea slave
	{
	info = pvm_initsend(PvmDataDefault);
	if (info<0) pvm_perror("QSORT: <ERROR> Después pvm_initsend()\n");

	info = pvm_pkint(data,Nlocal,1);
	if (info<0) pvm_perror("QSORT: <ERROR> Después pvm_pkint()\n");

	info = pvm_send(taskid[0],msglabel);
	if (info<0) pvm_perror("QSORT: <ERROR> Después pvm_send()\n");
	}
	else	// Tarea master
		{
		int *full_data = (int*)malloc(N*sizeof(*full_data));

		// Copia el area de memoria: data en full_data
		memcpy(full_data,data,N*sizeof(*data));

		for (i=1;i<NPROC;i++)
			{
			info = pvm_recv(taskid[i],msglabel);
			if (info<0) pvm_perror("QSORT: <ERROR> Después pvm_recv()\n");

			info = pvm_upkint(&full_data[Nlocal*i],Nlocal,1);
			if (info<0) pvm_perror("QSORT: <ERROR> Después pvm_upkint()\n");
			}
		printf("QSORT: Datos %s:\n",note);
		for (i=0;i<N;i++) printf("%5d%s",full_data[i],(i+1)%10==0?"\n":"");
		printf("\n");
		free(full_data);
		}
	msglabel++;
}

//-----------------------------------------------------------------------
// Mira si a > b
int sort_function(const void *a, const void *b)
{
int *aa = (int*)a;
int *bb = (int*)b;

return(*aa>*bb)?1:-1;
}

//-----------------------------------------------------------------------
// Compara los arrays locales y los coloca en uno
void compare_exchange(int ME,int NPROC, int *taskid,int dest,int *data,int Nlocal)
{
static int k = 0;
	
	if (dest!=ME && dest>=0 && dest<NPROC)
		{
		int *buf1 = (int*)malloc(Nlocal*sizeof(*buf1));
		int *buf2 = (int*)malloc(Nlocal*sizeof(*buf2));
		
		// Envía la contribución local al destino
		info = pvm_initsend(PvmDataDefault);
		if (info<0) pvm_perror("QSORT: <ERROR> Después pvm_initsend()\n");

		info = pvm_pkint(data,Nlocal,1);
		if (info<0) pvm_perror("QSORT: <ERROR> Después pvm_pkint()\n");

		info = pvm_send(taskid[dest],1000+k); 
		if (info<0) pvm_perror("QSORT: <ERROR> Después pvm_send()\n");
		
		// Recibe la contribución del destino
		info = pvm_recv(taskid[dest],1000+k);
		if (info<0) pvm_perror("QSORT: <ERROR> Después pvm_recv()\n");

		info = pvm_upkint(buf2,Nlocal,1);
		if (info<0) pvm_perror("QSORT: <ERROR> Después pvm_upkint()\n");
 
		// Une con el conjunto actual de datos
		memcpy(buf1,data,Nlocal*sizeof(*data));
		if (dest>ME)	// (ME+1)
			{
			int *p1=buf1, *p2=buf2;
			int rembuf1=Nlocal, rembuf2=Nlocal;
			int i=0;

			while (rembuf1>0 && rembuf2>0)
				{
				if (*p1<=*p2)
					{
					data[i++]=*p1++;	// Toma de buf1[]
					rembuf1--;
					}
				else
					{
					data[i++]=*p2++;	// Toma de buf2[]
					rembuf2--;
					} 
				} // while (rembuf1>0&&rembuf2>0)
			} // if (dest>ME) (ME-1)
			else
				{ // dest<ME
				int *p1=buf1+Nlocal-1,*p2=buf2+Nlocal-1;
				int rembuf1= Nlocal, rembuf2=Nlocal;
				int i=Nlocal-1;

				while (rembuf1>0&&rembuf2>0)
					{
					if (*p1>*p2)
						{
						data[i--]=*p1--; // Toma de buf1[]
						rembuf1--;
						}
						else
							{
							data[i--]=*p2--; // Toma de buf2[]
							rembuf2--; 
							}
					 } //	while(rembuf1>0&&rembuf2>0)
				}  //  dest<ME*
		free(buf1);
		free(buf2);
		} // if (dest!=ME&&dest>=0&&dest<NPROC)
	k++;
}

//-----------------------------------------------------------------------
int main(int argc,char*argv[])
{
int *taskid; 		
struct pvmhostinfo *hostp;
int narch,NPROC; 
int N=atoi(argv[1]);		// Número global de elementos a ordenar
int ME;				// ID del proceso [0..NPROC-1]
int *data; 				// Array local de datos para ser ordenado
int i,phase,Nlocal;

	// Obtiene informacion sobre la configuracion del PVM
	info = pvm_config(&NPROC,&narch,&hostp);
	if (info<0) pvm_perror("QSORT: <ERROR> Después pvm_config()\n");

	// Se ejecuta en tantos procesadores como tenga
	printf("QSORT: Ejecutando en %d procesadores\n",NPROC);
	if (NPROC>1) Nlocal =(N+NPROC-1)/NPROC;
		else Nlocal= N;

	N = Nlocal*NPROC;

	// Creacion las tareas retorna los tids
	taskid = setup_spmd(argc,argv,NPROC,&ME);

	// Si es la tarea master
	if (ME==0) printf("N=%d\tNlocal=%d\tNPROC=%d\n",N,Nlocal,NPROC);

	// Creacion del array de datos locales
	data = (int*)malloc(Nlocal*sizeof(*data));

	// Introduce datos en el array local
	for (i=0;i<Nlocal;i++) data[i] = N-(i+ME*Nlocal);
	
	// Comprueba y muestra todos los datos desordenados
	verify_data("Antes del sort",ME,NPROC,taskid,data,Nlocal);

	// Ordena los datos locales (array) 
	qsort(data,Nlocal,sizeof(*data),sort_function);
	
	// Datos despues del qsort() local
	verify_data("Despues del qsort() local",ME,NPROC,taskid,data,Nlocal);
	
	// Odd-Even loop
	for (phase=1;phase<=NPROC;phase++)
		{
		if (phase%2!=0)
			{ // Oddphase
			if (ME%2==0) compare_exchange(ME,NPROC,taskid,ME+1,data,Nlocal);
			else compare_exchange(ME,NPROC,taskid,ME-1,data,Nlocal);
			}
			else	{ // Evenphase
				if (ME%2==0) compare_exchange(ME,NPROC,taskid,ME-1,data,Nlocal);
				else compare_exchange(ME,NPROC,taskid,ME+1,data,Nlocal);
				}
		} 	

	// Comprueba el resultado
	verify_data("Despues del sort",ME,NPROC,taskid,data,Nlocal);
	
	printf("QSORT: Saliendo de PVM..\n");
	pvm_exit();

return 0;
}

