Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ws): introduce message parser for ws adapter #14127

Merged
merged 3 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions integration/websockets/e2e/ws-gateway.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,61 @@ describe('WebSocketGateway (WsAdapter)', () => {
);
});

it('should set messageParser by using setMessageParser method', async () => {
const testingModule = await Test.createTestingModule({
providers: [ApplicationGateway],
}).compile();
app = testingModule.createNestApplication();

const wsAdapter = new WsAdapter(app);
wsAdapter.setMessageParser(data => {
const [event, payload] = JSON.parse(data.toString());
return { event, data: payload };
});
app.useWebSocketAdapter(wsAdapter);
await app.listen(3000);

ws = new WebSocket('ws://localhost:8080');
await new Promise(resolve => ws.on('open', resolve));

ws.send(JSON.stringify(['push', { test: 'test' }]));
await new Promise<void>(resolve =>
ws.on('message', data => {
expect(JSON.parse(data).data.test).to.be.eql('test');
ws.close();
resolve();
}),
);
});

it('should set messageParser by using constructor options', async () => {
const testingModule = await Test.createTestingModule({
providers: [ApplicationGateway],
}).compile();
app = testingModule.createNestApplication();

const wsAdapter = new WsAdapter(app, {
messageParser: data => {
const [event, payload] = JSON.parse(data.toString());
return { event, data: payload };
},
});
app.useWebSocketAdapter(wsAdapter);
await app.listen(3000);

ws = new WebSocket('ws://localhost:8080');
await new Promise(resolve => ws.on('open', resolve));

ws.send(JSON.stringify(['push', { test: 'test' }]));
await new Promise<void>(resolve =>
ws.on('message', data => {
expect(JSON.parse(data).data.test).to.be.eql('test');
ws.close();
resolve();
}),
);
});

afterEach(async function () {
await app.close();
});
Expand Down
26 changes: 24 additions & 2 deletions packages/platform-ws/adapters/ws-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ type HttpServerRegistryKey = number;
type HttpServerRegistryEntry = any;
type WsServerRegistryKey = number;
type WsServerRegistryEntry = any[];
type WsData = string | Buffer | ArrayBuffer | Buffer[];
type WsMessageParser = (data: WsData) => { event: string; data: any } | void;
type WsAdapterOptions = {
messageParser?: WsMessageParser;
};

const UNDERLYING_HTTP_SERVER_PORT = 0;

Expand All @@ -41,10 +46,20 @@ export class WsAdapter extends AbstractWsAdapter {
WsServerRegistryKey,
WsServerRegistryEntry
>();
protected messageParser: WsMessageParser = data => {
return JSON.parse(data.toString());
};

constructor(appOrHttpServer?: INestApplicationContext | any) {
constructor(
appOrHttpServer?: INestApplicationContext | any,
options?: WsAdapterOptions,
) {
super(appOrHttpServer);
wsPackage = loadPackage('ws', 'WsAdapter', () => require('ws'));

if (options?.messageParser) {
this.messageParser = options.messageParser;
}
}

public create(
Expand Down Expand Up @@ -138,7 +153,10 @@ export class WsAdapter extends AbstractWsAdapter {
transform: (data: any) => Observable<any>,
): Observable<any> {
try {
const message = JSON.parse(buffer.data);
const message = this.messageParser(buffer.data);
if (!message) {
return EMPTY;
}
const messageHandler = handlersMap.get(message.event);
const { callback } = messageHandler;
return transform(callback(message.data, message.event));
Expand Down Expand Up @@ -179,6 +197,10 @@ export class WsAdapter extends AbstractWsAdapter {
this.wsServersRegistry.clear();
}

public setMessageParser(parser: WsMessageParser) {
this.messageParser = parser;
}

protected ensureHttpServerExists(
port: number,
httpServer = http.createServer(),
Expand Down