Skip to content
Snippets Groups Projects
Verified Commit 2dfc93cc authored by Paul Dechamps's avatar Paul Dechamps :speech_balloon:
Browse files

(refactor) Renamed su2 interface

parent 4016f192
Branches su2interface
No related tags found
No related merge requests found
Pipeline #52849 failed
......@@ -90,7 +90,7 @@ def initBlast(iconfig, vconfig, iSolver='DART', task='analysis'):
if iSolver == 'DART':
from blast.interfaces.dart.blDartInterface import DartInterface as interface
elif iSolver == 'SU2':
from blast.interfaces.su2.SU2Interface import SU2Interface as interface
from blast.interfaces.su2.blSU2Interface import SU2Interface as interface
else:
raise RuntimeError(f"Solver {iSolver} currently not implemented")
......
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2020 University of Liège
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# SU2 interface
# Paul Dechamps
import numpy as np
import os
import sys
from blast.interfaces.blSolversInterface import SolversInterface
import pysu2
class SU2Interface(SolversInterface):
def __init__(self, su2config, vconfig, task='analysis'):
self.filename = su2config.get('filename')
self._modify_meshpath(self.filename, su2config.get('meshfile'))
self.verbose = su2config.get('Verb', 0)
self.have_mpi, self.comm, self.status, self.myid = self.__setup_mpi()
self.solver = pysu2.CSinglezoneDriver(self.filename, 1, self.comm)
# Prepare solver
self.comm.barrier()
self.solver.Preprocess(0)
self.wingMarkersToID = {key: i for i, key in enumerate(self.solver.GetMarkerTags())}
self.wingTags = su2config.get('wingTags')
print('SU2Interface::SU2 solver initialized for', task)
SolversInterface.__init__(self, vconfig)
def __setup_mpi(self):
import sys
from mpi4py import MPI
if 'mpi4py.MPI' in sys.modules:
have_mpi = True
comm = MPI.COMM_WORLD
status = MPI.Status()
myid = comm.Get_rank()
else:
have_mpi = False
comm = None
status = None
myid = 0
return have_mpi, comm, status, myid
def run(self):
#Remove output in the terminal
if self.getVerbose() < 2:
sys.stdout = open(os.devnull, 'w')
self.solver.Run()
self.solver.Postprocess()
# write outputs
self.solver.Monitor(0)
self.solver.Output(0)
self.comm.barrier()
#restore stdout
if self.getVerbose() < 2:
sys.stdout = sys.__stdout__
if self.getVerbose() > 0:
print(f'SU2Interface::SU2 solver finished in {self.solver.GetOutputValue("INNER_ITER"):.0f} iterations. RMS_DENSITY: {self.solver.GetOutputValue("RMS_DENSITY"):.2f}')
return 0
def writeCp(self, sfx=''):
""" Write Cp distribution around the airfoil on file.
"""
#GEt farfield pressure, velocity and density
vel_x_idx = self.solver.GetPrimitiveIndices()["VELOCITY_X"]
vel_y_idx = self.solver.GetPrimitiveIndices()["VELOCITY_Y"]
pressure_idx = self.solver.GetPrimitiveIndices()["PRESSURE"]
density_idx = self.solver.GetPrimitiveIndices()["DENSITY"]
nNodes_farfield = self.solver.GetNumberMarkerNodes(self.wingMarkersToID['farfield'])
velocity_farfield = np.zeros(nNodes_farfield)
pressure_farfield = np.zeros(nNodes_farfield)
density_farfield = np.zeros(nNodes_farfield)
primitives = self.solver.Primitives()
for inode in range(nNodes_farfield):
nodeIndex = self.solver.GetMarkerNode(self.wingMarkersToID['farfield'], inode)
# Velocity
vel_x = primitives.Get(nodeIndex, vel_x_idx)
vel_y = primitives.Get(nodeIndex, vel_y_idx)
velocity_farfield[inode] = np.sqrt(vel_x**2 + vel_y**2)
# Pressure
pressure_farfield[inode] = primitives.Get(nodeIndex, pressure_idx)
# Density
density_farfield[inode] = primitives.Get(nodeIndex, density_idx)
ref_pressure = np.mean(pressure_farfield)
ref_velocity = np.mean(velocity_farfield)
ref_density = np.mean(density_farfield)
cp = np.empty(0, dtype=float)
for body in self.iBnd:
for reg in body:
for inode, nodeIndexFloat in enumerate(reg.nodesCoord[:,-1]):
nodeIndex = int(nodeIndexFloat)
pressure = self.solver.Primitives().Get(nodeIndex, pressure_idx)
cp = np.append(cp, (pressure - ref_pressure)/(1/2*ref_density*ref_velocity**2))
np.savetxt(f'Cp{sfx}.dat', np.column_stack((self.iBnd[0][0].nodesCoord[:, 0], cp)))
def save(self, writer):
pass
def getAoA(self):
return self.solver.GetOutputValue('AOA') * np.pi/180
def getnDim(self):
return self.solver.GetNumberDimensions()
def getnBodies(self):
return 1
def getSpan(self, bodyname):
return 1.0
def getBlowingTypes(self, bodyname):
return['wing']
def getMinf(self):
vel_x_idx = self.solver.GetPrimitiveIndices()["VELOCITY_X"]
vel_y_idx = self.solver.GetPrimitiveIndices()["VELOCITY_Y"]
sound_speed_idx = self.solver.GetPrimitiveIndices()["SOUND_SPEED"]
nNodes_farfield = self.solver.GetNumberMarkerNodes(self.wingMarkersToID['farfield'])
mach_farfield = np.zeros(nNodes_farfield)
primitives = self.solver.Primitives()
for inode in range(nNodes_farfield):
nodeIndex = self.solver.GetMarkerNode(self.wingMarkersToID['farfield'], inode)
vel_x = primitives.Get(nodeIndex, vel_x_idx)
vel_y = primitives.Get(nodeIndex, vel_y_idx)
sound_speed = primitives.Get(nodeIndex, sound_speed_idx)
mach_farfield[inode] = np.sqrt(vel_x**2 + vel_y**2) / sound_speed
return np.mean(mach_farfield)
def getCl(self):
return self.solver.GetOutputValue('LIFT')
def getCd(self):
return self.solver.GetOutputValue('DRAG')
def getCm(self):
return self.solver.Get_Mx()
def getVerbose(self):
return self.verbose
def reset(self):
if self.getVerbose() > 0:
print('SU2Interface::Warning: Cannot reset solver.')
pass
def getInviscidBC(self):
""" Extract the inviscid paramaters at the edge of the boundary layer
and use it as a boundary condition in the viscous solver.
"""
if self.getnDim() == 2:
velComponents = ['VELOCITY_X', 'VELOCITY_Y']
elif self.getnDim() == 3:
velComponents = ['VELOCITY_X', 'VELOCITY_Y', 'VELOCITY_Z']
else:
raise RuntimeError('su2Interface::Incorrect number of dimensions.')
velIdx = np.zeros(self.getnDim(), dtype=int)
for i, velComp in enumerate(velComponents):
velIdx[i] = self.solver.GetPrimitiveIndices()[velComp]
soundSpeedIdx = self.solver.GetPrimitiveIndices()["SOUND_SPEED"]
densityIdx = self.solver.GetPrimitiveIndices()["DENSITY"]
for body in self.iBnd:
for reg in body:
for inode, nodeIndexFloat in enumerate(reg.nodesCoord[:,-1]):
nodeIndex = int(nodeIndexFloat)
# Velocity
for idim in range(self.getnDim()):
reg.V[inode,idim] = self.solver.Primitives().Get(int(nodeIndex), int(velIdx[idim]))
# Density
reg.Rho[inode] = self.solver.Primitives().Get(nodeIndex, densityIdx)
# Mach number
vel_norm = 0.0
for idim in range(self.getnDim()):
vel_norm += reg.V[inode,idim]**2
reg.M[inode] = np.sqrt(vel_norm) / self.solver.Primitives().Get(nodeIndex, soundSpeedIdx)
def setBlowingVelocity(self):
"""
Extracts the blowing velocity from the viscous calculation and applies it
as a boundary condition in the inviscid solver.
Notes
-----
- In SU2, the "Grid velocity" is a vector defined at the nodes in the global
frame of reference.
- In BLASTER, the blowing velocity is computed as a **scalar normal component**
at the element level.
- To impose the correct boundary condition, the normal blowing velocity must
be projected into the global frame and interpolated to the nodes.
"""
# Compute blowing velocity in the global frame of reference on the elements
blw_elems = np.zeros((self.iBnd[0][0].elemsCoord.shape[0], self.getnDim()), dtype=float)
normals = []
for ielm in range(len(self.iBnd[0][0].elemsCoord)):
nod1 = ielm
nod2 = ielm + 1
x1 = self.iBnd[0][0].nodesCoord[nod1,:self.getnDim()]
x2 = self.iBnd[0][0].nodesCoord[nod2,:self.getnDim()]
# Components of the tangent vector
t = np.empty(self.getnDim())
for idim in range(self.getnDim()):
t[idim] = x2[idim] - x1[idim]
normt = np.linalg.norm(t)
# Components of the normal vector
n = np.empty(self.getnDim())
if self.getnDim() == 2:
# 90° rotation
n[0] = t[1] / normt
n[1] = -t[0] / normt
elif self.getnDim() == 3:
# Compute using cross product with another edge
raise RuntimeError('3D normal not implemented yet (use SU2 function if possible).')
normals.append(n)
for idim in range(self.getnDim()):
blw_elems[ielm, idim] = self.iBnd[0][0].blowingVel[ielm] * n[idim]
# Compute blowing velocity at nodes
blw_nodes = np.zeros((self.iBnd[0][0].nodesCoord.shape[0], 3), dtype=float)
for inode in range(self.iBnd[0][0].nodesCoord.shape[0]):
# Look for the elements sharing node inode
# At TE, elm1 is the last element of upper side and elm2 is the last element of lower side.
if inode == 0 or inode == self.iBnd[0][0].nodesCoord.shape[0]-1:
elm1 = 0
elm2 = -1
else:
elm1 = inode - 1
elm2 = inode
# Because the blowing velocity on the lower side points downwards
sign = 1
if inode > self.iBnd[0][0].nodesCoord.shape[0]//2:
sign = -1
# The blowing velocity on one node is the average
# of the blowing velocity on the elements sharing the node
for idim in range(self.getnDim()):
blw_nodes[inode, idim] = sign * 0.5*(blw_elems[elm1, idim] + blw_elems[elm2, idim])
blw_nodes[blw_nodes < -0.01] = -0.01
blw_nodes[blw_nodes > 0.01] = 0.01
# Set blowing velocity in SU2 solver
for arfTag in self.wingTags:
for ivertex in range(self.solver.GetNumberMarkerNodes(self.wingMarkersToID[arfTag])):
nodeIndex = self.solver.GetMarkerNode(self.wingMarkersToID[arfTag], ivertex)
blasterIndex = np.where(self.iBnd[0][0].nodesCoord[:, -1] == nodeIndex)[0][0]
self.solver.SetBlowingVelocity(nodeIndex, blw_nodes[blasterIndex, 0], blw_nodes[blasterIndex, 1], blw_nodes[blasterIndex, 2])
def getWing(self):
if self.getnDim() == 2:
self._getAirfoil(0)
elif self.getnDim() == 3:
self._getWing3D(0)
else:
raise RuntimeError('su2Interface::Incorrect number of dimensions.')
def _getAirfoil(self, ibody):
""" Retreives the mesh used for SU2 Euler calculation.
Airfoil is already in seilig format (Upper TE -> Lower TE).
"""
nodesCoord = np.zeros((0, 4))
for arfTag in self.wingTags:
nodesOnSide = np.zeros((self.solver.GetNumberMarkerNodes(self.wingMarkersToID[arfTag]), 4))
for i in range(self.solver.GetNumberMarkerNodes(self.wingMarkersToID[arfTag])):
nodeIndex = self.solver.GetMarkerNode(self.wingMarkersToID[arfTag], i)
nodesOnSide[i, :self.getnDim()] = self.solver.Coordinates().Get(nodeIndex)
nodesOnSide[i, -1] = nodeIndex
# Sort in ascending order of first column
nodesOnSide = nodesOnSide[nodesOnSide[:,0].argsort()]
if len(self.wingTags) > 1:
if arfTag == 'airfoil':
nodesOnSide = np.flip(nodesOnSide, axis=0)
elif arfTag == 'airfoil_':
nodesOnSide = np.delete(nodesOnSide, 0, axis=0)
else:
# If we cannot identify upper and lower sides, we sort using
# the angle between the node and the TE.
# This method can fail for highly cambered airfoils.
# Find the trailing edge
te = nodesOnSide[np.argmax(nodesOnSide[:, 0])]
# Compute angles relative to TE
angles = np.arctan2(nodesOnSide[:, 1] - te[1], nodesOnSide[:, 0] - te[0])
# Sort by angle in descending order
nodesOnSide = nodesOnSide[np.argsort(-angles)]
# Selig format
te_index = np.argmax(nodesOnSide[:, 0]) # Leading edge has min x
nodesOnSide[:te_index] = np.flip(nodesOnSide[:te_index], axis=0)
nodesOnSide[te_index:] = np.flip(nodesOnSide[te_index:], axis=0)
# Duplicate TE point
nodesOnSide = np.row_stack((nodesOnSide[-1], nodesOnSide))
nodesCoord = np.row_stack((nodesCoord, nodesOnSide))
elemsCoord = np.zeros((nodesCoord.shape[0]-1, 4))
for i in range(nodesCoord.shape[0]-1):
elemsCoord[i, :self.getnDim()] = (nodesCoord[i, :self.getnDim()] + nodesCoord[i+1, :self.getnDim()]) / 2
elemsCoord[i, -1] = i
# Check that the airfoil has closed trailing edge
if np.any(nodesCoord[0, [0,self.getnDim()-1]] != nodesCoord[-1, [0,self.getnDim()-1]]):
raise RuntimeError('su2Interface::Airfoil has an open trailing edge.')
# Check that the airfoil has one point at the leading edge
if len(np.where(nodesCoord[:,0] == np.min(nodesCoord[:,0]))[0]) > 1:
print(len(np.where(nodesCoord[:,0] == np.min(nodesCoord[:,0]))[0]))
raise RuntimeError('su2Interface::Airfoil has multiple points at the leading edge.')
# Check for duplicated elements
if len(np.unique(elemsCoord[:,-1])) != elemsCoord.shape[0]:
raise RuntimeError('su2Interface::Duplicated elements in airfoil mesh.')
# Fill the data structures
self.iBnd[ibody][0].initStructures(nodesCoord.shape[0], elemsCoord.shape[0])
self.iBnd[ibody][0].nodesCoord = nodesCoord
self.iBnd[ibody][0].elemsCoord = elemsCoord
self.iBnd[ibody][0].setConnectList(np.array(nodesCoord[:,-1],dtype=int),
np.array(elemsCoord[:,-1],dtype=int))
def _modify_meshpath(self, config, mesh):
# Open config file and look for line MESH_FILENAME
# replace this line with MESH_FILENAME = mesh
with open(config, 'r') as f:
lines = f.readlines()
for i, line in enumerate(lines):
if line.startswith('MESH_FILENAME'):
lines[i] = f'MESH_FILENAME = {mesh}\n'
with open(config, 'w') as f:
f.writelines(lines)
break
else:
raise RuntimeError('su2Interface::MESH_FILENAME not found in config file.')
......@@ -15,24 +15,32 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# SU2 interface
# Paul Dechamps
import numpy as np
import os
import sys
from blast.interfaces.blSolversInterface import SolversInterface
import pysu2
class SU2Interface(SolversInterface):
def __init__(self, fileName, vSolver, cfg):
def __init__(self, su2config, vconfig, task='analysis'):
self.filename = su2config.get('filename')
self._modify_meshpath(self.filename, su2config.get('meshfile'))
self.verbose = su2config.get('Verb', 0)
self.have_mpi, self.comm, self.status, self.myid = self.__setup_mpi()
try:
import pysu2
self.solver = pysu2.CSinglezoneDriver(fileName, 1, self.comm)
print('SU2 successfully loaded.')
except:
print(ccolors.ANSI_RED, 'Could not load SU2. Make sure it is installed.', ccolors.ANSI_RESET)
raise RuntimeError('Import error')
SolversInterface.__init__(self, vSolver, cfg)
self.solver = pysu2.CSinglezoneDriver(self.filename, 1, self.comm)
# Prepare solver
self.comm.barrier()
self.solver.Preprocess(0)
self.wingMarkersToID = {key: i for i, key in enumerate(self.solver.GetMarkerTags())}
self.wingTags = su2config.get('wingTags')
print('SU2Interface::SU2 solver initialized for', task)
SolversInterface.__init__(self, vconfig)
def __setup_mpi(self):
import sys
......@@ -48,162 +56,306 @@ class SU2Interface(SolversInterface):
status = None
myid = 0
return have_mpi, comm, status, myid
def run(self):
self.comm.barrier()
# run solver
#Remove output in the terminal
if self.getVerbose() < 2:
sys.stdout = open(os.devnull, 'w')
self.solver.Run()
self.solver.Postprocess()
# write outputs
self.solver.Monitor(0)
self.solver.Monitor(0)
self.solver.Output(0)
self.comm.barrier()
return 1
#restore stdout
if self.getVerbose() < 2:
sys.stdout = sys.__stdout__
if self.getVerbose() > 0:
print(f'SU2Interface::SU2 solver finished in {self.solver.GetOutputValue("INNER_ITER"):.0f} iterations. RMS_DENSITY: {self.solver.GetOutputValue("RMS_DENSITY"):.2f}')
return 0
def writeCp(self, sfx=''):
cp = np.zeros((self.nPoints[0], 4))
for i in range(len(self.iBnd[0].nodesCoord[:,0])):
cp[i, :3] = self.solver.GetVertexCoord(int(self.bndMarkers[0]), int(self.iBnd[0].connectListNodes[i]))
cp[i, 3] = self.solver.GetVertexCp(int(self.bndMarkers[0]), int(self.iBnd[0].connectListNodes[i]))
np.savetxt('Cp'+sfx+'.dat', cp, fmt='%1.8e', delimiter=', ',
header='{0:>14s}, {1:>14s}, {2:>14s}, {3:>14s}'.format('x','y', 'z', 'Cp'), comments='')
""" Write Cp distribution around the airfoil on file.
"""
#GEt farfield pressure, velocity and density
vel_x_idx = self.solver.GetPrimitiveIndices()["VELOCITY_X"]
vel_y_idx = self.solver.GetPrimitiveIndices()["VELOCITY_Y"]
pressure_idx = self.solver.GetPrimitiveIndices()["PRESSURE"]
density_idx = self.solver.GetPrimitiveIndices()["DENSITY"]
nNodes_farfield = self.solver.GetNumberMarkerNodes(self.wingMarkersToID['farfield'])
velocity_farfield = np.zeros(nNodes_farfield)
pressure_farfield = np.zeros(nNodes_farfield)
density_farfield = np.zeros(nNodes_farfield)
primitives = self.solver.Primitives()
for inode in range(nNodes_farfield):
nodeIndex = self.solver.GetMarkerNode(self.wingMarkersToID['farfield'], inode)
# Velocity
vel_x = primitives.Get(nodeIndex, vel_x_idx)
vel_y = primitives.Get(nodeIndex, vel_y_idx)
velocity_farfield[inode] = np.sqrt(vel_x**2 + vel_y**2)
# Pressure
pressure_farfield[inode] = primitives.Get(nodeIndex, pressure_idx)
# Density
density_farfield[inode] = primitives.Get(nodeIndex, density_idx)
ref_pressure = np.mean(pressure_farfield)
ref_velocity = np.mean(velocity_farfield)
ref_density = np.mean(density_farfield)
cp = np.empty(0, dtype=float)
for body in self.iBnd:
for reg in body:
for inode, nodeIndexFloat in enumerate(reg.nodesCoord[:,-1]):
nodeIndex = int(nodeIndexFloat)
pressure = self.solver.Primitives().Get(nodeIndex, pressure_idx)
cp = np.append(cp, (pressure - ref_pressure)/(1/2*ref_density*ref_velocity**2))
np.savetxt(f'Cp{sfx}.dat', np.column_stack((self.iBnd[0][0].nodesCoord[:, 0], cp)))
def save(self, writer):
pass
def getAoA(self):
return self.solver.GetOutputValue('AOA') * np.pi/180
def getnDim(self):
return self.solver.GetNumberDimensions()
def getnBodies(self):
return 1
def getSpan(self, bodyname):
return 1.0
def getBlowingTypes(self, bodyname):
return['wing']
def getMinf(self):
return 1
vel_x_idx = self.solver.GetPrimitiveIndices()["VELOCITY_X"]
vel_y_idx = self.solver.GetPrimitiveIndices()["VELOCITY_Y"]
sound_speed_idx = self.solver.GetPrimitiveIndices()["SOUND_SPEED"]
nNodes_farfield = self.solver.GetNumberMarkerNodes(self.wingMarkersToID['farfield'])
mach_farfield = np.zeros(nNodes_farfield)
primitives = self.solver.Primitives()
for inode in range(nNodes_farfield):
nodeIndex = self.solver.GetMarkerNode(self.wingMarkersToID['farfield'], inode)
vel_x = primitives.Get(nodeIndex, vel_x_idx)
vel_y = primitives.Get(nodeIndex, vel_y_idx)
sound_speed = primitives.Get(nodeIndex, sound_speed_idx)
mach_farfield[inode] = np.sqrt(vel_x**2 + vel_y**2) / sound_speed
return np.mean(mach_farfield)
def getCl(self):
return self.solver.Get_LiftCoeff()
return self.solver.GetOutputValue('LIFT')
def getCd(self):
return self.solver.Get_DragCoeff()
return self.solver.GetOutputValue('DRAG')
def getCm(self):
return self.solver.Get_Mx()
def getVerbose(self):
return 1
return self.verbose
def reset(self):
self.solver.Preprocess(0)
def imposeInvBC(self, couplIter):
if self.getVerbose() > 0:
print('SU2Interface::Warning: Cannot reset solver.')
pass
def getInviscidBC(self):
""" Extract the inviscid paramaters at the edge of the boundary layer
and use it as a boundary condition in the viscous solver.
"""
if self.getnDim() == 2:
velComponents = ['VELOCITY_X', 'VELOCITY_Y']
elif self.getnDim() == 3:
velComponents = ['VELOCITY_X', 'VELOCITY_Y', 'VELOCITY_Z']
else:
raise RuntimeError('su2Interface::Incorrect number of dimensions.')
Params
------
- couplIter (int): Coupling iteration.
velIdx = np.zeros(self.getnDim(), dtype=int)
for i, velComp in enumerate(velComponents):
velIdx[i] = self.solver.GetPrimitiveIndices()[velComp]
soundSpeedIdx = self.solver.GetPrimitiveIndices()["SOUND_SPEED"]
densityIdx = self.solver.GetPrimitiveIndices()["DENSITY"]
for body in self.iBnd:
for reg in body:
for inode, nodeIndexFloat in enumerate(reg.nodesCoord[:,-1]):
nodeIndex = int(nodeIndexFloat)
# Velocity
for idim in range(self.getnDim()):
reg.V[inode,idim] = self.solver.Primitives().Get(int(nodeIndex), int(velIdx[idim]))
# Density
reg.Rho[inode] = self.solver.Primitives().Get(nodeIndex, densityIdx)
# Mach number
vel_norm = 0.0
for idim in range(self.getnDim()):
vel_norm += reg.V[inode,idim]**2
reg.M[inode] = np.sqrt(vel_norm) / self.solver.Primitives().Get(nodeIndex, soundSpeedIdx)
def setBlowingVelocity(self):
"""
# Retreive parameters.
for n in range(len(self.iBnd)):
for iRow, row in enumerate(self.iBnd[n].connectListNodes):
row=int(row)
self.iBnd[n].V[iRow,:] = self.solver.GetVertexVelocity(self.bndMarkers[n], row)
self.iBnd[n].M[iRow] = self.solver.GetVertexMach(self.bndMarkers[n], row)
self.iBnd[n].Rho[iRow] = self.solver.GetVertexDensity(self.bndMarkers[n], row)
if self.iBnd[n].Rho[iRow] == 0:
self.iBnd[n].Rho[iRow] = 1
self.setViscousSolver(couplIter)
def imposeBlwVel(self):
""" Extract the solution of the viscous calculation (blowing velocity)
and use it as a boundary condition in the inviscid solver.
Extracts the blowing velocity from the viscous calculation and applies it
as a boundary condition in the inviscid solver.
Notes
-----
- In SU2, the "Grid velocity" is a vector defined at the nodes in the global
frame of reference.
- In BLASTER, the blowing velocity is computed as a **scalar normal component**
at the element level.
- To impose the correct boundary condition, the normal blowing velocity must
be projected into the global frame and interpolated to the nodes.
"""
if self.iBnd[0].connectListNodes[0] != self.iBnd[0].connectListNodes[-1]:
raise RuntimeError("List of connectivity is incorrect.")
self.getBlowingBoundary()
# Impose blowing velocities.
for n in range(len(self.bndMarkers)):
if n == 0:
self.iBnd[n].blowingVel[0] = .5 * (self.iBnd[n].blowingVel[0] + self.iBnd[n].blowingVel[-1])
self.iBnd[n].blowingVel = self.iBnd[n].blowingVel[self.iBnd[n].connectListElems[:-1].argsort()]
elif n == 1:
self.iBnd[1].blowingVel = np.insert(self.iBnd[1].blowingVel, 0, self.iBnd[0].blowingVel[0])
self.iBnd[n].blowingVel = self.iBnd[n].blowingVel[self.iBnd[n].connectListElems.argsort()]
for iVertex in range(len(self.iBnd[n].blowingVel)-1):
self.solver.SetBlowing(self.bndMarkers[n], iVertex, self.iBnd[n].blowingVel[iVertex])
def setMesh(self):
if self.cfg['nDim'] == 2:
self.getAirfoil()
elif self.cfg['nDim'] == 3:
self.getWing()
# Compute blowing velocity in the global frame of reference on the elements
blw_elems = np.zeros((self.iBnd[0][0].elemsCoord.shape[0], self.getnDim()), dtype=float)
normals = []
for ielm in range(len(self.iBnd[0][0].elemsCoord)):
nod1 = ielm
nod2 = ielm + 1
x1 = self.iBnd[0][0].nodesCoord[nod1,:self.getnDim()]
x2 = self.iBnd[0][0].nodesCoord[nod2,:self.getnDim()]
# Components of the tangent vector
t = np.empty(self.getnDim())
for idim in range(self.getnDim()):
t[idim] = x2[idim] - x1[idim]
normt = np.linalg.norm(t)
# Components of the normal vector
n = np.empty(self.getnDim())
if self.getnDim() == 2:
# 90° rotation
n[0] = t[1] / normt
n[1] = -t[0] / normt
elif self.getnDim() == 3:
# Compute using cross product with another edge
raise RuntimeError('3D normal not implemented yet (use SU2 function if possible).')
normals.append(n)
for idim in range(self.getnDim()):
blw_elems[ielm, idim] = self.iBnd[0][0].blowingVel[ielm] * n[idim]
# Compute blowing velocity at nodes
blw_nodes = np.zeros((self.iBnd[0][0].nodesCoord.shape[0], 3), dtype=float)
for inode in range(self.iBnd[0][0].nodesCoord.shape[0]):
# Look for the elements sharing node inode
# At TE, elm1 is the last element of upper side and elm2 is the last element of lower side.
if inode == 0 or inode == self.iBnd[0][0].nodesCoord.shape[0]-1:
elm1 = 0
elm2 = -1
else:
elm1 = inode - 1
elm2 = inode
# Because the blowing velocity on the lower side points downwards
sign = 1
if inode > self.iBnd[0][0].nodesCoord.shape[0]//2:
sign = -1
# The blowing velocity on one node is the average
# of the blowing velocity on the elements sharing the node
for idim in range(self.getnDim()):
blw_nodes[inode, idim] = sign * 0.5*(blw_elems[elm1, idim] + blw_elems[elm2, idim])
blw_nodes[blw_nodes < -0.01] = -0.01
blw_nodes[blw_nodes > 0.01] = 0.01
# Set blowing velocity in SU2 solver
for arfTag in self.wingTags:
for ivertex in range(self.solver.GetNumberMarkerNodes(self.wingMarkersToID[arfTag])):
nodeIndex = self.solver.GetMarkerNode(self.wingMarkersToID[arfTag], ivertex)
blasterIndex = np.where(self.iBnd[0][0].nodesCoord[:, -1] == nodeIndex)[0][0]
self.solver.SetBlowingVelocity(nodeIndex, blw_nodes[blasterIndex, 0], blw_nodes[blasterIndex, 1], blw_nodes[blasterIndex, 2])
def getWing(self):
if self.getnDim() == 2:
self._getAirfoil(0)
elif self.getnDim() == 3:
self._getWing3D(0)
else:
raise RuntimeError('su2Interface::Could not resolve how to set\
the mesh. Either ''nDim'' is incorrect or ''msh'' was not set for\
3D computations')
raise RuntimeError('su2Interface::Incorrect number of dimensions.')
def getAirfoil(self):
def _getAirfoil(self, ibody):
""" Retreives the mesh used for SU2 Euler calculation.
Airfoil is already in seilig format (Upper TE -> Lower TE).
"""
alltags = self.solver.GetAllBoundaryMarkers()
if 'wake' not in alltags:
#raise RuntimeError('Can not find tag wake.')
self.bndMarkers = [alltags['airfoil']]
del self.iBnd[1]
del self.vBnd[0][1]
nodesCoord = np.zeros((0, 4))
for arfTag in self.wingTags:
nodesOnSide = np.zeros((self.solver.GetNumberMarkerNodes(self.wingMarkersToID[arfTag]), 4))
for i in range(self.solver.GetNumberMarkerNodes(self.wingMarkersToID[arfTag])):
nodeIndex = self.solver.GetMarkerNode(self.wingMarkersToID[arfTag], i)
nodesOnSide[i, :self.getnDim()] = self.solver.Coordinates().Get(nodeIndex)
nodesOnSide[i, -1] = nodeIndex
# Sort in ascending order of first column
nodesOnSide = nodesOnSide[nodesOnSide[:,0].argsort()]
if len(self.wingTags) > 1:
if arfTag == 'airfoil':
nodesOnSide = np.flip(nodesOnSide, axis=0)
elif arfTag == 'airfoil_':
nodesOnSide = np.delete(nodesOnSide, 0, axis=0)
else:
# If we cannot identify upper and lower sides, we sort using
# the angle between the node and the TE.
# This method can fail for highly cambered airfoils.
# Find the trailing edge
te = nodesOnSide[np.argmax(nodesOnSide[:, 0])]
# Compute angles relative to TE
angles = np.arctan2(nodesOnSide[:, 1] - te[1], nodesOnSide[:, 0] - te[0])
# Sort by angle in descending order
nodesOnSide = nodesOnSide[np.argsort(-angles)]
# Selig format
te_index = np.argmax(nodesOnSide[:, 0]) # Leading edge has min x
nodesOnSide[:te_index] = np.flip(nodesOnSide[:te_index], axis=0)
nodesOnSide[te_index:] = np.flip(nodesOnSide[te_index:], axis=0)
# Duplicate TE point
nodesOnSide = np.row_stack((nodesOnSide[-1], nodesOnSide))
nodesCoord = np.row_stack((nodesCoord, nodesOnSide))
elemsCoord = np.zeros((nodesCoord.shape[0]-1, 4))
for i in range(nodesCoord.shape[0]-1):
elemsCoord[i, :self.getnDim()] = (nodesCoord[i, :self.getnDim()] + nodesCoord[i+1, :self.getnDim()]) / 2
elemsCoord[i, -1] = i
# Check that the airfoil has closed trailing edge
if np.any(nodesCoord[0, [0,self.getnDim()-1]] != nodesCoord[-1, [0,self.getnDim()-1]]):
raise RuntimeError('su2Interface::Airfoil has an open trailing edge.')
# Check that the airfoil has one point at the leading edge
if len(np.where(nodesCoord[:,0] == np.min(nodesCoord[:,0]))[0]) > 1:
print(len(np.where(nodesCoord[:,0] == np.min(nodesCoord[:,0]))[0]))
raise RuntimeError('su2Interface::Airfoil has multiple points at the leading edge.')
# Check for duplicated elements
if len(np.unique(elemsCoord[:,-1])) != elemsCoord.shape[0]:
raise RuntimeError('su2Interface::Duplicated elements in airfoil mesh.')
# Fill the data structures
self.iBnd[ibody][0].initStructures(nodesCoord.shape[0], elemsCoord.shape[0])
self.iBnd[ibody][0].nodesCoord = nodesCoord
self.iBnd[ibody][0].elemsCoord = elemsCoord
self.iBnd[ibody][0].setConnectList(np.array(nodesCoord[:,-1],dtype=int),
np.array(elemsCoord[:,-1],dtype=int))
def _modify_meshpath(self, config, mesh):
# Open config file and look for line MESH_FILENAME
# replace this line with MESH_FILENAME = mesh
with open(config, 'r') as f:
lines = f.readlines()
for i, line in enumerate(lines):
if line.startswith('MESH_FILENAME'):
lines[i] = f'MESH_FILENAME = {mesh}\n'
with open(config, 'w') as f:
f.writelines(lines)
break
else:
self.bndMarkers = [alltags['airfoil'], alltags['wake']]
self.nPoints = [self.solver.GetNumberVertices(tag) for tag in self.bndMarkers]
data = [np.zeros((nPts, 4)) for nPts in self.nPoints]
for n in range(len(self.nPoints)):
for i in range(self.nPoints[n]):
data[n][i,:3] = self.solver.GetVertexCoord(self.bndMarkers[n], i)
data[n][i,3] = i
# Find TE and reorder coordinates
teNode = data[0][:,0].argmax()
"""if teNode != 0:
save = data[0][0,:].copy()
data[0][0,:] = data[0][teNode,:].copy()
data[0][teNode,:] = save
teNode = data[0][:,0].argmax()
print('Airfoil coordinates were reordered automatically.')"""
for i in range(len(data[0][0,:])):
data[0][:teNode+1,i] = np.flip(data[0][:teNode+1,i])
data[0][teNode+1:,i] = np.flip(data[0][teNode+1:,i])
# Duplicate TE point
data[0] = np.row_stack((data[0], data[0][0,:]))
# Find LE point
self.leNode = data[0][:,0].argmin()+1
# Recompute number of points
self.nPoints = [len(data[n]) for n in range(len(data))]
dataElems = [np.zeros((nPts-1, 3)) for nPts in self.nPoints]
for n in range(len(data)):
for iPoint in range(len(data[n][:,0]) - 1):
dataElems[n][iPoint, :] = .5*(data[n][iPoint+1,:3] + data[n][iPoint, :3])
# Nodes coordinates in the transformed space
self.phantomNodesCoord = [d.copy() for d in data]
self.phantomNodesCoord[0][:self.leNode, 0] *= -1
# Compute elements cg coordinates.
self.elemCgs = [np.zeros((nPts-1, 4)) for nPts in self.nPoints]
for n in range(len(self.elemCgs)):
for i in range(len(self.elemCgs[n][:,0])):
self.elemCgs[n][i,:] = (data[n][i,:] + data[n][i+1,:]) / 2
# Elements cg coordinates in the transformed space
self.phantomElemsCoord = [ecg.copy() for ecg in self.elemCgs]
self.phantomElemsCoord[0][:self.leNode, 0] *= -1
for n, bnd in enumerate(self.iBnd):
bnd.elemsCoord = dataElems[n]
bnd.nodesCoord = data[n]
bnd.setConnectList(data[n][:,3], data[n][:,3])
bnd.V = np.zeros((self.nPoints[n], 3))
bnd.M = np.zeros(self.nPoints[n])
bnd.Rho = np.zeros(self.nPoints[n])
self.solver.InitBlowing()
raise RuntimeError('su2Interface::MESH_FILENAME not found in config file.')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment